Compare commits

...

14 Commits

Author SHA1 Message Date
Albin Cassirer
081b0d27f9 Small design doc update. 2026-04-19 18:18:41 -07:00
Albin Cassirer
c58e8e329f Add skill invoked observation projection
Add a skill.invoked observation and map it through the existing analytics skill invocation reducer path. Extend the feature conformance test so the legacy skill fact and shared observation produce the same analytics payload.
2026-04-19 17:32:36 -07:00
Albin Cassirer
1c9756600a Document typed observation compatibility boundary
Clarify that typed observation events are internal source types rather than exported wire schemas. Reducer outputs remain the compatibility boundary, so the observation taxonomy can evolve with reducers in the same change unless raw observations become persisted or cross-version contracts.
2026-04-19 17:23:57 -07:00
Albin Cassirer
df90ed05fe Add review completed observation projection
Introduce a generic review.completed observation that separates the reviewed action from the reviewer response. Model user and guardian reviewer responses in the shared event stream, while projecting only guardian responses into the current legacy analytics event.

Keep the guardian analytics mapping isolated in a dedicated projection module and cover the reviewed-action variants with targeted projection tests. Update the design doc to clarify that field uses are the analytics selection marker, with detail/class metadata retained for review, auditing, and sink-specific behavior.
2026-04-19 17:22:16 -07:00
Albin Cassirer
a8b9f08fc0 [observability] Add compaction ended observation
Introduce a typed compaction.ended observation and project it through the existing analytics compaction fact so the legacy reducer remains the only analytics emission path.

Add a side-by-side reducer test covering the subagent thread metadata join and verifying the observation path produces the same codex_compaction_event payload as the legacy fact.
2026-04-19 16:49:17 -07:00
Albin Cassirer
9b1329ad22 [observability] Add turn steer observation
Introduce a typed turn.steer observation for same-turn steering outcomes, including accepted and rejected requests with stable rejection reasons.

Project the observation through the existing analytics reducer state so the current codex_turn_steer_event payload and steer-count side effect remain matched. Add side-by-side conformance coverage against the legacy request/response and error-response sources.
2026-04-19 16:42:40 -07:00
Albin Cassirer
04667d0ea6 [observability] Cover thread started analytics variants
Expand the thread.started observation conformance test so the observation path is checked against the existing analytics reducer for new, resumed, forked, and subagent thread activations.

This keeps the scope limited to proof coverage for the thread lifecycle taxonomy without adding another event family or instrumentation path.
2026-04-19 16:36:01 -07:00
Albin Cassirer
6922fe3b52 [observability] Add thread started observation
Introduce a typed thread.started observation that models when Codex starts tracking a thread for a runtime or client connection, including whether the activation is new, forked, or resumed.

Project the observation into the existing analytics reducer state without adding a new analytics fact type, so thread connection metadata and later turn/compaction joins continue to use the same reducer path as legacy app-server responses.

Add side-by-side conformance coverage proving the observation path emits the same codex_thread_initialized payload as the current thread/start response source.
2026-04-19 16:30:10 -07:00
Albin Cassirer
3198988a52 [observability] Fold turn config into turn started
Move turn-specific observation definitions into an events::turn module and re-export them from the shared event namespace.

Extend turn.started with resolved configuration so the canonical event models the runtime fact directly instead of mirroring the analytics reducer's separate config fact. Project turn.started into the existing resolved-config fact plus turn-start notification, and update conformance coverage so the observation path matches the legacy codex_turn_event output.

Update the design doc to describe resolved turn config as part of turn.started.
2026-04-19 16:08:18 -07:00
Albin Cassirer
7418d6e364 [observability] Add turn lifecycle observations
Introduce shared turn.started and turn.ended observations, including terminal status, timing, and optional provider token usage on the terminal event.

Project those observations into the existing analytics reducer state and add a side-by-side conformance test showing the observation path produces the same codex_turn_event payload as the legacy inputs for the migrated fields.

Update the design doc so turn token accounting is modeled as part of the terminal turn event rather than a separate canonical event.
2026-04-19 15:54:59 -07:00
Albin Cassirer
10f7e100d9 [observability] Add analytics feature observations
Introduce typed observation events for app mentions, hook runs, and plugin usage/state changes, with analytics use markers on the event fields.

Add a private analytics projection module that maps shared observations into the existing legacy analytics payloads, keeping the observation reducer focused on ingestion orchestration. Cover the new projection path with side-by-side tests against the existing analytics facts.

Update the design doc to record that backend-specific schema mapping should remain isolated from the shared observation taxonomy.
2026-04-19 15:35:17 -07:00
Albin Cassirer
b615ce9291 [observability] Add explicit field use markers
Add FieldUse metadata so fields declare the exact projections intended to consume them. Detail level and data class remain guardrails enforced by FieldPolicy, while uses drives selection and prevents broad policies from accidentally collecting every safe-looking field.

Support struct-level default uses in the Observation derive, with field-level uses overriding the default for mixed events. AppUsed now declares analytics once at the event level instead of repeating it on every field.

Keep analytics conformance focused on final TrackEventRequest equality and move marker/policy filtering coverage into codex-observability tests. Update the design doc to describe the intent-versus-guardrail split.
2026-04-19 15:17:38 -07:00
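The intent-versus-guardrail split described in this commit can be illustrated with a minimal sketch; the type and field names below are illustrative stand-ins for what the message describes, not the codex-observability API:

```rust
// Hypothetical names throughout -- this sketches the idea of `uses` driving
// selection while detail level and data class stay guardrails; it is not the real derive.
#[derive(Clone, Copy, PartialEq, Eq)]
enum FieldUse {
    Analytics,
    Otel,
}

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum DetailLevel {
    Metadata,
    Content,
}

#[derive(Clone, Copy, PartialEq, Eq)]
enum DataClass {
    Safe,
    SecretRisk,
}

struct FieldMeta {
    name: &'static str,
    uses: &'static [FieldUse],
    detail: DetailLevel,
    class: DataClass,
}

/// A field reaches the analytics projection only if it opted in via `uses`
/// (intent) and also passes the detail/class guardrails (policy).
fn select_for_analytics(fields: &[FieldMeta]) -> Vec<&'static str> {
    fields
        .iter()
        .filter(|field| field.uses.contains(&FieldUse::Analytics))
        .filter(|field| field.detail <= DetailLevel::Metadata && field.class == DataClass::Safe)
        .map(|field| field.name)
        .collect()
}

fn main() {
    let fields = [
        FieldMeta {
            name: "model_slug",
            uses: &[FieldUse::Analytics],
            detail: DetailLevel::Metadata,
            class: DataClass::Safe,
        },
        // Safe-looking but never marked for analytics, so a broad policy alone
        // is not enough to collect it.
        FieldMeta {
            name: "hook_source",
            uses: &[FieldUse::Otel],
            detail: DetailLevel::Metadata,
            class: DataClass::Safe,
        },
    ];
    assert_eq!(select_for_analytics(&fields), vec!["model_slug"]);
}
```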
Albin Cassirer
590ed405d2 [observability] Add field policy gate
Introduce FieldPolicy as the small shared primitive that decides whether a sink may inspect a field before serialization. The policy combines the field detail limit with the allowed data classes, keeping destination-specific rules outside the event definitions.

Add tests for the basic remote-safe shape and for the important safety invariant: denied content and secret-risk fields are filtered before serialization, proven with fields that panic if a visitor tries to serialize them.
2026-04-19 14:56:32 -07:00
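A minimal sketch of the gate this commit describes, assuming a policy that combines a detail limit with allowed data classes; the names are illustrative, not the crate's actual types:

```rust
// Illustrative sketch of a field policy gate; `FieldPolicy`, `DetailLevel`, and
// `DataClass` are assumed shapes based on the commit message, not the real API.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum DetailLevel {
    Metadata,
    Content,
}

#[derive(Clone, Copy, PartialEq, Eq)]
enum DataClass {
    Safe,
    SecretRisk,
}

/// Decides whether a sink may inspect a field at all, before any serialization.
struct FieldPolicy {
    max_detail: DetailLevel,
    allowed_classes: &'static [DataClass],
}

impl FieldPolicy {
    /// A remote-safe shape: metadata-level detail only, safe data classes only.
    fn remote_safe() -> Self {
        FieldPolicy {
            max_detail: DetailLevel::Metadata,
            allowed_classes: &[DataClass::Safe],
        }
    }

    fn allows(&self, detail: DetailLevel, class: DataClass) -> bool {
        detail <= self.max_detail && self.allowed_classes.contains(&class)
    }
}

fn main() {
    let policy = FieldPolicy::remote_safe();
    // Denied fields are filtered here, so a serializer never even visits them.
    assert!(policy.allows(DetailLevel::Metadata, DataClass::Safe));
    assert!(!policy.allows(DetailLevel::Content, DataClass::Safe));
    assert!(!policy.allows(DetailLevel::Metadata, DataClass::SecretRisk));
}
```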
Albin Cassirer
50e23d92c6 [observability] Add shared observation foundation
Introduce a small codex-observability crate with typed observations, field-level policy metadata, and a derive macro that requires each exported field to declare detail level and data class. The derive crate stays narrow: it only generates visitor calls and leaves filtering, serialization, and destination-specific mapping to sinks and reducers.

Add the first canonical app.used observation and a private analytics observation reducer that projects it through the existing AnalyticsReducer. This gives us a side-by-side conformance test where the legacy analytics fact path and the observation path must emit identical TrackEventRequest JSON.

Document the target architecture and staged verification plan so later work can expand the taxonomy, field policy, E2E shadowing, rollout trace, and OTEL projection without widening this first checkpoint.
2026-04-19 14:53:13 -07:00
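The narrow derive contract this commit describes can be sketched by hand (the real crate generates the equivalent through its derive macro); the trait and type names below are illustrative assumptions:

```rust
// Hand-written sketch of the derive contract: the event only emits visitor calls
// with declared field metadata, and the sink decides what to do with them.
// All names here are illustrative, not the codex-observability API.
#[allow(dead_code)]
#[derive(Clone, Copy)]
enum DetailLevel {
    Metadata,
    Content,
}

#[allow(dead_code)]
#[derive(Clone, Copy)]
enum DataClass {
    Safe,
    SecretRisk,
}

/// Sinks and reducers implement this; filtering, serialization, and
/// destination-specific mapping stay on their side of the boundary.
trait FieldVisitor {
    fn visit_str(&mut self, name: &str, detail: DetailLevel, class: DataClass, value: &str);
}

/// What a derived app.used observation might expose to visitors.
struct AppUsed<'a> {
    model_slug: &'a str,
    connector_id: Option<&'a str>,
}

impl AppUsed<'_> {
    fn visit(&self, visitor: &mut impl FieldVisitor) {
        visitor.visit_str("model_slug", DetailLevel::Metadata, DataClass::Safe, self.model_slug);
        if let Some(connector_id) = self.connector_id {
            visitor.visit_str("connector_id", DetailLevel::Metadata, DataClass::Safe, connector_id);
        }
    }
}

/// Toy sink that records which fields it was shown, standing in for the
/// analytics projection that builds the legacy TrackEventRequest payload.
struct FieldNames(Vec<String>);

impl FieldVisitor for FieldNames {
    fn visit_str(&mut self, name: &str, _detail: DetailLevel, _class: DataClass, _value: &str) {
        self.0.push(name.to_string());
    }
}

fn main() {
    let observation = AppUsed {
        model_slug: "gpt-5",
        connector_id: Some("drive"),
    };
    let mut sink = FieldNames(Vec::new());
    observation.visit(&mut sink);
    assert_eq!(sink.0, ["model_slug", "connector_id"]);
}
```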
22 changed files with 4338 additions and 23 deletions

codex-rs/Cargo.lock generated
View File

@@ -1382,6 +1382,7 @@ dependencies = [
"codex-app-server-protocol",
"codex-git-utils",
"codex-login",
"codex-observability",
"codex-plugin",
"codex-protocol",
"codex-utils-absolute-path",
@@ -2563,6 +2564,25 @@ dependencies = [
"url",
]
[[package]]
name = "codex-observability"
version = "0.0.0"
dependencies = [
"codex-observability-derive",
"pretty_assertions",
"serde",
"serde_json",
]
[[package]]
name = "codex-observability-derive"
version = "0.0.0"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.114",
]
[[package]]
name = "codex-ollama"
version = "0.0.0"

View File

@@ -48,6 +48,8 @@ members = [
"models-manager",
"network-proxy",
"ollama",
"observability",
"observability-derive",
"process-hardening",
"protocol",
"realtime-webrtc",
@@ -153,6 +155,8 @@ codex-model-provider-info = { path = "model-provider-info" }
codex-models-manager = { path = "models-manager" }
codex-network-proxy = { path = "network-proxy" }
codex-ollama = { path = "ollama" }
codex-observability = { path = "observability" }
codex-observability-derive = { path = "observability-derive" }
codex-otel = { path = "otel" }
codex-plugin = { path = "plugin" }
codex-model-provider = { path = "model-provider" }

View File

@@ -16,6 +16,7 @@ workspace = true
codex-app-server-protocol = { workspace = true }
codex-git-utils = { workspace = true }
codex-login = { workspace = true }
codex-observability = { workspace = true }
codex-plugin = { workspace = true }
codex-protocol = { workspace = true }
os_info = { workspace = true }

View File

@@ -46,6 +46,7 @@ use crate::facts::TurnResolvedConfigFact;
use crate::facts::TurnStatus;
use crate::facts::TurnSteerRequestError;
use crate::facts::TurnTokenUsageFact;
use crate::observation_reducer::AnalyticsObservationReducer;
use crate::reducer::AnalyticsReducer;
use crate::reducer::normalize_path_for_skill_id;
use crate::reducer::skill_id_for_local_skill;
@@ -64,6 +65,7 @@ use codex_app_server_protocol::SandboxPolicy as AppServerSandboxPolicy;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::SessionSource as AppServerSessionSource;
use codex_app_server_protocol::Thread;
use codex_app_server_protocol::ThreadForkResponse;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStatus as AppServerThreadStatus;
@@ -201,6 +203,27 @@ fn sample_thread_resume_response_with_source(
}
}
fn sample_thread_fork_response(thread_id: &str, ephemeral: bool, model: &str) -> ClientResponse {
let mut thread = sample_thread(thread_id, ephemeral);
thread.forked_from_id = Some("parent-thread".to_string());
ClientResponse::ThreadFork {
request_id: RequestId::Integer(3),
response: ThreadForkResponse {
thread,
model: model.to_string(),
model_provider: "openai".to_string(),
service_tier: None,
cwd: test_path_buf("/tmp").abs(),
instruction_sources: Vec::new(),
approval_policy: AppServerAskForApproval::OnFailure,
approvals_reviewer: AppServerApprovalsReviewer::User,
sandbox: AppServerSandboxPolicy::DangerFullAccess,
reasoning_effort: None,
},
}
}
fn sample_turn_start_request(thread_id: &str, request_id: i64) -> ClientRequest {
ClientRequest::TurnStart {
request_id: RequestId::Integer(request_id),
@@ -435,25 +458,24 @@ async fn ingest_rejected_turn_steer(
}
async fn ingest_initialize(reducer: &mut AnalyticsReducer, out: &mut Vec<TrackEventRequest>) {
reducer.ingest(sample_initialize_fact(), out).await;
}
fn sample_initialize_fact() -> AnalyticsFact {
AnalyticsFact::Initialize {
connection_id: 7,
params: InitializeParams {
client_info: ClientInfo {
name: "codex-tui".to_string(),
title: None,
version: "1.0.0".to_string(),
},
capabilities: None,
},
product_client_id: "codex-tui".to_string(),
runtime: sample_runtime_metadata(),
rpc_transport: AppServerRpcTransport::Stdio,
}
}
async fn ingest_turn_prerequisites(
@@ -1553,6 +1575,438 @@ async fn reducer_ingests_app_and_plugin_facts() {
assert_eq!(payload[2]["event_type"], "codex_plugin_used");
}
#[tokio::test]
async fn app_used_observation_matches_legacy_analytics_fact() {
let mut legacy_reducer = AnalyticsReducer::default();
let mut observation_reducer = AnalyticsObservationReducer::default();
let mut legacy_events = Vec::new();
let mut observation_events = Vec::new();
let observation = codex_observability::events::AppUsed {
model_slug: "gpt-5",
thread_id: "thread-1",
turn_id: "turn-1",
connector_id: Some("drive"),
app_name: Some("Drive"),
invocation_type: Some(codex_observability::events::InvocationType::Implicit),
};
legacy_reducer
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::AppUsed(AppUsedInput {
tracking: TrackEventsContext {
model_slug: "gpt-5".to_string(),
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
},
app: AppInvocation {
connector_id: Some("drive".to_string()),
app_name: Some("Drive".to_string()),
invocation_type: Some(InvocationType::Implicit),
},
})),
&mut legacy_events,
)
.await;
observation_reducer
.ingest_app_used(observation, &mut observation_events)
.await;
let legacy_payload = serde_json::to_value(&legacy_events).expect("serialize legacy events");
let observation_payload =
serde_json::to_value(&observation_events).expect("serialize observation events");
assert_eq!(observation_payload, legacy_payload);
}
#[tokio::test]
async fn feature_observations_match_legacy_analytics_facts() {
let mut legacy_reducer = AnalyticsReducer::default();
let mut observation_reducer = AnalyticsObservationReducer::default();
let mut legacy_events = Vec::new();
let mut observation_events = Vec::new();
let tracking = TrackEventsContext {
model_slug: "gpt-5".to_string(),
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
};
let connector_ids = vec!["calendar".to_string(), "drive".to_string()];
let skill_path = PathBuf::from("/Users/abc/.codex/skills/doc/SKILL.md");
legacy_reducer
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::SkillInvoked(SkillInvokedInput {
tracking: tracking.clone(),
invocations: vec![SkillInvocation {
skill_name: "doc".to_string(),
skill_scope: codex_protocol::protocol::SkillScope::User,
skill_path: skill_path.clone(),
invocation_type: InvocationType::Explicit,
}],
})),
&mut legacy_events,
)
.await;
legacy_reducer
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::AppMentioned(AppMentionedInput {
tracking: tracking.clone(),
mentions: vec![AppInvocation {
connector_id: Some("calendar".to_string()),
app_name: Some("Calendar".to_string()),
invocation_type: Some(InvocationType::Explicit),
}],
})),
&mut legacy_events,
)
.await;
legacy_reducer
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::HookRun(HookRunInput {
tracking: tracking.clone(),
hook: HookRunFact {
event_name: HookEventName::PostToolUse,
hook_source: HookSource::Unknown,
status: HookRunStatus::Failed,
},
})),
&mut legacy_events,
)
.await;
legacy_reducer
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::PluginUsed(PluginUsedInput {
tracking: tracking.clone(),
plugin: sample_plugin_metadata(),
})),
&mut legacy_events,
)
.await;
legacy_reducer
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::PluginStateChanged(
PluginStateChangedInput {
plugin: sample_plugin_metadata(),
state: PluginState::Disabled,
},
)),
&mut legacy_events,
)
.await;
observation_reducer
.ingest_skill_invoked(
codex_observability::events::SkillInvoked {
model_slug: "gpt-5",
thread_id: "thread-1",
turn_id: "turn-1",
skill_name: "doc",
skill_scope: codex_observability::events::SkillScope::User,
skill_path: skill_path.as_path(),
invocation_type: codex_observability::events::InvocationType::Explicit,
},
&mut observation_events,
)
.await;
observation_reducer
.ingest_app_mentioned(
codex_observability::events::AppMentioned {
model_slug: "gpt-5",
thread_id: "thread-1",
turn_id: "turn-1",
connector_id: Some("calendar"),
app_name: Some("Calendar"),
invocation_type: Some(codex_observability::events::InvocationType::Explicit),
},
&mut observation_events,
)
.await;
observation_reducer.ingest_hook_run_completed(
codex_observability::events::HookRunCompleted {
model_slug: "gpt-5",
thread_id: "thread-1",
turn_id: "turn-1",
hook_name: "PostToolUse",
hook_source: "unknown",
status: codex_observability::events::HookRunStatus::Failed,
},
&mut observation_events,
);
observation_reducer.ingest_plugin_used(
codex_observability::events::PluginUsed {
model_slug: "gpt-5",
thread_id: "thread-1",
turn_id: "turn-1",
plugin_id: "sample@test",
plugin_name: "sample",
marketplace_name: "test",
has_skills: Some(true),
mcp_server_count: Some(2),
connector_ids: Some(connector_ids.as_slice()),
},
&mut observation_events,
);
observation_reducer.ingest_plugin_state_changed(
codex_observability::events::PluginStateChanged {
plugin_id: "sample@test",
plugin_name: "sample",
marketplace_name: "test",
has_skills: Some(true),
mcp_server_count: Some(2),
connector_ids: Some(connector_ids.as_slice()),
state: codex_observability::events::PluginState::Disabled,
},
&mut observation_events,
);
let legacy_payload = serde_json::to_value(&legacy_events).expect("serialize legacy events");
let observation_payload =
serde_json::to_value(&observation_events).expect("serialize observation events");
assert_eq!(observation_payload, legacy_payload);
}
#[tokio::test]
async fn thread_started_observation_matches_legacy_responses() {
assert_thread_started_observation_matches_legacy_response(
sample_thread_start_response("thread-observed", /*ephemeral*/ false, "gpt-5"),
codex_observability::events::ThreadStarted {
thread_id: "thread-observed",
source: codex_observability::events::ThreadSource::User,
parent_thread_id: None,
initialization_mode: codex_observability::events::ThreadInitializationMode::New,
model: "gpt-5",
ephemeral: false,
created_at: 1,
},
)
.await;
assert_thread_started_observation_matches_legacy_response(
sample_thread_resume_response("thread-resumed", /*ephemeral*/ true, "gpt-5.1"),
codex_observability::events::ThreadStarted {
thread_id: "thread-resumed",
source: codex_observability::events::ThreadSource::User,
parent_thread_id: None,
initialization_mode: codex_observability::events::ThreadInitializationMode::Resumed,
model: "gpt-5.1",
ephemeral: true,
created_at: 1,
},
)
.await;
assert_thread_started_observation_matches_legacy_response(
sample_thread_fork_response("thread-forked", /*ephemeral*/ false, "gpt-5.2"),
codex_observability::events::ThreadStarted {
thread_id: "thread-forked",
source: codex_observability::events::ThreadSource::User,
parent_thread_id: None,
initialization_mode: codex_observability::events::ThreadInitializationMode::Forked,
model: "gpt-5.2",
ephemeral: false,
created_at: 1,
},
)
.await;
let parent_thread_id =
codex_protocol::ThreadId::from_string("22222222-2222-2222-2222-222222222222")
.expect("valid parent thread id");
assert_thread_started_observation_matches_legacy_response(
sample_thread_resume_response_with_source(
"thread-subagent",
/*ephemeral*/ false,
"gpt-5",
AppServerSessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: None,
agent_nickname: None,
agent_role: None,
}),
),
codex_observability::events::ThreadStarted {
thread_id: "thread-subagent",
source: codex_observability::events::ThreadSource::Subagent(
codex_observability::events::ThreadSubagentKind::ThreadSpawn,
),
parent_thread_id: Some("22222222-2222-2222-2222-222222222222"),
initialization_mode: codex_observability::events::ThreadInitializationMode::Resumed,
model: "gpt-5",
ephemeral: false,
created_at: 1,
},
)
.await;
}
async fn assert_thread_started_observation_matches_legacy_response(
legacy_response: ClientResponse,
observation: codex_observability::events::ThreadStarted<'_>,
) {
let mut legacy_reducer = AnalyticsReducer::default();
let mut observation_reducer = AnalyticsObservationReducer::default();
let mut legacy_events = Vec::new();
let mut observation_events = Vec::new();
ingest_initialize(&mut legacy_reducer, &mut legacy_events).await;
legacy_reducer
.ingest(
AnalyticsFact::Response {
connection_id: 7,
response: Box::new(legacy_response),
},
&mut legacy_events,
)
.await;
observation_reducer
.ingest_existing_fact_for_test(sample_initialize_fact(), &mut observation_events)
.await;
observation_reducer.ingest_thread_started(7, observation, &mut observation_events);
let legacy_payload = serde_json::to_value(&legacy_events).expect("serialize legacy events");
let observation_payload =
serde_json::to_value(&observation_events).expect("serialize observation events");
assert_eq!(observation_payload, legacy_payload);
}
#[tokio::test]
async fn turn_observations_match_legacy_sources() {
let mut legacy_reducer = AnalyticsReducer::default();
let mut observation_reducer = AnalyticsObservationReducer::default();
let mut legacy_events = Vec::new();
let mut observation_events = Vec::new();
ingest_turn_prerequisites(
&mut legacy_reducer,
&mut legacy_events,
/*include_initialize*/ true,
/*include_resolved_config*/ true,
/*include_started*/ true,
/*include_token_usage*/ true,
)
.await;
legacy_reducer
.ingest(
AnalyticsFact::Notification(Box::new(sample_turn_completed_notification(
"thread-2",
"turn-2",
AppServerTurnStatus::Completed,
/*codex_error_info*/ None,
))),
&mut legacy_events,
)
.await;
observation_reducer
.ingest_existing_fact_for_test(
AnalyticsFact::Initialize {
connection_id: 7,
params: InitializeParams {
client_info: ClientInfo {
name: "codex-tui".to_string(),
title: None,
version: "1.0.0".to_string(),
},
capabilities: None,
},
product_client_id: "codex-tui".to_string(),
runtime: sample_runtime_metadata(),
rpc_transport: AppServerRpcTransport::Stdio,
},
&mut observation_events,
)
.await;
observation_reducer
.ingest_existing_fact_for_test(
AnalyticsFact::Response {
connection_id: 7,
response: Box::new(sample_thread_start_response(
"thread-2", /*ephemeral*/ false, "gpt-5",
)),
},
&mut observation_events,
)
.await;
observation_events.clear();
// Keep not-yet-migrated request/thread context identical on both sides.
// This test swaps started/ended turn facts, including resolved config and
// token accounting, to observations while comparing the final payload.
observation_reducer
.ingest_existing_fact_for_test(
AnalyticsFact::Request {
connection_id: 7,
request_id: RequestId::Integer(3),
request: Box::new(sample_turn_start_request("thread-2", /*request_id*/ 3)),
},
&mut observation_events,
)
.await;
observation_reducer
.ingest_existing_fact_for_test(
AnalyticsFact::Response {
connection_id: 7,
response: Box::new(sample_turn_start_response("turn-2", /*request_id*/ 3)),
},
&mut observation_events,
)
.await;
observation_reducer
.ingest_turn_started(
codex_observability::events::TurnStarted {
thread_id: "thread-2",
turn_id: "turn-2",
config: codex_observability::events::TurnConfig {
num_input_images: 1,
submission_type: None,
ephemeral: false,
model: "gpt-5",
model_provider: "openai",
sandbox_mode: codex_observability::events::SandboxMode::ReadOnly,
sandbox_network_access: true,
reasoning_effort: None,
reasoning_summary: None,
service_tier: None,
approval_policy: codex_observability::events::ApprovalPolicy::OnRequest,
approval_reviewer:
codex_observability::events::ApprovalReviewer::GuardianSubagent,
collaboration_mode: codex_observability::events::CollaborationMode::Plan,
personality: None,
is_first_turn: true,
},
started_at: 455,
},
&mut observation_events,
)
.await;
observation_reducer
.ingest_turn_ended(
codex_observability::events::TurnEnded {
thread_id: "thread-2",
turn_id: "turn-2",
status: codex_observability::events::TurnStatus::Completed,
token_usage: Some(codex_observability::events::TurnTokenUsage {
input_tokens: 123,
cached_input_tokens: 45,
output_tokens: 140,
reasoning_output_tokens: 13,
total_tokens: 321,
}),
ended_at: 456,
duration_ms: 1234,
},
&mut observation_events,
)
.await;
let legacy_payload = serde_json::to_value(&legacy_events).expect("serialize legacy events");
let observation_payload =
serde_json::to_value(&observation_events).expect("serialize observation events");
assert_eq!(observation_payload, legacy_payload);
}
#[tokio::test]
async fn reducer_ingests_plugin_state_changed_fact() {
let mut reducer = AnalyticsReducer::default();
@@ -1769,6 +2223,409 @@ async fn accepted_turn_steer_emits_expected_event() {
assert!(payload["event_params"].get("product_client_id").is_none());
}
#[tokio::test]
async fn turn_steer_observations_match_legacy_sources() {
let mut legacy_reducer = AnalyticsReducer::default();
let mut observation_reducer = AnalyticsObservationReducer::default();
let mut legacy_events = Vec::new();
let mut observation_events = Vec::new();
ingest_initialize(&mut legacy_reducer, &mut legacy_events).await;
legacy_reducer
.ingest(
AnalyticsFact::Response {
connection_id: 7,
response: Box::new(sample_thread_start_response(
"thread-2", /*ephemeral*/ false, "gpt-5",
)),
},
&mut legacy_events,
)
.await;
legacy_events.clear();
observation_reducer
.ingest_existing_fact_for_test(sample_initialize_fact(), &mut observation_events)
.await;
observation_reducer
.ingest_existing_fact_for_test(
AnalyticsFact::Response {
connection_id: 7,
response: Box::new(sample_thread_start_response(
"thread-2", /*ephemeral*/ false, "gpt-5",
)),
},
&mut observation_events,
)
.await;
observation_events.clear();
legacy_reducer
.ingest(
AnalyticsFact::Request {
connection_id: 7,
request_id: RequestId::Integer(4),
request: Box::new(sample_turn_steer_request(
"thread-2", "turn-2", /*request_id*/ 4,
)),
},
&mut legacy_events,
)
.await;
legacy_reducer
.ingest(
AnalyticsFact::Response {
connection_id: 7,
response: Box::new(sample_turn_steer_response("turn-2", /*request_id*/ 4)),
},
&mut legacy_events,
)
.await;
let accepted_created_at = legacy_event_created_at(&legacy_events[0]);
observation_reducer.ingest_turn_steer(
7,
codex_observability::events::TurnSteer {
thread_id: "thread-2",
expected_turn_id: "turn-2",
accepted_turn_id: Some("turn-2"),
num_input_images: 1,
result: codex_observability::events::TurnSteerResult::Accepted,
rejection_reason: None,
created_at: accepted_created_at,
},
&mut observation_events,
);
legacy_reducer
.ingest(
AnalyticsFact::Request {
connection_id: 7,
request_id: RequestId::Integer(5),
request: Box::new(sample_turn_steer_request(
"thread-2", "turn-2", /*request_id*/ 5,
)),
},
&mut legacy_events,
)
.await;
legacy_reducer
.ingest(
AnalyticsFact::ErrorResponse {
connection_id: 7,
request_id: RequestId::Integer(5),
error: no_active_turn_steer_error(),
error_type: Some(no_active_turn_steer_error_type()),
},
&mut legacy_events,
)
.await;
let rejected_created_at = legacy_event_created_at(&legacy_events[1]);
observation_reducer.ingest_turn_steer(
7,
codex_observability::events::TurnSteer {
thread_id: "thread-2",
expected_turn_id: "turn-2",
accepted_turn_id: None,
num_input_images: 1,
result: codex_observability::events::TurnSteerResult::Rejected,
rejection_reason: Some(
codex_observability::events::TurnSteerRejectionReason::NoActiveTurn,
),
created_at: rejected_created_at,
},
&mut observation_events,
);
let legacy_payload = serde_json::to_value(&legacy_events).expect("serialize legacy events");
let observation_payload =
serde_json::to_value(&observation_events).expect("serialize observation events");
assert_eq!(observation_payload, legacy_payload);
}
#[tokio::test]
async fn compaction_ended_observation_matches_legacy_fact() {
use codex_observability::events as observation_events;
let mut legacy_reducer = AnalyticsReducer::default();
let mut observation_reducer = AnalyticsObservationReducer::default();
let mut legacy_events = Vec::new();
let mut observation_events = Vec::new();
let parent_thread_id =
codex_protocol::ThreadId::from_string("22222222-2222-2222-2222-222222222222")
.expect("valid parent thread id");
ingest_initialize(&mut legacy_reducer, &mut legacy_events).await;
legacy_reducer
.ingest(
AnalyticsFact::Response {
connection_id: 7,
response: Box::new(sample_thread_resume_response_with_source(
"thread-1",
/*ephemeral*/ false,
"gpt-5",
AppServerSessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: None,
agent_nickname: None,
agent_role: None,
}),
)),
},
&mut legacy_events,
)
.await;
legacy_events.clear();
observation_reducer
.ingest_existing_fact_for_test(sample_initialize_fact(), &mut observation_events)
.await;
let parent_thread_id =
codex_protocol::ThreadId::from_string("22222222-2222-2222-2222-222222222222")
.expect("valid parent thread id");
observation_reducer
.ingest_existing_fact_for_test(
AnalyticsFact::Response {
connection_id: 7,
response: Box::new(sample_thread_resume_response_with_source(
"thread-1",
/*ephemeral*/ false,
"gpt-5",
AppServerSessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: None,
agent_nickname: None,
agent_role: None,
}),
)),
},
&mut observation_events,
)
.await;
observation_events.clear();
legacy_reducer
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::Compaction(Box::new(
CodexCompactionEvent {
thread_id: "thread-1".to_string(),
turn_id: "turn-compact".to_string(),
trigger: CompactionTrigger::Manual,
reason: CompactionReason::UserRequested,
implementation: CompactionImplementation::Responses,
phase: CompactionPhase::StandaloneTurn,
strategy: CompactionStrategy::Memento,
status: CompactionStatus::Failed,
error: Some("context limit exceeded".to_string()),
active_context_tokens_before: 131_000,
active_context_tokens_after: 131_000,
started_at: 100,
completed_at: 101,
duration_ms: Some(1200),
},
))),
&mut legacy_events,
)
.await;
observation_reducer
.ingest_compaction_ended(
observation_events::CompactionEnded {
thread_id: "thread-1",
turn_id: "turn-compact",
trigger: observation_events::CompactionTrigger::Manual,
reason: observation_events::CompactionReason::UserRequested,
implementation: observation_events::CompactionImplementation::Responses,
phase: observation_events::CompactionPhase::StandaloneTurn,
strategy: observation_events::CompactionStrategy::Memento,
status: observation_events::CompactionStatus::Failed {
error: Some("context limit exceeded"),
},
active_context_tokens_before: 131_000,
active_context_tokens_after: 131_000,
started_at: 100,
ended_at: 101,
duration_ms: Some(1200),
},
&mut observation_events,
)
.await;
let legacy_payload = serde_json::to_value(&legacy_events).expect("serialize legacy events");
let observation_payload =
serde_json::to_value(&observation_events).expect("serialize observation events");
assert_eq!(observation_payload, legacy_payload);
}
#[tokio::test]
async fn review_completed_observation_matches_legacy_guardian_fact() {
use crate::events as analytics_events;
use codex_observability::events as observation_events;
let mut legacy_reducer = AnalyticsReducer::default();
let mut observation_reducer = AnalyticsObservationReducer::default();
let mut legacy_events = Vec::new();
let mut observation_events = Vec::new();
ingest_initialize(&mut legacy_reducer, &mut legacy_events).await;
legacy_reducer
.ingest(
AnalyticsFact::Response {
connection_id: 7,
response: Box::new(sample_thread_start_response(
"thread-guardian",
/*ephemeral*/ false,
"gpt-5",
)),
},
&mut legacy_events,
)
.await;
legacy_events.clear();
observation_reducer
.ingest_existing_fact_for_test(sample_initialize_fact(), &mut observation_events)
.await;
observation_reducer
.ingest_existing_fact_for_test(
AnalyticsFact::Response {
connection_id: 7,
response: Box::new(sample_thread_start_response(
"thread-guardian",
/*ephemeral*/ false,
"gpt-5",
)),
},
&mut observation_events,
)
.await;
observation_events.clear();
legacy_reducer
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::GuardianReview(Box::new(
analytics_events::GuardianReviewEventParams {
thread_id: "thread-guardian".to_string(),
turn_id: "turn-guardian".to_string(),
review_id: "review-1".to_string(),
target_item_id: "item-1".to_string(),
retry_reason: Some("parse_retry".to_string()),
approval_request_source:
analytics_events::GuardianApprovalRequestSource::DelegatedSubagent,
reviewed_action: analytics_events::GuardianReviewedAction::McpToolCall {
server: "drive".to_string(),
tool_name: "search".to_string(),
connector_id: Some("drive-connector".to_string()),
connector_name: Some("Drive".to_string()),
tool_title: Some("Search Drive".to_string()),
},
reviewed_action_truncated: false,
decision: analytics_events::GuardianReviewDecision::Aborted,
terminal_status: analytics_events::GuardianReviewTerminalStatus::FailedClosed,
failure_reason: Some(analytics_events::GuardianReviewFailureReason::ParseError),
risk_level: Some(analytics_events::GuardianReviewRiskLevel::High),
user_authorization: Some(
analytics_events::GuardianReviewUserAuthorization::Medium,
),
outcome: Some(analytics_events::GuardianReviewOutcome::Deny),
rationale: Some("review could not parse final policy".to_string()),
guardian_thread_id: Some("guardian-thread-1".to_string()),
guardian_session_kind: Some(
analytics_events::GuardianReviewSessionKind::TrunkNew,
),
guardian_model: Some("gpt-5".to_string()),
guardian_reasoning_effort: Some("medium".to_string()),
had_prior_review_context: Some(true),
review_timeout_ms: 30_000,
tool_call_count: 2,
time_to_first_token_ms: Some(100),
completion_latency_ms: Some(1500),
started_at: 100,
completed_at: Some(102),
input_tokens: Some(123),
cached_input_tokens: Some(45),
output_tokens: Some(140),
reasoning_output_tokens: Some(13),
total_tokens: Some(321),
},
))),
&mut legacy_events,
)
.await;
observation_reducer
.ingest_review_completed(
observation_events::ReviewCompleted {
thread_id: "thread-guardian",
turn_id: "turn-guardian",
review_id: "review-1",
target_item_id: "item-1",
retry_reason: Some("parse_retry"),
request_source: observation_events::ReviewRequestSource::DelegatedSubagent,
reviewed_action: observation_events::ReviewedAction::McpToolCall {
server: "drive",
tool_name: "search",
connector_id: Some("drive-connector"),
connector_name: Some("Drive"),
tool_title: Some("Search Drive"),
},
reviewed_action_truncated: false,
response: observation_events::ReviewResponse::Guardian(
observation_events::GuardianReviewResponse {
decision: observation_events::ReviewDecision::Aborted,
terminal_status: observation_events::ReviewTerminalStatus::FailedClosed {
failure_reason: Some(
observation_events::ReviewFailureReason::ParseError,
),
},
risk_level: Some(observation_events::ReviewRiskLevel::High),
user_authorization: Some(
observation_events::ReviewUserAuthorization::Medium,
),
outcome: Some(observation_events::ReviewOutcome::Deny),
rationale: Some("review could not parse final policy"),
session: Some(observation_events::GuardianReviewSession {
guardian_thread_id: "guardian-thread-1",
session_kind: observation_events::GuardianReviewSessionKind::TrunkNew,
model: "gpt-5",
reasoning_effort: Some("medium"),
had_prior_review_context: true,
}),
review_timeout_ms: 30_000,
tool_call_count: 2,
time_to_first_token_ms: Some(100),
completion_latency_ms: Some(1500),
token_usage: Some(observation_events::TurnTokenUsage {
input_tokens: 123,
cached_input_tokens: 45,
output_tokens: 140,
reasoning_output_tokens: 13,
total_tokens: 321,
}),
},
),
started_at: 100,
ended_at: 102,
},
&mut observation_events,
)
.await;
let legacy_payload = serde_json::to_value(&legacy_events).expect("serialize legacy events");
let observation_payload =
serde_json::to_value(&observation_events).expect("serialize observation events");
assert_eq!(observation_payload, legacy_payload);
}
fn legacy_event_created_at(event: &TrackEventRequest) -> i64 {
serde_json::to_value(event)
.expect("serialize event")
.pointer("/event_params/created_at")
.and_then(serde_json::Value::as_i64)
.expect("created_at")
}
#[tokio::test]
async fn rejected_turn_steer_uses_request_connection_metadata() {
let mut reducer = AnalyticsReducer::default();

View File

@@ -1,7 +1,10 @@
mod client;
mod events;
mod facts;
mod observation_projection;
mod observation_reducer;
mod reducer;
mod review_observation_projection;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;

View File

@@ -0,0 +1,416 @@
//! Projection from shared observations into the current analytics schema.
//!
//! The observation taxonomy is intended to describe what Codex did, not the
//! shape of any particular telemetry backend. This private module is the
//! adapter boundary where typed observations are translated into the legacy
//! analytics facts and track-event payloads that already exist today.
use crate::events::CodexHookRunEventRequest;
use crate::events::CodexHookRunMetadata;
use crate::events::CodexPluginEventRequest;
use crate::events::CodexPluginMetadata;
use crate::events::CodexPluginUsedEventRequest;
use crate::events::CodexPluginUsedMetadata;
use crate::events::TrackEventRequest;
use crate::facts;
use crate::facts::AppInvocation;
use crate::facts::AppMentionedInput;
use crate::facts::AppUsedInput;
use crate::facts::SkillInvocation;
use crate::facts::SkillInvokedInput;
use crate::facts::TrackEventsContext;
use crate::facts::TurnResolvedConfigFact;
use crate::facts::TurnSubmissionType as AnalyticsTurnSubmissionType;
use crate::facts::TurnTokenUsageFact;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::Turn;
use codex_app_server_protocol::TurnCompletedNotification;
use codex_app_server_protocol::TurnStartedNotification;
use codex_app_server_protocol::TurnStatus as AppServerTurnStatus;
use codex_login::default_client::originator;
use codex_observability::events;
use codex_protocol::config_types::ApprovalsReviewer;
use codex_protocol::config_types::ModeKind;
use codex_protocol::config_types::Personality;
use codex_protocol::config_types::ReasoningSummary;
use codex_protocol::config_types::ServiceTier;
use codex_protocol::openai_models::ReasoningEffort;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::GranularApprovalConfig;
use codex_protocol::protocol::HookRunStatus as ProtocolHookRunStatus;
use codex_protocol::protocol::NetworkAccess;
use codex_protocol::protocol::ReadOnlyAccess;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SkillScope as ProtocolSkillScope;
use codex_protocol::protocol::TokenUsage;
pub(crate) fn skill_invoked_input(observation: events::SkillInvoked<'_>) -> SkillInvokedInput {
SkillInvokedInput {
tracking: tracking_from_fields(
observation.model_slug,
observation.thread_id,
observation.turn_id,
),
invocations: vec![SkillInvocation {
skill_name: observation.skill_name.to_string(),
skill_scope: match observation.skill_scope {
events::SkillScope::User => ProtocolSkillScope::User,
events::SkillScope::Repo => ProtocolSkillScope::Repo,
events::SkillScope::System => ProtocolSkillScope::System,
events::SkillScope::Admin => ProtocolSkillScope::Admin,
},
skill_path: observation.skill_path.to_path_buf(),
invocation_type: map_invocation_type(observation.invocation_type),
}],
}
}
pub(crate) fn app_mentioned_input(observation: events::AppMentioned<'_>) -> AppMentionedInput {
AppMentionedInput {
tracking: tracking_from_fields(
observation.model_slug,
observation.thread_id,
observation.turn_id,
),
mentions: vec![app_invocation_from_fields(
observation.connector_id,
observation.app_name,
observation.invocation_type,
)],
}
}
pub(crate) fn app_used_input(observation: events::AppUsed<'_>) -> AppUsedInput {
AppUsedInput {
tracking: tracking_from_fields(
observation.model_slug,
observation.thread_id,
observation.turn_id,
),
app: app_invocation_from_fields(
observation.connector_id,
observation.app_name,
observation.invocation_type,
),
}
}
pub(crate) fn hook_run_completed_event(
observation: events::HookRunCompleted<'_>,
) -> TrackEventRequest {
TrackEventRequest::HookRun(CodexHookRunEventRequest {
event_type: "codex_hook_run",
event_params: CodexHookRunMetadata {
thread_id: Some(observation.thread_id.to_string()),
turn_id: Some(observation.turn_id.to_string()),
model_slug: Some(observation.model_slug.to_string()),
hook_name: Some(observation.hook_name.to_string()),
hook_source: Some(observation.hook_source),
status: Some(match observation.status {
events::HookRunStatus::Completed => ProtocolHookRunStatus::Completed,
events::HookRunStatus::Failed => ProtocolHookRunStatus::Failed,
events::HookRunStatus::Blocked => ProtocolHookRunStatus::Blocked,
events::HookRunStatus::Stopped => ProtocolHookRunStatus::Stopped,
}),
},
})
}
pub(crate) fn plugin_used_event(observation: events::PluginUsed<'_>) -> TrackEventRequest {
TrackEventRequest::PluginUsed(CodexPluginUsedEventRequest {
event_type: "codex_plugin_used",
event_params: CodexPluginUsedMetadata {
plugin: plugin_metadata_from_fields(
observation.plugin_id,
observation.plugin_name,
observation.marketplace_name,
observation.has_skills,
observation.mcp_server_count,
observation.connector_ids,
),
thread_id: Some(observation.thread_id.to_string()),
turn_id: Some(observation.turn_id.to_string()),
model_slug: Some(observation.model_slug.to_string()),
},
})
}
pub(crate) fn plugin_state_changed_event(
observation: events::PluginStateChanged<'_>,
) -> TrackEventRequest {
let event = CodexPluginEventRequest {
event_type: match observation.state {
events::PluginState::Installed => "codex_plugin_installed",
events::PluginState::Uninstalled => "codex_plugin_uninstalled",
events::PluginState::Enabled => "codex_plugin_enabled",
events::PluginState::Disabled => "codex_plugin_disabled",
},
event_params: plugin_metadata_from_fields(
observation.plugin_id,
observation.plugin_name,
observation.marketplace_name,
observation.has_skills,
observation.mcp_server_count,
observation.connector_ids,
),
};
match observation.state {
events::PluginState::Installed => TrackEventRequest::PluginInstalled(event),
events::PluginState::Uninstalled => TrackEventRequest::PluginUninstalled(event),
events::PluginState::Enabled => TrackEventRequest::PluginEnabled(event),
events::PluginState::Disabled => TrackEventRequest::PluginDisabled(event),
}
}
pub(crate) fn turn_started_notification(
observation: events::TurnStarted<'_>,
) -> ServerNotification {
ServerNotification::TurnStarted(TurnStartedNotification {
thread_id: observation.thread_id.to_string(),
turn: Turn {
id: observation.turn_id.to_string(),
items: vec![],
status: AppServerTurnStatus::InProgress,
error: None,
started_at: Some(observation.started_at),
completed_at: None,
duration_ms: None,
},
})
}
pub(crate) fn turn_resolved_config_fact(
observation: &events::TurnStarted<'_>,
) -> TurnResolvedConfigFact {
let config = observation.config;
TurnResolvedConfigFact {
turn_id: observation.turn_id.to_string(),
thread_id: observation.thread_id.to_string(),
num_input_images: config.num_input_images,
submission_type: config
.submission_type
.map(|submission_type| match submission_type {
events::TurnSubmissionType::Default => AnalyticsTurnSubmissionType::Default,
events::TurnSubmissionType::Queued => AnalyticsTurnSubmissionType::Queued,
}),
ephemeral: config.ephemeral,
// The legacy fact carries session_source, but codex_turn_event derives
// thread source from thread lifecycle metadata instead. Keep this
// placeholder local to the projection until a consumer needs it.
session_source: SessionSource::Unknown,
model: config.model.to_string(),
model_provider: config.model_provider.to_string(),
sandbox_policy: match config.sandbox_mode {
events::SandboxMode::FullAccess => SandboxPolicy::DangerFullAccess,
events::SandboxMode::ReadOnly => SandboxPolicy::ReadOnly {
access: ReadOnlyAccess::FullAccess,
network_access: config.sandbox_network_access,
},
events::SandboxMode::WorkspaceWrite => SandboxPolicy::WorkspaceWrite {
writable_roots: Vec::new(),
read_only_access: ReadOnlyAccess::FullAccess,
network_access: config.sandbox_network_access,
exclude_tmpdir_env_var: false,
exclude_slash_tmp: false,
},
events::SandboxMode::ExternalSandbox => SandboxPolicy::ExternalSandbox {
network_access: if config.sandbox_network_access {
NetworkAccess::Enabled
} else {
NetworkAccess::Restricted
},
},
},
reasoning_effort: config
.reasoning_effort
.map(|reasoning_effort| match reasoning_effort {
events::ReasoningEffort::None => ReasoningEffort::None,
events::ReasoningEffort::Minimal => ReasoningEffort::Minimal,
events::ReasoningEffort::Low => ReasoningEffort::Low,
events::ReasoningEffort::Medium => ReasoningEffort::Medium,
events::ReasoningEffort::High => ReasoningEffort::High,
events::ReasoningEffort::XHigh => ReasoningEffort::XHigh,
}),
reasoning_summary: config.reasoning_summary.map(
|reasoning_summary| match reasoning_summary {
events::ReasoningSummary::Auto => ReasoningSummary::Auto,
events::ReasoningSummary::Concise => ReasoningSummary::Concise,
events::ReasoningSummary::Detailed => ReasoningSummary::Detailed,
events::ReasoningSummary::None => ReasoningSummary::None,
},
),
service_tier: config.service_tier.map(|service_tier| match service_tier {
events::ServiceTier::Fast => ServiceTier::Fast,
events::ServiceTier::Flex => ServiceTier::Flex,
}),
approval_policy: match config.approval_policy {
events::ApprovalPolicy::Untrusted => AskForApproval::UnlessTrusted,
events::ApprovalPolicy::OnFailure => AskForApproval::OnFailure,
events::ApprovalPolicy::OnRequest => AskForApproval::OnRequest,
events::ApprovalPolicy::Granular => AskForApproval::Granular(GranularApprovalConfig {
sandbox_approval: true,
rules: true,
skill_approval: true,
request_permissions: true,
mcp_elicitations: true,
}),
events::ApprovalPolicy::Never => AskForApproval::Never,
},
approvals_reviewer: match config.approval_reviewer {
events::ApprovalReviewer::User => ApprovalsReviewer::User,
events::ApprovalReviewer::GuardianSubagent => ApprovalsReviewer::GuardianSubagent,
},
sandbox_network_access: config.sandbox_network_access,
collaboration_mode: match config.collaboration_mode {
events::CollaborationMode::Default => ModeKind::Default,
events::CollaborationMode::Plan => ModeKind::Plan,
},
personality: config.personality.map(|personality| match personality {
events::Personality::None => Personality::None,
events::Personality::Friendly => Personality::Friendly,
events::Personality::Pragmatic => Personality::Pragmatic,
}),
is_first_turn: config.is_first_turn,
}
}
pub(crate) fn turn_token_usage_fact(
observation: &events::TurnEnded<'_>,
) -> Option<TurnTokenUsageFact> {
let token_usage = observation.token_usage?;
Some(TurnTokenUsageFact {
turn_id: observation.turn_id.to_string(),
thread_id: observation.thread_id.to_string(),
token_usage: TokenUsage {
input_tokens: token_usage.input_tokens,
cached_input_tokens: token_usage.cached_input_tokens,
output_tokens: token_usage.output_tokens,
reasoning_output_tokens: token_usage.reasoning_output_tokens,
total_tokens: token_usage.total_tokens,
},
})
}
pub(crate) fn turn_ended_notification(observation: events::TurnEnded<'_>) -> ServerNotification {
ServerNotification::TurnCompleted(TurnCompletedNotification {
thread_id: observation.thread_id.to_string(),
turn: Turn {
id: observation.turn_id.to_string(),
items: vec![],
status: match observation.status {
events::TurnStatus::Completed => AppServerTurnStatus::Completed,
events::TurnStatus::Failed => AppServerTurnStatus::Failed,
events::TurnStatus::Interrupted => AppServerTurnStatus::Interrupted,
},
// Error taxonomy needs a separate design pass. Keeping it out of
// the first terminal-turn observation avoids baking app-server
// transport categories into the shared event model.
error: None,
started_at: None,
completed_at: Some(observation.ended_at),
duration_ms: Some(observation.duration_ms),
},
})
}
pub(crate) fn compaction_ended_event(
observation: events::CompactionEnded<'_>,
) -> facts::CodexCompactionEvent {
let (status, error) = match observation.status {
events::CompactionStatus::Completed => (facts::CompactionStatus::Completed, None),
events::CompactionStatus::Failed { error } => {
(facts::CompactionStatus::Failed, error.map(str::to_string))
}
events::CompactionStatus::Interrupted => (facts::CompactionStatus::Interrupted, None),
};
facts::CodexCompactionEvent {
thread_id: observation.thread_id.to_string(),
turn_id: observation.turn_id.to_string(),
trigger: match observation.trigger {
events::CompactionTrigger::Manual => facts::CompactionTrigger::Manual,
events::CompactionTrigger::Auto => facts::CompactionTrigger::Auto,
},
reason: match observation.reason {
events::CompactionReason::UserRequested => facts::CompactionReason::UserRequested,
events::CompactionReason::ContextLimit => facts::CompactionReason::ContextLimit,
events::CompactionReason::ModelDownshift => facts::CompactionReason::ModelDownshift,
},
implementation: match observation.implementation {
events::CompactionImplementation::Responses => {
facts::CompactionImplementation::Responses
}
events::CompactionImplementation::ResponsesCompact => {
facts::CompactionImplementation::ResponsesCompact
}
},
phase: match observation.phase {
events::CompactionPhase::StandaloneTurn => facts::CompactionPhase::StandaloneTurn,
events::CompactionPhase::PreTurn => facts::CompactionPhase::PreTurn,
events::CompactionPhase::MidTurn => facts::CompactionPhase::MidTurn,
},
strategy: match observation.strategy {
events::CompactionStrategy::Memento => facts::CompactionStrategy::Memento,
events::CompactionStrategy::PrefixCompaction => {
facts::CompactionStrategy::PrefixCompaction
}
},
status,
error,
active_context_tokens_before: observation.active_context_tokens_before,
active_context_tokens_after: observation.active_context_tokens_after,
started_at: u64::try_from(observation.started_at).unwrap_or_default(),
completed_at: u64::try_from(observation.ended_at).unwrap_or_default(),
duration_ms: observation
.duration_ms
.and_then(|duration_ms| u64::try_from(duration_ms).ok()),
}
}
fn tracking_from_fields(model_slug: &str, thread_id: &str, turn_id: &str) -> TrackEventsContext {
TrackEventsContext {
model_slug: model_slug.to_string(),
thread_id: thread_id.to_string(),
turn_id: turn_id.to_string(),
}
}
fn app_invocation_from_fields(
connector_id: Option<&str>,
app_name: Option<&str>,
invocation_type: Option<events::InvocationType>,
) -> AppInvocation {
AppInvocation {
connector_id: connector_id.map(str::to_string),
app_name: app_name.map(str::to_string),
invocation_type: invocation_type.map(map_invocation_type),
}
}
fn plugin_metadata_from_fields(
plugin_id: &str,
plugin_name: &str,
marketplace_name: &str,
has_skills: Option<bool>,
mcp_server_count: Option<usize>,
connector_ids: Option<&[String]>,
) -> CodexPluginMetadata {
CodexPluginMetadata {
plugin_id: Some(plugin_id.to_string()),
plugin_name: Some(plugin_name.to_string()),
marketplace_name: Some(marketplace_name.to_string()),
has_skills,
mcp_server_count,
connector_ids: connector_ids.map(<[String]>::to_vec),
product_client_id: Some(originator().value),
}
}
fn map_invocation_type(invocation_type: events::InvocationType) -> facts::InvocationType {
match invocation_type {
events::InvocationType::Explicit => facts::InvocationType::Explicit,
events::InvocationType::Implicit => facts::InvocationType::Implicit,
}
}

View File

@@ -0,0 +1,230 @@
//! Analytics reducer entrypoint for shared observations.
//!
//! This module deliberately reuses the existing analytics reducer while the
//! shared observation stream is being introduced. That gives conformance tests
//! a small, stable bridge: one side feeds legacy analytics facts and the other
//! feeds typed observations, then both paths must produce identical track
//! requests.
use crate::events::TrackEventRequest;
use crate::facts::AnalyticsFact;
use crate::facts::CustomAnalyticsFact;
use crate::observation_projection;
use crate::reducer::AnalyticsReducer;
use crate::review_observation_projection;
use codex_observability::events;
/// Analytics reducer entrypoint for typed observations.
#[derive(Default)]
pub(crate) struct AnalyticsObservationReducer {
legacy: AnalyticsReducer,
}
impl AnalyticsObservationReducer {
/// Feeds an existing analytics fact into the wrapped reducer for conformance tests.
///
/// The observation stream is being introduced incrementally, so tests need
/// to hold the not-yet-migrated lifecycle context constant while swapping a
/// specific source from legacy facts to observations.
#[cfg(test)]
pub(crate) async fn ingest_existing_fact_for_test(
&mut self,
fact: AnalyticsFact,
out: &mut Vec<TrackEventRequest>,
) {
self.legacy.ingest(fact, out).await;
}
/// Ingests a skill.invoked observation and emits the current analytics event.
pub(crate) async fn ingest_skill_invoked(
&mut self,
observation: events::SkillInvoked<'_>,
out: &mut Vec<TrackEventRequest>,
) {
self.legacy
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::SkillInvoked(
observation_projection::skill_invoked_input(observation),
)),
out,
)
.await;
}
/// Ingests an app.mentioned observation and emits the current analytics event.
pub(crate) async fn ingest_app_mentioned(
&mut self,
observation: events::AppMentioned<'_>,
out: &mut Vec<TrackEventRequest>,
) {
self.legacy
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::AppMentioned(
observation_projection::app_mentioned_input(observation),
)),
out,
)
.await;
}
/// Ingests an app.used observation and emits the current analytics event.
pub(crate) async fn ingest_app_used(
&mut self,
observation: events::AppUsed<'_>,
out: &mut Vec<TrackEventRequest>,
) {
self.legacy
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::AppUsed(
observation_projection::app_used_input(observation),
)),
out,
)
.await;
}
/// Ingests a thread.started observation into the current analytics reducer state.
pub(crate) fn ingest_thread_started(
&mut self,
connection_id: u64,
observation: events::ThreadStarted<'_>,
out: &mut Vec<TrackEventRequest>,
) {
self.legacy
.ingest_observed_thread_started(connection_id, observation, out);
}
/// Ingests a turn.started observation into the current turn-event reducer state.
pub(crate) async fn ingest_turn_started(
&mut self,
observation: events::TurnStarted<'_>,
out: &mut Vec<TrackEventRequest>,
) {
self.legacy
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::TurnResolvedConfig(Box::new(
observation_projection::turn_resolved_config_fact(&observation),
))),
out,
)
.await;
self.legacy
.ingest(
AnalyticsFact::Notification(Box::new(
observation_projection::turn_started_notification(observation),
)),
out,
)
.await;
}
/// Ingests a turn.ended observation into the current turn-event reducer state.
pub(crate) async fn ingest_turn_ended(
&mut self,
observation: events::TurnEnded<'_>,
out: &mut Vec<TrackEventRequest>,
) {
if let Some(token_usage) = observation_projection::turn_token_usage_fact(&observation) {
self.legacy
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::TurnTokenUsage(Box::new(
token_usage,
))),
out,
)
.await;
}
self.legacy
.ingest(
AnalyticsFact::Notification(Box::new(
observation_projection::turn_ended_notification(observation),
)),
out,
)
.await;
}
/// Ingests a turn.steer observation and emits the current analytics event.
pub(crate) fn ingest_turn_steer(
&mut self,
connection_id: u64,
observation: events::TurnSteer<'_>,
out: &mut Vec<TrackEventRequest>,
) {
self.legacy
.ingest_observed_turn_steer(connection_id, observation, out);
}
/// Ingests a compaction.ended observation and emits the current analytics event.
pub(crate) async fn ingest_compaction_ended(
&mut self,
observation: events::CompactionEnded<'_>,
out: &mut Vec<TrackEventRequest>,
) {
self.legacy
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::Compaction(Box::new(
observation_projection::compaction_ended_event(observation),
))),
out,
)
.await;
}
/// Ingests a review.completed observation for the legacy guardian analytics event.
///
/// User review responses are represented in the shared observation type but
/// do not have a legacy analytics event today.
pub(crate) async fn ingest_review_completed(
&mut self,
observation: events::ReviewCompleted<'_>,
out: &mut Vec<TrackEventRequest>,
) {
let Some(guardian_review) =
review_observation_projection::legacy_guardian_review_event(observation)
else {
return;
};
self.legacy
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::GuardianReview(Box::new(
guardian_review,
))),
out,
)
.await;
}
/// Ingests a hook.run_completed observation and emits the current analytics event.
pub(crate) fn ingest_hook_run_completed(
&mut self,
observation: events::HookRunCompleted<'_>,
out: &mut Vec<TrackEventRequest>,
) {
out.push(observation_projection::hook_run_completed_event(
observation,
));
}
/// Ingests a plugin.used observation and emits the current analytics event.
pub(crate) fn ingest_plugin_used(
&mut self,
observation: events::PluginUsed<'_>,
out: &mut Vec<TrackEventRequest>,
) {
out.push(observation_projection::plugin_used_event(observation));
}
/// Ingests a plugin.state_changed observation and emits the current analytics event.
pub(crate) fn ingest_plugin_state_changed(
&mut self,
observation: events::PluginStateChanged<'_>,
out: &mut Vec<TrackEventRequest>,
) {
out.push(observation_projection::plugin_state_changed_event(
observation,
));
}
}

View File

@@ -58,6 +58,7 @@ use codex_app_server_protocol::UserInput;
use codex_git_utils::collect_git_info;
use codex_git_utils::get_git_repo_root;
use codex_login::default_client::originator;
use codex_observability::events as observation_events;
use codex_protocol::config_types::ModeKind;
use codex_protocol::config_types::Personality;
use codex_protocol::config_types::ReasoningSummary;
@@ -91,6 +92,18 @@ struct ThreadMetadataState {
parent_thread_id: Option<String>,
}
struct ThreadInitializedInput {
connection_id: u64,
thread_id: String,
thread_source: Option<&'static str>,
initialization_mode: ThreadInitializationMode,
subagent_source: Option<String>,
parent_thread_id: Option<String>,
model: String,
ephemeral: bool,
created_at: u64,
}
impl ThreadMetadataState {
fn from_thread_metadata(
session_source: &SessionSource,
@@ -670,11 +683,150 @@ impl AnalyticsReducer {
) {
let thread_source: SessionSource = thread.source.into();
let thread_id = thread.id;
let thread_metadata =
ThreadMetadataState::from_thread_metadata(&thread_source, initialization_mode);
self.emit_thread_initialized_event(
ThreadInitializedInput {
connection_id,
thread_id,
thread_source: thread_metadata.thread_source,
initialization_mode: thread_metadata.initialization_mode,
subagent_source: thread_metadata.subagent_source,
parent_thread_id: thread_metadata.parent_thread_id,
model,
ephemeral: thread.ephemeral,
created_at: u64::try_from(thread.created_at).unwrap_or_default(),
},
out,
);
}
pub(crate) fn ingest_observed_thread_started(
&mut self,
connection_id: u64,
observation: observation_events::ThreadStarted<'_>,
out: &mut Vec<TrackEventRequest>,
) {
let (thread_source, subagent_source) = match observation.source {
observation_events::ThreadSource::User => (Some("user"), None),
observation_events::ThreadSource::Subagent(kind) => (
Some("subagent"),
Some(match kind {
observation_events::ThreadSubagentKind::Review => "review".to_string(),
observation_events::ThreadSubagentKind::Compact => "compact".to_string(),
observation_events::ThreadSubagentKind::ThreadSpawn => {
"thread_spawn".to_string()
}
observation_events::ThreadSubagentKind::MemoryConsolidation => {
"memory_consolidation".to_string()
}
observation_events::ThreadSubagentKind::Other(source) => source.to_string(),
}),
),
observation_events::ThreadSource::AppServer
| observation_events::ThreadSource::Custom(_)
| observation_events::ThreadSource::Unknown => (None, None),
};
let initialization_mode = match observation.initialization_mode {
observation_events::ThreadInitializationMode::New => ThreadInitializationMode::New,
observation_events::ThreadInitializationMode::Forked => {
ThreadInitializationMode::Forked
}
observation_events::ThreadInitializationMode::Resumed => {
ThreadInitializationMode::Resumed
}
};
let input = ThreadInitializedInput {
connection_id,
thread_id: observation.thread_id.to_string(),
thread_source,
initialization_mode,
subagent_source,
parent_thread_id: observation.parent_thread_id.map(str::to_string),
model: observation.model.to_string(),
ephemeral: observation.ephemeral,
created_at: u64::try_from(observation.created_at).unwrap_or_default(),
};
self.emit_thread_initialized_event(input, out);
}
pub(crate) fn ingest_observed_turn_steer(
&mut self,
connection_id: u64,
observation: observation_events::TurnSteer<'_>,
out: &mut Vec<TrackEventRequest>,
) {
if let Some(accepted_turn_id) = observation.accepted_turn_id
&& let Some(turn_state) = self.turns.get_mut(accepted_turn_id)
{
turn_state.steer_count += 1;
}
self.emit_turn_steer_event(
connection_id,
PendingTurnSteerState {
thread_id: observation.thread_id.to_string(),
expected_turn_id: observation.expected_turn_id.to_string(),
num_input_images: observation.num_input_images,
created_at: u64::try_from(observation.created_at).unwrap_or_default(),
},
observation.accepted_turn_id.map(str::to_string),
match observation.result {
observation_events::TurnSteerResult::Accepted => TurnSteerResult::Accepted,
observation_events::TurnSteerResult::Rejected => TurnSteerResult::Rejected,
},
observation
.rejection_reason
.map(|rejection_reason| match rejection_reason {
observation_events::TurnSteerRejectionReason::NoActiveTurn => {
TurnSteerRejectionReason::NoActiveTurn
}
observation_events::TurnSteerRejectionReason::ExpectedTurnMismatch => {
TurnSteerRejectionReason::ExpectedTurnMismatch
}
observation_events::TurnSteerRejectionReason::NonSteerableReview => {
TurnSteerRejectionReason::NonSteerableReview
}
observation_events::TurnSteerRejectionReason::NonSteerableCompact => {
TurnSteerRejectionReason::NonSteerableCompact
}
observation_events::TurnSteerRejectionReason::EmptyInput => {
TurnSteerRejectionReason::EmptyInput
}
observation_events::TurnSteerRejectionReason::InputTooLarge => {
TurnSteerRejectionReason::InputTooLarge
}
}),
out,
);
}
fn emit_thread_initialized_event(
&mut self,
input: ThreadInitializedInput,
out: &mut Vec<TrackEventRequest>,
) {
let ThreadInitializedInput {
connection_id,
thread_id,
thread_source,
initialization_mode,
subagent_source,
parent_thread_id,
model,
ephemeral,
created_at,
} = input;
let thread_metadata = ThreadMetadataState {
thread_source,
initialization_mode,
subagent_source,
parent_thread_id,
};
let Some(connection_state) = self.connections.get(&connection_id) else {
return;
};
self.thread_connections
.insert(thread_id.clone(), connection_id);
self.thread_metadata
@@ -687,12 +839,12 @@ impl AnalyticsReducer {
app_server_client: connection_state.app_server_client.clone(),
runtime: connection_state.runtime.clone(),
model,
ephemeral,
thread_source: thread_metadata.thread_source,
initialization_mode: thread_metadata.initialization_mode,
subagent_source: thread_metadata.subagent_source,
parent_thread_id: thread_metadata.parent_thread_id,
created_at,
},
},
));

View File

@@ -0,0 +1,471 @@
//! Projection from review observations into the current analytics schema.
use crate::events::GuardianApprovalRequestSource as AnalyticsGuardianApprovalRequestSource;
use crate::events::GuardianCommandSource as AnalyticsGuardianCommandSource;
use crate::events::GuardianReviewDecision as AnalyticsGuardianReviewDecision;
use crate::events::GuardianReviewEventParams;
use crate::events::GuardianReviewFailureReason as AnalyticsGuardianReviewFailureReason;
use crate::events::GuardianReviewOutcome as AnalyticsGuardianReviewOutcome;
use crate::events::GuardianReviewRiskLevel as AnalyticsGuardianReviewRiskLevel;
use crate::events::GuardianReviewSessionKind as AnalyticsGuardianReviewSessionKind;
use crate::events::GuardianReviewTerminalStatus as AnalyticsGuardianReviewTerminalStatus;
use crate::events::GuardianReviewUserAuthorization as AnalyticsGuardianReviewUserAuthorization;
use crate::events::GuardianReviewedAction as AnalyticsGuardianReviewedAction;
use codex_observability::events;
use codex_protocol::approvals::NetworkApprovalProtocol;
use codex_protocol::models::PermissionProfile;
use codex_protocol::models::SandboxPermissions;
/// Projects a generic review completion into the legacy guardian analytics event.
///
/// This preserves the existing guardian analytics payload, including the
/// reviewed-action details and guardian rationale whose observation fields
/// are explicitly marked for analytics use.
///
/// User review responses intentionally return None: the current analytics
/// schema only has a guardian review event, while the shared observation is
/// generic enough to represent both user and guardian review completions.
pub(crate) fn legacy_guardian_review_event(
observation: events::ReviewCompleted<'_>,
) -> Option<GuardianReviewEventParams> {
let events::ReviewResponse::Guardian(response) = observation.response else {
return None;
};
let (terminal_status, failure_reason) = match response.terminal_status {
events::ReviewTerminalStatus::Approved => {
(AnalyticsGuardianReviewTerminalStatus::Approved, None)
}
events::ReviewTerminalStatus::Denied => {
(AnalyticsGuardianReviewTerminalStatus::Denied, None)
}
events::ReviewTerminalStatus::Aborted { failure_reason } => (
AnalyticsGuardianReviewTerminalStatus::Aborted,
failure_reason.map(review_failure_reason),
),
events::ReviewTerminalStatus::TimedOut { failure_reason } => (
AnalyticsGuardianReviewTerminalStatus::TimedOut,
failure_reason.map(review_failure_reason),
),
events::ReviewTerminalStatus::FailedClosed { failure_reason } => (
AnalyticsGuardianReviewTerminalStatus::FailedClosed,
failure_reason.map(review_failure_reason),
),
};
let token_usage = response.token_usage;
let guardian_session = response.session;
Some(GuardianReviewEventParams {
thread_id: observation.thread_id.to_string(),
turn_id: observation.turn_id.to_string(),
review_id: observation.review_id.to_string(),
target_item_id: observation.target_item_id.to_string(),
retry_reason: observation.retry_reason.map(str::to_string),
approval_request_source: match observation.request_source {
events::ReviewRequestSource::MainTurn => {
AnalyticsGuardianApprovalRequestSource::MainTurn
}
events::ReviewRequestSource::DelegatedSubagent => {
AnalyticsGuardianApprovalRequestSource::DelegatedSubagent
}
},
reviewed_action: reviewed_action(observation.reviewed_action),
reviewed_action_truncated: observation.reviewed_action_truncated,
decision: match response.decision {
events::ReviewDecision::Approved => AnalyticsGuardianReviewDecision::Approved,
events::ReviewDecision::Denied => AnalyticsGuardianReviewDecision::Denied,
events::ReviewDecision::Aborted => AnalyticsGuardianReviewDecision::Aborted,
},
terminal_status,
failure_reason,
risk_level: response.risk_level.map(|risk_level| match risk_level {
events::ReviewRiskLevel::Low => AnalyticsGuardianReviewRiskLevel::Low,
events::ReviewRiskLevel::Medium => AnalyticsGuardianReviewRiskLevel::Medium,
events::ReviewRiskLevel::High => AnalyticsGuardianReviewRiskLevel::High,
events::ReviewRiskLevel::Critical => AnalyticsGuardianReviewRiskLevel::Critical,
}),
user_authorization: response.user_authorization.map(|user_authorization| {
match user_authorization {
events::ReviewUserAuthorization::Unknown => {
AnalyticsGuardianReviewUserAuthorization::Unknown
}
events::ReviewUserAuthorization::Low => {
AnalyticsGuardianReviewUserAuthorization::Low
}
events::ReviewUserAuthorization::Medium => {
AnalyticsGuardianReviewUserAuthorization::Medium
}
events::ReviewUserAuthorization::High => {
AnalyticsGuardianReviewUserAuthorization::High
}
}
}),
outcome: response.outcome.map(|outcome| match outcome {
events::ReviewOutcome::Allow => AnalyticsGuardianReviewOutcome::Allow,
events::ReviewOutcome::Deny => AnalyticsGuardianReviewOutcome::Deny,
}),
rationale: response.rationale.map(str::to_string),
guardian_thread_id: guardian_session.map(|session| session.guardian_thread_id.to_string()),
guardian_session_kind: guardian_session.map(|session| match session.session_kind {
events::GuardianReviewSessionKind::TrunkNew => {
AnalyticsGuardianReviewSessionKind::TrunkNew
}
events::GuardianReviewSessionKind::TrunkReused => {
AnalyticsGuardianReviewSessionKind::TrunkReused
}
events::GuardianReviewSessionKind::EphemeralForked => {
AnalyticsGuardianReviewSessionKind::EphemeralForked
}
}),
guardian_model: guardian_session.map(|session| session.model.to_string()),
guardian_reasoning_effort: guardian_session
.and_then(|session| session.reasoning_effort.map(str::to_string)),
had_prior_review_context: guardian_session.map(|session| session.had_prior_review_context),
review_timeout_ms: response.review_timeout_ms,
tool_call_count: response.tool_call_count,
time_to_first_token_ms: response.time_to_first_token_ms,
completion_latency_ms: response.completion_latency_ms,
started_at: u64::try_from(observation.started_at).unwrap_or_default(),
completed_at: Some(u64::try_from(observation.ended_at).unwrap_or_default()),
input_tokens: token_usage.map(|token_usage| token_usage.input_tokens),
cached_input_tokens: token_usage.map(|token_usage| token_usage.cached_input_tokens),
output_tokens: token_usage.map(|token_usage| token_usage.output_tokens),
reasoning_output_tokens: token_usage.map(|token_usage| token_usage.reasoning_output_tokens),
total_tokens: token_usage.map(|token_usage| token_usage.total_tokens),
})
}
fn reviewed_action(action: events::ReviewedAction<'_>) -> AnalyticsGuardianReviewedAction {
match action {
events::ReviewedAction::Shell {
command,
command_display,
cwd,
sandbox_permissions,
additional_permissions,
justification,
} => AnalyticsGuardianReviewedAction::Shell {
command: command.to_vec(),
command_display: command_display.to_string(),
cwd: cwd.to_string(),
sandbox_permissions: sandbox_permissions_for_review(sandbox_permissions),
additional_permissions: additional_permissions.and_then(permission_profile_for_review),
justification: justification.map(str::to_string),
},
events::ReviewedAction::UnifiedExec {
command,
command_display,
cwd,
sandbox_permissions,
additional_permissions,
justification,
tty,
} => AnalyticsGuardianReviewedAction::UnifiedExec {
command: command.to_vec(),
command_display: command_display.to_string(),
cwd: cwd.to_string(),
sandbox_permissions: sandbox_permissions_for_review(sandbox_permissions),
additional_permissions: additional_permissions.and_then(permission_profile_for_review),
justification: justification.map(str::to_string),
tty,
},
events::ReviewedAction::ProcessExec {
source,
program,
argv,
cwd,
additional_permissions,
} => AnalyticsGuardianReviewedAction::Execve {
source: match source {
events::ReviewCommandSource::Shell => AnalyticsGuardianCommandSource::Shell,
events::ReviewCommandSource::UnifiedExec => {
AnalyticsGuardianCommandSource::UnifiedExec
}
},
program: program.to_string(),
argv: argv.to_vec(),
cwd: cwd.to_string(),
additional_permissions: additional_permissions.and_then(permission_profile_for_review),
},
events::ReviewedAction::ApplyPatch { cwd, files } => {
AnalyticsGuardianReviewedAction::ApplyPatch {
cwd: cwd.to_string(),
files: files.to_vec(),
}
}
events::ReviewedAction::NetworkAccess {
target,
host,
protocol,
port,
} => AnalyticsGuardianReviewedAction::NetworkAccess {
target: target.to_string(),
host: host.to_string(),
protocol: match protocol {
events::ReviewNetworkApprovalProtocol::Http => NetworkApprovalProtocol::Http,
events::ReviewNetworkApprovalProtocol::Https => NetworkApprovalProtocol::Https,
events::ReviewNetworkApprovalProtocol::Socks5Tcp => {
NetworkApprovalProtocol::Socks5Tcp
}
events::ReviewNetworkApprovalProtocol::Socks5Udp => {
NetworkApprovalProtocol::Socks5Udp
}
},
port,
},
events::ReviewedAction::McpToolCall {
server,
tool_name,
connector_id,
connector_name,
tool_title,
} => AnalyticsGuardianReviewedAction::McpToolCall {
server: server.to_string(),
tool_name: tool_name.to_string(),
connector_id: connector_id.map(str::to_string),
connector_name: connector_name.map(str::to_string),
tool_title: tool_title.map(str::to_string),
},
}
}
fn sandbox_permissions_for_review(
sandbox_permissions: events::ReviewSandboxPermissions,
) -> SandboxPermissions {
match sandbox_permissions {
events::ReviewSandboxPermissions::UseDefault => SandboxPermissions::UseDefault,
events::ReviewSandboxPermissions::RequireEscalated => SandboxPermissions::RequireEscalated,
events::ReviewSandboxPermissions::WithAdditionalPermissions => {
SandboxPermissions::WithAdditionalPermissions
}
}
}
fn permission_profile_for_review(
profile: events::ReviewPermissionProfile<'_>,
) -> Option<PermissionProfile> {
// Keep observability independent of codex-protocol types. The protocol
// serde shape is the compatibility boundary for this nested payload.
let value = serde_json::to_value(profile).ok()?;
serde_json::from_value(value).ok()
}
fn review_failure_reason(
failure_reason: events::ReviewFailureReason,
) -> AnalyticsGuardianReviewFailureReason {
match failure_reason {
events::ReviewFailureReason::Timeout => AnalyticsGuardianReviewFailureReason::Timeout,
events::ReviewFailureReason::Cancelled => AnalyticsGuardianReviewFailureReason::Cancelled,
events::ReviewFailureReason::PromptBuildError => {
AnalyticsGuardianReviewFailureReason::PromptBuildError
}
events::ReviewFailureReason::SessionError => {
AnalyticsGuardianReviewFailureReason::SessionError
}
events::ReviewFailureReason::ParseError => AnalyticsGuardianReviewFailureReason::ParseError,
}
}
#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
use serde_json::json;
fn projected_reviewed_action(action: events::ReviewedAction<'_>) -> serde_json::Value {
let event =
legacy_guardian_review_event(review_completed_with_action(action)).expect("project");
serde_json::to_value(event.reviewed_action).expect("serialize reviewed action")
}
fn review_completed_with_action(
reviewed_action: events::ReviewedAction<'_>,
) -> events::ReviewCompleted<'_> {
events::ReviewCompleted {
thread_id: "thread-1",
turn_id: "turn-1",
review_id: "review-1",
target_item_id: "item-1",
retry_reason: None,
request_source: events::ReviewRequestSource::MainTurn,
reviewed_action,
reviewed_action_truncated: false,
response: events::ReviewResponse::Guardian(events::GuardianReviewResponse {
decision: events::ReviewDecision::Approved,
terminal_status: events::ReviewTerminalStatus::Approved,
risk_level: None,
user_authorization: None,
outcome: None,
rationale: None,
session: None,
review_timeout_ms: 30_000,
tool_call_count: 0,
time_to_first_token_ms: None,
completion_latency_ms: None,
token_usage: None,
}),
started_at: 1,
ended_at: 2,
}
}
#[test]
fn legacy_guardian_projection_ignores_user_review_responses() {
let observation = events::ReviewCompleted {
thread_id: "thread-1",
turn_id: "turn-1",
review_id: "review-1",
target_item_id: "item-1",
retry_reason: None,
request_source: events::ReviewRequestSource::MainTurn,
reviewed_action: events::ReviewedAction::ApplyPatch {
cwd: "/repo",
files: &[],
},
reviewed_action_truncated: false,
response: events::ReviewResponse::User(events::UserReviewResponse {
decision: events::ReviewDecision::Approved,
}),
started_at: 1,
ended_at: 2,
};
assert!(legacy_guardian_review_event(observation).is_none());
}
#[test]
fn projects_shell_reviewed_action_with_permission_profile() {
let command = vec!["git".to_string(), "status".to_string()];
let read_paths = vec!["/repo".to_string()];
let write_paths = vec!["/repo/tmp".to_string()];
let action = events::ReviewedAction::Shell {
command: &command,
command_display: "git status",
cwd: "/repo",
sandbox_permissions: events::ReviewSandboxPermissions::WithAdditionalPermissions,
additional_permissions: Some(events::ReviewPermissionProfile {
network: Some(events::ReviewNetworkPermissions {
enabled: Some(true),
}),
file_system: Some(events::ReviewFileSystemPermissions {
read: Some(&read_paths),
write: Some(&write_paths),
}),
}),
justification: Some("inspect repository state"),
};
assert_eq!(
projected_reviewed_action(action),
json!({
"type": "shell",
"command": ["git", "status"],
"command_display": "git status",
"cwd": "/repo",
"sandbox_permissions": "with_additional_permissions",
"additional_permissions": {
"network": {
"enabled": true
},
"file_system": {
"read": ["/repo"],
"write": ["/repo/tmp"]
}
},
"justification": "inspect repository state"
})
);
}
#[test]
fn projects_remaining_reviewed_action_variants() {
let unified_command = vec!["cargo".to_string(), "test".to_string()];
assert_eq!(
projected_reviewed_action(events::ReviewedAction::UnifiedExec {
command: &unified_command,
command_display: "cargo test",
cwd: "/repo",
sandbox_permissions: events::ReviewSandboxPermissions::RequireEscalated,
additional_permissions: None,
justification: None,
tty: true,
}),
json!({
"type": "unified_exec",
"command": ["cargo", "test"],
"command_display": "cargo test",
"cwd": "/repo",
"sandbox_permissions": "require_escalated",
"additional_permissions": null,
"justification": null,
"tty": true
})
);
let argv = vec!["git".to_string(), "diff".to_string()];
assert_eq!(
projected_reviewed_action(events::ReviewedAction::ProcessExec {
source: events::ReviewCommandSource::UnifiedExec,
program: "git",
argv: &argv,
cwd: "/repo",
additional_permissions: None,
}),
json!({
"type": "execve",
"source": "unified_exec",
"program": "git",
"argv": ["git", "diff"],
"cwd": "/repo",
"additional_permissions": null
})
);
let files = vec!["src/lib.rs".to_string(), "Cargo.toml".to_string()];
assert_eq!(
projected_reviewed_action(events::ReviewedAction::ApplyPatch {
cwd: "/repo",
files: &files,
}),
json!({
"type": "apply_patch",
"cwd": "/repo",
"files": ["src/lib.rs", "Cargo.toml"]
})
);
assert_eq!(
projected_reviewed_action(events::ReviewedAction::NetworkAccess {
target: "https://example.com",
host: "example.com",
protocol: events::ReviewNetworkApprovalProtocol::Https,
port: 443,
}),
json!({
"type": "network_access",
"target": "https://example.com",
"host": "example.com",
"protocol": "https",
"port": 443
})
);
assert_eq!(
projected_reviewed_action(events::ReviewedAction::McpToolCall {
server: "drive",
tool_name: "search",
connector_id: Some("drive-connector"),
connector_name: Some("Drive"),
tool_title: Some("Search Drive"),
}),
json!({
"type": "mcp_tool_call",
"server": "drive",
"tool_name": "search",
"connector_id": "drive-connector",
"connector_name": "Drive",
"tool_title": "Search Drive"
})
);
}
}

View File

@@ -0,0 +1,7 @@
load("//:defs.bzl", "codex_rust_crate")
codex_rust_crate(
name = "observability-derive",
crate_name = "codex_observability_derive",
proc_macro = True,
)

View File

@@ -0,0 +1,16 @@
[package]
edition.workspace = true
license.workspace = true
name = "codex-observability-derive"
version.workspace = true
[lib]
proc-macro = true
[dependencies]
proc-macro2 = "1"
quote = "1"
syn = { version = "2", features = ["full", "extra-traits"] }
[lints]
workspace = true

View File

@@ -0,0 +1,267 @@
//! Derive macro implementation for `codex-observability`.
//!
//! The macro is intentionally small: it turns annotated struct fields into
//! `ObservationFieldVisitor` calls and refuses to compile if any exported field
//! is missing policy metadata. Filtering, serialization, redaction, and
//! destination-specific mapping stay in sinks/reducers.
use proc_macro::TokenStream;
use quote::format_ident;
use quote::quote;
use syn::Data;
use syn::DeriveInput;
use syn::Expr;
use syn::ExprLit;
use syn::Fields;
use syn::Lit;
use syn::LitStr;
use syn::Path;
use syn::parse_macro_input;
/// Derives `codex_observability::Observation` for a named struct.
///
/// Required attributes:
///
/// - `#[observation(name = "domain.event")]` on the struct.
/// - `#[obs(level = "basic|detailed|trace", class = "...")]` on every field.
/// - Optional struct-level or field-level `uses = [...]` markers for exact
/// sink selection. Field-level markers override the struct default.
///
/// Event definitions inside `codex-observability` itself may use
/// `#[observation(crate = "crate")]` so generated code refers to local types
/// instead of the externally visible crate path.
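///
/// A minimal usage sketch (the event and field names below are illustrative,
/// not definitions from the Codex codebase):
///
/// ```ignore
/// use codex_observability::Observation;
///
/// #[derive(Observation)]
/// #[observation(name = "example.event", uses = ["analytics"])]
/// struct ExampleEvent<'a> {
///     /// Inherits the struct-level analytics marker.
///     #[obs(level = "basic", class = "identifier")]
///     thread_id: &'a str,
///     /// Field-level uses override the struct default.
///     #[obs(level = "detailed", class = "content", uses = ["rollout_trace"])]
///     prompt_excerpt: &'a str,
/// }
/// ```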
#[proc_macro_derive(Observation, attributes(observation, obs))]
pub fn derive_observation(input: TokenStream) -> TokenStream {
let input = parse_macro_input!(input as DeriveInput);
expand_observation(input)
.unwrap_or_else(syn::Error::into_compile_error)
.into()
}
fn expand_observation(input: DeriveInput) -> syn::Result<proc_macro2::TokenStream> {
let name = input.ident;
let observation_attr = observation_attr(&input.attrs)?;
let event_name = observation_attr.name;
let crate_path = observation_attr.crate_path;
let default_uses = observation_attr.uses;
let Data::Struct(data) = input.data else {
return Err(syn::Error::new_spanned(
name,
"Observation can only be derived for structs",
));
};
let Fields::Named(fields) = data.fields else {
return Err(syn::Error::new_spanned(
name,
"Observation requires a struct with named fields",
));
};
let mut visits = Vec::new();
for field in fields.named {
let Some(field_name) = field.ident else {
continue;
};
let meta = obs_meta(&field.attrs, &field_name, &crate_path, &default_uses)?;
let field_name_lit = LitStr::new(&field_name.to_string(), field_name.span());
visits.push(quote! {
visitor.field(#field_name_lit, #meta, &self.#field_name);
});
}
let (impl_generics, ty_generics, where_clause) = input.generics.split_for_impl();
Ok(quote! {
impl #impl_generics #crate_path::Observation for #name #ty_generics #where_clause {
const NAME: &'static str = #event_name;
fn visit_fields<V: #crate_path::ObservationFieldVisitor>(&self, visitor: &mut V) {
#(#visits)*
}
}
})
}
struct ObservationAttr {
name: LitStr,
crate_path: Path,
uses: Vec<LitStr>,
}
fn observation_attr(attrs: &[syn::Attribute]) -> syn::Result<ObservationAttr> {
for attr in attrs {
if !attr.path().is_ident("observation") {
continue;
}
let mut name = None;
let mut crate_path = None;
let mut uses = Vec::new();
attr.parse_nested_meta(|meta| {
if meta.path.is_ident("name") {
name = Some(meta.value()?.parse::<LitStr>()?);
Ok(())
} else if meta.path.is_ident("crate") {
let value = meta.value()?.parse::<LitStr>()?;
// The override is needed only when deriving observations from
// inside `codex-observability`, where the external crate name is
// not available. Ordinary users get the stable public path.
crate_path = Some(value.parse::<Path>()?);
Ok(())
} else if meta.path.is_ident("uses") {
uses = use_literals(meta.value()?.parse::<Expr>()?)?;
Ok(())
} else {
Err(meta.error("unsupported observation attribute"))
}
})?;
if let Some(name) = name {
return Ok(ObservationAttr {
name,
crate_path: crate_path.unwrap_or_else(|| syn::parse_quote!(::codex_observability)),
uses,
});
}
}
Err(syn::Error::new(
proc_macro2::Span::call_site(),
"missing #[observation(name = \"...\")]",
))
}
fn obs_meta(
attrs: &[syn::Attribute],
field_name: &syn::Ident,
crate_path: &Path,
default_uses: &[LitStr],
) -> syn::Result<proc_macro2::TokenStream> {
for attr in attrs {
if !attr.path().is_ident("obs") {
continue;
}
let mut level = None;
let mut class = None;
let mut uses = None;
attr.parse_nested_meta(|meta| {
if meta.path.is_ident("level") {
level = Some(meta.value()?.parse::<LitStr>()?);
Ok(())
} else if meta.path.is_ident("class") {
class = Some(meta.value()?.parse::<LitStr>()?);
Ok(())
} else if meta.path.is_ident("uses") {
uses = Some(use_literals(meta.value()?.parse::<Expr>()?)?);
Ok(())
} else {
Err(meta.error("unsupported obs attribute"))
}
})?;
let level = level.ok_or_else(|| {
syn::Error::new_spanned(attr, "missing obs level, expected level = \"...\"")
})?;
let class = class.ok_or_else(|| {
syn::Error::new_spanned(attr, "missing obs class, expected class = \"...\"")
})?;
let detail = detail_expr(&level, crate_path)?;
let data_class = data_class_expr(&class, crate_path)?;
let uses = uses
.as_deref()
.unwrap_or(default_uses)
.iter()
.map(|value| field_use_expr(value, crate_path))
.collect::<syn::Result<Vec<_>>>()?;
return Ok(quote! {
#crate_path::FieldMeta::with_uses(#detail, #data_class, &[#(#uses),*])
});
}
Err(syn::Error::new_spanned(
field_name,
"missing #[obs(level = \"...\", class = \"...\")]",
))
}
fn use_literals(expr: Expr) -> syn::Result<Vec<LitStr>> {
// A `let ... else` cannot reuse the scrutinee in its else block, so match
// instead and report the error on the rejected expression.
let array = match expr {
Expr::Array(array) => array,
other => {
return Err(syn::Error::new_spanned(
other,
"obs uses must be a string array, for example uses = [\"analytics\"]",
));
}
};
array
.elems
.into_iter()
.map(|elem| match elem {
Expr::Lit(ExprLit {
lit: Lit::Str(value),
..
}) => Ok(value),
other => Err(syn::Error::new_spanned(
other,
"obs uses entries must be string literals",
)),
})
.collect()
}
fn detail_expr(value: &LitStr, crate_path: &Path) -> syn::Result<proc_macro2::TokenStream> {
enum_expr(
value,
"detail level",
&[
("basic", "Basic"),
("detailed", "Detailed"),
("trace", "Trace"),
],
quote!(#crate_path::DetailLevel),
)
}
fn data_class_expr(value: &LitStr, crate_path: &Path) -> syn::Result<proc_macro2::TokenStream> {
enum_expr(
value,
"data class",
&[
("identifier", "Identifier"),
("operational", "Operational"),
("environment", "Environment"),
("content", "Content"),
("secret_risk", "SecretRisk"),
],
quote!(#crate_path::DataClass),
)
}
fn field_use_expr(value: &LitStr, crate_path: &Path) -> syn::Result<proc_macro2::TokenStream> {
enum_expr(
value,
"field use",
&[
("analytics", "Analytics"),
("otel", "Otel"),
("rollout_trace", "RolloutTrace"),
],
quote!(#crate_path::FieldUse),
)
}
fn enum_expr(
value: &LitStr,
label: &str,
variants: &[(&str, &str)],
path: proc_macro2::TokenStream,
) -> syn::Result<proc_macro2::TokenStream> {
let raw = value.value();
let Some((_, variant)) = variants.iter().find(|(name, _)| *name == raw) else {
let expected = variants
.iter()
.map(|(name, _)| format!("\"{name}\""))
.collect::<Vec<_>>()
.join(", ");
return Err(syn::Error::new_spanned(
value,
format!("invalid {label} {raw:?}, expected one of {expected}"),
));
};
let variant = format_ident!("{variant}");
let expr: Expr = syn::parse_quote!(#path::#variant);
Ok(quote!(#expr))
}

View File

@@ -0,0 +1,6 @@
load("//:defs.bzl", "codex_rust_crate")
codex_rust_crate(
name = "observability",
crate_name = "codex_observability",
)

View File

@@ -0,0 +1,21 @@
[package]
edition.workspace = true
license.workspace = true
name = "codex-observability"
version.workspace = true
[lib]
doctest = true
name = "codex_observability"
path = "src/lib.rs"
[dependencies]
codex-observability-derive = { workspace = true }
serde = { workspace = true, features = ["derive"] }
[dev-dependencies]
pretty_assertions = { workspace = true }
serde_json = { workspace = true }
[lints]
workspace = true

View File

@@ -0,0 +1,257 @@
//! Canonical Codex observation event definitions.
use crate::Observation;
use serde::Serialize;
use std::path::Path;
mod compaction;
mod review;
mod thread;
mod turn;
pub use compaction::*;
pub use review::*;
pub use thread::*;
pub use turn::*;
/// How an app/tool/plugin capability was selected by the user or system.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum InvocationType {
/// The user explicitly mentioned or selected the capability.
Explicit,
/// Codex inferred that the capability should be used.
Implicit,
}
/// Scope where a skill definition was found.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum SkillScope {
User,
Repo,
System,
Admin,
}
/// Status reported after a hook run reaches a terminal state.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum HookRunStatus {
/// The hook completed successfully.
Completed,
/// The hook failed.
Failed,
/// The hook blocked the triggering action.
Blocked,
/// The hook stopped execution.
Stopped,
}
/// Plugin lifecycle state after a plugin management operation.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum PluginState {
/// Plugin was installed.
Installed,
/// Plugin was uninstalled.
Uninstalled,
/// Plugin was enabled.
Enabled,
/// Plugin was disabled.
Disabled,
}
/// Observation emitted when a skill is invoked during a turn.
#[derive(Observation)]
#[observation(name = "skill.invoked", crate = "crate", uses = ["analytics"])]
pub struct SkillInvoked<'a> {
/// Model slug active for the turn where the skill was invoked.
#[obs(level = "basic", class = "operational")]
pub model_slug: &'a str,
/// Thread that owns the turn.
#[obs(level = "basic", class = "identifier")]
pub thread_id: &'a str,
/// Turn where the skill was invoked.
#[obs(level = "basic", class = "identifier")]
pub turn_id: &'a str,
/// Skill display name.
#[obs(level = "basic", class = "operational")]
pub skill_name: &'a str,
/// Scope where the skill was discovered.
#[obs(level = "basic", class = "operational")]
pub skill_scope: SkillScope,
/// Local skill definition path used to derive the existing analytics skill id.
#[obs(level = "basic", class = "environment")]
pub skill_path: &'a Path,
/// Whether the skill was explicitly requested or inferred.
#[obs(level = "basic", class = "operational")]
pub invocation_type: InvocationType,
}
/// Observation emitted when an app connector is mentioned during a turn.
#[derive(Observation)]
#[observation(name = "app.mentioned", crate = "crate", uses = ["analytics"])]
pub struct AppMentioned<'a> {
/// Model slug active for the turn where the app was mentioned.
#[obs(level = "basic", class = "operational")]
pub model_slug: &'a str,
/// Thread that owns the turn.
#[obs(level = "basic", class = "identifier")]
pub thread_id: &'a str,
/// Turn where the app was mentioned.
#[obs(level = "basic", class = "identifier")]
pub turn_id: &'a str,
/// Stable connector identifier when the app is backed by a connector.
#[obs(level = "basic", class = "identifier")]
pub connector_id: Option<&'a str>,
/// User-facing app name.
#[obs(level = "basic", class = "operational")]
pub app_name: Option<&'a str>,
/// Whether the mention was explicit or inferred.
#[obs(level = "basic", class = "operational")]
pub invocation_type: Option<InvocationType>,
}
/// Observation emitted when Codex uses an app connector during a turn.
#[derive(Observation)]
#[observation(name = "app.used", crate = "crate", uses = ["analytics"])]
pub struct AppUsed<'a> {
/// Model slug active for the turn where the app was used.
#[obs(level = "basic", class = "operational")]
pub model_slug: &'a str,
/// Thread that owns the turn.
#[obs(level = "basic", class = "identifier")]
pub thread_id: &'a str,
/// Turn where the app was used.
#[obs(level = "basic", class = "identifier")]
pub turn_id: &'a str,
/// Stable connector identifier when the app is backed by a connector.
#[obs(level = "basic", class = "identifier")]
pub connector_id: Option<&'a str>,
/// User-facing app name.
#[obs(level = "basic", class = "operational")]
pub app_name: Option<&'a str>,
/// Whether usage was explicit or implicit.
#[obs(level = "basic", class = "operational")]
pub invocation_type: Option<InvocationType>,
}
/// Observation emitted after a configured hook run completes.
#[derive(Observation)]
#[observation(name = "hook.run_completed", crate = "crate", uses = ["analytics"])]
pub struct HookRunCompleted<'a> {
/// Model slug active for the turn where the hook ran.
#[obs(level = "basic", class = "operational")]
pub model_slug: &'a str,
/// Thread that owns the turn.
#[obs(level = "basic", class = "identifier")]
pub thread_id: &'a str,
/// Turn where the hook ran.
#[obs(level = "basic", class = "identifier")]
pub turn_id: &'a str,
/// Hook lifecycle point, for example PostToolUse.
#[obs(level = "basic", class = "operational")]
pub hook_name: &'a str,
/// Source that configured the hook, for example user or project.
#[obs(level = "basic", class = "operational")]
pub hook_source: &'static str,
/// Final hook run status.
#[obs(level = "basic", class = "operational")]
pub status: HookRunStatus,
}
/// Observation emitted when a plugin capability is used during a turn.
#[derive(Observation)]
#[observation(name = "plugin.used", crate = "crate", uses = ["analytics"])]
pub struct PluginUsed<'a> {
/// Model slug active for the turn where the plugin was used.
#[obs(level = "basic", class = "operational")]
pub model_slug: &'a str,
/// Thread that owns the turn.
#[obs(level = "basic", class = "identifier")]
pub thread_id: &'a str,
/// Turn where the plugin was used.
#[obs(level = "basic", class = "identifier")]
pub turn_id: &'a str,
/// Stable plugin identifier.
#[obs(level = "basic", class = "identifier")]
pub plugin_id: &'a str,
/// Plugin name component.
#[obs(level = "basic", class = "operational")]
pub plugin_name: &'a str,
/// Marketplace or namespace component.
#[obs(level = "basic", class = "operational")]
pub marketplace_name: &'a str,
/// Whether the plugin exposes skills.
#[obs(level = "basic", class = "operational")]
pub has_skills: Option<bool>,
/// Number of MCP servers exposed by the plugin.
#[obs(level = "basic", class = "operational")]
pub mcp_server_count: Option<usize>,
/// Connector identifiers exposed by the plugin.
#[obs(level = "basic", class = "identifier")]
pub connector_ids: Option<&'a [String]>,
}
/// Observation emitted after a plugin lifecycle state changes.
#[derive(Observation)]
#[observation(name = "plugin.state_changed", crate = "crate", uses = ["analytics"])]
pub struct PluginStateChanged<'a> {
/// Stable plugin identifier.
#[obs(level = "basic", class = "identifier")]
pub plugin_id: &'a str,
/// Plugin name component.
#[obs(level = "basic", class = "operational")]
pub plugin_name: &'a str,
/// Marketplace or namespace component.
#[obs(level = "basic", class = "operational")]
pub marketplace_name: &'a str,
/// Whether the plugin exposes skills.
#[obs(level = "basic", class = "operational")]
pub has_skills: Option<bool>,
/// Number of MCP servers exposed by the plugin.
#[obs(level = "basic", class = "operational")]
pub mcp_server_count: Option<usize>,
/// Connector identifiers exposed by the plugin.
#[obs(level = "basic", class = "identifier")]
pub connector_ids: Option<&'a [String]>,
/// New plugin lifecycle state.
#[obs(level = "basic", class = "operational")]
pub state: PluginState,
}

View File

@@ -0,0 +1,105 @@
//! Compaction observation event definitions.
use crate::Observation;
use serde::Serialize;
/// What initiated the compaction work.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CompactionTrigger {
Manual,
Auto,
}
/// Why the runtime chose to compact context.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CompactionReason {
UserRequested,
ContextLimit,
ModelDownshift,
}
/// Runtime implementation used for the compaction.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CompactionImplementation {
Responses,
ResponsesCompact,
}
/// Point in the turn lifecycle where compaction ran.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CompactionPhase {
StandaloneTurn,
PreTurn,
MidTurn,
}
/// Strategy used to build the compacted context.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CompactionStrategy {
Memento,
PrefixCompaction,
}
/// Terminal status of a compaction run.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CompactionStatus<'a> {
Completed,
Failed {
/// Bounded failure detail intended for remote export.
error: Option<&'a str>,
},
Interrupted,
}
/// Observation emitted when a compaction run reaches a terminal state.
#[derive(Observation)]
#[observation(name = "compaction.ended", crate = "crate", uses = ["analytics"])]
pub struct CompactionEnded<'a> {
#[obs(level = "basic", class = "identifier")]
pub thread_id: &'a str,
#[obs(level = "basic", class = "identifier")]
pub turn_id: &'a str,
#[obs(level = "basic", class = "operational")]
pub trigger: CompactionTrigger,
#[obs(level = "basic", class = "operational")]
pub reason: CompactionReason,
#[obs(level = "basic", class = "operational")]
pub implementation: CompactionImplementation,
#[obs(level = "basic", class = "operational")]
pub phase: CompactionPhase,
#[obs(level = "basic", class = "operational")]
pub strategy: CompactionStrategy,
#[obs(level = "basic", class = "operational")]
pub status: CompactionStatus<'a>,
#[obs(level = "basic", class = "operational")]
pub active_context_tokens_before: i64,
#[obs(level = "basic", class = "operational")]
pub active_context_tokens_after: i64,
/// Unix timestamp in seconds when compaction work began.
#[obs(level = "basic", class = "operational")]
pub started_at: i64,
/// Unix timestamp in seconds when compaction reached its terminal state.
#[obs(level = "basic", class = "operational")]
pub ended_at: i64,
/// Absent when the callsite cannot measure elapsed wall time.
#[obs(level = "basic", class = "operational")]
pub duration_ms: Option<i64>,
}

View File

@@ -0,0 +1,272 @@
//! Review observation event definitions.
use crate::Observation;
use serde::Serialize;
/// Final decision returned by a review.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReviewDecision {
Approved,
Denied,
Aborted,
}
/// Terminal state of a review.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReviewTerminalStatus {
Approved,
Denied,
Aborted {
failure_reason: Option<ReviewFailureReason>,
},
TimedOut {
failure_reason: Option<ReviewFailureReason>,
},
FailedClosed {
failure_reason: Option<ReviewFailureReason>,
},
}
/// Stable failure category for review terminals.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReviewFailureReason {
Timeout,
Cancelled,
PromptBuildError,
SessionError,
ParseError,
}
/// Source of the request sent for review.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReviewRequestSource {
MainTurn,
DelegatedSubagent,
}
/// Per-command sandbox override requested by the reviewed action.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReviewSandboxPermissions {
UseDefault,
RequireEscalated,
WithAdditionalPermissions,
}
/// Additional filesystem permissions requested for a reviewed action.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
pub struct ReviewFileSystemPermissions<'a> {
pub read: Option<&'a [String]>,
pub write: Option<&'a [String]>,
}
/// Additional network permissions requested for a reviewed action.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
pub struct ReviewNetworkPermissions {
pub enabled: Option<bool>,
}
/// Additional permissions requested for a reviewed action.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
pub struct ReviewPermissionProfile<'a> {
pub network: Option<ReviewNetworkPermissions>,
pub file_system: Option<ReviewFileSystemPermissions<'a>>,
}
/// Source tool that produced an exec-style reviewed action.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReviewCommandSource {
Shell,
UnifiedExec,
}
/// Network protocol involved in a reviewed network access request.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReviewNetworkApprovalProtocol {
Http,
Https,
Socks5Tcp,
Socks5Udp,
}
/// Action that was evaluated by a review.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ReviewedAction<'a> {
Shell {
command: &'a [String],
command_display: &'a str,
cwd: &'a str,
sandbox_permissions: ReviewSandboxPermissions,
additional_permissions: Option<ReviewPermissionProfile<'a>>,
justification: Option<&'a str>,
},
UnifiedExec {
command: &'a [String],
command_display: &'a str,
cwd: &'a str,
sandbox_permissions: ReviewSandboxPermissions,
additional_permissions: Option<ReviewPermissionProfile<'a>>,
justification: Option<&'a str>,
tty: bool,
},
ProcessExec {
source: ReviewCommandSource,
program: &'a str,
argv: &'a [String],
cwd: &'a str,
additional_permissions: Option<ReviewPermissionProfile<'a>>,
},
ApplyPatch {
cwd: &'a str,
files: &'a [String],
},
NetworkAccess {
target: &'a str,
host: &'a str,
protocol: ReviewNetworkApprovalProtocol,
port: u16,
},
McpToolCall {
server: &'a str,
tool_name: &'a str,
connector_id: Option<&'a str>,
connector_name: Option<&'a str>,
tool_title: Option<&'a str>,
},
}
/// Risk level assigned by an automated reviewer.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReviewRiskLevel {
Low,
Medium,
High,
Critical,
}
/// User authorization level observed by an automated reviewer.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReviewUserAuthorization {
Unknown,
Low,
Medium,
High,
}
/// Policy outcome recommended by an automated reviewer.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReviewOutcome {
Allow,
Deny,
}
/// How guardian review obtained a model session.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum GuardianReviewSessionKind {
TrunkNew,
TrunkReused,
EphemeralForked,
}
/// Guardian model session used to perform a review.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
pub struct GuardianReviewSession<'a> {
pub guardian_thread_id: &'a str,
pub session_kind: GuardianReviewSessionKind,
pub model: &'a str,
/// Absent when the selected model/provider has no explicit effort setting.
pub reasoning_effort: Option<&'a str>,
pub had_prior_review_context: bool,
}
/// Response produced by a user reviewer.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
pub struct UserReviewResponse {
pub decision: ReviewDecision,
}
/// Response produced by guardian review.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
pub struct GuardianReviewResponse<'a> {
pub decision: ReviewDecision,
/// Terminal runtime state; non-success states carry their stable failure category.
pub terminal_status: ReviewTerminalStatus,
/// Absent when review stops before the model reports a risk classification.
pub risk_level: Option<ReviewRiskLevel>,
/// Absent when review stops before user authorization is assessed.
pub user_authorization: Option<ReviewUserAuthorization>,
/// Absent when review stops before a policy outcome is produced.
pub outcome: Option<ReviewOutcome>,
/// Model-authored rationale text returned by guardian review.
pub rationale: Option<&'a str>,
/// Absent when review fails before a guardian model session is created or reused.
pub session: Option<GuardianReviewSession<'a>>,
pub review_timeout_ms: u64,
pub tool_call_count: u64,
/// Absent when the guardian session did not stream model tokens.
pub time_to_first_token_ms: Option<u64>,
/// Absent when review ended before a model completion was received.
pub completion_latency_ms: Option<u64>,
/// Absent when the guardian model did not report token accounting.
pub token_usage: Option<super::TurnTokenUsage>,
}
/// Reviewer response that completed a review.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(tag = "reviewer", rename_all = "snake_case")]
pub enum ReviewResponse<'a> {
User(UserReviewResponse),
Guardian(GuardianReviewResponse<'a>),
}
/// Observation emitted when review of an action reaches a terminal state.
#[derive(Observation)]
#[observation(name = "review.completed", crate = "crate", uses = ["analytics"])]
pub struct ReviewCompleted<'a> {
#[obs(level = "basic", class = "identifier")]
pub thread_id: &'a str,
#[obs(level = "basic", class = "identifier")]
pub turn_id: &'a str,
#[obs(level = "basic", class = "identifier")]
pub review_id: &'a str,
#[obs(level = "basic", class = "identifier")]
pub target_item_id: &'a str,
/// Absent on the first review attempt; present when this review retries an earlier attempt.
#[obs(level = "basic", class = "operational")]
pub retry_reason: Option<&'a str>,
#[obs(level = "basic", class = "operational")]
pub request_source: ReviewRequestSource,
/// The reviewed action may contain command text, paths, or tool names.
#[obs(level = "basic", class = "content")]
pub reviewed_action: ReviewedAction<'a>,
#[obs(level = "basic", class = "operational")]
pub reviewed_action_truncated: bool,
/// Contains the reviewer-specific terminal result.
#[obs(level = "basic", class = "content")]
pub response: ReviewResponse<'a>,
#[obs(level = "basic", class = "operational")]
pub started_at: i64,
#[obs(level = "basic", class = "operational")]
pub ended_at: i64,
}

View File

@@ -0,0 +1,89 @@
//! Thread lifecycle observation event definitions.
use crate::Observation;
use serde::Serialize;
/// How a thread became active in the runtime.
///
/// This describes the open operation, not the long-term origin of the thread.
/// A resumed thread already existed; it still becomes active again for the
/// runtime or client connection handling the resume.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ThreadInitializationMode {
New,
Forked,
Resumed,
}
/// Subagent work that caused a thread to start.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ThreadSubagentKind<'a> {
Review,
Compact,
ThreadSpawn,
MemoryConsolidation,
Other(&'a str),
}
/// Origin of the request that made a thread active.
///
/// Keep this separate from `ThreadInitializationMode`: source answers who or
/// what opened the thread, while initialization mode answers whether the
/// thread was new, forked, or resumed.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ThreadSource<'a> {
User,
AppServer,
Custom(&'a str),
Subagent(ThreadSubagentKind<'a>),
Unknown,
}
/// Observation emitted when Codex starts tracking a thread.
///
/// "Started" means the thread became active for this runtime or client
/// connection. It does not imply the thread was newly created; see
/// `initialization_mode` for new, forked, and resumed activations.
#[derive(Observation)]
#[observation(name = "thread.started", crate = "crate", uses = ["analytics"])]
pub struct ThreadStarted<'a> {
/// Thread that became active.
#[obs(level = "basic", class = "identifier")]
pub thread_id: &'a str,
#[obs(level = "basic", class = "operational")]
pub source: ThreadSource<'a>,
/// Parent thread that created this thread, when the source represents subagent work.
///
/// This stays top-level instead of being nested inside the source enum so
/// sinks can apply identifier policy directly to the field.
#[obs(level = "basic", class = "identifier")]
pub parent_thread_id: Option<&'a str>,
#[obs(level = "basic", class = "operational")]
pub initialization_mode: ThreadInitializationMode,
/// Model associated with the thread at activation time.
///
/// Turn configuration also records a model because turns may later run with
/// overrides or migrated settings. This field is thread lifecycle metadata.
#[obs(level = "basic", class = "operational")]
pub model: &'a str,
/// Whether the thread is persisted beyond the active runtime session.
///
/// Turn configuration repeats this for legacy analytics compatibility, but
/// the stable owner of the value is the thread lifecycle.
#[obs(level = "basic", class = "operational")]
pub ephemeral: bool,
/// Unix timestamp in seconds when the thread was originally created.
///
/// For resumed threads this is historical creation time, not resume time.
#[obs(level = "basic", class = "operational")]
pub created_at: i64,
}

View File

@@ -0,0 +1,225 @@
//! Turn lifecycle observation event definitions.
use crate::Observation;
use serde::Serialize;
/// Terminal turn status after Codex stops working on a turn.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum TurnStatus {
Completed,
Failed,
Interrupted,
}
/// Result of trying to steer an active turn.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum TurnSteerResult {
Accepted,
Rejected,
}
/// Stable reason why a turn steering request was rejected.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum TurnSteerRejectionReason {
NoActiveTurn,
ExpectedTurnMismatch,
NonSteerableReview,
NonSteerableCompact,
EmptyInput,
InputTooLarge,
}
/// How a turn was submitted for execution.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum TurnSubmissionType {
Default,
Queued,
}
/// Filesystem sandbox mode resolved for a turn.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum SandboxMode {
FullAccess,
ReadOnly,
WorkspaceWrite,
ExternalSandbox,
}
/// Approval policy resolved for a turn.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ApprovalPolicy {
/// Legacy name for the policy that asks unless an action is trusted.
Untrusted,
OnFailure,
OnRequest,
Granular,
Never,
}
/// Destination that reviews approval requests for a turn.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ApprovalReviewer {
User,
GuardianSubagent,
}
/// Collaboration mode resolved for a turn.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CollaborationMode {
Default,
Plan,
}
/// Reasoning effort resolved for a turn.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum ReasoningEffort {
None,
Minimal,
Low,
Medium,
High,
XHigh,
}
/// Reasoning summary mode resolved for a turn.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum ReasoningSummary {
Auto,
Concise,
Detailed,
None,
}
/// Service tier resolved for a turn.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum ServiceTier {
Fast,
Flex,
}
/// Response personality resolved for a turn.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum Personality {
None,
Friendly,
Pragmatic,
}
/// Configuration resolved before a turn starts executing.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
pub struct TurnConfig<'a> {
pub num_input_images: usize,
/// Absent when the caller cannot distinguish default from queued submission.
pub submission_type: Option<TurnSubmissionType>,
pub ephemeral: bool,
pub model: &'a str,
pub model_provider: &'a str,
pub sandbox_mode: SandboxMode,
/// Kept separate from sandbox mode because analytics reports network
/// capability as an independent resolved setting.
pub sandbox_network_access: bool,
/// Absent when the selected model/provider has no explicit effort setting.
pub reasoning_effort: Option<ReasoningEffort>,
/// `None` means no summary setting was resolved; `Some(ReasoningSummary::None)`
/// means summaries were explicitly disabled.
pub reasoning_summary: Option<ReasoningSummary>,
pub service_tier: Option<ServiceTier>,
pub approval_policy: ApprovalPolicy,
pub approval_reviewer: ApprovalReviewer,
pub collaboration_mode: CollaborationMode,
/// Absent when no personality setting was resolved.
pub personality: Option<Personality>,
pub is_first_turn: bool,
}
/// Observation emitted when execution of a turn starts.
#[derive(Observation)]
#[observation(name = "turn.started", crate = "crate", uses = ["analytics"])]
pub struct TurnStarted<'a> {
#[obs(level = "basic", class = "identifier")]
pub thread_id: &'a str,
#[obs(level = "basic", class = "identifier")]
pub turn_id: &'a str,
#[obs(level = "basic", class = "operational")]
pub config: TurnConfig<'a>,
/// Unix timestamp in seconds when the turn started.
#[obs(level = "basic", class = "operational")]
pub started_at: i64,
}
/// Observation emitted when the outcome of a same-turn steering request is known.
#[derive(Observation)]
#[observation(name = "turn.steer", crate = "crate", uses = ["analytics"])]
pub struct TurnSteer<'a> {
#[obs(level = "basic", class = "identifier")]
pub thread_id: &'a str,
#[obs(level = "basic", class = "identifier")]
pub expected_turn_id: &'a str,
#[obs(level = "basic", class = "identifier")]
pub accepted_turn_id: Option<&'a str>,
#[obs(level = "basic", class = "operational")]
pub num_input_images: usize,
#[obs(level = "basic", class = "operational")]
pub result: TurnSteerResult,
#[obs(level = "basic", class = "operational")]
pub rejection_reason: Option<TurnSteerRejectionReason>,
/// Unix timestamp in seconds when the steering request was made.
#[obs(level = "basic", class = "operational")]
pub created_at: i64,
}
/// Token accounting reported for a completed turn.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
pub struct TurnTokenUsage {
pub input_tokens: i64,
pub cached_input_tokens: i64,
pub output_tokens: i64,
pub reasoning_output_tokens: i64,
pub total_tokens: i64,
}
/// Observation emitted when a turn reaches a terminal state.
#[derive(Observation)]
#[observation(name = "turn.ended", crate = "crate", uses = ["analytics"])]
pub struct TurnEnded<'a> {
#[obs(level = "basic", class = "identifier")]
pub thread_id: &'a str,
#[obs(level = "basic", class = "identifier")]
pub turn_id: &'a str,
#[obs(level = "basic", class = "operational")]
pub status: TurnStatus,
/// Absent when a turn ends before provider usage is available.
#[obs(level = "basic", class = "operational")]
pub token_usage: Option<TurnTokenUsage>,
/// Unix timestamp in seconds when the turn ended.
#[obs(level = "basic", class = "operational")]
pub ended_at: i64,
#[obs(level = "basic", class = "operational")]
pub duration_ms: i64,
}

View File

@@ -0,0 +1,277 @@
//! Shared semantic observation API for Codex runtime facts.
//!
//! Observations describe facts that occurred in Codex. Destination-specific
//! systems such as analytics, rollout trace, OTEL events, and OTEL metrics
//! consume observations through sinks and reducers.
//!
//! Field metadata is required because sinks must decide whether they may read a
//! field before serializing or exporting it. Missing field annotations are a
//! compile-time error:
//!
//! ```compile_fail
//! use codex_observability::Observation;
//!
//! #[derive(Observation)]
//! #[observation(name = "example.missing_field_meta")]
//! struct MissingFieldMeta {
//! field: &'static str,
//! }
//! ```
//!
//! Observation names are also required:
//!
//! ```compile_fail
//! use codex_observability::Observation;
//!
//! #[derive(Observation)]
//! struct MissingObservationName {
//! #[obs(level = "basic", class = "operational")]
//! field: &'static str,
//! }
//! ```
pub mod events;
pub use codex_observability_derive::Observation;
use serde::Serialize;
/// A runtime fact emitted by Codex.
///
/// Implementations visit every exported field together with its field metadata.
/// Sinks use that metadata to apply destination-specific policy before
/// serialization, storage, or export.
pub trait Observation {
/// Stable semantic event name, for example `turn.started`.
const NAME: &'static str;
/// Visits the fields that make up this observation.
fn visit_fields<V: ObservationFieldVisitor>(&self, visitor: &mut V);
}
/// Receives observation fields after policy metadata has been attached.
///
/// Implementations should inspect `meta` before serializing `value`. This keeps
/// remote sinks from accidentally materializing local-only content fields.
pub trait ObservationFieldVisitor {
/// Visits one field from an observation.
fn field<T: Serialize + ?Sized>(&mut self, name: &'static str, meta: FieldMeta, value: &T);
}
/// Consumes observations.
///
/// A sink may serialize immediately, reduce into another event shape, or fan
/// out to additional sinks. The trait is generic so callers can pass borrowed
/// typed observations without allocating an intermediate event object.
pub trait ObservationSink {
/// Observes a single typed event.
fn observe<E: Observation>(&self, event: &E);
}
/// Visits fields intended for one sink and allowed by the supplied policy.
///
/// This helper is the safe path for sinks that serialize fields in their
/// visitor. The explicit field-use marker selects the intended fields; the
/// policy gate then rejects unsafe metadata before the wrapped visitor can
/// materialize the value.
pub fn visit_fields_for_use<E, V>(
event: &E,
field_use: FieldUse,
policy: FieldPolicy,
visitor: &mut V,
) where
E: Observation,
V: ObservationFieldVisitor,
{
let mut visitor = PolicyFilteredVisitor {
field_use,
policy,
inner: visitor,
};
event.visit_fields(&mut visitor);
}
struct PolicyFilteredVisitor<'a, V> {
field_use: FieldUse,
policy: FieldPolicy,
inner: &'a mut V,
}
impl<V> ObservationFieldVisitor for PolicyFilteredVisitor<'_, V>
where
V: ObservationFieldVisitor,
{
fn field<T: Serialize + ?Sized>(&mut self, name: &'static str, meta: FieldMeta, value: &T) {
if meta.is_used_by(self.field_use) && self.policy.allows(meta) {
self.inner.field(name, meta, value);
}
}
}
/// Policy metadata attached to a single observation field.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct FieldMeta {
/// How much detail a sink must be allowed to read before consuming the field.
pub detail: DetailLevel,
/// Semantic/privacy class for the field.
pub class: DataClass,
/// Exact sinks or projections that are intended to consume the field.
pub uses: &'static [FieldUse],
}
impl FieldMeta {
/// Creates metadata for a field that is not consumed by any exact sink.
pub const fn new(detail: DetailLevel, class: DataClass) -> Self {
Self {
detail,
class,
uses: &[],
}
}
/// Creates metadata for a field with explicit sink-use markers.
pub const fn with_uses(
detail: DetailLevel,
class: DataClass,
uses: &'static [FieldUse],
) -> Self {
Self {
detail,
class,
uses,
}
}
/// Returns true when the field was explicitly marked for `field_use`.
pub fn is_used_by(self, field_use: FieldUse) -> bool {
self.uses.contains(&field_use)
}
}
/// Decides whether a sink may read an observation field.
///
/// Policies are checked before serialization. This matters because denied
/// fields may contain content, secrets, or large trace payloads that remote
/// sinks must not materialize even transiently.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct FieldPolicy {
max_detail: DetailLevel,
allowed_classes: &'static [DataClass],
}
impl FieldPolicy {
/// Creates a policy that permits fields at or below the configured detail
/// limit and whose data class is present in the allowed class list.
pub const fn new(max_detail: DetailLevel, allowed_classes: &'static [DataClass]) -> Self {
Self {
max_detail,
allowed_classes,
}
}
/// Returns true when a sink may inspect and serialize a field.
pub fn allows(self, meta: FieldMeta) -> bool {
let detail_allowed = match self.max_detail {
DetailLevel::Basic => matches!(meta.detail, DetailLevel::Basic),
DetailLevel::Detailed => {
matches!(meta.detail, DetailLevel::Basic | DetailLevel::Detailed)
}
DetailLevel::Trace => true,
};
detail_allowed && self.allowed_classes.contains(&meta.class)
}
}
/// Coarse detail level for a field.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DetailLevel {
/// Lifecycle, identifiers, status, counts, model/config, and timing.
Basic,
/// Bounded previews and richer runtime summaries.
Detailed,
/// Raw or near-raw diagnostic evidence intended for local traces.
Trace,
}
/// Semantic/privacy class for a field.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DataClass {
/// Thread IDs, turn IDs, call IDs, and similar correlation identifiers.
Identifier,
/// Status, duration, model, provider, token counts, and tool kind.
Operational,
/// Working directory, repository, OS/runtime, or client metadata.
Environment,
/// User text, assistant text, command text, tool output, or model payloads.
Content,
/// Headers, environment values, auth-like payloads, or raw request blobs.
SecretRisk,
}
/// Exact sink or projection that is intended to consume a field.
///
/// This marker is separate from `DetailLevel` and `DataClass`: it expresses
/// intent, while detail/class remain guardrails that a sink policy enforces
/// before it serializes or exports the selected field.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum FieldUse {
/// Remote product analytics.
Analytics,
/// OpenTelemetry events, logs, or metrics.
Otel,
/// Local rollout trace bundles.
RolloutTrace,
}
#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
#[test]
fn field_meta_preserves_detail_and_class() {
assert_eq!(
FieldMeta::new(DetailLevel::Trace, DataClass::Content),
FieldMeta {
detail: DetailLevel::Trace,
class: DataClass::Content,
uses: &[],
}
);
}
#[test]
fn field_policy_requires_allowed_detail_and_class() {
let policy = FieldPolicy::new(
DetailLevel::Basic,
&[DataClass::Identifier, DataClass::Operational],
);
let cases = [
(
FieldMeta::new(DetailLevel::Basic, DataClass::Identifier),
true,
),
(
FieldMeta::new(DetailLevel::Basic, DataClass::Operational),
true,
),
(
FieldMeta::new(DetailLevel::Detailed, DataClass::Operational),
false,
),
(
FieldMeta::new(DetailLevel::Basic, DataClass::Content),
false,
),
(
FieldMeta::new(DetailLevel::Basic, DataClass::SecretRisk),
false,
),
];
assert_eq!(
cases.map(|(meta, _expected)| policy.allows(meta)),
cases.map(|(_meta, expected)| expected)
);
}
}

View File

@@ -0,0 +1,161 @@
use codex_observability::DataClass;
use codex_observability::DetailLevel;
use codex_observability::FieldMeta;
use codex_observability::FieldPolicy;
use codex_observability::FieldUse;
use codex_observability::Observation;
use codex_observability::ObservationFieldVisitor;
use codex_observability::visit_fields_for_use;
use pretty_assertions::assert_eq;
use serde::Serialize;
use serde::Serializer;
use serde_json::Value;
#[derive(Observation)]
#[observation(name = "turn.config_resolved")]
struct TurnConfigResolved<'a> {
#[obs(level = "basic", class = "identifier")]
thread_id: &'a str,
#[obs(level = "basic", class = "identifier")]
turn_id: &'a str,
#[obs(level = "basic", class = "operational")]
model: &'a str,
}
#[derive(Observation)]
#[observation(name = "test.policy_filtered", uses = ["analytics"])]
struct PolicyFiltered<'a> {
#[obs(level = "basic", class = "identifier")]
thread_id: &'a str,
#[obs(level = "basic", class = "operational")]
status: &'a str,
#[obs(level = "trace", class = "content")]
raw_prompt: PanicsIfSerialized,
#[obs(level = "basic", class = "secret_risk")]
api_key: PanicsIfSerialized,
#[obs(level = "basic", class = "operational", uses = ["rollout_trace"])]
rollout_only_status: PanicsIfSerialized,
}
struct PanicsIfSerialized;
impl Serialize for PanicsIfSerialized {
fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
panic!("denied observation field should not be serialized")
}
}
#[derive(Debug, PartialEq)]
struct CapturedField {
name: &'static str,
meta: FieldMeta,
value: Value,
}
#[derive(Default)]
struct CapturingVisitor {
fields: Vec<CapturedField>,
}
impl ObservationFieldVisitor for CapturingVisitor {
fn field<T: serde::Serialize + ?Sized>(
&mut self,
name: &'static str,
meta: FieldMeta,
value: &T,
) {
let value = match serde_json::to_value(value) {
Ok(value) => value,
Err(err) => panic!("field should serialize: {err}"),
};
self.fields.push(CapturedField { name, meta, value });
}
}
#[test]
fn derive_visits_annotated_fields_with_metadata() {
let event = TurnConfigResolved {
thread_id: "thread-1",
turn_id: "turn-1",
model: "gpt-5.4",
};
let mut visitor = CapturingVisitor::default();
event.visit_fields(&mut visitor);
assert_eq!(TurnConfigResolved::NAME, "turn.config_resolved");
assert_eq!(
visitor.fields,
vec![
CapturedField {
name: "thread_id",
meta: FieldMeta::new(DetailLevel::Basic, DataClass::Identifier),
value: Value::String("thread-1".to_string()),
},
CapturedField {
name: "turn_id",
meta: FieldMeta::new(DetailLevel::Basic, DataClass::Identifier),
value: Value::String("turn-1".to_string()),
},
CapturedField {
name: "model",
meta: FieldMeta::new(DetailLevel::Basic, DataClass::Operational),
value: Value::String("gpt-5.4".to_string()),
},
]
);
}
#[test]
fn use_and_policy_filter_before_serializing_denied_fields() {
let event = PolicyFiltered {
thread_id: "thread-1",
status: "completed",
raw_prompt: PanicsIfSerialized,
api_key: PanicsIfSerialized,
rollout_only_status: PanicsIfSerialized,
};
let mut visitor = CapturingVisitor::default();
visit_fields_for_use(
&event,
FieldUse::Analytics,
FieldPolicy::new(
DetailLevel::Basic,
&[DataClass::Identifier, DataClass::Operational],
),
&mut visitor,
);
assert_eq!(
visitor.fields,
vec![
CapturedField {
name: "thread_id",
meta: FieldMeta::with_uses(
DetailLevel::Basic,
DataClass::Identifier,
&[FieldUse::Analytics],
),
value: Value::String("thread-1".to_string()),
},
CapturedField {
name: "status",
meta: FieldMeta::with_uses(
DetailLevel::Basic,
DataClass::Operational,
&[FieldUse::Analytics],
),
value: Value::String("completed".to_string()),
},
]
);
}

View File

@@ -0,0 +1,458 @@
# Observability Event Stream
Codex currently has three overlapping observability systems:
- `codex-rs/analytics` reduces runtime facts into remote product analytics.
- `codex-rs/rollout-trace` records rich local diagnostic traces and reduces
them into a replay/debug graph.
- `codex-rs/otel` emits logs, trace-safe events, and metrics, while native
`tracing` spans carry execution scope and W3C trace context.
The overlap is real: thread lifecycle, turns, model calls, tools, compaction,
subagents, transport, and feature usage are described more than once. Directly
merging the crates would couple incompatible outputs. The shared layer should
instead define the facts Codex emits; each destination should decide how those
facts are filtered, reduced, stored, or exported.
## Decision
Introduce a shared observation stream for semantic facts.
```text
Observations own semantic facts and metrics derived from facts.
tracing owns spans and trace context propagation.
OTEL is a consumer of observations, not a second event taxonomy.
```
This gives callers a single set of routing rules:
- Use observations when a thing happened.
- Use sinks/reducers for analytics, rollout trace, OTEL events, OTEL metrics,
local buffers, and feedback upload.
- Use `tracing::span!` when code needs active execution scope, parent/child
causality, span mutation, or W3C context propagation.
The target state should not be a long-lived mix where some facts use
observations and other facts call analytics or OTEL directly. Direct OTEL code
should remain for exporter setup, native spans, context propagation, and
low-level tracing plumbing.
## Goals
- Define canonical observation events that describe what Codex did, not where
the data is going.
- Keep analytics, rollout trace, OTEL logs, and OTEL metrics as projections.
- Apply filtering at field level so a single observation can contain both safe
aggregate fields and richer local-only evidence.
- Preserve existing analytics and OTEL output behavior while moving callsites
toward the shared stream.
- Support local rich retention for feedback/debug upload with explicit policy.
- Prove analytics equivalence with side-by-side conformance tests.
## Non-Goals
- Do not make analytics facts the source of truth for rollout trace.
- Do not make rollout trace raw events the source of truth for analytics.
- Do not encode downstream event names such as `codex_turn_event` or
`codex.tool_result` into the canonical taxonomy.
- Do not replace native spans with `SpanStarted` / `SpanEnded` observations.
- Do not require full rollout trace graph equivalence before the first version;
rollout trace is experimental and can use lighter validation initially.
## Shape
```text
codex-core / app-server
    emit observations
codex-observability
    observation traits, field metadata, sink policy, event definitions
codex-observability-derive
    derive macro for field metadata and visitors
sinks/reducers
    analytics projection -> TrackEventRequest
    rollout trace projection -> local trace bundle + reduced graph
    OTEL event projection -> current log and trace-safe event names
    OTEL metric projection -> current counters and histograms
    ringbuffer -> recent rich local observations
native tracing spans
    remain direct tracing instrumentation
```
The derive crate exists because Rust does not expose struct field attributes at
runtime. The public API can still re-export the derive from
`codex-observability` so most callsites only import one crate.
Typed observation events are internal source types, not exported wire schemas.
Their compatibility boundary is the reducer output: analytics event JSON, OTEL
exports, rollout trace bundles, or any other persisted/shared destination
schema. This keeps the event taxonomy easy to rename, split, merge, or reshape
in the same PR that updates its reducers, while preserving compatibility where
it matters.
If raw observations are ever persisted or consumed across process/version
boundaries, they should be treated as versioned schemas instead.
## Why Not Just OTEL?
An alternative is to emit semantic facts as ordinary `tracing` events and let a
local subscriber, OTEL exporter, analytics reducer, or trace reducer all consume
the same event stream. That is attractive because Codex already uses
`tracing`, and local JSON event capture is easy to wire up.
We should still avoid making OTEL/tracing events the canonical schema:
- `tracing` fields are stringly typed at the reducer boundary. Renames or type
changes become runtime reducer failures instead of Rust compile errors.
- OTEL trace-safe export has a different privacy posture than local diagnostic
traces. Rich local fields such as tool output, prompts, model payloads, and
terminal output should not share a target with remotely exported span events.
- Field-level policy needs metadata that `tracing` does not preserve in a
structured way. We need to know both detail level and data class before a
sink serializes a field.
- Analytics conformance is easier from typed observations than from flattened
JSON logs, because reducers can match on Rust event types and fields.
- Spans and semantic facts have different lifecycles. Spans model active scope
and context propagation; observations model facts that happened.
OTEL should therefore consume observations for semantic events and derived
metrics, while native `tracing` remains the right tool for spans, context
propagation, and exporter plumbing. A local trace sink can still write JSONL
bundles; it should write observation envelopes rather than raw
`tracing_subscriber` event JSON.
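As a rough illustration, an envelope written by such a local sink could look like the sketch below; the field set is an assumption, not a settled schema, since the open questions later in this doc leave envelope timestamps and sequence numbers unresolved.
```rust
use serde::Serialize;
use serde_json::Map;
use serde_json::Value;

/// Sketch of a JSONL observation envelope; not a settled schema. Whether the
/// sequence number and timestamp belong in the envelope or in the sink is
/// still an open question (see "Open Questions" below).
#[derive(Serialize)]
struct ObservationEnvelope {
    /// Stable semantic event name, e.g. "turn.started".
    name: &'static str,
    /// Monotonic per-bundle sequence number (assumed sink-assigned here).
    seq: u64,
    /// Unix timestamp in milliseconds when the sink recorded the observation.
    recorded_at_ms: i64,
    /// Fields that the local trace policy allowed, keyed by field name.
    fields: Map<String, Value>,
}
```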
## Observation API
The shared schema API can be small:
```rust
pub trait Observation {
    const NAME: &'static str;
    fn visit_fields<V: ObservationFieldVisitor>(&self, visitor: &mut V);
}

pub trait ObservationFieldVisitor {
    fn field<T: serde::Serialize + ?Sized>(
        &mut self,
        name: &'static str,
        meta: FieldMeta,
        value: &T,
    );
}

pub trait ObservationSink {
    fn observe<E: Observation>(&self, event: &E);
}
```
Example:
```rust
use codex_observability::Observation;

#[derive(Observation)]
#[observation(name = "turn.started", uses = ["analytics"])]
struct TurnStarted<'a> {
    #[obs(level = "basic", class = "identifier")]
    thread_id: &'a str,
    #[obs(level = "basic", class = "identifier")]
    turn_id: &'a str,
    #[obs(level = "basic", class = "operational")]
    started_at: i64,
}
```
The derive only exposes metadata. It should not implement analytics mapping,
trace persistence, redaction, schema generation, or export behavior.
## Integration API
Product/runtime code should integrate through small semantic helper groups, not
by spreading raw event construction through the codebase. Direct construction of
`events::*` structs is fine in tests and inside the helpers, but ordinary
product callsites should read like the domain action that just happened.
The helper layer should do the tedious, failure-prone work near the callsite:
extract common IDs, join session/thread/turn context, compute durations,
normalize status and error fields, avoid trace-only allocations when no sink
can read them, and then emit the typed observation.
Example shape:
```rust
observability.turn().started(&turn, &resolved_config);
observability.compaction().ended(&run, &outcome);
observability.features().skill_invoked(&turn, &resolved_skill);
```
The important boundary is what the helpers accept. They should take existing
runtime concepts such as sessions, turns, tool invocations, compaction runs,
and resolved feature metadata. They should not take analytics facts, OTEL tag
maps, rollout trace rows, or other destination schemas. If a helper cannot
produce the needed output without depending on a destination-specific type, the
event definition or the runtime callsite is not ready yet.
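A minimal sketch of one such helper group follows, reusing the simplified `TurnStarted` event from the Observation API section above; `TurnObservations` and `TurnHandle` are hypothetical names standing in for existing runtime types, not part of the current codebase.
```rust
use codex_observability::Observation;
use codex_observability::ObservationSink;

// Simplified event, mirroring the Observation API example above.
#[derive(Observation)]
#[observation(name = "turn.started", uses = ["analytics"])]
struct TurnStarted<'a> {
    #[obs(level = "basic", class = "identifier")]
    thread_id: &'a str,
    #[obs(level = "basic", class = "identifier")]
    turn_id: &'a str,
    #[obs(level = "basic", class = "operational")]
    started_at: i64,
}

/// Hypothetical runtime concept the helper accepts; real callsites would pass
/// an existing session/turn type, never a destination schema.
struct TurnHandle {
    thread_id: String,
    turn_id: String,
}

/// Domain helper group: joins IDs and timing near the callsite, then emits
/// the typed observation through the configured sink.
struct TurnObservations<'s, S> {
    sink: &'s S,
}

impl<S: ObservationSink> TurnObservations<'_, S> {
    fn started(&self, turn: &TurnHandle, started_at: i64) {
        self.sink.observe(&TurnStarted {
            thread_id: turn.thread_id.as_str(),
            turn_id: turn.turn_id.as_str(),
            started_at,
        });
    }
}
```
A real helper would also accept the resolved turn config, so the callsite keeps the `observability.turn().started(...)` shape shown in the example above.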
Initial integration should add one narrow helper group at a time. A useful
vertical slice has:
- one domain helper method at the real product boundary,
- one canonical event emitted by that helper,
- one projection test showing how the destination reducer consumes the event.
This mirrors the useful part of the existing systems:
- rollout trace has `RolloutTraceRecorder::record_*` helpers
- analytics has `AnalyticsEventsClient::track_*` helpers
- OTEL has `SessionTelemetry::record_*` helpers
The helper layer must not become a second taxonomy. It is ergonomic glue over
canonical observation structs. Prefer small domain-specific helper groups such
as turn, tool, inference, transport, compaction, and feature usage over one
large recorder with every method in the system.
## Rich Payloads
Start simple. Ordinary observation fields should be eager values or borrowed
references. Sinks should reject fields by metadata before serializing them, so
analytics and OTEL metrics do not inspect trace/content fields.
Some trace fields point at large objects that usually already exist at the
callsite: model requests, model responses, tool invocations, tool results, or
terminal output. Prefer passing borrowed serializable views of those objects to
the observation helper rather than building `serde_json::Value` eagerly.
Do not introduce a generalized lazy-field system in the first implementation.
Closures such as `|| build_trace_payload(...)` are useful when constructing the
trace view itself is expensive, but they add API complexity. Treat them as a
later optimization for measured hot spots, not as the default observation
style.
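A sketch of the borrowed-view style is below; only the `inference.completed` name comes from the taxonomy later in this doc, while the view type and field set are illustrative assumptions.
```rust
use codex_observability::Observation;
use serde::Serialize;

/// Hypothetical borrowed view over a model response that already exists at
/// the callsite; no serde_json::Value is built unless a sink's policy allows
/// the field and serializes it.
#[derive(Serialize)]
struct ModelResponseView<'a> {
    finish_reason: &'a str,
    output_text: &'a str,
}

#[derive(Observation)]
#[observation(name = "inference.completed", uses = ["otel"])]
struct InferenceCompleted<'a> {
    #[obs(level = "basic", class = "identifier")]
    turn_id: &'a str,
    #[obs(level = "basic", class = "operational")]
    duration_ms: i64,
    /// Trace-only evidence: rejected by remote sink policies before
    /// serialization, written locally by the rollout trace sink.
    #[obs(level = "trace", class = "content", uses = ["rollout_trace"])]
    response: ModelResponseView<'a>,
}
```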
## Field Policy
Filtering must be field-level. Event-level labels such as basic/detailed/trace
are too coarse because one event often contains both remote-safe counters and
local-only content.
Each field should carry at least:
- **Use markers**: exact projections intended to consume the field, for example
`analytics`, `otel`, or `rollout_trace`.
- **Detail level**: `basic`, `detailed`, or `trace`.
- **Data class**: `identifier`, `operational`, `environment`, `content`, or
`secret_risk`.
Use markers are the allow list: a sink should consume only fields explicitly
marked for that sink. Detail level and data class are descriptive metadata for
review, auditing, and sinks that need coarse verbosity or redaction modes.
Event structs may define default use markers when most fields feed the same
projection; field-level use markers override that default for mixed events.
Detail level is not privacy by itself. A tiny field can still be unsafe for
remote export, and a trace-level field can be trace-level because it is large
rather than sensitive. Data class is also not enough for selection: analytics
must not consume every basic operational field just because it would be safe.
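For example, an analytics-style sink could pair the `analytics` use marker with its guardrail policy through `visit_fields_for_use`; the concrete policy values below mirror the crate's tests and are not a finalized analytics policy.
```rust
use codex_observability::DataClass;
use codex_observability::DetailLevel;
use codex_observability::FieldPolicy;
use codex_observability::FieldUse;
use codex_observability::Observation;
use codex_observability::ObservationFieldVisitor;
use codex_observability::visit_fields_for_use;

/// Guardrail: even explicitly marked analytics fields must be basic
/// identifier/operational data before the visitor may serialize them.
const ANALYTICS_POLICY: FieldPolicy = FieldPolicy::new(
    DetailLevel::Basic,
    &[DataClass::Identifier, DataClass::Operational],
);

fn project_for_analytics<E, V>(event: &E, visitor: &mut V)
where
    E: Observation,
    V: ObservationFieldVisitor,
{
    // Selection first (the `analytics` use marker), then the guardrail policy;
    // fields failing either check are never handed to the visitor.
    visit_fields_for_use(event, FieldUse::Analytics, ANALYTICS_POLICY, visitor);
}
```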
Expected sink behavior:
- Analytics consumes fields marked `analytics`. Content-bearing analytics fields
should be rare and called out on the field or projection that exports them.
- Rollout trace allows rich local fields, with explicit redaction rules for
secret-risk material.
- OTEL logs preserve today's log export behavior, including account/email only
where today's policy allows them.
- OTEL trace-safe events prefer lengths, counts, status, timing, and coarse
categories over content.
- OTEL metrics select exact metric dimensions and then apply the OTEL policy.
- Feedback upload applies an explicit user-approved policy over the ringbuffer.
## Event Taxonomy
The taxonomy should be designed from Codex workflows, not from existing
destination schemas. Names should describe facts that occurred in the system:
`turn.ended`, `tool_call.ended`, `transport.api_request_completed`.
Rules:
- Prefer stable domain facts over analytics, OTEL, or trace storage names.
- Model lifecycle facts explicitly when downstream reducers need ordering or
duration.
- Keep event count small until a consumer needs a distinct fact.
- Put richness in fields, not in parallel event variants.
- Add transport/runtime facts when they are first-class telemetry today, even
if analytics ignores them.
- Keep an escape hatch only for local experimental tracing, not remote export.
Initial workflow coverage should be chosen by conformance need:
| Workflow | Canonical examples | Primary consumers |
| --- | --- | --- |
| Session/thread | `session.config_resolved`, `thread.started`, `thread.ended` | analytics, OTEL, rollout |
| Turn lifecycle | `turn.requested`, `turn.started`, `turn.ended` | analytics, OTEL, rollout |
| Turn timing | `turn.first_token_observed`, `turn.first_message_observed` | OTEL metrics, rollout |
| Model I/O | `inference.started`, `inference.sse_event_observed`, `inference.completed`, `inference.failed` | OTEL, rollout |
| Tools | `tool_call.started`, `tool_call.approval_resolved`, `tool_call.ended` | OTEL, rollout |
| Compaction | `compaction.started`, `compaction.installed`, `compaction.ended` | analytics, rollout |
| Agents | `agent.task_sent`, `agent.message_sent`, `agent.result_delivered`, `agent.closed` | rollout, analytics subset |
| Product features | `skill.invoked`, `app.mentioned`, `app.used`, `hook.run_completed`, `plugin.used`, `plugin.state_changed`, `review.completed` | analytics |
| Transport/auth | `transport.api_request_completed`, `transport.websocket_request_completed`, `auth.recovery_step_completed` | OTEL |
This table is not a complete schema. Each event still needs a typed Rust
definition with field annotations before implementation.
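As an illustration of what one row would require, a `tool_call.ended` definition could look roughly like this; the field set and the `detailed` level string are assumptions for the sketch, not a finalized schema.
```rust
use codex_observability::Observation;

#[derive(Observation)]
#[observation(name = "tool_call.ended", uses = ["otel"])]
struct ToolCallEnded<'a> {
    #[obs(level = "basic", class = "identifier")]
    thread_id: &'a str,
    #[obs(level = "basic", class = "identifier")]
    call_id: &'a str,
    #[obs(level = "basic", class = "operational")]
    tool_kind: &'a str,
    #[obs(level = "basic", class = "operational")]
    duration_ms: i64,
    /// Bounded preview for richer local sinks; remote metrics never read it.
    #[obs(level = "detailed", class = "content", uses = ["rollout_trace"])]
    output_preview: &'a str,
}
```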
## Projections
### Analytics
Analytics becomes a reducer over observations. Existing product events remain
output schemas.
| Observations | Analytics output |
| --- | --- |
| `thread.started` | `codex_thread_initialized` |
| `turn.started` with resolved config, `turn.ended` with token usage | `codex_turn_event` |
| `turn.steer` | `codex_turn_steer_event` |
| compaction lifecycle observations | `codex_compaction_event` |
| skill/app/plugin/review observations | existing feature events |
`review.completed` is generic, but the current analytics projection only emits
the legacy guardian event for guardian reviewer responses. User reviewer
responses remain represented in the observation stream until a consumer needs
an analytics output for them.
The analytics crate should keep the observation-to-legacy-schema translation in
a private projection module. The reducer then stays responsible for ingestion
orchestration, batching, deduplication, connection metadata joins, and product
event naming rather than becoming a mixed bag of mapping code.
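A rough sketch of that boundary follows; `TurnEndedFact`, its fields, and the module path are placeholders for the real analytics types, not the existing crate API.
```rust
// projection.rs (private to the analytics crate) -- sketch only.
use codex_observability::events::TurnEnded;

/// Placeholder for the existing legacy analytics fact behind
/// `codex_turn_event`; the real type and fields live in the analytics crate.
pub(crate) struct TurnEndedFact {
    pub(crate) thread_id: String,
    pub(crate) turn_id: String,
    pub(crate) duration_ms: i64,
}

/// Observation-to-legacy-schema translation stays here; the reducer keeps
/// owning batching, deduplication, and connection metadata joins.
pub(crate) fn project_turn_ended(obs: &TurnEnded<'_>) -> TurnEndedFact {
    TurnEndedFact {
        thread_id: obs.thread_id.to_string(),
        turn_id: obs.turn_id.to_string(),
        duration_ms: obs.duration_ms,
    }
}
```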
### Rollout Trace
Rollout trace consumes observations and decides which fields become trace
payloads. Trace storage details such as `RawPayloadRef` should remain a trace
implementation detail, not part of the shared event definitions.
Example: `inference.completed` can include response material as a trace-level
field. The trace sink may write it to `payloads/*.json`; analytics and OTEL
metrics never read it.
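For instance, the trace sink's guardrail could admit content at trace detail while the analytics policy sketched earlier never does; this is a sketch, not the rollout-trace crate's actual configuration.
```rust
use codex_observability::DataClass;
use codex_observability::DetailLevel;
use codex_observability::FieldPolicy;

/// Local-only guardrail: trace detail and content are allowed so response
/// material can be written to payloads/*.json; secret-risk fields still need
/// the explicit redaction rules described above before they are persisted.
const LOCAL_TRACE_POLICY: FieldPolicy = FieldPolicy::new(
    DetailLevel::Trace,
    &[
        DataClass::Identifier,
        DataClass::Operational,
        DataClass::Environment,
        DataClass::Content,
    ],
);
```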
### OTEL
OTEL consumes observations for every semantic fact and derived metric. The
projection preserves existing exported names while removing separate callsite
event definitions.
| Observations | OTEL output |
| --- | --- |
| session/thread observations | `codex.conversation_starts`, `codex.thread.started` |
| `turn.input_received` | `codex.user_prompt` |
| `tool_call.approval_resolved` | `codex.tool_decision` |
| `tool_call.ended` | `codex.tool_result`, tool count/duration metrics |
| transport observations | API/websocket events and count/duration metrics |
| `auth.recovery_step_completed` | `codex.auth_recovery` |
| inference/SSE observations | `codex.sse_event`, SSE duration, response timing metrics |
| turn timing/token observations | TTFT, TTFM, E2E duration, token usage metrics |
Log-only and trace-safe OTEL variants can be emitted from the same observation
by applying different field policies.
## Conformance Testing
The highest-value equivalence target is analytics because it is existing
product behavior.
```text
same E2E Codex run
    legacy analytics facts -> current AnalyticsReducer -> TrackEventRequest JSON
    observations -> analytics observation reducer -> TrackEventRequest JSON
    assert exact equality after stable JSON normalization
```
At least one conformance path should apply exact analytics use markers plus the
analytics guardrail policy, so missing markers and unsafe annotations fail
tests. Recommended first scenarios:
- thread start
- normal turn
- failed or interrupted turn
- accepted and rejected turn steering
- compaction
- skill, app, plugin, and review events
- subagent thread start
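For any of these scenarios the core assertion can stay small. The sketch below assumes both reducer paths hand back their `TrackEventRequest` JSON as `serde_json::Value`; the `event_id` field name is only an example of a volatile field to strip, not the real payload schema.
```rust
use pretty_assertions::assert_eq;
use serde_json::Value;

/// Conformance helper sketch: strip volatile fields, then require exact
/// equality between the legacy-fact payload and the observation-derived one.
fn assert_same_analytics_payload(mut legacy: Value, mut observed: Value) {
    for payload in [&mut legacy, &mut observed] {
        if let Value::Object(map) = payload {
            // Hypothetical volatile field; real normalization would strip
            // whatever client-generated IDs or timestamps differ per run.
            map.remove("event_id");
        }
    }
    assert_eq!(legacy, observed);
}
```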
Rollout trace can start with observation-to-trace unit tests plus one or two
end-to-end smoke tests that write and reduce a bundle. Full graph equivalence
is not necessary for the initial experimental rollout feature.
OTEL should get projection tests that compare exported event names, fields,
metric names, and tags for representative observations. Native span tests
should remain in the tracing layer.
## Implementation Stages
Build the proof of life on a fresh `main` branch, but keep commits separable so
the working branch can later be split into reviewable PRs.
Each stage should leave a small, documented API surface and a passing local
verification gate. Non-essential behavior should stay as a TODO until a later
stage needs it.
1. **Observation foundation**
   - Add `codex-observability` and `codex-observability-derive`.
   - Verify with crate-local tests that derived observations expose field
     metadata and reject missing annotations at compile time.
   - Run: `cargo test -p codex-observability`.
2. **Analytics conformance slice**
   - Add the first typed observations and an analytics projection for one
     existing analytics flow.
   - Verify legacy analytics facts and observation-derived facts produce
     identical `TrackEventRequest` JSON after stable normalization.
   - Run: targeted `cargo test -p codex-analytics`.
3. **E2E analytics shadowing**
   - Capture legacy facts and observations from the same core/app-server test
     run for selected scenarios.
   - Verify exact analytics output equality for thread start, normal turn,
     failed/interrupted turn, steering, compaction, and one feature event.
   - Run the targeted core/app-server integration tests added for each
     scenario.
4. **Rollout trace projection**
   - Add a rollout trace sink that consumes the same observations for
     thread/turn, inference, and tool lifecycle.
   - Verify with observation-to-trace unit tests and one smoke test that writes
     and reduces a local bundle.
5. **OTEL projection proof**
   - Add a small OTEL projection for a high-value event such as tool result.
   - Verify exported event names, field policy, metric names, and tags match
     the current behavior.
## Migration
Start with a shadow observation stream and conformance tests. Once analytics
equivalence is reliable, move callsites from direct analytics facts to
observations.
Rollout trace can adopt observations directly because it is experimental.
OTEL should be migrated by category rather than left permanently mixed:
tool/user/turn events, transport events, auth events, and derived metrics can
move behind the OTEL projection. Native spans and trace context stay where they
are.
## Open Questions
- What is the smallest useful field-class set?
- Should timestamps and sequence numbers live in the observation envelope or be
supplied by sinks?
- Where should fanout live so callsites do not grow `codex-core` further?
- How should borrowed trace-level payloads avoid hot-path allocations while
still allowing sinks to serialize when policy permits?
- Which direct OTEL counters are first-class observations, and which are
metrics derived from broader observations?