Compare commits

...

1 Commits

Author SHA1 Message Date
Mark Steinbrick
3d03e8583c Emit analytics events for client goal mutations 2026-05-27 18:09:13 -07:00
7 changed files with 561 additions and 8 deletions

View File

@@ -108,6 +108,13 @@ use codex_app_server_protocol::SessionSource as AppServerSessionSource;
use codex_app_server_protocol::Thread;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadArchiveResponse;
use codex_app_server_protocol::ThreadGoal;
use codex_app_server_protocol::ThreadGoalClearParams;
use codex_app_server_protocol::ThreadGoalClearResponse;
use codex_app_server_protocol::ThreadGoalSetParams;
use codex_app_server_protocol::ThreadGoalSetResponse;
use codex_app_server_protocol::ThreadGoalStatus;
use codex_app_server_protocol::ThreadGoalUpdatedNotification;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadSource as AppServerThreadSource;
@@ -266,6 +273,53 @@ fn sample_thread_resume_response_with_source(
})
}
fn sample_thread_goal(thread_id: &str, status: ThreadGoalStatus) -> ThreadGoal {
ThreadGoal {
thread_id: thread_id.to_string(),
objective: "sensitive objective excluded from analytics".to_string(),
status,
token_budget: Some(1000),
tokens_used: 250,
time_used_seconds: 10,
created_at: 1,
updated_at: 2,
}
}
fn sample_thread_goal_set_request(thread_id: &str, request_id: i64) -> ClientRequest {
ClientRequest::ThreadGoalSet {
request_id: RequestId::Integer(request_id),
params: ThreadGoalSetParams {
thread_id: thread_id.to_string(),
objective: Some("sensitive objective excluded from analytics".to_string()),
status: Some(ThreadGoalStatus::Blocked),
token_budget: Some(Some(1000)),
},
}
}
fn sample_thread_goal_set_response(
thread_id: &str,
status: ThreadGoalStatus,
) -> ClientResponsePayload {
ClientResponsePayload::ThreadGoalSet(ThreadGoalSetResponse {
goal: sample_thread_goal(thread_id, status),
})
}
fn sample_thread_goal_clear_request(thread_id: &str, request_id: i64) -> ClientRequest {
ClientRequest::ThreadGoalClear {
request_id: RequestId::Integer(request_id),
params: ThreadGoalClearParams {
thread_id: thread_id.to_string(),
},
}
}
fn sample_thread_goal_clear_response(cleared: bool) -> ClientResponsePayload {
ClientResponsePayload::ThreadGoalClear(ThreadGoalClearResponse { cleared })
}
fn sample_turn_start_request(thread_id: &str, request_id: i64) -> ClientRequest {
ClientRequest::TurnStart {
request_id: RequestId::Integer(request_id),
@@ -554,6 +608,27 @@ async fn ingest_initialize(reducer: &mut AnalyticsReducer, out: &mut Vec<TrackEv
.await;
}
async fn ingest_initialized_thread(
reducer: &mut AnalyticsReducer,
out: &mut Vec<TrackEventRequest>,
thread_id: &str,
) {
ingest_initialize(reducer, out).await;
reducer
.ingest(
AnalyticsFact::ClientResponse {
connection_id: 7,
request_id: RequestId::Integer(1),
response: Box::new(sample_thread_start_response(
thread_id, /*ephemeral*/ false, "gpt-5",
)),
},
out,
)
.await;
out.clear();
}
async fn ingest_turn_prerequisites(
reducer: &mut AnalyticsReducer,
out: &mut Vec<TrackEventRequest>,
@@ -4054,6 +4129,212 @@ async fn turn_completed_without_started_notification_emits_null_started_at() {
assert_eq!(payload["event_params"]["total_tokens"], json!(null));
}
#[tokio::test]
async fn goal_set_response_emits_event_for_initialized_thread() {
let mut reducer = AnalyticsReducer::default();
let mut out = Vec::new();
ingest_initialized_thread(&mut reducer, &mut out, "thread-2").await;
reducer
.ingest(
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(2),
request: Box::new(sample_thread_goal_set_request(
"thread-2", /*request_id*/ 2,
)),
},
&mut out,
)
.await;
reducer
.ingest(
AnalyticsFact::ClientResponse {
connection_id: 7,
request_id: RequestId::Integer(2),
response: Box::new(sample_thread_goal_set_response(
"thread-2",
ThreadGoalStatus::Blocked,
)),
},
&mut out,
)
.await;
assert_eq!(out.len(), 1);
let payload = serde_json::to_value(&out[0]).expect("serialize goal event");
assert_eq!(payload["event_type"], json!("codex_goal_event"));
assert_eq!(payload["event_params"]["thread_id"], json!("thread-2"));
assert_eq!(
payload["event_params"]["session_id"],
json!("session-thread-2")
);
assert_eq!(payload["event_params"]["turn_id"], json!(null));
assert_eq!(payload["event_params"]["goal_event_kind"], json!("set"));
assert_eq!(
payload["event_params"]["goal_event_source"],
json!("app_server")
);
assert_eq!(payload["event_params"]["goal_status"], json!("blocked"));
assert!(payload["event_params"].get("objective").is_none());
assert!(payload["event_params"].get("token_budget").is_none());
assert!(payload["event_params"].get("tokens_used").is_none());
assert!(payload["event_params"].get("time_used_seconds").is_none());
}
#[tokio::test]
async fn goal_clear_emits_only_when_an_existing_goal_is_cleared() {
let mut reducer = AnalyticsReducer::default();
let mut out = Vec::new();
ingest_initialized_thread(&mut reducer, &mut out, "thread-2").await;
for (request_id, cleared) in [(2, false), (3, true)] {
reducer
.ingest(
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(request_id),
request: Box::new(sample_thread_goal_clear_request("thread-2", request_id)),
},
&mut out,
)
.await;
reducer
.ingest(
AnalyticsFact::ClientResponse {
connection_id: 7,
request_id: RequestId::Integer(request_id),
response: Box::new(sample_thread_goal_clear_response(cleared)),
},
&mut out,
)
.await;
}
assert_eq!(out.len(), 1);
let payload = serde_json::to_value(&out[0]).expect("serialize goal clear event");
assert_eq!(payload["event_type"], json!("codex_goal_event"));
assert_eq!(payload["event_params"]["goal_event_kind"], json!("cleared"));
assert_eq!(
payload["event_params"]["goal_event_source"],
json!("app_server")
);
assert_eq!(payload["event_params"]["goal_status"], json!(null));
assert_eq!(payload["event_params"]["turn_id"], json!(null));
}
#[tokio::test]
async fn goal_set_error_discards_pending_request() {
let mut reducer = AnalyticsReducer::default();
let mut out = Vec::new();
ingest_initialized_thread(&mut reducer, &mut out, "thread-2").await;
reducer
.ingest(
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(2),
request: Box::new(sample_thread_goal_set_request(
"thread-2", /*request_id*/ 2,
)),
},
&mut out,
)
.await;
reducer
.ingest(
AnalyticsFact::ErrorResponse {
connection_id: 7,
request_id: RequestId::Integer(2),
error: JSONRPCErrorError {
code: -32600,
message: "goal update failed".to_string(),
data: None,
},
error_type: None,
},
&mut out,
)
.await;
reducer
.ingest(
AnalyticsFact::ClientResponse {
connection_id: 7,
request_id: RequestId::Integer(2),
response: Box::new(sample_thread_goal_set_response(
"thread-2",
ThreadGoalStatus::Active,
)),
},
&mut out,
)
.await;
assert!(out.is_empty());
}
#[tokio::test]
async fn goal_events_require_initialized_context_and_ignore_notifications() {
let mut reducer = AnalyticsReducer::default();
let mut out = Vec::new();
ingest_initialize(&mut reducer, &mut out).await;
reducer
.ingest(
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(2),
request: Box::new(sample_thread_goal_set_request(
"thread-2", /*request_id*/ 2,
)),
},
&mut out,
)
.await;
reducer
.ingest(
AnalyticsFact::ClientResponse {
connection_id: 7,
request_id: RequestId::Integer(2),
response: Box::new(sample_thread_goal_set_response(
"thread-2",
ThreadGoalStatus::Active,
)),
},
&mut out,
)
.await;
assert!(out.is_empty());
reducer
.ingest(
AnalyticsFact::ClientResponse {
connection_id: 7,
request_id: RequestId::Integer(3),
response: Box::new(sample_thread_start_response(
"thread-2", /*ephemeral*/ false, "gpt-5",
)),
},
&mut out,
)
.await;
out.clear();
reducer
.ingest(
AnalyticsFact::Notification(Box::new(ServerNotification::ThreadGoalUpdated(
ThreadGoalUpdatedNotification {
thread_id: "thread-2".to_string(),
turn_id: None,
goal: sample_thread_goal("thread-2", ThreadGoalStatus::Active),
},
))),
&mut out,
)
.await;
assert!(out.is_empty());
}
fn sample_plugin_metadata() -> PluginTelemetryMetadata {
PluginTelemetryMetadata {
plugin_id: PluginId::parse("sample@test").expect("valid plugin id"),

View File

@@ -197,7 +197,10 @@ impl AnalyticsEventsClient {
) {
if !matches!(
request,
ClientRequest::TurnStart { .. } | ClientRequest::TurnSteer { .. }
ClientRequest::TurnStart { .. }
| ClientRequest::TurnSteer { .. }
| ClientRequest::ThreadGoalSet { .. }
| ClientRequest::ThreadGoalClear { .. }
) {
return;
}
@@ -311,6 +314,8 @@ impl AnalyticsEventsClient {
| ClientResponsePayload::ThreadFork(_)
| ClientResponsePayload::TurnStart(_)
| ClientResponsePayload::TurnSteer(_)
| ClientResponsePayload::ThreadGoalSet(_)
| ClientResponsePayload::ThreadGoalClear(_)
) {
return;
}

View File

@@ -19,6 +19,12 @@ use codex_app_server_protocol::Thread;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadArchiveResponse;
use codex_app_server_protocol::ThreadForkResponse;
use codex_app_server_protocol::ThreadGoal;
use codex_app_server_protocol::ThreadGoalClearParams;
use codex_app_server_protocol::ThreadGoalClearResponse;
use codex_app_server_protocol::ThreadGoalSetParams;
use codex_app_server_protocol::ThreadGoalSetResponse;
use codex_app_server_protocol::ThreadGoalStatus;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStatus as AppServerThreadStatus;
@@ -117,6 +123,27 @@ fn sample_thread_archive_request() -> ClientRequest {
}
}
fn sample_thread_goal_set_request() -> ClientRequest {
ClientRequest::ThreadGoalSet {
request_id: RequestId::Integer(3),
params: ThreadGoalSetParams {
thread_id: "thread-1".to_string(),
objective: Some("finish task".to_string()),
status: Some(ThreadGoalStatus::Active),
token_budget: None,
},
}
}
fn sample_thread_goal_clear_request() -> ClientRequest {
ClientRequest::ThreadGoalClear {
request_id: RequestId::Integer(4),
params: ThreadGoalClearParams {
thread_id: "thread-1".to_string(),
},
}
}
fn sample_thread(thread_id: &str) -> Thread {
Thread {
id: thread_id.to_string(),
@@ -213,6 +240,25 @@ fn sample_turn_steer_response() -> ClientResponsePayload {
})
}
fn sample_thread_goal_set_response() -> ClientResponsePayload {
ClientResponsePayload::ThreadGoalSet(ThreadGoalSetResponse {
goal: ThreadGoal {
thread_id: "thread-1".to_string(),
objective: "finish task".to_string(),
status: ThreadGoalStatus::Active,
token_budget: None,
tokens_used: 0,
time_used_seconds: 0,
created_at: 1,
updated_at: 1,
},
})
}
fn sample_thread_goal_clear_response() -> ClientResponsePayload {
ClientResponsePayload::ThreadGoalClear(ThreadGoalClearResponse { cleared: true })
}
#[test]
fn track_request_only_enqueues_analytics_relevant_requests() {
let (client, mut receiver) = client_with_receiver();
@@ -220,6 +266,8 @@ fn track_request_only_enqueues_analytics_relevant_requests() {
for (request_id, request) in [
(RequestId::Integer(1), sample_turn_start_request()),
(RequestId::Integer(2), sample_turn_steer_request()),
(RequestId::Integer(3), sample_thread_goal_set_request()),
(RequestId::Integer(4), sample_thread_goal_clear_request()),
] {
client.track_request(/*connection_id*/ 7, request_id, &request);
assert!(matches!(
@@ -231,7 +279,7 @@ fn track_request_only_enqueues_analytics_relevant_requests() {
let ignored_request = sample_thread_archive_request();
client.track_request(
/*connection_id*/ 7,
RequestId::Integer(3),
RequestId::Integer(5),
&ignored_request,
);
assert!(matches!(receiver.try_recv(), Err(TryRecvError::Empty)));
@@ -247,6 +295,8 @@ fn track_response_only_enqueues_analytics_relevant_responses() {
(RequestId::Integer(3), sample_thread_fork_response()),
(RequestId::Integer(4), sample_turn_start_response()),
(RequestId::Integer(5), sample_turn_steer_response()),
(RequestId::Integer(6), sample_thread_goal_set_response()),
(RequestId::Integer(7), sample_thread_goal_clear_response()),
] {
client.track_response(/*connection_id*/ 7, request_id, response);
assert!(matches!(
@@ -257,7 +307,7 @@ fn track_response_only_enqueues_analytics_relevant_responses() {
client.track_response(
/*connection_id*/ 7,
RequestId::Integer(6),
RequestId::Integer(8),
ClientResponsePayload::ThreadArchive(ThreadArchiveResponse {}),
);
assert!(matches!(receiver.try_recv(), Err(TryRecvError::Empty)));

View File

@@ -62,6 +62,7 @@ pub(crate) enum TrackEventRequest {
AppUsed(CodexAppUsedEventRequest),
HookRun(CodexHookRunEventRequest),
Compaction(Box<CodexCompactionEventRequest>),
Goal(Box<CodexGoalEventRequest>),
TurnEvent(Box<CodexTurnEventRequest>),
TurnSteer(CodexTurnSteerEventRequest),
CommandExecution(CodexCommandExecutionEventRequest),
@@ -767,6 +768,51 @@ pub(crate) struct CodexCompactionEventRequest {
pub(crate) event_params: CodexCompactionEventParams,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum GoalEventKind {
Set,
Cleared,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum GoalEventSource {
AppServer,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum GoalStatus {
Active,
Paused,
Blocked,
UsageLimited,
BudgetLimited,
Complete,
}
#[derive(Serialize)]
pub(crate) struct CodexGoalEventParams {
pub(crate) thread_id: String,
pub(crate) session_id: String,
pub(crate) turn_id: Option<String>,
pub(crate) app_server_client: CodexAppServerClientMetadata,
pub(crate) runtime: CodexRuntimeMetadata,
pub(crate) thread_source: Option<ThreadSource>,
pub(crate) subagent_source: Option<String>,
pub(crate) parent_thread_id: Option<String>,
pub(crate) goal_event_kind: GoalEventKind,
pub(crate) goal_event_source: GoalEventSource,
pub(crate) goal_status: Option<GoalStatus>,
}
#[derive(Serialize)]
pub(crate) struct CodexGoalEventRequest {
pub(crate) event_type: &'static str,
pub(crate) event_params: CodexGoalEventParams,
}
#[derive(Serialize)]
pub(crate) struct CodexTurnEventParams {
pub(crate) thread_id: String,

View File

@@ -15,6 +15,8 @@ use crate::events::CodexDynamicToolCallEventParams;
use crate::events::CodexDynamicToolCallEventRequest;
use crate::events::CodexFileChangeEventParams;
use crate::events::CodexFileChangeEventRequest;
use crate::events::CodexGoalEventParams;
use crate::events::CodexGoalEventRequest;
use crate::events::CodexHookRunEventRequest;
use crate::events::CodexImageGenerationEventParams;
use crate::events::CodexImageGenerationEventRequest;
@@ -33,6 +35,9 @@ use crate::events::CodexTurnSteerEventRequest;
use crate::events::CodexWebSearchEventParams;
use crate::events::CodexWebSearchEventRequest;
use crate::events::FinalApprovalOutcome;
use crate::events::GoalEventKind;
use crate::events::GoalEventSource;
use crate::events::GoalStatus;
use crate::events::GuardianReviewEventParams;
use crate::events::GuardianReviewEventPayload;
use crate::events::GuardianReviewEventRequest;
@@ -105,6 +110,7 @@ use codex_app_server_protocol::RequestPermissionProfile;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ServerResponse;
use codex_app_server_protocol::ThreadGoalStatus as AppServerThreadGoalStatus;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::TurnSteerResponse;
use codex_app_server_protocol::UserInput;
@@ -212,6 +218,16 @@ impl<'a> AnalyticsDropSite<'a> {
}
}
fn goal(thread_id: &'a str) -> Self {
Self {
event_name: "goal",
thread_id,
turn_id: None,
review_id: None,
item_id: None,
}
}
fn turn(thread_id: &'a str, turn_id: &'a str) -> Self {
Self {
event_name: "turn",
@@ -295,6 +311,8 @@ impl ThreadMetadataState {
enum RequestState {
TurnStart(PendingTurnStartState),
TurnSteer(PendingTurnSteerState),
GoalSet { thread_id: String },
GoalClear { thread_id: String },
}
struct PendingTurnStartState {
@@ -592,6 +610,22 @@ impl AnalyticsReducer {
}),
);
}
ClientRequest::ThreadGoalSet { params, .. } => {
self.requests.insert(
(connection_id, request_id),
RequestState::GoalSet {
thread_id: params.thread_id,
},
);
}
ClientRequest::ThreadGoalClear { params, .. } => {
self.requests.insert(
(connection_id, request_id),
RequestState::GoalClear {
thread_id: params.thread_id,
},
);
}
_ => {}
}
}
@@ -816,6 +850,42 @@ impl AnalyticsReducer {
} => {
self.ingest_turn_steer_response(connection_id, request_id, response, out);
}
ClientResponse::ThreadGoalSet {
request_id,
response,
} => {
let Some(RequestState::GoalSet { thread_id }) =
self.requests.remove(&(connection_id, request_id))
else {
return;
};
self.emit_goal_event(
connection_id,
thread_id,
GoalEventKind::Set,
Some(analytics_goal_status(response.goal.status)),
out,
);
}
ClientResponse::ThreadGoalClear {
request_id,
response,
} => {
let Some(RequestState::GoalClear { thread_id }) =
self.requests.remove(&(connection_id, request_id))
else {
return;
};
if response.cleared {
self.emit_goal_event(
connection_id,
thread_id,
GoalEventKind::Cleared,
/*goal_status*/ None,
out,
);
}
}
_ => {}
}
}
@@ -1051,6 +1121,7 @@ impl AnalyticsReducer {
out,
);
}
RequestState::GoalSet { .. } | RequestState::GoalClear { .. } => {}
}
}
@@ -1404,6 +1475,44 @@ impl AnalyticsReducer {
}));
}
fn emit_goal_event(
&self,
connection_id: u64,
thread_id: String,
goal_event_kind: GoalEventKind,
goal_status: Option<GoalStatus>,
out: &mut Vec<TrackEventRequest>,
) {
let Some(connection_state) = self.connections.get(&connection_id) else {
return;
};
let drop_site = AnalyticsDropSite::goal(&thread_id);
let Some(thread_metadata) = self
.threads
.get(drop_site.thread_id)
.and_then(|thread| thread.metadata.as_ref())
else {
warn_missing_analytics_context(&drop_site, MissingAnalyticsContext::ThreadMetadata);
return;
};
out.push(TrackEventRequest::Goal(Box::new(CodexGoalEventRequest {
event_type: "codex_goal_event",
event_params: CodexGoalEventParams {
thread_id,
session_id: thread_metadata.session_id.clone(),
turn_id: None,
app_server_client: connection_state.app_server_client.clone(),
runtime: connection_state.runtime.clone(),
thread_source: thread_metadata.thread_source,
subagent_source: thread_metadata.subagent_source.clone(),
parent_thread_id: thread_metadata.parent_thread_id.clone(),
goal_event_kind,
goal_event_source: GoalEventSource::AppServer,
goal_status,
},
})));
}
fn emit_review_event(
&mut self,
pending_review: PendingReviewState,
@@ -2571,6 +2680,17 @@ fn analytics_turn_status(status: codex_app_server_protocol::TurnStatus) -> Optio
}
}
fn analytics_goal_status(status: AppServerThreadGoalStatus) -> GoalStatus {
match status {
AppServerThreadGoalStatus::Active => GoalStatus::Active,
AppServerThreadGoalStatus::Paused => GoalStatus::Paused,
AppServerThreadGoalStatus::Blocked => GoalStatus::Blocked,
AppServerThreadGoalStatus::UsageLimited => GoalStatus::UsageLimited,
AppServerThreadGoalStatus::BudgetLimited => GoalStatus::BudgetLimited,
AppServerThreadGoalStatus::Complete => GoalStatus::Complete,
}
}
fn num_input_images(input: &[UserInput]) -> usize {
input
.iter()

View File

@@ -124,6 +124,30 @@ pub(crate) async fn wait_for_analytics_event(
server: &MockServer,
read_timeout: Duration,
event_type: &str,
) -> Result<Value> {
wait_for_matching_analytics_event(server, read_timeout, |event| {
event["event_type"] == event_type
})
.await
}
pub(crate) async fn wait_for_analytics_event_param(
server: &MockServer,
read_timeout: Duration,
event_type: &str,
param_name: &str,
param_value: &str,
) -> Result<Value> {
wait_for_matching_analytics_event(server, read_timeout, |event| {
event["event_type"] == event_type && event["event_params"][param_name] == param_value
})
.await
}
async fn wait_for_matching_analytics_event(
server: &MockServer,
read_timeout: Duration,
matches_event: impl Fn(&Value) -> bool,
) -> Result<Value> {
timeout(read_timeout, async {
loop {
@@ -142,10 +166,7 @@ pub(crate) async fn wait_for_analytics_event(
let Some(events) = payload["events"].as_array() else {
continue;
};
if let Some(event) = events
.iter()
.find(|event| event["event_type"] == event_type)
{
if let Some(event) = events.iter().find(|event| matches_event(event)) {
return Ok::<Value, anyhow::Error>(event.clone());
}
}

View File

@@ -96,6 +96,7 @@ use wiremock::matchers::path;
use super::analytics::assert_basic_thread_initialized_event;
use super::analytics::mount_analytics_capture;
use super::analytics::thread_initialized_event;
use super::analytics::wait_for_analytics_event_param;
use super::analytics::wait_for_analytics_payload;
#[cfg(windows)]
@@ -1165,7 +1166,8 @@ async fn thread_goal_set_edits_objective_without_resetting_usage() -> Result<()>
async fn thread_goal_clear_deletes_goal_and_notifies() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
create_config_toml_with_chatgpt_base_url(codex_home.path(), &server.uri(), &server.uri())?;
mount_analytics_capture(&server, codex_home.path()).await?;
let config_path = codex_home.path().join("config.toml");
let config = std::fs::read_to_string(&config_path)?;
std::fs::write(
@@ -1230,6 +1232,19 @@ async fn thread_goal_clear_deletes_goal_and_notifies() -> Result<()> {
mcp.read_stream_until_notification_message("thread/goal/updated"),
)
.await??;
let set_event = wait_for_analytics_event_param(
&server,
DEFAULT_READ_TIMEOUT,
"codex_goal_event",
"goal_event_kind",
"set",
)
.await?;
assert_eq!(set_event["event_params"]["thread_id"], thread.id);
assert_eq!(set_event["event_params"]["session_id"], thread.session_id);
assert_eq!(set_event["event_params"]["turn_id"], json!(null));
assert_eq!(set_event["event_params"]["goal_event_source"], "app_server");
assert_eq!(set_event["event_params"]["goal_status"], "active");
let clear_id = mcp
.send_raw_request(
@@ -1252,6 +1267,21 @@ async fn thread_goal_clear_deletes_goal_and_notifies() -> Result<()> {
mcp.read_stream_until_notification_message("thread/goal/cleared"),
)
.await??;
let cleared_event = wait_for_analytics_event_param(
&server,
DEFAULT_READ_TIMEOUT,
"codex_goal_event",
"goal_event_kind",
"cleared",
)
.await?;
assert_eq!(cleared_event["event_params"]["thread_id"], thread.id);
assert_eq!(
cleared_event["event_params"]["session_id"],
thread.session_id
);
assert_eq!(cleared_event["event_params"]["turn_id"], json!(null));
assert_eq!(cleared_event["event_params"]["goal_status"], json!(null));
let get_id = mcp
.send_raw_request(