mirror of
https://github.com/openai/codex.git
synced 2026-05-16 09:12:54 +00:00
## Why We want terminal tool review analytics, but the reducer should not stamp review timing from its own wall clock. This PR plumbs review timing through the real protocol and app-server seams so downstream analytics can consume the emitter's timestamps directly. Guardian reviews keep their enriched `started_at` / `completed_at` analytics fields by deriving those legacy second-based values from the same protocol-native millisecond lifecycle timestamps, rather than sampling a separate analytics clock. ## What changed - add `started_at_ms` to user approval request payloads - add `started_at_ms` / `completed_at_ms` to guardian review notifications - preserve Guardian review `started_at` / `completed_at` enrichment from the protocol-native timing source - stamp typed `ServerResponse` analytics facts with app-server-observed `completed_at_ms` - thread the new timing fields through core, protocol, app-server, TUI, and analytics fixtures ## Verification - `cargo test -p codex-app-server outgoing_message --manifest-path codex-rs/Cargo.toml` - `cargo test -p codex-app-server-protocol guardian --manifest-path codex-rs/Cargo.toml` - `cargo test -p codex-tui guardian --manifest-path codex-rs/Cargo.toml` - `cargo test -p codex-analytics analytics_client_tests --manifest-path codex-rs/Cargo.toml` --- [//]: # (BEGIN SAPLING FOOTER) Stack created with [Sapling](https://sapling-scm.com). Best reviewed with [ReviewStack](https://reviewstack.dev/openai/codex/pull/21434). * #18748 * __->__ #21434 * #18747 * #17090 * #17089 * #20514
446 lines
16 KiB
Rust
446 lines
16 KiB
Rust
use super::*;
|
|
use crate::mcp_tool_call::MCP_TOOL_APPROVAL_DECLINE_SYNTHETIC;
|
|
use crate::mcp_tool_call::MCP_TOOL_APPROVAL_QUESTION_ID_PREFIX;
|
|
use async_channel::bounded;
|
|
use codex_protocol::config_types::ApprovalsReviewer;
|
|
use codex_protocol::models::NetworkPermissions;
|
|
use codex_protocol::models::ResponseItem;
|
|
use codex_protocol::protocol::AgentStatus;
|
|
use codex_protocol::protocol::AskForApproval;
|
|
use codex_protocol::protocol::EventMsg;
|
|
use codex_protocol::protocol::ExecApprovalRequestEvent;
|
|
use codex_protocol::protocol::GuardianAssessmentAction;
|
|
use codex_protocol::protocol::GuardianAssessmentStatus;
|
|
use codex_protocol::protocol::GuardianCommandSource;
|
|
use codex_protocol::protocol::McpInvocation;
|
|
use codex_protocol::protocol::RawResponseItemEvent;
|
|
use codex_protocol::protocol::ReviewDecision;
|
|
use codex_protocol::protocol::TurnAbortReason;
|
|
use codex_protocol::protocol::TurnAbortedEvent;
|
|
use codex_protocol::request_permissions::RequestPermissionProfile;
|
|
use codex_protocol::request_permissions::RequestPermissionsEvent;
|
|
use codex_protocol::request_permissions::RequestPermissionsResponse;
|
|
use codex_protocol::request_user_input::RequestUserInputAnswer;
|
|
use codex_protocol::request_user_input::RequestUserInputEvent;
|
|
use codex_protocol::request_user_input::RequestUserInputQuestion;
|
|
use core_test_support::PathBufExt;
|
|
use core_test_support::test_path_buf;
|
|
use pretty_assertions::assert_eq;
|
|
use std::collections::HashMap;
|
|
use std::sync::Arc;
|
|
use tokio::sync::Mutex;
|
|
use tokio::sync::watch;
|
|
use tokio::time::timeout;
|
|
|
|
#[tokio::test]
|
|
async fn forward_events_cancelled_while_send_blocked_shuts_down_delegate() {
|
|
let (tx_events, rx_events) = bounded(1);
|
|
let (tx_sub, rx_sub) = bounded(SUBMISSION_CHANNEL_CAPACITY);
|
|
let (_agent_status_tx, agent_status) = watch::channel(AgentStatus::PendingInit);
|
|
let (session, ctx, _rx_evt) = crate::session::tests::make_session_and_context_with_rx().await;
|
|
let codex = Arc::new(Codex {
|
|
tx_sub,
|
|
rx_event: rx_events,
|
|
agent_status,
|
|
session: Arc::clone(&session),
|
|
session_loop_termination: completed_session_loop_termination(),
|
|
});
|
|
|
|
let (tx_out, rx_out) = bounded(1);
|
|
tx_out
|
|
.send(Event {
|
|
id: "full".to_string(),
|
|
msg: EventMsg::TurnAborted(TurnAbortedEvent {
|
|
turn_id: Some("turn-1".to_string()),
|
|
reason: TurnAbortReason::Interrupted,
|
|
completed_at: None,
|
|
duration_ms: None,
|
|
}),
|
|
})
|
|
.await
|
|
.unwrap();
|
|
|
|
let cancel = CancellationToken::new();
|
|
let forward = tokio::spawn(forward_events(
|
|
Arc::clone(&codex),
|
|
tx_out.clone(),
|
|
session,
|
|
ctx,
|
|
Arc::new(Mutex::new(HashMap::new())),
|
|
cancel.clone(),
|
|
));
|
|
|
|
tx_events
|
|
.send(Event {
|
|
id: "evt".to_string(),
|
|
msg: EventMsg::RawResponseItem(RawResponseItemEvent {
|
|
item: ResponseItem::CustomToolCall {
|
|
id: None,
|
|
status: None,
|
|
call_id: "call-1".to_string(),
|
|
name: "tool".to_string(),
|
|
input: "{}".to_string(),
|
|
},
|
|
}),
|
|
})
|
|
.await
|
|
.unwrap();
|
|
|
|
drop(tx_events);
|
|
cancel.cancel();
|
|
timeout(std::time::Duration::from_millis(1000), forward)
|
|
.await
|
|
.expect("forward_events hung")
|
|
.expect("forward_events join error");
|
|
|
|
let received = rx_out.recv().await.expect("prefilled event missing");
|
|
assert_eq!("full", received.id);
|
|
let mut ops = Vec::new();
|
|
while let Ok(sub) = rx_sub.try_recv() {
|
|
ops.push(sub.op);
|
|
}
|
|
assert!(
|
|
ops.iter().any(|op| matches!(op, Op::Interrupt)),
|
|
"expected Interrupt op after cancellation"
|
|
);
|
|
assert!(
|
|
ops.iter().any(|op| matches!(op, Op::Shutdown)),
|
|
"expected Shutdown op after cancellation"
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn forward_ops_preserves_submission_trace_context() {
|
|
let (tx_sub, rx_sub) = bounded(SUBMISSION_CHANNEL_CAPACITY);
|
|
let (_tx_events, rx_events) = bounded(SUBMISSION_CHANNEL_CAPACITY);
|
|
let (_agent_status_tx, agent_status) = watch::channel(AgentStatus::PendingInit);
|
|
let (session, _ctx, _rx_evt) = crate::session::tests::make_session_and_context_with_rx().await;
|
|
let codex = Arc::new(Codex {
|
|
tx_sub,
|
|
rx_event: rx_events,
|
|
agent_status,
|
|
session,
|
|
session_loop_termination: completed_session_loop_termination(),
|
|
});
|
|
let (tx_ops, rx_ops) = bounded(1);
|
|
let cancel = CancellationToken::new();
|
|
let forward = tokio::spawn(forward_ops(Arc::clone(&codex), rx_ops, cancel));
|
|
|
|
let submission = Submission {
|
|
id: "sub-1".to_string(),
|
|
op: Op::Interrupt,
|
|
trace: Some(codex_protocol::protocol::W3cTraceContext {
|
|
traceparent: Some(
|
|
"00-1234567890abcdef1234567890abcdef-1234567890abcdef-01".to_string(),
|
|
),
|
|
tracestate: Some("vendor=state".to_string()),
|
|
}),
|
|
};
|
|
tx_ops.send(submission.clone()).await.unwrap();
|
|
drop(tx_ops);
|
|
|
|
let forwarded = timeout(Duration::from_secs(1), rx_sub.recv())
|
|
.await
|
|
.expect("forward_ops hung")
|
|
.expect("forwarded submission missing");
|
|
assert_eq!(submission.id, forwarded.id);
|
|
assert_eq!(submission.op, forwarded.op);
|
|
assert_eq!(submission.trace, forwarded.trace);
|
|
|
|
timeout(Duration::from_secs(1), forward)
|
|
.await
|
|
.expect("forward_ops did not exit")
|
|
.expect("forward_ops join error");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn run_codex_thread_interactive_respects_pre_cancelled_spawn() {
|
|
let (parent_session, parent_ctx, _rx_events) =
|
|
crate::session::tests::make_session_and_context_with_rx().await;
|
|
let cancel_token = CancellationToken::new();
|
|
cancel_token.cancel();
|
|
|
|
let result = timeout(
|
|
Duration::from_secs(/*secs*/ 1),
|
|
run_codex_thread_interactive(
|
|
parent_ctx.config.as_ref().clone(),
|
|
Arc::clone(&parent_session.services.auth_manager),
|
|
Arc::clone(&parent_session.services.models_manager),
|
|
parent_session,
|
|
parent_ctx,
|
|
cancel_token,
|
|
SubAgentSource::Review,
|
|
/*initial_history*/ None,
|
|
),
|
|
)
|
|
.await
|
|
.expect("cancelled delegate spawn should not hang");
|
|
|
|
assert!(matches!(result, Err(CodexErr::TurnAborted)));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn handle_request_permissions_uses_tool_call_id_for_round_trip() {
|
|
let (parent_session, parent_ctx, rx_events) =
|
|
crate::session::tests::make_session_and_context_with_rx().await;
|
|
*parent_session.active_turn.lock().await = Some(crate::state::ActiveTurn::default());
|
|
|
|
let (tx_sub, rx_sub) = bounded(SUBMISSION_CHANNEL_CAPACITY);
|
|
let (_tx_events, rx_events_child) = bounded(SUBMISSION_CHANNEL_CAPACITY);
|
|
let (_agent_status_tx, agent_status) = watch::channel(AgentStatus::PendingInit);
|
|
let codex = Arc::new(Codex {
|
|
tx_sub,
|
|
rx_event: rx_events_child,
|
|
agent_status,
|
|
session: Arc::clone(&parent_session),
|
|
session_loop_termination: completed_session_loop_termination(),
|
|
});
|
|
|
|
let call_id = "tool-call-1".to_string();
|
|
let expected_response = RequestPermissionsResponse {
|
|
permissions: RequestPermissionProfile {
|
|
network: Some(NetworkPermissions {
|
|
enabled: Some(true),
|
|
}),
|
|
..RequestPermissionProfile::default()
|
|
},
|
|
scope: PermissionGrantScope::Turn,
|
|
strict_auto_review: false,
|
|
};
|
|
let delegated_cwd = parent_ctx.cwd.join("delegated-cwd");
|
|
let cancel_token = CancellationToken::new();
|
|
let request_call_id = call_id.clone();
|
|
let request_cwd = delegated_cwd.clone();
|
|
|
|
let handle = tokio::spawn({
|
|
let codex = Arc::clone(&codex);
|
|
let parent_session = Arc::clone(&parent_session);
|
|
let parent_ctx = Arc::clone(&parent_ctx);
|
|
let cancel_token = cancel_token.clone();
|
|
async move {
|
|
handle_request_permissions(
|
|
codex.as_ref(),
|
|
&parent_session,
|
|
&parent_ctx,
|
|
RequestPermissionsEvent {
|
|
call_id: request_call_id,
|
|
turn_id: "child-turn-1".to_string(),
|
|
started_at_ms: 0,
|
|
reason: Some("need access".to_string()),
|
|
permissions: RequestPermissionProfile {
|
|
network: Some(NetworkPermissions {
|
|
enabled: Some(true),
|
|
}),
|
|
..RequestPermissionProfile::default()
|
|
},
|
|
cwd: Some(request_cwd),
|
|
},
|
|
&cancel_token,
|
|
)
|
|
.await;
|
|
}
|
|
});
|
|
|
|
let request_event = timeout(Duration::from_secs(1), rx_events.recv())
|
|
.await
|
|
.expect("request_permissions event timed out")
|
|
.expect("request_permissions event missing");
|
|
let EventMsg::RequestPermissions(request) = request_event.msg else {
|
|
panic!("expected RequestPermissions event");
|
|
};
|
|
assert_eq!(request.call_id, call_id.clone());
|
|
assert_eq!(request.cwd, Some(delegated_cwd));
|
|
|
|
parent_session
|
|
.notify_request_permissions_response(&call_id, expected_response.clone())
|
|
.await;
|
|
|
|
timeout(Duration::from_secs(1), handle)
|
|
.await
|
|
.expect("handle_request_permissions hung")
|
|
.expect("handle_request_permissions join error");
|
|
|
|
let submission = timeout(Duration::from_secs(1), rx_sub.recv())
|
|
.await
|
|
.expect("request_permissions response timed out")
|
|
.expect("request_permissions response missing");
|
|
assert_eq!(
|
|
submission.op,
|
|
Op::RequestPermissionsResponse {
|
|
id: call_id,
|
|
response: expected_response,
|
|
}
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn handle_exec_approval_uses_call_id_for_guardian_review_and_approval_id_for_reply() {
|
|
let (parent_session, parent_ctx, rx_events) =
|
|
crate::session::tests::make_session_and_context_with_rx().await;
|
|
let mut parent_ctx = Arc::try_unwrap(parent_ctx).expect("single turn context ref");
|
|
let mut config = (*parent_ctx.config).clone();
|
|
config.approvals_reviewer = ApprovalsReviewer::AutoReview;
|
|
parent_ctx.config = Arc::new(config);
|
|
parent_ctx
|
|
.approval_policy
|
|
.set(AskForApproval::OnRequest)
|
|
.expect("set on-request policy");
|
|
let parent_ctx = Arc::new(parent_ctx);
|
|
|
|
let (tx_sub, rx_sub) = bounded(SUBMISSION_CHANNEL_CAPACITY);
|
|
let (_tx_events, rx_events_child) = bounded(SUBMISSION_CHANNEL_CAPACITY);
|
|
let (_agent_status_tx, agent_status) = watch::channel(AgentStatus::PendingInit);
|
|
let codex = Arc::new(Codex {
|
|
tx_sub,
|
|
rx_event: rx_events_child,
|
|
agent_status,
|
|
session: Arc::clone(&parent_session),
|
|
session_loop_termination: completed_session_loop_termination(),
|
|
});
|
|
|
|
let cancel_token = CancellationToken::new();
|
|
let handle = tokio::spawn({
|
|
let codex = Arc::clone(&codex);
|
|
let parent_session = Arc::clone(&parent_session);
|
|
let parent_ctx = Arc::clone(&parent_ctx);
|
|
let cancel_token = cancel_token.clone();
|
|
async move {
|
|
handle_exec_approval(
|
|
codex.as_ref(),
|
|
"child-turn-1".to_string(),
|
|
&parent_session,
|
|
&parent_ctx,
|
|
ExecApprovalRequestEvent {
|
|
call_id: "command-item-1".to_string(),
|
|
approval_id: Some("callback-approval-1".to_string()),
|
|
turn_id: "child-turn-1".to_string(),
|
|
started_at_ms: 0,
|
|
command: vec!["rm".to_string(), "-rf".to_string(), "tmp".to_string()],
|
|
cwd: test_path_buf("/tmp").abs(),
|
|
reason: Some("unsafe subcommand".to_string()),
|
|
network_approval_context: None,
|
|
proposed_execpolicy_amendment: None,
|
|
proposed_network_policy_amendments: None,
|
|
additional_permissions: None,
|
|
available_decisions: Some(vec![
|
|
ReviewDecision::Approved,
|
|
ReviewDecision::Abort,
|
|
]),
|
|
parsed_cmd: Vec::new(),
|
|
},
|
|
&cancel_token,
|
|
)
|
|
.await;
|
|
}
|
|
});
|
|
|
|
let assessment_event = timeout(Duration::from_secs(2), async {
|
|
loop {
|
|
let event = rx_events.recv().await.expect("guardian assessment event");
|
|
if let EventMsg::GuardianAssessment(assessment) = event.msg {
|
|
return assessment;
|
|
}
|
|
}
|
|
})
|
|
.await
|
|
.expect("timed out waiting for guardian assessment");
|
|
let expected_action = GuardianAssessmentAction::Command {
|
|
source: GuardianCommandSource::Shell,
|
|
command: "rm -rf tmp".to_string(),
|
|
cwd: test_path_buf("/tmp").abs(),
|
|
};
|
|
assert!(!assessment_event.id.is_empty());
|
|
assert_eq!(
|
|
assessment_event.target_item_id.as_deref(),
|
|
Some("command-item-1")
|
|
);
|
|
assert_eq!(assessment_event.turn_id, parent_ctx.sub_id);
|
|
assert_eq!(
|
|
assessment_event.status,
|
|
GuardianAssessmentStatus::InProgress
|
|
);
|
|
assert_eq!(assessment_event.risk_level, None);
|
|
assert_eq!(assessment_event.user_authorization, None);
|
|
assert_eq!(assessment_event.rationale, None);
|
|
assert_eq!(assessment_event.decision_source, None);
|
|
assert_eq!(assessment_event.action, expected_action);
|
|
|
|
cancel_token.cancel();
|
|
|
|
timeout(Duration::from_secs(2), handle)
|
|
.await
|
|
.expect("handle_exec_approval hung")
|
|
.expect("handle_exec_approval join error");
|
|
|
|
let submission = timeout(Duration::from_secs(2), rx_sub.recv())
|
|
.await
|
|
.expect("exec approval response timed out")
|
|
.expect("exec approval response missing");
|
|
assert_eq!(
|
|
submission.op,
|
|
Op::ExecApproval {
|
|
id: "callback-approval-1".to_string(),
|
|
turn_id: Some("child-turn-1".to_string()),
|
|
decision: ReviewDecision::Abort,
|
|
}
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn delegated_mcp_guardian_abort_returns_synthetic_decline_answer() {
|
|
let (parent_session, parent_ctx, _rx_events) =
|
|
crate::session::tests::make_session_and_context_with_rx().await;
|
|
let mut parent_ctx = Arc::try_unwrap(parent_ctx).expect("single turn context ref");
|
|
let mut config = (*parent_ctx.config).clone();
|
|
config.approvals_reviewer = ApprovalsReviewer::AutoReview;
|
|
parent_ctx.config = Arc::new(config);
|
|
parent_ctx
|
|
.approval_policy
|
|
.set(AskForApproval::OnRequest)
|
|
.expect("set on-request policy");
|
|
let parent_ctx = Arc::new(parent_ctx);
|
|
|
|
let pending_mcp_invocations = Arc::new(Mutex::new(HashMap::from([(
|
|
"call-1".to_string(),
|
|
McpInvocation {
|
|
server: "custom_server".to_string(),
|
|
tool: "dangerous_tool".to_string(),
|
|
arguments: None,
|
|
},
|
|
)])));
|
|
let cancel_token = CancellationToken::new();
|
|
cancel_token.cancel();
|
|
|
|
let response = maybe_auto_review_mcp_request_user_input(
|
|
&parent_session,
|
|
&parent_ctx,
|
|
&pending_mcp_invocations,
|
|
&RequestUserInputEvent {
|
|
call_id: "call-1".to_string(),
|
|
turn_id: "child-turn-1".to_string(),
|
|
questions: vec![RequestUserInputQuestion {
|
|
id: format!("{MCP_TOOL_APPROVAL_QUESTION_ID_PREFIX}_call-1"),
|
|
header: "Approve app tool call?".to_string(),
|
|
question: "Allow this app tool?".to_string(),
|
|
is_other: false,
|
|
is_secret: false,
|
|
options: None,
|
|
}],
|
|
},
|
|
&cancel_token,
|
|
)
|
|
.await;
|
|
|
|
assert_eq!(
|
|
response,
|
|
Some(RequestUserInputResponse {
|
|
answers: HashMap::from([(
|
|
format!("{MCP_TOOL_APPROVAL_QUESTION_ID_PREFIX}_call-1"),
|
|
RequestUserInputAnswer {
|
|
answers: vec![MCP_TOOL_APPROVAL_DECLINE_SYNTHETIC.to_string()],
|
|
},
|
|
)]),
|
|
})
|
|
);
|
|
}
|