mirror of
https://github.com/openai/codex.git
synced 2026-05-01 09:56:37 +00:00
Compare commits
14 Commits
codex-fix/
...
codex/obse
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
081b0d27f9 | ||
|
|
c58e8e329f | ||
|
|
1c9756600a | ||
|
|
df90ed05fe | ||
|
|
a8b9f08fc0 | ||
|
|
9b1329ad22 | ||
|
|
04667d0ea6 | ||
|
|
6922fe3b52 | ||
|
|
3198988a52 | ||
|
|
7418d6e364 | ||
|
|
10f7e100d9 | ||
|
|
b615ce9291 | ||
|
|
590ed405d2 | ||
|
|
50e23d92c6 |
20
codex-rs/Cargo.lock
generated
20
codex-rs/Cargo.lock
generated
@@ -1382,6 +1382,7 @@ dependencies = [
|
||||
"codex-app-server-protocol",
|
||||
"codex-git-utils",
|
||||
"codex-login",
|
||||
"codex-observability",
|
||||
"codex-plugin",
|
||||
"codex-protocol",
|
||||
"codex-utils-absolute-path",
|
||||
@@ -2563,6 +2564,25 @@ dependencies = [
|
||||
"url",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "codex-observability"
|
||||
version = "0.0.0"
|
||||
dependencies = [
|
||||
"codex-observability-derive",
|
||||
"pretty_assertions",
|
||||
"serde",
|
||||
"serde_json",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "codex-observability-derive"
|
||||
version = "0.0.0"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.114",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "codex-ollama"
|
||||
version = "0.0.0"
|
||||
|
||||
@@ -48,6 +48,8 @@ members = [
|
||||
"models-manager",
|
||||
"network-proxy",
|
||||
"ollama",
|
||||
"observability",
|
||||
"observability-derive",
|
||||
"process-hardening",
|
||||
"protocol",
|
||||
"realtime-webrtc",
|
||||
@@ -153,6 +155,8 @@ codex-model-provider-info = { path = "model-provider-info" }
|
||||
codex-models-manager = { path = "models-manager" }
|
||||
codex-network-proxy = { path = "network-proxy" }
|
||||
codex-ollama = { path = "ollama" }
|
||||
codex-observability = { path = "observability" }
|
||||
codex-observability-derive = { path = "observability-derive" }
|
||||
codex-otel = { path = "otel" }
|
||||
codex-plugin = { path = "plugin" }
|
||||
codex-model-provider = { path = "model-provider" }
|
||||
|
||||
@@ -16,6 +16,7 @@ workspace = true
|
||||
codex-app-server-protocol = { workspace = true }
|
||||
codex-git-utils = { workspace = true }
|
||||
codex-login = { workspace = true }
|
||||
codex-observability = { workspace = true }
|
||||
codex-plugin = { workspace = true }
|
||||
codex-protocol = { workspace = true }
|
||||
os_info = { workspace = true }
|
||||
|
||||
@@ -46,6 +46,7 @@ use crate::facts::TurnResolvedConfigFact;
|
||||
use crate::facts::TurnStatus;
|
||||
use crate::facts::TurnSteerRequestError;
|
||||
use crate::facts::TurnTokenUsageFact;
|
||||
use crate::observation_reducer::AnalyticsObservationReducer;
|
||||
use crate::reducer::AnalyticsReducer;
|
||||
use crate::reducer::normalize_path_for_skill_id;
|
||||
use crate::reducer::skill_id_for_local_skill;
|
||||
@@ -64,6 +65,7 @@ use codex_app_server_protocol::SandboxPolicy as AppServerSandboxPolicy;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::SessionSource as AppServerSessionSource;
|
||||
use codex_app_server_protocol::Thread;
|
||||
use codex_app_server_protocol::ThreadForkResponse;
|
||||
use codex_app_server_protocol::ThreadResumeResponse;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::ThreadStatus as AppServerThreadStatus;
|
||||
@@ -201,6 +203,27 @@ fn sample_thread_resume_response_with_source(
|
||||
}
|
||||
}
|
||||
|
||||
fn sample_thread_fork_response(thread_id: &str, ephemeral: bool, model: &str) -> ClientResponse {
|
||||
let mut thread = sample_thread(thread_id, ephemeral);
|
||||
thread.forked_from_id = Some("parent-thread".to_string());
|
||||
|
||||
ClientResponse::ThreadFork {
|
||||
request_id: RequestId::Integer(3),
|
||||
response: ThreadForkResponse {
|
||||
thread,
|
||||
model: model.to_string(),
|
||||
model_provider: "openai".to_string(),
|
||||
service_tier: None,
|
||||
cwd: test_path_buf("/tmp").abs(),
|
||||
instruction_sources: Vec::new(),
|
||||
approval_policy: AppServerAskForApproval::OnFailure,
|
||||
approvals_reviewer: AppServerApprovalsReviewer::User,
|
||||
sandbox: AppServerSandboxPolicy::DangerFullAccess,
|
||||
reasoning_effort: None,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn sample_turn_start_request(thread_id: &str, request_id: i64) -> ClientRequest {
|
||||
ClientRequest::TurnStart {
|
||||
request_id: RequestId::Integer(request_id),
|
||||
@@ -435,25 +458,24 @@ async fn ingest_rejected_turn_steer(
|
||||
}
|
||||
|
||||
async fn ingest_initialize(reducer: &mut AnalyticsReducer, out: &mut Vec<TrackEventRequest>) {
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Initialize {
|
||||
connection_id: 7,
|
||||
params: InitializeParams {
|
||||
client_info: ClientInfo {
|
||||
name: "codex-tui".to_string(),
|
||||
title: None,
|
||||
version: "1.0.0".to_string(),
|
||||
},
|
||||
capabilities: None,
|
||||
},
|
||||
product_client_id: "codex-tui".to_string(),
|
||||
runtime: sample_runtime_metadata(),
|
||||
rpc_transport: AppServerRpcTransport::Stdio,
|
||||
reducer.ingest(sample_initialize_fact(), out).await;
|
||||
}
|
||||
|
||||
fn sample_initialize_fact() -> AnalyticsFact {
|
||||
AnalyticsFact::Initialize {
|
||||
connection_id: 7,
|
||||
params: InitializeParams {
|
||||
client_info: ClientInfo {
|
||||
name: "codex-tui".to_string(),
|
||||
title: None,
|
||||
version: "1.0.0".to_string(),
|
||||
},
|
||||
out,
|
||||
)
|
||||
.await;
|
||||
capabilities: None,
|
||||
},
|
||||
product_client_id: "codex-tui".to_string(),
|
||||
runtime: sample_runtime_metadata(),
|
||||
rpc_transport: AppServerRpcTransport::Stdio,
|
||||
}
|
||||
}
|
||||
|
||||
async fn ingest_turn_prerequisites(
|
||||
@@ -1553,6 +1575,438 @@ async fn reducer_ingests_app_and_plugin_facts() {
|
||||
assert_eq!(payload[2]["event_type"], "codex_plugin_used");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn app_used_observation_matches_legacy_analytics_fact() {
|
||||
let mut legacy_reducer = AnalyticsReducer::default();
|
||||
let mut observation_reducer = AnalyticsObservationReducer::default();
|
||||
let mut legacy_events = Vec::new();
|
||||
let mut observation_events = Vec::new();
|
||||
let observation = codex_observability::events::AppUsed {
|
||||
model_slug: "gpt-5",
|
||||
thread_id: "thread-1",
|
||||
turn_id: "turn-1",
|
||||
connector_id: Some("drive"),
|
||||
app_name: Some("Drive"),
|
||||
invocation_type: Some(codex_observability::events::InvocationType::Implicit),
|
||||
};
|
||||
|
||||
legacy_reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Custom(CustomAnalyticsFact::AppUsed(AppUsedInput {
|
||||
tracking: TrackEventsContext {
|
||||
model_slug: "gpt-5".to_string(),
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
},
|
||||
app: AppInvocation {
|
||||
connector_id: Some("drive".to_string()),
|
||||
app_name: Some("Drive".to_string()),
|
||||
invocation_type: Some(InvocationType::Implicit),
|
||||
},
|
||||
})),
|
||||
&mut legacy_events,
|
||||
)
|
||||
.await;
|
||||
|
||||
observation_reducer
|
||||
.ingest_app_used(observation, &mut observation_events)
|
||||
.await;
|
||||
|
||||
let legacy_payload = serde_json::to_value(&legacy_events).expect("serialize legacy events");
|
||||
let observation_payload =
|
||||
serde_json::to_value(&observation_events).expect("serialize observation events");
|
||||
assert_eq!(observation_payload, legacy_payload);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn feature_observations_match_legacy_analytics_facts() {
|
||||
let mut legacy_reducer = AnalyticsReducer::default();
|
||||
let mut observation_reducer = AnalyticsObservationReducer::default();
|
||||
let mut legacy_events = Vec::new();
|
||||
let mut observation_events = Vec::new();
|
||||
let tracking = TrackEventsContext {
|
||||
model_slug: "gpt-5".to_string(),
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
};
|
||||
let connector_ids = vec!["calendar".to_string(), "drive".to_string()];
|
||||
let skill_path = PathBuf::from("/Users/abc/.codex/skills/doc/SKILL.md");
|
||||
|
||||
legacy_reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Custom(CustomAnalyticsFact::SkillInvoked(SkillInvokedInput {
|
||||
tracking: tracking.clone(),
|
||||
invocations: vec![SkillInvocation {
|
||||
skill_name: "doc".to_string(),
|
||||
skill_scope: codex_protocol::protocol::SkillScope::User,
|
||||
skill_path: skill_path.clone(),
|
||||
invocation_type: InvocationType::Explicit,
|
||||
}],
|
||||
})),
|
||||
&mut legacy_events,
|
||||
)
|
||||
.await;
|
||||
|
||||
legacy_reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Custom(CustomAnalyticsFact::AppMentioned(AppMentionedInput {
|
||||
tracking: tracking.clone(),
|
||||
mentions: vec![AppInvocation {
|
||||
connector_id: Some("calendar".to_string()),
|
||||
app_name: Some("Calendar".to_string()),
|
||||
invocation_type: Some(InvocationType::Explicit),
|
||||
}],
|
||||
})),
|
||||
&mut legacy_events,
|
||||
)
|
||||
.await;
|
||||
legacy_reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Custom(CustomAnalyticsFact::HookRun(HookRunInput {
|
||||
tracking: tracking.clone(),
|
||||
hook: HookRunFact {
|
||||
event_name: HookEventName::PostToolUse,
|
||||
hook_source: HookSource::Unknown,
|
||||
status: HookRunStatus::Failed,
|
||||
},
|
||||
})),
|
||||
&mut legacy_events,
|
||||
)
|
||||
.await;
|
||||
legacy_reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Custom(CustomAnalyticsFact::PluginUsed(PluginUsedInput {
|
||||
tracking: tracking.clone(),
|
||||
plugin: sample_plugin_metadata(),
|
||||
})),
|
||||
&mut legacy_events,
|
||||
)
|
||||
.await;
|
||||
legacy_reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Custom(CustomAnalyticsFact::PluginStateChanged(
|
||||
PluginStateChangedInput {
|
||||
plugin: sample_plugin_metadata(),
|
||||
state: PluginState::Disabled,
|
||||
},
|
||||
)),
|
||||
&mut legacy_events,
|
||||
)
|
||||
.await;
|
||||
|
||||
observation_reducer
|
||||
.ingest_skill_invoked(
|
||||
codex_observability::events::SkillInvoked {
|
||||
model_slug: "gpt-5",
|
||||
thread_id: "thread-1",
|
||||
turn_id: "turn-1",
|
||||
skill_name: "doc",
|
||||
skill_scope: codex_observability::events::SkillScope::User,
|
||||
skill_path: skill_path.as_path(),
|
||||
invocation_type: codex_observability::events::InvocationType::Explicit,
|
||||
},
|
||||
&mut observation_events,
|
||||
)
|
||||
.await;
|
||||
observation_reducer
|
||||
.ingest_app_mentioned(
|
||||
codex_observability::events::AppMentioned {
|
||||
model_slug: "gpt-5",
|
||||
thread_id: "thread-1",
|
||||
turn_id: "turn-1",
|
||||
connector_id: Some("calendar"),
|
||||
app_name: Some("Calendar"),
|
||||
invocation_type: Some(codex_observability::events::InvocationType::Explicit),
|
||||
},
|
||||
&mut observation_events,
|
||||
)
|
||||
.await;
|
||||
observation_reducer.ingest_hook_run_completed(
|
||||
codex_observability::events::HookRunCompleted {
|
||||
model_slug: "gpt-5",
|
||||
thread_id: "thread-1",
|
||||
turn_id: "turn-1",
|
||||
hook_name: "PostToolUse",
|
||||
hook_source: "unknown",
|
||||
status: codex_observability::events::HookRunStatus::Failed,
|
||||
},
|
||||
&mut observation_events,
|
||||
);
|
||||
observation_reducer.ingest_plugin_used(
|
||||
codex_observability::events::PluginUsed {
|
||||
model_slug: "gpt-5",
|
||||
thread_id: "thread-1",
|
||||
turn_id: "turn-1",
|
||||
plugin_id: "sample@test",
|
||||
plugin_name: "sample",
|
||||
marketplace_name: "test",
|
||||
has_skills: Some(true),
|
||||
mcp_server_count: Some(2),
|
||||
connector_ids: Some(connector_ids.as_slice()),
|
||||
},
|
||||
&mut observation_events,
|
||||
);
|
||||
observation_reducer.ingest_plugin_state_changed(
|
||||
codex_observability::events::PluginStateChanged {
|
||||
plugin_id: "sample@test",
|
||||
plugin_name: "sample",
|
||||
marketplace_name: "test",
|
||||
has_skills: Some(true),
|
||||
mcp_server_count: Some(2),
|
||||
connector_ids: Some(connector_ids.as_slice()),
|
||||
state: codex_observability::events::PluginState::Disabled,
|
||||
},
|
||||
&mut observation_events,
|
||||
);
|
||||
|
||||
let legacy_payload = serde_json::to_value(&legacy_events).expect("serialize legacy events");
|
||||
let observation_payload =
|
||||
serde_json::to_value(&observation_events).expect("serialize observation events");
|
||||
assert_eq!(observation_payload, legacy_payload);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_started_observation_matches_legacy_responses() {
|
||||
assert_thread_started_observation_matches_legacy_response(
|
||||
sample_thread_start_response("thread-observed", /*ephemeral*/ false, "gpt-5"),
|
||||
codex_observability::events::ThreadStarted {
|
||||
thread_id: "thread-observed",
|
||||
source: codex_observability::events::ThreadSource::User,
|
||||
parent_thread_id: None,
|
||||
initialization_mode: codex_observability::events::ThreadInitializationMode::New,
|
||||
model: "gpt-5",
|
||||
ephemeral: false,
|
||||
created_at: 1,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_thread_started_observation_matches_legacy_response(
|
||||
sample_thread_resume_response("thread-resumed", /*ephemeral*/ true, "gpt-5.1"),
|
||||
codex_observability::events::ThreadStarted {
|
||||
thread_id: "thread-resumed",
|
||||
source: codex_observability::events::ThreadSource::User,
|
||||
parent_thread_id: None,
|
||||
initialization_mode: codex_observability::events::ThreadInitializationMode::Resumed,
|
||||
model: "gpt-5.1",
|
||||
ephemeral: true,
|
||||
created_at: 1,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_thread_started_observation_matches_legacy_response(
|
||||
sample_thread_fork_response("thread-forked", /*ephemeral*/ false, "gpt-5.2"),
|
||||
codex_observability::events::ThreadStarted {
|
||||
thread_id: "thread-forked",
|
||||
source: codex_observability::events::ThreadSource::User,
|
||||
parent_thread_id: None,
|
||||
initialization_mode: codex_observability::events::ThreadInitializationMode::Forked,
|
||||
model: "gpt-5.2",
|
||||
ephemeral: false,
|
||||
created_at: 1,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
let parent_thread_id =
|
||||
codex_protocol::ThreadId::from_string("22222222-2222-2222-2222-222222222222")
|
||||
.expect("valid parent thread id");
|
||||
assert_thread_started_observation_matches_legacy_response(
|
||||
sample_thread_resume_response_with_source(
|
||||
"thread-subagent",
|
||||
/*ephemeral*/ false,
|
||||
"gpt-5",
|
||||
AppServerSessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id,
|
||||
depth: 1,
|
||||
agent_path: None,
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
}),
|
||||
),
|
||||
codex_observability::events::ThreadStarted {
|
||||
thread_id: "thread-subagent",
|
||||
source: codex_observability::events::ThreadSource::Subagent(
|
||||
codex_observability::events::ThreadSubagentKind::ThreadSpawn,
|
||||
),
|
||||
parent_thread_id: Some("22222222-2222-2222-2222-222222222222"),
|
||||
initialization_mode: codex_observability::events::ThreadInitializationMode::Resumed,
|
||||
model: "gpt-5",
|
||||
ephemeral: false,
|
||||
created_at: 1,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn assert_thread_started_observation_matches_legacy_response(
|
||||
legacy_response: ClientResponse,
|
||||
observation: codex_observability::events::ThreadStarted<'_>,
|
||||
) {
|
||||
let mut legacy_reducer = AnalyticsReducer::default();
|
||||
let mut observation_reducer = AnalyticsObservationReducer::default();
|
||||
let mut legacy_events = Vec::new();
|
||||
let mut observation_events = Vec::new();
|
||||
|
||||
ingest_initialize(&mut legacy_reducer, &mut legacy_events).await;
|
||||
legacy_reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Response {
|
||||
connection_id: 7,
|
||||
response: Box::new(legacy_response),
|
||||
},
|
||||
&mut legacy_events,
|
||||
)
|
||||
.await;
|
||||
|
||||
observation_reducer
|
||||
.ingest_existing_fact_for_test(sample_initialize_fact(), &mut observation_events)
|
||||
.await;
|
||||
observation_reducer.ingest_thread_started(7, observation, &mut observation_events);
|
||||
|
||||
let legacy_payload = serde_json::to_value(&legacy_events).expect("serialize legacy events");
|
||||
let observation_payload =
|
||||
serde_json::to_value(&observation_events).expect("serialize observation events");
|
||||
assert_eq!(observation_payload, legacy_payload);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_observations_match_legacy_sources() {
|
||||
let mut legacy_reducer = AnalyticsReducer::default();
|
||||
let mut observation_reducer = AnalyticsObservationReducer::default();
|
||||
let mut legacy_events = Vec::new();
|
||||
let mut observation_events = Vec::new();
|
||||
|
||||
ingest_turn_prerequisites(
|
||||
&mut legacy_reducer,
|
||||
&mut legacy_events,
|
||||
/*include_initialize*/ true,
|
||||
/*include_resolved_config*/ true,
|
||||
/*include_started*/ true,
|
||||
/*include_token_usage*/ true,
|
||||
)
|
||||
.await;
|
||||
legacy_reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Notification(Box::new(sample_turn_completed_notification(
|
||||
"thread-2",
|
||||
"turn-2",
|
||||
AppServerTurnStatus::Completed,
|
||||
/*codex_error_info*/ None,
|
||||
))),
|
||||
&mut legacy_events,
|
||||
)
|
||||
.await;
|
||||
|
||||
observation_reducer
|
||||
.ingest_existing_fact_for_test(
|
||||
AnalyticsFact::Initialize {
|
||||
connection_id: 7,
|
||||
params: InitializeParams {
|
||||
client_info: ClientInfo {
|
||||
name: "codex-tui".to_string(),
|
||||
title: None,
|
||||
version: "1.0.0".to_string(),
|
||||
},
|
||||
capabilities: None,
|
||||
},
|
||||
product_client_id: "codex-tui".to_string(),
|
||||
runtime: sample_runtime_metadata(),
|
||||
rpc_transport: AppServerRpcTransport::Stdio,
|
||||
},
|
||||
&mut observation_events,
|
||||
)
|
||||
.await;
|
||||
observation_reducer
|
||||
.ingest_existing_fact_for_test(
|
||||
AnalyticsFact::Response {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_thread_start_response(
|
||||
"thread-2", /*ephemeral*/ false, "gpt-5",
|
||||
)),
|
||||
},
|
||||
&mut observation_events,
|
||||
)
|
||||
.await;
|
||||
observation_events.clear();
|
||||
|
||||
// Keep not-yet-migrated request/thread context identical on both sides.
|
||||
// This test swaps started/ended turn facts, including resolved config and
|
||||
// token accounting, to observations while comparing the final payload.
|
||||
observation_reducer
|
||||
.ingest_existing_fact_for_test(
|
||||
AnalyticsFact::Request {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(3),
|
||||
request: Box::new(sample_turn_start_request("thread-2", /*request_id*/ 3)),
|
||||
},
|
||||
&mut observation_events,
|
||||
)
|
||||
.await;
|
||||
observation_reducer
|
||||
.ingest_existing_fact_for_test(
|
||||
AnalyticsFact::Response {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_turn_start_response("turn-2", /*request_id*/ 3)),
|
||||
},
|
||||
&mut observation_events,
|
||||
)
|
||||
.await;
|
||||
observation_reducer
|
||||
.ingest_turn_started(
|
||||
codex_observability::events::TurnStarted {
|
||||
thread_id: "thread-2",
|
||||
turn_id: "turn-2",
|
||||
config: codex_observability::events::TurnConfig {
|
||||
num_input_images: 1,
|
||||
submission_type: None,
|
||||
ephemeral: false,
|
||||
model: "gpt-5",
|
||||
model_provider: "openai",
|
||||
sandbox_mode: codex_observability::events::SandboxMode::ReadOnly,
|
||||
sandbox_network_access: true,
|
||||
reasoning_effort: None,
|
||||
reasoning_summary: None,
|
||||
service_tier: None,
|
||||
approval_policy: codex_observability::events::ApprovalPolicy::OnRequest,
|
||||
approval_reviewer:
|
||||
codex_observability::events::ApprovalReviewer::GuardianSubagent,
|
||||
collaboration_mode: codex_observability::events::CollaborationMode::Plan,
|
||||
personality: None,
|
||||
is_first_turn: true,
|
||||
},
|
||||
started_at: 455,
|
||||
},
|
||||
&mut observation_events,
|
||||
)
|
||||
.await;
|
||||
observation_reducer
|
||||
.ingest_turn_ended(
|
||||
codex_observability::events::TurnEnded {
|
||||
thread_id: "thread-2",
|
||||
turn_id: "turn-2",
|
||||
status: codex_observability::events::TurnStatus::Completed,
|
||||
token_usage: Some(codex_observability::events::TurnTokenUsage {
|
||||
input_tokens: 123,
|
||||
cached_input_tokens: 45,
|
||||
output_tokens: 140,
|
||||
reasoning_output_tokens: 13,
|
||||
total_tokens: 321,
|
||||
}),
|
||||
ended_at: 456,
|
||||
duration_ms: 1234,
|
||||
},
|
||||
&mut observation_events,
|
||||
)
|
||||
.await;
|
||||
|
||||
let legacy_payload = serde_json::to_value(&legacy_events).expect("serialize legacy events");
|
||||
let observation_payload =
|
||||
serde_json::to_value(&observation_events).expect("serialize observation events");
|
||||
assert_eq!(observation_payload, legacy_payload);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn reducer_ingests_plugin_state_changed_fact() {
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
@@ -1769,6 +2223,409 @@ async fn accepted_turn_steer_emits_expected_event() {
|
||||
assert!(payload["event_params"].get("product_client_id").is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_steer_observations_match_legacy_sources() {
|
||||
let mut legacy_reducer = AnalyticsReducer::default();
|
||||
let mut observation_reducer = AnalyticsObservationReducer::default();
|
||||
let mut legacy_events = Vec::new();
|
||||
let mut observation_events = Vec::new();
|
||||
|
||||
ingest_initialize(&mut legacy_reducer, &mut legacy_events).await;
|
||||
legacy_reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Response {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_thread_start_response(
|
||||
"thread-2", /*ephemeral*/ false, "gpt-5",
|
||||
)),
|
||||
},
|
||||
&mut legacy_events,
|
||||
)
|
||||
.await;
|
||||
legacy_events.clear();
|
||||
|
||||
observation_reducer
|
||||
.ingest_existing_fact_for_test(sample_initialize_fact(), &mut observation_events)
|
||||
.await;
|
||||
observation_reducer
|
||||
.ingest_existing_fact_for_test(
|
||||
AnalyticsFact::Response {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_thread_start_response(
|
||||
"thread-2", /*ephemeral*/ false, "gpt-5",
|
||||
)),
|
||||
},
|
||||
&mut observation_events,
|
||||
)
|
||||
.await;
|
||||
observation_events.clear();
|
||||
|
||||
legacy_reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Request {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(4),
|
||||
request: Box::new(sample_turn_steer_request(
|
||||
"thread-2", "turn-2", /*request_id*/ 4,
|
||||
)),
|
||||
},
|
||||
&mut legacy_events,
|
||||
)
|
||||
.await;
|
||||
legacy_reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Response {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_turn_steer_response("turn-2", /*request_id*/ 4)),
|
||||
},
|
||||
&mut legacy_events,
|
||||
)
|
||||
.await;
|
||||
let accepted_created_at = legacy_event_created_at(&legacy_events[0]);
|
||||
observation_reducer.ingest_turn_steer(
|
||||
7,
|
||||
codex_observability::events::TurnSteer {
|
||||
thread_id: "thread-2",
|
||||
expected_turn_id: "turn-2",
|
||||
accepted_turn_id: Some("turn-2"),
|
||||
num_input_images: 1,
|
||||
result: codex_observability::events::TurnSteerResult::Accepted,
|
||||
rejection_reason: None,
|
||||
created_at: accepted_created_at,
|
||||
},
|
||||
&mut observation_events,
|
||||
);
|
||||
|
||||
legacy_reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Request {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(5),
|
||||
request: Box::new(sample_turn_steer_request(
|
||||
"thread-2", "turn-2", /*request_id*/ 5,
|
||||
)),
|
||||
},
|
||||
&mut legacy_events,
|
||||
)
|
||||
.await;
|
||||
legacy_reducer
|
||||
.ingest(
|
||||
AnalyticsFact::ErrorResponse {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(5),
|
||||
error: no_active_turn_steer_error(),
|
||||
error_type: Some(no_active_turn_steer_error_type()),
|
||||
},
|
||||
&mut legacy_events,
|
||||
)
|
||||
.await;
|
||||
let rejected_created_at = legacy_event_created_at(&legacy_events[1]);
|
||||
observation_reducer.ingest_turn_steer(
|
||||
7,
|
||||
codex_observability::events::TurnSteer {
|
||||
thread_id: "thread-2",
|
||||
expected_turn_id: "turn-2",
|
||||
accepted_turn_id: None,
|
||||
num_input_images: 1,
|
||||
result: codex_observability::events::TurnSteerResult::Rejected,
|
||||
rejection_reason: Some(
|
||||
codex_observability::events::TurnSteerRejectionReason::NoActiveTurn,
|
||||
),
|
||||
created_at: rejected_created_at,
|
||||
},
|
||||
&mut observation_events,
|
||||
);
|
||||
|
||||
let legacy_payload = serde_json::to_value(&legacy_events).expect("serialize legacy events");
|
||||
let observation_payload =
|
||||
serde_json::to_value(&observation_events).expect("serialize observation events");
|
||||
assert_eq!(observation_payload, legacy_payload);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn compaction_ended_observation_matches_legacy_fact() {
|
||||
use codex_observability::events as observation_events;
|
||||
|
||||
let mut legacy_reducer = AnalyticsReducer::default();
|
||||
let mut observation_reducer = AnalyticsObservationReducer::default();
|
||||
let mut legacy_events = Vec::new();
|
||||
let mut observation_events = Vec::new();
|
||||
let parent_thread_id =
|
||||
codex_protocol::ThreadId::from_string("22222222-2222-2222-2222-222222222222")
|
||||
.expect("valid parent thread id");
|
||||
|
||||
ingest_initialize(&mut legacy_reducer, &mut legacy_events).await;
|
||||
legacy_reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Response {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_thread_resume_response_with_source(
|
||||
"thread-1",
|
||||
/*ephemeral*/ false,
|
||||
"gpt-5",
|
||||
AppServerSessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id,
|
||||
depth: 1,
|
||||
agent_path: None,
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
}),
|
||||
)),
|
||||
},
|
||||
&mut legacy_events,
|
||||
)
|
||||
.await;
|
||||
legacy_events.clear();
|
||||
|
||||
observation_reducer
|
||||
.ingest_existing_fact_for_test(sample_initialize_fact(), &mut observation_events)
|
||||
.await;
|
||||
let parent_thread_id =
|
||||
codex_protocol::ThreadId::from_string("22222222-2222-2222-2222-222222222222")
|
||||
.expect("valid parent thread id");
|
||||
observation_reducer
|
||||
.ingest_existing_fact_for_test(
|
||||
AnalyticsFact::Response {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_thread_resume_response_with_source(
|
||||
"thread-1",
|
||||
/*ephemeral*/ false,
|
||||
"gpt-5",
|
||||
AppServerSessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id,
|
||||
depth: 1,
|
||||
agent_path: None,
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
}),
|
||||
)),
|
||||
},
|
||||
&mut observation_events,
|
||||
)
|
||||
.await;
|
||||
observation_events.clear();
|
||||
|
||||
legacy_reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Custom(CustomAnalyticsFact::Compaction(Box::new(
|
||||
CodexCompactionEvent {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-compact".to_string(),
|
||||
trigger: CompactionTrigger::Manual,
|
||||
reason: CompactionReason::UserRequested,
|
||||
implementation: CompactionImplementation::Responses,
|
||||
phase: CompactionPhase::StandaloneTurn,
|
||||
strategy: CompactionStrategy::Memento,
|
||||
status: CompactionStatus::Failed,
|
||||
error: Some("context limit exceeded".to_string()),
|
||||
active_context_tokens_before: 131_000,
|
||||
active_context_tokens_after: 131_000,
|
||||
started_at: 100,
|
||||
completed_at: 101,
|
||||
duration_ms: Some(1200),
|
||||
},
|
||||
))),
|
||||
&mut legacy_events,
|
||||
)
|
||||
.await;
|
||||
|
||||
observation_reducer
|
||||
.ingest_compaction_ended(
|
||||
observation_events::CompactionEnded {
|
||||
thread_id: "thread-1",
|
||||
turn_id: "turn-compact",
|
||||
trigger: observation_events::CompactionTrigger::Manual,
|
||||
reason: observation_events::CompactionReason::UserRequested,
|
||||
implementation: observation_events::CompactionImplementation::Responses,
|
||||
phase: observation_events::CompactionPhase::StandaloneTurn,
|
||||
strategy: observation_events::CompactionStrategy::Memento,
|
||||
status: observation_events::CompactionStatus::Failed {
|
||||
error: Some("context limit exceeded"),
|
||||
},
|
||||
active_context_tokens_before: 131_000,
|
||||
active_context_tokens_after: 131_000,
|
||||
started_at: 100,
|
||||
ended_at: 101,
|
||||
duration_ms: Some(1200),
|
||||
},
|
||||
&mut observation_events,
|
||||
)
|
||||
.await;
|
||||
|
||||
let legacy_payload = serde_json::to_value(&legacy_events).expect("serialize legacy events");
|
||||
let observation_payload =
|
||||
serde_json::to_value(&observation_events).expect("serialize observation events");
|
||||
assert_eq!(observation_payload, legacy_payload);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn review_completed_observation_matches_legacy_guardian_fact() {
|
||||
use crate::events as analytics_events;
|
||||
use codex_observability::events as observation_events;
|
||||
|
||||
let mut legacy_reducer = AnalyticsReducer::default();
|
||||
let mut observation_reducer = AnalyticsObservationReducer::default();
|
||||
let mut legacy_events = Vec::new();
|
||||
let mut observation_events = Vec::new();
|
||||
|
||||
ingest_initialize(&mut legacy_reducer, &mut legacy_events).await;
|
||||
legacy_reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Response {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_thread_start_response(
|
||||
"thread-guardian",
|
||||
/*ephemeral*/ false,
|
||||
"gpt-5",
|
||||
)),
|
||||
},
|
||||
&mut legacy_events,
|
||||
)
|
||||
.await;
|
||||
legacy_events.clear();
|
||||
|
||||
observation_reducer
|
||||
.ingest_existing_fact_for_test(sample_initialize_fact(), &mut observation_events)
|
||||
.await;
|
||||
observation_reducer
|
||||
.ingest_existing_fact_for_test(
|
||||
AnalyticsFact::Response {
|
||||
connection_id: 7,
|
||||
response: Box::new(sample_thread_start_response(
|
||||
"thread-guardian",
|
||||
/*ephemeral*/ false,
|
||||
"gpt-5",
|
||||
)),
|
||||
},
|
||||
&mut observation_events,
|
||||
)
|
||||
.await;
|
||||
observation_events.clear();
|
||||
|
||||
legacy_reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Custom(CustomAnalyticsFact::GuardianReview(Box::new(
|
||||
analytics_events::GuardianReviewEventParams {
|
||||
thread_id: "thread-guardian".to_string(),
|
||||
turn_id: "turn-guardian".to_string(),
|
||||
review_id: "review-1".to_string(),
|
||||
target_item_id: "item-1".to_string(),
|
||||
retry_reason: Some("parse_retry".to_string()),
|
||||
approval_request_source:
|
||||
analytics_events::GuardianApprovalRequestSource::DelegatedSubagent,
|
||||
reviewed_action: analytics_events::GuardianReviewedAction::McpToolCall {
|
||||
server: "drive".to_string(),
|
||||
tool_name: "search".to_string(),
|
||||
connector_id: Some("drive-connector".to_string()),
|
||||
connector_name: Some("Drive".to_string()),
|
||||
tool_title: Some("Search Drive".to_string()),
|
||||
},
|
||||
reviewed_action_truncated: false,
|
||||
decision: analytics_events::GuardianReviewDecision::Aborted,
|
||||
terminal_status: analytics_events::GuardianReviewTerminalStatus::FailedClosed,
|
||||
failure_reason: Some(analytics_events::GuardianReviewFailureReason::ParseError),
|
||||
risk_level: Some(analytics_events::GuardianReviewRiskLevel::High),
|
||||
user_authorization: Some(
|
||||
analytics_events::GuardianReviewUserAuthorization::Medium,
|
||||
),
|
||||
outcome: Some(analytics_events::GuardianReviewOutcome::Deny),
|
||||
rationale: Some("review could not parse final policy".to_string()),
|
||||
guardian_thread_id: Some("guardian-thread-1".to_string()),
|
||||
guardian_session_kind: Some(
|
||||
analytics_events::GuardianReviewSessionKind::TrunkNew,
|
||||
),
|
||||
guardian_model: Some("gpt-5".to_string()),
|
||||
guardian_reasoning_effort: Some("medium".to_string()),
|
||||
had_prior_review_context: Some(true),
|
||||
review_timeout_ms: 30_000,
|
||||
tool_call_count: 2,
|
||||
time_to_first_token_ms: Some(100),
|
||||
completion_latency_ms: Some(1500),
|
||||
started_at: 100,
|
||||
completed_at: Some(102),
|
||||
input_tokens: Some(123),
|
||||
cached_input_tokens: Some(45),
|
||||
output_tokens: Some(140),
|
||||
reasoning_output_tokens: Some(13),
|
||||
total_tokens: Some(321),
|
||||
},
|
||||
))),
|
||||
&mut legacy_events,
|
||||
)
|
||||
.await;
|
||||
|
||||
observation_reducer
|
||||
.ingest_review_completed(
|
||||
observation_events::ReviewCompleted {
|
||||
thread_id: "thread-guardian",
|
||||
turn_id: "turn-guardian",
|
||||
review_id: "review-1",
|
||||
target_item_id: "item-1",
|
||||
retry_reason: Some("parse_retry"),
|
||||
request_source: observation_events::ReviewRequestSource::DelegatedSubagent,
|
||||
reviewed_action: observation_events::ReviewedAction::McpToolCall {
|
||||
server: "drive",
|
||||
tool_name: "search",
|
||||
connector_id: Some("drive-connector"),
|
||||
connector_name: Some("Drive"),
|
||||
tool_title: Some("Search Drive"),
|
||||
},
|
||||
reviewed_action_truncated: false,
|
||||
response: observation_events::ReviewResponse::Guardian(
|
||||
observation_events::GuardianReviewResponse {
|
||||
decision: observation_events::ReviewDecision::Aborted,
|
||||
terminal_status: observation_events::ReviewTerminalStatus::FailedClosed {
|
||||
failure_reason: Some(
|
||||
observation_events::ReviewFailureReason::ParseError,
|
||||
),
|
||||
},
|
||||
risk_level: Some(observation_events::ReviewRiskLevel::High),
|
||||
user_authorization: Some(
|
||||
observation_events::ReviewUserAuthorization::Medium,
|
||||
),
|
||||
outcome: Some(observation_events::ReviewOutcome::Deny),
|
||||
rationale: Some("review could not parse final policy"),
|
||||
session: Some(observation_events::GuardianReviewSession {
|
||||
guardian_thread_id: "guardian-thread-1",
|
||||
session_kind: observation_events::GuardianReviewSessionKind::TrunkNew,
|
||||
model: "gpt-5",
|
||||
reasoning_effort: Some("medium"),
|
||||
had_prior_review_context: true,
|
||||
}),
|
||||
review_timeout_ms: 30_000,
|
||||
tool_call_count: 2,
|
||||
time_to_first_token_ms: Some(100),
|
||||
completion_latency_ms: Some(1500),
|
||||
token_usage: Some(observation_events::TurnTokenUsage {
|
||||
input_tokens: 123,
|
||||
cached_input_tokens: 45,
|
||||
output_tokens: 140,
|
||||
reasoning_output_tokens: 13,
|
||||
total_tokens: 321,
|
||||
}),
|
||||
},
|
||||
),
|
||||
started_at: 100,
|
||||
ended_at: 102,
|
||||
},
|
||||
&mut observation_events,
|
||||
)
|
||||
.await;
|
||||
|
||||
let legacy_payload = serde_json::to_value(&legacy_events).expect("serialize legacy events");
|
||||
let observation_payload =
|
||||
serde_json::to_value(&observation_events).expect("serialize observation events");
|
||||
assert_eq!(observation_payload, legacy_payload);
|
||||
}
|
||||
|
||||
fn legacy_event_created_at(event: &TrackEventRequest) -> i64 {
|
||||
serde_json::to_value(event)
|
||||
.expect("serialize event")
|
||||
.pointer("/event_params/created_at")
|
||||
.and_then(serde_json::Value::as_i64)
|
||||
.expect("created_at")
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn rejected_turn_steer_uses_request_connection_metadata() {
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
mod client;
|
||||
mod events;
|
||||
mod facts;
|
||||
mod observation_projection;
|
||||
mod observation_reducer;
|
||||
mod reducer;
|
||||
mod review_observation_projection;
|
||||
|
||||
use std::time::SystemTime;
|
||||
use std::time::UNIX_EPOCH;
|
||||
|
||||
416
codex-rs/analytics/src/observation_projection.rs
Normal file
416
codex-rs/analytics/src/observation_projection.rs
Normal file
@@ -0,0 +1,416 @@
|
||||
//! Projection from shared observations into the current analytics schema.
|
||||
//!
|
||||
//! The observation taxonomy is intended to describe what Codex did, not the
|
||||
//! shape of any particular telemetry backend. This private module is the
|
||||
//! adapter boundary where typed observations are translated into the legacy
|
||||
//! analytics facts and track-event payloads that already exist today.
|
||||
|
||||
use crate::events::CodexHookRunEventRequest;
|
||||
use crate::events::CodexHookRunMetadata;
|
||||
use crate::events::CodexPluginEventRequest;
|
||||
use crate::events::CodexPluginMetadata;
|
||||
use crate::events::CodexPluginUsedEventRequest;
|
||||
use crate::events::CodexPluginUsedMetadata;
|
||||
use crate::events::TrackEventRequest;
|
||||
use crate::facts;
|
||||
use crate::facts::AppInvocation;
|
||||
use crate::facts::AppMentionedInput;
|
||||
use crate::facts::AppUsedInput;
|
||||
use crate::facts::SkillInvocation;
|
||||
use crate::facts::SkillInvokedInput;
|
||||
use crate::facts::TrackEventsContext;
|
||||
use crate::facts::TurnResolvedConfigFact;
|
||||
use crate::facts::TurnSubmissionType as AnalyticsTurnSubmissionType;
|
||||
use crate::facts::TurnTokenUsageFact;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::Turn;
|
||||
use codex_app_server_protocol::TurnCompletedNotification;
|
||||
use codex_app_server_protocol::TurnStartedNotification;
|
||||
use codex_app_server_protocol::TurnStatus as AppServerTurnStatus;
|
||||
use codex_login::default_client::originator;
|
||||
use codex_observability::events;
|
||||
use codex_protocol::config_types::ApprovalsReviewer;
|
||||
use codex_protocol::config_types::ModeKind;
|
||||
use codex_protocol::config_types::Personality;
|
||||
use codex_protocol::config_types::ReasoningSummary;
|
||||
use codex_protocol::config_types::ServiceTier;
|
||||
use codex_protocol::openai_models::ReasoningEffort;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
use codex_protocol::protocol::GranularApprovalConfig;
|
||||
use codex_protocol::protocol::HookRunStatus as ProtocolHookRunStatus;
|
||||
use codex_protocol::protocol::NetworkAccess;
|
||||
use codex_protocol::protocol::ReadOnlyAccess;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::SkillScope as ProtocolSkillScope;
|
||||
use codex_protocol::protocol::TokenUsage;
|
||||
|
||||
pub(crate) fn skill_invoked_input(observation: events::SkillInvoked<'_>) -> SkillInvokedInput {
|
||||
SkillInvokedInput {
|
||||
tracking: tracking_from_fields(
|
||||
observation.model_slug,
|
||||
observation.thread_id,
|
||||
observation.turn_id,
|
||||
),
|
||||
invocations: vec![SkillInvocation {
|
||||
skill_name: observation.skill_name.to_string(),
|
||||
skill_scope: match observation.skill_scope {
|
||||
events::SkillScope::User => ProtocolSkillScope::User,
|
||||
events::SkillScope::Repo => ProtocolSkillScope::Repo,
|
||||
events::SkillScope::System => ProtocolSkillScope::System,
|
||||
events::SkillScope::Admin => ProtocolSkillScope::Admin,
|
||||
},
|
||||
skill_path: observation.skill_path.to_path_buf(),
|
||||
invocation_type: map_invocation_type(observation.invocation_type),
|
||||
}],
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn app_mentioned_input(observation: events::AppMentioned<'_>) -> AppMentionedInput {
|
||||
AppMentionedInput {
|
||||
tracking: tracking_from_fields(
|
||||
observation.model_slug,
|
||||
observation.thread_id,
|
||||
observation.turn_id,
|
||||
),
|
||||
mentions: vec![app_invocation_from_fields(
|
||||
observation.connector_id,
|
||||
observation.app_name,
|
||||
observation.invocation_type,
|
||||
)],
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn app_used_input(observation: events::AppUsed<'_>) -> AppUsedInput {
|
||||
AppUsedInput {
|
||||
tracking: tracking_from_fields(
|
||||
observation.model_slug,
|
||||
observation.thread_id,
|
||||
observation.turn_id,
|
||||
),
|
||||
app: app_invocation_from_fields(
|
||||
observation.connector_id,
|
||||
observation.app_name,
|
||||
observation.invocation_type,
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn hook_run_completed_event(
|
||||
observation: events::HookRunCompleted<'_>,
|
||||
) -> TrackEventRequest {
|
||||
TrackEventRequest::HookRun(CodexHookRunEventRequest {
|
||||
event_type: "codex_hook_run",
|
||||
event_params: CodexHookRunMetadata {
|
||||
thread_id: Some(observation.thread_id.to_string()),
|
||||
turn_id: Some(observation.turn_id.to_string()),
|
||||
model_slug: Some(observation.model_slug.to_string()),
|
||||
hook_name: Some(observation.hook_name.to_string()),
|
||||
hook_source: Some(observation.hook_source),
|
||||
status: Some(match observation.status {
|
||||
events::HookRunStatus::Completed => ProtocolHookRunStatus::Completed,
|
||||
events::HookRunStatus::Failed => ProtocolHookRunStatus::Failed,
|
||||
events::HookRunStatus::Blocked => ProtocolHookRunStatus::Blocked,
|
||||
events::HookRunStatus::Stopped => ProtocolHookRunStatus::Stopped,
|
||||
}),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn plugin_used_event(observation: events::PluginUsed<'_>) -> TrackEventRequest {
|
||||
TrackEventRequest::PluginUsed(CodexPluginUsedEventRequest {
|
||||
event_type: "codex_plugin_used",
|
||||
event_params: CodexPluginUsedMetadata {
|
||||
plugin: plugin_metadata_from_fields(
|
||||
observation.plugin_id,
|
||||
observation.plugin_name,
|
||||
observation.marketplace_name,
|
||||
observation.has_skills,
|
||||
observation.mcp_server_count,
|
||||
observation.connector_ids,
|
||||
),
|
||||
thread_id: Some(observation.thread_id.to_string()),
|
||||
turn_id: Some(observation.turn_id.to_string()),
|
||||
model_slug: Some(observation.model_slug.to_string()),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn plugin_state_changed_event(
|
||||
observation: events::PluginStateChanged<'_>,
|
||||
) -> TrackEventRequest {
|
||||
let event = CodexPluginEventRequest {
|
||||
event_type: match observation.state {
|
||||
events::PluginState::Installed => "codex_plugin_installed",
|
||||
events::PluginState::Uninstalled => "codex_plugin_uninstalled",
|
||||
events::PluginState::Enabled => "codex_plugin_enabled",
|
||||
events::PluginState::Disabled => "codex_plugin_disabled",
|
||||
},
|
||||
event_params: plugin_metadata_from_fields(
|
||||
observation.plugin_id,
|
||||
observation.plugin_name,
|
||||
observation.marketplace_name,
|
||||
observation.has_skills,
|
||||
observation.mcp_server_count,
|
||||
observation.connector_ids,
|
||||
),
|
||||
};
|
||||
|
||||
match observation.state {
|
||||
events::PluginState::Installed => TrackEventRequest::PluginInstalled(event),
|
||||
events::PluginState::Uninstalled => TrackEventRequest::PluginUninstalled(event),
|
||||
events::PluginState::Enabled => TrackEventRequest::PluginEnabled(event),
|
||||
events::PluginState::Disabled => TrackEventRequest::PluginDisabled(event),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn turn_started_notification(
|
||||
observation: events::TurnStarted<'_>,
|
||||
) -> ServerNotification {
|
||||
ServerNotification::TurnStarted(TurnStartedNotification {
|
||||
thread_id: observation.thread_id.to_string(),
|
||||
turn: Turn {
|
||||
id: observation.turn_id.to_string(),
|
||||
items: vec![],
|
||||
status: AppServerTurnStatus::InProgress,
|
||||
error: None,
|
||||
started_at: Some(observation.started_at),
|
||||
completed_at: None,
|
||||
duration_ms: None,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn turn_resolved_config_fact(
|
||||
observation: &events::TurnStarted<'_>,
|
||||
) -> TurnResolvedConfigFact {
|
||||
let config = observation.config;
|
||||
TurnResolvedConfigFact {
|
||||
turn_id: observation.turn_id.to_string(),
|
||||
thread_id: observation.thread_id.to_string(),
|
||||
num_input_images: config.num_input_images,
|
||||
submission_type: config
|
||||
.submission_type
|
||||
.map(|submission_type| match submission_type {
|
||||
events::TurnSubmissionType::Default => AnalyticsTurnSubmissionType::Default,
|
||||
events::TurnSubmissionType::Queued => AnalyticsTurnSubmissionType::Queued,
|
||||
}),
|
||||
ephemeral: config.ephemeral,
|
||||
// The legacy fact carries session_source, but codex_turn_event derives
|
||||
// thread source from thread lifecycle metadata instead. Keep this
|
||||
// placeholder local to the projection until a consumer needs it.
|
||||
session_source: SessionSource::Unknown,
|
||||
model: config.model.to_string(),
|
||||
model_provider: config.model_provider.to_string(),
|
||||
sandbox_policy: match config.sandbox_mode {
|
||||
events::SandboxMode::FullAccess => SandboxPolicy::DangerFullAccess,
|
||||
events::SandboxMode::ReadOnly => SandboxPolicy::ReadOnly {
|
||||
access: ReadOnlyAccess::FullAccess,
|
||||
network_access: config.sandbox_network_access,
|
||||
},
|
||||
events::SandboxMode::WorkspaceWrite => SandboxPolicy::WorkspaceWrite {
|
||||
writable_roots: Vec::new(),
|
||||
read_only_access: ReadOnlyAccess::FullAccess,
|
||||
network_access: config.sandbox_network_access,
|
||||
exclude_tmpdir_env_var: false,
|
||||
exclude_slash_tmp: false,
|
||||
},
|
||||
events::SandboxMode::ExternalSandbox => SandboxPolicy::ExternalSandbox {
|
||||
network_access: if config.sandbox_network_access {
|
||||
NetworkAccess::Enabled
|
||||
} else {
|
||||
NetworkAccess::Restricted
|
||||
},
|
||||
},
|
||||
},
|
||||
reasoning_effort: config
|
||||
.reasoning_effort
|
||||
.map(|reasoning_effort| match reasoning_effort {
|
||||
events::ReasoningEffort::None => ReasoningEffort::None,
|
||||
events::ReasoningEffort::Minimal => ReasoningEffort::Minimal,
|
||||
events::ReasoningEffort::Low => ReasoningEffort::Low,
|
||||
events::ReasoningEffort::Medium => ReasoningEffort::Medium,
|
||||
events::ReasoningEffort::High => ReasoningEffort::High,
|
||||
events::ReasoningEffort::XHigh => ReasoningEffort::XHigh,
|
||||
}),
|
||||
reasoning_summary: config.reasoning_summary.map(
|
||||
|reasoning_summary| match reasoning_summary {
|
||||
events::ReasoningSummary::Auto => ReasoningSummary::Auto,
|
||||
events::ReasoningSummary::Concise => ReasoningSummary::Concise,
|
||||
events::ReasoningSummary::Detailed => ReasoningSummary::Detailed,
|
||||
events::ReasoningSummary::None => ReasoningSummary::None,
|
||||
},
|
||||
),
|
||||
service_tier: config.service_tier.map(|service_tier| match service_tier {
|
||||
events::ServiceTier::Fast => ServiceTier::Fast,
|
||||
events::ServiceTier::Flex => ServiceTier::Flex,
|
||||
}),
|
||||
approval_policy: match config.approval_policy {
|
||||
events::ApprovalPolicy::Untrusted => AskForApproval::UnlessTrusted,
|
||||
events::ApprovalPolicy::OnFailure => AskForApproval::OnFailure,
|
||||
events::ApprovalPolicy::OnRequest => AskForApproval::OnRequest,
|
||||
events::ApprovalPolicy::Granular => AskForApproval::Granular(GranularApprovalConfig {
|
||||
sandbox_approval: true,
|
||||
rules: true,
|
||||
skill_approval: true,
|
||||
request_permissions: true,
|
||||
mcp_elicitations: true,
|
||||
}),
|
||||
events::ApprovalPolicy::Never => AskForApproval::Never,
|
||||
},
|
||||
approvals_reviewer: match config.approval_reviewer {
|
||||
events::ApprovalReviewer::User => ApprovalsReviewer::User,
|
||||
events::ApprovalReviewer::GuardianSubagent => ApprovalsReviewer::GuardianSubagent,
|
||||
},
|
||||
sandbox_network_access: config.sandbox_network_access,
|
||||
collaboration_mode: match config.collaboration_mode {
|
||||
events::CollaborationMode::Default => ModeKind::Default,
|
||||
events::CollaborationMode::Plan => ModeKind::Plan,
|
||||
},
|
||||
personality: config.personality.map(|personality| match personality {
|
||||
events::Personality::None => Personality::None,
|
||||
events::Personality::Friendly => Personality::Friendly,
|
||||
events::Personality::Pragmatic => Personality::Pragmatic,
|
||||
}),
|
||||
is_first_turn: config.is_first_turn,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn turn_token_usage_fact(
|
||||
observation: &events::TurnEnded<'_>,
|
||||
) -> Option<TurnTokenUsageFact> {
|
||||
let token_usage = observation.token_usage?;
|
||||
Some(TurnTokenUsageFact {
|
||||
turn_id: observation.turn_id.to_string(),
|
||||
thread_id: observation.thread_id.to_string(),
|
||||
token_usage: TokenUsage {
|
||||
input_tokens: token_usage.input_tokens,
|
||||
cached_input_tokens: token_usage.cached_input_tokens,
|
||||
output_tokens: token_usage.output_tokens,
|
||||
reasoning_output_tokens: token_usage.reasoning_output_tokens,
|
||||
total_tokens: token_usage.total_tokens,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn turn_ended_notification(observation: events::TurnEnded<'_>) -> ServerNotification {
|
||||
ServerNotification::TurnCompleted(TurnCompletedNotification {
|
||||
thread_id: observation.thread_id.to_string(),
|
||||
turn: Turn {
|
||||
id: observation.turn_id.to_string(),
|
||||
items: vec![],
|
||||
status: match observation.status {
|
||||
events::TurnStatus::Completed => AppServerTurnStatus::Completed,
|
||||
events::TurnStatus::Failed => AppServerTurnStatus::Failed,
|
||||
events::TurnStatus::Interrupted => AppServerTurnStatus::Interrupted,
|
||||
},
|
||||
// Error taxonomy needs a separate design pass. Keeping it out of
|
||||
// the first terminal-turn observation avoids baking app-server
|
||||
// transport categories into the shared event model.
|
||||
error: None,
|
||||
started_at: None,
|
||||
completed_at: Some(observation.ended_at),
|
||||
duration_ms: Some(observation.duration_ms),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn compaction_ended_event(
|
||||
observation: events::CompactionEnded<'_>,
|
||||
) -> facts::CodexCompactionEvent {
|
||||
let (status, error) = match observation.status {
|
||||
events::CompactionStatus::Completed => (facts::CompactionStatus::Completed, None),
|
||||
events::CompactionStatus::Failed { error } => {
|
||||
(facts::CompactionStatus::Failed, error.map(str::to_string))
|
||||
}
|
||||
events::CompactionStatus::Interrupted => (facts::CompactionStatus::Interrupted, None),
|
||||
};
|
||||
|
||||
facts::CodexCompactionEvent {
|
||||
thread_id: observation.thread_id.to_string(),
|
||||
turn_id: observation.turn_id.to_string(),
|
||||
trigger: match observation.trigger {
|
||||
events::CompactionTrigger::Manual => facts::CompactionTrigger::Manual,
|
||||
events::CompactionTrigger::Auto => facts::CompactionTrigger::Auto,
|
||||
},
|
||||
reason: match observation.reason {
|
||||
events::CompactionReason::UserRequested => facts::CompactionReason::UserRequested,
|
||||
events::CompactionReason::ContextLimit => facts::CompactionReason::ContextLimit,
|
||||
events::CompactionReason::ModelDownshift => facts::CompactionReason::ModelDownshift,
|
||||
},
|
||||
implementation: match observation.implementation {
|
||||
events::CompactionImplementation::Responses => {
|
||||
facts::CompactionImplementation::Responses
|
||||
}
|
||||
events::CompactionImplementation::ResponsesCompact => {
|
||||
facts::CompactionImplementation::ResponsesCompact
|
||||
}
|
||||
},
|
||||
phase: match observation.phase {
|
||||
events::CompactionPhase::StandaloneTurn => facts::CompactionPhase::StandaloneTurn,
|
||||
events::CompactionPhase::PreTurn => facts::CompactionPhase::PreTurn,
|
||||
events::CompactionPhase::MidTurn => facts::CompactionPhase::MidTurn,
|
||||
},
|
||||
strategy: match observation.strategy {
|
||||
events::CompactionStrategy::Memento => facts::CompactionStrategy::Memento,
|
||||
events::CompactionStrategy::PrefixCompaction => {
|
||||
facts::CompactionStrategy::PrefixCompaction
|
||||
}
|
||||
},
|
||||
status,
|
||||
error,
|
||||
active_context_tokens_before: observation.active_context_tokens_before,
|
||||
active_context_tokens_after: observation.active_context_tokens_after,
|
||||
started_at: u64::try_from(observation.started_at).unwrap_or_default(),
|
||||
completed_at: u64::try_from(observation.ended_at).unwrap_or_default(),
|
||||
duration_ms: observation
|
||||
.duration_ms
|
||||
.and_then(|duration_ms| u64::try_from(duration_ms).ok()),
|
||||
}
|
||||
}
|
||||
|
||||
fn tracking_from_fields(model_slug: &str, thread_id: &str, turn_id: &str) -> TrackEventsContext {
|
||||
TrackEventsContext {
|
||||
model_slug: model_slug.to_string(),
|
||||
thread_id: thread_id.to_string(),
|
||||
turn_id: turn_id.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
fn app_invocation_from_fields(
|
||||
connector_id: Option<&str>,
|
||||
app_name: Option<&str>,
|
||||
invocation_type: Option<events::InvocationType>,
|
||||
) -> AppInvocation {
|
||||
AppInvocation {
|
||||
connector_id: connector_id.map(str::to_string),
|
||||
app_name: app_name.map(str::to_string),
|
||||
invocation_type: invocation_type.map(map_invocation_type),
|
||||
}
|
||||
}
|
||||
|
||||
fn plugin_metadata_from_fields(
|
||||
plugin_id: &str,
|
||||
plugin_name: &str,
|
||||
marketplace_name: &str,
|
||||
has_skills: Option<bool>,
|
||||
mcp_server_count: Option<usize>,
|
||||
connector_ids: Option<&[String]>,
|
||||
) -> CodexPluginMetadata {
|
||||
CodexPluginMetadata {
|
||||
plugin_id: Some(plugin_id.to_string()),
|
||||
plugin_name: Some(plugin_name.to_string()),
|
||||
marketplace_name: Some(marketplace_name.to_string()),
|
||||
has_skills,
|
||||
mcp_server_count,
|
||||
connector_ids: connector_ids.map(<[String]>::to_vec),
|
||||
product_client_id: Some(originator().value),
|
||||
}
|
||||
}
|
||||
|
||||
fn map_invocation_type(invocation_type: events::InvocationType) -> facts::InvocationType {
|
||||
match invocation_type {
|
||||
events::InvocationType::Explicit => facts::InvocationType::Explicit,
|
||||
events::InvocationType::Implicit => facts::InvocationType::Implicit,
|
||||
}
|
||||
}
|
||||
230
codex-rs/analytics/src/observation_reducer.rs
Normal file
230
codex-rs/analytics/src/observation_reducer.rs
Normal file
@@ -0,0 +1,230 @@
|
||||
//! Analytics reducer entrypoint for shared observations.
|
||||
//!
|
||||
//! This module deliberately reuses the existing analytics reducer while the
|
||||
//! shared observation stream is being introduced. That gives conformance tests
|
||||
//! a small, stable bridge: one side feeds legacy analytics facts and the other
|
||||
//! feeds typed observations, then both paths must produce identical track
|
||||
//! requests.
|
||||
|
||||
use crate::events::TrackEventRequest;
|
||||
use crate::facts::AnalyticsFact;
|
||||
use crate::facts::CustomAnalyticsFact;
|
||||
use crate::observation_projection;
|
||||
use crate::reducer::AnalyticsReducer;
|
||||
use crate::review_observation_projection;
|
||||
use codex_observability::events;
|
||||
|
||||
/// Analytics reducer entrypoint for typed observations.
|
||||
#[derive(Default)]
|
||||
pub(crate) struct AnalyticsObservationReducer {
|
||||
legacy: AnalyticsReducer,
|
||||
}
|
||||
|
||||
impl AnalyticsObservationReducer {
|
||||
/// Feeds an existing analytics fact into the wrapped reducer for conformance tests.
|
||||
///
|
||||
/// The observation stream is being introduced incrementally, so tests need
|
||||
/// to hold the not-yet-migrated lifecycle context constant while swapping a
|
||||
/// specific source from legacy facts to observations.
|
||||
#[cfg(test)]
|
||||
pub(crate) async fn ingest_existing_fact_for_test(
|
||||
&mut self,
|
||||
fact: AnalyticsFact,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
self.legacy.ingest(fact, out).await;
|
||||
}
|
||||
|
||||
/// Ingests a skill.invoked observation and emits the current analytics event.
|
||||
pub(crate) async fn ingest_skill_invoked(
|
||||
&mut self,
|
||||
observation: events::SkillInvoked<'_>,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
self.legacy
|
||||
.ingest(
|
||||
AnalyticsFact::Custom(CustomAnalyticsFact::SkillInvoked(
|
||||
observation_projection::skill_invoked_input(observation),
|
||||
)),
|
||||
out,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Ingests an app.mentioned observation and emits the current analytics event.
|
||||
pub(crate) async fn ingest_app_mentioned(
|
||||
&mut self,
|
||||
observation: events::AppMentioned<'_>,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
self.legacy
|
||||
.ingest(
|
||||
AnalyticsFact::Custom(CustomAnalyticsFact::AppMentioned(
|
||||
observation_projection::app_mentioned_input(observation),
|
||||
)),
|
||||
out,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Ingests an app.used observation and emits the current analytics event.
|
||||
pub(crate) async fn ingest_app_used(
|
||||
&mut self,
|
||||
observation: events::AppUsed<'_>,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
self.legacy
|
||||
.ingest(
|
||||
AnalyticsFact::Custom(CustomAnalyticsFact::AppUsed(
|
||||
observation_projection::app_used_input(observation),
|
||||
)),
|
||||
out,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Ingests a thread.started observation into the current analytics reducer state.
|
||||
pub(crate) fn ingest_thread_started(
|
||||
&mut self,
|
||||
connection_id: u64,
|
||||
observation: events::ThreadStarted<'_>,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
self.legacy
|
||||
.ingest_observed_thread_started(connection_id, observation, out);
|
||||
}
|
||||
|
||||
/// Ingests a turn.started observation into the current turn-event reducer state.
|
||||
pub(crate) async fn ingest_turn_started(
|
||||
&mut self,
|
||||
observation: events::TurnStarted<'_>,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
self.legacy
|
||||
.ingest(
|
||||
AnalyticsFact::Custom(CustomAnalyticsFact::TurnResolvedConfig(Box::new(
|
||||
observation_projection::turn_resolved_config_fact(&observation),
|
||||
))),
|
||||
out,
|
||||
)
|
||||
.await;
|
||||
|
||||
self.legacy
|
||||
.ingest(
|
||||
AnalyticsFact::Notification(Box::new(
|
||||
observation_projection::turn_started_notification(observation),
|
||||
)),
|
||||
out,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Ingests a turn.ended observation into the current turn-event reducer state.
|
||||
pub(crate) async fn ingest_turn_ended(
|
||||
&mut self,
|
||||
observation: events::TurnEnded<'_>,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
if let Some(token_usage) = observation_projection::turn_token_usage_fact(&observation) {
|
||||
self.legacy
|
||||
.ingest(
|
||||
AnalyticsFact::Custom(CustomAnalyticsFact::TurnTokenUsage(Box::new(
|
||||
token_usage,
|
||||
))),
|
||||
out,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
self.legacy
|
||||
.ingest(
|
||||
AnalyticsFact::Notification(Box::new(
|
||||
observation_projection::turn_ended_notification(observation),
|
||||
)),
|
||||
out,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Ingests a turn.steer observation and emits the current analytics event.
|
||||
pub(crate) fn ingest_turn_steer(
|
||||
&mut self,
|
||||
connection_id: u64,
|
||||
observation: events::TurnSteer<'_>,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
self.legacy
|
||||
.ingest_observed_turn_steer(connection_id, observation, out);
|
||||
}
|
||||
|
||||
/// Ingests a compaction.ended observation and emits the current analytics event.
|
||||
pub(crate) async fn ingest_compaction_ended(
|
||||
&mut self,
|
||||
observation: events::CompactionEnded<'_>,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
self.legacy
|
||||
.ingest(
|
||||
AnalyticsFact::Custom(CustomAnalyticsFact::Compaction(Box::new(
|
||||
observation_projection::compaction_ended_event(observation),
|
||||
))),
|
||||
out,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Ingests a review.completed observation for the legacy guardian analytics event.
|
||||
///
|
||||
/// User review responses are represented in the shared observation type but
|
||||
/// do not have a legacy analytics event today.
|
||||
pub(crate) async fn ingest_review_completed(
|
||||
&mut self,
|
||||
observation: events::ReviewCompleted<'_>,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
let Some(guardian_review) =
|
||||
review_observation_projection::legacy_guardian_review_event(observation)
|
||||
else {
|
||||
return;
|
||||
};
|
||||
self.legacy
|
||||
.ingest(
|
||||
AnalyticsFact::Custom(CustomAnalyticsFact::GuardianReview(Box::new(
|
||||
guardian_review,
|
||||
))),
|
||||
out,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Ingests a hook.run_completed observation and emits the current analytics event.
|
||||
pub(crate) fn ingest_hook_run_completed(
|
||||
&mut self,
|
||||
observation: events::HookRunCompleted<'_>,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
out.push(observation_projection::hook_run_completed_event(
|
||||
observation,
|
||||
));
|
||||
}
|
||||
|
||||
/// Ingests a plugin.used observation and emits the current analytics event.
|
||||
pub(crate) fn ingest_plugin_used(
|
||||
&mut self,
|
||||
observation: events::PluginUsed<'_>,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
out.push(observation_projection::plugin_used_event(observation));
|
||||
}
|
||||
|
||||
/// Ingests a plugin.state_changed observation and emits the current analytics event.
|
||||
pub(crate) fn ingest_plugin_state_changed(
|
||||
&mut self,
|
||||
observation: events::PluginStateChanged<'_>,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
out.push(observation_projection::plugin_state_changed_event(
|
||||
observation,
|
||||
));
|
||||
}
|
||||
}
|
||||
@@ -58,6 +58,7 @@ use codex_app_server_protocol::UserInput;
|
||||
use codex_git_utils::collect_git_info;
|
||||
use codex_git_utils::get_git_repo_root;
|
||||
use codex_login::default_client::originator;
|
||||
use codex_observability::events as observation_events;
|
||||
use codex_protocol::config_types::ModeKind;
|
||||
use codex_protocol::config_types::Personality;
|
||||
use codex_protocol::config_types::ReasoningSummary;
|
||||
@@ -91,6 +92,18 @@ struct ThreadMetadataState {
|
||||
parent_thread_id: Option<String>,
|
||||
}
|
||||
|
||||
/// Inputs for emitting a thread-initialized analytics event, gathered either
/// from a thread metadata snapshot or from a `thread.started` observation.
struct ThreadInitializedInput {
    /// Identifies the app-server connection that owns the thread.
    connection_id: u64,
    thread_id: String,
    /// Legacy analytics label for the thread's origin (e.g. "user",
    /// "subagent"); `None` when the source has no legacy mapping.
    thread_source: Option<&'static str>,
    initialization_mode: ThreadInitializationMode,
    /// Populated only for subagent-sourced threads (e.g. "review", "compact").
    subagent_source: Option<String>,
    parent_thread_id: Option<String>,
    model: String,
    ephemeral: bool,
    /// Creation timestamp; callers clamp negative values to 0 via
    /// `u64::try_from(..).unwrap_or_default()`.
    created_at: u64,
}
|
||||
|
||||
impl ThreadMetadataState {
|
||||
fn from_thread_metadata(
|
||||
session_source: &SessionSource,
|
||||
@@ -670,11 +683,150 @@ impl AnalyticsReducer {
|
||||
) {
|
||||
let thread_source: SessionSource = thread.source.into();
|
||||
let thread_id = thread.id;
|
||||
let thread_metadata =
|
||||
ThreadMetadataState::from_thread_metadata(&thread_source, initialization_mode);
|
||||
self.emit_thread_initialized_event(
|
||||
ThreadInitializedInput {
|
||||
connection_id,
|
||||
thread_id,
|
||||
thread_source: thread_metadata.thread_source,
|
||||
initialization_mode: thread_metadata.initialization_mode,
|
||||
subagent_source: thread_metadata.subagent_source,
|
||||
parent_thread_id: thread_metadata.parent_thread_id,
|
||||
model,
|
||||
ephemeral: thread.ephemeral,
|
||||
created_at: u64::try_from(thread.created_at).unwrap_or_default(),
|
||||
},
|
||||
out,
|
||||
);
|
||||
}
|
||||
|
||||
pub(crate) fn ingest_observed_thread_started(
|
||||
&mut self,
|
||||
connection_id: u64,
|
||||
observation: observation_events::ThreadStarted<'_>,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
let (thread_source, subagent_source) = match observation.source {
|
||||
observation_events::ThreadSource::User => (Some("user"), None),
|
||||
observation_events::ThreadSource::Subagent(kind) => (
|
||||
Some("subagent"),
|
||||
Some(match kind {
|
||||
observation_events::ThreadSubagentKind::Review => "review".to_string(),
|
||||
observation_events::ThreadSubagentKind::Compact => "compact".to_string(),
|
||||
observation_events::ThreadSubagentKind::ThreadSpawn => {
|
||||
"thread_spawn".to_string()
|
||||
}
|
||||
observation_events::ThreadSubagentKind::MemoryConsolidation => {
|
||||
"memory_consolidation".to_string()
|
||||
}
|
||||
observation_events::ThreadSubagentKind::Other(source) => source.to_string(),
|
||||
}),
|
||||
),
|
||||
observation_events::ThreadSource::AppServer
|
||||
| observation_events::ThreadSource::Custom(_)
|
||||
| observation_events::ThreadSource::Unknown => (None, None),
|
||||
};
|
||||
let initialization_mode = match observation.initialization_mode {
|
||||
observation_events::ThreadInitializationMode::New => ThreadInitializationMode::New,
|
||||
observation_events::ThreadInitializationMode::Forked => {
|
||||
ThreadInitializationMode::Forked
|
||||
}
|
||||
observation_events::ThreadInitializationMode::Resumed => {
|
||||
ThreadInitializationMode::Resumed
|
||||
}
|
||||
};
|
||||
|
||||
let input = ThreadInitializedInput {
|
||||
connection_id,
|
||||
thread_id: observation.thread_id.to_string(),
|
||||
thread_source,
|
||||
initialization_mode,
|
||||
subagent_source,
|
||||
parent_thread_id: observation.parent_thread_id.map(str::to_string),
|
||||
model: observation.model.to_string(),
|
||||
ephemeral: observation.ephemeral,
|
||||
created_at: u64::try_from(observation.created_at).unwrap_or_default(),
|
||||
};
|
||||
self.emit_thread_initialized_event(input, out);
|
||||
}
|
||||
|
||||
pub(crate) fn ingest_observed_turn_steer(
|
||||
&mut self,
|
||||
connection_id: u64,
|
||||
observation: observation_events::TurnSteer<'_>,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
if let Some(accepted_turn_id) = observation.accepted_turn_id
|
||||
&& let Some(turn_state) = self.turns.get_mut(accepted_turn_id)
|
||||
{
|
||||
turn_state.steer_count += 1;
|
||||
}
|
||||
|
||||
self.emit_turn_steer_event(
|
||||
connection_id,
|
||||
PendingTurnSteerState {
|
||||
thread_id: observation.thread_id.to_string(),
|
||||
expected_turn_id: observation.expected_turn_id.to_string(),
|
||||
num_input_images: observation.num_input_images,
|
||||
created_at: u64::try_from(observation.created_at).unwrap_or_default(),
|
||||
},
|
||||
observation.accepted_turn_id.map(str::to_string),
|
||||
match observation.result {
|
||||
observation_events::TurnSteerResult::Accepted => TurnSteerResult::Accepted,
|
||||
observation_events::TurnSteerResult::Rejected => TurnSteerResult::Rejected,
|
||||
},
|
||||
observation
|
||||
.rejection_reason
|
||||
.map(|rejection_reason| match rejection_reason {
|
||||
observation_events::TurnSteerRejectionReason::NoActiveTurn => {
|
||||
TurnSteerRejectionReason::NoActiveTurn
|
||||
}
|
||||
observation_events::TurnSteerRejectionReason::ExpectedTurnMismatch => {
|
||||
TurnSteerRejectionReason::ExpectedTurnMismatch
|
||||
}
|
||||
observation_events::TurnSteerRejectionReason::NonSteerableReview => {
|
||||
TurnSteerRejectionReason::NonSteerableReview
|
||||
}
|
||||
observation_events::TurnSteerRejectionReason::NonSteerableCompact => {
|
||||
TurnSteerRejectionReason::NonSteerableCompact
|
||||
}
|
||||
observation_events::TurnSteerRejectionReason::EmptyInput => {
|
||||
TurnSteerRejectionReason::EmptyInput
|
||||
}
|
||||
observation_events::TurnSteerRejectionReason::InputTooLarge => {
|
||||
TurnSteerRejectionReason::InputTooLarge
|
||||
}
|
||||
}),
|
||||
out,
|
||||
);
|
||||
}
|
||||
|
||||
fn emit_thread_initialized_event(
|
||||
&mut self,
|
||||
input: ThreadInitializedInput,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
let ThreadInitializedInput {
|
||||
connection_id,
|
||||
thread_id,
|
||||
thread_source,
|
||||
initialization_mode,
|
||||
subagent_source,
|
||||
parent_thread_id,
|
||||
model,
|
||||
ephemeral,
|
||||
created_at,
|
||||
} = input;
|
||||
let thread_metadata = ThreadMetadataState {
|
||||
thread_source,
|
||||
initialization_mode,
|
||||
subagent_source,
|
||||
parent_thread_id,
|
||||
};
|
||||
let Some(connection_state) = self.connections.get(&connection_id) else {
|
||||
return;
|
||||
};
|
||||
let thread_metadata =
|
||||
ThreadMetadataState::from_thread_metadata(&thread_source, initialization_mode);
|
||||
self.thread_connections
|
||||
.insert(thread_id.clone(), connection_id);
|
||||
self.thread_metadata
|
||||
@@ -687,12 +839,12 @@ impl AnalyticsReducer {
|
||||
app_server_client: connection_state.app_server_client.clone(),
|
||||
runtime: connection_state.runtime.clone(),
|
||||
model,
|
||||
ephemeral: thread.ephemeral,
|
||||
ephemeral,
|
||||
thread_source: thread_metadata.thread_source,
|
||||
initialization_mode,
|
||||
initialization_mode: thread_metadata.initialization_mode,
|
||||
subagent_source: thread_metadata.subagent_source,
|
||||
parent_thread_id: thread_metadata.parent_thread_id,
|
||||
created_at: u64::try_from(thread.created_at).unwrap_or_default(),
|
||||
created_at,
|
||||
},
|
||||
},
|
||||
));
|
||||
|
||||
471
codex-rs/analytics/src/review_observation_projection.rs
Normal file
471
codex-rs/analytics/src/review_observation_projection.rs
Normal file
@@ -0,0 +1,471 @@
|
||||
//! Projection from review observations into the current analytics schema.
|
||||
|
||||
use crate::events::GuardianApprovalRequestSource as AnalyticsGuardianApprovalRequestSource;
|
||||
use crate::events::GuardianCommandSource as AnalyticsGuardianCommandSource;
|
||||
use crate::events::GuardianReviewDecision as AnalyticsGuardianReviewDecision;
|
||||
use crate::events::GuardianReviewEventParams;
|
||||
use crate::events::GuardianReviewFailureReason as AnalyticsGuardianReviewFailureReason;
|
||||
use crate::events::GuardianReviewOutcome as AnalyticsGuardianReviewOutcome;
|
||||
use crate::events::GuardianReviewRiskLevel as AnalyticsGuardianReviewRiskLevel;
|
||||
use crate::events::GuardianReviewSessionKind as AnalyticsGuardianReviewSessionKind;
|
||||
use crate::events::GuardianReviewTerminalStatus as AnalyticsGuardianReviewTerminalStatus;
|
||||
use crate::events::GuardianReviewUserAuthorization as AnalyticsGuardianReviewUserAuthorization;
|
||||
use crate::events::GuardianReviewedAction as AnalyticsGuardianReviewedAction;
|
||||
use codex_observability::events;
|
||||
use codex_protocol::approvals::NetworkApprovalProtocol;
|
||||
use codex_protocol::models::PermissionProfile;
|
||||
use codex_protocol::models::SandboxPermissions;
|
||||
|
||||
/// Projects a generic review completion into the legacy guardian analytics event.
///
/// This preserves the existing guardian analytics payload, including reviewed
/// action details and guardian rationale that are explicitly marked for
/// analytics on the observation fields.
///
/// User review responses intentionally return None: the current analytics
/// schema only has a guardian review event, while the shared observation is
/// generic enough to represent both user and guardian review completions.
pub(crate) fn legacy_guardian_review_event(
    observation: events::ReviewCompleted<'_>,
) -> Option<GuardianReviewEventParams> {
    // User review responses have no legacy guardian event; bail out early.
    let events::ReviewResponse::Guardian(response) = observation.response else {
        return None;
    };
    // Approved/Denied never carry a failure reason; the abnormal terminal
    // statuses carry an optional one that is projected alongside the status.
    let (terminal_status, failure_reason) = match response.terminal_status {
        events::ReviewTerminalStatus::Approved => {
            (AnalyticsGuardianReviewTerminalStatus::Approved, None)
        }
        events::ReviewTerminalStatus::Denied => {
            (AnalyticsGuardianReviewTerminalStatus::Denied, None)
        }
        events::ReviewTerminalStatus::Aborted { failure_reason } => (
            AnalyticsGuardianReviewTerminalStatus::Aborted,
            failure_reason.map(review_failure_reason),
        ),
        events::ReviewTerminalStatus::TimedOut { failure_reason } => (
            AnalyticsGuardianReviewTerminalStatus::TimedOut,
            failure_reason.map(review_failure_reason),
        ),
        events::ReviewTerminalStatus::FailedClosed { failure_reason } => (
            AnalyticsGuardianReviewTerminalStatus::FailedClosed,
            failure_reason.map(review_failure_reason),
        ),
    };
    let token_usage = response.token_usage;
    let guardian_session = response.session;

    // Field-by-field projection into the legacy analytics payload; borrowed
    // observation fields become owned strings here.
    Some(GuardianReviewEventParams {
        thread_id: observation.thread_id.to_string(),
        turn_id: observation.turn_id.to_string(),
        review_id: observation.review_id.to_string(),
        target_item_id: observation.target_item_id.to_string(),
        retry_reason: observation.retry_reason.map(str::to_string),
        approval_request_source: match observation.request_source {
            events::ReviewRequestSource::MainTurn => {
                AnalyticsGuardianApprovalRequestSource::MainTurn
            }
            events::ReviewRequestSource::DelegatedSubagent => {
                AnalyticsGuardianApprovalRequestSource::DelegatedSubagent
            }
        },
        reviewed_action: reviewed_action(observation.reviewed_action),
        reviewed_action_truncated: observation.reviewed_action_truncated,
        decision: match response.decision {
            events::ReviewDecision::Approved => AnalyticsGuardianReviewDecision::Approved,
            events::ReviewDecision::Denied => AnalyticsGuardianReviewDecision::Denied,
            events::ReviewDecision::Aborted => AnalyticsGuardianReviewDecision::Aborted,
        },
        terminal_status,
        failure_reason,
        risk_level: response.risk_level.map(|risk_level| match risk_level {
            events::ReviewRiskLevel::Low => AnalyticsGuardianReviewRiskLevel::Low,
            events::ReviewRiskLevel::Medium => AnalyticsGuardianReviewRiskLevel::Medium,
            events::ReviewRiskLevel::High => AnalyticsGuardianReviewRiskLevel::High,
            events::ReviewRiskLevel::Critical => AnalyticsGuardianReviewRiskLevel::Critical,
        }),
        user_authorization: response.user_authorization.map(|user_authorization| {
            match user_authorization {
                events::ReviewUserAuthorization::Unknown => {
                    AnalyticsGuardianReviewUserAuthorization::Unknown
                }
                events::ReviewUserAuthorization::Low => {
                    AnalyticsGuardianReviewUserAuthorization::Low
                }
                events::ReviewUserAuthorization::Medium => {
                    AnalyticsGuardianReviewUserAuthorization::Medium
                }
                events::ReviewUserAuthorization::High => {
                    AnalyticsGuardianReviewUserAuthorization::High
                }
            }
        }),
        outcome: response.outcome.map(|outcome| match outcome {
            events::ReviewOutcome::Allow => AnalyticsGuardianReviewOutcome::Allow,
            events::ReviewOutcome::Deny => AnalyticsGuardianReviewOutcome::Deny,
        }),
        rationale: response.rationale.map(str::to_string),
        // Session-derived fields are all `None` when the guardian session was
        // never established (e.g. a review that failed before starting).
        guardian_thread_id: guardian_session.map(|session| session.guardian_thread_id.to_string()),
        guardian_session_kind: guardian_session.map(|session| match session.session_kind {
            events::GuardianReviewSessionKind::TrunkNew => {
                AnalyticsGuardianReviewSessionKind::TrunkNew
            }
            events::GuardianReviewSessionKind::TrunkReused => {
                AnalyticsGuardianReviewSessionKind::TrunkReused
            }
            events::GuardianReviewSessionKind::EphemeralForked => {
                AnalyticsGuardianReviewSessionKind::EphemeralForked
            }
        }),
        guardian_model: guardian_session.map(|session| session.model.to_string()),
        guardian_reasoning_effort: guardian_session
            .and_then(|session| session.reasoning_effort.map(str::to_string)),
        had_prior_review_context: guardian_session.map(|session| session.had_prior_review_context),
        review_timeout_ms: response.review_timeout_ms,
        tool_call_count: response.tool_call_count,
        time_to_first_token_ms: response.time_to_first_token_ms,
        completion_latency_ms: response.completion_latency_ms,
        // Negative timestamps are clamped to 0 rather than failing projection.
        started_at: u64::try_from(observation.started_at).unwrap_or_default(),
        completed_at: Some(u64::try_from(observation.ended_at).unwrap_or_default()),
        input_tokens: token_usage.map(|token_usage| token_usage.input_tokens),
        cached_input_tokens: token_usage.map(|token_usage| token_usage.cached_input_tokens),
        output_tokens: token_usage.map(|token_usage| token_usage.output_tokens),
        reasoning_output_tokens: token_usage.map(|token_usage| token_usage.reasoning_output_tokens),
        total_tokens: token_usage.map(|token_usage| token_usage.total_tokens),
    })
}
|
||||
|
||||
/// Maps a shared reviewed-action observation onto the legacy analytics
/// reviewed-action payload, converting borrowed fields into owned values.
fn reviewed_action(action: events::ReviewedAction<'_>) -> AnalyticsGuardianReviewedAction {
    match action {
        events::ReviewedAction::Shell {
            command,
            command_display,
            cwd,
            sandbox_permissions,
            additional_permissions,
            justification,
        } => AnalyticsGuardianReviewedAction::Shell {
            command: command.to_vec(),
            command_display: command_display.to_string(),
            cwd: cwd.to_string(),
            sandbox_permissions: sandbox_permissions_for_review(sandbox_permissions),
            // `and_then`: a profile that fails serde round-tripping becomes None.
            additional_permissions: additional_permissions.and_then(permission_profile_for_review),
            justification: justification.map(str::to_string),
        },
        events::ReviewedAction::UnifiedExec {
            command,
            command_display,
            cwd,
            sandbox_permissions,
            additional_permissions,
            justification,
            tty,
        } => AnalyticsGuardianReviewedAction::UnifiedExec {
            command: command.to_vec(),
            command_display: command_display.to_string(),
            cwd: cwd.to_string(),
            sandbox_permissions: sandbox_permissions_for_review(sandbox_permissions),
            additional_permissions: additional_permissions.and_then(permission_profile_for_review),
            justification: justification.map(str::to_string),
            tty,
        },
        // Process executions are reported under the legacy `Execve` variant name.
        events::ReviewedAction::ProcessExec {
            source,
            program,
            argv,
            cwd,
            additional_permissions,
        } => AnalyticsGuardianReviewedAction::Execve {
            source: match source {
                events::ReviewCommandSource::Shell => AnalyticsGuardianCommandSource::Shell,
                events::ReviewCommandSource::UnifiedExec => {
                    AnalyticsGuardianCommandSource::UnifiedExec
                }
            },
            program: program.to_string(),
            argv: argv.to_vec(),
            cwd: cwd.to_string(),
            additional_permissions: additional_permissions.and_then(permission_profile_for_review),
        },
        events::ReviewedAction::ApplyPatch { cwd, files } => {
            AnalyticsGuardianReviewedAction::ApplyPatch {
                cwd: cwd.to_string(),
                files: files.to_vec(),
            }
        }
        events::ReviewedAction::NetworkAccess {
            target,
            host,
            protocol,
            port,
        } => AnalyticsGuardianReviewedAction::NetworkAccess {
            target: target.to_string(),
            host: host.to_string(),
            // The network protocol reuses the codex-protocol enum directly.
            protocol: match protocol {
                events::ReviewNetworkApprovalProtocol::Http => NetworkApprovalProtocol::Http,
                events::ReviewNetworkApprovalProtocol::Https => NetworkApprovalProtocol::Https,
                events::ReviewNetworkApprovalProtocol::Socks5Tcp => {
                    NetworkApprovalProtocol::Socks5Tcp
                }
                events::ReviewNetworkApprovalProtocol::Socks5Udp => {
                    NetworkApprovalProtocol::Socks5Udp
                }
            },
            port,
        },
        events::ReviewedAction::McpToolCall {
            server,
            tool_name,
            connector_id,
            connector_name,
            tool_title,
        } => AnalyticsGuardianReviewedAction::McpToolCall {
            server: server.to_string(),
            tool_name: tool_name.to_string(),
            connector_id: connector_id.map(str::to_string),
            connector_name: connector_name.map(str::to_string),
            tool_title: tool_title.map(str::to_string),
        },
    }
}
|
||||
|
||||
fn sandbox_permissions_for_review(
|
||||
sandbox_permissions: events::ReviewSandboxPermissions,
|
||||
) -> SandboxPermissions {
|
||||
match sandbox_permissions {
|
||||
events::ReviewSandboxPermissions::UseDefault => SandboxPermissions::UseDefault,
|
||||
events::ReviewSandboxPermissions::RequireEscalated => SandboxPermissions::RequireEscalated,
|
||||
events::ReviewSandboxPermissions::WithAdditionalPermissions => {
|
||||
SandboxPermissions::WithAdditionalPermissions
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn permission_profile_for_review(
|
||||
profile: events::ReviewPermissionProfile<'_>,
|
||||
) -> Option<PermissionProfile> {
|
||||
// Keep observability independent of codex-protocol types. The protocol
|
||||
// serde shape is the compatibility boundary for this nested payload.
|
||||
let value = serde_json::to_value(profile).ok()?;
|
||||
serde_json::from_value(value).ok()
|
||||
}
|
||||
|
||||
fn review_failure_reason(
|
||||
failure_reason: events::ReviewFailureReason,
|
||||
) -> AnalyticsGuardianReviewFailureReason {
|
||||
match failure_reason {
|
||||
events::ReviewFailureReason::Timeout => AnalyticsGuardianReviewFailureReason::Timeout,
|
||||
events::ReviewFailureReason::Cancelled => AnalyticsGuardianReviewFailureReason::Cancelled,
|
||||
events::ReviewFailureReason::PromptBuildError => {
|
||||
AnalyticsGuardianReviewFailureReason::PromptBuildError
|
||||
}
|
||||
events::ReviewFailureReason::SessionError => {
|
||||
AnalyticsGuardianReviewFailureReason::SessionError
|
||||
}
|
||||
events::ReviewFailureReason::ParseError => AnalyticsGuardianReviewFailureReason::ParseError,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    use pretty_assertions::assert_eq;
    use serde_json::json;

    // Projects `action` through the full guardian pipeline and returns the
    // serialized reviewed-action payload for JSON-shape assertions.
    fn projected_reviewed_action(action: events::ReviewedAction<'_>) -> serde_json::Value {
        let event =
            legacy_guardian_review_event(review_completed_with_action(action)).expect("project");
        serde_json::to_value(event.reviewed_action).expect("serialize reviewed action")
    }

    // Builds a minimal guardian-approved review completion that wraps the
    // given action; other fields use fixed placeholder values.
    fn review_completed_with_action(
        reviewed_action: events::ReviewedAction<'_>,
    ) -> events::ReviewCompleted<'_> {
        events::ReviewCompleted {
            thread_id: "thread-1",
            turn_id: "turn-1",
            review_id: "review-1",
            target_item_id: "item-1",
            retry_reason: None,
            request_source: events::ReviewRequestSource::MainTurn,
            reviewed_action,
            reviewed_action_truncated: false,
            response: events::ReviewResponse::Guardian(events::GuardianReviewResponse {
                decision: events::ReviewDecision::Approved,
                terminal_status: events::ReviewTerminalStatus::Approved,
                risk_level: None,
                user_authorization: None,
                outcome: None,
                rationale: None,
                session: None,
                review_timeout_ms: 30_000,
                tool_call_count: 0,
                time_to_first_token_ms: None,
                completion_latency_ms: None,
                token_usage: None,
            }),
            started_at: 1,
            ended_at: 2,
        }
    }

    // User review responses must not produce a legacy guardian event.
    #[test]
    fn legacy_guardian_projection_ignores_user_review_responses() {
        let observation = events::ReviewCompleted {
            thread_id: "thread-1",
            turn_id: "turn-1",
            review_id: "review-1",
            target_item_id: "item-1",
            retry_reason: None,
            request_source: events::ReviewRequestSource::MainTurn,
            reviewed_action: events::ReviewedAction::ApplyPatch {
                cwd: "/repo",
                files: &[],
            },
            reviewed_action_truncated: false,
            response: events::ReviewResponse::User(events::UserReviewResponse {
                decision: events::ReviewDecision::Approved,
            }),
            started_at: 1,
            ended_at: 2,
        };

        assert!(legacy_guardian_review_event(observation).is_none());
    }

    // Shell actions keep their nested permission profile through the serde
    // round-trip in `permission_profile_for_review`.
    #[test]
    fn projects_shell_reviewed_action_with_permission_profile() {
        let command = vec!["git".to_string(), "status".to_string()];
        let read_paths = vec!["/repo".to_string()];
        let write_paths = vec!["/repo/tmp".to_string()];

        let action = events::ReviewedAction::Shell {
            command: &command,
            command_display: "git status",
            cwd: "/repo",
            sandbox_permissions: events::ReviewSandboxPermissions::WithAdditionalPermissions,
            additional_permissions: Some(events::ReviewPermissionProfile {
                network: Some(events::ReviewNetworkPermissions {
                    enabled: Some(true),
                }),
                file_system: Some(events::ReviewFileSystemPermissions {
                    read: Some(&read_paths),
                    write: Some(&write_paths),
                }),
            }),
            justification: Some("inspect repository state"),
        };

        assert_eq!(
            projected_reviewed_action(action),
            json!({
                "type": "shell",
                "command": ["git", "status"],
                "command_display": "git status",
                "cwd": "/repo",
                "sandbox_permissions": "with_additional_permissions",
                "additional_permissions": {
                    "network": {
                        "enabled": true
                    },
                    "file_system": {
                        "read": ["/repo"],
                        "write": ["/repo/tmp"]
                    }
                },
                "justification": "inspect repository state"
            })
        );
    }

    // Covers every reviewed-action variant not exercised above, pinning the
    // serialized legacy shape (including the ProcessExec -> "execve" rename).
    #[test]
    fn projects_remaining_reviewed_action_variants() {
        let unified_command = vec!["cargo".to_string(), "test".to_string()];
        assert_eq!(
            projected_reviewed_action(events::ReviewedAction::UnifiedExec {
                command: &unified_command,
                command_display: "cargo test",
                cwd: "/repo",
                sandbox_permissions: events::ReviewSandboxPermissions::RequireEscalated,
                additional_permissions: None,
                justification: None,
                tty: true,
            }),
            json!({
                "type": "unified_exec",
                "command": ["cargo", "test"],
                "command_display": "cargo test",
                "cwd": "/repo",
                "sandbox_permissions": "require_escalated",
                "additional_permissions": null,
                "justification": null,
                "tty": true
            })
        );

        let argv = vec!["git".to_string(), "diff".to_string()];
        assert_eq!(
            projected_reviewed_action(events::ReviewedAction::ProcessExec {
                source: events::ReviewCommandSource::UnifiedExec,
                program: "git",
                argv: &argv,
                cwd: "/repo",
                additional_permissions: None,
            }),
            json!({
                "type": "execve",
                "source": "unified_exec",
                "program": "git",
                "argv": ["git", "diff"],
                "cwd": "/repo",
                "additional_permissions": null
            })
        );

        let files = vec!["src/lib.rs".to_string(), "Cargo.toml".to_string()];
        assert_eq!(
            projected_reviewed_action(events::ReviewedAction::ApplyPatch {
                cwd: "/repo",
                files: &files,
            }),
            json!({
                "type": "apply_patch",
                "cwd": "/repo",
                "files": ["src/lib.rs", "Cargo.toml"]
            })
        );

        assert_eq!(
            projected_reviewed_action(events::ReviewedAction::NetworkAccess {
                target: "https://example.com",
                host: "example.com",
                protocol: events::ReviewNetworkApprovalProtocol::Https,
                port: 443,
            }),
            json!({
                "type": "network_access",
                "target": "https://example.com",
                "host": "example.com",
                "protocol": "https",
                "port": 443
            })
        );

        assert_eq!(
            projected_reviewed_action(events::ReviewedAction::McpToolCall {
                server: "drive",
                tool_name: "search",
                connector_id: Some("drive-connector"),
                connector_name: Some("Drive"),
                tool_title: Some("Search Drive"),
            }),
            json!({
                "type": "mcp_tool_call",
                "server": "drive",
                "tool_name": "search",
                "connector_id": "drive-connector",
                "connector_name": "Drive",
                "tool_title": "Search Drive"
            })
        );
    }
}
|
||||
7
codex-rs/observability-derive/BUILD.bazel
Normal file
7
codex-rs/observability-derive/BUILD.bazel
Normal file
@@ -0,0 +1,7 @@
|
||||
load("//:defs.bzl", "codex_rust_crate")
|
||||
|
||||
codex_rust_crate(
|
||||
name = "observability-derive",
|
||||
crate_name = "codex_observability_derive",
|
||||
proc_macro = True,
|
||||
)
|
||||
16
codex-rs/observability-derive/Cargo.toml
Normal file
16
codex-rs/observability-derive/Cargo.toml
Normal file
@@ -0,0 +1,16 @@
|
||||
[package]
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
name = "codex-observability-derive"
|
||||
version.workspace = true
|
||||
|
||||
[lib]
|
||||
proc-macro = true
|
||||
|
||||
[dependencies]
|
||||
proc-macro2 = "1"
|
||||
quote = "1"
|
||||
syn = { version = "2", features = ["full", "extra-traits"] }
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
267
codex-rs/observability-derive/src/lib.rs
Normal file
267
codex-rs/observability-derive/src/lib.rs
Normal file
@@ -0,0 +1,267 @@
|
||||
//! Derive macro implementation for `codex-observability`.
|
||||
//!
|
||||
//! The macro is intentionally small: it turns annotated struct fields into
|
||||
//! `ObservationFieldVisitor` calls and refuses to compile if any exported field
|
||||
//! is missing policy metadata. Filtering, serialization, redaction, and
|
||||
//! destination-specific mapping stay in sinks/reducers.
|
||||
|
||||
use proc_macro::TokenStream;
|
||||
use quote::format_ident;
|
||||
use quote::quote;
|
||||
use syn::Data;
|
||||
use syn::DeriveInput;
|
||||
use syn::Expr;
|
||||
use syn::ExprLit;
|
||||
use syn::Fields;
|
||||
use syn::Lit;
|
||||
use syn::LitStr;
|
||||
use syn::Path;
|
||||
use syn::parse_macro_input;
|
||||
|
||||
/// Derives `codex_observability::Observation` for a named struct.
|
||||
///
|
||||
/// Required attributes:
|
||||
///
|
||||
/// - `#[observation(name = "domain.event")]` on the struct.
|
||||
/// - `#[obs(level = "basic|detailed|trace", class = "...")]` on every field.
|
||||
/// - Optional struct-level or field-level uses markers for exact sink
|
||||
/// selection. Field-level markers override the struct default.
|
||||
///
|
||||
/// Event definitions inside `codex-observability` itself may use
|
||||
/// `#[observation(crate = "crate")]` so generated code refers to local types
|
||||
/// instead of the externally visible crate path.
|
||||
#[proc_macro_derive(Observation, attributes(observation, obs))]
|
||||
pub fn derive_observation(input: TokenStream) -> TokenStream {
|
||||
let input = parse_macro_input!(input as DeriveInput);
|
||||
expand_observation(input)
|
||||
.unwrap_or_else(syn::Error::into_compile_error)
|
||||
.into()
|
||||
}
|
||||
|
||||
/// Expands the derive input into an `Observation` impl, or a descriptive
/// error when the input shape or attributes are invalid.
fn expand_observation(input: DeriveInput) -> syn::Result<proc_macro2::TokenStream> {
    let name = input.ident;
    let observation_attr = observation_attr(&input.attrs)?;
    let event_name = observation_attr.name;
    let crate_path = observation_attr.crate_path;
    let default_uses = observation_attr.uses;
    // Only structs with named fields can carry per-field policy metadata.
    let Data::Struct(data) = input.data else {
        return Err(syn::Error::new_spanned(
            name,
            "Observation can only be derived for structs",
        ));
    };
    let Fields::Named(fields) = data.fields else {
        return Err(syn::Error::new_spanned(
            name,
            "Observation requires a struct with named fields",
        ));
    };

    // One visitor call per field; obs_meta errors when a field is missing
    // its #[obs(...)] policy metadata, so exported fields cannot slip through.
    let mut visits = Vec::new();
    for field in fields.named {
        let Some(field_name) = field.ident else {
            continue;
        };
        let meta = obs_meta(&field.attrs, &field_name, &crate_path, &default_uses)?;
        let field_name_lit = LitStr::new(&field_name.to_string(), field_name.span());
        visits.push(quote! {
            visitor.field(#field_name_lit, #meta, &self.#field_name);
        });
    }

    let (impl_generics, ty_generics, where_clause) = input.generics.split_for_impl();
    Ok(quote! {
        impl #impl_generics #crate_path::Observation for #name #ty_generics #where_clause {
            const NAME: &'static str = #event_name;

            fn visit_fields<V: #crate_path::ObservationFieldVisitor>(&self, visitor: &mut V) {
                #(#visits)*
            }
        }
    })
}
|
||||
|
||||
/// Parsed contents of the struct-level `#[observation(...)]` attribute.
struct ObservationAttr {
    /// Event name, e.g. "domain.event".
    name: LitStr,
    /// Path generated code uses to refer to the observability crate.
    crate_path: Path,
    /// Struct-level default sink markers; fields may override per-field.
    uses: Vec<LitStr>,
}
|
||||
|
||||
/// Finds and parses the `#[observation(...)]` attribute from the derive
/// input's attribute list, erroring when the required `name` is absent.
fn observation_attr(attrs: &[syn::Attribute]) -> syn::Result<ObservationAttr> {
    for attr in attrs {
        if !attr.path().is_ident("observation") {
            continue;
        }
        let mut name = None;
        let mut crate_path = None;
        let mut uses = Vec::new();
        attr.parse_nested_meta(|meta| {
            if meta.path.is_ident("name") {
                name = Some(meta.value()?.parse::<LitStr>()?);
                Ok(())
            } else if meta.path.is_ident("crate") {
                let value = meta.value()?.parse::<LitStr>()?;
                // The override is needed only when deriving observations from
                // inside `codex-observability`, where the external crate name is
                // not available. Ordinary users get the stable public path.
                crate_path = Some(value.parse::<Path>()?);
                Ok(())
            } else if meta.path.is_ident("uses") {
                uses = use_literals(meta.value()?.parse::<Expr>()?)?;
                Ok(())
            } else {
                Err(meta.error("unsupported observation attribute"))
            }
        })?;
        // An #[observation] attribute without `name` falls through to the
        // "missing" error below.
        if let Some(name) = name {
            return Ok(ObservationAttr {
                name,
                crate_path: crate_path.unwrap_or_else(|| syn::parse_quote!(::codex_observability)),
                uses,
            });
        }
    }
    Err(syn::Error::new(
        proc_macro2::Span::call_site(),
        "missing #[observation(name = \"...\")]",
    ))
}
|
||||
|
||||
/// Parses a field's `#[obs(...)]` attribute into the `FieldMeta` constructor
/// expression emitted by the derive, erroring when the attribute or its
/// required `level`/`class` keys are missing.
fn obs_meta(
    attrs: &[syn::Attribute],
    field_name: &syn::Ident,
    crate_path: &Path,
    default_uses: &[LitStr],
) -> syn::Result<proc_macro2::TokenStream> {
    for attr in attrs {
        if !attr.path().is_ident("obs") {
            continue;
        }
        let mut level = None;
        let mut class = None;
        let mut uses = None;
        attr.parse_nested_meta(|meta| {
            if meta.path.is_ident("level") {
                level = Some(meta.value()?.parse::<LitStr>()?);
                Ok(())
            } else if meta.path.is_ident("class") {
                class = Some(meta.value()?.parse::<LitStr>()?);
                Ok(())
            } else if meta.path.is_ident("uses") {
                uses = Some(use_literals(meta.value()?.parse::<Expr>()?)?);
                Ok(())
            } else {
                Err(meta.error("unsupported obs attribute"))
            }
        })?;
        let level = level.ok_or_else(|| {
            syn::Error::new_spanned(attr, "missing obs level, expected level = \"...\"")
        })?;
        let class = class.ok_or_else(|| {
            syn::Error::new_spanned(attr, "missing obs class, expected class = \"...\"")
        })?;
        let detail = detail_expr(&level, crate_path)?;
        let data_class = data_class_expr(&class, crate_path)?;
        // Field-level uses override the struct-level default markers.
        let uses = uses
            .as_deref()
            .unwrap_or(default_uses)
            .iter()
            .map(|value| field_use_expr(value, crate_path))
            .collect::<syn::Result<Vec<_>>>()?;
        return Ok(quote! {
            #crate_path::FieldMeta::with_uses(#detail, #data_class, &[#(#uses),*])
        });
    }
    // No #[obs] attribute at all: refuse to compile the exported field.
    Err(syn::Error::new_spanned(
        field_name,
        "missing #[obs(level = \"...\", class = \"...\")]",
    ))
}
|
||||
|
||||
fn use_literals(expr: Expr) -> syn::Result<Vec<LitStr>> {
|
||||
let Expr::Array(array) = expr else {
|
||||
return Err(syn::Error::new_spanned(
|
||||
expr,
|
||||
"obs uses must be a string array, for example uses = [\"analytics\"]",
|
||||
));
|
||||
};
|
||||
|
||||
array
|
||||
.elems
|
||||
.into_iter()
|
||||
.map(|elem| match elem {
|
||||
Expr::Lit(ExprLit {
|
||||
lit: Lit::Str(value),
|
||||
..
|
||||
}) => Ok(value),
|
||||
other => Err(syn::Error::new_spanned(
|
||||
other,
|
||||
"obs uses entries must be string literals",
|
||||
)),
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn detail_expr(value: &LitStr, crate_path: &Path) -> syn::Result<proc_macro2::TokenStream> {
|
||||
enum_expr(
|
||||
value,
|
||||
"detail level",
|
||||
&[
|
||||
("basic", "Basic"),
|
||||
("detailed", "Detailed"),
|
||||
("trace", "Trace"),
|
||||
],
|
||||
quote!(#crate_path::DetailLevel),
|
||||
)
|
||||
}
|
||||
|
||||
fn data_class_expr(value: &LitStr, crate_path: &Path) -> syn::Result<proc_macro2::TokenStream> {
|
||||
enum_expr(
|
||||
value,
|
||||
"data class",
|
||||
&[
|
||||
("identifier", "Identifier"),
|
||||
("operational", "Operational"),
|
||||
("environment", "Environment"),
|
||||
("content", "Content"),
|
||||
("secret_risk", "SecretRisk"),
|
||||
],
|
||||
quote!(#crate_path::DataClass),
|
||||
)
|
||||
}
|
||||
|
||||
fn field_use_expr(value: &LitStr, crate_path: &Path) -> syn::Result<proc_macro2::TokenStream> {
|
||||
enum_expr(
|
||||
value,
|
||||
"field use",
|
||||
&[
|
||||
("analytics", "Analytics"),
|
||||
("otel", "Otel"),
|
||||
("rollout_trace", "RolloutTrace"),
|
||||
],
|
||||
quote!(#crate_path::FieldUse),
|
||||
)
|
||||
}
|
||||
|
||||
fn enum_expr(
|
||||
value: &LitStr,
|
||||
label: &str,
|
||||
variants: &[(&str, &str)],
|
||||
path: proc_macro2::TokenStream,
|
||||
) -> syn::Result<proc_macro2::TokenStream> {
|
||||
let raw = value.value();
|
||||
let Some((_, variant)) = variants.iter().find(|(name, _)| *name == raw) else {
|
||||
let expected = variants
|
||||
.iter()
|
||||
.map(|(name, _)| format!("\"{name}\""))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ");
|
||||
return Err(syn::Error::new_spanned(
|
||||
value,
|
||||
format!("invalid {label} {raw:?}, expected one of {expected}"),
|
||||
));
|
||||
};
|
||||
let variant = format_ident!("{variant}");
|
||||
let expr: Expr = syn::parse_quote!(#path::#variant);
|
||||
Ok(quote!(#expr))
|
||||
}
|
||||
6
codex-rs/observability/BUILD.bazel
Normal file
6
codex-rs/observability/BUILD.bazel
Normal file
@@ -0,0 +1,6 @@
|
||||
# Bazel build target for the `codex-observability` crate.
load("//:defs.bzl", "codex_rust_crate")

codex_rust_crate(
    name = "observability",
    crate_name = "codex_observability",
)
|
||||
21
codex-rs/observability/Cargo.toml
Normal file
21
codex-rs/observability/Cargo.toml
Normal file
@@ -0,0 +1,21 @@
|
||||
# Manifest for the shared Codex observability event definitions.
[package]
edition.workspace = true
license.workspace = true
name = "codex-observability"
version.workspace = true

[lib]
# Doctests enabled: rustdoc examples in the event docs are compiled and run.
doctest = true
name = "codex_observability"
path = "src/lib.rs"

[dependencies]
codex-observability-derive = { workspace = true }
serde = { workspace = true, features = ["derive"] }

[dev-dependencies]
pretty_assertions = { workspace = true }
serde_json = { workspace = true }

[lints]
workspace = true
|
||||
257
codex-rs/observability/src/events.rs
Normal file
257
codex-rs/observability/src/events.rs
Normal file
@@ -0,0 +1,257 @@
|
||||
//! Canonical Codex observation event definitions.
|
||||
|
||||
use crate::Observation;
|
||||
use serde::Serialize;
|
||||
use std::path::Path;
|
||||
|
||||
mod compaction;
|
||||
mod review;
|
||||
mod thread;
|
||||
mod turn;
|
||||
|
||||
pub use compaction::*;
|
||||
pub use review::*;
|
||||
pub use thread::*;
|
||||
pub use turn::*;
|
||||
|
||||
/// How an app/tool/plugin capability was selected by the user or system.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum InvocationType {
    /// The user explicitly mentioned or selected the capability.
    Explicit,
    /// Codex inferred that the capability should be used.
    Implicit,
}

/// Scope where a skill definition was found.
///
/// NOTE(review): variant docs below are inferred from the names; confirm
/// against the skill discovery code.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum SkillScope {
    /// Personal, per-user skill.
    User,
    /// Skill defined in the repository being worked on.
    Repo,
    /// Skill bundled with the system install.
    System,
    /// Skill provisioned by an administrator.
    Admin,
}

/// Status reported after a hook run reaches a terminal state.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum HookRunStatus {
    /// The hook completed successfully.
    Completed,
    /// The hook failed.
    Failed,
    /// The hook blocked the triggering action.
    Blocked,
    /// The hook stopped execution.
    Stopped,
}

/// Plugin lifecycle state after a plugin management operation.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum PluginState {
    /// Plugin was installed.
    Installed,
    /// Plugin was uninstalled.
    Uninstalled,
    /// Plugin was enabled.
    Enabled,
    /// Plugin was disabled.
    Disabled,
}
|
||||
|
||||
/// Observation emitted when a skill is invoked during a turn.
#[derive(Observation)]
#[observation(name = "skill.invoked", crate = "crate", uses = ["analytics"])]
pub struct SkillInvoked<'a> {
    /// Model slug active for the turn where the skill was invoked.
    #[obs(level = "basic", class = "operational")]
    pub model_slug: &'a str,

    /// Thread that owns the turn.
    #[obs(level = "basic", class = "identifier")]
    pub thread_id: &'a str,

    /// Turn where the skill was invoked.
    #[obs(level = "basic", class = "identifier")]
    pub turn_id: &'a str,

    /// Skill display name.
    #[obs(level = "basic", class = "operational")]
    pub skill_name: &'a str,

    /// Scope where the skill was discovered.
    #[obs(level = "basic", class = "operational")]
    pub skill_scope: SkillScope,

    /// Local skill definition path used to derive the existing analytics skill id.
    #[obs(level = "basic", class = "environment")]
    pub skill_path: &'a Path,

    /// Whether the skill was explicitly requested or inferred.
    #[obs(level = "basic", class = "operational")]
    pub invocation_type: InvocationType,
}

/// Observation emitted when an app connector is mentioned during a turn.
///
/// Field-for-field identical to `AppUsed`; only the event name differs.
#[derive(Observation)]
#[observation(name = "app.mentioned", crate = "crate", uses = ["analytics"])]
pub struct AppMentioned<'a> {
    /// Model slug active for the turn where the app was mentioned.
    #[obs(level = "basic", class = "operational")]
    pub model_slug: &'a str,

    /// Thread that owns the turn.
    #[obs(level = "basic", class = "identifier")]
    pub thread_id: &'a str,

    /// Turn where the app was mentioned.
    #[obs(level = "basic", class = "identifier")]
    pub turn_id: &'a str,

    /// Stable connector identifier when the app is backed by a connector.
    #[obs(level = "basic", class = "identifier")]
    pub connector_id: Option<&'a str>,

    /// User-facing app name.
    #[obs(level = "basic", class = "operational")]
    pub app_name: Option<&'a str>,

    /// Whether the mention was explicit or inferred.
    #[obs(level = "basic", class = "operational")]
    pub invocation_type: Option<InvocationType>,
}

/// Observation emitted when Codex uses an app connector during a turn.
///
/// Field-for-field identical to `AppMentioned`; only the event name differs.
#[derive(Observation)]
#[observation(name = "app.used", crate = "crate", uses = ["analytics"])]
pub struct AppUsed<'a> {
    /// Model slug active for the turn where the app was used.
    #[obs(level = "basic", class = "operational")]
    pub model_slug: &'a str,

    /// Thread that owns the turn.
    #[obs(level = "basic", class = "identifier")]
    pub thread_id: &'a str,

    /// Turn where the app was used.
    #[obs(level = "basic", class = "identifier")]
    pub turn_id: &'a str,

    /// Stable connector identifier when the app is backed by a connector.
    #[obs(level = "basic", class = "identifier")]
    pub connector_id: Option<&'a str>,

    /// User-facing app name.
    #[obs(level = "basic", class = "operational")]
    pub app_name: Option<&'a str>,

    /// Whether usage was explicit or implicit.
    #[obs(level = "basic", class = "operational")]
    pub invocation_type: Option<InvocationType>,
}

/// Observation emitted after a configured hook run completes.
#[derive(Observation)]
#[observation(name = "hook.run_completed", crate = "crate", uses = ["analytics"])]
pub struct HookRunCompleted<'a> {
    /// Model slug active for the turn where the hook ran.
    #[obs(level = "basic", class = "operational")]
    pub model_slug: &'a str,

    /// Thread that owns the turn.
    #[obs(level = "basic", class = "identifier")]
    pub thread_id: &'a str,

    /// Turn where the hook ran.
    #[obs(level = "basic", class = "identifier")]
    pub turn_id: &'a str,

    /// Hook lifecycle point, for example PostToolUse.
    #[obs(level = "basic", class = "operational")]
    pub hook_name: &'a str,

    /// Source that configured the hook, for example user or project.
    #[obs(level = "basic", class = "operational")]
    pub hook_source: &'static str,

    /// Final hook run status.
    #[obs(level = "basic", class = "operational")]
    pub status: HookRunStatus,
}

/// Observation emitted when a plugin capability is used during a turn.
#[derive(Observation)]
#[observation(name = "plugin.used", crate = "crate", uses = ["analytics"])]
pub struct PluginUsed<'a> {
    /// Model slug active for the turn where the plugin was used.
    #[obs(level = "basic", class = "operational")]
    pub model_slug: &'a str,

    /// Thread that owns the turn.
    #[obs(level = "basic", class = "identifier")]
    pub thread_id: &'a str,

    /// Turn where the plugin was used.
    #[obs(level = "basic", class = "identifier")]
    pub turn_id: &'a str,

    /// Stable plugin identifier.
    #[obs(level = "basic", class = "identifier")]
    pub plugin_id: &'a str,

    /// Plugin name component.
    #[obs(level = "basic", class = "operational")]
    pub plugin_name: &'a str,

    /// Marketplace or namespace component.
    #[obs(level = "basic", class = "operational")]
    pub marketplace_name: &'a str,

    /// Whether the plugin exposes skills.
    /// NOTE(review): absence semantics of the `Option` are not visible here — confirm.
    #[obs(level = "basic", class = "operational")]
    pub has_skills: Option<bool>,

    /// Number of MCP servers exposed by the plugin.
    #[obs(level = "basic", class = "operational")]
    pub mcp_server_count: Option<usize>,

    /// Connector identifiers exposed by the plugin.
    #[obs(level = "basic", class = "identifier")]
    pub connector_ids: Option<&'a [String]>,
}

/// Observation emitted after a plugin lifecycle state changes.
#[derive(Observation)]
#[observation(name = "plugin.state_changed", crate = "crate", uses = ["analytics"])]
pub struct PluginStateChanged<'a> {
    /// Stable plugin identifier.
    #[obs(level = "basic", class = "identifier")]
    pub plugin_id: &'a str,

    /// Plugin name component.
    #[obs(level = "basic", class = "operational")]
    pub plugin_name: &'a str,

    /// Marketplace or namespace component.
    #[obs(level = "basic", class = "operational")]
    pub marketplace_name: &'a str,

    /// Whether the plugin exposes skills.
    #[obs(level = "basic", class = "operational")]
    pub has_skills: Option<bool>,

    /// Number of MCP servers exposed by the plugin.
    #[obs(level = "basic", class = "operational")]
    pub mcp_server_count: Option<usize>,

    /// Connector identifiers exposed by the plugin.
    #[obs(level = "basic", class = "identifier")]
    pub connector_ids: Option<&'a [String]>,

    /// New plugin lifecycle state.
    #[obs(level = "basic", class = "operational")]
    pub state: PluginState,
}
|
||||
105
codex-rs/observability/src/events/compaction.rs
Normal file
105
codex-rs/observability/src/events/compaction.rs
Normal file
@@ -0,0 +1,105 @@
|
||||
//! Compaction observation event definitions.
|
||||
|
||||
use crate::Observation;
|
||||
use serde::Serialize;
|
||||
|
||||
// NOTE(review): variant-level docs below are inferred from variant names and
// the enum-level docs; confirm against the compaction call sites.

/// What initiated the compaction work.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CompactionTrigger {
    /// Initiated by an explicit request.
    Manual,
    /// Initiated automatically by the runtime.
    Auto,
}

/// Why the runtime chose to compact context.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CompactionReason {
    /// The user asked for compaction.
    UserRequested,
    /// The context window limit was approached or reached.
    ContextLimit,
    /// The active model changed to one with a smaller context.
    ModelDownshift,
}

/// Runtime implementation used for the compaction.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CompactionImplementation {
    Responses,
    ResponsesCompact,
}

/// Point in the turn lifecycle where compaction ran.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CompactionPhase {
    /// Compaction ran as its own turn.
    StandaloneTurn,
    /// Compaction ran before a turn started.
    PreTurn,
    /// Compaction ran while a turn was in progress.
    MidTurn,
}

/// Strategy used to build the compacted context.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CompactionStrategy {
    Memento,
    PrefixCompaction,
}

/// Terminal status of a compaction run.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CompactionStatus<'a> {
    /// Compaction finished successfully.
    Completed,
    /// Compaction failed.
    Failed {
        /// Bounded failure detail intended for remote export.
        error: Option<&'a str>,
    },
    /// Compaction was interrupted before completing.
    Interrupted,
}
|
||||
|
||||
/// Observation emitted when a compaction run reaches a terminal state.
#[derive(Observation)]
#[observation(name = "compaction.ended", crate = "crate", uses = ["analytics"])]
pub struct CompactionEnded<'a> {
    /// Thread whose context was compacted.
    #[obs(level = "basic", class = "identifier")]
    pub thread_id: &'a str,

    /// Turn associated with the compaction run.
    #[obs(level = "basic", class = "identifier")]
    pub turn_id: &'a str,

    /// What initiated the compaction (manual vs. automatic).
    #[obs(level = "basic", class = "operational")]
    pub trigger: CompactionTrigger,

    /// Why the runtime chose to compact.
    #[obs(level = "basic", class = "operational")]
    pub reason: CompactionReason,

    /// Runtime implementation used for the compaction.
    #[obs(level = "basic", class = "operational")]
    pub implementation: CompactionImplementation,

    /// Point in the turn lifecycle where compaction ran.
    #[obs(level = "basic", class = "operational")]
    pub phase: CompactionPhase,

    /// Strategy used to build the compacted context.
    #[obs(level = "basic", class = "operational")]
    pub strategy: CompactionStrategy,

    /// Terminal status; failures carry bounded error detail.
    #[obs(level = "basic", class = "operational")]
    pub status: CompactionStatus<'a>,

    /// Active context size in tokens before compaction.
    #[obs(level = "basic", class = "operational")]
    pub active_context_tokens_before: i64,

    /// Active context size in tokens after compaction.
    #[obs(level = "basic", class = "operational")]
    pub active_context_tokens_after: i64,

    /// Unix timestamp in seconds when compaction work began.
    #[obs(level = "basic", class = "operational")]
    pub started_at: i64,

    /// Unix timestamp in seconds when compaction reached its terminal state.
    #[obs(level = "basic", class = "operational")]
    pub ended_at: i64,

    /// Absent when the callsite cannot measure elapsed wall time.
    #[obs(level = "basic", class = "operational")]
    pub duration_ms: Option<i64>,
}
|
||||
272
codex-rs/observability/src/events/review.rs
Normal file
272
codex-rs/observability/src/events/review.rs
Normal file
@@ -0,0 +1,272 @@
|
||||
//! Review observation event definitions.
|
||||
|
||||
use crate::Observation;
|
||||
use serde::Serialize;
|
||||
|
||||
/// Final decision returned by a review.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReviewDecision {
    /// The reviewed action may proceed.
    Approved,
    /// The reviewed action was rejected.
    Denied,
    /// The review ended without a decision.
    Aborted,
}

/// Terminal state of a review.
///
/// Non-success states carry their stable failure category when one is known.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReviewTerminalStatus {
    Approved,
    Denied,
    Aborted {
        /// Stable failure category, when known.
        failure_reason: Option<ReviewFailureReason>,
    },
    TimedOut {
        /// Stable failure category, when known.
        failure_reason: Option<ReviewFailureReason>,
    },
    FailedClosed {
        /// Stable failure category, when known.
        failure_reason: Option<ReviewFailureReason>,
    },
}

/// Stable failure category for review terminals.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReviewFailureReason {
    /// The review exceeded its time budget.
    Timeout,
    /// The review was cancelled.
    Cancelled,
    /// Building the review prompt failed.
    PromptBuildError,
    /// The reviewing session itself errored.
    SessionError,
    /// The reviewer output could not be parsed.
    ParseError,
}
|
||||
|
||||
/// Source of the request sent for review.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReviewRequestSource {
    /// The main turn requested the review.
    MainTurn,
    /// A delegated subagent requested the review.
    DelegatedSubagent,
}

/// Per-command sandbox override requested by the reviewed action.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReviewSandboxPermissions {
    /// Run under the default sandbox settings.
    UseDefault,
    /// Require escalated permissions.
    RequireEscalated,
    /// Run with extra permissions described elsewhere in the action.
    WithAdditionalPermissions,
}

/// Additional filesystem permissions requested for a reviewed action.
///
/// NOTE(review): entries are presumably path strings — confirm with callers.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
pub struct ReviewFileSystemPermissions<'a> {
    /// Paths requested for read access, when any.
    pub read: Option<&'a [String]>,
    /// Paths requested for write access, when any.
    pub write: Option<&'a [String]>,
}

/// Additional network permissions requested for a reviewed action.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
pub struct ReviewNetworkPermissions {
    /// Whether network access was requested, when specified.
    pub enabled: Option<bool>,
}

/// Additional permissions requested for a reviewed action.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
pub struct ReviewPermissionProfile<'a> {
    pub network: Option<ReviewNetworkPermissions>,
    pub file_system: Option<ReviewFileSystemPermissions<'a>>,
}

/// Source tool that produced an exec-style reviewed action.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReviewCommandSource {
    Shell,
    UnifiedExec,
}

/// Network protocol involved in a reviewed network access request.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReviewNetworkApprovalProtocol {
    Http,
    Https,
    Socks5Tcp,
    Socks5Udp,
}
|
||||
|
||||
/// Action that was evaluated by a review.
///
/// Serialized with an internal `"type"` tag in snake_case.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ReviewedAction<'a> {
    /// A shell command.
    Shell {
        command: &'a [String],
        command_display: &'a str,
        cwd: &'a str,
        sandbox_permissions: ReviewSandboxPermissions,
        additional_permissions: Option<ReviewPermissionProfile<'a>>,
        justification: Option<&'a str>,
    },
    /// A unified-exec command; `tty` records whether a TTY was requested.
    UnifiedExec {
        command: &'a [String],
        command_display: &'a str,
        cwd: &'a str,
        sandbox_permissions: ReviewSandboxPermissions,
        additional_permissions: Option<ReviewPermissionProfile<'a>>,
        justification: Option<&'a str>,
        tty: bool,
    },
    /// A direct process execution attributed to a source tool.
    ProcessExec {
        source: ReviewCommandSource,
        program: &'a str,
        argv: &'a [String],
        cwd: &'a str,
        additional_permissions: Option<ReviewPermissionProfile<'a>>,
    },
    /// A patch application touching `files`.
    ApplyPatch {
        cwd: &'a str,
        files: &'a [String],
    },
    /// A network access request.
    NetworkAccess {
        target: &'a str,
        host: &'a str,
        protocol: ReviewNetworkApprovalProtocol,
        port: u16,
    },
    /// An MCP tool call, with connector metadata when available.
    McpToolCall {
        server: &'a str,
        tool_name: &'a str,
        connector_id: Option<&'a str>,
        connector_name: Option<&'a str>,
        tool_title: Option<&'a str>,
    },
}
|
||||
|
||||
/// Risk level assigned by an automated reviewer.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReviewRiskLevel {
    Low,
    Medium,
    High,
    Critical,
}

/// User authorization level observed by an automated reviewer.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReviewUserAuthorization {
    Unknown,
    Low,
    Medium,
    High,
}

/// Policy outcome recommended by an automated reviewer.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ReviewOutcome {
    Allow,
    Deny,
}

/// How guardian review obtained a model session.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum GuardianReviewSessionKind {
    /// A new trunk session was created.
    TrunkNew,
    /// An existing trunk session was reused.
    TrunkReused,
    /// An ephemeral session was forked for this review.
    EphemeralForked,
}

/// Guardian model session used to perform a review.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
pub struct GuardianReviewSession<'a> {
    /// Thread id of the guardian session.
    pub guardian_thread_id: &'a str,
    /// How the session was obtained.
    pub session_kind: GuardianReviewSessionKind,
    /// Model used by the guardian session.
    pub model: &'a str,
    /// Absent when the selected model/provider has no explicit effort setting.
    pub reasoning_effort: Option<&'a str>,
    /// Whether the session already held context from a prior review.
    pub had_prior_review_context: bool,
}
|
||||
|
||||
/// Response produced by a user reviewer.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
pub struct UserReviewResponse {
    /// The user's final decision.
    pub decision: ReviewDecision,
}

/// Response produced by guardian review.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
pub struct GuardianReviewResponse<'a> {
    /// The guardian's final decision.
    pub decision: ReviewDecision,
    /// Terminal runtime state; non-success states carry their stable failure category.
    pub terminal_status: ReviewTerminalStatus,
    /// Absent when review stops before the model reports a risk classification.
    pub risk_level: Option<ReviewRiskLevel>,
    /// Absent when review stops before user authorization is assessed.
    pub user_authorization: Option<ReviewUserAuthorization>,
    /// Absent when review stops before a policy outcome is produced.
    pub outcome: Option<ReviewOutcome>,
    /// Model-authored rationale text returned by guardian review.
    pub rationale: Option<&'a str>,
    /// Absent when review fails before a guardian model session is created or reused.
    pub session: Option<GuardianReviewSession<'a>>,
    /// Time budget granted to the review, in milliseconds.
    pub review_timeout_ms: u64,
    /// Number of tool calls made during the review.
    pub tool_call_count: u64,
    /// Absent when the guardian session did not stream model tokens.
    pub time_to_first_token_ms: Option<u64>,
    /// Absent when review ended before a model completion was received.
    pub completion_latency_ms: Option<u64>,
    /// Absent when the guardian model did not report token accounting.
    pub token_usage: Option<super::TurnTokenUsage>,
}

/// Reviewer response that completed a review.
///
/// Serialized with a `"reviewer"` tag in snake_case.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(tag = "reviewer", rename_all = "snake_case")]
pub enum ReviewResponse<'a> {
    User(UserReviewResponse),
    Guardian(GuardianReviewResponse<'a>),
}
|
||||
|
||||
/// Observation emitted when review of an action reaches a terminal state.
#[derive(Observation)]
#[observation(name = "review.completed", crate = "crate", uses = ["analytics"])]
pub struct ReviewCompleted<'a> {
    /// Thread that owns the turn being reviewed.
    #[obs(level = "basic", class = "identifier")]
    pub thread_id: &'a str,

    /// Turn whose action was reviewed.
    #[obs(level = "basic", class = "identifier")]
    pub turn_id: &'a str,

    /// Identifier of this review.
    #[obs(level = "basic", class = "identifier")]
    pub review_id: &'a str,

    /// Item whose action was under review.
    #[obs(level = "basic", class = "identifier")]
    pub target_item_id: &'a str,

    /// Absent on the first review attempt; present when this review retries an earlier attempt.
    #[obs(level = "basic", class = "operational")]
    pub retry_reason: Option<&'a str>,

    /// Origin of the review request (main turn vs. delegated subagent).
    #[obs(level = "basic", class = "operational")]
    pub request_source: ReviewRequestSource,

    /// The reviewed action may contain command text, paths, or tool names.
    #[obs(level = "basic", class = "content")]
    pub reviewed_action: ReviewedAction<'a>,

    /// Whether the reviewed action payload was truncated before export.
    #[obs(level = "basic", class = "operational")]
    pub reviewed_action_truncated: bool,

    /// Contains the reviewer-specific terminal result.
    #[obs(level = "basic", class = "content")]
    pub response: ReviewResponse<'a>,

    /// Unix timestamp in seconds when the review started.
    #[obs(level = "basic", class = "operational")]
    pub started_at: i64,

    /// Unix timestamp in seconds when the review reached its terminal state.
    #[obs(level = "basic", class = "operational")]
    pub ended_at: i64,
}
|
||||
89
codex-rs/observability/src/events/thread.rs
Normal file
89
codex-rs/observability/src/events/thread.rs
Normal file
@@ -0,0 +1,89 @@
|
||||
//! Thread lifecycle observation event definitions.
|
||||
|
||||
use crate::Observation;
|
||||
use serde::Serialize;
|
||||
|
||||
/// How a thread became active in the runtime.
///
/// This describes the open operation, not the long-term origin of the thread.
/// A resumed thread already existed; it still becomes active again for the
/// runtime or client connection handling the resume.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ThreadInitializationMode {
    /// The thread was newly created.
    New,
    /// The thread was forked from an existing thread.
    Forked,
    /// An existing thread was resumed.
    Resumed,
}

/// Subagent work that caused a thread to start.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ThreadSubagentKind<'a> {
    Review,
    Compact,
    ThreadSpawn,
    MemoryConsolidation,
    /// Subagent kind not covered by the named variants.
    Other(&'a str),
}

/// Origin of the request that made a thread active.
///
/// Keep this separate from `ThreadInitializationMode`: source answers who or
/// what opened the thread, while initialization mode answers whether the
/// thread was new, forked, or resumed.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ThreadSource<'a> {
    /// Opened directly by the user.
    User,
    /// Opened by the app server.
    AppServer,
    /// Caller-specified source not covered by the named variants.
    Custom(&'a str),
    /// Opened by subagent work of the given kind.
    Subagent(ThreadSubagentKind<'a>),
    /// Source could not be determined.
    Unknown,
}
|
||||
|
||||
/// Observation emitted when Codex starts tracking a thread.
///
/// "Started" means the thread became active for this runtime or client
/// connection. It does not imply the thread was newly created; see
/// `initialization_mode` for new, forked, and resumed activations.
#[derive(Observation)]
#[observation(name = "thread.started", crate = "crate", uses = ["analytics"])]
pub struct ThreadStarted<'a> {
    /// Thread that became active.
    #[obs(level = "basic", class = "identifier")]
    pub thread_id: &'a str,

    /// Origin of the request that made the thread active.
    #[obs(level = "basic", class = "operational")]
    pub source: ThreadSource<'a>,

    /// Parent thread that created this thread, when the source represents subagent work.
    ///
    /// This stays top-level instead of being nested inside the source enum so
    /// sinks can apply identifier policy directly to the field.
    #[obs(level = "basic", class = "identifier")]
    pub parent_thread_id: Option<&'a str>,

    /// Whether the thread was new, forked, or resumed.
    #[obs(level = "basic", class = "operational")]
    pub initialization_mode: ThreadInitializationMode,

    /// Model associated with the thread at activation time.
    ///
    /// Turn configuration also records a model because turns may later run with
    /// overrides or migrated settings. This field is thread lifecycle metadata.
    #[obs(level = "basic", class = "operational")]
    pub model: &'a str,

    /// Whether the thread is persisted beyond the active runtime session.
    ///
    /// Turn configuration repeats this for legacy analytics compatibility, but
    /// the stable owner of the value is the thread lifecycle.
    #[obs(level = "basic", class = "operational")]
    pub ephemeral: bool,

    /// Unix timestamp in seconds when the thread was originally created.
    ///
    /// For resumed threads this is historical creation time, not resume time.
    #[obs(level = "basic", class = "operational")]
    pub created_at: i64,
}
|
||||
225
codex-rs/observability/src/events/turn.rs
Normal file
225
codex-rs/observability/src/events/turn.rs
Normal file
@@ -0,0 +1,225 @@
|
||||
//! Turn lifecycle observation event definitions.
|
||||
|
||||
use crate::Observation;
|
||||
use serde::Serialize;
|
||||
|
||||
/// Terminal turn status after Codex stops working on a turn.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum TurnStatus {
    /// The turn ran to completion.
    Completed,
    /// The turn ended in an error state.
    Failed,
    /// The turn was stopped before reaching completion or failure.
    Interrupted,
}

/// Result of trying to steer an active turn.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum TurnSteerResult {
    /// The steering input was applied to an active turn.
    Accepted,
    /// The steering input was not applied; see [`TurnSteerRejectionReason`].
    Rejected,
}

/// Stable reason why a turn steering request was rejected.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum TurnSteerRejectionReason {
    /// No turn was running to steer.
    NoActiveTurn,
    /// The active turn was not the one the caller expected to steer.
    ExpectedTurnMismatch,
    /// Review turns cannot be steered.
    NonSteerableReview,
    /// Compaction turns cannot be steered.
    NonSteerableCompact,
    /// The steering input was empty.
    EmptyInput,
    /// The steering input exceeded the accepted size.
    InputTooLarge,
}

/// How a turn was submitted for execution.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum TurnSubmissionType {
    /// Submitted for immediate execution.
    Default,
    /// Submitted via a queue rather than run immediately.
    Queued,
}
|
||||
|
||||
/// Filesystem sandbox mode resolved for a turn.
///
/// Serialized in snake_case (e.g. `workspace_write`) by the derived
/// `Serialize` impl.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum SandboxMode {
    FullAccess,
    ReadOnly,
    WorkspaceWrite,
    /// Sandboxing delegated to a mechanism outside Codex — assumption from
    /// the variant name; confirm against the resolver that maps runtime
    /// sandbox config into this enum.
    ExternalSandbox,
}

/// Approval policy resolved for a turn.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ApprovalPolicy {
    /// Legacy name for the policy that asks unless an action is trusted.
    Untrusted,
    OnFailure,
    OnRequest,
    Granular,
    Never,
}

/// Destination that reviews approval requests for a turn.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ApprovalReviewer {
    /// A human user reviews approval requests.
    User,
    /// A guardian subagent reviews approval requests.
    GuardianSubagent,
}

/// Collaboration mode resolved for a turn.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CollaborationMode {
    Default,
    Plan,
}
|
||||
|
||||
/// Reasoning effort resolved for a turn.
///
/// Note: `rename_all = "lowercase"` serializes `XHigh` as `xhigh` (not
/// `x_high`); keep that stable for downstream consumers.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum ReasoningEffort {
    None,
    Minimal,
    Low,
    Medium,
    High,
    XHigh,
}

/// Reasoning summary mode resolved for a turn.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum ReasoningSummary {
    Auto,
    Concise,
    Detailed,
    /// Summaries explicitly disabled (distinct from an unresolved setting;
    /// see the `reasoning_summary` field on `TurnConfig`).
    None,
}

/// Service tier resolved for a turn.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum ServiceTier {
    Fast,
    Flex,
}

/// Response personality resolved for a turn.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum Personality {
    None,
    Friendly,
    Pragmatic,
}
|
||||
|
||||
/// Configuration resolved before a turn starts executing.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
pub struct TurnConfig<'a> {
    /// Number of images in the turn input.
    pub num_input_images: usize,
    /// Absent when the caller cannot distinguish default from queued submission.
    pub submission_type: Option<TurnSubmissionType>,
    /// Repeats the thread-level ephemeral flag for legacy analytics
    /// compatibility; the thread lifecycle remains the stable owner.
    pub ephemeral: bool,
    /// Model the turn runs with (may differ from the thread's activation-time
    /// model via overrides or migrated settings).
    pub model: &'a str,
    pub model_provider: &'a str,
    pub sandbox_mode: SandboxMode,
    /// Kept separate from sandbox mode because analytics reports network
    /// capability as an independent resolved setting.
    pub sandbox_network_access: bool,
    /// Absent when the selected model/provider has no explicit effort setting.
    pub reasoning_effort: Option<ReasoningEffort>,
    /// `None` means no summary setting was resolved;
    /// `Some(ReasoningSummary::None)` means summaries were explicitly disabled.
    pub reasoning_summary: Option<ReasoningSummary>,
    /// Absent when no service tier was resolved.
    pub service_tier: Option<ServiceTier>,
    pub approval_policy: ApprovalPolicy,
    pub approval_reviewer: ApprovalReviewer,
    pub collaboration_mode: CollaborationMode,
    /// Absent when no personality setting was resolved.
    pub personality: Option<Personality>,
    /// True when this is the first turn of the thread.
    pub is_first_turn: bool,
}
|
||||
|
||||
/// Observation emitted when execution of a turn starts.
#[derive(Observation)]
#[observation(name = "turn.started", crate = "crate", uses = ["analytics"])]
pub struct TurnStarted<'a> {
    /// Thread that owns the turn.
    #[obs(level = "basic", class = "identifier")]
    pub thread_id: &'a str,

    /// Identifier of the turn that started.
    #[obs(level = "basic", class = "identifier")]
    pub turn_id: &'a str,

    /// Configuration resolved for this turn before execution began.
    #[obs(level = "basic", class = "operational")]
    pub config: TurnConfig<'a>,

    /// Unix timestamp in seconds when the turn started.
    #[obs(level = "basic", class = "operational")]
    pub started_at: i64,
}
|
||||
|
||||
/// Observation emitted when the outcome of a same-turn steering request is known.
#[derive(Observation)]
#[observation(name = "turn.steer", crate = "crate", uses = ["analytics"])]
pub struct TurnSteer<'a> {
    /// Thread the steering request targeted.
    #[obs(level = "basic", class = "identifier")]
    pub thread_id: &'a str,

    /// Turn the caller expected to be active when it issued the request.
    #[obs(level = "basic", class = "identifier")]
    pub expected_turn_id: &'a str,

    /// Turn that actually received the input, when the request was accepted.
    #[obs(level = "basic", class = "identifier")]
    pub accepted_turn_id: Option<&'a str>,

    /// Number of images in the steering input.
    #[obs(level = "basic", class = "operational")]
    pub num_input_images: usize,

    /// Whether the request was accepted or rejected.
    #[obs(level = "basic", class = "operational")]
    pub result: TurnSteerResult,

    /// Rejection detail; presumably populated only for rejected results —
    /// confirm at emitting callsites.
    #[obs(level = "basic", class = "operational")]
    pub rejection_reason: Option<TurnSteerRejectionReason>,

    /// Unix timestamp in seconds when the steering request was made.
    #[obs(level = "basic", class = "operational")]
    pub created_at: i64,
}
|
||||
|
||||
/// Token accounting reported for a completed turn.
///
/// Counts mirror the provider-reported usage; see [`TurnEnded::token_usage`]
/// for when they may be absent.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
pub struct TurnTokenUsage {
    pub input_tokens: i64,
    pub cached_input_tokens: i64,
    pub output_tokens: i64,
    pub reasoning_output_tokens: i64,
    pub total_tokens: i64,
}
|
||||
|
||||
/// Observation emitted when a turn reaches a terminal state.
#[derive(Observation)]
#[observation(name = "turn.ended", crate = "crate", uses = ["analytics"])]
pub struct TurnEnded<'a> {
    /// Thread that owns the turn.
    #[obs(level = "basic", class = "identifier")]
    pub thread_id: &'a str,

    /// Identifier of the turn that ended.
    #[obs(level = "basic", class = "identifier")]
    pub turn_id: &'a str,

    /// Terminal status the turn ended with.
    #[obs(level = "basic", class = "operational")]
    pub status: TurnStatus,

    /// Absent when a turn ends before provider usage is available.
    #[obs(level = "basic", class = "operational")]
    pub token_usage: Option<TurnTokenUsage>,

    /// Unix timestamp in seconds when the turn ended.
    #[obs(level = "basic", class = "operational")]
    pub ended_at: i64,

    /// Duration of the turn in milliseconds.
    #[obs(level = "basic", class = "operational")]
    pub duration_ms: i64,
}
|
||||
277
codex-rs/observability/src/lib.rs
Normal file
277
codex-rs/observability/src/lib.rs
Normal file
@@ -0,0 +1,277 @@
|
||||
//! Shared semantic observation API for Codex runtime facts.
|
||||
//!
|
||||
//! Observations describe facts that occurred in Codex. Destination-specific
|
||||
//! systems such as analytics, rollout trace, OTEL events, and OTEL metrics
|
||||
//! consume observations through sinks and reducers.
|
||||
//!
|
||||
//! Field metadata is required because sinks must decide whether they may read a
|
||||
//! field before serializing or exporting it. Missing field annotations are a
|
||||
//! compile-time error:
|
||||
//!
|
||||
//! ```compile_fail
|
||||
//! use codex_observability::Observation;
|
||||
//!
|
||||
//! #[derive(Observation)]
|
||||
//! #[observation(name = "example.missing_field_meta")]
|
||||
//! struct MissingFieldMeta {
|
||||
//! field: &'static str,
|
||||
//! }
|
||||
//! ```
|
||||
//!
|
||||
//! Observation names are also required:
|
||||
//!
|
||||
//! ```compile_fail
|
||||
//! use codex_observability::Observation;
|
||||
//!
|
||||
//! #[derive(Observation)]
|
||||
//! struct MissingObservationName {
|
||||
//! #[obs(level = "basic", class = "operational")]
|
||||
//! field: &'static str,
|
||||
//! }
|
||||
//! ```
|
||||
|
||||
pub mod events;
|
||||
|
||||
pub use codex_observability_derive::Observation;
|
||||
use serde::Serialize;
|
||||
|
||||
/// A runtime fact emitted by Codex.
///
/// Implementations visit every exported field together with its field metadata.
/// Sinks use that metadata to apply destination-specific policy before
/// serialization, storage, or export.
pub trait Observation {
    /// Stable semantic event name, for example `turn.started`.
    const NAME: &'static str;

    /// Visits the fields that make up this observation.
    ///
    /// Each field is reported exactly once, in declaration order, together
    /// with its [`FieldMeta`].
    fn visit_fields<V: ObservationFieldVisitor>(&self, visitor: &mut V);
}

/// Receives observation fields after policy metadata has been attached.
///
/// Implementations should inspect `meta` before serializing `value`. This keeps
/// remote sinks from accidentally materializing local-only content fields.
pub trait ObservationFieldVisitor {
    /// Visits one field from an observation.
    ///
    /// `name` is the field's declared identifier; `value` has not yet been
    /// serialized when this is called.
    fn field<T: Serialize + ?Sized>(&mut self, name: &'static str, meta: FieldMeta, value: &T);
}
|
||||
|
||||
/// Consumes observations.
///
/// A sink may serialize immediately, reduce into another event shape, or fan
/// out to additional sinks. The trait is generic so callers can pass borrowed
/// typed observations without allocating an intermediate event object.
pub trait ObservationSink {
    /// Observes a single typed event.
    ///
    /// Implementations decide which fields of `event` they read; see
    /// [`visit_fields_for_use`] for the policy-gated helper.
    fn observe<E: Observation>(&self, event: &E);
}
|
||||
|
||||
/// Visits fields intended for one sink and allowed by the supplied policy.
|
||||
///
|
||||
/// This helper is the safe path for sinks that serialize fields in their
|
||||
/// visitor. The explicit field-use marker selects the intended fields; the
|
||||
/// policy gate then rejects unsafe metadata before the wrapped visitor can
|
||||
/// materialize the value.
|
||||
pub fn visit_fields_for_use<E, V>(
|
||||
event: &E,
|
||||
field_use: FieldUse,
|
||||
policy: FieldPolicy,
|
||||
visitor: &mut V,
|
||||
) where
|
||||
E: Observation,
|
||||
V: ObservationFieldVisitor,
|
||||
{
|
||||
let mut visitor = PolicyFilteredVisitor {
|
||||
field_use,
|
||||
policy,
|
||||
inner: visitor,
|
||||
};
|
||||
event.visit_fields(&mut visitor);
|
||||
}
|
||||
|
||||
struct PolicyFilteredVisitor<'a, V> {
|
||||
field_use: FieldUse,
|
||||
policy: FieldPolicy,
|
||||
inner: &'a mut V,
|
||||
}
|
||||
|
||||
impl<V> ObservationFieldVisitor for PolicyFilteredVisitor<'_, V>
|
||||
where
|
||||
V: ObservationFieldVisitor,
|
||||
{
|
||||
fn field<T: Serialize + ?Sized>(&mut self, name: &'static str, meta: FieldMeta, value: &T) {
|
||||
if meta.is_used_by(self.field_use) && self.policy.allows(meta) {
|
||||
self.inner.field(name, meta, value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Policy metadata attached to a single observation field.
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||
pub struct FieldMeta {
|
||||
/// How much detail a sink must be allowed to read before consuming the field.
|
||||
pub detail: DetailLevel,
|
||||
/// Semantic/privacy class for the field.
|
||||
pub class: DataClass,
|
||||
/// Exact sinks or projections that are intended to consume the field.
|
||||
pub uses: &'static [FieldUse],
|
||||
}
|
||||
|
||||
impl FieldMeta {
|
||||
/// Creates metadata for a field that is not consumed by any exact sink.
|
||||
pub const fn new(detail: DetailLevel, class: DataClass) -> Self {
|
||||
Self {
|
||||
detail,
|
||||
class,
|
||||
uses: &[],
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates metadata for a field with explicit sink-use markers.
|
||||
pub const fn with_uses(
|
||||
detail: DetailLevel,
|
||||
class: DataClass,
|
||||
uses: &'static [FieldUse],
|
||||
) -> Self {
|
||||
Self {
|
||||
detail,
|
||||
class,
|
||||
uses,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true when the field was explicitly marked for `field_use`.
|
||||
pub fn is_used_by(self, field_use: FieldUse) -> bool {
|
||||
self.uses.contains(&field_use)
|
||||
}
|
||||
}
|
||||
|
||||
/// Decides whether a sink may read an observation field.
|
||||
///
|
||||
/// Policies are checked before serialization. This matters because denied
|
||||
/// fields may contain content, secrets, or large trace payloads that remote
|
||||
/// sinks must not materialize even transiently.
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||
pub struct FieldPolicy {
|
||||
max_detail: DetailLevel,
|
||||
allowed_classes: &'static [DataClass],
|
||||
}
|
||||
|
||||
impl FieldPolicy {
|
||||
/// Creates a policy that permits fields at or below the configured detail
|
||||
/// limit and whose data class is present in the allowed class list.
|
||||
pub const fn new(max_detail: DetailLevel, allowed_classes: &'static [DataClass]) -> Self {
|
||||
Self {
|
||||
max_detail,
|
||||
allowed_classes,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true when a sink may inspect and serialize a field.
|
||||
pub fn allows(self, meta: FieldMeta) -> bool {
|
||||
let detail_allowed = match self.max_detail {
|
||||
DetailLevel::Basic => matches!(meta.detail, DetailLevel::Basic),
|
||||
DetailLevel::Detailed => {
|
||||
matches!(meta.detail, DetailLevel::Basic | DetailLevel::Detailed)
|
||||
}
|
||||
DetailLevel::Trace => true,
|
||||
};
|
||||
|
||||
detail_allowed && self.allowed_classes.contains(&meta.class)
|
||||
}
|
||||
}
|
||||
|
||||
/// Coarse detail level for a field.
///
/// Ordering between levels is enforced by `FieldPolicy::allows`, not by a
/// derived `Ord`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DetailLevel {
    /// Lifecycle, identifiers, status, counts, model/config, and timing.
    Basic,
    /// Bounded previews and richer runtime summaries.
    Detailed,
    /// Raw or near-raw diagnostic evidence intended for local traces.
    Trace,
}

/// Semantic/privacy class for a field.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DataClass {
    /// Thread IDs, turn IDs, call IDs, and similar correlation identifiers.
    Identifier,
    /// Status, duration, model, provider, token counts, and tool kind.
    Operational,
    /// Working directory, repository, OS/runtime, or client metadata.
    Environment,
    /// User text, assistant text, command text, tool output, or model payloads.
    Content,
    /// Headers, environment values, auth-like payloads, or raw request blobs.
    SecretRisk,
}

/// Exact sink or projection that is intended to consume a field.
///
/// This marker is separate from `DetailLevel` and `DataClass`: it expresses
/// intent, while detail/class remain guardrails that a sink policy enforces
/// before it serializes or exports the selected field.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum FieldUse {
    /// Remote product analytics.
    Analytics,
    /// OpenTelemetry events, logs, or metrics.
    Otel,
    /// Local rollout trace bundles.
    RolloutTrace,
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    use pretty_assertions::assert_eq;

    /// `FieldMeta::new` must record detail and class verbatim with no uses.
    #[test]
    fn field_meta_preserves_detail_and_class() {
        assert_eq!(
            FieldMeta::new(DetailLevel::Trace, DataClass::Content),
            FieldMeta {
                detail: DetailLevel::Trace,
                class: DataClass::Content,
                uses: &[],
            }
        );
    }

    /// A basic-detail policy must admit only basic fields whose class is in
    /// the allow list.
    #[test]
    fn field_policy_requires_allowed_detail_and_class() {
        let policy = FieldPolicy::new(
            DetailLevel::Basic,
            &[DataClass::Identifier, DataClass::Operational],
        );
        let cases = [
            (
                FieldMeta::new(DetailLevel::Basic, DataClass::Identifier),
                true,
            ),
            (
                FieldMeta::new(DetailLevel::Basic, DataClass::Operational),
                true,
            ),
            (
                FieldMeta::new(DetailLevel::Detailed, DataClass::Operational),
                false,
            ),
            (
                FieldMeta::new(DetailLevel::Basic, DataClass::Content),
                false,
            ),
            (
                FieldMeta::new(DetailLevel::Basic, DataClass::SecretRisk),
                false,
            ),
        ];

        // Assert per case so a failure names the offending metadata instead
        // of printing two opaque boolean arrays.
        for (meta, expected) in cases {
            assert_eq!(policy.allows(meta), expected, "meta: {meta:?}");
        }
    }
}
|
||||
161
codex-rs/observability/tests/derive.rs
Normal file
161
codex-rs/observability/tests/derive.rs
Normal file
@@ -0,0 +1,161 @@
|
||||
use codex_observability::DataClass;
|
||||
use codex_observability::DetailLevel;
|
||||
use codex_observability::FieldMeta;
|
||||
use codex_observability::FieldPolicy;
|
||||
use codex_observability::FieldUse;
|
||||
use codex_observability::Observation;
|
||||
use codex_observability::ObservationFieldVisitor;
|
||||
use codex_observability::visit_fields_for_use;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde::Serialize;
|
||||
use serde::Serializer;
|
||||
use serde_json::Value;
|
||||
|
||||
// Minimal fixture: a fully annotated observation used to verify that the
// derive emits NAME and visits fields with the declared metadata.
#[derive(Observation)]
#[observation(name = "turn.config_resolved")]
struct TurnConfigResolved<'a> {
    #[obs(level = "basic", class = "identifier")]
    thread_id: &'a str,

    #[obs(level = "basic", class = "identifier")]
    turn_id: &'a str,

    #[obs(level = "basic", class = "operational")]
    model: &'a str,
}

// Fixture for use/policy filtering: every `PanicsIfSerialized` field must be
// dropped before serialization — one is trace-level content, one is
// secret-risk class, and one is marked for a different sink.
#[derive(Observation)]
#[observation(name = "test.policy_filtered", uses = ["analytics"])]
struct PolicyFiltered<'a> {
    #[obs(level = "basic", class = "identifier")]
    thread_id: &'a str,

    #[obs(level = "basic", class = "operational")]
    status: &'a str,

    #[obs(level = "trace", class = "content")]
    raw_prompt: PanicsIfSerialized,

    #[obs(level = "basic", class = "secret_risk")]
    api_key: PanicsIfSerialized,

    #[obs(level = "basic", class = "operational", uses = ["rollout_trace"])]
    rollout_only_status: PanicsIfSerialized,
}
|
||||
|
||||
/// Sentinel proving a denied field is never materialized: any attempt to
/// serialize it panics and fails the test.
struct PanicsIfSerialized;

impl Serialize for PanicsIfSerialized {
    fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        panic!("denied observation field should not be serialized")
    }
}
|
||||
|
||||
/// One field as seen by the visitor: name, attached metadata, and the value
/// serialized to JSON for comparison.
#[derive(Debug, PartialEq)]
struct CapturedField {
    name: &'static str,
    meta: FieldMeta,
    value: Value,
}

/// Visitor that records every field it is shown, serializing values eagerly.
#[derive(Default)]
struct CapturingVisitor {
    fields: Vec<CapturedField>,
}

impl ObservationFieldVisitor for CapturingVisitor {
    fn field<T: serde::Serialize + ?Sized>(
        &mut self,
        name: &'static str,
        meta: FieldMeta,
        value: &T,
    ) {
        // Serialization happens here, so a denied field reaching this point
        // would trip `PanicsIfSerialized` and fail the test.
        let value = match serde_json::to_value(value) {
            Ok(value) => value,
            Err(err) => panic!("field should serialize: {err}"),
        };
        self.fields.push(CapturedField { name, meta, value });
    }
}
|
||||
|
||||
#[test]
fn derive_visits_annotated_fields_with_metadata() {
    let event = TurnConfigResolved {
        thread_id: "thread-1",
        turn_id: "turn-1",
        model: "gpt-5.4",
    };

    let mut visitor = CapturingVisitor::default();
    event.visit_fields(&mut visitor);

    // Fields must arrive in declaration order carrying exactly the metadata
    // declared in the `#[obs(...)]` attributes.
    assert_eq!(TurnConfigResolved::NAME, "turn.config_resolved");
    assert_eq!(
        visitor.fields,
        vec![
            CapturedField {
                name: "thread_id",
                meta: FieldMeta::new(DetailLevel::Basic, DataClass::Identifier),
                value: Value::String("thread-1".to_string()),
            },
            CapturedField {
                name: "turn_id",
                meta: FieldMeta::new(DetailLevel::Basic, DataClass::Identifier),
                value: Value::String("turn-1".to_string()),
            },
            CapturedField {
                name: "model",
                meta: FieldMeta::new(DetailLevel::Basic, DataClass::Operational),
                value: Value::String("gpt-5.4".to_string()),
            },
        ]
    );
}
|
||||
|
||||
#[test]
fn use_and_policy_filter_before_serializing_denied_fields() {
    let event = PolicyFiltered {
        thread_id: "thread-1",
        status: "completed",
        raw_prompt: PanicsIfSerialized,
        api_key: PanicsIfSerialized,
        rollout_only_status: PanicsIfSerialized,
    };
    let mut visitor = CapturingVisitor::default();
    // Analytics use marker + basic/identifier-or-operational policy: only
    // `thread_id` and `status` may reach the capturing visitor. If filtering
    // ran after serialization, `PanicsIfSerialized` would panic first.
    visit_fields_for_use(
        &event,
        FieldUse::Analytics,
        FieldPolicy::new(
            DetailLevel::Basic,
            &[DataClass::Identifier, DataClass::Operational],
        ),
        &mut visitor,
    );

    assert_eq!(
        visitor.fields,
        vec![
            CapturedField {
                name: "thread_id",
                meta: FieldMeta::with_uses(
                    DetailLevel::Basic,
                    DataClass::Identifier,
                    &[FieldUse::Analytics],
                ),
                value: Value::String("thread-1".to_string()),
            },
            CapturedField {
                name: "status",
                meta: FieldMeta::with_uses(
                    DetailLevel::Basic,
                    DataClass::Operational,
                    &[FieldUse::Analytics],
                ),
                value: Value::String("completed".to_string()),
            },
        ]
    );
}
|
||||
458
docs/observability-event-stream-design.md
Normal file
458
docs/observability-event-stream-design.md
Normal file
@@ -0,0 +1,458 @@
|
||||
# Observability Event Stream
|
||||
|
||||
Codex currently has three overlapping observability systems:
|
||||
|
||||
- `codex-rs/analytics` reduces runtime facts into remote product analytics.
|
||||
- `codex-rs/rollout-trace` records rich local diagnostic traces and reduces
|
||||
them into a replay/debug graph.
|
||||
- `codex-rs/otel` emits logs, trace-safe events, and metrics, while native
|
||||
`tracing` spans carry execution scope and W3C trace context.
|
||||
|
||||
The overlap is real: thread lifecycle, turns, model calls, tools, compaction,
|
||||
subagents, transport, and feature usage are described more than once. Directly
|
||||
merging the crates would couple incompatible outputs. The shared layer should
|
||||
instead define the facts Codex emits; each destination should decide how those
|
||||
facts are filtered, reduced, stored, or exported.
|
||||
|
||||
## Decision
|
||||
|
||||
Introduce a shared observation stream for semantic facts.
|
||||
|
||||
```text
|
||||
Observations own semantic facts and metrics derived from facts.
|
||||
tracing owns spans and trace context propagation.
|
||||
OTEL is a consumer of observations, not a second event taxonomy.
|
||||
```
|
||||
|
||||
This gives one rule for callers:
|
||||
|
||||
- Use observations when a thing happened.
|
||||
- Use sinks/reducers for analytics, rollout trace, OTEL events, OTEL metrics,
|
||||
local buffers, and feedback upload.
|
||||
- Use `tracing::span!` when code needs active execution scope, parent/child
|
||||
causality, span mutation, or W3C context propagation.
|
||||
|
||||
The target state should not be a long-lived mix where some facts use
|
||||
observations and other facts call analytics or OTEL directly. Direct OTEL code
|
||||
should remain for exporter setup, native spans, context propagation, and
|
||||
low-level tracing plumbing.
|
||||
|
||||
## Goals
|
||||
|
||||
- Define canonical observation events that describe what Codex did, not where
|
||||
the data is going.
|
||||
- Keep analytics, rollout trace, OTEL logs, and OTEL metrics as projections.
|
||||
- Apply filtering at field level so a single observation can contain both safe
|
||||
aggregate fields and richer local-only evidence.
|
||||
- Preserve existing analytics and OTEL output behavior while moving callsites
|
||||
toward the shared stream.
|
||||
- Support local rich retention for feedback/debug upload with explicit policy.
|
||||
- Prove analytics equivalence with side-by-side conformance tests.
|
||||
|
||||
## Non-Goals
|
||||
|
||||
- Do not make analytics facts the source of truth for rollout trace.
|
||||
- Do not make rollout trace raw events the source of truth for analytics.
|
||||
- Do not encode downstream event names such as `codex_turn_event` or
|
||||
`codex.tool_result` into the canonical taxonomy.
|
||||
- Do not replace native spans with `SpanStarted` / `SpanEnded` observations.
|
||||
- Do not require full rollout trace graph equivalence before the first version;
|
||||
rollout trace is experimental and can use lighter validation initially.
|
||||
|
||||
## Shape
|
||||
|
||||
```text
|
||||
codex-core / app-server
|
||||
emit observations
|
||||
|
||||
codex-observability
|
||||
observation traits, field metadata, sink policy, event definitions
|
||||
|
||||
codex-observability-derive
|
||||
derive macro for field metadata and visitors
|
||||
|
||||
sinks/reducers
|
||||
analytics projection -> TrackEventRequest
|
||||
rollout trace projection -> local trace bundle + reduced graph
|
||||
OTEL event projection -> current log and trace-safe event names
|
||||
OTEL metric projection -> current counters and histograms
|
||||
ringbuffer -> recent rich local observations
|
||||
|
||||
native tracing spans
|
||||
remain direct tracing instrumentation
|
||||
```
|
||||
|
||||
The derive crate exists because Rust does not expose struct field attributes at
|
||||
runtime. The public API can still re-export the derive from
|
||||
`codex-observability` so most callsites only import one crate.
|
||||
|
||||
Typed observation events are internal source types, not exported wire schemas.
|
||||
Their compatibility boundary is the reducer output: analytics event JSON, OTEL
|
||||
exports, rollout trace bundles, or any other persisted/shared destination
|
||||
schema. This keeps the event taxonomy easy to rename, split, merge, or reshape
|
||||
in the same PR that updates its reducers, while preserving compatibility where
|
||||
it matters.
|
||||
|
||||
If raw observations are ever persisted or consumed across process/version
|
||||
boundaries, they should be treated as versioned schemas instead.
|
||||
|
||||
## Why Not Just OTEL?
|
||||
|
||||
An alternative is to emit semantic facts as ordinary `tracing` events and let a
|
||||
local subscriber, OTEL exporter, analytics reducer, or trace reducer all consume
|
||||
the same event stream. That is attractive because Codex already uses
|
||||
`tracing`, and local JSON event capture is easy to wire up.
|
||||
|
||||
We should still avoid making OTEL/tracing events the canonical schema:
|
||||
|
||||
- `tracing` fields are stringly typed at the reducer boundary. Renames or type
|
||||
changes become runtime reducer failures instead of Rust compile errors.
|
||||
- OTEL trace-safe export has a different privacy posture than local diagnostic
|
||||
traces. Rich local fields such as tool output, prompts, model payloads, and
|
||||
terminal output should not share a target with remotely exported span events.
|
||||
- Field-level policy needs metadata that `tracing` does not preserve in a
|
||||
structured way. We need to know both detail level and data class before a
|
||||
sink serializes a field.
|
||||
- Analytics conformance is easier from typed observations than from flattened
|
||||
JSON logs, because reducers can match on Rust event types and fields.
|
||||
- Spans and semantic facts have different lifecycles. Spans model active scope
|
||||
and context propagation; observations model facts that happened.
|
||||
|
||||
OTEL should therefore consume observations for semantic events and derived
|
||||
metrics, while native `tracing` remains the right tool for spans, context
|
||||
propagation, and exporter plumbing. A local trace sink can still write JSONL
|
||||
bundles; it should write observation envelopes rather than raw
|
||||
`tracing_subscriber` event JSON.
|
||||
|
||||
## Observation API

The shared schema API can be small:

```rust
pub trait Observation {
    const NAME: &'static str;

    fn visit_fields<V: ObservationFieldVisitor>(&self, visitor: &mut V);
}

pub trait ObservationFieldVisitor {
    fn field<T: serde::Serialize + ?Sized>(
        &mut self,
        name: &'static str,
        meta: FieldMeta,
        value: &T,
    );
}

pub trait ObservationSink {
    fn observe<E: Observation>(&self, event: &E);
}
```

Example:

```rust
use codex_observability::Observation;

#[derive(Observation)]
#[observation(name = "turn.started", uses = ["analytics"])]
struct TurnStarted<'a> {
    #[obs(level = "basic", class = "identifier")]
    thread_id: &'a str,

    #[obs(level = "basic", class = "identifier")]
    turn_id: &'a str,

    #[obs(level = "basic", class = "operational")]
    started_at: i64,
}
```

The derive only exposes metadata. It should not implement analytics mapping,
trace persistence, redaction, schema generation, or export behavior.
## Integration API

Product/runtime code should integrate through small semantic helper groups, not
by spreading raw event construction through the codebase. Direct construction of
`events::*` structs is fine in tests and inside the helpers, but ordinary
product callsites should read like the domain action that just happened.

The helper layer should do the tedious, failure-prone work near the callsite:
extract common IDs, join session/thread/turn context, compute durations,
normalize status and error fields, avoid trace-only allocations when no sink
can read them, and then emit the typed observation.

Example shape:

```rust
observability.turn().started(&turn, &resolved_config);
observability.compaction().ended(&run, &outcome);
observability.features().skill_invoked(&turn, &resolved_skill);
```

The important boundary is what the helpers accept. They should take existing
runtime concepts such as sessions, turns, tool invocations, compaction runs,
and resolved feature metadata. They should not take analytics facts, OTEL tag
maps, rollout trace rows, or other destination schemas. If a helper cannot
produce the needed output without depending on a destination-specific type, the
event definition or the runtime callsite is not ready yet.

Initial integration should add one narrow helper group at a time. A useful
vertical slice has:

- one domain helper method at the real product boundary,
- one canonical event emitted by that helper,
- one projection test showing how the destination reducer consumes the event.

This mirrors the useful part of the existing systems:

- rollout trace has `RolloutTraceRecorder::record_*` helpers
- analytics has `AnalyticsEventsClient::track_*` helpers
- OTEL has `SessionTelemetry::record_*` helpers

The helper layer must not become a second taxonomy. It is ergonomic glue over
canonical observation structs. Prefer small domain-specific helper groups such
as turn, tool, inference, transport, compaction, and feature usage over one
large recorder with every method in the system.
## Rich Payloads

Start simple. Ordinary observation fields should be eager values or borrowed
references. Sinks should reject fields by metadata before serializing them, so
analytics and OTEL metrics do not inspect trace/content fields.

Some trace fields point at large objects that usually already exist at the
callsite: model requests, model responses, tool invocations, tool results, or
terminal output. Prefer passing borrowed serializable views of those objects to
the observation helper rather than building `serde_json::Value` eagerly.

Do not introduce a generalized lazy-field system in the first implementation.
Closures such as `|| build_trace_payload(...)` are useful when constructing the
trace view itself is expensive, but they add API complexity. Treat them as a
later optimization for measured hot spots, not as the default observation
style.
## Field Policy

Filtering must be field-level. Event-level labels such as basic/detailed/trace
are too coarse because one event often contains both remote-safe counters and
local-only content.

Each field should carry at least:

- **Use markers**: exact projections intended to consume the field, for example
  `analytics`, `otel`, or `rollout_trace`.
- **Detail level**: `basic`, `detailed`, or `trace`.
- **Data class**: `identifier`, `operational`, `environment`, `content`, or
  `secret_risk`.

Use markers are the allow marker: a sink should consume only fields explicitly
marked for that sink. Detail level and data class are descriptive metadata for
review, auditing, and sinks that need coarse verbosity or redaction modes.
Event structs may define default use markers when most fields feed the same
projection; field-level use markers override that default for mixed events.

Detail level is not privacy by itself. A tiny field can still be unsafe for
remote export, and a trace-level field can be trace-level because it is large
rather than sensitive. Data class is also not enough for selection: analytics
must not consume every basic operational field just because it would be safe.

Expected sink behavior:

- Analytics consumes fields marked `analytics`. Content-bearing analytics fields
  should be rare and called out on the field or projection that exports them.
- Rollout trace allows rich local fields, with explicit redaction rules for
  secret-risk material.
- OTEL logs preserve today's log export behavior, including account/email only
  where today's policy allows them.
- OTEL trace-safe events prefer lengths, counts, status, timing, and coarse
  categories over content.
- OTEL metrics select exact metric dimensions and then apply the OTEL policy.
- Feedback upload applies an explicit user-approved policy over the ringbuffer.
## Event Taxonomy

The taxonomy should be designed from Codex workflows, not from existing
destination schemas. Names should describe facts that occurred in the system:
`turn.ended`, `tool_call.ended`, `transport.api_request_completed`.

Rules:

- Prefer stable domain facts over analytics, OTEL, or trace storage names.
- Model lifecycle facts explicitly when downstream reducers need ordering or
  duration.
- Keep event count small until a consumer needs a distinct fact.
- Put richness in fields, not in parallel event variants.
- Add transport/runtime facts when they are first-class telemetry today, even
  if analytics ignores them.
- Keep an escape hatch only for local experimental tracing, not remote export.

Initial workflow coverage should be chosen by conformance need:

| Workflow | Canonical examples | Primary consumers |
| --- | --- | --- |
| Session/thread | `session.config_resolved`, `thread.started`, `thread.ended` | analytics, OTEL, rollout |
| Turn lifecycle | `turn.requested`, `turn.started`, `turn.ended` | analytics, OTEL, rollout |
| Turn timing | `turn.first_token_observed`, `turn.first_message_observed` | OTEL metrics, rollout |
| Model I/O | `inference.started`, `inference.sse_event_observed`, `inference.completed`, `inference.failed` | OTEL, rollout |
| Tools | `tool_call.started`, `tool_call.approval_resolved`, `tool_call.ended` | OTEL, rollout |
| Compaction | `compaction.started`, `compaction.installed`, `compaction.ended` | analytics, rollout |
| Agents | `agent.task_sent`, `agent.message_sent`, `agent.result_delivered`, `agent.closed` | rollout, analytics subset |
| Product features | `skill.invoked`, `app.mentioned`, `app.used`, `hook.run_completed`, `plugin.used`, `plugin.state_changed`, `review.completed` | analytics |
| Transport/auth | `transport.api_request_completed`, `transport.websocket_request_completed`, `auth.recovery_step_completed` | OTEL |

This table is not a complete schema. Each event still needs a typed Rust
definition with field annotations before implementation.
## Projections

### Analytics

Analytics becomes a reducer over observations. Existing product events remain
output schemas.

| Observations | Analytics output |
| --- | --- |
| `thread.started` | `codex_thread_initialized` |
| `turn.started` with resolved config, `turn.ended` with token usage | `codex_turn_event` |
| `turn.steer` | `codex_turn_steer_event` |
| compaction lifecycle observations | `codex_compaction_event` |
| skill/app/plugin/review observations | existing feature events |

`review.completed` is generic, but the current analytics projection only emits
the legacy guardian event for guardian reviewer responses. User reviewer
responses remain represented in the observation stream until a consumer needs
an analytics output for them.

The analytics crate should keep the observation-to-legacy-schema translation in
a private projection module. The reducer then stays responsible for ingestion
orchestration, batching, deduplication, connection metadata joins, and product
event naming rather than becoming a mixed bag of mapping code.

### Rollout Trace

Rollout trace consumes observations and decides which fields become trace
payloads. Trace storage details such as `RawPayloadRef` should remain a trace
implementation detail, not part of the shared event definitions.

Example: `inference.completed` can include response material as a trace-level
field. The trace sink may write it to `payloads/*.json`; analytics and OTEL
metrics never read it.
### OTEL

OTEL consumes observations for every semantic fact and derived metric. The
projection preserves existing exported names while removing separate callsite
event definitions.

| Observations | OTEL output |
| --- | --- |
| session/thread observations | `codex.conversation_starts`, `codex.thread.started` |
| `turn.input_received` | `codex.user_prompt` |
| `tool_call.approval_resolved` | `codex.tool_decision` |
| `tool_call.ended` | `codex.tool_result`, tool count/duration metrics |
| transport observations | API/websocket events and count/duration metrics |
| `auth.recovery_step_completed` | `codex.auth_recovery` |
| inference/SSE observations | `codex.sse_event`, SSE duration, response timing metrics |
| turn timing/token observations | TTFT, TTFM, E2E duration, token usage metrics |

Log-only and trace-safe OTEL variants can be emitted from the same observation
by applying different field policies.
## Conformance Testing

The highest-value equivalence target is analytics because it is existing
product behavior.

```text
same E2E Codex run
  legacy analytics facts -> current AnalyticsReducer -> TrackEventRequest JSON
  observations -> analytics observation reducer -> TrackEventRequest JSON

assert exact equality after stable JSON normalization
```

At least one conformance path should apply exact analytics use markers plus the
analytics guardrail policy, so missing markers and unsafe annotations fail
tests. Recommended first scenarios:

- thread start
- normal turn
- failed or interrupted turn
- accepted and rejected turn steering
- compaction
- skill, app, plugin, and review events
- subagent thread start

Rollout trace can start with observation-to-trace unit tests plus one or two
end-to-end smoke tests that write and reduce a bundle. Full graph equivalence
is not necessary for the initial experimental rollout feature.

OTEL should get projection tests that compare exported event names, fields,
metric names, and tags for representative observations. Native span tests
should remain in the tracing layer.
## Implementation Stages

Build the proof of life on a fresh `main` branch, but keep commits separable so
the working branch can later be split into reviewable PRs.
Each stage should leave a small, documented API surface and a passing local
verification gate. Non-essential behavior should stay as a TODO until a later
stage needs it.

1. **Observation foundation**
   - Add `codex-observability` and `codex-observability-derive`.
   - Verify with crate-local tests that derived observations expose field
     metadata and reject missing annotations at compile time.
   - Run: `cargo test -p codex-observability`.

2. **Analytics conformance slice**
   - Add the first typed observations and an analytics projection for one
     existing analytics flow.
   - Verify legacy analytics facts and observation-derived facts produce
     identical `TrackEventRequest` JSON after stable normalization.
   - Run: targeted `cargo test -p codex-analytics`.

3. **E2E analytics shadowing**
   - Capture legacy facts and observations from the same core/app-server test
     run for selected scenarios.
   - Verify exact analytics output equality for thread start, normal turn,
     failed/interrupted turn, steering, compaction, and one feature event.
   - Run the targeted core/app-server integration tests added for each
     scenario.

4. **Rollout trace projection**
   - Add a rollout trace sink that consumes the same observations for
     thread/turn, inference, and tool lifecycle.
   - Verify with observation-to-trace unit tests and one smoke test that writes
     and reduces a local bundle.

5. **OTEL projection proof**
   - Add a small OTEL projection for a high-value event such as tool result.
   - Verify exported event names, field policy, metric names, and tags match
     the current behavior.
## Migration

Start with a shadow observation stream and conformance tests. Once analytics
equivalence is reliable, move callsites from direct analytics facts to
observations.

Rollout trace can adopt observations directly because it is experimental.

OTEL should be migrated by category rather than left permanently mixed:
tool/user/turn events, transport events, auth events, and derived metrics can
move behind the OTEL projection. Native spans and trace context stay where they
are.
## Open Questions

- What is the smallest useful field-class set?
- Should timestamps and sequence numbers live in the observation envelope or be
  supplied by sinks?
- Where should fanout live so callsites do not grow `codex-core` further?
- How should borrowed trace-level payloads avoid hot-path allocations while
  still allowing sinks to serialize when policy permits?
- Which direct OTEL counters are first-class observations, and which are
  metrics derived from broader observations?
Reference in New Issue
Block a user