mirror of
https://github.com/openai/codex.git
synced 2026-05-23 04:24:21 +00:00
Compare commits
1 Commits
starr/rust
...
aibrahim/s
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a19a247dac |
@@ -30,8 +30,8 @@ use serde::de::DeserializeOwned;
|
||||
use sha2::Digest as _;
|
||||
use sha2::Sha512;
|
||||
|
||||
const AGENT_TASK_REGISTRATION_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
const AGENT_IDENTITY_JWKS_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
const AGENT_TASK_REGISTRATION_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
const AGENT_IDENTITY_JWKS_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
const AGENT_IDENTITY_JWT_AUDIENCE: &str = "codex-app-server";
|
||||
const AGENT_IDENTITY_JWT_ISSUER: &str = "https://chatgpt.com/codex-backend/agent-identity";
|
||||
|
||||
|
||||
@@ -17,6 +17,7 @@ use std::io::Result as IoResult;
|
||||
use std::sync::Arc;
|
||||
use std::sync::RwLock;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::time::Instant;
|
||||
|
||||
use crate::analytics_utils::analytics_events_client_from_config;
|
||||
use crate::config_manager::ConfigManager;
|
||||
@@ -425,6 +426,7 @@ pub async fn run_main_with_transport_options(
|
||||
auth: AppServerWebsocketAuthSettings,
|
||||
runtime_options: AppServerRuntimeOptions,
|
||||
) -> IoResult<()> {
|
||||
let startup_started_at = Instant::now();
|
||||
let (transport_event_tx, mut transport_event_rx) =
|
||||
mpsc::channel::<TransportEvent>(CHANNEL_CAPACITY);
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingEnvelope>(CHANNEL_CAPACITY);
|
||||
@@ -600,6 +602,8 @@ pub async fn run_main_with_transport_options(
|
||||
});
|
||||
}
|
||||
|
||||
let config_and_state_init_duration = startup_started_at.elapsed();
|
||||
|
||||
let feedback = CodexFeedback::new();
|
||||
|
||||
// Install a simple subscriber so `tracing` output is visible. Users can
|
||||
@@ -627,6 +631,7 @@ pub async fn run_main_with_transport_options(
|
||||
.map(|layer| layer.with_filter(Targets::new().with_default(Level::TRACE)));
|
||||
let otel_logger_layer = otel.as_ref().and_then(|o| o.logger_layer());
|
||||
let otel_tracing_layer = otel.as_ref().and_then(|o| o.tracing_layer());
|
||||
let tracing_init_started_at = Instant::now();
|
||||
let _ = tracing_subscriber::registry()
|
||||
.with(stderr_fmt)
|
||||
.with(feedback_layer)
|
||||
@@ -635,6 +640,8 @@ pub async fn run_main_with_transport_options(
|
||||
.with(otel_logger_layer)
|
||||
.with(otel_tracing_layer)
|
||||
.try_init();
|
||||
let tracing_init_duration = tracing_init_started_at.elapsed();
|
||||
let runtime_bootstrap_started_at = Instant::now();
|
||||
for warning in &config_warnings {
|
||||
match &warning.details {
|
||||
Some(details) => error!("{} {}", warning.summary, details),
|
||||
@@ -650,7 +657,9 @@ pub async fn run_main_with_transport_options(
|
||||
let graceful_signal_restart_enabled =
|
||||
runtime_options.install_shutdown_signal_handler && !single_client_mode;
|
||||
let mut app_server_client_name_rx = None;
|
||||
let runtime_bootstrap_duration = runtime_bootstrap_started_at.elapsed();
|
||||
|
||||
let transport_init_started_at = Instant::now();
|
||||
match &transport {
|
||||
AppServerTransport::Stdio => {
|
||||
let (stdio_client_name_tx, stdio_client_name_rx) = oneshot::channel::<String>();
|
||||
@@ -683,7 +692,9 @@ pub async fn run_main_with_transport_options(
|
||||
}
|
||||
AppServerTransport::Off => {}
|
||||
}
|
||||
let transport_init_duration = transport_init_started_at.elapsed();
|
||||
|
||||
let remote_control_init_started_at = Instant::now();
|
||||
let auth_manager =
|
||||
AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false).await;
|
||||
|
||||
@@ -717,6 +728,20 @@ pub async fn run_main_with_transport_options(
|
||||
)
|
||||
.await?;
|
||||
transport_accept_handles.push(remote_control_accept_handle);
|
||||
let remote_control_init_duration = remote_control_init_started_at.elapsed();
|
||||
|
||||
info!(
|
||||
total_ms = %startup_started_at.elapsed().as_millis(),
|
||||
config_and_state_init_ms = %config_and_state_init_duration.as_millis(),
|
||||
tracing_init_ms = %tracing_init_duration.as_millis(),
|
||||
runtime_bootstrap_ms = %runtime_bootstrap_duration.as_millis(),
|
||||
transport_init_ms = %transport_init_duration.as_millis(),
|
||||
remote_control_init_ms = %remote_control_init_duration.as_millis(),
|
||||
transport = ?transport,
|
||||
remote_control_requested,
|
||||
remote_control_enabled,
|
||||
"app-server startup complete"
|
||||
);
|
||||
|
||||
let outbound_handle = tokio::spawn(async move {
|
||||
let mut outbound_connections = HashMap::<ConnectionId, OutboundConnectionState>::new();
|
||||
|
||||
@@ -86,6 +86,7 @@ use tokio::sync::watch;
|
||||
use tokio::time::Duration;
|
||||
use tokio::time::timeout;
|
||||
use tracing::Instrument;
|
||||
use tracing::info;
|
||||
|
||||
const EXTERNAL_AUTH_REFRESH_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
|
||||
@@ -275,6 +276,7 @@ impl MessageProcessor {
|
||||
/// Create a new `MessageProcessor`, retaining a handle to the outgoing
|
||||
/// `Sender` so handlers can enqueue messages to be written to stdout.
|
||||
pub(crate) fn new(args: MessageProcessorArgs) -> Self {
|
||||
let processor_init_started_at = std::time::Instant::now();
|
||||
let MessageProcessorArgs {
|
||||
outgoing,
|
||||
analytics_events_client,
|
||||
@@ -301,6 +303,7 @@ impl MessageProcessor {
|
||||
// affect per-thread behavior, but they must not move newly started,
|
||||
// resumed, or forked threads to a different persistence backend/root.
|
||||
let thread_store = codex_core::thread_store_from_config(config.as_ref(), state_db.clone());
|
||||
let thread_manager_init_started_at = std::time::Instant::now();
|
||||
let thread_manager = Arc::new_cyclic(|thread_manager| {
|
||||
ThreadManager::new(
|
||||
config.as_ref(),
|
||||
@@ -318,6 +321,7 @@ impl MessageProcessor {
|
||||
)),
|
||||
)
|
||||
});
|
||||
let thread_manager_init_duration = thread_manager_init_started_at.elapsed();
|
||||
thread_manager
|
||||
.plugins_manager()
|
||||
.set_analytics_events_client(analytics_events_client.clone());
|
||||
@@ -366,6 +370,7 @@ impl MessageProcessor {
|
||||
state_db.clone(),
|
||||
);
|
||||
let git_processor = GitRequestProcessor::new();
|
||||
let startup_warning_count = config_warnings.len();
|
||||
let initialize_processor = InitializeRequestProcessor::new(
|
||||
outgoing.clone(),
|
||||
analytics_events_client.clone(),
|
||||
@@ -474,6 +479,13 @@ impl MessageProcessor {
|
||||
config_manager,
|
||||
);
|
||||
|
||||
info!(
|
||||
total_init_ms = %processor_init_started_at.elapsed().as_millis(),
|
||||
thread_manager_init_ms = %thread_manager_init_duration.as_millis(),
|
||||
startup_warning_count,
|
||||
"app-server message processor initialized"
|
||||
);
|
||||
|
||||
Self {
|
||||
outgoing,
|
||||
account_processor,
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use codex_app_server_protocol::Model;
|
||||
use codex_app_server_protocol::ModelServiceTier;
|
||||
@@ -13,13 +14,29 @@ pub async fn supported_models(
|
||||
thread_manager: Arc<ThreadManager>,
|
||||
include_hidden: bool,
|
||||
) -> Vec<Model> {
|
||||
thread_manager
|
||||
let supported_models_started_at = Instant::now();
|
||||
let list_models_started_at = Instant::now();
|
||||
let presets = thread_manager
|
||||
.list_models(RefreshStrategy::OnlineIfUncached)
|
||||
.await
|
||||
.await;
|
||||
let list_models_ms = list_models_started_at.elapsed().as_millis();
|
||||
let preset_count = presets.len();
|
||||
let filter_and_map_started_at = Instant::now();
|
||||
let models = presets
|
||||
.into_iter()
|
||||
.filter(|preset| include_hidden || preset.show_in_picker)
|
||||
.map(model_from_preset)
|
||||
.collect()
|
||||
.collect::<Vec<_>>();
|
||||
tracing::info!(
|
||||
include_hidden,
|
||||
list_models_ms = %list_models_ms,
|
||||
filter_and_map_ms = %filter_and_map_started_at.elapsed().as_millis(),
|
||||
preset_count,
|
||||
returned_count = models.len(),
|
||||
total_ms = %supported_models_started_at.elapsed().as_millis(),
|
||||
"app-server supported models timing"
|
||||
);
|
||||
models
|
||||
}
|
||||
|
||||
fn model_from_preset(preset: ModelPreset) -> Model {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use super::*;
|
||||
use futures::StreamExt;
|
||||
use std::time::Instant;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct CatalogRequestProcessor {
|
||||
@@ -224,21 +225,33 @@ impl CatalogRequestProcessor {
|
||||
thread_manager: Arc<ThreadManager>,
|
||||
params: ModelListParams,
|
||||
) -> Result<ModelListResponse, JSONRPCErrorError> {
|
||||
let list_models_started_at = Instant::now();
|
||||
let ModelListParams {
|
||||
limit,
|
||||
cursor,
|
||||
include_hidden,
|
||||
} = params;
|
||||
let supported_models_started_at = Instant::now();
|
||||
let models = supported_models(thread_manager, include_hidden.unwrap_or(false)).await;
|
||||
let supported_models_ms = supported_models_started_at.elapsed().as_millis();
|
||||
let total = models.len();
|
||||
|
||||
if total == 0 {
|
||||
tracing::info!(
|
||||
supported_models_ms = %supported_models_ms,
|
||||
pagination_ms = 0,
|
||||
total_models = 0,
|
||||
returned_models = 0,
|
||||
total_ms = %list_models_started_at.elapsed().as_millis(),
|
||||
"app-server model/list timing"
|
||||
);
|
||||
return Ok(ModelListResponse {
|
||||
data: Vec::new(),
|
||||
next_cursor: None,
|
||||
});
|
||||
}
|
||||
|
||||
let pagination_started_at = Instant::now();
|
||||
let effective_limit = limit.unwrap_or(total as u32).max(1) as usize;
|
||||
let effective_limit = effective_limit.min(total);
|
||||
let start = match cursor {
|
||||
@@ -261,6 +274,15 @@ impl CatalogRequestProcessor {
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let pagination_ms = pagination_started_at.elapsed().as_millis();
|
||||
tracing::info!(
|
||||
supported_models_ms = %supported_models_ms,
|
||||
pagination_ms = %pagination_ms,
|
||||
total_models = total,
|
||||
returned_models = items.len(),
|
||||
total_ms = %list_models_started_at.elapsed().as_millis(),
|
||||
"app-server model/list timing"
|
||||
);
|
||||
Ok(ModelListResponse {
|
||||
data: items,
|
||||
next_cursor,
|
||||
|
||||
@@ -445,7 +445,7 @@ fn codex_apps_mcp_server_config(config: &McpConfig) -> McpServerConfig {
|
||||
required: false,
|
||||
supports_parallel_tool_calls: false,
|
||||
disabled_reason: None,
|
||||
startup_timeout_sec: Some(Duration::from_secs(30)),
|
||||
startup_timeout_sec: Some(Duration::from_secs(5)),
|
||||
tool_timeout_sec: None,
|
||||
default_tools_approval_mode: None,
|
||||
enabled_tools: None,
|
||||
|
||||
@@ -71,7 +71,7 @@ pub const MCP_SANDBOX_STATE_META_CAPABILITY: &str = "codex/sandbox-state-meta";
|
||||
pub(crate) const MCP_TOOLS_LIST_DURATION_METRIC: &str = "codex.mcp.tools.list.duration_ms";
|
||||
pub(crate) const MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC: &str =
|
||||
"codex.mcp.tools.fetch_uncached.duration_ms";
|
||||
pub(crate) const DEFAULT_STARTUP_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
pub(crate) const DEFAULT_STARTUP_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
pub(crate) const DEFAULT_TOOL_TIMEOUT: Duration = Duration::from_secs(120);
|
||||
|
||||
const UNTRUSTED_CONNECTOR_META_KEYS: &[&str] = &[
|
||||
|
||||
@@ -59,7 +59,7 @@ pub const REMOTE_WORKSPACE_SHARED_WITH_ME_PRIVATE_MARKETPLACE_DISPLAY_NAME: &str
|
||||
pub const REMOTE_WORKSPACE_SHARED_WITH_ME_UNLISTED_MARKETPLACE_DISPLAY_NAME: &str =
|
||||
"Shared with me (unlisted)";
|
||||
|
||||
const REMOTE_PLUGIN_CATALOG_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
const REMOTE_PLUGIN_CATALOG_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
const REMOTE_PLUGIN_LIST_PAGE_LIMIT: u32 = 200;
|
||||
const MAX_REMOTE_DEFAULT_PROMPT_LEN: usize = 128;
|
||||
const INVALID_REQUEST_ERROR_CODE: i64 = -32600;
|
||||
|
||||
@@ -7,9 +7,9 @@ use std::time::Duration;
|
||||
use url::Url;
|
||||
|
||||
const DEFAULT_REMOTE_MARKETPLACE_NAME: &str = "openai-curated";
|
||||
const REMOTE_PLUGIN_FETCH_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
const REMOTE_FEATURED_PLUGIN_FETCH_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
const REMOTE_PLUGIN_MUTATION_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
const REMOTE_PLUGIN_FETCH_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
const REMOTE_FEATURED_PLUGIN_FETCH_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
const REMOTE_PLUGIN_MUTATION_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
|
||||
pub struct RemotePluginStatusSummary {
|
||||
|
||||
@@ -25,9 +25,9 @@ const OPENAI_PLUGINS_REPO: &str = "plugins";
|
||||
const CURATED_PLUGINS_RELATIVE_DIR: &str = ".tmp/plugins";
|
||||
const CURATED_PLUGINS_SHA_FILE: &str = ".tmp/plugins.sha";
|
||||
const CURATED_PLUGINS_BACKUP_ARCHIVE_FALLBACK_VERSION: &str = "export-backup";
|
||||
const CURATED_PLUGINS_GIT_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
const CURATED_PLUGINS_HTTP_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
const CURATED_PLUGINS_BACKUP_ARCHIVE_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
const CURATED_PLUGINS_GIT_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
const CURATED_PLUGINS_HTTP_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
const CURATED_PLUGINS_BACKUP_ARCHIVE_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
// Keep this comfortably above a normal sync attempt so we do not race another Codex process.
|
||||
const CURATED_PLUGINS_STALE_TEMP_DIR_MAX_AGE: Duration = Duration::from_secs(10 * 60);
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::time::Instant;
|
||||
use std::time::SystemTime;
|
||||
use std::time::UNIX_EPOCH;
|
||||
|
||||
@@ -451,6 +452,7 @@ impl Codex {
|
||||
}
|
||||
|
||||
async fn spawn_internal(args: CodexSpawnArgs) -> CodexResult<CodexSpawnOk> {
|
||||
let spawn_started_at = Instant::now();
|
||||
let CodexSpawnArgs {
|
||||
mut config,
|
||||
installation_id,
|
||||
@@ -481,11 +483,13 @@ impl Codex {
|
||||
let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
|
||||
let (tx_event, rx_event) = async_channel::unbounded();
|
||||
let fs = environment_selections.primary_filesystem();
|
||||
let plugin_skills_started_at = Instant::now();
|
||||
let plugins_input = config.plugins_config_input();
|
||||
let plugin_outcome = plugins_manager.plugins_for_config(&plugins_input).await;
|
||||
let effective_skill_roots = plugin_outcome.effective_plugin_skill_roots();
|
||||
let skills_input = skills_load_input_from_config(&config, effective_skill_roots);
|
||||
let loaded_skills = skills_manager.skills_for_config(&skills_input, fs).await;
|
||||
let plugin_skills_duration = plugin_skills_started_at.elapsed();
|
||||
|
||||
for err in &loaded_skills.errors {
|
||||
error!(
|
||||
@@ -504,10 +508,13 @@ impl Codex {
|
||||
}
|
||||
|
||||
let primary_environment = environment_selections.primary_environment();
|
||||
let user_instructions_started_at = Instant::now();
|
||||
let user_instructions = AgentsMdManager::new(&config)
|
||||
.user_instructions(primary_environment.as_deref())
|
||||
.await;
|
||||
let user_instructions_duration = user_instructions_started_at.elapsed();
|
||||
|
||||
let exec_policy_started_at = Instant::now();
|
||||
let exec_policy = if crate::guardian::is_guardian_reviewer_source(&session_source) {
|
||||
// Guardian review should rely on the built-in shell safety checks,
|
||||
// not on caller-provided exec-policy rules that could shape the
|
||||
@@ -522,8 +529,10 @@ impl Codex {
|
||||
.map_err(|err| CodexErr::Fatal(format!("failed to load rules: {err}")))?,
|
||||
)
|
||||
};
|
||||
let exec_policy_duration = exec_policy_started_at.elapsed();
|
||||
|
||||
let config = Arc::new(config);
|
||||
let model_and_session_config_started_at = Instant::now();
|
||||
let refresh_strategy = if session_source.is_non_root_agent() {
|
||||
codex_models_manager::manager::RefreshStrategy::Offline
|
||||
} else {
|
||||
@@ -637,11 +646,13 @@ impl Codex {
|
||||
inherited_shell_snapshot,
|
||||
user_shell_override,
|
||||
};
|
||||
let model_and_session_config_duration = model_and_session_config_started_at.elapsed();
|
||||
|
||||
// Generate a unique ID for the lifetime of this Codex session.
|
||||
let session_source_clone = session_configuration.session_source.clone();
|
||||
let (agent_status_tx, agent_status_rx) = watch::channel(AgentStatus::PendingInit);
|
||||
|
||||
let session_new_started_at = Instant::now();
|
||||
let session = Session::new(
|
||||
session_configuration,
|
||||
config.clone(),
|
||||
@@ -652,7 +663,7 @@ impl Codex {
|
||||
tx_event.clone(),
|
||||
agent_status_tx.clone(),
|
||||
conversation_history,
|
||||
session_source_clone,
|
||||
session_source_clone.clone(),
|
||||
skills_manager,
|
||||
plugins_manager,
|
||||
mcp_manager.clone(),
|
||||
@@ -669,8 +680,21 @@ impl Codex {
|
||||
error!("Failed to create session: {e:#}");
|
||||
map_session_init_error(&e, &config.codex_home)
|
||||
})?;
|
||||
let session_new_duration = session_new_started_at.elapsed();
|
||||
let thread_id = session.conversation_id;
|
||||
|
||||
info!(
|
||||
thread_id = %thread_id,
|
||||
session_source = %session_source_clone,
|
||||
plugin_skills_ms = %plugin_skills_duration.as_millis(),
|
||||
user_instructions_ms = %user_instructions_duration.as_millis(),
|
||||
exec_policy_ms = %exec_policy_duration.as_millis(),
|
||||
model_and_session_config_ms = %model_and_session_config_duration.as_millis(),
|
||||
session_new_ms = %session_new_duration.as_millis(),
|
||||
total_ms = %spawn_started_at.elapsed().as_millis(),
|
||||
"codex spawn timing"
|
||||
);
|
||||
|
||||
// This task will run until Op::Shutdown is received.
|
||||
let session_for_loop = Arc::clone(&session);
|
||||
let session_loop_handle = tokio::spawn(async move {
|
||||
|
||||
@@ -6,6 +6,7 @@ use codex_protocol::permissions::FileSystemPath;
|
||||
use codex_protocol::permissions::FileSystemSpecialPath;
|
||||
use codex_protocol::protocol::ThreadSource;
|
||||
use codex_protocol::protocol::TurnEnvironmentSelection;
|
||||
use std::time::Instant;
|
||||
use tokio::sync::Semaphore;
|
||||
|
||||
/// Context for an initialized model agent
|
||||
@@ -442,6 +443,7 @@ impl Session {
|
||||
parent_rollout_thread_trace: ThreadTraceContext,
|
||||
attestation_provider: Option<Arc<dyn AttestationProvider>>,
|
||||
) -> anyhow::Result<Arc<Self>> {
|
||||
let session_init_started_at = Instant::now();
|
||||
debug!(
|
||||
"Configuring session: model={}; provider={:?}",
|
||||
session_configuration.collaboration_mode.model(),
|
||||
@@ -577,8 +579,10 @@ impl Session {
|
||||
));
|
||||
|
||||
// Join all independent futures.
|
||||
let parallel_setup_started_at = Instant::now();
|
||||
let (thread_persistence_result, state_db_ctx, (auth, mcp_servers, auth_statuses)) =
|
||||
tokio::join!(thread_persistence_fut, state_db_fut, auth_and_mcp_fut);
|
||||
let parallel_setup_duration = parallel_setup_started_at.elapsed();
|
||||
|
||||
let mut live_thread_init =
|
||||
LiveThreadInitGuard::new(thread_persistence_result.map_err(|e| {
|
||||
@@ -586,6 +590,7 @@ impl Session {
|
||||
e
|
||||
})?);
|
||||
let session_result: anyhow::Result<Arc<Self>> = async {
|
||||
let pre_session_configured_started_at = Instant::now();
|
||||
let rollout_path = if let Some(live_thread) = live_thread_init.as_ref() {
|
||||
live_thread.local_rollout_path().await?
|
||||
} else {
|
||||
@@ -854,7 +859,9 @@ impl Session {
|
||||
}),
|
||||
});
|
||||
}
|
||||
let pre_session_configured_duration = pre_session_configured_started_at.elapsed();
|
||||
|
||||
let services_build_started_at = Instant::now();
|
||||
let analytics_events_client = analytics_events_client.unwrap_or_else(|| {
|
||||
AnalyticsEventsClient::new(
|
||||
Arc::clone(&auth_manager),
|
||||
@@ -975,6 +982,8 @@ impl Session {
|
||||
let mut guard = network_policy_decider_session.write().await;
|
||||
*guard = Arc::downgrade(&sess);
|
||||
}
|
||||
let services_build_duration = services_build_started_at.elapsed();
|
||||
let session_configured_emit_started_at = Instant::now();
|
||||
// Dispatch the SessionConfiguredEvent first and then report any errors.
|
||||
// If resuming, include converted initial messages in the payload so UIs can render them immediately.
|
||||
let initial_messages = initial_history.get_event_msgs();
|
||||
@@ -1010,7 +1019,9 @@ impl Session {
|
||||
for event in events {
|
||||
sess.send_event_raw(event).await;
|
||||
}
|
||||
let session_configured_emit_duration = session_configured_emit_started_at.elapsed();
|
||||
|
||||
let mcp_init_started_at = Instant::now();
|
||||
let mut required_mcp_servers: Vec<String> = mcp_servers
|
||||
.iter()
|
||||
.filter(|(_, server)| server.enabled() && server.required())
|
||||
@@ -1119,6 +1130,8 @@ impl Session {
|
||||
anyhow::bail!("required MCP servers failed to initialize: {details}");
|
||||
}
|
||||
}
|
||||
let mcp_init_duration = mcp_init_started_at.elapsed();
|
||||
let post_session_configured_started_at = Instant::now();
|
||||
sess.schedule_startup_prewarm(session_configuration.base_instructions.clone())
|
||||
.await;
|
||||
let session_start_source = match &initial_history {
|
||||
@@ -1135,6 +1148,20 @@ impl Session {
|
||||
let mut state = sess.state.lock().await;
|
||||
state.set_pending_session_start_source(Some(session_start_source));
|
||||
}
|
||||
let post_session_configured_duration = post_session_configured_started_at.elapsed();
|
||||
|
||||
tracing::info!(
|
||||
thread_id = %thread_id,
|
||||
session_source = %session_configuration.session_source,
|
||||
parallel_setup_ms = %parallel_setup_duration.as_millis(),
|
||||
pre_session_configured_ms = %pre_session_configured_duration.as_millis(),
|
||||
services_build_ms = %services_build_duration.as_millis(),
|
||||
session_configured_emit_ms = %session_configured_emit_duration.as_millis(),
|
||||
mcp_init_ms = %mcp_init_duration.as_millis(),
|
||||
post_session_configured_ms = %post_session_configured_duration.as_millis(),
|
||||
total_ms = %session_init_started_at.elapsed().as_millis(),
|
||||
"session init timing"
|
||||
);
|
||||
|
||||
Ok(sess)
|
||||
}
|
||||
|
||||
@@ -74,6 +74,7 @@ use std::sync::atomic::Ordering;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::sync::broadcast;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
|
||||
const THREAD_CREATED_CHANNEL_CAPACITY: usize = 1024;
|
||||
@@ -436,10 +437,19 @@ impl ThreadManager {
|
||||
}
|
||||
|
||||
pub async fn list_models(&self, refresh_strategy: RefreshStrategy) -> Vec<ModelPreset> {
|
||||
self.state
|
||||
let list_models_started_at = std::time::Instant::now();
|
||||
let models = self
|
||||
.state
|
||||
.models_manager
|
||||
.list_models(refresh_strategy)
|
||||
.await
|
||||
.await;
|
||||
tracing::info!(
|
||||
refresh_strategy = %refresh_strategy,
|
||||
model_count = models.len(),
|
||||
total_ms = %list_models_started_at.elapsed().as_millis(),
|
||||
"thread manager list_models timing"
|
||||
);
|
||||
models
|
||||
}
|
||||
|
||||
pub fn list_collaboration_modes(&self) -> Vec<CollaborationModeMask> {
|
||||
@@ -1166,6 +1176,7 @@ impl ThreadManagerState {
|
||||
environments: Vec<TurnEnvironmentSelection>,
|
||||
user_shell_override: Option<crate::shell::Shell>,
|
||||
) -> CodexResult<NewThread> {
|
||||
let spawn_thread_started_at = std::time::Instant::now();
|
||||
let is_resumed_thread = matches!(&initial_history, InitialHistory::Resumed(_));
|
||||
if let InitialHistory::Resumed(resumed) = &initial_history {
|
||||
let mut threads = self.threads.write().await;
|
||||
@@ -1193,7 +1204,9 @@ impl ThreadManagerState {
|
||||
let parent_rollout_thread_trace = self
|
||||
.parent_rollout_thread_trace_for_source(&session_source, &initial_history)
|
||||
.await;
|
||||
let pre_spawn_setup_duration = spawn_thread_started_at.elapsed();
|
||||
let tracked_session_source = session_source.clone();
|
||||
let codex_spawn_started_at = std::time::Instant::now();
|
||||
let CodexSpawnOk {
|
||||
codex, thread_id, ..
|
||||
} = Codex::spawn(CodexSpawnArgs {
|
||||
@@ -1207,7 +1220,7 @@ impl ThreadManagerState {
|
||||
mcp_manager: Arc::clone(&self.mcp_manager),
|
||||
extensions: Arc::clone(&self.extensions),
|
||||
conversation_history: initial_history,
|
||||
session_source,
|
||||
session_source: session_source.clone(),
|
||||
thread_source,
|
||||
agent_control,
|
||||
dynamic_tools,
|
||||
@@ -1224,9 +1237,24 @@ impl ThreadManagerState {
|
||||
attestation_provider: self.attestation_provider.clone(),
|
||||
})
|
||||
.await?;
|
||||
let codex_spawn_duration = codex_spawn_started_at.elapsed();
|
||||
let session_configured_wait_started_at = std::time::Instant::now();
|
||||
let new_thread = self
|
||||
.finalize_thread_spawn(codex, thread_id, tracked_session_source)
|
||||
.await?;
|
||||
let session_configured_wait_duration = session_configured_wait_started_at.elapsed();
|
||||
let spawn_to_session_configured_duration = codex_spawn_started_at.elapsed();
|
||||
info!(
|
||||
thread_id = %new_thread.thread_id,
|
||||
session_source = %session_source,
|
||||
is_resumed_thread,
|
||||
pre_spawn_setup_ms = %pre_spawn_setup_duration.as_millis(),
|
||||
codex_spawn_ms = %codex_spawn_duration.as_millis(),
|
||||
wait_for_session_configured_ms = %session_configured_wait_duration.as_millis(),
|
||||
spawn_to_session_configured_ms = %spawn_to_session_configured_duration.as_millis(),
|
||||
total_ms = %spawn_thread_started_at.elapsed().as_millis(),
|
||||
"thread startup timing"
|
||||
);
|
||||
if is_resumed_thread {
|
||||
new_thread.thread.emit_thread_resume_lifecycle().await;
|
||||
if let Err(err) = new_thread.thread.apply_goal_resume_runtime_effects().await {
|
||||
|
||||
@@ -92,6 +92,7 @@ const REFRESH_TOKEN_UNKNOWN_MESSAGE: &str =
|
||||
const REFRESH_TOKEN_ACCOUNT_MISMATCH_MESSAGE: &str = "Your access token could not be refreshed because you have since logged out or signed in to another account. Please sign in again.";
|
||||
const DEFAULT_CHATGPT_BACKEND_BASE_URL: &str = "https://chatgpt.com/backend-api";
|
||||
const REFRESH_TOKEN_URL: &str = "https://auth.openai.com/oauth/token";
|
||||
const REFRESH_TOKEN_REQUEST_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
|
||||
pub(super) const REVOKE_TOKEN_URL: &str = "https://auth.openai.com/oauth/revoke";
|
||||
pub const REFRESH_TOKEN_URL_OVERRIDE_ENV_VAR: &str = "CODEX_REFRESH_TOKEN_URL_OVERRIDE";
|
||||
pub const REVOKE_TOKEN_URL_OVERRIDE_ENV_VAR: &str = "CODEX_REVOKE_TOKEN_URL_OVERRIDE";
|
||||
@@ -827,6 +828,7 @@ async fn request_chatgpt_token_refresh(
|
||||
// Use shared client factory to include standard headers
|
||||
let response = client
|
||||
.post(endpoint.as_str())
|
||||
.timeout(REFRESH_TOKEN_REQUEST_TIMEOUT)
|
||||
.header("Content-Type", "application/json")
|
||||
.json(&refresh_request)
|
||||
.send()
|
||||
|
||||
@@ -25,6 +25,7 @@ use std::io::SeekFrom;
|
||||
use std::io::Write;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::time::Instant;
|
||||
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
@@ -317,30 +318,97 @@ async fn ensure_owner_only_permissions(_file: &File) -> Result<()> {
|
||||
}
|
||||
|
||||
async fn history_metadata_for_file(path: &Path) -> (u64, usize) {
|
||||
let history_metadata_started_at = Instant::now();
|
||||
let metadata_started_at = Instant::now();
|
||||
let log_id = match fs::metadata(path).await {
|
||||
Ok(metadata) => log_identity(&metadata).unwrap_or(0),
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return (0, 0),
|
||||
Err(_) => return (0, 0),
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
|
||||
tracing::info!(
|
||||
metadata_ms = %metadata_started_at.elapsed().as_millis(),
|
||||
open_ms = 0,
|
||||
scan_ms = 0,
|
||||
bytes_read = 0,
|
||||
newline_count = 0,
|
||||
total_ms = %history_metadata_started_at.elapsed().as_millis(),
|
||||
file_found = false,
|
||||
"history metadata timing"
|
||||
);
|
||||
return (0, 0);
|
||||
}
|
||||
Err(_) => {
|
||||
tracing::info!(
|
||||
metadata_ms = %metadata_started_at.elapsed().as_millis(),
|
||||
open_ms = 0,
|
||||
scan_ms = 0,
|
||||
bytes_read = 0,
|
||||
newline_count = 0,
|
||||
total_ms = %history_metadata_started_at.elapsed().as_millis(),
|
||||
file_found = false,
|
||||
"history metadata timing"
|
||||
);
|
||||
return (0, 0);
|
||||
}
|
||||
};
|
||||
let metadata_ms = metadata_started_at.elapsed().as_millis();
|
||||
|
||||
// Open the file.
|
||||
let open_started_at = Instant::now();
|
||||
let mut file = match fs::File::open(path).await {
|
||||
Ok(f) => f,
|
||||
Err(_) => return (log_id, 0),
|
||||
Err(_) => {
|
||||
tracing::info!(
|
||||
metadata_ms = %metadata_ms,
|
||||
open_ms = %open_started_at.elapsed().as_millis(),
|
||||
scan_ms = 0,
|
||||
bytes_read = 0,
|
||||
newline_count = 0,
|
||||
total_ms = %history_metadata_started_at.elapsed().as_millis(),
|
||||
file_found = true,
|
||||
"history metadata timing"
|
||||
);
|
||||
return (log_id, 0);
|
||||
}
|
||||
};
|
||||
let open_ms = open_started_at.elapsed().as_millis();
|
||||
|
||||
// Count newline bytes.
|
||||
let scan_started_at = Instant::now();
|
||||
let mut buf = [0u8; 8192];
|
||||
let mut count = 0usize;
|
||||
let mut bytes_read = 0usize;
|
||||
loop {
|
||||
match file.read(&mut buf).await {
|
||||
Ok(0) => break,
|
||||
Ok(n) => {
|
||||
bytes_read += n;
|
||||
count += buf[..n].iter().filter(|&&b| b == b'\n').count();
|
||||
}
|
||||
Err(_) => return (log_id, 0),
|
||||
Err(_) => {
|
||||
tracing::info!(
|
||||
metadata_ms = %metadata_ms,
|
||||
open_ms = %open_ms,
|
||||
scan_ms = %scan_started_at.elapsed().as_millis(),
|
||||
bytes_read = %bytes_read,
|
||||
newline_count = 0,
|
||||
total_ms = %history_metadata_started_at.elapsed().as_millis(),
|
||||
file_found = true,
|
||||
"history metadata timing"
|
||||
);
|
||||
return (log_id, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
let scan_ms = scan_started_at.elapsed().as_millis();
|
||||
tracing::info!(
|
||||
metadata_ms = %metadata_ms,
|
||||
open_ms = %open_ms,
|
||||
scan_ms = %scan_ms,
|
||||
bytes_read = %bytes_read,
|
||||
newline_count = %count,
|
||||
total_ms = %history_metadata_started_at.elapsed().as_millis(),
|
||||
file_found = true,
|
||||
"history metadata timing"
|
||||
);
|
||||
|
||||
(log_id, count)
|
||||
}
|
||||
|
||||
@@ -29,46 +29,86 @@ impl ModelsCacheManager {
|
||||
|
||||
/// Attempt to load a fresh cache entry. Returns `None` if the cache doesn't exist or is stale.
|
||||
pub(crate) async fn load_fresh(&self, expected_version: &str) -> Option<ModelsCache> {
|
||||
info!(
|
||||
cache_path = %self.cache_path.display(),
|
||||
expected_version,
|
||||
"models cache: attempting load_fresh"
|
||||
tracing::info!(
|
||||
target: "codex_core::thread_manager",
|
||||
cache_path = %self.cache_path.display(),
|
||||
expected_version,
|
||||
"models cache load timing"
|
||||
);
|
||||
let cache = match self.load().await {
|
||||
Ok(cache) => cache?,
|
||||
Ok(Some(cache)) => cache,
|
||||
Ok(None) => {
|
||||
tracing::info!(
|
||||
target: "codex_core::thread_manager",
|
||||
cache_path = %self.cache_path.display(),
|
||||
expected_version,
|
||||
cache_hit = false,
|
||||
cache_miss_reason = "not_found",
|
||||
"models cache decision"
|
||||
);
|
||||
return None;
|
||||
}
|
||||
Err(err) => {
|
||||
error!("failed to load models cache: {err}");
|
||||
tracing::info!(
|
||||
target: "codex_core::thread_manager",
|
||||
cache_path = %self.cache_path.display(),
|
||||
expected_version,
|
||||
cache_hit = false,
|
||||
cache_miss_reason = "load_error",
|
||||
error = %err,
|
||||
"models cache decision"
|
||||
);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
info!(
|
||||
let age_secs = Utc::now()
|
||||
.signed_duration_since(cache.fetched_at)
|
||||
.num_milliseconds() as f64
|
||||
/ 1000.0;
|
||||
tracing::info!(
|
||||
target: "codex_core::thread_manager",
|
||||
cache_path = %self.cache_path.display(),
|
||||
cached_version = ?cache.client_version,
|
||||
fetched_at = %cache.fetched_at,
|
||||
age_secs = %age_secs,
|
||||
model_count = cache.models.len(),
|
||||
"models cache: loaded cache file"
|
||||
);
|
||||
if cache.client_version.as_deref() != Some(expected_version) {
|
||||
info!(
|
||||
tracing::info!(
|
||||
target: "codex_core::thread_manager",
|
||||
cache_path = %self.cache_path.display(),
|
||||
expected_version,
|
||||
cached_version = ?cache.client_version,
|
||||
"models cache: cache version mismatch"
|
||||
cache_hit = false,
|
||||
cache_miss_reason = "version_mismatch",
|
||||
"models cache decision"
|
||||
);
|
||||
return None;
|
||||
}
|
||||
if !cache.is_fresh(self.cache_ttl) {
|
||||
info!(
|
||||
tracing::info!(
|
||||
target: "codex_core::thread_manager",
|
||||
cache_path = %self.cache_path.display(),
|
||||
cache_ttl_secs = self.cache_ttl.as_secs(),
|
||||
fetched_at = %cache.fetched_at,
|
||||
"models cache: cache is stale"
|
||||
age_secs = %age_secs,
|
||||
cache_hit = false,
|
||||
cache_miss_reason = "stale",
|
||||
"models cache decision"
|
||||
);
|
||||
return None;
|
||||
}
|
||||
info!(
|
||||
tracing::info!(
|
||||
target: "codex_core::thread_manager",
|
||||
cache_path = %self.cache_path.display(),
|
||||
cache_ttl_secs = self.cache_ttl.as_secs(),
|
||||
"models cache: cache hit"
|
||||
age_secs = %age_secs,
|
||||
cache_hit = true,
|
||||
cache_miss_reason = "none",
|
||||
model_count = cache.models.len(),
|
||||
"models cache decision"
|
||||
);
|
||||
Some(cache)
|
||||
}
|
||||
|
||||
@@ -268,51 +268,154 @@ impl ModelsManager for OpenAiModelsManager {
|
||||
impl OpenAiModelsManager {
|
||||
/// Refresh available models according to the specified strategy.
|
||||
async fn refresh_available_models(&self, refresh_strategy: RefreshStrategy) -> CoreResult<()> {
|
||||
if !self.should_refresh_models().await {
|
||||
let refresh_started_at = std::time::Instant::now();
|
||||
let should_refresh_started_at = std::time::Instant::now();
|
||||
let should_refresh = self.should_refresh_models().await;
|
||||
let should_refresh_ms = should_refresh_started_at.elapsed().as_millis();
|
||||
if !should_refresh {
|
||||
if matches!(
|
||||
refresh_strategy,
|
||||
RefreshStrategy::Offline | RefreshStrategy::OnlineIfUncached
|
||||
) {
|
||||
self.try_load_cache().await;
|
||||
let cache_started_at = std::time::Instant::now();
|
||||
let cache_hit = self.try_load_cache().await;
|
||||
tracing::info!(
|
||||
target: "codex_core::thread_manager",
|
||||
refresh_strategy = %refresh_strategy,
|
||||
should_refresh = false,
|
||||
should_refresh_ms = %should_refresh_ms,
|
||||
cache_lookup_ms = %cache_started_at.elapsed().as_millis(),
|
||||
cache_hit,
|
||||
total_ms = %refresh_started_at.elapsed().as_millis(),
|
||||
"model list refresh timing"
|
||||
);
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
match refresh_strategy {
|
||||
let result = match refresh_strategy {
|
||||
RefreshStrategy::Offline => {
|
||||
// Only try to load from cache, never fetch
|
||||
self.try_load_cache().await;
|
||||
let cache_started_at = std::time::Instant::now();
|
||||
let cache_hit = self.try_load_cache().await;
|
||||
tracing::info!(
|
||||
target: "codex_core::thread_manager",
|
||||
refresh_strategy = %refresh_strategy,
|
||||
should_refresh = true,
|
||||
should_refresh_ms = %should_refresh_ms,
|
||||
cache_lookup_ms = %cache_started_at.elapsed().as_millis(),
|
||||
cache_hit,
|
||||
fetch_ms = 0,
|
||||
total_ms = %refresh_started_at.elapsed().as_millis(),
|
||||
"model list refresh timing"
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
RefreshStrategy::OnlineIfUncached => {
|
||||
// Try cache first, fall back to online if unavailable
|
||||
let cache_started_at = std::time::Instant::now();
|
||||
if self.try_load_cache().await {
|
||||
info!("models cache: using cached models for OnlineIfUncached");
|
||||
tracing::info!(
|
||||
target: "codex_core::thread_manager",
|
||||
refresh_strategy = %refresh_strategy,
|
||||
should_refresh = true,
|
||||
should_refresh_ms = %should_refresh_ms,
|
||||
cache_lookup_ms = %cache_started_at.elapsed().as_millis(),
|
||||
cache_hit = true,
|
||||
fetch_ms = 0,
|
||||
total_ms = %refresh_started_at.elapsed().as_millis(),
|
||||
"model list refresh timing"
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
info!("models cache: cache miss, fetching remote models");
|
||||
self.fetch_and_update_models().await
|
||||
let fetch_started_at = std::time::Instant::now();
|
||||
let result = self.fetch_and_update_models().await;
|
||||
tracing::info!(
|
||||
target: "codex_core::thread_manager",
|
||||
refresh_strategy = %refresh_strategy,
|
||||
should_refresh = true,
|
||||
should_refresh_ms = %should_refresh_ms,
|
||||
cache_lookup_ms = %cache_started_at.elapsed().as_millis(),
|
||||
cache_hit = false,
|
||||
fetch_ms = %fetch_started_at.elapsed().as_millis(),
|
||||
total_ms = %refresh_started_at.elapsed().as_millis(),
|
||||
"model list refresh timing"
|
||||
);
|
||||
result
|
||||
}
|
||||
RefreshStrategy::Online => {
|
||||
// Always fetch from network
|
||||
self.fetch_and_update_models().await
|
||||
let fetch_started_at = std::time::Instant::now();
|
||||
let result = self.fetch_and_update_models().await;
|
||||
tracing::info!(
|
||||
target: "codex_core::thread_manager",
|
||||
refresh_strategy = %refresh_strategy,
|
||||
should_refresh = true,
|
||||
should_refresh_ms = %should_refresh_ms,
|
||||
cache_lookup_ms = 0,
|
||||
cache_hit = false,
|
||||
fetch_ms = %fetch_started_at.elapsed().as_millis(),
|
||||
total_ms = %refresh_started_at.elapsed().as_millis(),
|
||||
"model list refresh timing"
|
||||
);
|
||||
result
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
async fn fetch_and_update_models(&self) -> CoreResult<()> {
|
||||
let fetch_started_at = std::time::Instant::now();
|
||||
let client_version = crate::client_version_to_whole();
|
||||
let remote_list_started_at = std::time::Instant::now();
|
||||
let (models, etag) = self.endpoint_client.list_models(&client_version).await?;
|
||||
let remote_list_ms = remote_list_started_at.elapsed().as_millis();
|
||||
let apply_started_at = std::time::Instant::now();
|
||||
self.apply_remote_models(models.clone()).await;
|
||||
let apply_ms = apply_started_at.elapsed().as_millis();
|
||||
let etag_write_started_at = std::time::Instant::now();
|
||||
*self.etag.write().await = etag.clone();
|
||||
let etag_write_ms = etag_write_started_at.elapsed().as_millis();
|
||||
let persist_cache_started_at = std::time::Instant::now();
|
||||
self.cache_manager
|
||||
.persist_cache(&models, etag, client_version)
|
||||
.await;
|
||||
tracing::info!(
|
||||
target: "codex_core::thread_manager",
|
||||
remote_list_ms = %remote_list_ms,
|
||||
apply_ms = %apply_ms,
|
||||
etag_write_ms = %etag_write_ms,
|
||||
persist_cache_ms = %persist_cache_started_at.elapsed().as_millis(),
|
||||
model_count = models.len(),
|
||||
total_ms = %fetch_started_at.elapsed().as_millis(),
|
||||
"model list fetch timing"
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn should_refresh_models(&self) -> bool {
|
||||
self.endpoint_client.uses_codex_backend().await || self.endpoint_client.has_command_auth()
|
||||
let should_refresh_started_at = std::time::Instant::now();
|
||||
let uses_codex_backend_started_at = std::time::Instant::now();
|
||||
let uses_codex_backend = self.endpoint_client.uses_codex_backend().await;
|
||||
let uses_codex_backend_ms = uses_codex_backend_started_at.elapsed().as_millis();
|
||||
let has_command_auth_started_at = std::time::Instant::now();
|
||||
let has_command_auth = self.endpoint_client.has_command_auth();
|
||||
let has_command_auth_ms = has_command_auth_started_at.elapsed().as_millis();
|
||||
let should_refresh = uses_codex_backend || has_command_auth;
|
||||
tracing::info!(
|
||||
target: "codex_core::thread_manager",
|
||||
uses_codex_backend,
|
||||
uses_codex_backend_ms = %uses_codex_backend_ms,
|
||||
has_command_auth,
|
||||
has_command_auth_ms = %has_command_auth_ms,
|
||||
should_refresh,
|
||||
total_ms = %should_refresh_started_at.elapsed().as_millis(),
|
||||
"model list refresh gate timing"
|
||||
);
|
||||
should_refresh
|
||||
}
|
||||
|
||||
async fn get_etag(&self) -> Option<String> {
|
||||
|
||||
@@ -55,7 +55,7 @@ pub async fn determine_streamable_http_auth_status(
|
||||
debug!(
|
||||
"failed to detect OAuth support for MCP server `{server_name}` at {url}: {error:?}"
|
||||
);
|
||||
Ok(McpAuthStatus::Unsupported)
|
||||
Err(error)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -779,13 +779,29 @@ impl App {
|
||||
&initial_images,
|
||||
);
|
||||
let thread_and_widget_started_at = Instant::now();
|
||||
let (mut chat_widget, initial_started_thread) = match session_selection {
|
||||
let session_selection_label = match &session_selection {
|
||||
SessionSelection::StartFresh => "start_fresh",
|
||||
SessionSelection::Resume(_) => "resume",
|
||||
SessionSelection::Fork(_) => "fork",
|
||||
SessionSelection::Exit => "exit",
|
||||
};
|
||||
let (
|
||||
mut chat_widget,
|
||||
initial_started_thread,
|
||||
thread_rpc_ms,
|
||||
startup_tooltip_ms,
|
||||
widget_construct_ms,
|
||||
) = match session_selection {
|
||||
SessionSelection::StartFresh | SessionSelection::Exit => {
|
||||
let thread_rpc_started_at = Instant::now();
|
||||
let started = app_server.start_thread(&config).await?;
|
||||
let thread_rpc_ms = thread_rpc_started_at.elapsed().as_millis();
|
||||
// Only count a startup tooltip once the fresh thread can actually render it.
|
||||
let startup_tooltip_started_at = Instant::now();
|
||||
let startup_tooltip_override =
|
||||
prepare_startup_tooltip_override(&mut config, &available_models, is_first_run)
|
||||
.await;
|
||||
let startup_tooltip_ms = startup_tooltip_started_at.elapsed().as_millis();
|
||||
let init = crate::chatwidget::ChatWidgetInit {
|
||||
config: config.clone(),
|
||||
environment_manager: environment_manager.clone(),
|
||||
@@ -813,9 +829,19 @@ impl App {
|
||||
.clone(),
|
||||
session_telemetry: session_telemetry.clone(),
|
||||
};
|
||||
(ChatWidget::new_with_app_event(init), Some(started))
|
||||
let widget_construct_started_at = Instant::now();
|
||||
let chat_widget = ChatWidget::new_with_app_event(init);
|
||||
let widget_construct_ms = widget_construct_started_at.elapsed().as_millis();
|
||||
(
|
||||
chat_widget,
|
||||
Some(started),
|
||||
thread_rpc_ms,
|
||||
startup_tooltip_ms,
|
||||
widget_construct_ms,
|
||||
)
|
||||
}
|
||||
SessionSelection::Resume(target_session) => {
|
||||
let thread_rpc_started_at = Instant::now();
|
||||
let resumed = app_server
|
||||
.resume_thread(config.clone(), target_session.thread_id)
|
||||
.await
|
||||
@@ -823,6 +849,7 @@ impl App {
|
||||
let target_label = target_session.display_label();
|
||||
format!("Failed to resume session from {target_label}")
|
||||
})?;
|
||||
let thread_rpc_ms = thread_rpc_started_at.elapsed().as_millis();
|
||||
let init = crate::chatwidget::ChatWidgetInit {
|
||||
config: config.clone(),
|
||||
environment_manager: environment_manager.clone(),
|
||||
@@ -850,7 +877,16 @@ impl App {
|
||||
.clone(),
|
||||
session_telemetry: session_telemetry.clone(),
|
||||
};
|
||||
(ChatWidget::new_with_app_event(init), Some(resumed))
|
||||
let widget_construct_started_at = Instant::now();
|
||||
let chat_widget = ChatWidget::new_with_app_event(init);
|
||||
let widget_construct_ms = widget_construct_started_at.elapsed().as_millis();
|
||||
(
|
||||
chat_widget,
|
||||
Some(resumed),
|
||||
thread_rpc_ms,
|
||||
/*startup_tooltip_ms*/ 0,
|
||||
widget_construct_ms,
|
||||
)
|
||||
}
|
||||
SessionSelection::Fork(target_session) => {
|
||||
session_telemetry.counter(
|
||||
@@ -858,6 +894,7 @@ impl App {
|
||||
/*inc*/ 1,
|
||||
&[("source", "cli_subcommand")],
|
||||
);
|
||||
let thread_rpc_started_at = Instant::now();
|
||||
let forked = app_server
|
||||
.fork_thread(config.clone(), target_session.thread_id)
|
||||
.await
|
||||
@@ -865,6 +902,7 @@ impl App {
|
||||
let target_label = target_session.display_label();
|
||||
format!("Failed to fork session from {target_label}")
|
||||
})?;
|
||||
let thread_rpc_ms = thread_rpc_started_at.elapsed().as_millis();
|
||||
let init = crate::chatwidget::ChatWidgetInit {
|
||||
config: config.clone(),
|
||||
environment_manager: environment_manager.clone(),
|
||||
@@ -892,10 +930,30 @@ impl App {
|
||||
.clone(),
|
||||
session_telemetry: session_telemetry.clone(),
|
||||
};
|
||||
(ChatWidget::new_with_app_event(init), Some(forked))
|
||||
let widget_construct_started_at = Instant::now();
|
||||
let chat_widget = ChatWidget::new_with_app_event(init);
|
||||
let widget_construct_ms = widget_construct_started_at.elapsed().as_millis();
|
||||
(
|
||||
chat_widget,
|
||||
Some(forked),
|
||||
thread_rpc_ms,
|
||||
/*startup_tooltip_ms*/ 0,
|
||||
widget_construct_ms,
|
||||
)
|
||||
}
|
||||
};
|
||||
let thread_and_widget_ms = thread_and_widget_started_at.elapsed().as_millis();
|
||||
let thread_and_widget_other_ms = thread_and_widget_ms
|
||||
.saturating_sub(thread_rpc_ms + startup_tooltip_ms + widget_construct_ms);
|
||||
tracing::info!(
|
||||
thread_rpc_ms = %thread_rpc_ms,
|
||||
startup_tooltip_ms = %startup_tooltip_ms,
|
||||
widget_construct_ms = %widget_construct_ms,
|
||||
other_ms = %thread_and_widget_other_ms,
|
||||
total_ms = %thread_and_widget_ms,
|
||||
session_selection = session_selection_label,
|
||||
"tui thread and widget startup timing"
|
||||
);
|
||||
if let Some(message) = external_agent_config_migration_message {
|
||||
chat_widget.add_info_message(message, /*hint*/ None);
|
||||
}
|
||||
|
||||
@@ -120,6 +120,9 @@ use color_eyre::eyre::Result;
|
||||
use color_eyre::eyre::WrapErr;
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use std::time::Instant;
|
||||
use tokio::fs;
|
||||
use tokio::io::AsyncReadExt;
|
||||
|
||||
fn bootstrap_request_error(context: &'static str, err: TypedRequestError) -> color_eyre::Report {
|
||||
color_eyre::eyre::eyre!("{context}: {err}")
|
||||
@@ -205,27 +208,65 @@ impl AppServerSession {
|
||||
}
|
||||
|
||||
pub(crate) async fn bootstrap(&mut self, config: &Config) -> Result<AppServerBootstrap> {
|
||||
let bootstrap_started_at = Instant::now();
|
||||
let account_read_started_at = Instant::now();
|
||||
let account = self.read_account().await?;
|
||||
let account_read_ms = account_read_started_at.elapsed().as_millis();
|
||||
let model_request_id = self.next_request_id();
|
||||
let models: ModelListResponse = self
|
||||
let model_list_rpc_started_at = Instant::now();
|
||||
let model_list_request = ClientRequest::ModelList {
|
||||
request_id: model_request_id,
|
||||
params: ModelListParams {
|
||||
cursor: None,
|
||||
limit: None,
|
||||
include_hidden: Some(true),
|
||||
},
|
||||
};
|
||||
let model_list_raw = self
|
||||
.client
|
||||
.request_typed(ClientRequest::ModelList {
|
||||
request_id: model_request_id,
|
||||
params: ModelListParams {
|
||||
cursor: None,
|
||||
limit: None,
|
||||
include_hidden: Some(true),
|
||||
},
|
||||
})
|
||||
.request(model_list_request)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
bootstrap_request_error("model/list failed during TUI bootstrap", err)
|
||||
bootstrap_request_error(
|
||||
"model/list failed during TUI bootstrap",
|
||||
TypedRequestError::Transport {
|
||||
method: "model/list".to_string(),
|
||||
source: err,
|
||||
},
|
||||
)
|
||||
})?;
|
||||
let model_list_rpc_ms = model_list_rpc_started_at.elapsed().as_millis();
|
||||
let model_list_result_started_at = Instant::now();
|
||||
let model_list_value = model_list_raw.map_err(|source| {
|
||||
bootstrap_request_error(
|
||||
"model/list failed during TUI bootstrap",
|
||||
TypedRequestError::Server {
|
||||
method: "model/list".to_string(),
|
||||
source,
|
||||
},
|
||||
)
|
||||
})?;
|
||||
let model_list_result_ms = model_list_result_started_at.elapsed().as_millis();
|
||||
let model_list_decode_started_at = Instant::now();
|
||||
let models: ModelListResponse =
|
||||
serde_json::from_value(model_list_value).map_err(|source| {
|
||||
bootstrap_request_error(
|
||||
"model/list failed during TUI bootstrap",
|
||||
TypedRequestError::Deserialize {
|
||||
method: "model/list".to_string(),
|
||||
source,
|
||||
},
|
||||
)
|
||||
})?;
|
||||
let model_list_decode_ms = model_list_decode_started_at.elapsed().as_millis();
|
||||
let model_preset_map_started_at = Instant::now();
|
||||
let available_models = models
|
||||
.data
|
||||
.into_iter()
|
||||
.map(model_preset_from_api_model)
|
||||
.collect::<Vec<_>>();
|
||||
let model_preset_map_ms = model_preset_map_started_at.elapsed().as_millis();
|
||||
let default_model_select_started_at = Instant::now();
|
||||
let default_model = config
|
||||
.model
|
||||
.clone()
|
||||
@@ -237,7 +278,9 @@ impl AppServerSession {
|
||||
})
|
||||
.or_else(|| available_models.first().map(|model| model.model.clone()))
|
||||
.wrap_err("model/list returned no models for TUI bootstrap")?;
|
||||
let default_model_select_ms = default_model_select_started_at.elapsed().as_millis();
|
||||
|
||||
let bootstrap_derive_started_at = Instant::now();
|
||||
let (
|
||||
account_email,
|
||||
auth_mode,
|
||||
@@ -277,6 +320,23 @@ impl AppServerSession {
|
||||
}
|
||||
None => (None, None, None, None, FeedbackAudience::External, false),
|
||||
};
|
||||
let bootstrap_derive_ms = bootstrap_derive_started_at.elapsed().as_millis();
|
||||
tracing::info!(
|
||||
account_read_ms = %account_read_ms,
|
||||
model_list_rpc_ms = %model_list_rpc_ms,
|
||||
model_list_result_ms = %model_list_result_ms,
|
||||
model_list_decode_ms = %model_list_decode_ms,
|
||||
model_preset_map_ms = %model_preset_map_ms,
|
||||
default_model_select_ms = %default_model_select_ms,
|
||||
model_list_total_ms = %(model_list_rpc_ms
|
||||
+ model_list_result_ms
|
||||
+ model_list_decode_ms
|
||||
+ model_preset_map_ms
|
||||
+ default_model_select_ms),
|
||||
derive_ms = %bootstrap_derive_ms,
|
||||
total_ms = %bootstrap_started_at.elapsed().as_millis(),
|
||||
"tui bootstrap timing"
|
||||
);
|
||||
Ok(AppServerBootstrap {
|
||||
account_email,
|
||||
auth_mode,
|
||||
@@ -346,23 +406,37 @@ impl AppServerSession {
|
||||
config: &Config,
|
||||
session_start_source: Option<ThreadStartSource>,
|
||||
) -> Result<AppServerStartedThread> {
|
||||
let start_thread_started_at = Instant::now();
|
||||
let request_id = self.next_request_id();
|
||||
let params_build_started_at = Instant::now();
|
||||
let params = thread_start_params_from_config(
|
||||
config,
|
||||
self.thread_params_mode(),
|
||||
self.remote_cwd_override.as_deref(),
|
||||
session_start_source,
|
||||
);
|
||||
let params_build_ms = params_build_started_at.elapsed().as_millis();
|
||||
let rpc_started_at = Instant::now();
|
||||
let response: ThreadStartResponse = self
|
||||
.client
|
||||
.request_typed(ClientRequest::ThreadStart {
|
||||
request_id,
|
||||
params: thread_start_params_from_config(
|
||||
config,
|
||||
self.thread_params_mode(),
|
||||
self.remote_cwd_override.as_deref(),
|
||||
session_start_source,
|
||||
),
|
||||
})
|
||||
.request_typed(ClientRequest::ThreadStart { request_id, params })
|
||||
.await
|
||||
.map_err(|err| {
|
||||
bootstrap_request_error("thread/start failed during TUI bootstrap", err)
|
||||
})?;
|
||||
started_thread_from_start_response(response, config, self.thread_params_mode()).await
|
||||
let rpc_ms = rpc_started_at.elapsed().as_millis();
|
||||
let response_map_started_at = Instant::now();
|
||||
let started =
|
||||
started_thread_from_start_response(response, config, self.thread_params_mode()).await?;
|
||||
let response_map_ms = response_map_started_at.elapsed().as_millis();
|
||||
tracing::info!(
|
||||
params_build_ms = %params_build_ms,
|
||||
rpc_ms = %rpc_ms,
|
||||
response_map_ms = %response_map_ms,
|
||||
total_ms = %start_thread_started_at.elapsed().as_millis(),
|
||||
"tui thread start timing"
|
||||
);
|
||||
Ok(started)
|
||||
}
|
||||
|
||||
pub(crate) async fn resume_thread(
|
||||
@@ -1372,14 +1446,22 @@ async fn started_thread_from_start_response(
|
||||
config: &Config,
|
||||
thread_params_mode: ThreadParamsMode,
|
||||
) -> Result<AppServerStartedThread> {
|
||||
let started_thread_started_at = Instant::now();
|
||||
let session =
|
||||
thread_session_state_from_thread_start_response(&response, config, thread_params_mode)
|
||||
.await
|
||||
.map_err(color_eyre::eyre::Report::msg)?;
|
||||
Ok(AppServerStartedThread {
|
||||
session,
|
||||
turns: response.thread.turns,
|
||||
})
|
||||
let session_state_ms = started_thread_started_at.elapsed().as_millis();
|
||||
let turns_attach_started_at = Instant::now();
|
||||
let turns = response.thread.turns;
|
||||
let turns_attach_ms = turns_attach_started_at.elapsed().as_millis();
|
||||
tracing::info!(
|
||||
session_state_ms = %session_state_ms,
|
||||
turns_attach_ms = %turns_attach_ms,
|
||||
total_ms = %started_thread_started_at.elapsed().as_millis(),
|
||||
"tui start response mapping timing"
|
||||
);
|
||||
Ok(AppServerStartedThread { session, turns })
|
||||
}
|
||||
|
||||
async fn started_thread_from_resume_response(
|
||||
@@ -1417,13 +1499,17 @@ async fn thread_session_state_from_thread_start_response(
|
||||
config: &Config,
|
||||
thread_params_mode: ThreadParamsMode,
|
||||
) -> Result<ThreadSessionState, String> {
|
||||
let session_state_started_at = Instant::now();
|
||||
let permission_profile_started_at = Instant::now();
|
||||
let permission_profile = display_permission_profile_from_thread_response(
|
||||
&response.sandbox,
|
||||
response.cwd.as_path(),
|
||||
config,
|
||||
thread_params_mode,
|
||||
);
|
||||
thread_session_state_from_thread_response(
|
||||
let permission_profile_ms = permission_profile_started_at.elapsed().as_millis();
|
||||
let session_state_core_started_at = Instant::now();
|
||||
let session = thread_session_state_from_thread_response(
|
||||
&response.thread.id,
|
||||
response.thread.forked_from_id.clone(),
|
||||
response.thread.name.clone(),
|
||||
@@ -1441,7 +1527,15 @@ async fn thread_session_state_from_thread_start_response(
|
||||
response.reasoning_effort,
|
||||
config,
|
||||
)
|
||||
.await
|
||||
.await?;
|
||||
let session_state_core_ms = session_state_core_started_at.elapsed().as_millis();
|
||||
tracing::info!(
|
||||
permission_profile_ms = %permission_profile_ms,
|
||||
session_state_core_ms = %session_state_core_ms,
|
||||
total_ms = %session_state_started_at.elapsed().as_millis(),
|
||||
"tui thread start session state timing"
|
||||
);
|
||||
Ok(session)
|
||||
}
|
||||
|
||||
async fn thread_session_state_from_thread_resume_response(
|
||||
@@ -1553,6 +1647,8 @@ async fn thread_session_state_from_thread_response(
|
||||
reasoning_effort: Option<codex_protocol::openai_models::ReasoningEffort>,
|
||||
config: &Config,
|
||||
) -> Result<ThreadSessionState, String> {
|
||||
let session_state_from_response_started_at = Instant::now();
|
||||
let id_parse_started_at = Instant::now();
|
||||
let thread_id = ThreadId::from_string(thread_id)
|
||||
.map_err(|err| format!("thread id `{thread_id}` is invalid: {err}"))?;
|
||||
let forked_from_id = forked_from_id
|
||||
@@ -1560,10 +1656,21 @@ async fn thread_session_state_from_thread_response(
|
||||
.map(ThreadId::from_string)
|
||||
.transpose()
|
||||
.map_err(|err| format!("forked_from_id is invalid: {err}"))?;
|
||||
let id_parse_ms = id_parse_started_at.elapsed().as_millis();
|
||||
let history_metadata_started_at = Instant::now();
|
||||
let history_config =
|
||||
codex_message_history::HistoryConfig::new(config.codex_home.clone(), &config.history);
|
||||
let (log_id, entry_count) = codex_message_history::history_metadata(&history_config).await;
|
||||
Ok(ThreadSessionState {
|
||||
let (
|
||||
log_id,
|
||||
entry_count,
|
||||
history_metadata_stat_ms,
|
||||
history_metadata_open_ms,
|
||||
history_metadata_scan_ms,
|
||||
history_metadata_bytes_read,
|
||||
) = history_metadata_with_timing(&history_config).await;
|
||||
let history_metadata_ms = history_metadata_started_at.elapsed().as_millis();
|
||||
let struct_build_started_at = Instant::now();
|
||||
let session = ThreadSessionState {
|
||||
thread_id,
|
||||
forked_from_id,
|
||||
fork_parent_title: None,
|
||||
@@ -1585,7 +1692,103 @@ async fn thread_session_state_from_thread_response(
|
||||
}),
|
||||
network_proxy: None,
|
||||
rollout_path,
|
||||
})
|
||||
};
|
||||
let struct_build_ms = struct_build_started_at.elapsed().as_millis();
|
||||
tracing::info!(
|
||||
id_parse_ms = %id_parse_ms,
|
||||
history_metadata_ms = %history_metadata_ms,
|
||||
history_metadata_stat_ms = %history_metadata_stat_ms,
|
||||
history_metadata_open_ms = %history_metadata_open_ms,
|
||||
history_metadata_scan_ms = %history_metadata_scan_ms,
|
||||
history_metadata_bytes_read = %history_metadata_bytes_read,
|
||||
history_metadata_entry_count = %entry_count,
|
||||
struct_build_ms = %struct_build_ms,
|
||||
total_ms = %session_state_from_response_started_at.elapsed().as_millis(),
|
||||
"tui thread session state materialization timing"
|
||||
);
|
||||
Ok(session)
|
||||
}
|
||||
|
||||
async fn history_metadata_with_timing(
|
||||
config: &codex_message_history::HistoryConfig,
|
||||
) -> (u64, usize, u128, u128, u128, usize) {
|
||||
let path = config.codex_home.join("history.jsonl");
|
||||
|
||||
let stat_started_at = Instant::now();
|
||||
let metadata = match fs::metadata(&path).await {
|
||||
Ok(metadata) => metadata,
|
||||
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
|
||||
return (0, 0, stat_started_at.elapsed().as_millis(), 0, 0, 0);
|
||||
}
|
||||
Err(_) => return (0, 0, stat_started_at.elapsed().as_millis(), 0, 0, 0),
|
||||
};
|
||||
let stat_ms = stat_started_at.elapsed().as_millis();
|
||||
|
||||
let open_started_at = Instant::now();
|
||||
let mut file = match fs::File::open(&path).await {
|
||||
Ok(file) => file,
|
||||
Err(_) => {
|
||||
return (
|
||||
history_log_identity(&metadata),
|
||||
0,
|
||||
stat_ms,
|
||||
open_started_at.elapsed().as_millis(),
|
||||
0,
|
||||
0,
|
||||
);
|
||||
}
|
||||
};
|
||||
let open_ms = open_started_at.elapsed().as_millis();
|
||||
|
||||
let scan_started_at = Instant::now();
|
||||
let mut buf = [0u8; 8192];
|
||||
let mut entry_count = 0usize;
|
||||
let mut bytes_read = 0usize;
|
||||
loop {
|
||||
match file.read(&mut buf).await {
|
||||
Ok(0) => break,
|
||||
Ok(n) => {
|
||||
bytes_read += n;
|
||||
entry_count += buf[..n].iter().filter(|&&b| b == b'\n').count();
|
||||
}
|
||||
Err(_) => {
|
||||
return (
|
||||
history_log_identity(&metadata),
|
||||
0,
|
||||
stat_ms,
|
||||
open_ms,
|
||||
scan_started_at.elapsed().as_millis(),
|
||||
bytes_read,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
(
|
||||
history_log_identity(&metadata),
|
||||
entry_count,
|
||||
stat_ms,
|
||||
open_ms,
|
||||
scan_started_at.elapsed().as_millis(),
|
||||
bytes_read,
|
||||
)
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
fn history_log_identity(metadata: &std::fs::Metadata) -> u64 {
|
||||
use std::os::unix::fs::MetadataExt;
|
||||
metadata.ino()
|
||||
}
|
||||
|
||||
#[cfg(windows)]
|
||||
fn history_log_identity(metadata: &std::fs::Metadata) -> u64 {
|
||||
use std::os::windows::fs::MetadataExt;
|
||||
metadata.creation_time()
|
||||
}
|
||||
|
||||
#[cfg(not(any(unix, windows)))]
|
||||
fn history_log_identity(_metadata: &std::fs::Metadata) -> u64 {
|
||||
0
|
||||
}
|
||||
|
||||
pub(crate) fn app_server_rate_limit_snapshots(
|
||||
|
||||
Reference in New Issue
Block a user