Compare commits

...

3 Commits

Author SHA1 Message Date
rhan-oai
c5ba9a7c21 [codex-analytics] emit tool item events from item lifecycle 2026-05-03 15:11:22 -07:00
rhan-oai
89bba5d2c3 [codex-analytics] add tool item event schemas 2026-05-02 23:28:33 -07:00
rhan-oai
73ff402725 [codex-analytics] add core item timing production 2026-05-02 20:56:20 -07:00
31 changed files with 1497 additions and 44 deletions

View File

@@ -3,12 +3,17 @@ use crate::events::AppServerRpcTransport;
use crate::events::CodexAppMentionedEventRequest;
use crate::events::CodexAppServerClientMetadata;
use crate::events::CodexAppUsedEventRequest;
use crate::events::CodexCommandExecutionEventParams;
use crate::events::CodexCommandExecutionEventRequest;
use crate::events::CodexCompactionEventRequest;
use crate::events::CodexHookRunEventRequest;
use crate::events::CodexPluginEventRequest;
use crate::events::CodexPluginUsedEventRequest;
use crate::events::CodexRuntimeMetadata;
use crate::events::CodexToolItemEventBase;
use crate::events::CodexTurnEventRequest;
use crate::events::CommandExecutionFamily;
use crate::events::CommandExecutionSource;
use crate::events::GuardianApprovalRequestSource;
use crate::events::GuardianReviewDecision;
use crate::events::GuardianReviewEventParams;
@@ -17,6 +22,8 @@ use crate::events::GuardianReviewTerminalStatus;
use crate::events::GuardianReviewedAction;
use crate::events::ThreadInitializedEvent;
use crate::events::ThreadInitializedEventParams;
use crate::events::ToolItemFinalApprovalOutcome;
use crate::events::ToolItemTerminalStatus;
use crate::events::TrackEventRequest;
use crate::events::codex_app_metadata;
use crate::events::codex_hook_run_metadata;
@@ -61,8 +68,12 @@ use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ClientResponsePayload;
use codex_app_server_protocol::CodexErrorInfo;
use codex_app_server_protocol::CommandExecutionSource as AppServerCommandExecutionSource;
use codex_app_server_protocol::CommandExecutionStatus;
use codex_app_server_protocol::InitializeCapabilities;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::ItemCompletedNotification;
use codex_app_server_protocol::ItemStartedNotification;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::NonSteerableTurnKind;
use codex_app_server_protocol::RequestId;
@@ -72,6 +83,7 @@ use codex_app_server_protocol::SessionSource as AppServerSessionSource;
use codex_app_server_protocol::Thread;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadArchiveResponse;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStatus as AppServerThreadStatus;
@@ -581,6 +593,72 @@ async fn ingest_turn_prerequisites(
}
}
async fn ingest_tool_review_prerequisites(
reducer: &mut AnalyticsReducer,
events: &mut Vec<TrackEventRequest>,
) {
reducer
.ingest(sample_initialize_fact(/*connection_id*/ 7), events)
.await;
reducer
.ingest(
AnalyticsFact::ClientResponse {
connection_id: 7,
request_id: RequestId::Integer(1),
response: Box::new(sample_thread_start_response(
"thread-1", /*ephemeral*/ false, "gpt-5",
)),
},
events,
)
.await;
events.clear();
}
fn sample_initialize_fact(connection_id: u64) -> AnalyticsFact {
AnalyticsFact::Initialize {
connection_id,
params: InitializeParams {
client_info: ClientInfo {
name: "codex-tui".to_string(),
title: None,
version: "1.0.0".to_string(),
},
capabilities: Some(InitializeCapabilities {
experimental_api: false,
opt_out_notification_methods: None,
}),
},
product_client_id: DEFAULT_ORIGINATOR.to_string(),
runtime: CodexRuntimeMetadata {
codex_rs_version: "0.99.0".to_string(),
runtime_os: "linux".to_string(),
runtime_os_version: "24.04".to_string(),
runtime_arch: "x86_64".to_string(),
},
rpc_transport: AppServerRpcTransport::Websocket,
}
}
fn sample_command_execution_item(
status: CommandExecutionStatus,
exit_code: Option<i32>,
duration_ms: Option<i64>,
) -> ThreadItem {
ThreadItem::CommandExecution {
id: "item-1".to_string(),
command: "echo hi".to_string(),
cwd: test_path_buf("/tmp").abs(),
process_id: Some("pid-1".to_string()),
source: AppServerCommandExecutionSource::Agent,
status,
command_actions: Vec::new(),
aggregated_output: None,
exit_code,
duration_ms,
}
}
fn expected_absolute_path(path: &PathBuf) -> String {
std::fs::canonicalize(path)
.unwrap_or_else(|_| path.to_path_buf())
@@ -884,6 +962,103 @@ fn thread_initialized_event_serializes_expected_shape() {
);
}
#[test]
fn command_execution_event_serializes_expected_shape() {
let event = TrackEventRequest::CommandExecution(CodexCommandExecutionEventRequest {
event_type: "codex_command_execution_event",
event_params: CodexCommandExecutionEventParams {
base: CodexToolItemEventBase {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
item_id: "item-1".to_string(),
app_server_client: CodexAppServerClientMetadata {
product_client_id: "codex_tui".to_string(),
client_name: Some("codex-tui".to_string()),
client_version: Some("1.2.3".to_string()),
rpc_transport: AppServerRpcTransport::Websocket,
experimental_api_enabled: Some(true),
},
runtime: CodexRuntimeMetadata {
codex_rs_version: "0.99.0".to_string(),
runtime_os: "macos".to_string(),
runtime_os_version: "15.3.1".to_string(),
runtime_arch: "aarch64".to_string(),
},
thread_source: Some("user"),
subagent_source: None,
parent_thread_id: None,
tool_name: "shell".to_string(),
started_at_ms: 123_000,
completed_at_ms: Some(125_000),
duration_ms: Some(2000),
execution_duration_ms: Some(1800),
execution_started: true,
review_count: 0,
guardian_review_count: 0,
user_review_count: 0,
final_approval_outcome: ToolItemFinalApprovalOutcome::NotNeeded,
terminal_status: ToolItemTerminalStatus::Completed,
failure_kind: None,
requested_additional_permissions: false,
requested_network_access: false,
retry_count: 0,
},
command_execution_source: CommandExecutionSource::Agent,
command_execution_family: CommandExecutionFamily::Shell,
exit_code: Some(0),
command_action_count: Some(1),
},
});
let payload = serde_json::to_value(&event).expect("serialize command execution event");
assert_eq!(
payload,
json!({
"event_type": "codex_command_execution_event",
"event_params": {
"thread_id": "thread-1",
"turn_id": "turn-1",
"item_id": "item-1",
"app_server_client": {
"product_client_id": "codex_tui",
"client_name": "codex-tui",
"client_version": "1.2.3",
"rpc_transport": "websocket",
"experimental_api_enabled": true
},
"runtime": {
"codex_rs_version": "0.99.0",
"runtime_os": "macos",
"runtime_os_version": "15.3.1",
"runtime_arch": "aarch64"
},
"thread_source": "user",
"subagent_source": null,
"parent_thread_id": null,
"tool_name": "shell",
"started_at_ms": 123000,
"completed_at_ms": 125000,
"duration_ms": 2000,
"execution_duration_ms": 1800,
"execution_started": true,
"review_count": 0,
"guardian_review_count": 0,
"user_review_count": 0,
"final_approval_outcome": "not_needed",
"terminal_status": "completed",
"failure_kind": null,
"requested_additional_permissions": false,
"requested_network_access": false,
"retry_count": 0,
"command_execution_source": "agent",
"command_execution_family": "shell",
"exit_code": 0,
"command_action_count": 1
}
})
);
}
#[tokio::test]
async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialized() {
let mut reducer = AnalyticsReducer::default();
@@ -1289,6 +1464,89 @@ async fn guardian_review_event_ingests_custom_fact_with_optional_target_item() {
assert_eq!(payload[0]["event_params"]["review_timeout_ms"], 90_000);
}
#[tokio::test]
async fn item_lifecycle_notifications_publish_command_execution_event() {
let mut reducer = AnalyticsReducer::default();
let mut events = Vec::new();
ingest_tool_review_prerequisites(&mut reducer, &mut events).await;
reducer
.ingest(
AnalyticsFact::Notification(Box::new(ServerNotification::ItemStarted(
ItemStartedNotification {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
item: sample_command_execution_item(
CommandExecutionStatus::InProgress,
/*exit_code*/ None,
/*duration_ms*/ None,
),
started_at_ms: Some(1_000),
},
))),
&mut events,
)
.await;
assert!(
events.is_empty(),
"tool item event should emit on completion"
);
reducer
.ingest(
AnalyticsFact::Notification(Box::new(ServerNotification::ItemCompleted(
ItemCompletedNotification {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
item: sample_command_execution_item(
CommandExecutionStatus::Completed,
Some(0),
Some(42),
),
completed_at_ms: Some(1_042),
},
))),
&mut events,
)
.await;
let payload = serde_json::to_value(&events).expect("serialize events");
assert_eq!(payload.as_array().expect("events array").len(), 1);
assert_eq!(payload[0]["event_type"], "codex_command_execution_event");
assert_eq!(payload[0]["event_params"]["thread_id"], "thread-1");
assert_eq!(payload[0]["event_params"]["turn_id"], "turn-1");
assert_eq!(payload[0]["event_params"]["item_id"], "item-1");
assert_eq!(payload[0]["event_params"]["tool_name"], "shell");
assert_eq!(
payload[0]["event_params"]["command_execution_source"],
"agent"
);
assert_eq!(
payload[0]["event_params"]["command_execution_family"],
"shell"
);
assert_eq!(payload[0]["event_params"]["terminal_status"], "completed");
assert_eq!(
payload[0]["event_params"]["final_approval_outcome"],
"unknown"
);
assert_eq!(
payload[0]["event_params"]["failure_kind"],
serde_json::Value::Null
);
assert_eq!(payload[0]["event_params"]["exit_code"], 0);
assert_eq!(payload[0]["event_params"]["started_at_ms"], 1_000);
assert_eq!(payload[0]["event_params"]["completed_at_ms"], 1_042);
assert_eq!(payload[0]["event_params"]["duration_ms"], 42);
assert_eq!(payload[0]["event_params"]["execution_duration_ms"], 42);
assert_eq!(payload[0]["event_params"]["execution_started"], true);
assert_eq!(
payload[0]["event_params"]["app_server_client"]["client_name"],
"codex-tui"
);
assert_eq!(payload[0]["event_params"]["thread_source"], "user");
}
#[test]
fn subagent_thread_started_review_serializes_expected_shape() {
let event = TrackEventRequest::ThreadInitialized(subagent_thread_started_event_request(
@@ -1572,6 +1830,79 @@ async fn subagent_thread_started_inherits_parent_connection_for_new_thread() {
);
}
#[tokio::test]
async fn subagent_tool_items_inherit_parent_connection_metadata() {
let mut reducer = AnalyticsReducer::default();
let mut events = Vec::new();
ingest_tool_review_prerequisites(&mut reducer, &mut events).await;
reducer
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::SubAgentThreadStarted(
SubAgentThreadStartedInput {
thread_id: "thread-subagent".to_string(),
parent_thread_id: Some("thread-1".to_string()),
product_client_id: "codex-tui".to_string(),
client_name: "codex-tui".to_string(),
client_version: "1.0.0".to_string(),
model: "gpt-5".to_string(),
ephemeral: false,
subagent_source: SubAgentSource::Review,
created_at: 128,
},
)),
&mut events,
)
.await;
events.clear();
reducer
.ingest(
AnalyticsFact::Notification(Box::new(ServerNotification::ItemStarted(
ItemStartedNotification {
thread_id: "thread-subagent".to_string(),
turn_id: "turn-subagent".to_string(),
item: sample_command_execution_item(
CommandExecutionStatus::InProgress,
/*exit_code*/ None,
/*duration_ms*/ None,
),
started_at_ms: Some(1_000),
},
))),
&mut events,
)
.await;
reducer
.ingest(
AnalyticsFact::Notification(Box::new(ServerNotification::ItemCompleted(
ItemCompletedNotification {
thread_id: "thread-subagent".to_string(),
turn_id: "turn-subagent".to_string(),
item: sample_command_execution_item(
CommandExecutionStatus::Completed,
Some(0),
Some(42),
),
completed_at_ms: Some(1_042),
},
))),
&mut events,
)
.await;
let payload = serde_json::to_value(&events).expect("serialize events");
assert_eq!(payload.as_array().expect("events array").len(), 1);
assert_eq!(payload[0]["event_type"], "codex_command_execution_event");
assert_eq!(payload[0]["event_params"]["thread_source"], "subagent");
assert_eq!(payload[0]["event_params"]["subagent_source"], "review");
assert_eq!(payload[0]["event_params"]["parent_thread_id"], "thread-1");
assert_eq!(
payload[0]["event_params"]["app_server_client"]["client_name"],
"codex-tui"
);
}
#[test]
fn plugin_used_event_serializes_expected_shape() {
let tracking = TrackEventsContext {

View File

@@ -333,10 +333,6 @@ impl AnalyticsEventsClient {
});
}
pub fn track_notification(&self, notification: ServerNotification) {
self.record_fact(AnalyticsFact::Notification(Box::new(notification)));
}
pub fn track_server_request(&self, connection_id: u64, request: ServerRequest) {
self.record_fact(AnalyticsFact::ServerRequest {
connection_id,
@@ -349,6 +345,21 @@ impl AnalyticsEventsClient {
response: Box::new(response),
});
}
pub fn track_notification(&self, notification: ServerNotification) {
if !matches!(
notification,
ServerNotification::TurnStarted(_)
| ServerNotification::TurnCompleted(_)
| ServerNotification::ItemStarted(_)
| ServerNotification::ItemCompleted(_)
| ServerNotification::ItemGuardianApprovalReviewStarted(_)
| ServerNotification::ItemGuardianApprovalReviewCompleted(_)
) {
return;
}
self.record_fact(AnalyticsFact::Notification(Box::new(notification)));
}
}
async fn send_track_events(

View File

@@ -61,6 +61,20 @@ pub(crate) enum TrackEventRequest {
Compaction(Box<CodexCompactionEventRequest>),
TurnEvent(Box<CodexTurnEventRequest>),
TurnSteer(CodexTurnSteerEventRequest),
#[allow(dead_code)]
CommandExecution(CodexCommandExecutionEventRequest),
#[allow(dead_code)]
FileChange(CodexFileChangeEventRequest),
#[allow(dead_code)]
McpToolCall(CodexMcpToolCallEventRequest),
#[allow(dead_code)]
DynamicToolCall(CodexDynamicToolCallEventRequest),
#[allow(dead_code)]
CollabAgentToolCall(CodexCollabAgentToolCallEventRequest),
#[allow(dead_code)]
WebSearch(CodexWebSearchEventRequest),
#[allow(dead_code)]
ImageGeneration(CodexImageGenerationEventRequest),
PluginUsed(CodexPluginUsedEventRequest),
PluginInstalled(CodexPluginEventRequest),
PluginUninstalled(CodexPluginEventRequest),
@@ -384,6 +398,218 @@ pub(crate) struct GuardianReviewEventPayload {
pub(crate) guardian_review: GuardianReviewEventParams,
}
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum ToolItemFinalApprovalOutcome {
Unknown,
NotNeeded,
ConfigAllowed,
PolicyForbidden,
GuardianApproved,
GuardianDenied,
GuardianAborted,
UserApproved,
UserApprovedForSession,
UserDenied,
UserAborted,
}
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum ToolItemTerminalStatus {
Completed,
Failed,
Rejected,
Interrupted,
}
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum ToolItemFailureKind {
ToolError,
ApprovalDenied,
ApprovalAborted,
SandboxDenied,
PolicyForbidden,
}
#[derive(Serialize)]
pub(crate) struct CodexToolItemEventBase {
pub(crate) thread_id: String,
pub(crate) turn_id: String,
/// App-server ThreadItem.id. For tool-originated items this generally
/// corresponds to the originating core call_id.
pub(crate) item_id: String,
pub(crate) app_server_client: CodexAppServerClientMetadata,
pub(crate) runtime: CodexRuntimeMetadata,
pub(crate) thread_source: Option<&'static str>,
pub(crate) subagent_source: Option<String>,
pub(crate) parent_thread_id: Option<String>,
pub(crate) tool_name: String,
pub(crate) started_at_ms: u64,
pub(crate) completed_at_ms: Option<u64>,
pub(crate) duration_ms: Option<u64>,
pub(crate) execution_duration_ms: Option<u64>,
pub(crate) execution_started: bool,
pub(crate) review_count: u64,
pub(crate) guardian_review_count: u64,
pub(crate) user_review_count: u64,
pub(crate) final_approval_outcome: ToolItemFinalApprovalOutcome,
pub(crate) terminal_status: ToolItemTerminalStatus,
pub(crate) failure_kind: Option<ToolItemFailureKind>,
pub(crate) requested_additional_permissions: bool,
pub(crate) requested_network_access: bool,
pub(crate) retry_count: u64,
}
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum CommandExecutionFamily {
Shell,
UserShell,
UnifiedExec,
}
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum CommandExecutionSource {
Agent,
UserShell,
UnifiedExecStartup,
UnifiedExecInteraction,
}
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum WebSearchActionKind {
Search,
OpenPage,
FindInPage,
Other,
}
#[derive(Serialize)]
pub(crate) struct CodexCommandExecutionEventParams {
#[serde(flatten)]
pub(crate) base: CodexToolItemEventBase,
pub(crate) command_execution_source: CommandExecutionSource,
pub(crate) command_execution_family: CommandExecutionFamily,
pub(crate) exit_code: Option<i32>,
pub(crate) command_action_count: Option<u64>,
}
#[derive(Serialize)]
pub(crate) struct CodexCommandExecutionEventRequest {
pub(crate) event_type: &'static str,
pub(crate) event_params: CodexCommandExecutionEventParams,
}
#[derive(Serialize)]
pub(crate) struct CodexFileChangeEventParams {
#[serde(flatten)]
pub(crate) base: CodexToolItemEventBase,
pub(crate) file_change_count: u64,
pub(crate) file_add_count: u64,
pub(crate) file_update_count: u64,
pub(crate) file_delete_count: u64,
pub(crate) file_move_count: u64,
}
#[derive(Serialize)]
pub(crate) struct CodexFileChangeEventRequest {
pub(crate) event_type: &'static str,
pub(crate) event_params: CodexFileChangeEventParams,
}
#[derive(Serialize)]
pub(crate) struct CodexMcpToolCallEventParams {
#[serde(flatten)]
pub(crate) base: CodexToolItemEventBase,
pub(crate) mcp_server_name: String,
pub(crate) mcp_tool_name: String,
pub(crate) mcp_error_present: bool,
pub(crate) mcp_error_code: Option<String>,
}
#[derive(Serialize)]
pub(crate) struct CodexMcpToolCallEventRequest {
pub(crate) event_type: &'static str,
pub(crate) event_params: CodexMcpToolCallEventParams,
}
#[derive(Serialize)]
pub(crate) struct CodexDynamicToolCallEventParams {
#[serde(flatten)]
pub(crate) base: CodexToolItemEventBase,
pub(crate) dynamic_tool_name: String,
pub(crate) success: Option<bool>,
pub(crate) output_content_item_count: Option<u64>,
pub(crate) output_text_item_count: Option<u64>,
pub(crate) output_image_item_count: Option<u64>,
}
#[derive(Serialize)]
pub(crate) struct CodexDynamicToolCallEventRequest {
pub(crate) event_type: &'static str,
pub(crate) event_params: CodexDynamicToolCallEventParams,
}
#[derive(Serialize)]
pub(crate) struct CodexCollabAgentToolCallEventParams {
#[serde(flatten)]
pub(crate) base: CodexToolItemEventBase,
pub(crate) sender_thread_id: String,
pub(crate) receiver_thread_count: u64,
pub(crate) receiver_thread_ids: Option<Vec<String>>,
pub(crate) requested_model: Option<String>,
pub(crate) requested_reasoning_effort: Option<String>,
pub(crate) agent_state_count: Option<u64>,
pub(crate) completed_agent_count: Option<u64>,
pub(crate) failed_agent_count: Option<u64>,
}
#[derive(Serialize)]
pub(crate) struct CodexCollabAgentToolCallEventRequest {
pub(crate) event_type: &'static str,
pub(crate) event_params: CodexCollabAgentToolCallEventParams,
}
#[derive(Serialize)]
pub(crate) struct CodexWebSearchEventParams {
#[serde(flatten)]
pub(crate) base: CodexToolItemEventBase,
pub(crate) web_search_action: Option<WebSearchActionKind>,
pub(crate) query_present: bool,
pub(crate) query_count: Option<u64>,
}
#[derive(Serialize)]
pub(crate) struct CodexWebSearchEventRequest {
pub(crate) event_type: &'static str,
pub(crate) event_params: CodexWebSearchEventParams,
}
#[derive(Serialize)]
pub(crate) struct CodexImageGenerationEventParams {
#[serde(flatten)]
pub(crate) base: CodexToolItemEventBase,
pub(crate) image_generation_status: String,
pub(crate) revised_prompt_present: bool,
pub(crate) saved_path_present: bool,
}
#[derive(Serialize)]
pub(crate) struct CodexImageGenerationEventRequest {
pub(crate) event_type: &'static str,
pub(crate) event_params: CodexImageGenerationEventParams,
}
#[derive(Serialize)]
pub(crate) struct CodexAppMetadata {
pub(crate) connector_id: Option<String>,

View File

@@ -51,3 +51,27 @@ pub fn now_unix_seconds() -> u64 {
.unwrap_or_default()
.as_secs()
}
pub fn now_unix_millis() -> u64 {
u64::try_from(
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis(),
)
.unwrap_or(u64::MAX)
}
pub(crate) fn serialize_enum_as_string<T: serde::Serialize>(value: &T) -> Option<String> {
serde_json::to_value(value)
.ok()
.and_then(|value| value.as_str().map(str::to_string))
}
pub(crate) fn usize_to_u64(value: usize) -> u64 {
u64::try_from(value).unwrap_or(u64::MAX)
}
pub(crate) fn option_i64_to_u64(value: Option<i64>) -> Option<u64> {
value.and_then(|value| u64::try_from(value).ok())
}

View File

@@ -2,15 +2,32 @@ use crate::events::AppServerRpcTransport;
use crate::events::CodexAppMentionedEventRequest;
use crate::events::CodexAppServerClientMetadata;
use crate::events::CodexAppUsedEventRequest;
use crate::events::CodexCollabAgentToolCallEventParams;
use crate::events::CodexCollabAgentToolCallEventRequest;
use crate::events::CodexCommandExecutionEventParams;
use crate::events::CodexCommandExecutionEventRequest;
use crate::events::CodexCompactionEventRequest;
use crate::events::CodexDynamicToolCallEventParams;
use crate::events::CodexDynamicToolCallEventRequest;
use crate::events::CodexFileChangeEventParams;
use crate::events::CodexFileChangeEventRequest;
use crate::events::CodexHookRunEventRequest;
use crate::events::CodexImageGenerationEventParams;
use crate::events::CodexImageGenerationEventRequest;
use crate::events::CodexMcpToolCallEventParams;
use crate::events::CodexMcpToolCallEventRequest;
use crate::events::CodexPluginEventRequest;
use crate::events::CodexPluginUsedEventRequest;
use crate::events::CodexRuntimeMetadata;
use crate::events::CodexToolItemEventBase;
use crate::events::CodexTurnEventParams;
use crate::events::CodexTurnEventRequest;
use crate::events::CodexTurnSteerEventParams;
use crate::events::CodexTurnSteerEventRequest;
use crate::events::CodexWebSearchEventParams;
use crate::events::CodexWebSearchEventRequest;
use crate::events::CommandExecutionFamily;
use crate::events::CommandExecutionSource;
use crate::events::GuardianReviewEventParams;
use crate::events::GuardianReviewEventPayload;
use crate::events::GuardianReviewEventRequest;
@@ -18,7 +35,11 @@ use crate::events::SkillInvocationEventParams;
use crate::events::SkillInvocationEventRequest;
use crate::events::ThreadInitializedEvent;
use crate::events::ThreadInitializedEventParams;
use crate::events::ToolItemFailureKind;
use crate::events::ToolItemFinalApprovalOutcome;
use crate::events::ToolItemTerminalStatus;
use crate::events::TrackEventRequest;
use crate::events::WebSearchActionKind;
use crate::events::codex_app_metadata;
use crate::events::codex_compaction_event_params;
use crate::events::codex_hook_run_metadata;
@@ -47,14 +68,29 @@ use crate::facts::TurnSteerRejectionReason;
use crate::facts::TurnSteerResult;
use crate::facts::TurnTokenUsageFact;
use crate::now_unix_seconds;
use crate::option_i64_to_u64;
use crate::serialize_enum_as_string;
use crate::usize_to_u64;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ClientResponse;
use codex_app_server_protocol::CodexErrorInfo;
use codex_app_server_protocol::CollabAgentStatus;
use codex_app_server_protocol::CollabAgentTool;
use codex_app_server_protocol::CollabAgentToolCallStatus;
use codex_app_server_protocol::CommandExecutionSource as AppServerCommandExecutionSource;
use codex_app_server_protocol::CommandExecutionStatus;
use codex_app_server_protocol::DynamicToolCallOutputContentItem;
use codex_app_server_protocol::DynamicToolCallStatus;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::McpToolCallStatus;
use codex_app_server_protocol::PatchApplyStatus;
use codex_app_server_protocol::PatchChangeKind;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::TurnSteerResponse;
use codex_app_server_protocol::UserInput;
use codex_app_server_protocol::WebSearchAction;
use codex_git_utils::collect_git_info;
use codex_git_utils::get_git_repo_root;
use codex_login::default_client::originator;
@@ -75,6 +111,7 @@ pub(crate) struct AnalyticsReducer {
turns: HashMap<String, TurnState>,
connections: HashMap<u64, ConnectionState>,
threads: HashMap<String, ThreadAnalyticsState>,
tool_item_state: HashMap<ToolItemKey, ToolItemState>,
}
struct ConnectionState {
@@ -118,6 +155,19 @@ impl<'a> AnalyticsDropSite<'a> {
}
}
fn tool_item(
notification: &'a codex_app_server_protocol::ItemCompletedNotification,
item_id: &'a str,
) -> Self {
Self {
event_name: "tool item",
thread_id: &notification.thread_id,
turn_id: Some(&notification.turn_id),
review_id: None,
item_id: Some(item_id),
}
}
fn turn_steer(thread_id: &'a str) -> Self {
Self {
event_name: "turn steer",
@@ -197,6 +247,27 @@ struct PendingTurnSteerState {
created_at: u64,
}
#[derive(Hash, PartialEq, Eq)]
struct ToolItemKey {
thread_id: String,
turn_id: String,
item_id: String,
}
struct ToolItemState {
started_at_ms: u64,
}
impl ToolItemKey {
fn new(thread_id: &str, turn_id: &str, item_id: &str) -> Self {
Self {
thread_id: thread_id.to_string(),
turn_id: turn_id.to_string(),
item_id: item_id.to_string(),
}
}
}
#[derive(Clone)]
struct CompletedTurnState {
status: Option<TurnStatus>,
@@ -686,6 +757,50 @@ impl AnalyticsReducer {
out: &mut Vec<TrackEventRequest>,
) {
match notification {
ServerNotification::ItemStarted(notification) => {
let Some(item_id) = tool_item_id(&notification.item) else {
return;
};
let Some(started_at_ms) = option_i64_to_u64(notification.started_at_ms) else {
return;
};
self.tool_item_state.insert(
ToolItemKey::new(&notification.thread_id, &notification.turn_id, item_id),
ToolItemState { started_at_ms },
);
}
ServerNotification::ItemCompleted(notification) => {
let Some(item_id) = tool_item_id(&notification.item) else {
return;
};
let Some(tool_item_state) = self.tool_item_state.remove(&ToolItemKey::new(
&notification.thread_id,
&notification.turn_id,
item_id,
)) else {
return;
};
let started_at_ms = tool_item_state.started_at_ms;
let Some(completed_at_ms) = option_i64_to_u64(notification.completed_at_ms) else {
return;
};
let Some((connection_state, thread_metadata)) = self
.thread_context_or_warn(AnalyticsDropSite::tool_item(&notification, item_id))
else {
return;
};
if let Some(event) = tool_item_event(
&notification.thread_id,
&notification.turn_id,
&notification.item,
started_at_ms,
completed_at_ms,
connection_state,
thread_metadata,
) {
out.push(event);
}
}
ServerNotification::TurnStarted(notification) => {
let turn_state = self.turns.entry(notification.turn.id).or_insert(TurnState {
connection_id: None,
@@ -772,8 +887,8 @@ impl AnalyticsReducer {
ephemeral: thread.ephemeral,
thread_source: thread_metadata.thread_source,
initialization_mode,
subagent_source: thread_metadata.subagent_source,
parent_thread_id: thread_metadata.parent_thread_id,
subagent_source: thread_metadata.subagent_source.clone(),
parent_thread_id: thread_metadata.parent_thread_id.clone(),
created_at: u64::try_from(thread.created_at).unwrap_or_default(),
},
},
@@ -976,6 +1091,562 @@ fn warn_missing_analytics_context(
);
}
fn tool_item_id(item: &ThreadItem) -> Option<&str> {
match item {
ThreadItem::CommandExecution { id, .. }
| ThreadItem::FileChange { id, .. }
| ThreadItem::McpToolCall { id, .. }
| ThreadItem::DynamicToolCall { id, .. }
| ThreadItem::CollabAgentToolCall { id, .. }
| ThreadItem::WebSearch { id, .. }
| ThreadItem::ImageGeneration { id, .. } => Some(id),
_ => None,
}
}
fn tool_item_event(
thread_id: &str,
turn_id: &str,
item: &ThreadItem,
started_at_ms: u64,
completed_at_ms: u64,
connection_state: &ConnectionState,
thread_metadata: &ThreadMetadataState,
) -> Option<TrackEventRequest> {
match item {
ThreadItem::CommandExecution {
id,
source,
status,
command_actions,
exit_code,
duration_ms,
..
} => {
let (terminal_status, failure_kind) = command_execution_outcome(status)?;
let base = tool_item_base(
thread_id,
turn_id,
id.clone(),
command_execution_tool_name(*source).to_string(),
ToolItemOutcome {
terminal_status,
failure_kind,
execution_duration_ms: option_i64_to_u64(*duration_ms),
},
ToolItemContext {
started_at_ms,
completed_at_ms,
connection_state,
thread_metadata,
},
);
Some(TrackEventRequest::CommandExecution(
CodexCommandExecutionEventRequest {
event_type: "codex_command_execution_event",
event_params: CodexCommandExecutionEventParams {
base,
command_execution_source: (*source).into(),
command_execution_family: command_execution_family(*source),
exit_code: *exit_code,
command_action_count: Some(usize_to_u64(command_actions.len())),
},
},
))
}
ThreadItem::FileChange {
id,
changes,
status,
} => {
let (terminal_status, failure_kind) = patch_apply_outcome(status)?;
let counts = file_change_counts(changes);
let base = tool_item_base(
thread_id,
turn_id,
id.clone(),
"apply_patch".to_string(),
ToolItemOutcome {
terminal_status,
failure_kind,
execution_duration_ms: None,
},
ToolItemContext {
started_at_ms,
completed_at_ms,
connection_state,
thread_metadata,
},
);
Some(TrackEventRequest::FileChange(CodexFileChangeEventRequest {
event_type: "codex_file_change_event",
event_params: CodexFileChangeEventParams {
base,
file_change_count: usize_to_u64(changes.len()),
file_add_count: counts.add,
file_update_count: counts.update,
file_delete_count: counts.delete,
file_move_count: counts.move_,
},
}))
}
ThreadItem::McpToolCall {
id,
server,
tool,
status,
error,
duration_ms,
..
} => {
let (terminal_status, failure_kind) = mcp_tool_call_outcome(status)?;
let base = tool_item_base(
thread_id,
turn_id,
id.clone(),
tool.clone(),
ToolItemOutcome {
terminal_status,
failure_kind,
execution_duration_ms: option_i64_to_u64(*duration_ms),
},
ToolItemContext {
started_at_ms,
completed_at_ms,
connection_state,
thread_metadata,
},
);
Some(TrackEventRequest::McpToolCall(
CodexMcpToolCallEventRequest {
event_type: "codex_mcp_tool_call_event",
event_params: CodexMcpToolCallEventParams {
base,
mcp_server_name: server.clone(),
mcp_tool_name: tool.clone(),
mcp_error_present: error.is_some(),
mcp_error_code: None,
},
},
))
}
ThreadItem::DynamicToolCall {
id,
tool,
status,
content_items,
success,
duration_ms,
..
} => {
let (terminal_status, failure_kind) = dynamic_tool_call_outcome(status)?;
let counts = content_items
.as_ref()
.map(|items| dynamic_content_counts(items));
let base = tool_item_base(
thread_id,
turn_id,
id.clone(),
tool.clone(),
ToolItemOutcome {
terminal_status,
failure_kind,
execution_duration_ms: option_i64_to_u64(*duration_ms),
},
ToolItemContext {
started_at_ms,
completed_at_ms,
connection_state,
thread_metadata,
},
);
Some(TrackEventRequest::DynamicToolCall(
CodexDynamicToolCallEventRequest {
event_type: "codex_dynamic_tool_call_event",
event_params: CodexDynamicToolCallEventParams {
base,
dynamic_tool_name: tool.clone(),
success: *success,
output_content_item_count: counts.map(|counts| counts.total),
output_text_item_count: counts.map(|counts| counts.text),
output_image_item_count: counts.map(|counts| counts.image),
},
},
))
}
ThreadItem::CollabAgentToolCall {
id,
tool,
status,
sender_thread_id,
receiver_thread_ids,
model,
reasoning_effort,
agents_states,
..
} => {
let (terminal_status, failure_kind) = collab_tool_call_outcome(status)?;
let base = tool_item_base(
thread_id,
turn_id,
id.clone(),
collab_agent_tool_name(tool).to_string(),
ToolItemOutcome {
terminal_status,
failure_kind,
execution_duration_ms: None,
},
ToolItemContext {
started_at_ms,
completed_at_ms,
connection_state,
thread_metadata,
},
);
Some(TrackEventRequest::CollabAgentToolCall(
CodexCollabAgentToolCallEventRequest {
event_type: "codex_collab_agent_tool_call_event",
event_params: CodexCollabAgentToolCallEventParams {
base,
sender_thread_id: sender_thread_id.clone(),
receiver_thread_count: usize_to_u64(receiver_thread_ids.len()),
receiver_thread_ids: Some(receiver_thread_ids.clone()),
requested_model: model.clone(),
requested_reasoning_effort: reasoning_effort
.as_ref()
.and_then(serialize_enum_as_string),
agent_state_count: Some(usize_to_u64(agents_states.len())),
completed_agent_count: Some(usize_to_u64(
agents_states
.values()
.filter(|state| state.status == CollabAgentStatus::Completed)
.count(),
)),
failed_agent_count: Some(usize_to_u64(
agents_states
.values()
.filter(|state| {
matches!(
state.status,
CollabAgentStatus::Errored
| CollabAgentStatus::Shutdown
| CollabAgentStatus::NotFound
)
})
.count(),
)),
},
},
))
}
ThreadItem::WebSearch { id, query, action } => {
let base = tool_item_base(
thread_id,
turn_id,
id.clone(),
"web_search".to_string(),
ToolItemOutcome {
terminal_status: ToolItemTerminalStatus::Completed,
failure_kind: None,
execution_duration_ms: None,
},
ToolItemContext {
started_at_ms,
completed_at_ms,
connection_state,
thread_metadata,
},
);
Some(TrackEventRequest::WebSearch(CodexWebSearchEventRequest {
event_type: "codex_web_search_event",
event_params: CodexWebSearchEventParams {
base,
web_search_action: action.as_ref().map(web_search_action_kind),
query_present: !query.trim().is_empty(),
query_count: web_search_query_count(query, action.as_ref()),
},
}))
}
ThreadItem::ImageGeneration {
id,
status,
revised_prompt,
saved_path,
..
} => {
let (terminal_status, failure_kind) = image_generation_outcome(status.as_str());
let base = tool_item_base(
thread_id,
turn_id,
id.clone(),
"image_generation".to_string(),
ToolItemOutcome {
terminal_status,
failure_kind,
execution_duration_ms: None,
},
ToolItemContext {
started_at_ms,
completed_at_ms,
connection_state,
thread_metadata,
},
);
Some(TrackEventRequest::ImageGeneration(
CodexImageGenerationEventRequest {
event_type: "codex_image_generation_event",
event_params: CodexImageGenerationEventParams {
base,
image_generation_status: status.clone(),
revised_prompt_present: revised_prompt.is_some(),
saved_path_present: saved_path.is_some(),
},
},
))
}
_ => None,
}
}
struct ToolItemOutcome {
terminal_status: ToolItemTerminalStatus,
failure_kind: Option<ToolItemFailureKind>,
execution_duration_ms: Option<u64>,
}
struct ToolItemContext<'a> {
started_at_ms: u64,
completed_at_ms: u64,
connection_state: &'a ConnectionState,
thread_metadata: &'a ThreadMetadataState,
}
fn tool_item_base(
thread_id: &str,
turn_id: &str,
item_id: String,
tool_name: String,
outcome: ToolItemOutcome,
context: ToolItemContext<'_>,
) -> CodexToolItemEventBase {
let thread_metadata = context.thread_metadata;
CodexToolItemEventBase {
thread_id: thread_id.to_string(),
turn_id: turn_id.to_string(),
item_id,
app_server_client: context.connection_state.app_server_client.clone(),
runtime: context.connection_state.runtime.clone(),
thread_source: thread_metadata.thread_source,
subagent_source: thread_metadata.subagent_source.clone(),
parent_thread_id: thread_metadata.parent_thread_id.clone(),
tool_name,
started_at_ms: context.started_at_ms,
completed_at_ms: Some(context.completed_at_ms),
duration_ms: context.completed_at_ms.checked_sub(context.started_at_ms),
execution_duration_ms: outcome.execution_duration_ms,
execution_started: true,
review_count: 0,
guardian_review_count: 0,
user_review_count: 0,
final_approval_outcome: ToolItemFinalApprovalOutcome::Unknown,
terminal_status: outcome.terminal_status,
failure_kind: outcome.failure_kind,
requested_additional_permissions: false,
requested_network_access: false,
retry_count: 0,
}
}
impl From<AppServerCommandExecutionSource> for CommandExecutionSource {
fn from(source: AppServerCommandExecutionSource) -> Self {
match source {
AppServerCommandExecutionSource::Agent => Self::Agent,
AppServerCommandExecutionSource::UserShell => Self::UserShell,
AppServerCommandExecutionSource::UnifiedExecStartup => Self::UnifiedExecStartup,
AppServerCommandExecutionSource::UnifiedExecInteraction => Self::UnifiedExecInteraction,
}
}
}
fn command_execution_family(source: AppServerCommandExecutionSource) -> CommandExecutionFamily {
match source {
AppServerCommandExecutionSource::Agent => CommandExecutionFamily::Shell,
AppServerCommandExecutionSource::UserShell => CommandExecutionFamily::UserShell,
AppServerCommandExecutionSource::UnifiedExecStartup
| AppServerCommandExecutionSource::UnifiedExecInteraction => {
CommandExecutionFamily::UnifiedExec
}
}
}
fn command_execution_tool_name(source: AppServerCommandExecutionSource) -> &'static str {
match source {
AppServerCommandExecutionSource::UnifiedExecStartup
| AppServerCommandExecutionSource::UnifiedExecInteraction => "unified_exec",
AppServerCommandExecutionSource::UserShell => "user_shell",
AppServerCommandExecutionSource::Agent => "shell",
}
}
fn command_execution_outcome(
status: &CommandExecutionStatus,
) -> Option<(ToolItemTerminalStatus, Option<ToolItemFailureKind>)> {
match status {
CommandExecutionStatus::InProgress => None,
CommandExecutionStatus::Completed => Some((ToolItemTerminalStatus::Completed, None)),
CommandExecutionStatus::Failed => Some((
ToolItemTerminalStatus::Failed,
Some(ToolItemFailureKind::ToolError),
)),
CommandExecutionStatus::Declined => Some((
ToolItemTerminalStatus::Rejected,
Some(ToolItemFailureKind::ApprovalDenied),
)),
}
}
fn patch_apply_outcome(
status: &PatchApplyStatus,
) -> Option<(ToolItemTerminalStatus, Option<ToolItemFailureKind>)> {
match status {
PatchApplyStatus::InProgress => None,
PatchApplyStatus::Completed => Some((ToolItemTerminalStatus::Completed, None)),
PatchApplyStatus::Failed => Some((
ToolItemTerminalStatus::Failed,
Some(ToolItemFailureKind::ToolError),
)),
PatchApplyStatus::Declined => Some((
ToolItemTerminalStatus::Rejected,
Some(ToolItemFailureKind::ApprovalDenied),
)),
}
}
fn mcp_tool_call_outcome(
status: &McpToolCallStatus,
) -> Option<(ToolItemTerminalStatus, Option<ToolItemFailureKind>)> {
match status {
McpToolCallStatus::InProgress => None,
McpToolCallStatus::Completed => Some((ToolItemTerminalStatus::Completed, None)),
McpToolCallStatus::Failed => Some((
ToolItemTerminalStatus::Failed,
Some(ToolItemFailureKind::ToolError),
)),
}
}
fn dynamic_tool_call_outcome(
status: &DynamicToolCallStatus,
) -> Option<(ToolItemTerminalStatus, Option<ToolItemFailureKind>)> {
match status {
DynamicToolCallStatus::InProgress => None,
DynamicToolCallStatus::Completed => Some((ToolItemTerminalStatus::Completed, None)),
DynamicToolCallStatus::Failed => Some((
ToolItemTerminalStatus::Failed,
Some(ToolItemFailureKind::ToolError),
)),
}
}
fn collab_tool_call_outcome(
status: &CollabAgentToolCallStatus,
) -> Option<(ToolItemTerminalStatus, Option<ToolItemFailureKind>)> {
match status {
CollabAgentToolCallStatus::InProgress => None,
CollabAgentToolCallStatus::Completed => Some((ToolItemTerminalStatus::Completed, None)),
CollabAgentToolCallStatus::Failed => Some((
ToolItemTerminalStatus::Failed,
Some(ToolItemFailureKind::ToolError),
)),
}
}
fn image_generation_outcome(status: &str) -> (ToolItemTerminalStatus, Option<ToolItemFailureKind>) {
match status {
"failed" | "error" => (
ToolItemTerminalStatus::Failed,
Some(ToolItemFailureKind::ToolError),
),
_ => (ToolItemTerminalStatus::Completed, None),
}
}
fn collab_agent_tool_name(tool: &CollabAgentTool) -> &'static str {
match tool {
CollabAgentTool::SpawnAgent => "spawn_agent",
CollabAgentTool::SendInput => "send_input",
CollabAgentTool::ResumeAgent => "resume_agent",
CollabAgentTool::Wait => "wait_agent",
CollabAgentTool::CloseAgent => "close_agent",
}
}
#[derive(Default)]
struct FileChangeCounts {
add: u64,
update: u64,
delete: u64,
move_: u64,
}
fn file_change_counts(changes: &[codex_app_server_protocol::FileUpdateChange]) -> FileChangeCounts {
let mut counts = FileChangeCounts::default();
for change in changes {
match &change.kind {
PatchChangeKind::Add => counts.add += 1,
PatchChangeKind::Delete => counts.delete += 1,
PatchChangeKind::Update { move_path: Some(_) } => counts.move_ += 1,
PatchChangeKind::Update { move_path: None } => counts.update += 1,
}
}
counts
}
#[derive(Clone, Copy)]
struct DynamicContentCounts {
total: u64,
text: u64,
image: u64,
}
fn dynamic_content_counts(items: &[DynamicToolCallOutputContentItem]) -> DynamicContentCounts {
let mut text = 0;
let mut image = 0;
for item in items {
match item {
DynamicToolCallOutputContentItem::InputText { .. } => text += 1,
DynamicToolCallOutputContentItem::InputImage { .. } => image += 1,
}
}
DynamicContentCounts {
total: usize_to_u64(items.len()),
text,
image,
}
}
fn web_search_action_kind(action: &WebSearchAction) -> WebSearchActionKind {
match action {
WebSearchAction::Search { .. } => WebSearchActionKind::Search,
WebSearchAction::OpenPage { .. } => WebSearchActionKind::OpenPage,
WebSearchAction::FindInPage { .. } => WebSearchActionKind::FindInPage,
WebSearchAction::Other => WebSearchActionKind::Other,
}
}
fn web_search_query_count(query: &str, action: Option<&WebSearchAction>) -> Option<u64> {
match action {
Some(WebSearchAction::Search { query, queries }) => queries
.as_ref()
.map(|queries| usize_to_u64(queries.len()))
.or_else(|| query.as_ref().map(|_| 1)),
Some(WebSearchAction::OpenPage { .. })
| Some(WebSearchAction::FindInPage { .. })
| Some(WebSearchAction::Other) => None,
None => (!query.trim().is_empty()).then_some(1),
}
}
fn codex_turn_event_params(
app_server_client: CodexAppServerClientMetadata,
runtime: CodexRuntimeMetadata,

View File

@@ -1134,6 +1134,7 @@ mod tests {
ServerNotification::ItemCompleted(codex_app_server_protocol::ItemCompletedNotification {
thread_id: "thread".to_string(),
turn_id: "turn".to_string(),
completed_at_ms: None,
item: codex_app_server_protocol::ThreadItem::AgentMessage {
id: "item".to_string(),
text: text.to_string(),
@@ -2007,6 +2008,7 @@ mod tests {
codex_app_server_protocol::ItemCompletedNotification {
thread_id: "thread".to_string(),
turn_id: "turn".to_string(),
completed_at_ms: None,
item: codex_app_server_protocol::ThreadItem::AgentMessage {
id: "item".to_string(),
text: "hello".to_string(),

View File

@@ -1932,6 +1932,14 @@
},
"ItemCompletedNotification": {
"properties": {
"completedAtMs": {
"description": "Unix timestamp (in milliseconds) when this item lifecycle completed, if known.",
"format": "int64",
"type": [
"integer",
"null"
]
},
"item": {
"$ref": "#/definitions/ThreadItem"
},
@@ -2030,6 +2038,14 @@
"item": {
"$ref": "#/definitions/ThreadItem"
},
"startedAtMs": {
"description": "Unix timestamp (in milliseconds) when this item lifecycle started, if known.",
"format": "int64",
"type": [
"integer",
"null"
]
},
"threadId": {
"type": "string"
},

View File

@@ -10109,6 +10109,14 @@
"ItemCompletedNotification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"completedAtMs": {
"description": "Unix timestamp (in milliseconds) when this item lifecycle completed, if known.",
"format": "int64",
"type": [
"integer",
"null"
]
},
"item": {
"$ref": "#/definitions/v2/ThreadItem"
},
@@ -10213,6 +10221,14 @@
"item": {
"$ref": "#/definitions/v2/ThreadItem"
},
"startedAtMs": {
"description": "Unix timestamp (in milliseconds) when this item lifecycle started, if known.",
"format": "int64",
"type": [
"integer",
"null"
]
},
"threadId": {
"type": "string"
},

View File

@@ -6762,6 +6762,14 @@
"ItemCompletedNotification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"completedAtMs": {
"description": "Unix timestamp (in milliseconds) when this item lifecycle completed, if known.",
"format": "int64",
"type": [
"integer",
"null"
]
},
"item": {
"$ref": "#/definitions/ThreadItem"
},
@@ -6866,6 +6874,14 @@
"item": {
"$ref": "#/definitions/ThreadItem"
},
"startedAtMs": {
"description": "Unix timestamp (in milliseconds) when this item lifecycle started, if known.",
"format": "int64",
"type": [
"integer",
"null"
]
},
"threadId": {
"type": "string"
},

View File

@@ -1370,6 +1370,14 @@
}
},
"properties": {
"completedAtMs": {
"description": "Unix timestamp (in milliseconds) when this item lifecycle completed, if known.",
"format": "int64",
"type": [
"integer",
"null"
]
},
"item": {
"$ref": "#/definitions/ThreadItem"
},

View File

@@ -1373,6 +1373,14 @@
"item": {
"$ref": "#/definitions/ThreadItem"
},
"startedAtMs": {
"description": "Unix timestamp (in milliseconds) when this item lifecycle started, if known.",
"format": "int64",
"type": [
"integer",
"null"
]
},
"threadId": {
"type": "string"
},

View File

@@ -3,4 +3,8 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { ThreadItem } from "./ThreadItem";
export type ItemCompletedNotification = { item: ThreadItem, threadId: string, turnId: string, };
export type ItemCompletedNotification = { item: ThreadItem, threadId: string, turnId: string,
/**
* Unix timestamp (in milliseconds) when this item lifecycle completed, if known.
*/
completedAtMs: number | null, };

View File

@@ -3,4 +3,8 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { ThreadItem } from "./ThreadItem";
export type ItemStartedNotification = { item: ThreadItem, threadId: string, turnId: string, };
export type ItemStartedNotification = { item: ThreadItem, threadId: string, turnId: string,
/**
* Unix timestamp (in milliseconds) when this item lifecycle started, if known.
*/
startedAtMs: number | null, };

View File

@@ -73,6 +73,7 @@ pub fn item_event_to_server_notification(
thread_id,
turn_id: response.turn_id,
item,
completed_at_ms: None,
})
}
EventMsg::McpToolCallBegin(begin_event) => {
@@ -91,6 +92,7 @@ pub fn item_event_to_server_notification(
thread_id,
turn_id,
item,
started_at_ms: None,
})
}
EventMsg::McpToolCallEnd(end_event) => {
@@ -131,6 +133,7 @@ pub fn item_event_to_server_notification(
thread_id,
turn_id,
item,
completed_at_ms: None,
})
}
EventMsg::CollabAgentSpawnBegin(begin_event) => {
@@ -149,6 +152,7 @@ pub fn item_event_to_server_notification(
thread_id,
turn_id,
item,
started_at_ms: None,
})
}
EventMsg::CollabAgentSpawnEnd(end_event) => {
@@ -187,6 +191,7 @@ pub fn item_event_to_server_notification(
thread_id,
turn_id,
item,
completed_at_ms: None,
})
}
EventMsg::CollabAgentInteractionBegin(begin_event) => {
@@ -206,6 +211,7 @@ pub fn item_event_to_server_notification(
thread_id,
turn_id,
item,
started_at_ms: None,
})
}
EventMsg::CollabAgentInteractionEnd(end_event) => {
@@ -233,6 +239,7 @@ pub fn item_event_to_server_notification(
thread_id,
turn_id,
item,
completed_at_ms: None,
})
}
EventMsg::CollabWaitingBegin(begin_event) => {
@@ -256,6 +263,7 @@ pub fn item_event_to_server_notification(
thread_id,
turn_id,
item,
started_at_ms: None,
})
}
EventMsg::CollabWaitingEnd(end_event) => {
@@ -291,6 +299,7 @@ pub fn item_event_to_server_notification(
thread_id,
turn_id,
item,
completed_at_ms: None,
})
}
EventMsg::CollabCloseBegin(begin_event) => {
@@ -309,6 +318,7 @@ pub fn item_event_to_server_notification(
thread_id,
turn_id,
item,
started_at_ms: None,
})
}
EventMsg::CollabCloseEnd(end_event) => {
@@ -341,6 +351,7 @@ pub fn item_event_to_server_notification(
thread_id,
turn_id,
item,
completed_at_ms: None,
})
}
EventMsg::CollabResumeBegin(begin_event) => {
@@ -359,6 +370,7 @@ pub fn item_event_to_server_notification(
thread_id,
turn_id,
item,
started_at_ms: None,
})
}
EventMsg::CollabResumeEnd(end_event) => {
@@ -391,6 +403,7 @@ pub fn item_event_to_server_notification(
thread_id,
turn_id,
item,
completed_at_ms: None,
})
}
EventMsg::AgentMessageContentDelta(event) => {
@@ -440,6 +453,7 @@ pub fn item_event_to_server_notification(
thread_id,
turn_id,
item: item_started_event.item.into(),
started_at_ms: item_started_event.started_at_ms,
})
}
EventMsg::ItemCompleted(item_completed_event) => {
@@ -447,6 +461,7 @@ pub fn item_event_to_server_notification(
thread_id,
turn_id,
item: item_completed_event.item.into(),
completed_at_ms: item_completed_event.completed_at_ms,
})
}
EventMsg::PatchApplyUpdated(event) => {
@@ -462,6 +477,7 @@ pub fn item_event_to_server_notification(
thread_id,
turn_id,
item: build_command_execution_begin_item(&exec_command_begin_event),
started_at_ms: None,
})
}
EventMsg::ExecCommandOutputDelta(exec_command_output_delta_event) => {
@@ -490,6 +506,7 @@ pub fn item_event_to_server_notification(
thread_id,
turn_id,
item: build_command_execution_end_item(&exec_command_end_event),
completed_at_ms: None,
})
}
_ => unreachable!("unsupported item event"),
@@ -564,6 +581,7 @@ mod tests {
ItemStartedNotification {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
started_at_ms: None,
item: ThreadItem::CollabAgentToolCall {
id: event.call_id,
tool: CollabAgentTool::ResumeAgent,
@@ -601,6 +619,7 @@ mod tests {
ItemCompletedNotification {
thread_id: "thread-2".to_string(),
turn_id: "turn-2".to_string(),
completed_at_ms: None,
item: ThreadItem::CollabAgentToolCall {
id: event.call_id,
tool: CollabAgentTool::ResumeAgent,
@@ -643,6 +662,7 @@ mod tests {
ItemStartedNotification {
thread_id: "thread-1".to_string(),
turn_id: "turn_1".to_string(),
started_at_ms: None,
item: ThreadItem::McpToolCall {
id: begin_event.call_id,
server: begin_event.invocation.server,
@@ -680,6 +700,7 @@ mod tests {
ItemStartedNotification {
thread_id: "thread-2".to_string(),
turn_id: "turn_2".to_string(),
started_at_ms: None,
item: ThreadItem::McpToolCall {
id: begin_event.call_id,
server: begin_event.invocation.server,
@@ -732,6 +753,7 @@ mod tests {
ItemCompletedNotification {
thread_id: "thread-3".to_string(),
turn_id: "turn_3".to_string(),
completed_at_ms: None,
item: ThreadItem::McpToolCall {
id: end_event.call_id,
server: end_event.invocation.server,
@@ -777,6 +799,7 @@ mod tests {
ItemCompletedNotification {
thread_id: "thread-4".to_string(),
turn_id: "turn_4".to_string(),
completed_at_ms: None,
item: ThreadItem::McpToolCall {
id: end_event.call_id,
server: end_event.invocation.server,

View File

@@ -1354,6 +1354,7 @@ mod tests {
id: "user-item-id".to_string(),
content: Vec::new(),
}),
started_at_ms: None,
}),
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: turn_id.to_string(),

View File

@@ -6878,6 +6878,9 @@ pub struct ItemStartedNotification {
pub item: ThreadItem,
pub thread_id: String,
pub turn_id: String,
/// Unix timestamp (in milliseconds) when this item lifecycle started, if known.
#[ts(type = "number | null")]
pub started_at_ms: Option<i64>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
@@ -6940,6 +6943,9 @@ pub struct ItemCompletedNotification {
pub item: ThreadItem,
pub thread_id: String,
pub turn_id: String,
/// Unix timestamp (in milliseconds) when this item lifecycle completed, if known.
#[ts(type = "number | null")]
pub completed_at_ms: Option<i64>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]

View File

@@ -11,7 +11,6 @@ use crate::thread_state::TurnSummary;
use crate::thread_state::resolve_server_request_on_thread_listener;
use crate::thread_status::ThreadWatchActiveGuard;
use crate::thread_status::ThreadWatchManager;
use codex_analytics::AnalyticsEventsClient;
use codex_app_server_protocol::AccountRateLimitsUpdatedNotification;
use codex_app_server_protocol::AdditionalPermissionProfile as V2AdditionalPermissionProfile;
use codex_app_server_protocol::CodexErrorInfo as V2CodexErrorInfo;
@@ -139,7 +138,6 @@ pub(crate) async fn apply_bespoke_event_handling(
conversation_id: ThreadId,
conversation: Arc<CodexThread>,
thread_manager: Arc<ThreadManager>,
analytics_events_client: Option<AnalyticsEventsClient>,
outgoing: ThreadScopedOutgoingMessageSender,
thread_state: Arc<tokio::sync::Mutex<ThreadState>>,
thread_watch_manager: ThreadWatchManager,
@@ -194,7 +192,6 @@ pub(crate) async fn apply_bespoke_event_handling(
conversation_id,
event_turn_id,
turn_complete_event,
analytics_events_client.as_ref(),
&outgoing,
&thread_state,
)
@@ -817,6 +814,7 @@ pub(crate) async fn apply_bespoke_event_handling(
thread_id: conversation_id.to_string(),
turn_id: turn_id.clone(),
item,
started_at_ms: None,
};
outgoing
.send_server_notification(ServerNotification::ItemStarted(notification))
@@ -967,6 +965,7 @@ pub(crate) async fn apply_bespoke_event_handling(
thread_id: conversation_id.to_string(),
turn_id: event_turn_id.clone(),
item: item.clone(),
started_at_ms: None,
};
outgoing
.send_server_notification(ServerNotification::ItemStarted(started))
@@ -975,6 +974,7 @@ pub(crate) async fn apply_bespoke_event_handling(
thread_id: conversation_id.to_string(),
turn_id: event_turn_id.clone(),
item,
completed_at_ms: None,
};
outgoing
.send_server_notification(ServerNotification::ItemCompleted(completed))
@@ -1024,6 +1024,7 @@ pub(crate) async fn apply_bespoke_event_handling(
thread_id: conversation_id.to_string(),
turn_id: event_turn_id.clone(),
item: item.clone(),
started_at_ms: None,
};
outgoing
.send_server_notification(ServerNotification::ItemStarted(started))
@@ -1032,6 +1033,7 @@ pub(crate) async fn apply_bespoke_event_handling(
thread_id: conversation_id.to_string(),
turn_id: event_turn_id.clone(),
item,
completed_at_ms: None,
};
outgoing
.send_server_notification(ServerNotification::ItemCompleted(completed))
@@ -1130,7 +1132,6 @@ pub(crate) async fn apply_bespoke_event_handling(
conversation_id,
event_turn_id,
turn_aborted_event,
analytics_events_client.as_ref(),
&outgoing,
&thread_state,
)
@@ -1319,7 +1320,6 @@ async fn emit_turn_completed_with_status(
conversation_id: ThreadId,
event_turn_id: String,
turn_completion_metadata: TurnCompletionMetadata,
analytics_events_client: Option<&AnalyticsEventsClient>,
outgoing: &ThreadScopedOutgoingMessageSender,
) {
let notification = TurnCompletedNotification {
@@ -1334,10 +1334,6 @@ async fn emit_turn_completed_with_status(
duration_ms: turn_completion_metadata.duration_ms,
},
};
if let Some(analytics_events_client) = analytics_events_client {
analytics_events_client
.track_notification(ServerNotification::TurnCompleted(notification.clone()));
}
outgoing
.send_server_notification(ServerNotification::TurnCompleted(notification))
.await;
@@ -1378,6 +1374,7 @@ async fn start_command_execution_item(
exit_code: None,
duration_ms: None,
},
started_at_ms: None,
};
outgoing
.send_server_notification(ServerNotification::ItemStarted(notification))
@@ -1426,6 +1423,7 @@ async fn complete_command_execution_item(
thread_id: conversation_id.to_string(),
turn_id,
item,
completed_at_ms: None,
};
outgoing
.send_server_notification(ServerNotification::ItemCompleted(notification))
@@ -1480,6 +1478,7 @@ pub(crate) async fn maybe_emit_hook_prompt_item_completed(
.map(codex_app_server_protocol::HookPromptFragment::from)
.collect(),
},
completed_at_ms: None,
};
outgoing
.send_server_notification(ServerNotification::ItemCompleted(notification))
@@ -1498,7 +1497,6 @@ async fn handle_turn_complete(
conversation_id: ThreadId,
event_turn_id: String,
turn_complete_event: TurnCompleteEvent,
analytics_events_client: Option<&AnalyticsEventsClient>,
outgoing: &ThreadScopedOutgoingMessageSender,
thread_state: &Arc<Mutex<ThreadState>>,
) {
@@ -1519,7 +1517,6 @@ async fn handle_turn_complete(
completed_at: turn_complete_event.completed_at,
duration_ms: turn_complete_event.duration_ms,
},
analytics_events_client,
outgoing,
)
.await;
@@ -1529,7 +1526,6 @@ async fn handle_turn_interrupted(
conversation_id: ThreadId,
event_turn_id: String,
turn_aborted_event: TurnAbortedEvent,
analytics_events_client: Option<&AnalyticsEventsClient>,
outgoing: &ThreadScopedOutgoingMessageSender,
thread_state: &Arc<Mutex<ThreadState>>,
) {
@@ -1545,7 +1541,6 @@ async fn handle_turn_interrupted(
completed_at: turn_aborted_event.completed_at,
duration_ms: turn_aborted_event.duration_ms,
},
analytics_events_client,
outgoing,
)
.await;
@@ -2090,7 +2085,6 @@ mod tests {
use codex_app_server_protocol::GuardianApprovalReviewStatus;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::TurnPlanStepStatus;
use codex_login::AuthManager;
use codex_login::CodexAuth;
use codex_protocol::items::HookPromptFragment;
use codex_protocol::items::build_hook_prompt_message;
@@ -2221,7 +2215,6 @@ mod tests {
outgoing: ThreadScopedOutgoingMessageSender,
thread_state: Arc<Mutex<ThreadState>>,
thread_watch_manager: ThreadWatchManager,
analytics_events_client: AnalyticsEventsClient,
codex_home: PathBuf,
}
@@ -2236,7 +2229,6 @@ mod tests {
self.conversation_id,
self.conversation.clone(),
self.thread_manager.clone(),
Some(self.analytics_events_client.clone()),
self.outgoing.clone(),
self.thread_state.clone(),
self.thread_watch_manager.clone(),
@@ -2570,13 +2562,6 @@ mod tests {
outgoing: outgoing.clone(),
thread_state: thread_state.clone(),
thread_watch_manager: thread_watch_manager.clone(),
analytics_events_client: AnalyticsEventsClient::new(
AuthManager::from_auth_for_testing(
CodexAuth::create_dummy_chatgpt_auth_for_testing(),
),
"http://localhost".to_string(),
Some(false),
),
codex_home: codex_home.path().to_path_buf(),
};
@@ -3156,7 +3141,6 @@ mod tests {
conversation_id,
event_turn_id.clone(),
turn_complete_event(&event_turn_id),
/*analytics_events_client*/ None,
&outgoing,
&thread_state,
)
@@ -3208,7 +3192,6 @@ mod tests {
conversation_id,
event_turn_id.clone(),
turn_aborted_event(&event_turn_id),
/*analytics_events_client*/ None,
&outgoing,
&thread_state,
)
@@ -3259,7 +3242,6 @@ mod tests {
conversation_id,
event_turn_id.clone(),
turn_complete_event(&event_turn_id),
/*analytics_events_client*/ None,
&outgoing,
&thread_state,
)
@@ -3494,7 +3476,6 @@ mod tests {
conversation_a,
a_turn1.clone(),
turn_complete_event(&a_turn1),
/*analytics_events_client*/ None,
&outgoing,
&thread_state,
)
@@ -3516,7 +3497,6 @@ mod tests {
conversation_b,
b_turn1.clone(),
turn_complete_event(&b_turn1),
/*analytics_events_client*/ None,
&outgoing,
&thread_state,
)
@@ -3528,7 +3508,6 @@ mod tests {
conversation_a,
a_turn2.clone(),
turn_complete_event(&a_turn2),
/*analytics_events_client*/ None,
&outgoing,
&thread_state,
)

View File

@@ -7756,7 +7756,6 @@ impl CodexMessageProcessor {
conversation_id,
conversation.clone(),
thread_manager.clone(),
Some(listener_task_context.analytics_events_client.clone()),
thread_outgoing,
thread_state.clone(),
thread_watch_manager.clone(),

View File

@@ -141,6 +141,9 @@ impl ThreadScopedOutgoingMessageSender {
}
pub(crate) async fn send_server_notification(&self, notification: ServerNotification) {
self.outgoing
.analytics_events_client
.track_notification(notification.clone());
if self.connection_ids.is_empty() {
return;
}
@@ -524,7 +527,7 @@ impl OutgoingMessageSender {
targeted_connections = connection_ids.len(),
"app-server event: {notification}"
);
let outgoing_message = OutgoingMessage::AppServerNotification(notification);
let outgoing_message = OutgoingMessage::AppServerNotification(notification.clone());
if connection_ids.is_empty() {
if let Err(err) = self
.sender
@@ -558,7 +561,7 @@ impl OutgoingMessageSender {
notification: ServerNotification,
) {
tracing::trace!("app-server event: {notification}");
let outgoing_message = OutgoingMessage::AppServerNotification(notification);
let outgoing_message = OutgoingMessage::AppServerNotification(notification.clone());
let (write_complete_tx, write_complete_rx) = oneshot::channel();
if let Err(err) = self
.sender

View File

@@ -41,6 +41,7 @@ use crate::session_prefix::format_subagent_notification_message;
use crate::skills::SkillRenderSideEffects;
use crate::skills_load_input_from_config;
use crate::turn_metadata::TurnMetadataState;
use crate::turn_timing::now_unix_timestamp_ms;
use async_channel::Receiver;
use async_channel::Sender;
use chrono::Local;
@@ -1641,6 +1642,7 @@ impl Session {
thread_id: self.conversation_id,
turn_id: turn_context.sub_id.clone(),
item: item.clone(),
started_at_ms: Some(now_unix_timestamp_ms()),
}),
)
.await;
@@ -1658,6 +1660,7 @@ impl Session {
thread_id: self.conversation_id,
turn_id: turn_context.sub_id.clone(),
item,
completed_at_ms: Some(now_unix_timestamp_ms()),
}),
)
.await;

View File

@@ -107,7 +107,7 @@ fn now_unix_timestamp_secs() -> i64 {
now_unix_timestamp_ms() / 1000
}
fn now_unix_timestamp_ms() -> i64 {
pub(crate) fn now_unix_timestamp_ms() -> i64 {
let duration = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default();

View File

@@ -303,6 +303,15 @@ async fn web_search_item_is_emitted() -> anyhow::Result<()> {
})
.await?;
let started = wait_for_event_match(&codex, |ev| match ev {
EventMsg::ItemStarted(ItemStartedEvent {
item: TurnItem::WebSearch(item),
started_at_ms,
..
}) => Some((item.clone(), *started_at_ms)),
_ => None,
})
.await;
let begin = wait_for_event_match(&codex, |ev| match ev {
EventMsg::WebSearchBegin(event) => Some(event.clone()),
_ => None,
@@ -311,16 +320,20 @@ async fn web_search_item_is_emitted() -> anyhow::Result<()> {
let completed = wait_for_event_match(&codex, |ev| match ev {
EventMsg::ItemCompleted(ItemCompletedEvent {
item: TurnItem::WebSearch(item),
completed_at_ms,
..
}) => Some(item.clone()),
}) => Some((item.clone(), *completed_at_ms)),
_ => None,
})
.await;
assert_eq!(begin.call_id, "web-search-1");
assert_eq!(completed.id, begin.call_id);
assert_eq!(started.0.id, begin.call_id);
assert!(started.1.is_some());
assert_eq!(completed.0.id, begin.call_id);
assert!(completed.1.is_some());
assert_eq!(
completed.action,
completed.0.action,
WebSearchAction::Search {
query: Some("weather seattle".to_string()),
queries: None,
@@ -369,11 +382,29 @@ async fn image_generation_call_event_is_emitted() -> anyhow::Result<()> {
})
.await?;
let started = wait_for_event_match(&codex, |ev| match ev {
EventMsg::ItemStarted(ItemStartedEvent {
item: TurnItem::ImageGeneration(item),
started_at_ms,
..
}) => Some((item.clone(), *started_at_ms)),
_ => None,
})
.await;
let begin = wait_for_event_match(&codex, |ev| match ev {
EventMsg::ImageGenerationBegin(event) => Some(event.clone()),
_ => None,
})
.await;
let completed = wait_for_event_match(&codex, |ev| match ev {
EventMsg::ItemCompleted(ItemCompletedEvent {
item: TurnItem::ImageGeneration(item),
completed_at_ms,
..
}) => Some((item.clone(), *completed_at_ms)),
_ => None,
})
.await;
let end = wait_for_event_match(&codex, |ev| match ev {
EventMsg::ImageGenerationEnd(event) => Some(event.clone()),
_ => None,
@@ -381,6 +412,10 @@ async fn image_generation_call_event_is_emitted() -> anyhow::Result<()> {
.await;
assert_eq!(begin.call_id, call_id);
assert_eq!(started.0.id, call_id);
assert!(started.1.is_some());
assert_eq!(completed.0.id, call_id);
assert!(completed.1.is_some());
assert_eq!(end.call_id, call_id);
assert_eq!(end.status, "completed");
assert_eq!(end.revised_prompt, Some("A tiny blue square".to_string()));

View File

@@ -20,6 +20,7 @@ fn failed_turn_does_not_overwrite_output_last_message_file() {
},
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
completed_at_ms: None,
},
));

View File

@@ -181,6 +181,7 @@ fn command_execution_started_and_completed_translate_to_thread_events() {
item: command_item,
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
started_at_ms: None,
}));
assert_eq!(
started,
@@ -216,6 +217,7 @@ fn command_execution_started_and_completed_translate_to_thread_events() {
},
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
completed_at_ms: None,
},
));
assert_eq!(
@@ -250,6 +252,7 @@ fn empty_reasoning_items_are_ignored() {
},
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
completed_at_ms: None,
},
));
@@ -274,6 +277,7 @@ fn unsupported_items_do_not_consume_synthetic_ids() {
},
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
completed_at_ms: None,
},
));
@@ -295,6 +299,7 @@ fn unsupported_items_do_not_consume_synthetic_ids() {
},
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
completed_at_ms: None,
},
));
@@ -327,6 +332,7 @@ fn reasoning_items_emit_summary_not_raw_content() {
},
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
completed_at_ms: None,
},
));
@@ -362,6 +368,7 @@ fn web_search_completion_preserves_query_and_action() {
},
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
completed_at_ms: None,
},
));
@@ -399,6 +406,7 @@ fn web_search_start_and_completion_reuse_item_id() {
},
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
started_at_ms: None,
}));
let completed = processor.collect_thread_events(ServerNotification::ItemCompleted(
@@ -413,6 +421,7 @@ fn web_search_start_and_completion_reuse_item_id() {
},
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
completed_at_ms: None,
},
));
@@ -472,6 +481,7 @@ fn mcp_tool_call_begin_and_end_emit_item_events() {
},
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
started_at_ms: None,
}));
let completed = processor.collect_thread_events(ServerNotification::ItemCompleted(
ItemCompletedNotification {
@@ -492,6 +502,7 @@ fn mcp_tool_call_begin_and_end_emit_item_events() {
},
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
completed_at_ms: None,
},
));
@@ -559,6 +570,7 @@ fn mcp_tool_call_failure_sets_failed_status() {
},
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
completed_at_ms: None,
},
));
@@ -604,6 +616,7 @@ fn mcp_tool_call_defaults_arguments_and_preserves_structured_content() {
},
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
started_at_ms: None,
}));
let completed = processor.collect_thread_events(ServerNotification::ItemCompleted(
ItemCompletedNotification {
@@ -627,6 +640,7 @@ fn mcp_tool_call_defaults_arguments_and_preserves_structured_content() {
},
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
completed_at_ms: None,
},
));
@@ -695,6 +709,7 @@ fn collab_spawn_begin_and_end_emit_item_events() {
},
thread_id: "thread-parent".to_string(),
turn_id: "turn-1".to_string(),
started_at_ms: None,
}));
let completed = processor.collect_thread_events(ServerNotification::ItemCompleted(
ItemCompletedNotification {
@@ -717,6 +732,7 @@ fn collab_spawn_begin_and_end_emit_item_events() {
},
thread_id: "thread-parent".to_string(),
turn_id: "turn-1".to_string(),
completed_at_ms: None,
},
));
@@ -795,6 +811,7 @@ fn file_change_completion_maps_change_kinds() {
},
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
completed_at_ms: None,
},
));
@@ -845,6 +862,7 @@ fn file_change_declined_maps_to_failed_status() {
},
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
completed_at_ms: None,
},
));
@@ -882,6 +900,7 @@ fn agent_message_item_updates_final_message() {
},
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
completed_at_ms: None,
},
));
@@ -916,6 +935,7 @@ fn agent_message_item_started_is_ignored() {
},
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
started_at_ms: None,
}));
assert_eq!(
@@ -940,6 +960,7 @@ fn reasoning_item_completed_uses_synthetic_id() {
},
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
completed_at_ms: None,
},
));
@@ -1296,6 +1317,7 @@ fn turn_completion_reconciles_started_items_from_turn_items() {
},
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
started_at_ms: None,
}));
assert_eq!(
started,
@@ -1378,6 +1400,7 @@ fn turn_completion_overwrites_stale_final_message_from_turn_items() {
},
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
completed_at_ms: None,
},
));
@@ -1426,6 +1449,7 @@ fn turn_completion_preserves_streamed_final_message_when_turn_items_are_empty()
},
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
completed_at_ms: None,
},
));
@@ -1470,6 +1494,7 @@ fn failed_turn_clears_stale_final_message() {
},
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
completed_at_ms: None,
},
));

