rebase onto client response changes

This commit is contained in:
Roy Han
2026-03-26 22:38:10 -07:00
5 changed files with 156 additions and 76 deletions

View File

@@ -1,4 +1,5 @@
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ClientResponse;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
@@ -90,7 +91,7 @@ pub enum TurnStatus {
}
#[derive(Clone)]
pub struct ThreadInitializedInput {
struct ThreadInitializedInput {
pub connection_id: u64,
pub thread_id: String,
pub model: String,
@@ -148,11 +149,15 @@ pub enum AnalyticsFact {
Request {
connection_id: u64,
request_id: RequestId,
request: ClientRequest,
request: Box<ClientRequest>,
},
Response {
connection_id: u64,
response: Box<ClientResponse>,
},
Notification {
connection_id: u64,
notification: ServerNotification,
notification: Box<ServerNotification>,
},
// Facts that do not naturally exist on the app-server protocol surface, or
// would require non-trivial protocol reshaping on this branch.
@@ -160,9 +165,6 @@ pub enum AnalyticsFact {
}
pub enum CustomAnalyticsFact {
// This remains custom on this branch because app-server-protocol does not
// yet expose a generic client response enum we can reduce over directly.
ThreadInitialized(ThreadInitializedInput),
TurnStarted(Box<TurnStartedInput>),
TurnCompleted(TurnCompletedInput),
SkillInvoked(SkillInvokedInput),
@@ -331,12 +333,6 @@ impl AnalyticsEventsClient {
});
}
pub fn track_thread_initialized(&self, input: ThreadInitializedInput) {
self.record_fact(AnalyticsFact::Custom(
CustomAnalyticsFact::ThreadInitialized(input),
));
}
pub fn track_app_mentioned(&self, tracking: TrackEventsContext, mentions: Vec<AppInvocation>) {
if mentions.is_empty() {
return;
@@ -421,6 +417,13 @@ impl AnalyticsEventsClient {
}
self.queue.try_send(input);
}
pub fn track_response(&self, connection_id: u64, response: ClientResponse) {
self.record_fact(AnalyticsFact::Response {
connection_id,
response: Box::new(response),
});
}
}
const ANALYTICS_EVENTS_QUEUE_SIZE: usize = 256;
@@ -599,14 +602,17 @@ impl AnalyticsReducer {
request_id: _request_id,
request: _request,
} => {}
AnalyticsFact::Response {
connection_id,
response,
} => {
self.ingest_response(connection_id, *response, out);
}
AnalyticsFact::Notification {
connection_id: _connection_id,
notification: _notification,
} => {}
AnalyticsFact::Custom(input) => match input {
CustomAnalyticsFact::ThreadInitialized(input) => {
self.ingest_thread_initialized(input, out);
}
CustomAnalyticsFact::TurnStarted(input) => {
self.ingest_turn_started(*input);
}
@@ -683,6 +689,42 @@ impl AnalyticsReducer {
)));
}
fn ingest_response(
&mut self,
connection_id: u64,
response: ClientResponse,
out: &mut Vec<TrackEventRequest>,
) {
let input = match response {
ClientResponse::ThreadStart { response, .. } => ThreadInitializedInput {
connection_id,
thread_id: response.thread.id,
model: response.model,
ephemeral: response.thread.ephemeral,
session_source: response.thread.source.into(),
initialization_mode: InitializationMode::New,
},
ClientResponse::ThreadResume { response, .. } => ThreadInitializedInput {
connection_id,
thread_id: response.thread.id,
model: response.model,
ephemeral: response.thread.ephemeral,
session_source: response.thread.source.into(),
initialization_mode: InitializationMode::Resumed,
},
ClientResponse::ThreadFork { response, .. } => ThreadInitializedInput {
connection_id,
thread_id: response.thread.id,
model: response.model,
ephemeral: response.thread.ephemeral,
session_source: response.thread.source.into(),
initialization_mode: InitializationMode::Forked,
},
_ => return,
};
self.ingest_thread_initialized(input, out);
}
async fn ingest_skill_invoked(
&mut self,
input: SkillInvokedInput,

View File

@@ -22,8 +22,18 @@ use super::codex_plugin_used_metadata;
use super::codex_thread_initialized_event_request;
use super::codex_turn_event_params;
use super::normalize_path_for_skill_id;
use codex_app_server_protocol::ApprovalsReviewer as AppServerApprovalsReviewer;
use codex_app_server_protocol::AskForApproval as AppServerAskForApproval;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::ClientResponse;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::SandboxPolicy as AppServerSandboxPolicy;
use codex_app_server_protocol::SessionSource as AppServerSessionSource;
use codex_app_server_protocol::Thread;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStatus as AppServerThreadStatus;
use codex_login::default_client::originator;
use codex_plugin::AppConnectorId;
use codex_plugin::PluginCapabilitySummary;
@@ -46,6 +56,61 @@ use std::sync::Arc;
use std::sync::Mutex;
use tokio::sync::mpsc;
fn sample_thread(thread_id: &str, ephemeral: bool) -> Thread {
Thread {
id: thread_id.to_string(),
preview: "first prompt".to_string(),
ephemeral,
model_provider: "openai".to_string(),
created_at: 1,
updated_at: 2,
status: AppServerThreadStatus::Idle,
path: None,
cwd: PathBuf::from("/tmp"),
cli_version: "0.0.0".to_string(),
source: AppServerSessionSource::Exec,
agent_nickname: None,
agent_role: None,
git_info: None,
name: None,
turns: Vec::new(),
}
}
fn sample_thread_start_response(thread_id: &str, ephemeral: bool, model: &str) -> ClientResponse {
ClientResponse::ThreadStart {
request_id: RequestId::Integer(1),
response: ThreadStartResponse {
thread: sample_thread(thread_id, ephemeral),
model: model.to_string(),
model_provider: "openai".to_string(),
service_tier: None,
cwd: PathBuf::from("/tmp"),
approval_policy: AppServerAskForApproval::OnFailure,
approvals_reviewer: AppServerApprovalsReviewer::User,
sandbox: AppServerSandboxPolicy::DangerFullAccess,
reasoning_effort: None,
},
}
}
fn sample_thread_resume_response(thread_id: &str, ephemeral: bool, model: &str) -> ClientResponse {
ClientResponse::ThreadResume {
request_id: RequestId::Integer(2),
response: ThreadResumeResponse {
thread: sample_thread(thread_id, ephemeral),
model: model.to_string(),
model_provider: "openai".to_string(),
service_tier: None,
cwd: PathBuf::from("/tmp"),
approval_policy: AppServerAskForApproval::OnFailure,
approvals_reviewer: AppServerApprovalsReviewer::User,
sandbox: AppServerSandboxPolicy::DangerFullAccess,
reasoning_effort: None,
},
}
}
fn expected_absolute_path(path: &PathBuf) -> String {
std::fs::canonicalize(path)
.unwrap_or_else(|_| path.to_path_buf())
@@ -420,16 +485,14 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
reducer
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::ThreadInitialized(
ThreadInitializedInput {
connection_id: 7,
thread_id: "thread-no-client".to_string(),
model: "gpt-5".to_string(),
ephemeral: false,
session_source: SessionSource::Exec,
initialization_mode: InitializationMode::New,
},
)),
AnalyticsFact::Response {
connection_id: 7,
response: Box::new(sample_thread_start_response(
"thread-no-client",
false,
"gpt-5",
)),
},
&mut events,
)
.await;
@@ -455,16 +518,10 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
reducer
.ingest(
AnalyticsFact::Custom(CustomAnalyticsFact::ThreadInitialized(
ThreadInitializedInput {
connection_id: 7,
thread_id: "thread-1".to_string(),
model: "gpt-5".to_string(),
ephemeral: true,
session_source: SessionSource::Exec,
initialization_mode: InitializationMode::Resumed,
},
)),
AnalyticsFact::Response {
connection_id: 7,
response: Box::new(sample_thread_resume_response("thread-1", true, "gpt-5")),
},
&mut events,
)
.await;

