mirror of
https://github.com/openai/codex.git
synced 2026-06-02 03:11:59 +00:00
Compare commits
1 Commits
starr/mcp-
...
mark.stein
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3d03e8583c |
@@ -108,6 +108,13 @@ use codex_app_server_protocol::SessionSource as AppServerSessionSource;
|
||||
use codex_app_server_protocol::Thread;
|
||||
use codex_app_server_protocol::ThreadArchiveParams;
|
||||
use codex_app_server_protocol::ThreadArchiveResponse;
|
||||
use codex_app_server_protocol::ThreadGoal;
|
||||
use codex_app_server_protocol::ThreadGoalClearParams;
|
||||
use codex_app_server_protocol::ThreadGoalClearResponse;
|
||||
use codex_app_server_protocol::ThreadGoalSetParams;
|
||||
use codex_app_server_protocol::ThreadGoalSetResponse;
|
||||
use codex_app_server_protocol::ThreadGoalStatus;
|
||||
use codex_app_server_protocol::ThreadGoalUpdatedNotification;
|
||||
use codex_app_server_protocol::ThreadItem;
|
||||
use codex_app_server_protocol::ThreadResumeResponse;
|
||||
use codex_app_server_protocol::ThreadSource as AppServerThreadSource;
|
||||
@@ -266,6 +273,53 @@ fn sample_thread_resume_response_with_source(
|
||||
})
|
||||
}
|
||||
|
||||
fn sample_thread_goal(thread_id: &str, status: ThreadGoalStatus) -> ThreadGoal {
|
||||
ThreadGoal {
|
||||
thread_id: thread_id.to_string(),
|
||||
objective: "sensitive objective excluded from analytics".to_string(),
|
||||
status,
|
||||
token_budget: Some(1000),
|
||||
tokens_used: 250,
|
||||
time_used_seconds: 10,
|
||||
created_at: 1,
|
||||
updated_at: 2,
|
||||
}
|
||||
}
|
||||
|
||||
fn sample_thread_goal_set_request(thread_id: &str, request_id: i64) -> ClientRequest {
|
||||
ClientRequest::ThreadGoalSet {
|
||||
request_id: RequestId::Integer(request_id),
|
||||
params: ThreadGoalSetParams {
|
||||
thread_id: thread_id.to_string(),
|
||||
objective: Some("sensitive objective excluded from analytics".to_string()),
|
||||
status: Some(ThreadGoalStatus::Blocked),
|
||||
token_budget: Some(Some(1000)),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn sample_thread_goal_set_response(
|
||||
thread_id: &str,
|
||||
status: ThreadGoalStatus,
|
||||
) -> ClientResponsePayload {
|
||||
ClientResponsePayload::ThreadGoalSet(ThreadGoalSetResponse {
|
||||
goal: sample_thread_goal(thread_id, status),
|
||||
})
|
||||
}
|
||||
|
||||
fn sample_thread_goal_clear_request(thread_id: &str, request_id: i64) -> ClientRequest {
|
||||
ClientRequest::ThreadGoalClear {
|
||||
request_id: RequestId::Integer(request_id),
|
||||
params: ThreadGoalClearParams {
|
||||
thread_id: thread_id.to_string(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn sample_thread_goal_clear_response(cleared: bool) -> ClientResponsePayload {
|
||||
ClientResponsePayload::ThreadGoalClear(ThreadGoalClearResponse { cleared })
|
||||
}
|
||||
|
||||
fn sample_turn_start_request(thread_id: &str, request_id: i64) -> ClientRequest {
|
||||
ClientRequest::TurnStart {
|
||||
request_id: RequestId::Integer(request_id),
|
||||
@@ -554,6 +608,27 @@ async fn ingest_initialize(reducer: &mut AnalyticsReducer, out: &mut Vec<TrackEv
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn ingest_initialized_thread(
|
||||
reducer: &mut AnalyticsReducer,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
thread_id: &str,
|
||||
) {
|
||||
ingest_initialize(reducer, out).await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::ClientResponse {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(1),
|
||||
response: Box::new(sample_thread_start_response(
|
||||
thread_id, /*ephemeral*/ false, "gpt-5",
|
||||
)),
|
||||
},
|
||||
out,
|
||||
)
|
||||
.await;
|
||||
out.clear();
|
||||
}
|
||||
|
||||
async fn ingest_turn_prerequisites(
|
||||
reducer: &mut AnalyticsReducer,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
@@ -4054,6 +4129,212 @@ async fn turn_completed_without_started_notification_emits_null_started_at() {
|
||||
assert_eq!(payload["event_params"]["total_tokens"], json!(null));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn goal_set_response_emits_event_for_initialized_thread() {
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
let mut out = Vec::new();
|
||||
|
||||
ingest_initialized_thread(&mut reducer, &mut out, "thread-2").await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::ClientRequest {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(2),
|
||||
request: Box::new(sample_thread_goal_set_request(
|
||||
"thread-2", /*request_id*/ 2,
|
||||
)),
|
||||
},
|
||||
&mut out,
|
||||
)
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::ClientResponse {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(2),
|
||||
response: Box::new(sample_thread_goal_set_response(
|
||||
"thread-2",
|
||||
ThreadGoalStatus::Blocked,
|
||||
)),
|
||||
},
|
||||
&mut out,
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_eq!(out.len(), 1);
|
||||
let payload = serde_json::to_value(&out[0]).expect("serialize goal event");
|
||||
assert_eq!(payload["event_type"], json!("codex_goal_event"));
|
||||
assert_eq!(payload["event_params"]["thread_id"], json!("thread-2"));
|
||||
assert_eq!(
|
||||
payload["event_params"]["session_id"],
|
||||
json!("session-thread-2")
|
||||
);
|
||||
assert_eq!(payload["event_params"]["turn_id"], json!(null));
|
||||
assert_eq!(payload["event_params"]["goal_event_kind"], json!("set"));
|
||||
assert_eq!(
|
||||
payload["event_params"]["goal_event_source"],
|
||||
json!("app_server")
|
||||
);
|
||||
assert_eq!(payload["event_params"]["goal_status"], json!("blocked"));
|
||||
assert!(payload["event_params"].get("objective").is_none());
|
||||
assert!(payload["event_params"].get("token_budget").is_none());
|
||||
assert!(payload["event_params"].get("tokens_used").is_none());
|
||||
assert!(payload["event_params"].get("time_used_seconds").is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn goal_clear_emits_only_when_an_existing_goal_is_cleared() {
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
let mut out = Vec::new();
|
||||
|
||||
ingest_initialized_thread(&mut reducer, &mut out, "thread-2").await;
|
||||
for (request_id, cleared) in [(2, false), (3, true)] {
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::ClientRequest {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(request_id),
|
||||
request: Box::new(sample_thread_goal_clear_request("thread-2", request_id)),
|
||||
},
|
||||
&mut out,
|
||||
)
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::ClientResponse {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(request_id),
|
||||
response: Box::new(sample_thread_goal_clear_response(cleared)),
|
||||
},
|
||||
&mut out,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
assert_eq!(out.len(), 1);
|
||||
let payload = serde_json::to_value(&out[0]).expect("serialize goal clear event");
|
||||
assert_eq!(payload["event_type"], json!("codex_goal_event"));
|
||||
assert_eq!(payload["event_params"]["goal_event_kind"], json!("cleared"));
|
||||
assert_eq!(
|
||||
payload["event_params"]["goal_event_source"],
|
||||
json!("app_server")
|
||||
);
|
||||
assert_eq!(payload["event_params"]["goal_status"], json!(null));
|
||||
assert_eq!(payload["event_params"]["turn_id"], json!(null));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn goal_set_error_discards_pending_request() {
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
let mut out = Vec::new();
|
||||
|
||||
ingest_initialized_thread(&mut reducer, &mut out, "thread-2").await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::ClientRequest {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(2),
|
||||
request: Box::new(sample_thread_goal_set_request(
|
||||
"thread-2", /*request_id*/ 2,
|
||||
)),
|
||||
},
|
||||
&mut out,
|
||||
)
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::ErrorResponse {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(2),
|
||||
error: JSONRPCErrorError {
|
||||
code: -32600,
|
||||
message: "goal update failed".to_string(),
|
||||
data: None,
|
||||
},
|
||||
error_type: None,
|
||||
},
|
||||
&mut out,
|
||||
)
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::ClientResponse {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(2),
|
||||
response: Box::new(sample_thread_goal_set_response(
|
||||
"thread-2",
|
||||
ThreadGoalStatus::Active,
|
||||
)),
|
||||
},
|
||||
&mut out,
|
||||
)
|
||||
.await;
|
||||
|
||||
assert!(out.is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn goal_events_require_initialized_context_and_ignore_notifications() {
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
let mut out = Vec::new();
|
||||
|
||||
ingest_initialize(&mut reducer, &mut out).await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::ClientRequest {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(2),
|
||||
request: Box::new(sample_thread_goal_set_request(
|
||||
"thread-2", /*request_id*/ 2,
|
||||
)),
|
||||
},
|
||||
&mut out,
|
||||
)
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::ClientResponse {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(2),
|
||||
response: Box::new(sample_thread_goal_set_response(
|
||||
"thread-2",
|
||||
ThreadGoalStatus::Active,
|
||||
)),
|
||||
},
|
||||
&mut out,
|
||||
)
|
||||
.await;
|
||||
assert!(out.is_empty());
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::ClientResponse {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(3),
|
||||
response: Box::new(sample_thread_start_response(
|
||||
"thread-2", /*ephemeral*/ false, "gpt-5",
|
||||
)),
|
||||
},
|
||||
&mut out,
|
||||
)
|
||||
.await;
|
||||
out.clear();
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Notification(Box::new(ServerNotification::ThreadGoalUpdated(
|
||||
ThreadGoalUpdatedNotification {
|
||||
thread_id: "thread-2".to_string(),
|
||||
turn_id: None,
|
||||
goal: sample_thread_goal("thread-2", ThreadGoalStatus::Active),
|
||||
},
|
||||
))),
|
||||
&mut out,
|
||||
)
|
||||
.await;
|
||||
|
||||
assert!(out.is_empty());
|
||||
}
|
||||
|
||||
fn sample_plugin_metadata() -> PluginTelemetryMetadata {
|
||||
PluginTelemetryMetadata {
|
||||
plugin_id: PluginId::parse("sample@test").expect("valid plugin id"),
|
||||
|
||||
@@ -197,7 +197,10 @@ impl AnalyticsEventsClient {
|
||||
) {
|
||||
if !matches!(
|
||||
request,
|
||||
ClientRequest::TurnStart { .. } | ClientRequest::TurnSteer { .. }
|
||||
ClientRequest::TurnStart { .. }
|
||||
| ClientRequest::TurnSteer { .. }
|
||||
| ClientRequest::ThreadGoalSet { .. }
|
||||
| ClientRequest::ThreadGoalClear { .. }
|
||||
) {
|
||||
return;
|
||||
}
|
||||
@@ -311,6 +314,8 @@ impl AnalyticsEventsClient {
|
||||
| ClientResponsePayload::ThreadFork(_)
|
||||
| ClientResponsePayload::TurnStart(_)
|
||||
| ClientResponsePayload::TurnSteer(_)
|
||||
| ClientResponsePayload::ThreadGoalSet(_)
|
||||
| ClientResponsePayload::ThreadGoalClear(_)
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -19,6 +19,12 @@ use codex_app_server_protocol::Thread;
|
||||
use codex_app_server_protocol::ThreadArchiveParams;
|
||||
use codex_app_server_protocol::ThreadArchiveResponse;
|
||||
use codex_app_server_protocol::ThreadForkResponse;
|
||||
use codex_app_server_protocol::ThreadGoal;
|
||||
use codex_app_server_protocol::ThreadGoalClearParams;
|
||||
use codex_app_server_protocol::ThreadGoalClearResponse;
|
||||
use codex_app_server_protocol::ThreadGoalSetParams;
|
||||
use codex_app_server_protocol::ThreadGoalSetResponse;
|
||||
use codex_app_server_protocol::ThreadGoalStatus;
|
||||
use codex_app_server_protocol::ThreadResumeResponse;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::ThreadStatus as AppServerThreadStatus;
|
||||
@@ -117,6 +123,27 @@ fn sample_thread_archive_request() -> ClientRequest {
|
||||
}
|
||||
}
|
||||
|
||||
fn sample_thread_goal_set_request() -> ClientRequest {
|
||||
ClientRequest::ThreadGoalSet {
|
||||
request_id: RequestId::Integer(3),
|
||||
params: ThreadGoalSetParams {
|
||||
thread_id: "thread-1".to_string(),
|
||||
objective: Some("finish task".to_string()),
|
||||
status: Some(ThreadGoalStatus::Active),
|
||||
token_budget: None,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn sample_thread_goal_clear_request() -> ClientRequest {
|
||||
ClientRequest::ThreadGoalClear {
|
||||
request_id: RequestId::Integer(4),
|
||||
params: ThreadGoalClearParams {
|
||||
thread_id: "thread-1".to_string(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn sample_thread(thread_id: &str) -> Thread {
|
||||
Thread {
|
||||
id: thread_id.to_string(),
|
||||
@@ -213,6 +240,25 @@ fn sample_turn_steer_response() -> ClientResponsePayload {
|
||||
})
|
||||
}
|
||||
|
||||
fn sample_thread_goal_set_response() -> ClientResponsePayload {
|
||||
ClientResponsePayload::ThreadGoalSet(ThreadGoalSetResponse {
|
||||
goal: ThreadGoal {
|
||||
thread_id: "thread-1".to_string(),
|
||||
objective: "finish task".to_string(),
|
||||
status: ThreadGoalStatus::Active,
|
||||
token_budget: None,
|
||||
tokens_used: 0,
|
||||
time_used_seconds: 0,
|
||||
created_at: 1,
|
||||
updated_at: 1,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
fn sample_thread_goal_clear_response() -> ClientResponsePayload {
|
||||
ClientResponsePayload::ThreadGoalClear(ThreadGoalClearResponse { cleared: true })
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn track_request_only_enqueues_analytics_relevant_requests() {
|
||||
let (client, mut receiver) = client_with_receiver();
|
||||
@@ -220,6 +266,8 @@ fn track_request_only_enqueues_analytics_relevant_requests() {
|
||||
for (request_id, request) in [
|
||||
(RequestId::Integer(1), sample_turn_start_request()),
|
||||
(RequestId::Integer(2), sample_turn_steer_request()),
|
||||
(RequestId::Integer(3), sample_thread_goal_set_request()),
|
||||
(RequestId::Integer(4), sample_thread_goal_clear_request()),
|
||||
] {
|
||||
client.track_request(/*connection_id*/ 7, request_id, &request);
|
||||
assert!(matches!(
|
||||
@@ -231,7 +279,7 @@ fn track_request_only_enqueues_analytics_relevant_requests() {
|
||||
let ignored_request = sample_thread_archive_request();
|
||||
client.track_request(
|
||||
/*connection_id*/ 7,
|
||||
RequestId::Integer(3),
|
||||
RequestId::Integer(5),
|
||||
&ignored_request,
|
||||
);
|
||||
assert!(matches!(receiver.try_recv(), Err(TryRecvError::Empty)));
|
||||
@@ -247,6 +295,8 @@ fn track_response_only_enqueues_analytics_relevant_responses() {
|
||||
(RequestId::Integer(3), sample_thread_fork_response()),
|
||||
(RequestId::Integer(4), sample_turn_start_response()),
|
||||
(RequestId::Integer(5), sample_turn_steer_response()),
|
||||
(RequestId::Integer(6), sample_thread_goal_set_response()),
|
||||
(RequestId::Integer(7), sample_thread_goal_clear_response()),
|
||||
] {
|
||||
client.track_response(/*connection_id*/ 7, request_id, response);
|
||||
assert!(matches!(
|
||||
@@ -257,7 +307,7 @@ fn track_response_only_enqueues_analytics_relevant_responses() {
|
||||
|
||||
client.track_response(
|
||||
/*connection_id*/ 7,
|
||||
RequestId::Integer(6),
|
||||
RequestId::Integer(8),
|
||||
ClientResponsePayload::ThreadArchive(ThreadArchiveResponse {}),
|
||||
);
|
||||
assert!(matches!(receiver.try_recv(), Err(TryRecvError::Empty)));
|
||||
|
||||
@@ -62,6 +62,7 @@ pub(crate) enum TrackEventRequest {
|
||||
AppUsed(CodexAppUsedEventRequest),
|
||||
HookRun(CodexHookRunEventRequest),
|
||||
Compaction(Box<CodexCompactionEventRequest>),
|
||||
Goal(Box<CodexGoalEventRequest>),
|
||||
TurnEvent(Box<CodexTurnEventRequest>),
|
||||
TurnSteer(CodexTurnSteerEventRequest),
|
||||
CommandExecution(CodexCommandExecutionEventRequest),
|
||||
@@ -767,6 +768,51 @@ pub(crate) struct CodexCompactionEventRequest {
|
||||
pub(crate) event_params: CodexCompactionEventParams,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub(crate) enum GoalEventKind {
|
||||
Set,
|
||||
Cleared,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub(crate) enum GoalEventSource {
|
||||
AppServer,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub(crate) enum GoalStatus {
|
||||
Active,
|
||||
Paused,
|
||||
Blocked,
|
||||
UsageLimited,
|
||||
BudgetLimited,
|
||||
Complete,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexGoalEventParams {
|
||||
pub(crate) thread_id: String,
|
||||
pub(crate) session_id: String,
|
||||
pub(crate) turn_id: Option<String>,
|
||||
pub(crate) app_server_client: CodexAppServerClientMetadata,
|
||||
pub(crate) runtime: CodexRuntimeMetadata,
|
||||
pub(crate) thread_source: Option<ThreadSource>,
|
||||
pub(crate) subagent_source: Option<String>,
|
||||
pub(crate) parent_thread_id: Option<String>,
|
||||
pub(crate) goal_event_kind: GoalEventKind,
|
||||
pub(crate) goal_event_source: GoalEventSource,
|
||||
pub(crate) goal_status: Option<GoalStatus>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexGoalEventRequest {
|
||||
pub(crate) event_type: &'static str,
|
||||
pub(crate) event_params: CodexGoalEventParams,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexTurnEventParams {
|
||||
pub(crate) thread_id: String,
|
||||
|
||||
@@ -15,6 +15,8 @@ use crate::events::CodexDynamicToolCallEventParams;
|
||||
use crate::events::CodexDynamicToolCallEventRequest;
|
||||
use crate::events::CodexFileChangeEventParams;
|
||||
use crate::events::CodexFileChangeEventRequest;
|
||||
use crate::events::CodexGoalEventParams;
|
||||
use crate::events::CodexGoalEventRequest;
|
||||
use crate::events::CodexHookRunEventRequest;
|
||||
use crate::events::CodexImageGenerationEventParams;
|
||||
use crate::events::CodexImageGenerationEventRequest;
|
||||
@@ -33,6 +35,9 @@ use crate::events::CodexTurnSteerEventRequest;
|
||||
use crate::events::CodexWebSearchEventParams;
|
||||
use crate::events::CodexWebSearchEventRequest;
|
||||
use crate::events::FinalApprovalOutcome;
|
||||
use crate::events::GoalEventKind;
|
||||
use crate::events::GoalEventSource;
|
||||
use crate::events::GoalStatus;
|
||||
use crate::events::GuardianReviewEventParams;
|
||||
use crate::events::GuardianReviewEventPayload;
|
||||
use crate::events::GuardianReviewEventRequest;
|
||||
@@ -105,6 +110,7 @@ use codex_app_server_protocol::RequestPermissionProfile;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use codex_app_server_protocol::ServerResponse;
|
||||
use codex_app_server_protocol::ThreadGoalStatus as AppServerThreadGoalStatus;
|
||||
use codex_app_server_protocol::ThreadItem;
|
||||
use codex_app_server_protocol::TurnSteerResponse;
|
||||
use codex_app_server_protocol::UserInput;
|
||||
@@ -212,6 +218,16 @@ impl<'a> AnalyticsDropSite<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
fn goal(thread_id: &'a str) -> Self {
|
||||
Self {
|
||||
event_name: "goal",
|
||||
thread_id,
|
||||
turn_id: None,
|
||||
review_id: None,
|
||||
item_id: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn turn(thread_id: &'a str, turn_id: &'a str) -> Self {
|
||||
Self {
|
||||
event_name: "turn",
|
||||
@@ -295,6 +311,8 @@ impl ThreadMetadataState {
|
||||
enum RequestState {
|
||||
TurnStart(PendingTurnStartState),
|
||||
TurnSteer(PendingTurnSteerState),
|
||||
GoalSet { thread_id: String },
|
||||
GoalClear { thread_id: String },
|
||||
}
|
||||
|
||||
struct PendingTurnStartState {
|
||||
@@ -592,6 +610,22 @@ impl AnalyticsReducer {
|
||||
}),
|
||||
);
|
||||
}
|
||||
ClientRequest::ThreadGoalSet { params, .. } => {
|
||||
self.requests.insert(
|
||||
(connection_id, request_id),
|
||||
RequestState::GoalSet {
|
||||
thread_id: params.thread_id,
|
||||
},
|
||||
);
|
||||
}
|
||||
ClientRequest::ThreadGoalClear { params, .. } => {
|
||||
self.requests.insert(
|
||||
(connection_id, request_id),
|
||||
RequestState::GoalClear {
|
||||
thread_id: params.thread_id,
|
||||
},
|
||||
);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
@@ -816,6 +850,42 @@ impl AnalyticsReducer {
|
||||
} => {
|
||||
self.ingest_turn_steer_response(connection_id, request_id, response, out);
|
||||
}
|
||||
ClientResponse::ThreadGoalSet {
|
||||
request_id,
|
||||
response,
|
||||
} => {
|
||||
let Some(RequestState::GoalSet { thread_id }) =
|
||||
self.requests.remove(&(connection_id, request_id))
|
||||
else {
|
||||
return;
|
||||
};
|
||||
self.emit_goal_event(
|
||||
connection_id,
|
||||
thread_id,
|
||||
GoalEventKind::Set,
|
||||
Some(analytics_goal_status(response.goal.status)),
|
||||
out,
|
||||
);
|
||||
}
|
||||
ClientResponse::ThreadGoalClear {
|
||||
request_id,
|
||||
response,
|
||||
} => {
|
||||
let Some(RequestState::GoalClear { thread_id }) =
|
||||
self.requests.remove(&(connection_id, request_id))
|
||||
else {
|
||||
return;
|
||||
};
|
||||
if response.cleared {
|
||||
self.emit_goal_event(
|
||||
connection_id,
|
||||
thread_id,
|
||||
GoalEventKind::Cleared,
|
||||
/*goal_status*/ None,
|
||||
out,
|
||||
);
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
@@ -1051,6 +1121,7 @@ impl AnalyticsReducer {
|
||||
out,
|
||||
);
|
||||
}
|
||||
RequestState::GoalSet { .. } | RequestState::GoalClear { .. } => {}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1404,6 +1475,44 @@ impl AnalyticsReducer {
|
||||
}));
|
||||
}
|
||||
|
||||
fn emit_goal_event(
|
||||
&self,
|
||||
connection_id: u64,
|
||||
thread_id: String,
|
||||
goal_event_kind: GoalEventKind,
|
||||
goal_status: Option<GoalStatus>,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
let Some(connection_state) = self.connections.get(&connection_id) else {
|
||||
return;
|
||||
};
|
||||
let drop_site = AnalyticsDropSite::goal(&thread_id);
|
||||
let Some(thread_metadata) = self
|
||||
.threads
|
||||
.get(drop_site.thread_id)
|
||||
.and_then(|thread| thread.metadata.as_ref())
|
||||
else {
|
||||
warn_missing_analytics_context(&drop_site, MissingAnalyticsContext::ThreadMetadata);
|
||||
return;
|
||||
};
|
||||
out.push(TrackEventRequest::Goal(Box::new(CodexGoalEventRequest {
|
||||
event_type: "codex_goal_event",
|
||||
event_params: CodexGoalEventParams {
|
||||
thread_id,
|
||||
session_id: thread_metadata.session_id.clone(),
|
||||
turn_id: None,
|
||||
app_server_client: connection_state.app_server_client.clone(),
|
||||
runtime: connection_state.runtime.clone(),
|
||||
thread_source: thread_metadata.thread_source,
|
||||
subagent_source: thread_metadata.subagent_source.clone(),
|
||||
parent_thread_id: thread_metadata.parent_thread_id.clone(),
|
||||
goal_event_kind,
|
||||
goal_event_source: GoalEventSource::AppServer,
|
||||
goal_status,
|
||||
},
|
||||
})));
|
||||
}
|
||||
|
||||
fn emit_review_event(
|
||||
&mut self,
|
||||
pending_review: PendingReviewState,
|
||||
@@ -2571,6 +2680,17 @@ fn analytics_turn_status(status: codex_app_server_protocol::TurnStatus) -> Optio
|
||||
}
|
||||
}
|
||||
|
||||
fn analytics_goal_status(status: AppServerThreadGoalStatus) -> GoalStatus {
|
||||
match status {
|
||||
AppServerThreadGoalStatus::Active => GoalStatus::Active,
|
||||
AppServerThreadGoalStatus::Paused => GoalStatus::Paused,
|
||||
AppServerThreadGoalStatus::Blocked => GoalStatus::Blocked,
|
||||
AppServerThreadGoalStatus::UsageLimited => GoalStatus::UsageLimited,
|
||||
AppServerThreadGoalStatus::BudgetLimited => GoalStatus::BudgetLimited,
|
||||
AppServerThreadGoalStatus::Complete => GoalStatus::Complete,
|
||||
}
|
||||
}
|
||||
|
||||
fn num_input_images(input: &[UserInput]) -> usize {
|
||||
input
|
||||
.iter()
|
||||
|
||||
@@ -124,6 +124,30 @@ pub(crate) async fn wait_for_analytics_event(
|
||||
server: &MockServer,
|
||||
read_timeout: Duration,
|
||||
event_type: &str,
|
||||
) -> Result<Value> {
|
||||
wait_for_matching_analytics_event(server, read_timeout, |event| {
|
||||
event["event_type"] == event_type
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn wait_for_analytics_event_param(
|
||||
server: &MockServer,
|
||||
read_timeout: Duration,
|
||||
event_type: &str,
|
||||
param_name: &str,
|
||||
param_value: &str,
|
||||
) -> Result<Value> {
|
||||
wait_for_matching_analytics_event(server, read_timeout, |event| {
|
||||
event["event_type"] == event_type && event["event_params"][param_name] == param_value
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
async fn wait_for_matching_analytics_event(
|
||||
server: &MockServer,
|
||||
read_timeout: Duration,
|
||||
matches_event: impl Fn(&Value) -> bool,
|
||||
) -> Result<Value> {
|
||||
timeout(read_timeout, async {
|
||||
loop {
|
||||
@@ -142,10 +166,7 @@ pub(crate) async fn wait_for_analytics_event(
|
||||
let Some(events) = payload["events"].as_array() else {
|
||||
continue;
|
||||
};
|
||||
if let Some(event) = events
|
||||
.iter()
|
||||
.find(|event| event["event_type"] == event_type)
|
||||
{
|
||||
if let Some(event) = events.iter().find(|event| matches_event(event)) {
|
||||
return Ok::<Value, anyhow::Error>(event.clone());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -96,6 +96,7 @@ use wiremock::matchers::path;
|
||||
use super::analytics::assert_basic_thread_initialized_event;
|
||||
use super::analytics::mount_analytics_capture;
|
||||
use super::analytics::thread_initialized_event;
|
||||
use super::analytics::wait_for_analytics_event_param;
|
||||
use super::analytics::wait_for_analytics_payload;
|
||||
|
||||
#[cfg(windows)]
|
||||
@@ -1165,7 +1166,8 @@ async fn thread_goal_set_edits_objective_without_resetting_usage() -> Result<()>
|
||||
async fn thread_goal_clear_deletes_goal_and_notifies() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
create_config_toml_with_chatgpt_base_url(codex_home.path(), &server.uri(), &server.uri())?;
|
||||
mount_analytics_capture(&server, codex_home.path()).await?;
|
||||
let config_path = codex_home.path().join("config.toml");
|
||||
let config = std::fs::read_to_string(&config_path)?;
|
||||
std::fs::write(
|
||||
@@ -1230,6 +1232,19 @@ async fn thread_goal_clear_deletes_goal_and_notifies() -> Result<()> {
|
||||
mcp.read_stream_until_notification_message("thread/goal/updated"),
|
||||
)
|
||||
.await??;
|
||||
let set_event = wait_for_analytics_event_param(
|
||||
&server,
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
"codex_goal_event",
|
||||
"goal_event_kind",
|
||||
"set",
|
||||
)
|
||||
.await?;
|
||||
assert_eq!(set_event["event_params"]["thread_id"], thread.id);
|
||||
assert_eq!(set_event["event_params"]["session_id"], thread.session_id);
|
||||
assert_eq!(set_event["event_params"]["turn_id"], json!(null));
|
||||
assert_eq!(set_event["event_params"]["goal_event_source"], "app_server");
|
||||
assert_eq!(set_event["event_params"]["goal_status"], "active");
|
||||
|
||||
let clear_id = mcp
|
||||
.send_raw_request(
|
||||
@@ -1252,6 +1267,21 @@ async fn thread_goal_clear_deletes_goal_and_notifies() -> Result<()> {
|
||||
mcp.read_stream_until_notification_message("thread/goal/cleared"),
|
||||
)
|
||||
.await??;
|
||||
let cleared_event = wait_for_analytics_event_param(
|
||||
&server,
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
"codex_goal_event",
|
||||
"goal_event_kind",
|
||||
"cleared",
|
||||
)
|
||||
.await?;
|
||||
assert_eq!(cleared_event["event_params"]["thread_id"], thread.id);
|
||||
assert_eq!(
|
||||
cleared_event["event_params"]["session_id"],
|
||||
thread.session_id
|
||||
);
|
||||
assert_eq!(cleared_event["event_params"]["turn_id"], json!(null));
|
||||
assert_eq!(cleared_event["event_params"]["goal_status"], json!(null));
|
||||
|
||||
let get_id = mcp
|
||||
.send_raw_request(
|
||||
|
||||
Reference in New Issue
Block a user