Compare commits

...

12 Commits

Author SHA1 Message Date
evan-oai
10f5cc6403 Fix thread resume response tracking 2026-04-30 17:35:26 -07:00
evan-oai
59013f0856 Merge remote-tracking branch 'origin/main' into codex/remote-execution-turn-analytics
# Conflicts:
#	codex-rs/analytics/src/analytics_client_tests.rs
#	codex-rs/app-server/src/message_processor.rs
2026-04-30 17:20:55 -07:00
evan-oai
1da26691f9 Refine thread execution environment values 2026-04-30 16:54:00 -07:00
evan-oai
515ea86159 Update thread start test params 2026-04-30 12:55:07 -07:00
evan-oai
c7b058c928 Trim remote attribution scope 2026-04-30 09:07:13 -07:00
evan-oai
8d6d520ca4 Preserve remote execution attribution 2026-04-30 08:38:34 -07:00
evan-oai
3205b65d11 Clear tracked analytics requests on errors 2026-04-30 08:14:41 -07:00
evan-oai
4a7a71be67 Track thread lifecycle requests 2026-04-30 08:07:21 -07:00
evan-oai
26e5796f28 Simplify thread metadata plumbing 2026-04-29 14:55:01 -07:00
evan-oai
ec69151398 Scope remote execution analytics to threads 2026-04-29 14:50:40 -07:00
evan-oai
6149431e00 Track remote execution on thread analytics 2026-04-29 14:28:44 -07:00
evan-oai
bce1b30945 Add remote execution environment to turn analytics 2026-04-29 13:19:25 -07:00
18 changed files with 371 additions and 10 deletions

View File