View File

@@ -1828,6 +1828,9 @@ pub struct ItemStartedEvent {
pub thread_id: ThreadId,
pub turn_id: String,
pub item: TurnItem,
#[serde(default, skip_serializing_if = "Option::is_none")]
#[ts(optional)]
pub started_at_ms: Option<i64>,
}
impl HasLegacyEvent for ItemStartedEvent {
@@ -1853,6 +1856,9 @@ pub struct ItemCompletedEvent {
pub thread_id: ThreadId,
pub turn_id: String,
pub item: TurnItem,
#[serde(default, skip_serializing_if = "Option::is_none")]
#[ts(optional)]
pub completed_at_ms: Option<i64>,
}
pub trait HasLegacyEvent {
@@ -4592,6 +4598,7 @@ mod tests {
queries: None,
},
}),
started_at_ms: None,
};
let legacy_events = event.as_legacy_events(/*show_raw_agent_reasoning*/ false);
@@ -4608,6 +4615,7 @@ mod tests {
thread_id: ThreadId::new(),
turn_id: "turn-1".into(),
item: TurnItem::UserMessage(UserMessageItem::new(&[])),
started_at_ms: None,
};
assert!(
@@ -4629,6 +4637,7 @@ mod tests {
result: String::new(),
saved_path: None,
}),
started_at_ms: None,
};
let legacy_events = event.as_legacy_events(/*show_raw_agent_reasoning*/ false);
@@ -4659,6 +4668,7 @@ mod tests {
stdout: None,
stderr: None,
}),
started_at_ms: None,
};
let legacy_events = event.as_legacy_events(/*show_raw_agent_reasoning*/ false);
@@ -4686,6 +4696,7 @@ mod tests {
result: "Zm9v".into(),
saved_path: Some(test_path_buf("/tmp/ig-1.png").abs()),
}),
completed_at_ms: None,
};
let legacy_events = event.as_legacy_events(/*show_raw_agent_reasoning*/ false);
@@ -4725,6 +4736,7 @@ mod tests {
stdout: Some("Done!".into()),
stderr: Some(String::new()),
}),
completed_at_ms: None,
};
let legacy_events = event.as_legacy_events(/*show_raw_agent_reasoning*/ false);