View File

@@ -15,7 +15,6 @@ pub use analytics_client::PluginStateChangedInput;
pub use analytics_client::PluginUsedInput;
pub use analytics_client::SkillInvocation;
pub use analytics_client::SkillInvokedInput;
pub use analytics_client::ThreadInitializedInput;
pub use analytics_client::TrackEventsContext;
pub use analytics_client::TurnCompletedInput;
pub use analytics_client::TurnStartedInput;

View File

@@ -32,6 +32,7 @@ use codex_app_server_protocol::CancelLoginAccountParams;
use codex_app_server_protocol::CancelLoginAccountResponse;
use codex_app_server_protocol::CancelLoginAccountStatus;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ClientResponse;
use codex_app_server_protocol::CodexErrorInfo as AppServerCodexErrorInfo;
use codex_app_server_protocol::CollaborationModeListParams;
use codex_app_server_protocol::CollaborationModeListResponse;
@@ -185,13 +186,11 @@ use codex_core::CodexAuth;
use codex_core::CodexThread;
use codex_core::Cursor as RolloutCursor;
use codex_core::ForkSnapshot;
use codex_core::InitializationMode;
use codex_core::NewThread;
use codex_core::RolloutRecorder;
use codex_core::SessionMeta;
use codex_core::SteerInputError;
use codex_core::ThreadConfigSnapshot;
use codex_core::ThreadInitializedInput;
use codex_core::ThreadManager;
use codex_core::ThreadSortKey as CoreThreadSortKey;
use codex_core::auth::AuthMode as CoreAuthMode;
@@ -2170,12 +2169,13 @@ impl CodexMessageProcessor {
};
listener_task_context
.analytics_events_client
.track_thread_initialized(thread_initialize_input(
request_id.connection_id,
&thread,
response.model.clone(),
InitializationMode::New,
));
.track_response(
request_id.connection_id.0,
ClientResponse::ThreadStart {
request_id: request_id.request_id.clone(),
response: response.clone(),
},
);
listener_task_context
.outgoing
@@ -3654,13 +3654,13 @@ impl CodexMessageProcessor {
sandbox: session_configured.sandbox_policy.into(),
reasoning_effort: session_configured.reasoning_effort,
};
self.analytics_events_client
.track_thread_initialized(thread_initialize_input(
request_id.connection_id,
&response.thread,
response.model.clone(),
InitializationMode::Resumed,
));
self.analytics_events_client.track_response(
request_id.connection_id.0,
ClientResponse::ThreadResume {
request_id: request_id.request_id.clone(),
response: response.clone(),
},
);
self.outgoing.send_response(request_id, response).await;
}
@@ -4268,13 +4268,13 @@ impl CodexMessageProcessor {
sandbox: session_configured.sandbox_policy.into(),
reasoning_effort: session_configured.reasoning_effort,
};
self.analytics_events_client
.track_thread_initialized(thread_initialize_input(
request_id.connection_id,
&thread,
response.model.clone(),
InitializationMode::Forked,
));
self.analytics_events_client.track_response(
request_id.connection_id.0,
ClientResponse::ThreadFork {
request_id: request_id.request_id.clone(),
response: response.clone(),
},
);
self.outgoing.send_response(request_id, response).await;
@@ -7894,22 +7894,6 @@ fn cloud_requirements_load_error(err: &std::io::Error) -> Option<&CloudRequireme
None
}
fn thread_initialize_input(
connection_id: ConnectionId,
thread: &Thread,
model: String,
initialization_mode: InitializationMode,
) -> ThreadInitializedInput {
ThreadInitializedInput {
connection_id: connection_id.0,
thread_id: thread.id.clone(),
model,
ephemeral: thread.ephemeral,
session_source: thread.source.clone().into(),
initialization_mode,
}
}
fn config_load_error(err: &std::io::Error) -> JSONRPCErrorError {
let data = cloud_requirements_load_error(err).map(|cloud_error| {
let mut data = serde_json::json!({

View File

@@ -136,8 +136,6 @@ pub use auth::CodexAuth;
pub use codex_analytics::AnalyticsEventsClient;
pub use codex_analytics::AnalyticsFact;
pub use codex_analytics::CustomAnalyticsFact;
pub use codex_analytics::InitializationMode;
pub use codex_analytics::ThreadInitializedInput;
mod default_client_forwarding;
/// Default Codex HTTP client headers and reqwest construction.