@@ -72,7 +72,10 @@ use codex_app_server_protocol::SessionSource as AppServerSessionSource;
use codex_app_server_protocol::Thread;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadArchiveResponse;
use codex_app_server_protocol::ThreadExecutionEnvironment;
use codex_app_server_protocol::ThreadResumeParams;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStatus as AppServerThreadStatus;
use codex_app_server_protocol::Turn;
@@ -815,6 +818,7 @@ fn thread_initialized_event_serializes_expected_shape() {
initialization_mode: ThreadInitializationMode::New,
subagent_source: None,
parent_thread_id: None,
execution_environment: Some(ThreadExecutionEnvironment::Ssh),
created_at: 1,
},
});
@@ -846,6 +850,7 @@ fn thread_initialized_event_serializes_expected_shape() {
"initialization_mode": "new",
"subagent_source": null,
"parent_thread_id": null,
"execution_environment": "ssh",
"created_at": 1
}
})
@@ -1821,6 +1826,7 @@ fn turn_event_serializes_expected_shape() {
initialization_mode: ThreadInitializationMode::New,
subagent_source: None,
parent_thread_id: None,
execution_environment: Some(ThreadExecutionEnvironment::Ssh),
model: Some("gpt-5".to_string()),
model_provider: "openai".to_string(),
sandbox_policy: Some("read_only"),
@@ -1882,6 +1888,7 @@ fn turn_event_serializes_expected_shape() {
"initialization_mode": "new",
"subagent_source": null,
"parent_thread_id": null,
"execution_environment": "ssh",
"model": "gpt-5",
"model_provider": "openai",
"sandbox_policy": "read_only",
@@ -2218,6 +2225,149 @@ async fn turn_lifecycle_emits_turn_event() {
assert_eq!(payload["event_params"]["total_tokens"], json!(321));
}
#[tokio::test]
async fn thread_execution_environment_flows_to_thread_turn_and_steer_events() {
let mut reducer = AnalyticsReducer::default();
let mut out = Vec::new();
ingest_initialize(&mut reducer, &mut out).await;
reducer
.ingest(
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(1),
request: Box::new(ClientRequest::ThreadStart {
request_id: RequestId::Integer(1),
params: ThreadStartParams {
execution_environment: Some(ThreadExecutionEnvironment::Ssh),
..Default::default()
},
}),
},
&mut out,
)
.await;
reducer
.ingest(
AnalyticsFact::ClientResponse {
connection_id: 7,
request_id: RequestId::Integer(1),
response: Box::new(sample_thread_start_response(
"thread-2", /*ephemeral*/ false, "gpt-5",
)),
},
&mut out,
)
.await;
let thread_payload = serde_json::to_value(&out[0]).expect("serialize thread event");
assert_eq!(
thread_payload["event_params"]["execution_environment"],
json!("ssh")
);
out.clear();
reducer
.ingest(
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(2),
request: Box::new(ClientRequest::ThreadResume {
request_id: RequestId::Integer(2),
params: ThreadResumeParams {
thread_id: "thread-2".to_string(),
..Default::default()
},
}),
},
&mut out,
)
.await;
reducer
.ingest(
AnalyticsFact::ClientResponse {
connection_id: 7,
request_id: RequestId::Integer(2),
response: Box::new(sample_thread_resume_response(
"thread-2", /*ephemeral*/ false, "gpt-5",
)),
},
&mut out,
)
.await;
let resume_payload = serde_json::to_value(&out[0]).expect("serialize resume thread event");
assert_eq!(
resume_payload["event_params"]["execution_environment"],
json!("ssh")
);
out.clear();
ingest_turn_prerequisites(
&mut reducer,
&mut out,
/*include_initialize*/ false,
/*include_resolved_config*/ true,
/*include_started*/ false,
/*include_token_usage*/ false,
)
.await;
reducer
.ingest(
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(4),
request: Box::new(sample_turn_steer_request(
"thread-2", "turn-2", /*request_id*/ 4,
)),
},
&mut out,
)
.await;
reducer
.ingest(
AnalyticsFact::ClientResponse {
connection_id: 7,
request_id: RequestId::Integer(4),
response: Box::new(sample_turn_steer_response("turn-2")),
},
&mut out,
)
.await;
reducer
.ingest(
AnalyticsFact::Notification(Box::new(sample_turn_completed_notification(
"thread-2",
"turn-2",
AppServerTurnStatus::Completed,
/*codex_error_info*/ None,
))),
&mut out,
)
.await;
let turn_steer_event = out
.iter()
.find(|event| matches!(event, TrackEventRequest::TurnSteer(_)))
.expect("turn steer event should be emitted");
let turn_steer_payload =
serde_json::to_value(turn_steer_event).expect("serialize turn steer event");
assert_eq!(
turn_steer_payload["event_params"]["execution_environment"],
json!("ssh")
);
let turn_event = out
.iter()
.find(|event| matches!(event, TrackEventRequest::TurnEvent(_)))
.expect("turn event should be emitted");
let turn_payload = serde_json::to_value(turn_event).expect("serialize turn event");
assert_eq!(
turn_payload["event_params"]["execution_environment"],
json!("ssh")
);
}
#[tokio::test]
async fn accepted_steers_increment_turn_steer_count() {
let mut reducer = AnalyticsReducer::default();

View File

@@ -194,7 +194,11 @@ impl AnalyticsEventsClient {
) {
if !matches!(
request,
ClientRequest::TurnStart { .. } | ClientRequest::TurnSteer { .. }
ClientRequest::ThreadStart { .. }
| ClientRequest::ThreadResume { .. }
| ClientRequest::ThreadFork { .. }
| ClientRequest::TurnStart { .. }
| ClientRequest::TurnSteer { .. }
) {
return;
}

View File

@@ -12,8 +12,11 @@ use codex_app_server_protocol::SessionSource as AppServerSessionSource;
use codex_app_server_protocol::Thread;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadArchiveResponse;
use codex_app_server_protocol::ThreadForkParams;
use codex_app_server_protocol::ThreadForkResponse;
use codex_app_server_protocol::ThreadResumeParams;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStatus as AppServerThreadStatus;
use codex_app_server_protocol::Turn;
@@ -64,9 +67,36 @@ fn sample_turn_steer_request() -> ClientRequest {
}
}
fn sample_thread_start_request() -> ClientRequest {
ClientRequest::ThreadStart {
request_id: RequestId::Integer(3),
params: ThreadStartParams::default(),
}
}
fn sample_thread_resume_request() -> ClientRequest {
ClientRequest::ThreadResume {
request_id: RequestId::Integer(4),
params: ThreadResumeParams {
thread_id: "thread-1".to_string(),
..Default::default()
},
}
}
fn sample_thread_fork_request() -> ClientRequest {
ClientRequest::ThreadFork {
request_id: RequestId::Integer(5),
params: ThreadForkParams {
thread_id: "thread-1".to_string(),
..Default::default()
},
}
}
fn sample_thread_archive_request() -> ClientRequest {
ClientRequest::ThreadArchive {
request_id: RequestId::Integer(3),
request_id: RequestId::Integer(6),
params: ThreadArchiveParams {
thread_id: "thread-1".to_string(),
},
@@ -177,6 +207,9 @@ fn track_request_only_enqueues_analytics_relevant_requests() {
for (request_id, request) in [
(RequestId::Integer(1), sample_turn_start_request()),
(RequestId::Integer(2), sample_turn_steer_request()),
(RequestId::Integer(3), sample_thread_start_request()),
(RequestId::Integer(4), sample_thread_resume_request()),
(RequestId::Integer(5), sample_thread_fork_request()),
] {
client.track_request(/*connection_id*/ 7, request_id, &request);
assert!(matches!(
@@ -188,7 +221,7 @@ fn track_request_only_enqueues_analytics_relevant_requests() {
let ignored_request = sample_thread_archive_request();
client.track_request(
/*connection_id*/ 7,
RequestId::Integer(3),
RequestId::Integer(6),
&ignored_request,
);
assert!(matches!(receiver.try_recv(), Err(TryRecvError::Empty)));

View File

@@ -20,6 +20,7 @@ use crate::facts::TurnSteerResult;
use crate::facts::TurnSubmissionType;
use crate::now_unix_seconds;
use codex_app_server_protocol::CodexErrorInfo;
use codex_app_server_protocol::ThreadExecutionEnvironment;
use codex_login::default_client::originator;
use codex_plugin::PluginTelemetryMetadata;
use codex_protocol::approvals::NetworkApprovalProtocol;
@@ -114,6 +115,7 @@ pub(crate) struct ThreadInitializedEventParams {
pub(crate) initialization_mode: ThreadInitializationMode,
pub(crate) subagent_source: Option<String>,
pub(crate) parent_thread_id: Option<String>,
pub(crate) execution_environment: Option<ThreadExecutionEnvironment>,
pub(crate) created_at: u64,
}
@@ -466,6 +468,7 @@ pub(crate) struct CodexTurnEventParams {
pub(crate) initialization_mode: ThreadInitializationMode,
pub(crate) subagent_source: Option<String>,
pub(crate) parent_thread_id: Option<String>,
pub(crate) execution_environment: Option<ThreadExecutionEnvironment>,
pub(crate) model: Option<String>,
pub(crate) model_provider: String,
pub(crate) sandbox_policy: Option<&'static str>,
@@ -518,6 +521,7 @@ pub(crate) struct CodexTurnSteerEventParams {
pub(crate) thread_source: Option<String>,
pub(crate) subagent_source: Option<String>,
pub(crate) parent_thread_id: Option<String>,
pub(crate) execution_environment: Option<ThreadExecutionEnvironment>,
pub(crate) num_input_images: usize,
pub(crate) result: TurnSteerResult,
pub(crate) rejection_reason: Option<TurnSteerRejectionReason>,
@@ -723,6 +727,7 @@ pub(crate) fn subagent_thread_started_event_request(
parent_thread_id: input
.parent_thread_id
.or_else(|| subagent_parent_thread_id(&input.subagent_source)),
execution_environment: None,
created_at: input.created_at,
};
ThreadInitializedEvent {

View File

@@ -53,6 +53,7 @@ use codex_app_server_protocol::CodexErrorInfo;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ThreadExecutionEnvironment;
use codex_app_server_protocol::TurnSteerResponse;
use codex_app_server_protocol::UserInput;
use codex_git_utils::collect_git_info;
@@ -89,12 +90,14 @@ struct ThreadMetadataState {
initialization_mode: ThreadInitializationMode,
subagent_source: Option<String>,
parent_thread_id: Option<String>,
execution_environment: Option<ThreadExecutionEnvironment>,
}
impl ThreadMetadataState {
fn from_thread_metadata(
session_source: &SessionSource,
initialization_mode: ThreadInitializationMode,
execution_environment: Option<ThreadExecutionEnvironment>,
) -> Self {
let (subagent_source, parent_thread_id) = match session_source {
SessionSource::SubAgent(subagent_source) => (
@@ -114,11 +117,15 @@ impl ThreadMetadataState {
initialization_mode,
subagent_source,
parent_thread_id,
execution_environment,
}
}
}
enum RequestState {
ThreadInitialized {
execution_environment: Option<ThreadExecutionEnvironment>,
},
TurnStart(PendingTurnStartState),
TurnSteer(PendingTurnSteerState),
}
@@ -322,6 +329,30 @@ impl AnalyticsReducer {
request: ClientRequest,
) {
match request {
ClientRequest::ThreadStart { params, .. } => {
self.requests.insert(
(connection_id, request_id),
RequestState::ThreadInitialized {
execution_environment: params.execution_environment,
},
);
}
ClientRequest::ThreadResume { params, .. } => {
self.requests.insert(
(connection_id, request_id),
RequestState::ThreadInitialized {
execution_environment: params.execution_environment,
},
);
}
ClientRequest::ThreadFork { params, .. } => {
self.requests.insert(
(connection_id, request_id),
RequestState::ThreadInitialized {
execution_environment: params.execution_environment,
},
);
}
ClientRequest::TurnStart { params, .. } => {
self.requests.insert(
(connection_id, request_id),
@@ -500,30 +531,48 @@ impl AnalyticsReducer {
out: &mut Vec<TrackEventRequest>,
) {
match response {
ClientResponse::ThreadStart { response, .. } => {
ClientResponse::ThreadStart {
request_id,
response,
} => {
let execution_environment =
self.thread_initialized_execution_environment(connection_id, request_id);
self.emit_thread_initialized(
connection_id,
response.thread,
response.model,
ThreadInitializationMode::New,
execution_environment,
out,
);
}
ClientResponse::ThreadResume { response, .. } => {
ClientResponse::ThreadResume {
request_id,
response,
} => {
let execution_environment =
self.thread_initialized_execution_environment(connection_id, request_id);
self.emit_thread_initialized(
connection_id,
response.thread,
response.model,
ThreadInitializationMode::Resumed,
execution_environment,
out,
);
}
ClientResponse::ThreadFork { response, .. } => {
ClientResponse::ThreadFork {
request_id,
response,
} => {
let execution_environment =
self.thread_initialized_execution_environment(connection_id, request_id);
self.emit_thread_initialized(
connection_id,
response.thread,
response.model,
ThreadInitializationMode::Forked,
execution_environment,
out,
);
}
@@ -583,6 +632,7 @@ impl AnalyticsReducer {
out: &mut Vec<TrackEventRequest>,
) {
match request {
RequestState::ThreadInitialized { .. } => {}
RequestState::TurnStart(_) => {}
RequestState::TurnSteer(pending_request) => {
self.ingest_turn_steer_error_response(
@@ -677,6 +727,7 @@ impl AnalyticsReducer {
thread: codex_app_server_protocol::Thread,
model: String,
initialization_mode: ThreadInitializationMode,
execution_environment: Option<ThreadExecutionEnvironment>,
out: &mut Vec<TrackEventRequest>,
) {
let thread_source: SessionSource = thread.source.into();
@@ -684,8 +735,16 @@ impl AnalyticsReducer {
let Some(connection_state) = self.connections.get(&connection_id) else {
return;
};
let thread_metadata =
ThreadMetadataState::from_thread_metadata(&thread_source, initialization_mode);
let execution_environment = execution_environment.or_else(|| {
self.thread_metadata
.get(&thread_id)
.and_then(|thread_metadata| thread_metadata.execution_environment)
});
let thread_metadata = ThreadMetadataState::from_thread_metadata(
&thread_source,
initialization_mode,
execution_environment,
);
self.thread_connections
.insert(thread_id.clone(), connection_id);
self.thread_metadata
@@ -703,12 +762,26 @@ impl AnalyticsReducer {
initialization_mode,
subagent_source: thread_metadata.subagent_source,
parent_thread_id: thread_metadata.parent_thread_id,
execution_environment,
created_at: u64::try_from(thread.created_at).unwrap_or_default(),
},
},
));
}
fn thread_initialized_execution_environment(
&mut self,
connection_id: u64,
request_id: RequestId,
) -> Option<ThreadExecutionEnvironment> {
match self.requests.remove(&(connection_id, request_id)) {
Some(RequestState::ThreadInitialized {
execution_environment,
}) => execution_environment,
Some(RequestState::TurnStart(_)) | Some(RequestState::TurnSteer(_)) | None => None,
}
}
fn ingest_compaction(&mut self, input: CodexCompactionEvent, out: &mut Vec<TrackEventRequest>) {
let Some(connection_id) = self.thread_connections.get(&input.thread_id) else {
tracing::warn!(
@@ -805,6 +878,7 @@ impl AnalyticsReducer {
thread_source: thread_metadata.thread_source.map(str::to_string),
subagent_source: thread_metadata.subagent_source.clone(),
parent_thread_id: thread_metadata.parent_thread_id.clone(),
execution_environment: thread_metadata.execution_environment,
num_input_images: pending_request.num_input_images,
result,
rejection_reason,
@@ -919,6 +993,7 @@ fn codex_turn_event_params(
initialization_mode: thread_metadata.initialization_mode,
subagent_source: thread_metadata.subagent_source.clone(),
parent_thread_id: thread_metadata.parent_thread_id.clone(),
execution_environment: thread_metadata.execution_environment,
model: Some(model),
model_provider,
sandbox_policy: Some(sandbox_policy_mode(

View File

@@ -3401,6 +3401,14 @@
],
"type": "object"
},
"ThreadExecutionEnvironment": {
"enum": [
"local",
"ssh",
"remote_control"
],
"type": "string"
},
"ThreadForkParams": {
"description": "There are two ways to fork a thread: 1. By thread_id: load the thread from disk by thread_id and fork it into a new thread. 2. By path: load the thread from disk by path and fork it into a new thread.\n\nIf using path, the thread_id param will be ignored.\n\nPrefer using thread_id whenever possible.",
"properties": {

View File

@@ -15000,6 +15000,14 @@
"title": "ThreadCompactStartResponse",
"type": "object"
},
"ThreadExecutionEnvironment": {
"enum": [
"local",
"ssh",
"remote_control"
],
"type": "string"
},
"ThreadForkParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"description": "There are two ways to fork a thread: 1. By thread_id: load the thread from disk by thread_id and fork it into a new thread. 2. By path: load the thread from disk by path and fork it into a new thread.\n\nIf using path, the thread_id param will be ignored.\n\nPrefer using thread_id whenever possible.",
@@ -18232,4 +18240,4 @@
},
"title": "CodexAppServerProtocol",
"type": "object"
}
}

View File

@@ -12886,6 +12886,14 @@
"title": "ThreadCompactStartResponse",
"type": "object"
},
"ThreadExecutionEnvironment": {
"enum": [
"local",
"ssh",
"remote_control"
],
"type": "string"
},
"ThreadForkParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"description": "There are two ways to fork a thread: 1. By thread_id: load the thread from disk by thread_id and fork it into a new thread. 2. By path: load the thread from disk by path and fork it into a new thread.\n\nIf using path, the thread_id param will be ignored.\n\nPrefer using thread_id whenever possible.",
@@ -16117,4 +16125,4 @@
},
"title": "CodexAppServerProtocolV2",
"type": "object"
}
}

View File

@@ -137,6 +137,14 @@
"flex"
],
"type": "string"
},
"ThreadExecutionEnvironment": {
"enum": [
"local",
"ssh",
"remote_control"
],
"type": "string"
}
},
"description": "There are two ways to fork a thread: 1. By thread_id: load the thread from disk by thread_id and fork it into a new thread. 2. By path: load the thread from disk by path and fork it into a new thread.\n\nIf using path, the thread_id param will be ignored.\n\nPrefer using thread_id whenever possible.",

View File

@@ -995,6 +995,14 @@
"flex"
],
"type": "string"
},
"ThreadExecutionEnvironment": {
"enum": [
"local",
"ssh",
"remote_control"
],
"type": "string"
}
},
"description": "There are three ways to resume a thread: 1. By thread_id: load the thread from disk by thread_id and resume it. 2. By history: instantiate the thread from memory and resume it. 3. By path: load the thread from disk by path and resume it.\n\nThe precedence is: history > path > thread_id. If using history or path, the thread_id param will be ignored.\n\nPrefer using thread_id whenever possible.",

View File

@@ -172,6 +172,14 @@
],
"type": "string"
},
"ThreadExecutionEnvironment": {
"enum": [
"local",
"ssh",
"remote_control"
],
"type": "string"
},
"ThreadStartSource": {
"enum": [
"startup",

View File

@@ -0,0 +1,5 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type ThreadExecutionEnvironment = "local" | "ssh" | "remote_control";

View File

@@ -348,6 +348,7 @@ export type { ThreadArchivedNotification } from "./ThreadArchivedNotification";
export type { ThreadClosedNotification } from "./ThreadClosedNotification";
export type { ThreadCompactStartParams } from "./ThreadCompactStartParams";
export type { ThreadCompactStartResponse } from "./ThreadCompactStartResponse";
export type { ThreadExecutionEnvironment } from "./ThreadExecutionEnvironment";
export type { ThreadForkParams } from "./ThreadForkParams";
export type { ThreadForkResponse } from "./ThreadForkResponse";
export type { ThreadGoal } from "./ThreadGoal";

View File

@@ -3588,6 +3588,10 @@ pub struct ThreadStartParams {
pub ephemeral: Option<bool>,
#[ts(optional = nullable)]
pub session_start_source: Option<ThreadStartSource>,
/// Optional thread-scoped execution environment.
#[experimental("thread/start.executionEnvironment")]
#[ts(optional = nullable)]
pub execution_environment: Option<ThreadExecutionEnvironment>,
/// Optional sticky environments for this thread.
///
/// Omitted selects the default environment when environment access is
@@ -3740,6 +3744,10 @@ pub struct ThreadResumeParams {
/// `thread/turns/list` immediately after resuming.
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
pub exclude_turns: bool,
/// Optional thread-scoped execution environment.
#[experimental("thread/resume.executionEnvironment")]
#[ts(optional = nullable)]
pub execution_environment: Option<ThreadExecutionEnvironment>,
/// If true, persist additional rollout EventMsg variants required to
/// reconstruct a richer thread history on subsequent resume/fork/read.
#[experimental("thread/resume.persistFullHistory")]
@@ -3844,6 +3852,10 @@ pub struct ThreadForkParams {
/// `thread/turns/list` immediately after forking.
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
pub exclude_turns: bool,
/// Optional thread-scoped execution environment.
#[experimental("thread/fork.executionEnvironment")]
#[ts(optional = nullable)]
pub execution_environment: Option<ThreadExecutionEnvironment>,
/// If true, persist additional rollout EventMsg variants required to
/// reconstruct a richer thread history on subsequent resume/fork/read.
#[experimental("thread/fork.persistFullHistory")]
@@ -5475,6 +5487,15 @@ pub struct TurnEnvironmentParams {
pub cwd: AbsolutePathBuf,
}
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, JsonSchema, TS)]
#[serde(rename_all = "snake_case")]
#[ts(rename_all = "snake_case", export_to = "v2/")]
pub enum ThreadExecutionEnvironment {
Local,
Ssh,
RemoteControl,
}
#[derive(
Serialize, Deserialize, Debug, Default, Clone, PartialEq, JsonSchema, TS, ExperimentalApi,
)]

View File

@@ -2466,6 +2466,7 @@ impl CodexMessageProcessor {
personality,
ephemeral,
session_start_source,
execution_environment: _execution_environment,
environments,
persist_extended_history,
} = params;
@@ -4285,6 +4286,7 @@ impl CodexMessageProcessor {
developer_instructions,
personality,
exclude_turns,
execution_environment: _execution_environment,
persist_extended_history,
} = params;
let include_turns = !exclude_turns;
@@ -4608,6 +4610,7 @@ impl CodexMessageProcessor {
let command = crate::thread_state::ThreadListenerCommand::SendThreadResumeResponse(
Box::new(crate::thread_state::PendingThreadResumeRequest {
request_id: request_id.clone(),
analytics_events_client: self.analytics_events_client.clone(),
history_items,
config_snapshot,
instruction_sources,
@@ -4863,6 +4866,7 @@ impl CodexMessageProcessor {
developer_instructions,
ephemeral,
exclude_turns,
execution_environment: _execution_environment,
persist_extended_history,
} = params;
let include_turns = !exclude_turns;
@@ -8427,6 +8431,11 @@ async fn handle_pending_thread_resume_request(
active_permission_profile,
reasoning_effort,
};
pending.analytics_events_client.track_response(
connection_id.0,
request_id.request_id.clone(),
ClientResponsePayload::ThreadResume(response.clone()),
);
let token_usage_thread = pending.include_turns.then(|| response.thread.clone());
outgoing.send_response(request_id, response).await;
// Match cold resume: metadata-only resume should attach the listener without
@@ -10429,6 +10438,7 @@ mod tests {
developer_instructions: None,
personality: None,
exclude_turns: false,
execution_environment: None,
persist_extended_history: false,
};
let config_snapshot = ThreadConfigSnapshot {

View File

@@ -628,6 +628,12 @@ impl OutgoingMessageSender {
request_id: ConnectionRequestId,
error: JSONRPCErrorError,
) {
self.analytics_events_client.track_error_response(
request_id.connection_id.0,
request_id.request_id.clone(),
error.clone(),
/*error_type*/ None,
);
let outgoing_message = OutgoingMessage::Error(OutgoingError {
id: request_id.request_id,
error,

View File

@@ -1,5 +1,6 @@
use crate::outgoing_message::ConnectionId;
use crate::outgoing_message::ConnectionRequestId;
use codex_analytics::AnalyticsEventsClient;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadGoal;
use codex_app_server_protocol::ThreadHistoryBuilder;
@@ -26,6 +27,7 @@ type PendingInterruptQueue = Vec<ConnectionRequestId>;
pub(crate) struct PendingThreadResumeRequest {
pub(crate) request_id: ConnectionRequestId,
pub(crate) analytics_events_client: AnalyticsEventsClient,
pub(crate) history_items: Vec<RolloutItem>,
pub(crate) config_snapshot: ThreadConfigSnapshot,
pub(crate) instruction_sources: Vec<AbsolutePathBuf>,

View File

@@ -675,6 +675,7 @@ async fn skills_changed_notification_is_emitted_after_skill_change() -> Result<(
personality: None,
ephemeral: None,
session_start_source: None,
execution_environment: None,
dynamic_tools: None,
environments: None,
mock_experimental_field: None,