mirror of
https://github.com/openai/codex.git
synced 2026-06-02 19:31:59 +00:00
Compare commits
13 Commits
codex/nigh
...
aibrahim/c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0f1f48cded | ||
|
|
b2a1da0664 | ||
|
|
c20d45cbb9 | ||
|
|
48c903e48a | ||
|
|
f1953fe38b | ||
|
|
70103dcbea | ||
|
|
57c7a229b1 | ||
|
|
d03b11c3bf | ||
|
|
1b36ab6622 | ||
|
|
8e69d9ca12 | ||
|
|
f8529d6dfd | ||
|
|
c0ec966a2b | ||
|
|
5c07818724 |
@@ -4,6 +4,8 @@ use crate::events::CodexAcceptedLineFingerprintsEventParams;
|
||||
use crate::events::CodexAcceptedLineFingerprintsEventRequest;
|
||||
use crate::events::CodexAppMentionedEventRequest;
|
||||
use crate::events::CodexAppServerClientMetadata;
|
||||
use crate::events::CodexAppServerStartedEventParams;
|
||||
use crate::events::CodexAppServerStartedEventRequest;
|
||||
use crate::events::CodexAppUsedEventRequest;
|
||||
use crate::events::CodexCommandExecutionEventParams;
|
||||
use crate::events::CodexCommandExecutionEventRequest;
|
||||
@@ -15,6 +17,7 @@ use crate::events::CodexReviewEventParams;
|
||||
use crate::events::CodexReviewEventRequest;
|
||||
use crate::events::CodexRuntimeMetadata;
|
||||
use crate::events::CodexToolItemEventBase;
|
||||
use crate::events::CodexTurnEventParams;
|
||||
use crate::events::CodexTurnEventRequest;
|
||||
use crate::events::FinalApprovalOutcome;
|
||||
use crate::events::GuardianApprovalRequestSource;
|
||||
@@ -41,6 +44,7 @@ use crate::facts::AnalyticsFact;
|
||||
use crate::facts::AnalyticsJsonRpcError;
|
||||
use crate::facts::AppInvocation;
|
||||
use crate::facts::AppMentionedInput;
|
||||
use crate::facts::AppServerStartedInput;
|
||||
use crate::facts::AppUsedInput;
|
||||
use crate::facts::CodexCompactionEvent;
|
||||
use crate::facts::CompactionImplementation;
|
||||
@@ -61,10 +65,12 @@ use crate::facts::SkillInvocation;
|
||||
use crate::facts::SkillInvokedInput;
|
||||
use crate::facts::SubAgentThreadStartedInput;
|
||||
use crate::facts::ThreadInitializationMode;
|
||||
use crate::facts::ThreadStartTimingFact;
|
||||
use crate::facts::TrackEventsContext;
|
||||
use crate::facts::TurnResolvedConfigFact;
|
||||
use crate::facts::TurnStatus;
|
||||
use crate::facts::TurnSteerRequestError;
|
||||
use crate::facts::TurnTimingBreakdownFact;
|
||||
use crate::facts::TurnTokenUsageFact;
|
||||
use crate::reducer::AnalyticsReducer;
|
||||
use crate::reducer::normalize_path_for_skill_id;
|
||||
@@ -331,6 +337,16 @@ fn sample_turn_token_usage_fact(thread_id: &str, turn_id: &str) -> TurnTokenUsag
|
||||
}
|
||||
}
|
||||
|
||||
fn sample_turn_timing_breakdown_fact(thread_id: &str, turn_id: &str) -> TurnTimingBreakdownFact {
|
||||
TurnTimingBreakdownFact {
|
||||
turn_id: turn_id.to_string(),
|
||||
thread_id: thread_id.to_string(),
|
||||
request_start_delay_ms: Some(120),
|
||||
sampling_duration_ms: 900,
|
||||
blocking_tool_critical_path_duration_ms: 140,
|
||||
}
|
||||
}
|
||||
|
||||
fn sample_turn_completed_notification(
|
||||
thread_id: &str,
|
||||
turn_id: &str,
|
||||
@@ -1317,6 +1333,7 @@ fn thread_initialized_event_serializes_expected_shape() {
|
||||
initialization_mode: ThreadInitializationMode::New,
|
||||
subagent_source: None,
|
||||
parent_thread_id: None,
|
||||
thread_start_duration_ms: Some(321),
|
||||
created_at: 1,
|
||||
},
|
||||
});
|
||||
@@ -1348,12 +1365,46 @@ fn thread_initialized_event_serializes_expected_shape() {
|
||||
"initialization_mode": "new",
|
||||
"subagent_source": null,
|
||||
"parent_thread_id": null,
|
||||
"thread_start_duration_ms": 321,
|
||||
"created_at": 1
|
||||
}
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn app_server_started_event_serializes_expected_shape() {
|
||||
let event = TrackEventRequest::AppServerStarted(CodexAppServerStartedEventRequest {
|
||||
event_type: "codex_app_server_started",
|
||||
event_params: CodexAppServerStartedEventParams {
|
||||
runtime: sample_runtime_metadata(),
|
||||
rpc_transport: AppServerRpcTransport::Websocket,
|
||||
startup_duration_ms: 987,
|
||||
completed_at: 12,
|
||||
},
|
||||
});
|
||||
|
||||
let payload = serde_json::to_value(&event).expect("serialize app-server started event");
|
||||
|
||||
assert_eq!(
|
||||
payload,
|
||||
json!({
|
||||
"event_type": "codex_app_server_started",
|
||||
"event_params": {
|
||||
"runtime": {
|
||||
"codex_rs_version": "0.1.0",
|
||||
"runtime_os": "macos",
|
||||
"runtime_os_version": "15.3.1",
|
||||
"runtime_arch": "aarch64"
|
||||
},
|
||||
"rpc_transport": "websocket",
|
||||
"startup_duration_ms": 987,
|
||||
"completed_at": 12
|
||||
}
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn command_execution_event_serializes_expected_shape() {
|
||||
let event = TrackEventRequest::CommandExecution(CodexCommandExecutionEventRequest {
|
||||
@@ -1629,6 +1680,87 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize
|
||||
payload[0]["event_params"]["runtime"]["runtime_arch"],
|
||||
"x86_64"
|
||||
);
|
||||
assert_eq!(
|
||||
payload[0]["event_params"]["thread_start_duration_ms"],
|
||||
json!(null)
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_start_timing_fact_enriches_thread_initialized_event() {
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
let mut events = Vec::new();
|
||||
|
||||
ingest_initialize(&mut reducer, &mut events).await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Custom(CustomAnalyticsFact::ThreadStartTiming(
|
||||
ThreadStartTimingFact {
|
||||
thread_id: "thread-1".to_string(),
|
||||
duration_ms: 222,
|
||||
},
|
||||
)),
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::ClientResponse {
|
||||
connection_id: 7,
|
||||
request_id: RequestId::Integer(2),
|
||||
response: Box::new(sample_thread_start_response(
|
||||
"thread-1", /*ephemeral*/ true, "gpt-5",
|
||||
)),
|
||||
},
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
|
||||
let payload = serde_json::to_value(&events).expect("serialize events");
|
||||
assert_eq!(payload[0]["event_type"], json!("codex_thread_initialized"));
|
||||
assert_eq!(
|
||||
payload[0]["event_params"]["thread_start_duration_ms"],
|
||||
json!(222)
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn app_server_started_fact_emits_event() {
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
let mut events = Vec::new();
|
||||
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Custom(CustomAnalyticsFact::AppServerStarted(
|
||||
AppServerStartedInput {
|
||||
runtime: sample_runtime_metadata(),
|
||||
rpc_transport: AppServerRpcTransport::Stdio,
|
||||
startup_duration_ms: 456,
|
||||
completed_at: 12,
|
||||
},
|
||||
)),
|
||||
&mut events,
|
||||
)
|
||||
.await;
|
||||
|
||||
let payload = serde_json::to_value(&events).expect("serialize events");
|
||||
assert_eq!(
|
||||
payload,
|
||||
json!([{
|
||||
"event_type": "codex_app_server_started",
|
||||
"event_params": {
|
||||
"runtime": {
|
||||
"codex_rs_version": "0.1.0",
|
||||
"runtime_os": "macos",
|
||||
"runtime_os_version": "15.3.1",
|
||||
"runtime_arch": "aarch64"
|
||||
},
|
||||
"rpc_transport": "stdio",
|
||||
"startup_duration_ms": 456,
|
||||
"completed_at": 12
|
||||
}
|
||||
}])
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -3180,7 +3312,7 @@ async fn reducer_ingests_plugin_state_changed_fact() {
|
||||
fn turn_event_serializes_expected_shape() {
|
||||
let event = TrackEventRequest::TurnEvent(Box::new(CodexTurnEventRequest {
|
||||
event_type: "codex_turn_event",
|
||||
event_params: crate::events::CodexTurnEventParams {
|
||||
event_params: CodexTurnEventParams {
|
||||
thread_id: "thread-2".to_string(),
|
||||
turn_id: "turn-2".to_string(),
|
||||
app_server_client: sample_app_server_client_metadata(),
|
||||
@@ -3221,6 +3353,11 @@ fn turn_event_serializes_expected_shape() {
|
||||
reasoning_output_tokens: None,
|
||||
total_tokens: None,
|
||||
duration_ms: Some(1234),
|
||||
request_start_delay_ms: Some(120),
|
||||
sampling_duration_ms: Some(900),
|
||||
blocking_tool_critical_path_duration_ms: Some(140),
|
||||
approval_wait_duration_ms: Some(0),
|
||||
finalize_duration_ms: Some(74),
|
||||
started_at: Some(455),
|
||||
completed_at: Some(456),
|
||||
},
|
||||
@@ -3282,6 +3419,11 @@ fn turn_event_serializes_expected_shape() {
|
||||
"reasoning_output_tokens": null,
|
||||
"total_tokens": null,
|
||||
"duration_ms": 1234,
|
||||
"request_start_delay_ms": 120,
|
||||
"sampling_duration_ms": 900,
|
||||
"blocking_tool_critical_path_duration_ms": 140,
|
||||
"approval_wait_duration_ms": 0,
|
||||
"finalize_duration_ms": 74,
|
||||
"started_at": 455,
|
||||
"completed_at": 456
|
||||
}
|
||||
@@ -3597,6 +3739,164 @@ async fn turn_lifecycle_emits_turn_event() {
|
||||
json!(13)
|
||||
);
|
||||
assert_eq!(payload["event_params"]["total_tokens"], json!(321));
|
||||
assert_eq!(
|
||||
payload["event_params"]["request_start_delay_ms"],
|
||||
json!(null)
|
||||
);
|
||||
assert_eq!(payload["event_params"]["sampling_duration_ms"], json!(null));
|
||||
assert_eq!(
|
||||
payload["event_params"]["blocking_tool_critical_path_duration_ms"],
|
||||
json!(null)
|
||||
);
|
||||
assert_eq!(
|
||||
payload["event_params"]["approval_wait_duration_ms"],
|
||||
json!(null)
|
||||
);
|
||||
assert_eq!(payload["event_params"]["finalize_duration_ms"], json!(null));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_timing_breakdown_fact_enriches_turn_event() {
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
let mut out = Vec::new();
|
||||
|
||||
ingest_turn_prerequisites(
|
||||
&mut reducer,
|
||||
&mut out,
|
||||
/*include_initialize*/ true,
|
||||
/*include_resolved_config*/ true,
|
||||
/*include_started*/ true,
|
||||
/*include_token_usage*/ false,
|
||||
)
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Custom(CustomAnalyticsFact::TurnTimingBreakdown(Box::new(
|
||||
sample_turn_timing_breakdown_fact("thread-2", "turn-2"),
|
||||
))),
|
||||
&mut out,
|
||||
)
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Notification(Box::new(sample_turn_completed_notification(
|
||||
"thread-2",
|
||||
"turn-2",
|
||||
AppServerTurnStatus::Completed,
|
||||
/*codex_error_info*/ None,
|
||||
))),
|
||||
&mut out,
|
||||
)
|
||||
.await;
|
||||
|
||||
let payload = serde_json::to_value(&out[0]).expect("serialize turn event");
|
||||
assert_eq!(
|
||||
payload["event_params"]["request_start_delay_ms"],
|
||||
json!(120)
|
||||
);
|
||||
assert_eq!(payload["event_params"]["sampling_duration_ms"], json!(900));
|
||||
assert_eq!(
|
||||
payload["event_params"]["blocking_tool_critical_path_duration_ms"],
|
||||
json!(140)
|
||||
);
|
||||
assert_eq!(
|
||||
payload["event_params"]["approval_wait_duration_ms"],
|
||||
json!(0)
|
||||
);
|
||||
assert_eq!(payload["event_params"]["finalize_duration_ms"], json!(74));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn review_durations_roll_up_into_turn_approval_wait() {
|
||||
let mut reducer = AnalyticsReducer::default();
|
||||
let mut out = Vec::new();
|
||||
|
||||
ingest_turn_prerequisites(
|
||||
&mut reducer,
|
||||
&mut out,
|
||||
/*include_initialize*/ true,
|
||||
/*include_resolved_config*/ true,
|
||||
/*include_started*/ true,
|
||||
/*include_token_usage*/ false,
|
||||
)
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Custom(CustomAnalyticsFact::TurnTimingBreakdown(Box::new(
|
||||
TurnTimingBreakdownFact {
|
||||
turn_id: "turn-2".to_string(),
|
||||
thread_id: "thread-2".to_string(),
|
||||
request_start_delay_ms: Some(100),
|
||||
sampling_duration_ms: 400,
|
||||
blocking_tool_critical_path_duration_ms: 700,
|
||||
},
|
||||
))),
|
||||
&mut out,
|
||||
)
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::ServerRequest {
|
||||
connection_id: 7,
|
||||
request: Box::new(ServerRequest::CommandExecutionRequestApproval {
|
||||
request_id: RequestId::Integer(99),
|
||||
params: CommandExecutionRequestApprovalParams {
|
||||
thread_id: "thread-2".to_string(),
|
||||
turn_id: "turn-2".to_string(),
|
||||
item_id: "item-1".to_string(),
|
||||
started_at_ms: 1_000,
|
||||
approval_id: None,
|
||||
reason: None,
|
||||
network_approval_context: None,
|
||||
command: Some("echo hi".to_string()),
|
||||
cwd: None,
|
||||
command_actions: None,
|
||||
additional_permissions: None,
|
||||
proposed_execpolicy_amendment: None,
|
||||
proposed_network_policy_amendments: None,
|
||||
available_decisions: None,
|
||||
},
|
||||
}),
|
||||
},
|
||||
&mut out,
|
||||
)
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::ServerResponse {
|
||||
completed_at_ms: 1_600,
|
||||
response: Box::new(ServerResponse::CommandExecutionRequestApproval {
|
||||
request_id: RequestId::Integer(99),
|
||||
response: CommandExecutionRequestApprovalResponse {
|
||||
decision: CommandExecutionApprovalDecision::AllowOnce,
|
||||
},
|
||||
}),
|
||||
},
|
||||
&mut out,
|
||||
)
|
||||
.await;
|
||||
reducer
|
||||
.ingest(
|
||||
AnalyticsFact::Notification(Box::new(sample_turn_completed_notification(
|
||||
"thread-2",
|
||||
"turn-2",
|
||||
AppServerTurnStatus::Completed,
|
||||
/*codex_error_info*/ None,
|
||||
))),
|
||||
&mut out,
|
||||
)
|
||||
.await;
|
||||
|
||||
let turn_event = out
|
||||
.iter()
|
||||
.find(|event| matches!(event, TrackEventRequest::TurnEvent(_)))
|
||||
.expect("turn event should be emitted");
|
||||
let payload = serde_json::to_value(turn_event).expect("serialize turn event");
|
||||
assert_eq!(
|
||||
payload["event_params"]["approval_wait_duration_ms"],
|
||||
json!(600)
|
||||
);
|
||||
assert_eq!(payload["event_params"]["finalize_duration_ms"], json!(34));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -8,18 +8,24 @@ use crate::facts::AnalyticsFact;
|
||||
use crate::facts::AnalyticsJsonRpcError;
|
||||
use crate::facts::AppInvocation;
|
||||
use crate::facts::AppMentionedInput;
|
||||
use crate::facts::AppServerStartedInput;
|
||||
use crate::facts::AppUsedInput;
|
||||
use crate::facts::CodexCompactionEvent;
|
||||
use crate::facts::CustomAnalyticsFact;
|
||||
use crate::facts::HookRunFact;
|
||||
use crate::facts::HookRunInput;
|
||||
use crate::facts::PluginState;
|
||||
use crate::facts::PluginStateChangedInput;
|
||||
use crate::facts::PluginUsedInput;
|
||||
use crate::facts::SkillInvocation;
|
||||
use crate::facts::SkillInvokedInput;
|
||||
use crate::facts::SubAgentThreadStartedInput;
|
||||
use crate::facts::ThreadStartTimingFact;
|
||||
use crate::facts::TrackEventsContext;
|
||||
use crate::facts::TurnResolvedConfigFact;
|
||||
use crate::facts::TurnTimingBreakdownFact;
|
||||
use crate::facts::TurnTokenUsageFact;
|
||||
use crate::now_unix_seconds;
|
||||
use crate::reducer::AnalyticsReducer;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::ClientResponsePayload;
|
||||
@@ -38,12 +44,36 @@ use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
const ANALYTICS_EVENTS_QUEUE_SIZE: usize = 256;
|
||||
const ANALYTICS_EVENTS_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
const ANALYTICS_EVENT_DEDUPE_MAX_KEYS: usize = 4096;
|
||||
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub struct StartedTimer {
|
||||
started_at: Instant,
|
||||
}
|
||||
|
||||
impl StartedTimer {
|
||||
#[must_use]
|
||||
pub fn start() -> Self {
|
||||
Self {
|
||||
started_at: Instant::now(),
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn elapsed(self) -> Duration {
|
||||
self.started_at.elapsed()
|
||||
}
|
||||
|
||||
fn elapsed_ms(self) -> u64 {
|
||||
self.elapsed().as_millis().try_into().unwrap_or(u64::MAX)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct AnalyticsEventsQueue {
|
||||
pub(crate) sender: mpsc::Sender<AnalyticsFact>,
|
||||
@@ -163,12 +193,36 @@ impl AnalyticsEventsClient {
|
||||
});
|
||||
}
|
||||
|
||||
pub fn track_app_server_started(
|
||||
&self,
|
||||
rpc_transport: AppServerRpcTransport,
|
||||
timer: StartedTimer,
|
||||
) {
|
||||
self.record_fact(AnalyticsFact::Custom(
|
||||
CustomAnalyticsFact::AppServerStarted(AppServerStartedInput {
|
||||
runtime: current_runtime_metadata(),
|
||||
rpc_transport,
|
||||
startup_duration_ms: timer.elapsed_ms(),
|
||||
completed_at: now_unix_seconds(),
|
||||
}),
|
||||
));
|
||||
}
|
||||
|
||||
pub fn track_subagent_thread_started(&self, input: SubAgentThreadStartedInput) {
|
||||
self.record_fact(AnalyticsFact::Custom(
|
||||
CustomAnalyticsFact::SubAgentThreadStarted(input),
|
||||
));
|
||||
}
|
||||
|
||||
pub fn track_thread_start_timing(&self, thread_id: String, timer: StartedTimer) {
|
||||
self.record_fact(AnalyticsFact::Custom(
|
||||
CustomAnalyticsFact::ThreadStartTiming(ThreadStartTimingFact {
|
||||
thread_id,
|
||||
duration_ms: timer.elapsed_ms(),
|
||||
}),
|
||||
));
|
||||
}
|
||||
|
||||
pub fn track_guardian_review(
|
||||
&self,
|
||||
tracking: &GuardianReviewTrackContext,
|
||||
@@ -234,11 +288,11 @@ impl AnalyticsEventsClient {
|
||||
return;
|
||||
}
|
||||
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::PluginUsed(
|
||||
crate::facts::PluginUsedInput { tracking, plugin },
|
||||
PluginUsedInput { tracking, plugin },
|
||||
)));
|
||||
}
|
||||
|
||||
pub fn track_compaction(&self, event: crate::facts::CodexCompactionEvent) {
|
||||
pub fn track_compaction(&self, event: CodexCompactionEvent) {
|
||||
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::Compaction(
|
||||
Box::new(event),
|
||||
)));
|
||||
@@ -256,6 +310,12 @@ impl AnalyticsEventsClient {
|
||||
)));
|
||||
}
|
||||
|
||||
pub fn track_turn_timing(&self, fact: TurnTimingBreakdownFact) {
|
||||
self.record_fact(AnalyticsFact::Custom(
|
||||
CustomAnalyticsFact::TurnTimingBreakdown(Box::new(fact)),
|
||||
));
|
||||
}
|
||||
|
||||
pub fn track_plugin_installed(&self, plugin: PluginTelemetryMetadata) {
|
||||
self.record_fact(AnalyticsFact::Custom(
|
||||
CustomAnalyticsFact::PluginStateChanged(PluginStateChangedInput {
|
||||
|
||||
@@ -56,6 +56,7 @@ pub(crate) struct TrackEventsRequest {
|
||||
#[serde(untagged)]
|
||||
pub(crate) enum TrackEventRequest {
|
||||
SkillInvocation(SkillInvocationEventRequest),
|
||||
AppServerStarted(CodexAppServerStartedEventRequest),
|
||||
ThreadInitialized(ThreadInitializedEvent),
|
||||
GuardianReview(Box<GuardianReviewEventRequest>),
|
||||
AppMentioned(CodexAppMentionedEventRequest),
|
||||
@@ -144,6 +145,20 @@ pub(crate) struct CodexRuntimeMetadata {
|
||||
pub(crate) runtime_arch: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexAppServerStartedEventParams {
|
||||
pub(crate) runtime: CodexRuntimeMetadata,
|
||||
pub(crate) rpc_transport: AppServerRpcTransport,
|
||||
pub(crate) startup_duration_ms: u64,
|
||||
pub(crate) completed_at: u64,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct CodexAppServerStartedEventRequest {
|
||||
pub(crate) event_type: &'static str,
|
||||
pub(crate) event_params: CodexAppServerStartedEventParams,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub(crate) struct ThreadInitializedEventParams {
|
||||
pub(crate) thread_id: String,
|
||||
@@ -155,6 +170,7 @@ pub(crate) struct ThreadInitializedEventParams {
|
||||
pub(crate) initialization_mode: ThreadInitializationMode,
|
||||
pub(crate) subagent_source: Option<String>,
|
||||
pub(crate) parent_thread_id: Option<String>,
|
||||
pub(crate) thread_start_duration_ms: Option<u64>,
|
||||
pub(crate) created_at: u64,
|
||||
}
|
||||
|
||||
@@ -808,6 +824,11 @@ pub(crate) struct CodexTurnEventParams {
|
||||
pub(crate) reasoning_output_tokens: Option<i64>,
|
||||
pub(crate) total_tokens: Option<i64>,
|
||||
pub(crate) duration_ms: Option<u64>,
|
||||
pub(crate) request_start_delay_ms: Option<u64>,
|
||||
pub(crate) sampling_duration_ms: Option<u64>,
|
||||
pub(crate) blocking_tool_critical_path_duration_ms: Option<u64>,
|
||||
pub(crate) approval_wait_duration_ms: Option<u64>,
|
||||
pub(crate) finalize_duration_ms: Option<u64>,
|
||||
pub(crate) started_at: Option<u64>,
|
||||
pub(crate) completed_at: Option<u64>,
|
||||
}
|
||||
@@ -1041,6 +1062,7 @@ pub(crate) fn subagent_thread_started_event_request(
|
||||
parent_thread_id: input
|
||||
.parent_thread_id
|
||||
.or_else(|| subagent_parent_thread_id(&input.subagent_source)),
|
||||
thread_start_duration_ms: None,
|
||||
created_at: input.created_at,
|
||||
};
|
||||
ThreadInitializedEvent {
|
||||
|
||||
@@ -99,6 +99,15 @@ pub struct TurnTokenUsageFact {
|
||||
pub token_usage: TokenUsage,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct TurnTimingBreakdownFact {
|
||||
pub turn_id: String,
|
||||
pub thread_id: String,
|
||||
pub request_start_delay_ms: Option<u64>,
|
||||
pub sampling_duration_ms: u64,
|
||||
pub blocking_tool_critical_path_duration_ms: u64,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum TurnStatus {
|
||||
@@ -273,6 +282,18 @@ pub struct CodexCompactionEvent {
|
||||
pub duration_ms: Option<u64>,
|
||||
}
|
||||
|
||||
pub(crate) struct AppServerStartedInput {
|
||||
pub runtime: CodexRuntimeMetadata,
|
||||
pub rpc_transport: AppServerRpcTransport,
|
||||
pub startup_duration_ms: u64,
|
||||
pub completed_at: u64,
|
||||
}
|
||||
|
||||
pub(crate) struct ThreadStartTimingFact {
|
||||
pub thread_id: String,
|
||||
pub duration_ms: u64,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub(crate) enum AnalyticsFact {
|
||||
Initialize {
|
||||
@@ -322,11 +343,14 @@ pub(crate) enum AnalyticsFact {
|
||||
}
|
||||
|
||||
pub(crate) enum CustomAnalyticsFact {
|
||||
AppServerStarted(AppServerStartedInput),
|
||||
SubAgentThreadStarted(SubAgentThreadStartedInput),
|
||||
ThreadStartTiming(ThreadStartTimingFact),
|
||||
Compaction(Box<CodexCompactionEvent>),
|
||||
GuardianReview(Box<GuardianReviewEventParams>),
|
||||
TurnResolvedConfig(Box<TurnResolvedConfigFact>),
|
||||
TurnTokenUsage(Box<TurnTokenUsageFact>),
|
||||
TurnTimingBreakdown(Box<TurnTimingBreakdownFact>),
|
||||
SkillInvoked(SkillInvokedInput),
|
||||
AppMentioned(AppMentionedInput),
|
||||
AppUsed(AppUsedInput),
|
||||
|
||||
@@ -10,6 +10,7 @@ use std::time::UNIX_EPOCH;
|
||||
pub use accepted_lines::accepted_line_fingerprints_from_unified_diff;
|
||||
pub use accepted_lines::fingerprint_hash;
|
||||
pub use client::AnalyticsEventsClient;
|
||||
pub use client::StartedTimer;
|
||||
pub use events::AppServerRpcTransport;
|
||||
pub use events::GuardianApprovalRequestSource;
|
||||
pub use events::GuardianReviewAnalyticsResult;
|
||||
@@ -43,6 +44,7 @@ pub use facts::TurnStatus;
|
||||
pub use facts::TurnSteerRejectionReason;
|
||||
pub use facts::TurnSteerRequestError;
|
||||
pub use facts::TurnSteerResult;
|
||||
pub use facts::TurnTimingBreakdownFact;
|
||||
pub use facts::TurnTokenUsageFact;
|
||||
pub use facts::build_track_events_context;
|
||||
|
||||
|
||||
@@ -5,6 +5,8 @@ use crate::accepted_lines::accepted_line_repo_hash_for_cwd;
|
||||
use crate::events::AppServerRpcTransport;
|
||||
use crate::events::CodexAppMentionedEventRequest;
|
||||
use crate::events::CodexAppServerClientMetadata;
|
||||
use crate::events::CodexAppServerStartedEventParams;
|
||||
use crate::events::CodexAppServerStartedEventRequest;
|
||||
use crate::events::CodexAppUsedEventRequest;
|
||||
use crate::events::CodexCollabAgentToolCallEventParams;
|
||||
use crate::events::CodexCollabAgentToolCallEventRequest;
|
||||
@@ -61,6 +63,7 @@ use crate::events::subagent_thread_started_event_request;
|
||||
use crate::facts::AnalyticsFact;
|
||||
use crate::facts::AnalyticsJsonRpcError;
|
||||
use crate::facts::AppMentionedInput;
|
||||
use crate::facts::AppServerStartedInput;
|
||||
use crate::facts::AppUsedInput;
|
||||
use crate::facts::CodexCompactionEvent;
|
||||
use crate::facts::CustomAnalyticsFact;
|
||||
@@ -71,10 +74,12 @@ use crate::facts::PluginUsedInput;
|
||||
use crate::facts::SkillInvokedInput;
|
||||
use crate::facts::SubAgentThreadStartedInput;
|
||||
use crate::facts::ThreadInitializationMode;
|
||||
use crate::facts::ThreadStartTimingFact;
|
||||
use crate::facts::TurnResolvedConfigFact;
|
||||
use crate::facts::TurnStatus;
|
||||
use crate::facts::TurnSteerRejectionReason;
|
||||
use crate::facts::TurnSteerResult;
|
||||
use crate::facts::TurnTimingBreakdownFact;
|
||||
use crate::facts::TurnTokenUsageFact;
|
||||
use crate::now_unix_seconds;
|
||||
use crate::option_i64_to_u64;
|
||||
@@ -147,6 +152,7 @@ struct ConnectionState {
|
||||
struct ThreadAnalyticsState {
|
||||
connection_id: Option<u64>,
|
||||
metadata: Option<ThreadMetadataState>,
|
||||
thread_start_duration_ms: Option<u64>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
@@ -314,6 +320,15 @@ struct CompletedTurnState {
|
||||
duration_ms: Option<u64>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
struct TurnTimingState {
|
||||
request_start_delay_ms: Option<u64>,
|
||||
sampling_duration_ms: Option<u64>,
|
||||
blocking_tool_critical_path_duration_ms: Option<u64>,
|
||||
approval_wait_duration_ms: u64,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct TurnState {
|
||||
connection_id: Option<u64>,
|
||||
thread_id: Option<String>,
|
||||
@@ -325,6 +340,7 @@ struct TurnState {
|
||||
latest_diff: Option<String>,
|
||||
steer_count: usize,
|
||||
tool_counts: TurnToolCounts,
|
||||
timing: TurnTimingState,
|
||||
}
|
||||
|
||||
#[derive(Hash, Eq, PartialEq)]
|
||||
@@ -446,9 +462,15 @@ impl AnalyticsReducer {
|
||||
self.ingest_server_request_aborted(completed_at_ms, request_id, out);
|
||||
}
|
||||
AnalyticsFact::Custom(input) => match input {
|
||||
CustomAnalyticsFact::AppServerStarted(input) => {
|
||||
self.ingest_app_server_started(input, out);
|
||||
}
|
||||
CustomAnalyticsFact::SubAgentThreadStarted(input) => {
|
||||
self.ingest_subagent_thread_started(input, out);
|
||||
}
|
||||
CustomAnalyticsFact::ThreadStartTiming(input) => {
|
||||
self.ingest_thread_start_timing(input);
|
||||
}
|
||||
CustomAnalyticsFact::Compaction(input) => {
|
||||
self.ingest_compaction(*input, out);
|
||||
}
|
||||
@@ -461,6 +483,9 @@ impl AnalyticsReducer {
|
||||
CustomAnalyticsFact::TurnTokenUsage(input) => {
|
||||
self.ingest_turn_token_usage(*input, out).await;
|
||||
}
|
||||
CustomAnalyticsFact::TurnTimingBreakdown(input) => {
|
||||
self.ingest_turn_timing_breakdown(*input, out).await;
|
||||
}
|
||||
CustomAnalyticsFact::SkillInvoked(input) => {
|
||||
self.ingest_skill_invoked(input, out).await;
|
||||
}
|
||||
@@ -508,6 +533,24 @@ impl AnalyticsReducer {
|
||||
);
|
||||
}
|
||||
|
||||
fn ingest_app_server_started(
|
||||
&mut self,
|
||||
input: AppServerStartedInput,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
out.push(TrackEventRequest::AppServerStarted(
|
||||
CodexAppServerStartedEventRequest {
|
||||
event_type: "codex_app_server_started",
|
||||
event_params: CodexAppServerStartedEventParams {
|
||||
runtime: input.runtime,
|
||||
rpc_transport: input.rpc_transport,
|
||||
startup_duration_ms: input.startup_duration_ms,
|
||||
completed_at: input.completed_at,
|
||||
},
|
||||
},
|
||||
));
|
||||
}
|
||||
|
||||
fn ingest_subagent_thread_started(
|
||||
&mut self,
|
||||
input: SubAgentThreadStartedInput,
|
||||
@@ -538,6 +581,13 @@ impl AnalyticsReducer {
|
||||
));
|
||||
}
|
||||
|
||||
fn ingest_thread_start_timing(&mut self, input: ThreadStartTimingFact) {
|
||||
self.threads
|
||||
.entry(input.thread_id)
|
||||
.or_default()
|
||||
.thread_start_duration_ms = Some(input.duration_ms);
|
||||
}
|
||||
|
||||
fn ingest_guardian_review(
|
||||
&mut self,
|
||||
input: GuardianReviewEventParams,
|
||||
@@ -599,18 +649,7 @@ impl AnalyticsReducer {
|
||||
let turn_id = input.turn_id.clone();
|
||||
let thread_id = input.thread_id.clone();
|
||||
let num_input_images = input.num_input_images;
|
||||
let turn_state = self.turns.entry(turn_id.clone()).or_insert(TurnState {
|
||||
connection_id: None,
|
||||
thread_id: None,
|
||||
num_input_images: None,
|
||||
resolved_config: None,
|
||||
started_at: None,
|
||||
token_usage: None,
|
||||
completed: None,
|
||||
latest_diff: None,
|
||||
steer_count: 0,
|
||||
tool_counts: TurnToolCounts::default(),
|
||||
});
|
||||
let turn_state = self.turns.entry(turn_id.clone()).or_default();
|
||||
turn_state.thread_id = Some(thread_id);
|
||||
turn_state.num_input_images = Some(num_input_images);
|
||||
turn_state.resolved_config = Some(input);
|
||||
@@ -623,23 +662,27 @@ impl AnalyticsReducer {
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
let turn_id = input.turn_id.clone();
|
||||
let turn_state = self.turns.entry(turn_id.clone()).or_insert(TurnState {
|
||||
connection_id: None,
|
||||
thread_id: None,
|
||||
num_input_images: None,
|
||||
resolved_config: None,
|
||||
started_at: None,
|
||||
token_usage: None,
|
||||
completed: None,
|
||||
latest_diff: None,
|
||||
steer_count: 0,
|
||||
tool_counts: TurnToolCounts::default(),
|
||||
});
|
||||
let turn_state = self.turns.entry(turn_id.clone()).or_default();
|
||||
turn_state.thread_id = Some(input.thread_id);
|
||||
turn_state.token_usage = Some(input.token_usage);
|
||||
self.maybe_emit_turn_event(&turn_id, out).await;
|
||||
}
|
||||
|
||||
async fn ingest_turn_timing_breakdown(
|
||||
&mut self,
|
||||
input: TurnTimingBreakdownFact,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
let turn_id = input.turn_id.clone();
|
||||
let turn_state = self.turns.entry(turn_id.clone()).or_default();
|
||||
turn_state.thread_id = Some(input.thread_id);
|
||||
turn_state.timing.request_start_delay_ms = input.request_start_delay_ms;
|
||||
turn_state.timing.sampling_duration_ms = Some(input.sampling_duration_ms);
|
||||
turn_state.timing.blocking_tool_critical_path_duration_ms =
|
||||
Some(input.blocking_tool_critical_path_duration_ms);
|
||||
self.maybe_emit_turn_event(&turn_id, out).await;
|
||||
}
|
||||
|
||||
async fn ingest_skill_invoked(
|
||||
&mut self,
|
||||
input: SkillInvokedInput,
|
||||
@@ -788,18 +831,7 @@ impl AnalyticsReducer {
|
||||
else {
|
||||
return;
|
||||
};
|
||||
let turn_state = self.turns.entry(turn_id.clone()).or_insert(TurnState {
|
||||
connection_id: None,
|
||||
thread_id: None,
|
||||
num_input_images: None,
|
||||
resolved_config: None,
|
||||
started_at: None,
|
||||
token_usage: None,
|
||||
completed: None,
|
||||
latest_diff: None,
|
||||
steer_count: 0,
|
||||
tool_counts: TurnToolCounts::default(),
|
||||
});
|
||||
let turn_state = self.turns.entry(turn_id.clone()).or_default();
|
||||
turn_state.connection_id = Some(connection_id);
|
||||
turn_state.thread_id = Some(pending_request.thread_id);
|
||||
turn_state.num_input_images = Some(pending_request.num_input_images);
|
||||
@@ -1147,58 +1179,19 @@ impl AnalyticsReducer {
|
||||
self.ingest_guardian_review_completed(notification, out);
|
||||
}
|
||||
ServerNotification::TurnStarted(notification) => {
|
||||
let turn_state = self.turns.entry(notification.turn.id).or_insert(TurnState {
|
||||
connection_id: None,
|
||||
thread_id: None,
|
||||
num_input_images: None,
|
||||
resolved_config: None,
|
||||
started_at: None,
|
||||
token_usage: None,
|
||||
completed: None,
|
||||
latest_diff: None,
|
||||
steer_count: 0,
|
||||
tool_counts: TurnToolCounts::default(),
|
||||
});
|
||||
let turn_state = self.turns.entry(notification.turn.id).or_default();
|
||||
turn_state.started_at = notification
|
||||
.turn
|
||||
.started_at
|
||||
.and_then(|started_at| u64::try_from(started_at).ok());
|
||||
}
|
||||
ServerNotification::TurnDiffUpdated(notification) => {
|
||||
let turn_state =
|
||||
self.turns
|
||||
.entry(notification.turn_id.clone())
|
||||
.or_insert(TurnState {
|
||||
connection_id: None,
|
||||
thread_id: None,
|
||||
num_input_images: None,
|
||||
resolved_config: None,
|
||||
started_at: None,
|
||||
token_usage: None,
|
||||
completed: None,
|
||||
latest_diff: None,
|
||||
steer_count: 0,
|
||||
tool_counts: TurnToolCounts::default(),
|
||||
});
|
||||
let turn_state = self.turns.entry(notification.turn_id.clone()).or_default();
|
||||
turn_state.thread_id = Some(notification.thread_id);
|
||||
turn_state.latest_diff = Some(notification.diff);
|
||||
}
|
||||
ServerNotification::TurnCompleted(notification) => {
|
||||
let turn_state =
|
||||
self.turns
|
||||
.entry(notification.turn.id.clone())
|
||||
.or_insert(TurnState {
|
||||
connection_id: None,
|
||||
thread_id: None,
|
||||
num_input_images: None,
|
||||
resolved_config: None,
|
||||
started_at: None,
|
||||
token_usage: None,
|
||||
completed: None,
|
||||
latest_diff: None,
|
||||
steer_count: 0,
|
||||
tool_counts: TurnToolCounts::default(),
|
||||
});
|
||||
let turn_state = self.turns.entry(notification.turn.id.clone()).or_default();
|
||||
turn_state.completed = Some(CompletedTurnState {
|
||||
status: analytics_turn_status(notification.turn.status),
|
||||
turn_error: notification
|
||||
@@ -1240,13 +1233,12 @@ impl AnalyticsReducer {
|
||||
thread.thread_source.map(Into::into),
|
||||
initialization_mode,
|
||||
);
|
||||
self.threads.insert(
|
||||
thread_id.clone(),
|
||||
ThreadAnalyticsState {
|
||||
connection_id: Some(connection_id),
|
||||
metadata: Some(thread_metadata.clone()),
|
||||
},
|
||||
);
|
||||
let thread_state = self.threads.entry(thread_id.clone()).or_default();
|
||||
thread_state.connection_id = Some(connection_id);
|
||||
thread_state.metadata = Some(thread_metadata.clone());
|
||||
let thread_start_duration_ms = matches!(initialization_mode, ThreadInitializationMode::New)
|
||||
.then_some(thread_state.thread_start_duration_ms)
|
||||
.flatten();
|
||||
out.push(TrackEventRequest::ThreadInitialized(
|
||||
ThreadInitializedEvent {
|
||||
event_type: "codex_thread_initialized",
|
||||
@@ -1260,6 +1252,7 @@ impl AnalyticsReducer {
|
||||
initialization_mode,
|
||||
subagent_source: thread_metadata.subagent_source.clone(),
|
||||
parent_thread_id: thread_metadata.parent_thread_id,
|
||||
thread_start_duration_ms,
|
||||
created_at: u64::try_from(thread.created_at).unwrap_or_default(),
|
||||
},
|
||||
},
|
||||
@@ -1403,6 +1396,7 @@ impl AnalyticsReducer {
|
||||
completed_at_ms: u64,
|
||||
out: &mut Vec<TrackEventRequest>,
|
||||
) {
|
||||
let duration_ms = observed_duration_ms(pending_review.started_at_ms, completed_at_ms);
|
||||
if let Some(item_key) = item_review_summary_key(&pending_review) {
|
||||
self.record_item_review_summary(
|
||||
item_key,
|
||||
@@ -1412,6 +1406,16 @@ impl AnalyticsReducer {
|
||||
&pending_review,
|
||||
);
|
||||
}
|
||||
if let Some(duration_ms) = duration_ms {
|
||||
let turn_state = self
|
||||
.turns
|
||||
.entry(pending_review.turn_id.clone())
|
||||
.or_default();
|
||||
turn_state.timing.approval_wait_duration_ms = turn_state
|
||||
.timing
|
||||
.approval_wait_duration_ms
|
||||
.saturating_add(duration_ms);
|
||||
}
|
||||
let Some((connection_state, thread_metadata)) =
|
||||
self.thread_context_or_warn(AnalyticsDropSite::review(&pending_review))
|
||||
else {
|
||||
@@ -1437,7 +1441,7 @@ impl AnalyticsReducer {
|
||||
resolution,
|
||||
started_at_ms: pending_review.started_at_ms,
|
||||
completed_at_ms,
|
||||
duration_ms: observed_duration_ms(pending_review.started_at_ms, completed_at_ms),
|
||||
duration_ms,
|
||||
},
|
||||
}));
|
||||
}
|
||||
@@ -2445,6 +2449,26 @@ fn codex_turn_event_params(
|
||||
is_first_turn,
|
||||
} = resolved_config;
|
||||
let token_usage = turn_state.token_usage.clone();
|
||||
let timing = &turn_state.timing;
|
||||
let has_turn_timing = timing.request_start_delay_ms.is_some()
|
||||
|| timing.sampling_duration_ms.is_some()
|
||||
|| timing.blocking_tool_critical_path_duration_ms.is_some();
|
||||
let approval_wait_duration_ms = has_turn_timing.then_some(timing.approval_wait_duration_ms);
|
||||
let finalize_duration_ms = completed.duration_ms.and_then(|duration_ms| {
|
||||
has_turn_timing.then_some(
|
||||
duration_ms.saturating_sub(
|
||||
timing
|
||||
.request_start_delay_ms
|
||||
.unwrap_or_default()
|
||||
.saturating_add(timing.sampling_duration_ms.unwrap_or_default())
|
||||
.saturating_add(
|
||||
timing
|
||||
.blocking_tool_critical_path_duration_ms
|
||||
.unwrap_or_default(),
|
||||
),
|
||||
),
|
||||
)
|
||||
});
|
||||
CodexTurnEventParams {
|
||||
thread_id,
|
||||
turn_id,
|
||||
@@ -2501,6 +2525,11 @@ fn codex_turn_event_params(
|
||||
.as_ref()
|
||||
.map(|token_usage| token_usage.total_tokens),
|
||||
duration_ms: completed.duration_ms,
|
||||
request_start_delay_ms: timing.request_start_delay_ms,
|
||||
sampling_duration_ms: timing.sampling_duration_ms,
|
||||
blocking_tool_critical_path_duration_ms: timing.blocking_tool_critical_path_duration_ms,
|
||||
approval_wait_duration_ms,
|
||||
finalize_duration_ms,
|
||||
started_at,
|
||||
completed_at: Some(completed.completed_at),
|
||||
}
|
||||
|
||||
@@ -41,6 +41,7 @@ use crate::transport::start_remote_control;
|
||||
use crate::transport::start_stdio_connection;
|
||||
use crate::transport::start_websocket_acceptor;
|
||||
use codex_analytics::AppServerRpcTransport;
|
||||
use codex_analytics::StartedTimer;
|
||||
use codex_app_server_protocol::ConfigLayerSource;
|
||||
use codex_app_server_protocol::ConfigWarningNotification;
|
||||
use codex_app_server_protocol::JSONRPCMessage;
|
||||
@@ -428,6 +429,7 @@ pub async fn run_main_with_transport_options(
|
||||
auth: AppServerWebsocketAuthSettings,
|
||||
runtime_options: AppServerRuntimeOptions,
|
||||
) -> IoResult<()> {
|
||||
let app_server_start_timer = StartedTimer::start();
|
||||
let (transport_event_tx, mut transport_event_rx) =
|
||||
mpsc::channel::<TransportEvent>(CHANNEL_CAPACITY);
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingEnvelope>(CHANNEL_CAPACITY);
|
||||
@@ -698,6 +700,9 @@ pub async fn run_main_with_transport_options(
|
||||
|
||||
let auth_manager =
|
||||
AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false).await;
|
||||
let analytics_events_client =
|
||||
analytics_events_client_from_config(Arc::clone(&auth_manager), &config);
|
||||
let analytics_transport = analytics_rpc_transport(&transport);
|
||||
|
||||
let remote_control_requested = runtime_options.remote_control_enabled;
|
||||
let remote_control_enabled = remote_control_requested && state_db.is_some();
|
||||
@@ -786,10 +791,8 @@ pub async fn run_main_with_transport_options(
|
||||
});
|
||||
|
||||
let processor_handle = tokio::spawn({
|
||||
let auth_manager =
|
||||
AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false).await;
|
||||
let analytics_events_client =
|
||||
analytics_events_client_from_config(Arc::clone(&auth_manager), &config);
|
||||
let auth_manager = Arc::clone(&auth_manager);
|
||||
let analytics_events_client = analytics_events_client.clone();
|
||||
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(
|
||||
outgoing_tx,
|
||||
analytics_events_client.clone(),
|
||||
@@ -810,7 +813,7 @@ pub async fn run_main_with_transport_options(
|
||||
session_source,
|
||||
auth_manager,
|
||||
installation_id,
|
||||
rpc_transport: analytics_rpc_transport(&transport),
|
||||
rpc_transport: analytics_transport,
|
||||
remote_control_handle: Some(remote_control_handle.clone()),
|
||||
plugin_startup_tasks: runtime_options.plugin_startup_tasks,
|
||||
}));
|
||||
@@ -1066,6 +1069,7 @@ pub async fn run_main_with_transport_options(
|
||||
info!("processor task exited (channel closed)");
|
||||
}
|
||||
});
|
||||
analytics_events_client.track_app_server_started(analytics_transport, app_server_start_timer);
|
||||
|
||||
drop(transport_event_tx);
|
||||
|
||||
|
||||
@@ -410,6 +410,7 @@ impl MessageProcessor {
|
||||
auth_manager.clone(),
|
||||
Arc::clone(&thread_manager),
|
||||
outgoing.clone(),
|
||||
analytics_events_client.clone(),
|
||||
arg0_paths.clone(),
|
||||
Arc::clone(&config),
|
||||
config_manager.clone(),
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use super::*;
|
||||
use crate::error_code::method_not_found;
|
||||
use codex_analytics::StartedTimer;
|
||||
use codex_protocol::models::BUILT_IN_PERMISSION_PROFILE_DANGER_FULL_ACCESS;
|
||||
use codex_protocol::models::BUILT_IN_PERMISSION_PROFILE_WORKSPACE;
|
||||
|
||||
@@ -327,6 +328,7 @@ pub(crate) struct ThreadRequestProcessor {
|
||||
pub(super) auth_manager: Arc<AuthManager>,
|
||||
pub(super) thread_manager: Arc<ThreadManager>,
|
||||
pub(super) outgoing: Arc<OutgoingMessageSender>,
|
||||
pub(super) analytics_events_client: AnalyticsEventsClient,
|
||||
pub(super) arg0_paths: Arg0DispatchPaths,
|
||||
pub(super) config: Arc<Config>,
|
||||
pub(super) config_manager: ConfigManager,
|
||||
@@ -347,6 +349,7 @@ impl ThreadRequestProcessor {
|
||||
auth_manager: Arc<AuthManager>,
|
||||
thread_manager: Arc<ThreadManager>,
|
||||
outgoing: Arc<OutgoingMessageSender>,
|
||||
analytics_events_client: AnalyticsEventsClient,
|
||||
arg0_paths: Arg0DispatchPaths,
|
||||
config: Arc<Config>,
|
||||
config_manager: ConfigManager,
|
||||
@@ -363,6 +366,7 @@ impl ThreadRequestProcessor {
|
||||
auth_manager,
|
||||
thread_manager,
|
||||
outgoing,
|
||||
analytics_events_client,
|
||||
arg0_paths,
|
||||
config,
|
||||
config_manager,
|
||||
@@ -880,11 +884,13 @@ impl ThreadRequestProcessor {
|
||||
};
|
||||
let request_trace = request_context.request_trace();
|
||||
let config_manager = self.config_manager.clone();
|
||||
let analytics_events_client = self.analytics_events_client.clone();
|
||||
let outgoing = Arc::clone(&listener_task_context.outgoing);
|
||||
let error_request_id = request_id.clone();
|
||||
let thread_start_task = async move {
|
||||
if let Err(error) = Self::thread_start_task(
|
||||
listener_task_context,
|
||||
analytics_events_client,
|
||||
config_manager,
|
||||
request_id,
|
||||
app_server_client_name,
|
||||
@@ -969,6 +975,7 @@ impl ThreadRequestProcessor {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn thread_start_task(
|
||||
listener_task_context: ListenerTaskContext,
|
||||
analytics_events_client: AnalyticsEventsClient,
|
||||
config_manager: ConfigManager,
|
||||
request_id: ConnectionRequestId,
|
||||
app_server_client_name: Option<String>,
|
||||
@@ -983,7 +990,7 @@ impl ThreadRequestProcessor {
|
||||
experimental_raw_events: bool,
|
||||
request_trace: Option<W3cTraceContext>,
|
||||
) -> Result<(), JSONRPCErrorError> {
|
||||
let thread_start_started_at = std::time::Instant::now();
|
||||
let thread_start_timer = StartedTimer::start();
|
||||
let requested_cwd = typesafe_overrides.cwd.clone();
|
||||
let mut config = config_manager
|
||||
.load_with_overrides(config_overrides.clone(), typesafe_overrides.clone())
|
||||
@@ -1202,6 +1209,7 @@ impl ThreadRequestProcessor {
|
||||
active_permission_profile,
|
||||
reasoning_effort: config_snapshot.reasoning_effort,
|
||||
};
|
||||
analytics_events_client.track_thread_start_timing(thread.id.clone(), thread_start_timer);
|
||||
let notif = thread_started_notification(thread);
|
||||
listener_task_context
|
||||
.outgoing
|
||||
@@ -1222,7 +1230,7 @@ impl ThreadRequestProcessor {
|
||||
.await;
|
||||
session_telemetry.record_startup_phase(
|
||||
"thread_start_total",
|
||||
thread_start_started_at.elapsed(),
|
||||
thread_start_timer.elapsed(),
|
||||
Some("ready"),
|
||||
);
|
||||
Ok(())
|
||||
|
||||
@@ -1733,7 +1733,12 @@ async fn try_run_sampling_request(
|
||||
turn_context.model_info.slug.as_str(),
|
||||
turn_context.provider.info().name.as_str(),
|
||||
);
|
||||
let mut stream = client_session
|
||||
turn_context
|
||||
.turn_timing_state
|
||||
.mark_model_request_started()
|
||||
.await;
|
||||
turn_context.turn_timing_state.mark_sampling_started().await;
|
||||
let stream_result = client_session
|
||||
.stream(
|
||||
prompt,
|
||||
&turn_context.model_info,
|
||||
@@ -1746,7 +1751,24 @@ async fn try_run_sampling_request(
|
||||
)
|
||||
.instrument(trace_span!("stream_request"))
|
||||
.or_cancel(&cancellation_token)
|
||||
.await??;
|
||||
.await;
|
||||
let mut stream = match stream_result {
|
||||
Ok(Ok(stream)) => stream,
|
||||
Ok(Err(err)) => {
|
||||
turn_context
|
||||
.turn_timing_state
|
||||
.mark_sampling_completed()
|
||||
.await;
|
||||
return Err(err);
|
||||
}
|
||||
Err(codex_async_utils::CancelErr::Cancelled) => {
|
||||
turn_context
|
||||
.turn_timing_state
|
||||
.mark_sampling_completed()
|
||||
.await;
|
||||
return Err(CodexErr::TurnAborted);
|
||||
}
|
||||
};
|
||||
let mut in_flight: FuturesOrdered<BoxFuture<'static, CodexResult<ResponseInputItem>>> =
|
||||
FuturesOrdered::new();
|
||||
let mut needs_follow_up = false;
|
||||
@@ -2151,6 +2173,10 @@ async fn try_run_sampling_request(
|
||||
&mut assistant_message_stream_parsers,
|
||||
)
|
||||
.await;
|
||||
turn_context
|
||||
.turn_timing_state
|
||||
.mark_sampling_completed()
|
||||
.await;
|
||||
|
||||
if sess
|
||||
.features
|
||||
@@ -2161,7 +2187,17 @@ async fn try_run_sampling_request(
|
||||
client_session.send_response_processed(response_id).await;
|
||||
}
|
||||
|
||||
drain_in_flight(&mut in_flight, sess.clone(), turn_context.clone()).await?;
|
||||
turn_context
|
||||
.turn_timing_state
|
||||
.mark_blocking_tool_critical_path_started()
|
||||
.await;
|
||||
let drain_in_flight_result =
|
||||
drain_in_flight(&mut in_flight, sess.clone(), turn_context.clone()).await;
|
||||
turn_context
|
||||
.turn_timing_state
|
||||
.mark_blocking_tool_critical_path_completed()
|
||||
.await;
|
||||
drain_in_flight_result?;
|
||||
|
||||
if should_emit_token_count {
|
||||
// A tool call such as request_user_input can intentionally pause the turn. Emit token
|
||||
|
||||
@@ -6,7 +6,6 @@ mod user_shell;
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
use codex_extension_api::ExtensionData;
|
||||
use futures::future::BoxFuture;
|
||||
@@ -33,6 +32,7 @@ use crate::session::turn_context::TurnContext;
|
||||
use crate::state::ActiveTurn;
|
||||
use crate::state::RunningTask;
|
||||
use crate::state::TaskKind;
|
||||
use codex_analytics::TurnTimingBreakdownFact;
|
||||
use codex_analytics::TurnTokenUsageFact;
|
||||
use codex_login::AuthManager;
|
||||
use codex_models_manager::manager::SharedModelsManager;
|
||||
@@ -318,11 +318,7 @@ impl Session {
|
||||
let task: Arc<dyn AnySessionTask> = Arc::new(task);
|
||||
let task_kind = task.kind();
|
||||
let span_name = task.span_name();
|
||||
let started_at = Instant::now();
|
||||
let turn_started_at_unix_ms = turn_context
|
||||
.turn_timing_state
|
||||
.mark_turn_started(started_at)
|
||||
.await;
|
||||
let turn_started_at_unix_ms = turn_context.turn_timing_state.mark_turn_started().await;
|
||||
turn_context
|
||||
.turn_metadata_state
|
||||
.set_turn_started_at_unix_ms(turn_started_at_unix_ms);
|
||||
@@ -767,10 +763,21 @@ impl Session {
|
||||
.turn_timing_state
|
||||
.completed_at_and_duration_ms()
|
||||
.await;
|
||||
let turn_timing_breakdown = turn_context.turn_timing_state.timing_breakdown().await;
|
||||
let time_to_first_token_ms = turn_context
|
||||
.turn_timing_state
|
||||
.time_to_first_token_ms()
|
||||
.await;
|
||||
self.services
|
||||
.analytics_events_client
|
||||
.track_turn_timing(TurnTimingBreakdownFact {
|
||||
turn_id: turn_context.sub_id.clone(),
|
||||
thread_id: self.conversation_id.to_string(),
|
||||
request_start_delay_ms: turn_timing_breakdown.request_start_delay_ms,
|
||||
sampling_duration_ms: turn_timing_breakdown.sampling_duration_ms,
|
||||
blocking_tool_critical_path_duration_ms: turn_timing_breakdown
|
||||
.blocking_tool_critical_path_duration_ms,
|
||||
});
|
||||
if should_clear_active_turn {
|
||||
self.emit_turn_stop_lifecycle(turn_context.extension_data.as_ref())
|
||||
.await;
|
||||
|
||||
@@ -311,6 +311,7 @@ pub struct ExecCommandToolOutput {
|
||||
pub wall_time: Duration,
|
||||
/// Raw bytes returned for this unified exec call before any truncation.
|
||||
pub raw_output: Vec<u8>,
|
||||
pub truncation_policy: TruncationPolicy,
|
||||
pub max_output_tokens: Option<usize>,
|
||||
pub process_id: Option<i32>,
|
||||
pub exit_code: Option<i32>,
|
||||
@@ -357,7 +358,9 @@ impl ToolOutput for ExecCommandToolOutput {
|
||||
return None;
|
||||
}
|
||||
|
||||
Some(JsonValue::String(self.truncated_output()))
|
||||
Some(JsonValue::String(
|
||||
self.truncated_output(self.model_output_max_tokens()),
|
||||
))
|
||||
}
|
||||
|
||||
fn code_mode_result(&self, _payload: &ToolPayload) -> JsonValue {
|
||||
@@ -381,7 +384,10 @@ impl ToolOutput for ExecCommandToolOutput {
|
||||
exit_code: self.exit_code,
|
||||
session_id: self.process_id,
|
||||
original_token_count: self.original_token_count,
|
||||
output: self.truncated_output(),
|
||||
output: match self.max_output_tokens {
|
||||
Some(max_tokens) => self.truncated_output(max_tokens),
|
||||
None => String::from_utf8_lossy(&self.raw_output).to_string(),
|
||||
},
|
||||
};
|
||||
|
||||
serde_json::to_value(result).unwrap_or_else(|err| {
|
||||
@@ -391,9 +397,12 @@ impl ToolOutput for ExecCommandToolOutput {
|
||||
}
|
||||
|
||||
impl ExecCommandToolOutput {
|
||||
pub(crate) fn truncated_output(&self) -> String {
|
||||
fn model_output_max_tokens(&self) -> usize {
|
||||
resolve_max_tokens(self.max_output_tokens).min(self.truncation_policy.token_budget())
|
||||
}
|
||||
|
||||
pub(crate) fn truncated_output(&self, max_tokens: usize) -> String {
|
||||
let text = String::from_utf8_lossy(&self.raw_output).to_string();
|
||||
let max_tokens = resolve_max_tokens(self.max_output_tokens);
|
||||
formatted_truncate_text(&text, TruncationPolicy::Tokens(max_tokens))
|
||||
}
|
||||
|
||||
@@ -420,7 +429,7 @@ impl ExecCommandToolOutput {
|
||||
}
|
||||
|
||||
sections.push("Output:".to_string());
|
||||
sections.push(self.truncated_output());
|
||||
sections.push(self.truncated_output(self.model_output_max_tokens()));
|
||||
|
||||
sections.join("\n")
|
||||
}
|
||||
|
||||
@@ -429,6 +429,7 @@ fn exec_command_tool_output_formats_truncated_response() {
|
||||
chunk_id: "abc123".to_string(),
|
||||
wall_time: std::time::Duration::from_millis(1250),
|
||||
raw_output: b"token one token two token three token four token five".to_vec(),
|
||||
truncation_policy: TruncationPolicy::Tokens(10_000),
|
||||
max_output_tokens: Some(4),
|
||||
process_id: None,
|
||||
exit_code: Some(0),
|
||||
|
||||
@@ -7,10 +7,8 @@ use crate::tools::context::ToolOutput;
|
||||
use crate::tools::context::ToolPayload;
|
||||
use crate::tools::hook_names::HookToolName;
|
||||
use crate::tools::registry::PostToolUsePayload;
|
||||
use crate::unified_exec::resolve_max_tokens;
|
||||
use codex_protocol::models::AdditionalPermissionProfile;
|
||||
use codex_tools::UnifiedExecShellMode;
|
||||
use codex_utils_output_truncation::TruncationPolicy;
|
||||
use serde::Deserialize;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
@@ -72,13 +70,6 @@ fn default_tty() -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
fn effective_max_output_tokens(
|
||||
max_output_tokens: Option<usize>,
|
||||
truncation_policy: TruncationPolicy,
|
||||
) -> usize {
|
||||
resolve_max_tokens(max_output_tokens).min(truncation_policy.token_budget())
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct ResolvedCommand {
|
||||
pub(crate) command: Vec<String>,
|
||||
|
||||
@@ -36,7 +36,6 @@ use super::super::shell_spec::CommandToolOptions;
|
||||
use super::super::shell_spec::create_exec_command_tool_with_environment_id;
|
||||
use super::ExecCommandArgs;
|
||||
use super::ExecCommandEnvironmentArgs;
|
||||
use super::effective_max_output_tokens;
|
||||
use super::get_command;
|
||||
use super::post_unified_exec_tool_use_payload;
|
||||
|
||||
@@ -162,8 +161,6 @@ impl ToolExecutor<ToolInvocation> for ExecCommandHandler {
|
||||
prefix_rule,
|
||||
..
|
||||
} = args;
|
||||
let max_output_tokens =
|
||||
effective_max_output_tokens(max_output_tokens, turn.truncation_policy);
|
||||
|
||||
let exec_permission_approvals_enabled =
|
||||
session.features().enabled(Feature::ExecPermissionApprovals);
|
||||
@@ -241,7 +238,8 @@ impl ToolExecutor<ToolInvocation> for ExecCommandHandler {
|
||||
chunk_id: String::new(),
|
||||
wall_time: std::time::Duration::ZERO,
|
||||
raw_output: output.into_text().into_bytes(),
|
||||
max_output_tokens: Some(max_output_tokens),
|
||||
truncation_policy: turn.truncation_policy,
|
||||
max_output_tokens,
|
||||
process_id: None,
|
||||
exit_code: None,
|
||||
original_token_count: None,
|
||||
@@ -258,7 +256,7 @@ impl ToolExecutor<ToolInvocation> for ExecCommandHandler {
|
||||
hook_command: hook_command.clone(),
|
||||
process_id,
|
||||
yield_time_ms,
|
||||
max_output_tokens: Some(max_output_tokens),
|
||||
max_output_tokens,
|
||||
cwd,
|
||||
sandbox_cwd: turn_environment.cwd.clone(),
|
||||
environment,
|
||||
@@ -284,7 +282,8 @@ impl ToolExecutor<ToolInvocation> for ExecCommandHandler {
|
||||
chunk_id: generate_chunk_id(),
|
||||
wall_time: output.duration,
|
||||
raw_output: output_text.into_bytes(),
|
||||
max_output_tokens: Some(max_output_tokens),
|
||||
truncation_policy: turn.truncation_policy,
|
||||
max_output_tokens,
|
||||
// Sandbox denial is terminal, so there is no live
|
||||
// process for write_stdin to resume.
|
||||
process_id: None,
|
||||
|
||||
@@ -14,7 +14,6 @@ use codex_tools::ToolSpec;
|
||||
use serde::Deserialize;
|
||||
|
||||
use super::super::shell_spec::create_write_stdin_tool;
|
||||
use super::effective_max_output_tokens;
|
||||
use super::post_unified_exec_tool_use_payload;
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
@@ -62,8 +61,6 @@ impl ToolExecutor<ToolInvocation> for WriteStdinHandler {
|
||||
};
|
||||
|
||||
let args: WriteStdinArgs = parse_arguments(&arguments)?;
|
||||
let max_output_tokens =
|
||||
effective_max_output_tokens(args.max_output_tokens, turn.truncation_policy);
|
||||
let response = session
|
||||
.services
|
||||
.unified_exec_manager
|
||||
@@ -71,7 +68,8 @@ impl ToolExecutor<ToolInvocation> for WriteStdinHandler {
|
||||
process_id: args.session_id,
|
||||
input: &args.chars,
|
||||
yield_time_ms: args.yield_time_ms,
|
||||
max_output_tokens: Some(max_output_tokens),
|
||||
max_output_tokens: args.max_output_tokens,
|
||||
truncation_policy: turn.truncation_policy,
|
||||
})
|
||||
.await
|
||||
.map_err(|err| {
|
||||
|
||||
@@ -4,6 +4,7 @@ use crate::shell::default_user_shell;
|
||||
use codex_tools::UnifiedExecShellMode;
|
||||
use codex_tools::ZshForkConfig;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use codex_utils_output_truncation::TruncationPolicy;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -17,6 +18,8 @@ use crate::tools::registry::CoreToolRuntime;
|
||||
use crate::turn_diff_tracker::TurnDiffTracker;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
const TEST_TRUNCATION_POLICY: TruncationPolicy = TruncationPolicy::Tokens(10_000);
|
||||
|
||||
async fn invocation_for_payload(
|
||||
tool_name: &str,
|
||||
call_id: &str,
|
||||
@@ -258,6 +261,7 @@ async fn exec_command_post_tool_use_payload_uses_output_for_noninteractive_one_s
|
||||
chunk_id: "chunk-1".to_string(),
|
||||
wall_time: std::time::Duration::from_millis(498),
|
||||
raw_output: b"three".to_vec(),
|
||||
truncation_policy: TEST_TRUNCATION_POLICY,
|
||||
max_output_tokens: None,
|
||||
process_id: None,
|
||||
exit_code: Some(0),
|
||||
@@ -287,6 +291,7 @@ async fn exec_command_post_tool_use_payload_uses_output_for_interactive_completi
|
||||
chunk_id: "chunk-1".to_string(),
|
||||
wall_time: std::time::Duration::from_millis(498),
|
||||
raw_output: b"three".to_vec(),
|
||||
truncation_policy: TEST_TRUNCATION_POLICY,
|
||||
max_output_tokens: None,
|
||||
process_id: None,
|
||||
exit_code: Some(0),
|
||||
@@ -317,6 +322,7 @@ async fn exec_command_post_tool_use_payload_skips_running_sessions() {
|
||||
chunk_id: "chunk-1".to_string(),
|
||||
wall_time: std::time::Duration::from_millis(498),
|
||||
raw_output: b"three".to_vec(),
|
||||
truncation_policy: TEST_TRUNCATION_POLICY,
|
||||
max_output_tokens: None,
|
||||
process_id: Some(45),
|
||||
exit_code: None,
|
||||
@@ -342,6 +348,7 @@ async fn write_stdin_post_tool_use_payload_uses_original_exec_call_id_and_comman
|
||||
chunk_id: "chunk-2".to_string(),
|
||||
wall_time: std::time::Duration::from_millis(498),
|
||||
raw_output: b"finished\n".to_vec(),
|
||||
truncation_policy: TEST_TRUNCATION_POLICY,
|
||||
max_output_tokens: None,
|
||||
process_id: None,
|
||||
exit_code: Some(0),
|
||||
@@ -372,6 +379,7 @@ async fn write_stdin_post_tool_use_payload_keeps_parallel_session_metadata_separ
|
||||
chunk_id: "chunk-a".to_string(),
|
||||
wall_time: std::time::Duration::from_millis(498),
|
||||
raw_output: b"alpha\n".to_vec(),
|
||||
truncation_policy: TEST_TRUNCATION_POLICY,
|
||||
max_output_tokens: None,
|
||||
process_id: None,
|
||||
exit_code: Some(0),
|
||||
@@ -383,6 +391,7 @@ async fn write_stdin_post_tool_use_payload_keeps_parallel_session_metadata_separ
|
||||
chunk_id: "chunk-b".to_string(),
|
||||
wall_time: std::time::Duration::from_millis(498),
|
||||
raw_output: b"beta\n".to_vec(),
|
||||
truncation_policy: TEST_TRUNCATION_POLICY,
|
||||
max_output_tokens: None,
|
||||
process_id: None,
|
||||
exit_code: Some(0),
|
||||
|
||||
@@ -36,6 +36,13 @@ pub(crate) async fn record_turn_ttfm_metric(turn_context: &TurnContext, item: &T
|
||||
.record_duration(TURN_TTFM_DURATION_METRIC, duration, &[]);
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
|
||||
pub(crate) struct TurnTimingBreakdown {
|
||||
pub(crate) request_start_delay_ms: Option<u64>,
|
||||
pub(crate) sampling_duration_ms: u64,
|
||||
pub(crate) blocking_tool_critical_path_duration_ms: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub(crate) struct TurnTimingState {
|
||||
state: Mutex<TurnTimingStateInner>,
|
||||
@@ -47,16 +54,27 @@ struct TurnTimingStateInner {
|
||||
started_at_unix_secs: Option<i64>,
|
||||
first_token_at: Option<Instant>,
|
||||
first_message_at: Option<Instant>,
|
||||
first_request_started_at: Option<Instant>,
|
||||
sampling_started_at: Option<Instant>,
|
||||
sampling_duration: Duration,
|
||||
blocking_tool_critical_path_started_at: Option<Instant>,
|
||||
blocking_tool_critical_path_duration: Duration,
|
||||
}
|
||||
|
||||
impl TurnTimingState {
|
||||
pub(crate) async fn mark_turn_started(&self, started_at: Instant) -> i64 {
|
||||
pub(crate) async fn mark_turn_started(&self) -> i64 {
|
||||
let started_at = Instant::now();
|
||||
let started_at_unix_ms = now_unix_timestamp_ms();
|
||||
let mut state = self.state.lock().await;
|
||||
state.started_at = Some(started_at);
|
||||
state.started_at_unix_secs = Some(started_at_unix_ms / 1000);
|
||||
state.first_token_at = None;
|
||||
state.first_message_at = None;
|
||||
state.first_request_started_at = None;
|
||||
state.sampling_started_at = None;
|
||||
state.sampling_duration = Duration::default();
|
||||
state.blocking_tool_critical_path_started_at = None;
|
||||
state.blocking_tool_critical_path_duration = Duration::default();
|
||||
started_at_unix_ms
|
||||
}
|
||||
|
||||
@@ -98,6 +116,54 @@ impl TurnTimingState {
|
||||
let mut state = self.state.lock().await;
|
||||
state.record_turn_ttfm()
|
||||
}
|
||||
|
||||
pub(crate) async fn mark_model_request_started(&self) {
|
||||
let mut state = self.state.lock().await;
|
||||
if state.first_request_started_at.is_none() {
|
||||
state.first_request_started_at = Some(Instant::now());
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn mark_sampling_started(&self) {
|
||||
let mut state = self.state.lock().await;
|
||||
if state.sampling_started_at.is_none() {
|
||||
state.sampling_started_at = Some(Instant::now());
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn mark_sampling_completed(&self) {
|
||||
let mut state = self.state.lock().await;
|
||||
if let Some(started_at) = state.sampling_started_at.take() {
|
||||
state.sampling_duration = state.sampling_duration.saturating_add(started_at.elapsed());
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn mark_blocking_tool_critical_path_started(&self) {
|
||||
let mut state = self.state.lock().await;
|
||||
if state.blocking_tool_critical_path_started_at.is_none() {
|
||||
state.blocking_tool_critical_path_started_at = Some(Instant::now());
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn mark_blocking_tool_critical_path_completed(&self) {
|
||||
let mut state = self.state.lock().await;
|
||||
if let Some(started_at) = state.blocking_tool_critical_path_started_at.take() {
|
||||
state.blocking_tool_critical_path_duration = state
|
||||
.blocking_tool_critical_path_duration
|
||||
.saturating_add(started_at.elapsed());
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn timing_breakdown(&self) -> TurnTimingBreakdown {
|
||||
let state = self.state.lock().await;
|
||||
TurnTimingBreakdown {
|
||||
request_start_delay_ms: state.request_start_delay_ms(),
|
||||
sampling_duration_ms: duration_to_u64_millis(state.sampling_duration),
|
||||
blocking_tool_critical_path_duration_ms: duration_to_u64_millis(
|
||||
state.blocking_tool_critical_path_duration,
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn now_unix_timestamp_secs() -> i64 {
|
||||
@@ -112,6 +178,13 @@ pub(crate) fn now_unix_timestamp_ms() -> i64 {
|
||||
}
|
||||
|
||||
impl TurnTimingStateInner {
|
||||
fn request_start_delay_ms(&self) -> Option<u64> {
|
||||
let duration = self
|
||||
.first_request_started_at?
|
||||
.duration_since(self.started_at?);
|
||||
Some(duration_to_u64_millis(duration))
|
||||
}
|
||||
|
||||
fn time_to_first_token(&self) -> Option<Duration> {
|
||||
Some(self.first_token_at?.duration_since(self.started_at?))
|
||||
}
|
||||
@@ -136,6 +209,10 @@ impl TurnTimingStateInner {
|
||||
}
|
||||
}
|
||||
|
||||
fn duration_to_u64_millis(duration: Duration) -> u64 {
|
||||
u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
|
||||
}
|
||||
|
||||
fn response_event_records_turn_ttft(event: &ResponseEvent) -> bool {
|
||||
match event {
|
||||
ResponseEvent::OutputItemDone(item) | ResponseEvent::OutputItemAdded(item) => {
|
||||
|
||||
@@ -4,9 +4,10 @@ use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::FunctionCallOutputPayload;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::time::Instant;
|
||||
use std::time::Duration;
|
||||
use std::time::SystemTime;
|
||||
use std::time::UNIX_EPOCH;
|
||||
use tokio::time::sleep;
|
||||
|
||||
use super::TurnTimingState;
|
||||
use super::response_item_records_turn_ttft;
|
||||
@@ -22,7 +23,7 @@ async fn turn_timing_state_records_ttft_only_once_per_turn() {
|
||||
None
|
||||
);
|
||||
|
||||
state.mark_turn_started(Instant::now()).await;
|
||||
state.mark_turn_started().await;
|
||||
assert_eq!(
|
||||
state
|
||||
.record_ttft_for_response_event(&ResponseEvent::Created)
|
||||
@@ -46,7 +47,7 @@ async fn turn_timing_state_records_ttft_only_once_per_turn() {
|
||||
#[tokio::test]
|
||||
async fn turn_timing_state_records_ttfm_independently_of_ttft() {
|
||||
let state = TurnTimingState::default();
|
||||
state.mark_turn_started(Instant::now()).await;
|
||||
state.mark_turn_started().await;
|
||||
|
||||
assert!(
|
||||
state
|
||||
@@ -86,7 +87,7 @@ async fn turn_timing_state_records_turn_started_epoch_millis() {
|
||||
.expect("system time should be after unix epoch")
|
||||
.as_millis();
|
||||
|
||||
let started_at_unix_ms = state.mark_turn_started(Instant::now()).await;
|
||||
let started_at_unix_ms = state.mark_turn_started().await;
|
||||
|
||||
let after = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
@@ -99,6 +100,28 @@ async fn turn_timing_state_records_turn_started_epoch_millis() {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_timing_state_tracks_request_start_and_duration_breakdown() {
|
||||
let state = TurnTimingState::default();
|
||||
state.mark_turn_started().await;
|
||||
state.mark_model_request_started().await;
|
||||
state.mark_sampling_started().await;
|
||||
sleep(Duration::from_millis(10)).await;
|
||||
state.mark_sampling_completed().await;
|
||||
state.mark_sampling_started().await;
|
||||
sleep(Duration::from_millis(5)).await;
|
||||
state.mark_sampling_completed().await;
|
||||
state.mark_blocking_tool_critical_path_started().await;
|
||||
sleep(Duration::from_millis(5)).await;
|
||||
state.mark_blocking_tool_critical_path_completed().await;
|
||||
|
||||
let breakdown = state.timing_breakdown().await;
|
||||
|
||||
assert!(breakdown.sampling_duration_ms >= 15);
|
||||
assert!(breakdown.blocking_tool_critical_path_duration_ms >= 5);
|
||||
assert!(breakdown.request_start_delay_ms.is_some());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn response_item_records_turn_ttft_for_first_output_signals() {
|
||||
assert!(response_item_records_turn_ttft(
|
||||
|
||||
@@ -31,6 +31,7 @@ use codex_exec_server::Environment;
|
||||
use codex_network_proxy::NetworkProxy;
|
||||
use codex_protocol::models::AdditionalPermissionProfile;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use codex_utils_output_truncation::TruncationPolicy;
|
||||
use rand::Rng;
|
||||
use rand::rng;
|
||||
use tokio::sync::Mutex;
|
||||
@@ -111,6 +112,7 @@ pub(crate) struct WriteStdinRequest<'a> {
|
||||
pub input: &'a str,
|
||||
pub yield_time_ms: u64,
|
||||
pub max_output_tokens: Option<usize>,
|
||||
pub truncation_policy: TruncationPolicy,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
|
||||
@@ -10,6 +10,7 @@ use crate::tools::context::ExecCommandToolOutput;
|
||||
use crate::unified_exec::WriteStdinRequest;
|
||||
use crate::unified_exec::process::OutputHandles;
|
||||
use codex_sandboxing::SandboxType;
|
||||
use codex_utils_output_truncation::TruncationPolicy;
|
||||
use codex_utils_output_truncation::approx_token_count;
|
||||
use core_test_support::get_remote_test_env;
|
||||
use core_test_support::skip_if_sandbox;
|
||||
@@ -162,6 +163,7 @@ async fn exec_command_with_tty(
|
||||
chunk_id: generate_chunk_id(),
|
||||
wall_time,
|
||||
raw_output: collected,
|
||||
truncation_policy: turn.truncation_policy,
|
||||
max_output_tokens: None,
|
||||
process_id: response_process_id,
|
||||
exit_code,
|
||||
@@ -195,6 +197,7 @@ async fn write_stdin(
|
||||
input,
|
||||
yield_time_ms,
|
||||
max_output_tokens: None,
|
||||
truncation_policy: TruncationPolicy::Tokens(10_000),
|
||||
})
|
||||
.await
|
||||
}
|
||||
@@ -260,7 +263,9 @@ async fn unified_exec_persists_across_requests() -> anyhow::Result<()> {
|
||||
)
|
||||
.await?;
|
||||
assert!(
|
||||
out_2.truncated_output().contains("codex"),
|
||||
out_2
|
||||
.truncated_output(DEFAULT_MAX_OUTPUT_TOKENS)
|
||||
.contains("codex"),
|
||||
"expected environment variable output"
|
||||
);
|
||||
|
||||
@@ -301,7 +306,9 @@ async fn multi_unified_exec_sessions() -> anyhow::Result<()> {
|
||||
"short command should not report a process id if it exits quickly"
|
||||
);
|
||||
assert!(
|
||||
!out_2.truncated_output().contains("codex"),
|
||||
!out_2
|
||||
.truncated_output(DEFAULT_MAX_OUTPUT_TOKENS)
|
||||
.contains("codex"),
|
||||
"short command should run in a fresh shell"
|
||||
);
|
||||
|
||||
@@ -313,7 +320,9 @@ async fn multi_unified_exec_sessions() -> anyhow::Result<()> {
|
||||
)
|
||||
.await?;
|
||||
assert!(
|
||||
out_3.truncated_output().contains("codex"),
|
||||
out_3
|
||||
.truncated_output(DEFAULT_MAX_OUTPUT_TOKENS)
|
||||
.contains("codex"),
|
||||
"session should preserve state"
|
||||
);
|
||||
|
||||
@@ -350,7 +359,9 @@ async fn unified_exec_timeouts() -> anyhow::Result<()> {
|
||||
)
|
||||
.await?;
|
||||
assert!(
|
||||
!out_2.truncated_output().contains(TEST_VAR_VALUE),
|
||||
!out_2
|
||||
.truncated_output(DEFAULT_MAX_OUTPUT_TOKENS)
|
||||
.contains(TEST_VAR_VALUE),
|
||||
"timeout too short should yield incomplete output"
|
||||
);
|
||||
|
||||
@@ -359,7 +370,9 @@ async fn unified_exec_timeouts() -> anyhow::Result<()> {
|
||||
let out_3 = write_stdin(&session, process_id, "", /*yield_time_ms*/ 100).await?;
|
||||
|
||||
assert!(
|
||||
out_3.truncated_output().contains(TEST_VAR_VALUE),
|
||||
out_3
|
||||
.truncated_output(DEFAULT_MAX_OUTPUT_TOKENS)
|
||||
.contains(TEST_VAR_VALUE),
|
||||
"subsequent poll should retrieve output"
|
||||
);
|
||||
|
||||
@@ -394,7 +407,9 @@ async fn unified_exec_pause_blocks_yield_timeout() -> anyhow::Result<()> {
|
||||
"pause should block the unified exec yield timeout"
|
||||
);
|
||||
assert!(
|
||||
response.truncated_output().contains("unified-exec-done"),
|
||||
response
|
||||
.truncated_output(DEFAULT_MAX_OUTPUT_TOKENS)
|
||||
.contains("unified-exec-done"),
|
||||
"exec_command should wait for output after the pause lifts"
|
||||
);
|
||||
assert!(
|
||||
@@ -420,7 +435,11 @@ async fn requests_with_large_timeout_are_capped() -> anyhow::Result<()> {
|
||||
.await?;
|
||||
|
||||
assert!(result.process_id.is_some());
|
||||
assert!(result.truncated_output().contains("codex"));
|
||||
assert!(
|
||||
result
|
||||
.truncated_output(DEFAULT_MAX_OUTPUT_TOKENS)
|
||||
.contains("codex")
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -442,7 +461,11 @@ async fn completed_commands_do_not_persist_sessions() -> anyhow::Result<()> {
|
||||
result.process_id.is_some(),
|
||||
"completed command should report a process id"
|
||||
);
|
||||
assert!(result.truncated_output().contains("codex"));
|
||||
assert!(
|
||||
result
|
||||
.truncated_output(DEFAULT_MAX_OUTPUT_TOKENS)
|
||||
.contains("codex")
|
||||
);
|
||||
|
||||
assert!(
|
||||
session
|
||||
|
||||
@@ -581,6 +581,7 @@ impl UnifiedExecProcessManager {
|
||||
chunk_id,
|
||||
wall_time,
|
||||
raw_output: collected,
|
||||
truncation_policy: context.turn.truncation_policy,
|
||||
max_output_tokens: request.max_output_tokens,
|
||||
process_id: response_process_id,
|
||||
exit_code,
|
||||
@@ -725,6 +726,7 @@ impl UnifiedExecProcessManager {
|
||||
chunk_id,
|
||||
wall_time,
|
||||
raw_output: collected,
|
||||
truncation_policy: request.truncation_policy,
|
||||
max_output_tokens: request.max_output_tokens,
|
||||
process_id,
|
||||
exit_code,
|
||||
|
||||
@@ -5,6 +5,7 @@ use base64::Engine;
|
||||
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
|
||||
use codex_config::types::McpServerConfig;
|
||||
use codex_config::types::McpServerTransportConfig;
|
||||
use codex_core::config::Config;
|
||||
use codex_features::Feature;
|
||||
use codex_login::CodexAuth;
|
||||
use codex_models_manager::bundled_models_response;
|
||||
@@ -144,11 +145,21 @@ async fn run_code_mode_turn(
|
||||
server: &MockServer,
|
||||
prompt: &str,
|
||||
code: &str,
|
||||
) -> Result<(TestCodex, ResponseMock)> {
|
||||
run_code_mode_turn_with_config(server, prompt, code, |_| {}).await
|
||||
}
|
||||
|
||||
async fn run_code_mode_turn_with_config(
|
||||
server: &MockServer,
|
||||
prompt: &str,
|
||||
code: &str,
|
||||
configure: impl FnOnce(&mut Config) + Send + 'static,
|
||||
) -> Result<(TestCodex, ResponseMock)> {
|
||||
let mut builder = test_codex()
|
||||
.with_model("test-gpt-5.1-codex")
|
||||
.with_config(move |config| {
|
||||
let _ = config.features.enable(Feature::CodeMode);
|
||||
configure(config);
|
||||
});
|
||||
let test = builder.build(server).await?;
|
||||
|
||||
@@ -292,8 +303,7 @@ text(JSON.stringify(await tools.exec_command({ cmd: "printf code_mode_exec_marke
|
||||
)
|
||||
.await?;
|
||||
|
||||
let req = second_mock.single_request();
|
||||
let items = custom_tool_output_items(&req, "call-1");
|
||||
let items = custom_tool_output_items(&second_mock.single_request(), "call-1");
|
||||
assert_eq!(items.len(), 2);
|
||||
assert_regex_match(
|
||||
concat!(
|
||||
@@ -645,40 +655,217 @@ text(JSON.stringify(results));
|
||||
|
||||
#[cfg_attr(windows, ignore = "no exec_command on Windows")]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn code_mode_can_truncate_final_result_with_configured_budget() -> Result<()> {
|
||||
async fn code_mode_exec_command_explicit_max_output_tokens_truncates() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = responses::start_mock_server().await;
|
||||
let (_test, second_mock) = run_code_mode_turn(
|
||||
&server,
|
||||
"use exec to truncate the final result",
|
||||
r#"// @exec: {"max_output_tokens": 6}
|
||||
text(JSON.stringify(await tools.exec_command({
|
||||
cmd: "printf 'token one token two token three token four token five token six token seven'",
|
||||
max_output_tokens: 100
|
||||
})));
|
||||
"use exec_command from code mode",
|
||||
r#"
|
||||
const result = await tools.exec_command({
|
||||
cmd: "printf '0123456789012345678901234567890123456789'",
|
||||
max_output_tokens: 5
|
||||
});
|
||||
text(result.output);
|
||||
"#,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let req = second_mock.single_request();
|
||||
let items = custom_tool_output_items(&req, "call-1");
|
||||
assert_eq!(items.len(), 2);
|
||||
assert_regex_match(
|
||||
concat!(
|
||||
r"(?s)\A",
|
||||
r"Script completed\nWall time \d+\.\d seconds\nOutput:\n\z"
|
||||
assert_eq!(
|
||||
text_item(
|
||||
&custom_tool_output_items(&second_mock.single_request(), "call-1"),
|
||||
/*index*/ 1
|
||||
),
|
||||
text_item(&items, /*index*/ 0),
|
||||
"Total output lines: 1\n\n0123456789…5 tokens truncated…0123456789"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg_attr(windows, ignore = "no exec_command on Windows")]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn code_mode_exec_explicit_max_above_default_preserves_output() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = responses::start_mock_server().await;
|
||||
let (_test, second_mock) = run_code_mode_turn(
|
||||
&server,
|
||||
"use exec_command from code mode",
|
||||
r#"// @exec: {"max_output_tokens": 20000}
|
||||
const result = await tools.exec_command({
|
||||
cmd: "python3 -c \"import sys; sys.stdout.write('x' * 50000)\"",
|
||||
max_output_tokens: 20000
|
||||
});
|
||||
text(result.output);
|
||||
"#,
|
||||
)
|
||||
.await?;
|
||||
|
||||
assert_eq!(
|
||||
text_item(
|
||||
&custom_tool_output_items(&second_mock.single_request(), "call-1"),
|
||||
/*index*/ 1
|
||||
),
|
||||
"x".repeat(50_000)
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg_attr(windows, ignore = "no exec_command on Windows")]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn code_mode_exec_explicit_max_above_default_truncates_larger_output() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = responses::start_mock_server().await;
|
||||
let (_test, second_mock) = run_code_mode_turn(
|
||||
&server,
|
||||
"use exec_command from code mode",
|
||||
r#"// @exec: {"max_output_tokens": 25000}
|
||||
const result = await tools.exec_command({
|
||||
cmd: "python3 -c \"import sys; sys.stdout.write('A' * 90000)\"",
|
||||
max_output_tokens: 20000
|
||||
});
|
||||
text(result.output);
|
||||
"#,
|
||||
)
|
||||
.await?;
|
||||
|
||||
assert_eq!(
|
||||
text_item(
|
||||
&custom_tool_output_items(&second_mock.single_request(), "call-1"),
|
||||
/*index*/ 1
|
||||
),
|
||||
format!(
|
||||
"Total output lines: 1\n\n{}…2500 tokens truncated…{}",
|
||||
"A".repeat(40_000),
|
||||
"A".repeat(40_000)
|
||||
)
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg_attr(windows, ignore = "no exec_command on Windows")]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn code_mode_exec_explicit_max_above_truncation_policy_preserves_output() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = responses::start_mock_server().await;
|
||||
let (_test, second_mock) = run_code_mode_turn_with_config(
|
||||
&server,
|
||||
"use exec_command from code mode",
|
||||
r#"// @exec: {"max_output_tokens": 20000}
|
||||
const result = await tools.exec_command({
|
||||
cmd: "python3 -c \"import sys; sys.stdout.write('x' * 50000)\"",
|
||||
max_output_tokens: 20000
|
||||
});
|
||||
text(result.output);
|
||||
"#,
|
||||
|config| {
|
||||
config.tool_output_token_limit = Some(50);
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
assert_eq!(
|
||||
text_item(
|
||||
&custom_tool_output_items(&second_mock.single_request(), "call-1"),
|
||||
/*index*/ 1
|
||||
),
|
||||
"x".repeat(50_000)
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg_attr(windows, ignore = "no exec_command on Windows")]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn code_mode_exec_without_max_preserves_output_beyond_default() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = responses::start_mock_server().await;
|
||||
let (_test, second_mock) = run_code_mode_turn(
|
||||
&server,
|
||||
"use exec_command from code mode",
|
||||
r#"// @exec: {"max_output_tokens": 20000}
|
||||
const result = await tools.exec_command({
|
||||
cmd: "python3 -c \"import sys; sys.stdout.write('x' * 50000)\""
|
||||
});
|
||||
text(result.output);
|
||||
"#,
|
||||
)
|
||||
.await?;
|
||||
|
||||
assert_eq!(
|
||||
text_item(
|
||||
&custom_tool_output_items(&second_mock.single_request(), "call-1"),
|
||||
/*index*/ 1
|
||||
),
|
||||
"x".repeat(50_000)
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg_attr(windows, ignore = "no exec_command on Windows")]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn code_mode_exec_without_max_preserves_output_beyond_truncation_policy() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = responses::start_mock_server().await;
|
||||
let (_test, second_mock) = run_code_mode_turn_with_config(
|
||||
&server,
|
||||
"use exec_command from code mode",
|
||||
r#"// @exec: {"max_output_tokens": 20000}
|
||||
const result = await tools.exec_command({
|
||||
cmd: "python3 -c \"import sys; sys.stdout.write('x' * 50000)\""
|
||||
});
|
||||
text(result.output);
|
||||
"#,
|
||||
|config| {
|
||||
config.tool_output_token_limit = Some(50);
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
assert_eq!(
|
||||
text_item(
|
||||
&custom_tool_output_items(&second_mock.single_request(), "call-1"),
|
||||
/*index*/ 1
|
||||
),
|
||||
"x".repeat(50_000)
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg_attr(windows, ignore = "no exec_command on Windows")]
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn code_mode_exec_explicit_max_output_tokens_truncates() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = responses::start_mock_server().await;
|
||||
let (_test, second_mock) = run_code_mode_turn(
|
||||
&server,
|
||||
"use exec_command from code mode",
|
||||
r#"// @exec: {"max_output_tokens": 5}
|
||||
const result = await tools.exec_command({
|
||||
cmd: "printf '0123456789012345678901234567890123456789'"
|
||||
});
|
||||
text(result.output);
|
||||
"#,
|
||||
)
|
||||
.await?;
|
||||
|
||||
assert_eq!(
|
||||
text_item(
|
||||
&custom_tool_output_items(&second_mock.single_request(), "call-1"),
|
||||
/*index*/ 1
|
||||
),
|
||||
"Total output lines: 1\n\n0123456789…5 tokens truncated…0123456789"
|
||||
);
|
||||
let expected_pattern = r#"(?sx)
|
||||
\A
|
||||
Total\ output\ lines:\ 1\n
|
||||
\n
|
||||
.*…\d+\ tokens\ truncated….*
|
||||
\z
|
||||
"#;
|
||||
assert_regex_match(expected_pattern, text_item(&items, /*index*/ 1));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user