View File

@@ -2597,6 +2597,7 @@ async fn inactive_thread_file_change_approval_recovers_buffered_changes() {
ServerNotification::ItemStarted(ItemStartedNotification {
thread_id: thread_id.to_string(),
turn_id: "turn-approval".to_string(),
started_at_ms: None,
item: ThreadItem::FileChange {
id: "patch-approval".to_string(),
changes: vec![FileUpdateChange {
@@ -4697,6 +4698,7 @@ async fn replace_chat_widget_reseeds_collab_agent_metadata_for_replay() {
codex_app_server_protocol::ItemStartedNotification {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
started_at_ms: None,
item: ThreadItem::CollabAgentToolCall {
id: "wait-1".to_string(),
tool: codex_app_server_protocol::CollabAgentTool::Wait,

View File

@@ -16,6 +16,7 @@ async fn collab_spawn_end_shows_requested_model_and_effort() {
ServerNotification::ItemStarted(ItemStartedNotification {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
started_at_ms: None,
item: AppServerThreadItem::CollabAgentToolCall {
id: "call-spawn".to_string(),
tool: AppServerCollabAgentTool::SpawnAgent,
@@ -34,6 +35,7 @@ async fn collab_spawn_end_shows_requested_model_and_effort() {
ServerNotification::ItemCompleted(ItemCompletedNotification {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
completed_at_ms: None,
item: AppServerThreadItem::CollabAgentToolCall {
id: "call-spawn".to_string(),
tool: AppServerCollabAgentTool::SpawnAgent,
@@ -90,6 +92,7 @@ async fn live_app_server_user_message_item_completed_does_not_duplicate_rendered
ServerNotification::ItemCompleted(ItemCompletedNotification {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
completed_at_ms: None,
item: AppServerThreadItem::UserMessage {
id: "user-1".to_string(),
content: vec![AppServerUserInput::Text {
@@ -135,6 +138,7 @@ async fn live_app_server_turn_completed_clears_working_status_after_answer_item(
ServerNotification::ItemCompleted(ItemCompletedNotification {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
completed_at_ms: None,
item: AppServerThreadItem::AgentMessage {
id: "msg-1".to_string(),
text: "Yes. What do you need?".to_string(),
@@ -287,6 +291,7 @@ async fn live_app_server_file_change_item_started_preserves_changes() {
ServerNotification::ItemStarted(ItemStartedNotification {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
started_at_ms: None,
item: AppServerThreadItem::FileChange {
id: "patch-1".to_string(),
changes: vec![FileUpdateChange {
@@ -320,6 +325,7 @@ async fn live_app_server_command_execution_strips_shell_wrapper() {
ServerNotification::ItemStarted(ItemStartedNotification {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
started_at_ms: None,
item: AppServerThreadItem::CommandExecution {
id: "cmd-1".to_string(),
command: command.clone(),
@@ -341,6 +347,7 @@ async fn live_app_server_command_execution_strips_shell_wrapper() {
ServerNotification::ItemCompleted(ItemCompletedNotification {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
completed_at_ms: None,
item: AppServerThreadItem::CommandExecution {
id: "cmd-1".to_string(),
command,
@@ -396,6 +403,7 @@ async fn live_app_server_collab_wait_items_render_history() {
ServerNotification::ItemStarted(ItemStartedNotification {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
started_at_ms: None,
item: AppServerThreadItem::CollabAgentToolCall {
id: "wait-1".to_string(),
tool: AppServerCollabAgentTool::Wait,
@@ -418,6 +426,7 @@ async fn live_app_server_collab_wait_items_render_history() {
ServerNotification::ItemCompleted(ItemCompletedNotification {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
completed_at_ms: None,
item: AppServerThreadItem::CollabAgentToolCall {
id: "wait-1".to_string(),
tool: AppServerCollabAgentTool::Wait,
@@ -471,6 +480,7 @@ async fn live_app_server_collab_spawn_completed_renders_requested_model_and_effo
ServerNotification::ItemStarted(ItemStartedNotification {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
started_at_ms: None,
item: AppServerThreadItem::CollabAgentToolCall {
id: "spawn-1".to_string(),
tool: AppServerCollabAgentTool::SpawnAgent,
@@ -490,6 +500,7 @@ async fn live_app_server_collab_spawn_completed_renders_requested_model_and_effo
ServerNotification::ItemCompleted(ItemCompletedNotification {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
completed_at_ms: None,
item: AppServerThreadItem::CollabAgentToolCall {
id: "spawn-1".to_string(),
tool: AppServerCollabAgentTool::SpawnAgent,

View File

@@ -654,6 +654,7 @@ pub(super) fn handle_agent_reasoning_final(chat: &mut ChatWidget) {
.last_turn_id
.clone()
.unwrap_or_else(|| "turn-1".to_string()),
completed_at_ms: None,
item: AppServerThreadItem::Reasoning {
id: "reasoning-1".to_string(),
summary: Vec::new(),
@@ -672,6 +673,7 @@ pub(super) fn handle_entered_review_mode(chat: &mut ChatWidget, review: impl Int
.last_turn_id
.clone()
.unwrap_or_else(|| "turn-1".to_string()),
started_at_ms: None,
item: AppServerThreadItem::EnteredReviewMode {
id: "review-start".to_string(),
review: review.into(),
@@ -700,6 +702,7 @@ pub(super) fn handle_exited_review_mode(chat: &mut ChatWidget) {
.last_turn_id
.clone()
.unwrap_or_else(|| "turn-1".to_string()),
completed_at_ms: None,
item: AppServerThreadItem::ExitedReviewMode {
id: "review-end".to_string(),
review: String::new(),
@@ -756,6 +759,7 @@ pub(super) fn handle_patch_apply_begin(
ServerNotification::ItemStarted(ItemStartedNotification {
thread_id: thread_id(chat),
turn_id: turn_id.into(),
started_at_ms: None,
item: AppServerThreadItem::FileChange {
id: call_id.into(),
changes: file_update_changes_from_tui(changes),
@@ -777,6 +781,7 @@ pub(super) fn handle_patch_apply_end(
ServerNotification::ItemCompleted(ItemCompletedNotification {
thread_id: thread_id(chat),
turn_id: turn_id.into(),
completed_at_ms: None,
item: AppServerThreadItem::FileChange {
id: call_id.into(),
changes: file_update_changes_from_tui(changes),
@@ -796,6 +801,7 @@ pub(super) fn handle_view_image_tool_call(
ServerNotification::ItemCompleted(ItemCompletedNotification {
thread_id: thread_id(chat),
turn_id: "turn-1".to_string(),
completed_at_ms: None,
item: AppServerThreadItem::ImageView {
id: call_id.into(),
path,
@@ -815,6 +821,7 @@ pub(super) fn handle_image_generation_end(
ServerNotification::ItemCompleted(ItemCompletedNotification {
thread_id: thread_id(chat),
turn_id: "turn-1".to_string(),
completed_at_ms: None,
item: AppServerThreadItem::ImageGeneration {
id: call_id.into(),
status: "completed".to_string(),
@@ -972,6 +979,7 @@ pub(super) fn handle_exec_begin(chat: &mut ChatWidget, item: AppServerThreadItem
.last_turn_id
.clone()
.unwrap_or_else(|| "turn-1".to_string()),
started_at_ms: None,
item,
}),
/*replay_kind*/ None,
@@ -1011,6 +1019,7 @@ pub(super) fn complete_assistant_message(
ServerNotification::ItemCompleted(ItemCompletedNotification {
thread_id: chat.thread_id.map(|id| id.to_string()).unwrap_or_default(),
turn_id: "turn-1".to_string(),
completed_at_ms: None,
item: AppServerThreadItem::AgentMessage {
id: item_id.to_string(),
text: text.to_string(),
@@ -1053,6 +1062,7 @@ pub(super) fn complete_user_message_for_inputs(
ServerNotification::ItemCompleted(ItemCompletedNotification {
thread_id: chat.thread_id.map(|id| id.to_string()).unwrap_or_default(),
turn_id: "turn-1".to_string(),
completed_at_ms: None,
item: AppServerThreadItem::UserMessage {
id: item_id.to_string(),
content,
@@ -1194,6 +1204,7 @@ pub(super) fn handle_exec_end(chat: &mut ChatWidget, item: AppServerThreadItem)
.last_turn_id
.clone()
.unwrap_or_else(|| "turn-1".to_string()),
completed_at_ms: None,
item,
}),
/*replay_kind*/ None,

View File

@@ -801,6 +801,7 @@ async fn live_reasoning_summary_is_not_rendered_twice_when_item_completes() {
ServerNotification::ItemCompleted(ItemCompletedNotification {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
completed_at_ms: None,
item: AppServerThreadItem::Reasoning {
id: "reasoning-1".to_string(),
summary: vec!["Summary only".to_string()],

View File

@@ -154,6 +154,7 @@ async fn live_app_server_review_prompt_item_is_not_rendered() {
ServerNotification::ItemStarted(ItemStartedNotification {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
started_at_ms: None,
item: review_mode_item.clone(),
}),
/*replay_kind*/ None,
@@ -166,6 +167,7 @@ async fn live_app_server_review_prompt_item_is_not_rendered() {
ServerNotification::ItemCompleted(ItemCompletedNotification {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
completed_at_ms: None,
item: review_mode_item,
}),
/*replay_kind*/ None,
@@ -176,6 +178,7 @@ async fn live_app_server_review_prompt_item_is_not_rendered() {
ServerNotification::ItemCompleted(ItemCompletedNotification {
thread_id: "thread-1".to_string(),
turn_id: "turn-1".to_string(),
completed_at_ms: None,
item: AppServerThreadItem::UserMessage {
id: "review-prompt".to_string(),
content: vec![AppServerUserInput::Text {

View File

@@ -1149,6 +1149,7 @@ async fn slash_copy_state_tracks_plan_item_completion() {
ServerNotification::ItemCompleted(ItemCompletedNotification {
thread_id: String::new(),
turn_id: "turn-1".to_string(),
completed_at_ms: None,
item: AppServerThreadItem::Plan {
id: "plan-1".to_string(),
text: plan_text.clone(),