mirror of
https://github.com/openai/codex.git
synced 2026-05-18 10:12:59 +00:00
## Why

While investigating `codex exec hi` startup latency, the useful question was not "is startup slow?" but "which durable bucket is slow in production?" The path we observed has a few distinct stages:

1. `thread/start` creates the session
2. startup prewarm builds the turn context, tools, and prompt
3. startup prewarm warms the websocket
4. the first real turn resolves the prewarm
5. the model produces the first token

Before this PR, production telemetry already had some of the raw measurements:

- aggregate startup-prewarm duration / age-at-first-turn metrics
- TTFT (time to first token) as a metric
- websocket request telemetry

But there was no coherent production event stream for the startup breakdown itself, and TTFT was metric-only. That made it hard to answer the same latency questions from OpenTelemetry-backed logs without adding one-off local instrumentation.

## What changed

Add durable production telemetry on the existing `SessionTelemetry` path:

- new `codex.startup_phase` OTel log/trace events plus `codex.startup.phase.duration_ms`
- new `codex.turn_ttft` OTel log/trace events, while preserving the existing TTFT metric

The startup phase event is emitted for the coarse buckets we actually observed while running `exec hi`:

- `thread_start_create_thread`
- `startup_prewarm_total`
- `startup_prewarm_create_turn_context`
- `startup_prewarm_build_tools`
- `startup_prewarm_build_prompt`
- `startup_prewarm_websocket_warmup`
- `startup_prewarm_resolve`

These phases are intentionally low-cardinality so they remain safe as production telemetry tags.

## Why this shape

This keeps the instrumentation on the same production path as the rest of the session telemetry instead of adding a local debug-only trace mode.
It also avoids changing startup behavior:

- prewarm still runs
- no control flow changes
- no extra remote calls
- no user-visible behavior changes

One boundary is intentional: very early process bootstrap that happens before a session exists is not included here, because this PR uses session-scoped production telemetry. The expensive buckets we were trying to understand after `thread/start` are now covered durably.

## Verification

- `cargo test -p codex-otel`
- `cargo test -p codex-core turn_timing`
- `cargo test -p codex-core regular_turn_emits_turn_started_without_waiting_for_startup_prewarm`
- `cargo test -p codex-core interrupting_regular_turn_waiting_on_startup_prewarm_emits_turn_aborted`
- `cargo test -p codex-app-server thread_start`
- `just fix -p codex-otel -p codex-core -p codex-app-server`

I also ran `cargo test -p codex-core`; it built successfully and then hit an existing, unrelated stack overflow in `tools::handlers::multi_agents::tests::tool_handlers_cascade_close_and_resume_and_keep_explicitly_closed_subtrees_closed`.
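The phase list above can be illustrated with a closed enum plus a small timing helper. This is a minimal, std-only sketch under stated assumptions: `StartupPhase`, `PhaseRecorder`, `time`, and `record` are hypothetical names, not the `codex_otel` API, and the recorder only keeps `(phase, duration_ms)` pairs in memory where the real path would emit a `codex.startup_phase` event through `SessionTelemetry`.

```rust
use std::time::{Duration, Instant};

/// Hypothetical enum mirroring the startup buckets listed in the PR description.
/// A closed enum (rather than free-form strings) keeps the `phase` tag low-cardinality.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StartupPhase {
    ThreadStartCreateThread,
    StartupPrewarmTotal,
    StartupPrewarmCreateTurnContext,
    StartupPrewarmBuildTools,
    StartupPrewarmBuildPrompt,
    StartupPrewarmWebsocketWarmup,
    StartupPrewarmResolve,
}

impl StartupPhase {
    /// Stable tag value for each bucket; the set can never grow at runtime.
    pub fn as_str(self) -> &'static str {
        match self {
            StartupPhase::ThreadStartCreateThread => "thread_start_create_thread",
            StartupPhase::StartupPrewarmTotal => "startup_prewarm_total",
            StartupPhase::StartupPrewarmCreateTurnContext => "startup_prewarm_create_turn_context",
            StartupPhase::StartupPrewarmBuildTools => "startup_prewarm_build_tools",
            StartupPhase::StartupPrewarmBuildPrompt => "startup_prewarm_build_prompt",
            StartupPhase::StartupPrewarmWebsocketWarmup => "startup_prewarm_websocket_warmup",
            StartupPhase::StartupPrewarmResolve => "startup_prewarm_resolve",
        }
    }
}

/// Hypothetical in-memory sink standing in for `SessionTelemetry`.
#[derive(Default)]
pub struct PhaseRecorder {
    pub events: Vec<(&'static str, u128)>,
}

impl PhaseRecorder {
    /// Runs `f`, measures its wall-clock duration, and records it under `phase`.
    pub fn time<T>(&mut self, phase: StartupPhase, f: impl FnOnce() -> T) -> T {
        let start = Instant::now();
        let out = f();
        self.record(phase, start.elapsed());
        out
    }

    /// Records an already-measured duration, truncated to whole milliseconds.
    pub fn record(&mut self, phase: StartupPhase, elapsed: Duration) {
        self.events.push((phase.as_str(), elapsed.as_millis()));
    }
}

fn main() {
    let mut recorder = PhaseRecorder::default();
    // Time a synchronous step; the closure's return value passes through unchanged.
    let tools = recorder.time(StartupPhase::StartupPrewarmBuildTools, || vec!["shell", "apply_patch"]);
    // Record a duration measured elsewhere.
    recorder.record(StartupPhase::StartupPrewarmResolve, Duration::from_millis(12));
    for (phase, ms) in &recorder.events {
        println!("codex.startup_phase phase={phase} duration_ms={ms}");
    }
    assert_eq!(tools.len(), 2);
}
```

For async phases the same pattern applies by capturing `Instant::now()` before the `.await` and recording `elapsed()` after it; the enum-to-`&'static str` mapping is what guarantees the tag set stays bounded.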
571 lines
19 KiB
Rust
```rust
use crate::agent::AgentStatus;
use crate::config::ConstraintResult;
use crate::goals::ExternalGoalSet;
use crate::goals::GoalRuntimeEvent;
use crate::session::Codex;
use crate::session::SessionSettingsUpdate;
use crate::session::SteerInputError;
use codex_features::Feature;
use codex_otel::SessionTelemetry;
use codex_protocol::config_types::ApprovalsReviewer;
use codex_protocol::config_types::CollaborationMode;
use codex_protocol::config_types::Personality;
use codex_protocol::config_types::ReasoningSummary;
use codex_protocol::config_types::WindowsSandboxLevel;
use codex_protocol::error::CodexErr;
use codex_protocol::error::Result as CodexResult;
use codex_protocol::mcp::CallToolResult;
use codex_protocol::models::ActivePermissionProfile;
use codex_protocol::models::ContentItem;
use codex_protocol::models::PermissionProfile;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::openai_models::ReasoningEffort;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::SessionConfiguredEvent;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::Submission;
use codex_protocol::protocol::ThreadMemoryMode;
use codex_protocol::protocol::ThreadSource;
use codex_protocol::protocol::TokenUsageInfo;
use codex_protocol::protocol::TurnEnvironmentSelection;
use codex_protocol::protocol::W3cTraceContext;
use codex_protocol::user_input::UserInput;
use codex_thread_store::StoredThread;
use codex_thread_store::StoredThreadHistory;
use codex_thread_store::ThreadMetadataPatch;
use codex_thread_store::ThreadStoreError;
use codex_thread_store::ThreadStoreResult;
use codex_utils_absolute_path::AbsolutePathBuf;
use rmcp::model::ReadResourceRequestParams;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::watch;

use codex_rollout::state_db::StateDbHandle;

#[derive(Clone, Debug)]
pub struct ThreadConfigSnapshot {
    pub model: String,
    pub model_provider_id: String,
    pub service_tier: Option<String>,
    pub approval_policy: AskForApproval,
    pub approvals_reviewer: ApprovalsReviewer,
    pub permission_profile: PermissionProfile,
    pub active_permission_profile: Option<ActivePermissionProfile>,
    pub cwd: AbsolutePathBuf,
    pub ephemeral: bool,
    pub reasoning_effort: Option<ReasoningEffort>,
    pub personality: Option<Personality>,
    pub session_source: SessionSource,
    pub thread_source: Option<ThreadSource>,
}

impl ThreadConfigSnapshot {
    pub fn sandbox_policy(&self) -> SandboxPolicy {
        let file_system_sandbox_policy = self.permission_profile.file_system_sandbox_policy();
        codex_sandboxing::compatibility_sandbox_policy_for_permission_profile(
            &self.permission_profile,
            &file_system_sandbox_policy,
            self.permission_profile.network_sandbox_policy(),
            self.cwd.as_path(),
        )
    }
}

/// Turn context overrides that app-server validates before starting a turn.
#[derive(Clone, Default)]
pub struct CodexThreadTurnContextOverrides {
    pub cwd: Option<PathBuf>,
    pub approval_policy: Option<AskForApproval>,
    pub approvals_reviewer: Option<ApprovalsReviewer>,
    pub sandbox_policy: Option<SandboxPolicy>,
    pub permission_profile: Option<PermissionProfile>,
    pub active_permission_profile: Option<ActivePermissionProfile>,
    pub windows_sandbox_level: Option<WindowsSandboxLevel>,
    pub model: Option<String>,
    pub effort: Option<Option<ReasoningEffort>>,
    pub summary: Option<ReasoningSummary>,
    pub service_tier: Option<Option<String>>,
    pub collaboration_mode: Option<CollaborationMode>,
    pub personality: Option<Personality>,
}

pub struct CodexThread {
    pub(crate) codex: Codex,
    pub(crate) session_source: SessionSource,
    session_configured: SessionConfiguredEvent,
    rollout_path: Option<PathBuf>,
    out_of_band_elicitation_count: Mutex<u64>,
}

/// Conduit for the bidirectional stream of messages that compose a thread
/// (formerly called a conversation) in Codex.
impl CodexThread {
    pub(crate) fn new(
        codex: Codex,
        session_configured: SessionConfiguredEvent,
        rollout_path: Option<PathBuf>,
        session_source: SessionSource,
    ) -> Self {
        Self {
            codex,
            session_source,
            session_configured,
            rollout_path,
            out_of_band_elicitation_count: Mutex::new(0),
        }
    }

    pub async fn submit(&self, op: Op) -> CodexResult<String> {
        self.codex.submit(op).await
    }

    /// Returns the session telemetry handle for thread-scoped production instrumentation.
    pub fn session_telemetry(&self) -> SessionTelemetry {
        self.codex.session.services.session_telemetry.clone()
    }

    pub async fn shutdown_and_wait(&self) -> CodexResult<()> {
        self.codex.shutdown_and_wait().await
    }

    /// Wait until the underlying session loop has terminated.
    pub async fn wait_until_terminated(&self) {
        self.codex.session_loop_termination.clone().await;
    }

    pub async fn apply_goal_resume_runtime_effects(&self) -> anyhow::Result<()> {
        self.codex
            .session
            .goal_runtime_apply(GoalRuntimeEvent::ThreadResumed)
            .await
    }

    pub async fn continue_active_goal_if_idle(&self) -> anyhow::Result<()> {
        self.codex
            .session
            .goal_runtime_apply(GoalRuntimeEvent::MaybeContinueIfIdle)
            .await
    }

    pub async fn prepare_external_goal_mutation(&self) {
        if let Err(err) = self
            .codex
            .session
            .goal_runtime_apply(GoalRuntimeEvent::ExternalMutationStarting)
            .await
        {
            tracing::warn!("failed to prepare external goal mutation: {err}");
        }
    }

    pub async fn apply_external_goal_set(&self, external_set: ExternalGoalSet) {
        if let Err(err) = self
            .codex
            .session
            .goal_runtime_apply(GoalRuntimeEvent::ExternalSet { external_set })
            .await
        {
            tracing::warn!("failed to apply external goal status runtime effects: {err}");
        }
    }

    pub async fn apply_external_goal_clear(&self) {
        if let Err(err) = self
            .codex
            .session
            .goal_runtime_apply(GoalRuntimeEvent::ExternalClear)
            .await
        {
            tracing::warn!("failed to apply external goal clear runtime effects: {err}");
        }
    }

    #[doc(hidden)]
    pub async fn ensure_rollout_materialized(&self) {
        self.codex.session.ensure_rollout_materialized().await;
    }

    #[doc(hidden)]
    pub async fn flush_rollout(&self) -> std::io::Result<()> {
        self.codex.session.flush_rollout().await
    }

    pub async fn submit_with_trace(
        &self,
        op: Op,
        trace: Option<W3cTraceContext>,
    ) -> CodexResult<String> {
        self.codex.submit_with_trace(op, trace).await
    }

    /// Persist whether this thread is eligible for future memory generation.
    pub async fn set_thread_memory_mode(&self, mode: ThreadMemoryMode) -> anyhow::Result<()> {
        self.codex.set_thread_memory_mode(mode).await
    }

    pub async fn steer_input(
        &self,
        input: Vec<UserInput>,
        expected_turn_id: Option<&str>,
        responsesapi_client_metadata: Option<HashMap<String, String>>,
    ) -> Result<String, SteerInputError> {
        self.codex
            .steer_input(input, expected_turn_id, responsesapi_client_metadata)
            .await
    }

    pub async fn set_app_server_client_info(
        &self,
        app_server_client_name: Option<String>,
        app_server_client_version: Option<String>,
        mcp_elicitations_auto_deny: bool,
    ) -> ConstraintResult<()> {
        self.codex
            .set_app_server_client_info(
                app_server_client_name,
                app_server_client_version,
                mcp_elicitations_auto_deny,
            )
            .await
    }

    /// Validate persistent turn context overrides without committing them.
    pub async fn validate_turn_context_overrides(
        &self,
        overrides: CodexThreadTurnContextOverrides,
    ) -> ConstraintResult<()> {
        let CodexThreadTurnContextOverrides {
            cwd,
            approval_policy,
            approvals_reviewer,
            sandbox_policy,
            permission_profile,
            active_permission_profile,
            windows_sandbox_level,
            model,
            effort,
            summary,
            service_tier,
            collaboration_mode,
            personality,
        } = overrides;
        let collaboration_mode = if let Some(collaboration_mode) = collaboration_mode {
            collaboration_mode
        } else {
            self.codex
                .session
                .collaboration_mode()
                .await
                .with_updates(model, effort, /*developer_instructions*/ None)
        };

        let updates = SessionSettingsUpdate {
            cwd,
            approval_policy,
            approvals_reviewer,
            sandbox_policy,
            permission_profile,
            active_permission_profile,
            windows_sandbox_level,
            collaboration_mode: Some(collaboration_mode),
            reasoning_summary: summary,
            service_tier,
            personality,
            ..Default::default()
        };
        self.codex.session.validate_settings(&updates).await
    }

    /// Use sparingly: this is intended to be removed soon.
    pub async fn submit_with_id(&self, sub: Submission) -> CodexResult<()> {
        self.codex.submit_with_id(sub).await
    }

    pub async fn next_event(&self) -> CodexResult<Event> {
        self.codex.next_event().await
    }

    pub async fn agent_status(&self) -> AgentStatus {
        self.codex.agent_status().await
    }

    pub(crate) fn subscribe_status(&self) -> watch::Receiver<AgentStatus> {
        self.codex.agent_status.clone()
    }

    /// Returns the complete token usage snapshot currently cached for this thread.
    ///
    /// This accessor is intentionally narrower than direct session access: it lets
    /// app-server lifecycle paths replay restored usage after resume or fork without
    /// exposing broader session mutation authority. A caller that only reads
    /// `total_token_usage` would drop last-turn usage and make the v2
    /// `thread/tokenUsage/updated` payload incomplete.
    pub async fn token_usage_info(&self) -> Option<TokenUsageInfo> {
        self.codex.session.token_usage_info().await
    }

    /// Records a user-role session-prefix message without creating a new user turn boundary.
    pub(crate) async fn inject_user_message_without_turn(&self, message: String) {
        let message = ResponseItem::Message {
            id: None,
            role: "user".to_string(),
            content: vec![ContentItem::InputText { text: message }],
            phase: None,
        };
        let pending_item = match pending_message_input_item(&message) {
            Ok(pending_item) => pending_item,
            Err(err) => {
                debug_assert!(false, "session-prefix message append should succeed: {err}");
                return;
            }
        };
        if self
            .codex
            .session
            .inject_response_items(vec![pending_item])
            .await
            .is_err()
        {
            let turn_context = self.codex.session.new_default_turn().await;
            self.codex
                .session
                .record_conversation_items(turn_context.as_ref(), &[message])
                .await;
        }
    }

    /// Append a prebuilt message to the thread history without treating it as a user turn.
    ///
    /// If the thread already has an active turn, the message is queued as pending input for that
    /// turn. Otherwise it is queued at session scope and a regular turn is started so the agent
    /// can consume that pending input through the normal turn pipeline.
    #[cfg(test)]
    pub(crate) async fn append_message(&self, message: ResponseItem) -> CodexResult<String> {
        let submission_id = uuid::Uuid::new_v4().to_string();
        let pending_item = pending_message_input_item(&message)?;
        if let Err(items) = self
            .codex
            .session
            .inject_response_items(vec![pending_item])
            .await
        {
            self.codex
                .session
                .queue_response_items_for_next_turn(items)
                .await;
            self.codex.session.maybe_start_turn_for_pending_work().await;
        }

        Ok(submission_id)
    }

    /// Append raw Responses API items to the thread's model-visible history.
    pub async fn inject_response_items(&self, items: Vec<ResponseItem>) -> CodexResult<()> {
        if items.is_empty() {
            return Err(CodexErr::InvalidRequest(
                "items must not be empty".to_string(),
            ));
        }

        let turn_context = self.codex.session.new_default_turn().await;
        if self.codex.session.reference_context_item().await.is_none() {
            self.codex
                .session
                .record_context_updates_and_set_reference_context_item(turn_context.as_ref())
                .await;
        }
        self.codex
            .session
            .record_conversation_items(turn_context.as_ref(), &items)
            .await;
        self.codex.session.flush_rollout().await?;
        Ok(())
    }

    pub fn rollout_path(&self) -> Option<PathBuf> {
        self.rollout_path.clone()
    }

    pub fn session_configured(&self) -> SessionConfiguredEvent {
        self.session_configured.clone()
    }

    pub(crate) fn is_running(&self) -> bool {
        !self.codex.tx_sub.is_closed()
    }

    pub async fn guardian_trunk_rollout_path(&self) -> Option<PathBuf> {
        self.codex
            .session
            .guardian_review_session
            .trunk_rollout_path()
            .await
    }

    pub async fn load_history(
        &self,
        include_archived: bool,
    ) -> ThreadStoreResult<StoredThreadHistory> {
        let live_thread = self
            .codex
            .session
            .live_thread_for_persistence("load history")
            .map_err(|err| ThreadStoreError::Internal {
                message: err.to_string(),
            })?;
        live_thread.load_history(include_archived).await
    }

    pub async fn read_thread(
        &self,
        include_archived: bool,
        include_history: bool,
    ) -> ThreadStoreResult<StoredThread> {
        let live_thread = self
            .codex
            .session
            .live_thread_for_persistence("read thread")
            .map_err(|err| ThreadStoreError::Internal {
                message: err.to_string(),
            })?;
        live_thread
            .read_thread(include_archived, include_history)
            .await
    }

    pub async fn update_thread_metadata(
        &self,
        patch: ThreadMetadataPatch,
        include_archived: bool,
    ) -> ThreadStoreResult<StoredThread> {
        let live_thread = self
            .codex
            .session
            .live_thread_for_persistence("update thread metadata")
            .map_err(|err| ThreadStoreError::Internal {
                message: err.to_string(),
            })?;
        live_thread.update_metadata(patch, include_archived).await
    }

    pub fn state_db(&self) -> Option<StateDbHandle> {
        self.codex.state_db()
    }

    pub async fn config_snapshot(&self) -> ThreadConfigSnapshot {
        self.codex.thread_config_snapshot().await
    }

    pub async fn config(&self) -> Arc<crate::config::Config> {
        self.codex.session.get_config().await
    }

    /// Refresh the thread's layer-backed user config state from a caller-supplied
    /// config snapshot. Thread-scoped layers and session-static settings remain
    /// unchanged.
    pub async fn refresh_runtime_config(&self, next_config: crate::config::Config) {
        self.codex.session.refresh_runtime_config(next_config).await;
    }

    pub async fn environment_selections(&self) -> Vec<TurnEnvironmentSelection> {
        self.codex.thread_environment_selections().await
    }

    pub async fn read_mcp_resource(
        &self,
        server: &str,
        uri: &str,
    ) -> anyhow::Result<serde_json::Value> {
        let result = self
            .codex
            .session
            .read_resource(
                server,
                ReadResourceRequestParams {
                    meta: None,
                    uri: uri.to_string(),
                },
            )
            .await?;

        Ok(serde_json::to_value(result)?)
    }

    pub async fn call_mcp_tool(
        &self,
        server: &str,
        tool: &str,
        arguments: Option<serde_json::Value>,
        meta: Option<serde_json::Value>,
    ) -> anyhow::Result<CallToolResult> {
        self.codex
            .session
            .call_tool(server, tool, arguments, meta)
            .await
    }

    pub fn enabled(&self, feature: Feature) -> bool {
        self.codex.enabled(feature)
    }

    pub async fn increment_out_of_band_elicitation_count(&self) -> CodexResult<u64> {
        let mut guard = self.out_of_band_elicitation_count.lock().await;
        let was_zero = *guard == 0;
        *guard = guard.checked_add(1).ok_or_else(|| {
            CodexErr::Fatal("out-of-band elicitation count overflowed".to_string())
        })?;

        if was_zero {
            self.codex
                .session
                .set_out_of_band_elicitation_pause_state(/*paused*/ true);
        }

        Ok(*guard)
    }

    pub async fn decrement_out_of_band_elicitation_count(&self) -> CodexResult<u64> {
        let mut guard = self.out_of_band_elicitation_count.lock().await;
        if *guard == 0 {
            return Err(CodexErr::InvalidRequest(
                "out-of-band elicitation count is already zero".to_string(),
            ));
        }

        *guard -= 1;
        let now_zero = *guard == 0;
        if now_zero {
            self.codex
                .session
                .set_out_of_band_elicitation_pause_state(/*paused*/ false);
        }

        Ok(*guard)
    }
}

fn pending_message_input_item(message: &ResponseItem) -> CodexResult<ResponseInputItem> {
    match message {
        ResponseItem::Message {
            role,
            content,
            phase,
            ..
        } => Ok(ResponseInputItem::Message {
            role: role.clone(),
            content: content.clone(),
            phase: phase.clone(),
        }),
        _ => Err(CodexErr::InvalidRequest(
            "append_message only supports ResponseItem::Message".to_string(),
        )),
    }
}
```
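`increment_out_of_band_elicitation_count` and `decrement_out_of_band_elicitation_count` only touch the session pause state on the 0→1 and 1→0 transitions, so overlapping elicitations do not repeatedly pause or resume the session. The sketch below isolates that pattern under simplifying assumptions: it uses `std::sync::Mutex` instead of `tokio::sync::Mutex`, `String` errors instead of `CodexErr`, and a hypothetical `PauseLog` that records each toggle in place of `set_out_of_band_elicitation_pause_state`.

```rust
use std::sync::Mutex;

/// Hypothetical stand-in for the session pause flag: records every toggle
/// so the transition behaviour is observable.
#[derive(Default)]
struct PauseLog {
    toggles: Vec<bool>,
}

/// Hypothetical reduced version of the counter held by `CodexThread`.
#[derive(Default)]
struct ElicitationCounter {
    count: Mutex<u64>,
    pause: Mutex<PauseLog>,
}

impl ElicitationCounter {
    fn increment(&self) -> Result<u64, String> {
        let mut guard = self.count.lock().unwrap();
        let was_zero = *guard == 0;
        *guard = guard.checked_add(1).ok_or("count overflowed")?;
        // Pause only on the first outstanding elicitation (0 -> 1).
        if was_zero {
            self.pause.lock().unwrap().toggles.push(true);
        }
        Ok(*guard)
    }

    fn decrement(&self) -> Result<u64, String> {
        let mut guard = self.count.lock().unwrap();
        if *guard == 0 {
            return Err("count is already zero".to_string());
        }
        *guard -= 1;
        // Resume only when the last outstanding elicitation finishes (1 -> 0).
        if *guard == 0 {
            self.pause.lock().unwrap().toggles.push(false);
        }
        Ok(*guard)
    }
}

fn main() {
    let counter = ElicitationCounter::default();
    assert_eq!(counter.increment(), Ok(1)); // pauses
    assert_eq!(counter.increment(), Ok(2)); // no toggle
    assert_eq!(counter.decrement(), Ok(1)); // no toggle
    assert_eq!(counter.decrement(), Ok(0)); // resumes
    assert!(counter.decrement().is_err()); // underflow is rejected
    println!("{:?}", counter.pause.lock().unwrap().toggles);
}
```

As in the original methods, the counter lock stays held while the pause flag is toggled, so a concurrent decrement cannot observe the new count before the pause state has been updated.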