mirror of
https://github.com/openai/codex.git
synced 2026-03-15 11:13:47 +00:00
Compare commits
3 Commits
dev/friel/
...
bot/update
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0d07d794ca | ||
|
|
49edf311ac | ||
|
|
d692b74007 |
@@ -399,7 +399,7 @@ impl CloudRequirementsService {
|
||||
"Cloud requirements request was unauthorized; attempting auth recovery"
|
||||
);
|
||||
match auth_recovery.next().await {
|
||||
Ok(()) => {
|
||||
Ok(_) => {
|
||||
let Some(refreshed_auth) = self.auth_manager.auth().await else {
|
||||
tracing::error!(
|
||||
"Auth recovery succeeded but no auth is available for cloud requirements"
|
||||
|
||||
@@ -214,6 +214,7 @@ impl ResponsesWebsocketConnection {
|
||||
pub async fn stream_request(
|
||||
&self,
|
||||
request: ResponsesWsRequest,
|
||||
connection_reused: bool,
|
||||
) -> Result<ResponseStream, ApiError> {
|
||||
let (tx_event, rx_event) =
|
||||
mpsc::channel::<std::result::Result<ResponseEvent, ApiError>>(1600);
|
||||
@@ -258,6 +259,7 @@ impl ResponsesWebsocketConnection {
|
||||
request_body,
|
||||
idle_timeout,
|
||||
telemetry,
|
||||
connection_reused,
|
||||
)
|
||||
.await
|
||||
};
|
||||
@@ -534,6 +536,7 @@ async fn run_websocket_response_stream(
|
||||
request_body: Value,
|
||||
idle_timeout: Duration,
|
||||
telemetry: Option<Arc<dyn WebsocketTelemetry>>,
|
||||
connection_reused: bool,
|
||||
) -> Result<(), ApiError> {
|
||||
let mut last_server_model: Option<String> = None;
|
||||
let request_text = match serde_json::to_string(&request_body) {
|
||||
@@ -553,7 +556,11 @@ async fn run_websocket_response_stream(
|
||||
.map_err(|err| ApiError::Stream(format!("failed to send websocket request: {err}")));
|
||||
|
||||
if let Some(t) = telemetry.as_ref() {
|
||||
t.on_ws_request(request_start.elapsed(), result.as_ref().err());
|
||||
t.on_ws_request(
|
||||
request_start.elapsed(),
|
||||
result.as_ref().err(),
|
||||
connection_reused,
|
||||
);
|
||||
}
|
||||
|
||||
result?;
|
||||
|
||||
@@ -33,7 +33,7 @@ pub trait SseTelemetry: Send + Sync {
|
||||
|
||||
/// Telemetry for Responses WebSocket transport.
|
||||
pub trait WebsocketTelemetry: Send + Sync {
|
||||
fn on_ws_request(&self, duration: Duration, error: Option<&ApiError>);
|
||||
fn on_ws_request(&self, duration: Duration, error: Option<&ApiError>, connection_reused: bool);
|
||||
|
||||
fn on_ws_event(
|
||||
&self,
|
||||
|
||||
@@ -338,9 +338,6 @@
|
||||
"apps": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"apps_mcp_gateway": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"artifact": {
|
||||
"type": "boolean"
|
||||
},
|
||||
@@ -1890,9 +1887,6 @@
|
||||
"apps": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"apps_mcp_gateway": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"artifact": {
|
||||
"type": "boolean"
|
||||
},
|
||||
|
||||
File diff suppressed because one or more lines are too long
@@ -1,3 +1,4 @@
|
||||
use base64::Engine;
|
||||
use chrono::DateTime;
|
||||
use chrono::Utc;
|
||||
use codex_api::AuthProvider as ApiAuthProvider;
|
||||
@@ -7,6 +8,7 @@ use codex_api::rate_limits::parse_promo_message;
|
||||
use codex_api::rate_limits::parse_rate_limit_for_limit;
|
||||
use http::HeaderMap;
|
||||
use serde::Deserialize;
|
||||
use serde_json::Value;
|
||||
|
||||
use crate::auth::CodexAuth;
|
||||
use crate::error::CodexErr;
|
||||
@@ -30,6 +32,8 @@ pub(crate) fn map_api_error(err: ApiError) -> CodexErr {
|
||||
url: None,
|
||||
cf_ray: None,
|
||||
request_id: None,
|
||||
identity_authorization_error: None,
|
||||
identity_error_code: None,
|
||||
}),
|
||||
ApiError::InvalidRequest { message } => CodexErr::InvalidRequest(message),
|
||||
ApiError::Transport(transport) => match transport {
|
||||
@@ -98,6 +102,11 @@ pub(crate) fn map_api_error(err: ApiError) -> CodexErr {
|
||||
url,
|
||||
cf_ray: extract_header(headers.as_ref(), CF_RAY_HEADER),
|
||||
request_id: extract_request_id(headers.as_ref()),
|
||||
identity_authorization_error: extract_header(
|
||||
headers.as_ref(),
|
||||
X_OPENAI_AUTHORIZATION_ERROR_HEADER,
|
||||
),
|
||||
identity_error_code: extract_x_error_json_code(headers.as_ref()),
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -118,6 +127,8 @@ const ACTIVE_LIMIT_HEADER: &str = "x-codex-active-limit";
|
||||
const REQUEST_ID_HEADER: &str = "x-request-id";
|
||||
const OAI_REQUEST_ID_HEADER: &str = "x-oai-request-id";
|
||||
const CF_RAY_HEADER: &str = "cf-ray";
|
||||
const X_OPENAI_AUTHORIZATION_ERROR_HEADER: &str = "x-openai-authorization-error";
|
||||
const X_ERROR_JSON_HEADER: &str = "x-error-json";
|
||||
|
||||
#[cfg(test)]
|
||||
#[path = "api_bridge_tests.rs"]
|
||||
@@ -140,6 +151,19 @@ fn extract_header(headers: Option<&HeaderMap>, name: &str) -> Option<String> {
|
||||
})
|
||||
}
|
||||
|
||||
fn extract_x_error_json_code(headers: Option<&HeaderMap>) -> Option<String> {
|
||||
let encoded = extract_header(headers, X_ERROR_JSON_HEADER)?;
|
||||
let decoded = base64::engine::general_purpose::STANDARD
|
||||
.decode(encoded)
|
||||
.ok()?;
|
||||
let parsed = serde_json::from_slice::<Value>(&decoded).ok()?;
|
||||
parsed
|
||||
.get("error")
|
||||
.and_then(|error| error.get("code"))
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string)
|
||||
}
|
||||
|
||||
pub(crate) fn auth_provider_from_auth(
|
||||
auth: Option<CodexAuth>,
|
||||
provider: &ModelProviderInfo,
|
||||
@@ -191,6 +215,26 @@ pub(crate) struct CoreAuthProvider {
|
||||
account_id: Option<String>,
|
||||
}
|
||||
|
||||
impl CoreAuthProvider {
|
||||
pub(crate) fn auth_header_attached(&self) -> bool {
|
||||
self.token
|
||||
.as_ref()
|
||||
.is_some_and(|token| http::HeaderValue::from_str(&format!("Bearer {token}")).is_ok())
|
||||
}
|
||||
|
||||
pub(crate) fn auth_header_name(&self) -> Option<&'static str> {
|
||||
self.auth_header_attached().then_some("authorization")
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn for_test(token: Option<&str>, account_id: Option<&str>) -> Self {
|
||||
Self {
|
||||
token: token.map(str::to_string),
|
||||
account_id: account_id.map(str::to_string),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ApiAuthProvider for CoreAuthProvider {
|
||||
fn bearer_token(&self) -> Option<String> {
|
||||
self.token.clone()
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use super::*;
|
||||
use base64::Engine;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[test]
|
||||
@@ -94,3 +95,49 @@ fn map_api_error_does_not_fallback_limit_name_to_limit_id() {
|
||||
None
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn map_api_error_extracts_identity_auth_details_from_headers() {
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert(REQUEST_ID_HEADER, http::HeaderValue::from_static("req-401"));
|
||||
headers.insert(CF_RAY_HEADER, http::HeaderValue::from_static("ray-401"));
|
||||
headers.insert(
|
||||
X_OPENAI_AUTHORIZATION_ERROR_HEADER,
|
||||
http::HeaderValue::from_static("missing_authorization_header"),
|
||||
);
|
||||
let x_error_json =
|
||||
base64::engine::general_purpose::STANDARD.encode(r#"{"error":{"code":"token_expired"}}"#);
|
||||
headers.insert(
|
||||
X_ERROR_JSON_HEADER,
|
||||
http::HeaderValue::from_str(&x_error_json).expect("valid x-error-json header"),
|
||||
);
|
||||
|
||||
let err = map_api_error(ApiError::Transport(TransportError::Http {
|
||||
status: http::StatusCode::UNAUTHORIZED,
|
||||
url: Some("https://chatgpt.com/backend-api/codex/models".to_string()),
|
||||
headers: Some(headers),
|
||||
body: Some(r#"{"detail":"Unauthorized"}"#.to_string()),
|
||||
}));
|
||||
|
||||
let CodexErr::UnexpectedStatus(err) = err else {
|
||||
panic!("expected CodexErr::UnexpectedStatus, got {err:?}");
|
||||
};
|
||||
assert_eq!(err.request_id.as_deref(), Some("req-401"));
|
||||
assert_eq!(err.cf_ray.as_deref(), Some("ray-401"));
|
||||
assert_eq!(
|
||||
err.identity_authorization_error.as_deref(),
|
||||
Some("missing_authorization_header")
|
||||
);
|
||||
assert_eq!(err.identity_error_code.as_deref(), Some("token_expired"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn core_auth_provider_reports_when_auth_header_will_attach() {
|
||||
let auth = CoreAuthProvider {
|
||||
token: Some("access-token".to_string()),
|
||||
account_id: None,
|
||||
};
|
||||
|
||||
assert!(auth.auth_header_attached());
|
||||
assert_eq!(auth.auth_header_name(), Some("authorization"));
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@ use codex_protocol::protocol::APPS_INSTRUCTIONS_OPEN_TAG;
|
||||
|
||||
pub(crate) fn render_apps_section() -> String {
|
||||
let body = format!(
|
||||
"## Apps\nApps are mentioned in user messages in the format `[$app-name](app://{{connector_id}})`.\nAn app is equivalent to a set of MCP tools within the `{CODEX_APPS_MCP_SERVER_NAME}` MCP.\nWhen you see an app mention, the app's MCP tools are either available tools in the `{CODEX_APPS_MCP_SERVER_NAME}` MCP server, or the tools do not exist because the user has not installed the app.\nDo not additionally call list_mcp_resources for apps that are already mentioned."
|
||||
"## Apps (Connectors)\nApps (Connectors) can be explicitly triggered in user messages in the format `[$app-name](app://{{connector_id}})`. Apps can also be implicitly triggered as long as the context suggests usage of available apps, the available apps will be listed by the `tool_search` tool.\nAn app is equivalent to a set of MCP tools within the `{CODEX_APPS_MCP_SERVER_NAME}` MCP.\nAn installed app's MCP tools are either provided to you already, or can be lazy-loaded through the `tool_search` tool.\nDo not additionally call list_mcp_resources or list_mcp_resource_templates for apps."
|
||||
);
|
||||
format!("{APPS_INSTRUCTIONS_OPEN_TAG}\n{body}\n{APPS_INSTRUCTIONS_CLOSE_TAG}")
|
||||
}
|
||||
|
||||
@@ -874,6 +874,17 @@ pub struct UnauthorizedRecovery {
|
||||
mode: UnauthorizedRecoveryMode,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
pub struct UnauthorizedRecoveryStepResult {
|
||||
auth_state_changed: Option<bool>,
|
||||
}
|
||||
|
||||
impl UnauthorizedRecoveryStepResult {
|
||||
pub fn auth_state_changed(&self) -> Option<bool> {
|
||||
self.auth_state_changed
|
||||
}
|
||||
}
|
||||
|
||||
impl UnauthorizedRecovery {
|
||||
fn new(manager: Arc<AuthManager>) -> Self {
|
||||
let cached_auth = manager.auth_cached();
|
||||
@@ -917,7 +928,46 @@ impl UnauthorizedRecovery {
|
||||
!matches!(self.step, UnauthorizedRecoveryStep::Done)
|
||||
}
|
||||
|
||||
pub async fn next(&mut self) -> Result<(), RefreshTokenError> {
|
||||
pub fn unavailable_reason(&self) -> &'static str {
|
||||
if !self
|
||||
.manager
|
||||
.auth_cached()
|
||||
.as_ref()
|
||||
.is_some_and(CodexAuth::is_chatgpt_auth)
|
||||
{
|
||||
return "not_chatgpt_auth";
|
||||
}
|
||||
|
||||
if self.mode == UnauthorizedRecoveryMode::External
|
||||
&& !self.manager.has_external_auth_refresher()
|
||||
{
|
||||
return "no_external_refresher";
|
||||
}
|
||||
|
||||
if matches!(self.step, UnauthorizedRecoveryStep::Done) {
|
||||
return "recovery_exhausted";
|
||||
}
|
||||
|
||||
"ready"
|
||||
}
|
||||
|
||||
pub fn mode_name(&self) -> &'static str {
|
||||
match self.mode {
|
||||
UnauthorizedRecoveryMode::Managed => "managed",
|
||||
UnauthorizedRecoveryMode::External => "external",
|
||||
}
|
||||
}
|
||||
|
||||
pub fn step_name(&self) -> &'static str {
|
||||
match self.step {
|
||||
UnauthorizedRecoveryStep::Reload => "reload",
|
||||
UnauthorizedRecoveryStep::RefreshToken => "refresh_token",
|
||||
UnauthorizedRecoveryStep::ExternalRefresh => "external_refresh",
|
||||
UnauthorizedRecoveryStep::Done => "done",
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn next(&mut self) -> Result<UnauthorizedRecoveryStepResult, RefreshTokenError> {
|
||||
if !self.has_next() {
|
||||
return Err(RefreshTokenError::Permanent(RefreshTokenFailedError::new(
|
||||
RefreshTokenFailedReason::Other,
|
||||
@@ -931,8 +981,17 @@ impl UnauthorizedRecovery {
|
||||
.manager
|
||||
.reload_if_account_id_matches(self.expected_account_id.as_deref())
|
||||
{
|
||||
ReloadOutcome::ReloadedChanged | ReloadOutcome::ReloadedNoChange => {
|
||||
ReloadOutcome::ReloadedChanged => {
|
||||
self.step = UnauthorizedRecoveryStep::RefreshToken;
|
||||
return Ok(UnauthorizedRecoveryStepResult {
|
||||
auth_state_changed: Some(true),
|
||||
});
|
||||
}
|
||||
ReloadOutcome::ReloadedNoChange => {
|
||||
self.step = UnauthorizedRecoveryStep::RefreshToken;
|
||||
return Ok(UnauthorizedRecoveryStepResult {
|
||||
auth_state_changed: Some(false),
|
||||
});
|
||||
}
|
||||
ReloadOutcome::Skipped => {
|
||||
self.step = UnauthorizedRecoveryStep::Done;
|
||||
@@ -946,16 +1005,24 @@ impl UnauthorizedRecovery {
|
||||
UnauthorizedRecoveryStep::RefreshToken => {
|
||||
self.manager.refresh_token_from_authority().await?;
|
||||
self.step = UnauthorizedRecoveryStep::Done;
|
||||
return Ok(UnauthorizedRecoveryStepResult {
|
||||
auth_state_changed: Some(true),
|
||||
});
|
||||
}
|
||||
UnauthorizedRecoveryStep::ExternalRefresh => {
|
||||
self.manager
|
||||
.refresh_external_auth(ExternalAuthRefreshReason::Unauthorized)
|
||||
.await?;
|
||||
self.step = UnauthorizedRecoveryStep::Done;
|
||||
return Ok(UnauthorizedRecoveryStepResult {
|
||||
auth_state_changed: Some(true),
|
||||
});
|
||||
}
|
||||
UnauthorizedRecoveryStep::Done => {}
|
||||
}
|
||||
Ok(())
|
||||
Ok(UnauthorizedRecoveryStepResult {
|
||||
auth_state_changed: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@ use codex_protocol::config_types::ForcedLoginMethod;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde::Serialize;
|
||||
use serde_json::json;
|
||||
use std::sync::Arc;
|
||||
use tempfile::tempdir;
|
||||
|
||||
#[tokio::test]
|
||||
@@ -171,6 +172,33 @@ fn logout_removes_auth_file() -> Result<(), std::io::Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unauthorized_recovery_reports_mode_and_step_names() {
|
||||
let dir = tempdir().unwrap();
|
||||
let manager = AuthManager::shared(
|
||||
dir.path().to_path_buf(),
|
||||
false,
|
||||
AuthCredentialsStoreMode::File,
|
||||
);
|
||||
let managed = UnauthorizedRecovery {
|
||||
manager: Arc::clone(&manager),
|
||||
step: UnauthorizedRecoveryStep::Reload,
|
||||
expected_account_id: None,
|
||||
mode: UnauthorizedRecoveryMode::Managed,
|
||||
};
|
||||
assert_eq!(managed.mode_name(), "managed");
|
||||
assert_eq!(managed.step_name(), "reload");
|
||||
|
||||
let external = UnauthorizedRecovery {
|
||||
manager,
|
||||
step: UnauthorizedRecoveryStep::ExternalRefresh,
|
||||
expected_account_id: None,
|
||||
mode: UnauthorizedRecoveryMode::External,
|
||||
};
|
||||
assert_eq!(external.mode_name(), "external");
|
||||
assert_eq!(external.step_name(), "external_refresh");
|
||||
}
|
||||
|
||||
struct AuthFileParams {
|
||||
openai_api_key: Option<String>,
|
||||
chatgpt_plan_type: Option<String>,
|
||||
|
||||
@@ -75,6 +75,7 @@ use http::HeaderValue;
|
||||
use http::StatusCode as HttpStatusCode;
|
||||
use reqwest::StatusCode;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::sync::oneshot::error::TryRecvError;
|
||||
@@ -85,6 +86,7 @@ use tracing::trace;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::AuthManager;
|
||||
use crate::auth::AuthMode;
|
||||
use crate::auth::CodexAuth;
|
||||
use crate::auth::RefreshTokenError;
|
||||
use crate::client_common::Prompt;
|
||||
@@ -97,7 +99,14 @@ use crate::error::Result;
|
||||
use crate::flags::CODEX_RS_SSE_FIXTURE;
|
||||
use crate::model_provider_info::ModelProviderInfo;
|
||||
use crate::model_provider_info::WireApi;
|
||||
use crate::response_debug_context::extract_response_debug_context;
|
||||
use crate::response_debug_context::extract_response_debug_context_from_api_error;
|
||||
use crate::response_debug_context::telemetry_api_error_message;
|
||||
use crate::response_debug_context::telemetry_transport_error_message;
|
||||
use crate::tools::spec::create_tools_json_for_responses_api;
|
||||
use crate::util::FeedbackRequestTags;
|
||||
use crate::util::emit_feedback_auth_recovery_tags;
|
||||
use crate::util::emit_feedback_request_tags;
|
||||
|
||||
pub const OPENAI_BETA_HEADER: &str = "OpenAI-Beta";
|
||||
pub const X_CODEX_TURN_STATE_HEADER: &str = "x-codex-turn-state";
|
||||
@@ -105,7 +114,9 @@ pub const X_CODEX_TURN_METADATA_HEADER: &str = "x-codex-turn-metadata";
|
||||
pub const X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER: &str =
|
||||
"x-responsesapi-include-timing-metrics";
|
||||
const RESPONSES_WEBSOCKETS_V2_BETA_HEADER_VALUE: &str = "responses_websockets=2026-02-06";
|
||||
|
||||
const RESPONSES_ENDPOINT: &str = "/responses";
|
||||
const RESPONSES_COMPACT_ENDPOINT: &str = "/responses/compact";
|
||||
const MEMORIES_SUMMARIZE_ENDPOINT: &str = "/memories/trace_summarize";
|
||||
pub fn ws_version_from_features(config: &Config) -> bool {
|
||||
config
|
||||
.features
|
||||
@@ -144,6 +155,17 @@ struct CurrentClientSetup {
|
||||
api_auth: CoreAuthProvider,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
struct RequestRouteTelemetry {
|
||||
endpoint: &'static str,
|
||||
}
|
||||
|
||||
impl RequestRouteTelemetry {
|
||||
fn for_endpoint(endpoint: &'static str) -> Self {
|
||||
Self { endpoint }
|
||||
}
|
||||
}
|
||||
|
||||
/// A session-scoped client for model-provider API calls.
|
||||
///
|
||||
/// This holds configuration and state that should be shared across turns within a Codex session
|
||||
@@ -201,6 +223,23 @@ struct WebsocketSession {
|
||||
connection: Option<ApiWebSocketConnection>,
|
||||
last_request: Option<ResponsesApiRequest>,
|
||||
last_response_rx: Option<oneshot::Receiver<LastResponse>>,
|
||||
connection_reused: StdMutex<bool>,
|
||||
}
|
||||
|
||||
impl WebsocketSession {
|
||||
fn set_connection_reused(&self, connection_reused: bool) {
|
||||
*self
|
||||
.connection_reused
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner) = connection_reused;
|
||||
}
|
||||
|
||||
fn connection_reused(&self) -> bool {
|
||||
*self
|
||||
.connection_reused
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner)
|
||||
}
|
||||
}
|
||||
|
||||
enum WebsocketStreamOutcome {
|
||||
@@ -291,7 +330,15 @@ impl ModelClient {
|
||||
}
|
||||
let client_setup = self.current_client_setup().await?;
|
||||
let transport = ReqwestTransport::new(build_reqwest_client());
|
||||
let request_telemetry = Self::build_request_telemetry(session_telemetry);
|
||||
let request_telemetry = Self::build_request_telemetry(
|
||||
session_telemetry,
|
||||
AuthRequestTelemetryContext::new(
|
||||
client_setup.auth.as_ref().map(CodexAuth::auth_mode),
|
||||
&client_setup.api_auth,
|
||||
PendingUnauthorizedRetry::default(),
|
||||
),
|
||||
RequestRouteTelemetry::for_endpoint(RESPONSES_COMPACT_ENDPOINT),
|
||||
);
|
||||
let client =
|
||||
ApiCompactClient::new(transport, client_setup.api_provider, client_setup.api_auth)
|
||||
.with_telemetry(Some(request_telemetry));
|
||||
@@ -351,7 +398,15 @@ impl ModelClient {
|
||||
|
||||
let client_setup = self.current_client_setup().await?;
|
||||
let transport = ReqwestTransport::new(build_reqwest_client());
|
||||
let request_telemetry = Self::build_request_telemetry(session_telemetry);
|
||||
let request_telemetry = Self::build_request_telemetry(
|
||||
session_telemetry,
|
||||
AuthRequestTelemetryContext::new(
|
||||
client_setup.auth.as_ref().map(CodexAuth::auth_mode),
|
||||
&client_setup.api_auth,
|
||||
PendingUnauthorizedRetry::default(),
|
||||
),
|
||||
RequestRouteTelemetry::for_endpoint(MEMORIES_SUMMARIZE_ENDPOINT),
|
||||
);
|
||||
let client =
|
||||
ApiMemoriesClient::new(transport, client_setup.api_provider, client_setup.api_auth)
|
||||
.with_telemetry(Some(request_telemetry));
|
||||
@@ -391,8 +446,16 @@ impl ModelClient {
|
||||
}
|
||||
|
||||
/// Builds request telemetry for unary API calls (e.g., Compact endpoint).
|
||||
fn build_request_telemetry(session_telemetry: &SessionTelemetry) -> Arc<dyn RequestTelemetry> {
|
||||
let telemetry = Arc::new(ApiTelemetry::new(session_telemetry.clone()));
|
||||
fn build_request_telemetry(
|
||||
session_telemetry: &SessionTelemetry,
|
||||
auth_context: AuthRequestTelemetryContext,
|
||||
request_route_telemetry: RequestRouteTelemetry,
|
||||
) -> Arc<dyn RequestTelemetry> {
|
||||
let telemetry = Arc::new(ApiTelemetry::new(
|
||||
session_telemetry.clone(),
|
||||
auth_context,
|
||||
request_route_telemetry,
|
||||
));
|
||||
let request_telemetry: Arc<dyn RequestTelemetry> = telemetry;
|
||||
request_telemetry
|
||||
}
|
||||
@@ -458,6 +521,7 @@ impl ModelClient {
|
||||
///
|
||||
/// Both startup prewarm and in-turn `needs_new` reconnects call this path so handshake
|
||||
/// behavior remains consistent across both flows.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn connect_websocket(
|
||||
&self,
|
||||
session_telemetry: &SessionTelemetry,
|
||||
@@ -465,17 +529,69 @@ impl ModelClient {
|
||||
api_auth: CoreAuthProvider,
|
||||
turn_state: Option<Arc<OnceLock<String>>>,
|
||||
turn_metadata_header: Option<&str>,
|
||||
auth_context: AuthRequestTelemetryContext,
|
||||
request_route_telemetry: RequestRouteTelemetry,
|
||||
) -> std::result::Result<ApiWebSocketConnection, ApiError> {
|
||||
let headers = self.build_websocket_headers(turn_state.as_ref(), turn_metadata_header);
|
||||
let websocket_telemetry = ModelClientSession::build_websocket_telemetry(session_telemetry);
|
||||
ApiWebSocketResponsesClient::new(api_provider, api_auth)
|
||||
let websocket_telemetry = ModelClientSession::build_websocket_telemetry(
|
||||
session_telemetry,
|
||||
auth_context,
|
||||
request_route_telemetry,
|
||||
);
|
||||
let start = Instant::now();
|
||||
let result = ApiWebSocketResponsesClient::new(api_provider, api_auth)
|
||||
.connect(
|
||||
headers,
|
||||
crate::default_client::default_headers(),
|
||||
turn_state,
|
||||
Some(websocket_telemetry),
|
||||
)
|
||||
.await
|
||||
.await;
|
||||
let error_message = result.as_ref().err().map(telemetry_api_error_message);
|
||||
let response_debug = result
|
||||
.as_ref()
|
||||
.err()
|
||||
.map(extract_response_debug_context_from_api_error)
|
||||
.unwrap_or_default();
|
||||
let status = result.as_ref().err().and_then(api_error_http_status);
|
||||
session_telemetry.record_websocket_connect(
|
||||
start.elapsed(),
|
||||
status,
|
||||
error_message.as_deref(),
|
||||
auth_context.auth_header_attached,
|
||||
auth_context.auth_header_name,
|
||||
auth_context.retry_after_unauthorized,
|
||||
auth_context.recovery_mode,
|
||||
auth_context.recovery_phase,
|
||||
request_route_telemetry.endpoint,
|
||||
false,
|
||||
response_debug.request_id.as_deref(),
|
||||
response_debug.cf_ray.as_deref(),
|
||||
response_debug.auth_error.as_deref(),
|
||||
response_debug.auth_error_code.as_deref(),
|
||||
);
|
||||
emit_feedback_request_tags(&FeedbackRequestTags {
|
||||
endpoint: request_route_telemetry.endpoint,
|
||||
auth_header_attached: auth_context.auth_header_attached,
|
||||
auth_header_name: auth_context.auth_header_name,
|
||||
auth_mode: auth_context.auth_mode,
|
||||
auth_retry_after_unauthorized: Some(auth_context.retry_after_unauthorized),
|
||||
auth_recovery_mode: auth_context.recovery_mode,
|
||||
auth_recovery_phase: auth_context.recovery_phase,
|
||||
auth_connection_reused: Some(false),
|
||||
auth_request_id: response_debug.request_id.as_deref(),
|
||||
auth_cf_ray: response_debug.cf_ray.as_deref(),
|
||||
auth_error: response_debug.auth_error.as_deref(),
|
||||
auth_error_code: response_debug.auth_error_code.as_deref(),
|
||||
auth_recovery_followup_success: auth_context
|
||||
.retry_after_unauthorized
|
||||
.then_some(result.is_ok()),
|
||||
auth_recovery_followup_status: auth_context
|
||||
.retry_after_unauthorized
|
||||
.then_some(status)
|
||||
.flatten(),
|
||||
});
|
||||
result
|
||||
}
|
||||
|
||||
/// Builds websocket handshake headers for both prewarm and turn-time reconnect.
|
||||
@@ -718,7 +834,11 @@ impl ModelClientSession {
|
||||
"failed to build websocket prewarm client setup: {err}"
|
||||
))
|
||||
})?;
|
||||
|
||||
let auth_context = AuthRequestTelemetryContext::new(
|
||||
client_setup.auth.as_ref().map(CodexAuth::auth_mode),
|
||||
&client_setup.api_auth,
|
||||
PendingUnauthorizedRetry::default(),
|
||||
);
|
||||
let connection = self
|
||||
.client
|
||||
.connect_websocket(
|
||||
@@ -727,9 +847,12 @@ impl ModelClientSession {
|
||||
client_setup.api_auth,
|
||||
Some(Arc::clone(&self.turn_state)),
|
||||
None,
|
||||
auth_context,
|
||||
RequestRouteTelemetry::for_endpoint(RESPONSES_ENDPOINT),
|
||||
)
|
||||
.await?;
|
||||
self.websocket_session.connection = Some(connection);
|
||||
self.websocket_session.set_connection_reused(false);
|
||||
Ok(())
|
||||
}
|
||||
/// Returns a websocket connection for this turn.
|
||||
@@ -742,17 +865,22 @@ impl ModelClientSession {
|
||||
wire_api = %self.client.state.provider.wire_api,
|
||||
transport = "responses_websocket",
|
||||
api.path = "responses",
|
||||
turn.has_metadata_header = turn_metadata_header.is_some()
|
||||
turn.has_metadata_header = params.turn_metadata_header.is_some()
|
||||
)
|
||||
)]
|
||||
async fn websocket_connection(
|
||||
&mut self,
|
||||
session_telemetry: &SessionTelemetry,
|
||||
api_provider: codex_api::Provider,
|
||||
api_auth: CoreAuthProvider,
|
||||
turn_metadata_header: Option<&str>,
|
||||
options: &ApiResponsesOptions,
|
||||
params: WebsocketConnectParams<'_>,
|
||||
) -> std::result::Result<&ApiWebSocketConnection, ApiError> {
|
||||
let WebsocketConnectParams {
|
||||
session_telemetry,
|
||||
api_provider,
|
||||
api_auth,
|
||||
turn_metadata_header,
|
||||
options,
|
||||
auth_context,
|
||||
request_route_telemetry,
|
||||
} = params;
|
||||
let needs_new = match self.websocket_session.connection.as_ref() {
|
||||
Some(conn) => conn.is_closed().await,
|
||||
None => true,
|
||||
@@ -773,9 +901,14 @@ impl ModelClientSession {
|
||||
api_auth,
|
||||
Some(turn_state),
|
||||
turn_metadata_header,
|
||||
auth_context,
|
||||
request_route_telemetry,
|
||||
)
|
||||
.await?;
|
||||
self.websocket_session.connection = Some(new_conn);
|
||||
self.websocket_session.set_connection_reused(false);
|
||||
} else {
|
||||
self.websocket_session.set_connection_reused(true);
|
||||
}
|
||||
|
||||
self.websocket_session
|
||||
@@ -840,11 +973,20 @@ impl ModelClientSession {
|
||||
let mut auth_recovery = auth_manager
|
||||
.as_ref()
|
||||
.map(super::auth::AuthManager::unauthorized_recovery);
|
||||
let mut pending_retry = PendingUnauthorizedRetry::default();
|
||||
loop {
|
||||
let client_setup = self.client.current_client_setup().await?;
|
||||
let transport = ReqwestTransport::new(build_reqwest_client());
|
||||
let (request_telemetry, sse_telemetry) =
|
||||
Self::build_streaming_telemetry(session_telemetry);
|
||||
let request_auth_context = AuthRequestTelemetryContext::new(
|
||||
client_setup.auth.as_ref().map(CodexAuth::auth_mode),
|
||||
&client_setup.api_auth,
|
||||
pending_retry,
|
||||
);
|
||||
let (request_telemetry, sse_telemetry) = Self::build_streaming_telemetry(
|
||||
session_telemetry,
|
||||
request_auth_context,
|
||||
RequestRouteTelemetry::for_endpoint(RESPONSES_ENDPOINT),
|
||||
);
|
||||
let compression = self.responses_request_compression(client_setup.auth.as_ref());
|
||||
let options = self.build_responses_options(turn_metadata_header, compression);
|
||||
|
||||
@@ -872,7 +1014,14 @@ impl ModelClientSession {
|
||||
Err(ApiError::Transport(
|
||||
unauthorized_transport @ TransportError::Http { status, .. },
|
||||
)) if status == StatusCode::UNAUTHORIZED => {
|
||||
handle_unauthorized(unauthorized_transport, &mut auth_recovery).await?;
|
||||
pending_retry = PendingUnauthorizedRetry::from_recovery(
|
||||
handle_unauthorized(
|
||||
unauthorized_transport,
|
||||
&mut auth_recovery,
|
||||
session_telemetry,
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
Err(err) => return Err(map_api_error(err)),
|
||||
@@ -911,8 +1060,14 @@ impl ModelClientSession {
|
||||
let mut auth_recovery = auth_manager
|
||||
.as_ref()
|
||||
.map(super::auth::AuthManager::unauthorized_recovery);
|
||||
let mut pending_retry = PendingUnauthorizedRetry::default();
|
||||
loop {
|
||||
let client_setup = self.client.current_client_setup().await?;
|
||||
let request_auth_context = AuthRequestTelemetryContext::new(
|
||||
client_setup.auth.as_ref().map(CodexAuth::auth_mode),
|
||||
&client_setup.api_auth,
|
||||
pending_retry,
|
||||
);
|
||||
let compression = self.responses_request_compression(client_setup.auth.as_ref());
|
||||
|
||||
let options = self.build_responses_options(turn_metadata_header, compression);
|
||||
@@ -933,13 +1088,17 @@ impl ModelClientSession {
|
||||
}
|
||||
|
||||
match self
|
||||
.websocket_connection(
|
||||
.websocket_connection(WebsocketConnectParams {
|
||||
session_telemetry,
|
||||
client_setup.api_provider,
|
||||
client_setup.api_auth,
|
||||
api_provider: client_setup.api_provider,
|
||||
api_auth: client_setup.api_auth,
|
||||
turn_metadata_header,
|
||||
&options,
|
||||
)
|
||||
options: &options,
|
||||
auth_context: request_auth_context,
|
||||
request_route_telemetry: RequestRouteTelemetry::for_endpoint(
|
||||
RESPONSES_ENDPOINT,
|
||||
),
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(_) => {}
|
||||
@@ -951,7 +1110,14 @@ impl ModelClientSession {
|
||||
Err(ApiError::Transport(
|
||||
unauthorized_transport @ TransportError::Http { status, .. },
|
||||
)) if status == StatusCode::UNAUTHORIZED => {
|
||||
handle_unauthorized(unauthorized_transport, &mut auth_recovery).await?;
|
||||
pending_retry = PendingUnauthorizedRetry::from_recovery(
|
||||
handle_unauthorized(
|
||||
unauthorized_transport,
|
||||
&mut auth_recovery,
|
||||
session_telemetry,
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
Err(err) => return Err(map_api_error(err)),
|
||||
@@ -968,7 +1134,7 @@ impl ModelClientSession {
|
||||
"websocket connection is unavailable".to_string(),
|
||||
))
|
||||
})?
|
||||
.stream_request(ws_request)
|
||||
.stream_request(ws_request, self.websocket_session.connection_reused())
|
||||
.await
|
||||
.map_err(map_api_error)?;
|
||||
let (stream, last_request_rx) =
|
||||
@@ -981,8 +1147,14 @@ impl ModelClientSession {
|
||||
/// Builds request and SSE telemetry for streaming API calls.
|
||||
fn build_streaming_telemetry(
|
||||
session_telemetry: &SessionTelemetry,
|
||||
auth_context: AuthRequestTelemetryContext,
|
||||
request_route_telemetry: RequestRouteTelemetry,
|
||||
) -> (Arc<dyn RequestTelemetry>, Arc<dyn SseTelemetry>) {
|
||||
let telemetry = Arc::new(ApiTelemetry::new(session_telemetry.clone()));
|
||||
let telemetry = Arc::new(ApiTelemetry::new(
|
||||
session_telemetry.clone(),
|
||||
auth_context,
|
||||
request_route_telemetry,
|
||||
));
|
||||
let request_telemetry: Arc<dyn RequestTelemetry> = telemetry.clone();
|
||||
let sse_telemetry: Arc<dyn SseTelemetry> = telemetry;
|
||||
(request_telemetry, sse_telemetry)
|
||||
@@ -991,8 +1163,14 @@ impl ModelClientSession {
|
||||
/// Builds telemetry for the Responses API WebSocket transport.
|
||||
fn build_websocket_telemetry(
|
||||
session_telemetry: &SessionTelemetry,
|
||||
auth_context: AuthRequestTelemetryContext,
|
||||
request_route_telemetry: RequestRouteTelemetry,
|
||||
) -> Arc<dyn WebsocketTelemetry> {
|
||||
let telemetry = Arc::new(ApiTelemetry::new(session_telemetry.clone()));
|
||||
let telemetry = Arc::new(ApiTelemetry::new(
|
||||
session_telemetry.clone(),
|
||||
auth_context,
|
||||
request_route_telemetry,
|
||||
));
|
||||
let websocket_telemetry: Arc<dyn WebsocketTelemetry> = telemetry;
|
||||
websocket_telemetry
|
||||
}
|
||||
@@ -1126,6 +1304,7 @@ impl ModelClientSession {
|
||||
self.websocket_session.connection = None;
|
||||
self.websocket_session.last_request = None;
|
||||
self.websocket_session.last_response_rx = None;
|
||||
self.websocket_session.set_connection_reused(false);
|
||||
}
|
||||
activated
|
||||
}
|
||||
@@ -1264,30 +1443,209 @@ where
|
||||
///
|
||||
/// When refresh succeeds, the caller should retry the API call; otherwise
|
||||
/// the mapped `CodexErr` is returned to the caller.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
struct UnauthorizedRecoveryExecution {
|
||||
mode: &'static str,
|
||||
phase: &'static str,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Default)]
|
||||
struct PendingUnauthorizedRetry {
|
||||
retry_after_unauthorized: bool,
|
||||
recovery_mode: Option<&'static str>,
|
||||
recovery_phase: Option<&'static str>,
|
||||
}
|
||||
|
||||
impl PendingUnauthorizedRetry {
|
||||
fn from_recovery(recovery: UnauthorizedRecoveryExecution) -> Self {
|
||||
Self {
|
||||
retry_after_unauthorized: true,
|
||||
recovery_mode: Some(recovery.mode),
|
||||
recovery_phase: Some(recovery.phase),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Default)]
|
||||
struct AuthRequestTelemetryContext {
|
||||
auth_mode: Option<&'static str>,
|
||||
auth_header_attached: bool,
|
||||
auth_header_name: Option<&'static str>,
|
||||
retry_after_unauthorized: bool,
|
||||
recovery_mode: Option<&'static str>,
|
||||
recovery_phase: Option<&'static str>,
|
||||
}
|
||||
|
||||
impl AuthRequestTelemetryContext {
|
||||
fn new(
|
||||
auth_mode: Option<AuthMode>,
|
||||
api_auth: &CoreAuthProvider,
|
||||
retry: PendingUnauthorizedRetry,
|
||||
) -> Self {
|
||||
Self {
|
||||
auth_mode: auth_mode.map(|mode| match mode {
|
||||
AuthMode::ApiKey => "ApiKey",
|
||||
AuthMode::Chatgpt => "Chatgpt",
|
||||
}),
|
||||
auth_header_attached: api_auth.auth_header_attached(),
|
||||
auth_header_name: api_auth.auth_header_name(),
|
||||
retry_after_unauthorized: retry.retry_after_unauthorized,
|
||||
recovery_mode: retry.recovery_mode,
|
||||
recovery_phase: retry.recovery_phase,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct WebsocketConnectParams<'a> {
|
||||
session_telemetry: &'a SessionTelemetry,
|
||||
api_provider: codex_api::Provider,
|
||||
api_auth: CoreAuthProvider,
|
||||
turn_metadata_header: Option<&'a str>,
|
||||
options: &'a ApiResponsesOptions,
|
||||
auth_context: AuthRequestTelemetryContext,
|
||||
request_route_telemetry: RequestRouteTelemetry,
|
||||
}
|
||||
|
||||
async fn handle_unauthorized(
|
||||
transport: TransportError,
|
||||
auth_recovery: &mut Option<UnauthorizedRecovery>,
|
||||
) -> Result<()> {
|
||||
session_telemetry: &SessionTelemetry,
|
||||
) -> Result<UnauthorizedRecoveryExecution> {
|
||||
let debug = extract_response_debug_context(&transport);
|
||||
if let Some(recovery) = auth_recovery
|
||||
&& recovery.has_next()
|
||||
{
|
||||
let mode = recovery.mode_name();
|
||||
let phase = recovery.step_name();
|
||||
return match recovery.next().await {
|
||||
Ok(_) => Ok(()),
|
||||
Err(RefreshTokenError::Permanent(failed)) => Err(CodexErr::RefreshTokenFailed(failed)),
|
||||
Err(RefreshTokenError::Transient(other)) => Err(CodexErr::Io(other)),
|
||||
Ok(step_result) => {
|
||||
session_telemetry.record_auth_recovery(
|
||||
mode,
|
||||
phase,
|
||||
"recovery_succeeded",
|
||||
debug.request_id.as_deref(),
|
||||
debug.cf_ray.as_deref(),
|
||||
debug.auth_error.as_deref(),
|
||||
debug.auth_error_code.as_deref(),
|
||||
None,
|
||||
step_result.auth_state_changed(),
|
||||
);
|
||||
emit_feedback_auth_recovery_tags(
|
||||
mode,
|
||||
phase,
|
||||
"recovery_succeeded",
|
||||
debug.request_id.as_deref(),
|
||||
debug.cf_ray.as_deref(),
|
||||
debug.auth_error.as_deref(),
|
||||
debug.auth_error_code.as_deref(),
|
||||
);
|
||||
Ok(UnauthorizedRecoveryExecution { mode, phase })
|
||||
}
|
||||
Err(RefreshTokenError::Permanent(failed)) => {
|
||||
session_telemetry.record_auth_recovery(
|
||||
mode,
|
||||
phase,
|
||||
"recovery_failed_permanent",
|
||||
debug.request_id.as_deref(),
|
||||
debug.cf_ray.as_deref(),
|
||||
debug.auth_error.as_deref(),
|
||||
debug.auth_error_code.as_deref(),
|
||||
None,
|
||||
None,
|
||||
);
|
||||
emit_feedback_auth_recovery_tags(
|
||||
mode,
|
||||
phase,
|
||||
"recovery_failed_permanent",
|
||||
debug.request_id.as_deref(),
|
||||
debug.cf_ray.as_deref(),
|
||||
debug.auth_error.as_deref(),
|
||||
debug.auth_error_code.as_deref(),
|
||||
);
|
||||
Err(CodexErr::RefreshTokenFailed(failed))
|
||||
}
|
||||
Err(RefreshTokenError::Transient(other)) => {
|
||||
session_telemetry.record_auth_recovery(
|
||||
mode,
|
||||
phase,
|
||||
"recovery_failed_transient",
|
||||
debug.request_id.as_deref(),
|
||||
debug.cf_ray.as_deref(),
|
||||
debug.auth_error.as_deref(),
|
||||
debug.auth_error_code.as_deref(),
|
||||
None,
|
||||
None,
|
||||
);
|
||||
emit_feedback_auth_recovery_tags(
|
||||
mode,
|
||||
phase,
|
||||
"recovery_failed_transient",
|
||||
debug.request_id.as_deref(),
|
||||
debug.cf_ray.as_deref(),
|
||||
debug.auth_error.as_deref(),
|
||||
debug.auth_error_code.as_deref(),
|
||||
);
|
||||
Err(CodexErr::Io(other))
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
let (mode, phase, recovery_reason) = match auth_recovery.as_ref() {
|
||||
Some(recovery) => (
|
||||
recovery.mode_name(),
|
||||
recovery.step_name(),
|
||||
Some(recovery.unavailable_reason()),
|
||||
),
|
||||
None => ("none", "none", Some("auth_manager_missing")),
|
||||
};
|
||||
session_telemetry.record_auth_recovery(
|
||||
mode,
|
||||
phase,
|
||||
"recovery_not_run",
|
||||
debug.request_id.as_deref(),
|
||||
debug.cf_ray.as_deref(),
|
||||
debug.auth_error.as_deref(),
|
||||
debug.auth_error_code.as_deref(),
|
||||
recovery_reason,
|
||||
None,
|
||||
);
|
||||
emit_feedback_auth_recovery_tags(
|
||||
mode,
|
||||
phase,
|
||||
"recovery_not_run",
|
||||
debug.request_id.as_deref(),
|
||||
debug.cf_ray.as_deref(),
|
||||
debug.auth_error.as_deref(),
|
||||
debug.auth_error_code.as_deref(),
|
||||
);
|
||||
|
||||
Err(map_api_error(ApiError::Transport(transport)))
|
||||
}
|
||||
|
||||
fn api_error_http_status(error: &ApiError) -> Option<u16> {
|
||||
match error {
|
||||
ApiError::Transport(TransportError::Http { status, .. }) => Some(status.as_u16()),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
struct ApiTelemetry {
|
||||
session_telemetry: SessionTelemetry,
|
||||
auth_context: AuthRequestTelemetryContext,
|
||||
request_route_telemetry: RequestRouteTelemetry,
|
||||
}
|
||||
|
||||
impl ApiTelemetry {
|
||||
fn new(session_telemetry: SessionTelemetry) -> Self {
|
||||
Self { session_telemetry }
|
||||
fn new(
|
||||
session_telemetry: SessionTelemetry,
|
||||
auth_context: AuthRequestTelemetryContext,
|
||||
request_route_telemetry: RequestRouteTelemetry,
|
||||
) -> Self {
|
||||
Self {
|
||||
session_telemetry,
|
||||
auth_context,
|
||||
request_route_telemetry,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1299,13 +1657,50 @@ impl RequestTelemetry for ApiTelemetry {
|
||||
error: Option<&TransportError>,
|
||||
duration: Duration,
|
||||
) {
|
||||
let error_message = error.map(std::string::ToString::to_string);
|
||||
let error_message = error.map(telemetry_transport_error_message);
|
||||
let status = status.map(|s| s.as_u16());
|
||||
let debug = error
|
||||
.map(extract_response_debug_context)
|
||||
.unwrap_or_default();
|
||||
self.session_telemetry.record_api_request(
|
||||
attempt,
|
||||
status.map(|s| s.as_u16()),
|
||||
status,
|
||||
error_message.as_deref(),
|
||||
duration,
|
||||
self.auth_context.auth_header_attached,
|
||||
self.auth_context.auth_header_name,
|
||||
self.auth_context.retry_after_unauthorized,
|
||||
self.auth_context.recovery_mode,
|
||||
self.auth_context.recovery_phase,
|
||||
self.request_route_telemetry.endpoint,
|
||||
debug.request_id.as_deref(),
|
||||
debug.cf_ray.as_deref(),
|
||||
debug.auth_error.as_deref(),
|
||||
debug.auth_error_code.as_deref(),
|
||||
);
|
||||
emit_feedback_request_tags(&FeedbackRequestTags {
|
||||
endpoint: self.request_route_telemetry.endpoint,
|
||||
auth_header_attached: self.auth_context.auth_header_attached,
|
||||
auth_header_name: self.auth_context.auth_header_name,
|
||||
auth_mode: self.auth_context.auth_mode,
|
||||
auth_retry_after_unauthorized: Some(self.auth_context.retry_after_unauthorized),
|
||||
auth_recovery_mode: self.auth_context.recovery_mode,
|
||||
auth_recovery_phase: self.auth_context.recovery_phase,
|
||||
auth_connection_reused: None,
|
||||
auth_request_id: debug.request_id.as_deref(),
|
||||
auth_cf_ray: debug.cf_ray.as_deref(),
|
||||
auth_error: debug.auth_error.as_deref(),
|
||||
auth_error_code: debug.auth_error_code.as_deref(),
|
||||
auth_recovery_followup_success: self
|
||||
.auth_context
|
||||
.retry_after_unauthorized
|
||||
.then_some(error.is_none()),
|
||||
auth_recovery_followup_status: self
|
||||
.auth_context
|
||||
.retry_after_unauthorized
|
||||
.then_some(status)
|
||||
.flatten(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1323,10 +1718,40 @@ impl SseTelemetry for ApiTelemetry {
|
||||
}
|
||||
|
||||
impl WebsocketTelemetry for ApiTelemetry {
|
||||
fn on_ws_request(&self, duration: Duration, error: Option<&ApiError>) {
|
||||
let error_message = error.map(std::string::ToString::to_string);
|
||||
self.session_telemetry
|
||||
.record_websocket_request(duration, error_message.as_deref());
|
||||
fn on_ws_request(&self, duration: Duration, error: Option<&ApiError>, connection_reused: bool) {
|
||||
let error_message = error.map(telemetry_api_error_message);
|
||||
let status = error.and_then(api_error_http_status);
|
||||
let debug = error
|
||||
.map(extract_response_debug_context_from_api_error)
|
||||
.unwrap_or_default();
|
||||
self.session_telemetry.record_websocket_request(
|
||||
duration,
|
||||
error_message.as_deref(),
|
||||
connection_reused,
|
||||
);
|
||||
emit_feedback_request_tags(&FeedbackRequestTags {
|
||||
endpoint: self.request_route_telemetry.endpoint,
|
||||
auth_header_attached: self.auth_context.auth_header_attached,
|
||||
auth_header_name: self.auth_context.auth_header_name,
|
||||
auth_mode: self.auth_context.auth_mode,
|
||||
auth_retry_after_unauthorized: Some(self.auth_context.retry_after_unauthorized),
|
||||
auth_recovery_mode: self.auth_context.recovery_mode,
|
||||
auth_recovery_phase: self.auth_context.recovery_phase,
|
||||
auth_connection_reused: Some(connection_reused),
|
||||
auth_request_id: debug.request_id.as_deref(),
|
||||
auth_cf_ray: debug.cf_ray.as_deref(),
|
||||
auth_error: debug.auth_error.as_deref(),
|
||||
auth_error_code: debug.auth_error_code.as_deref(),
|
||||
auth_recovery_followup_success: self
|
||||
.auth_context
|
||||
.retry_after_unauthorized
|
||||
.then_some(error.is_none()),
|
||||
auth_recovery_followup_status: self
|
||||
.auth_context
|
||||
.retry_after_unauthorized
|
||||
.then_some(status)
|
||||
.flatten(),
|
||||
});
|
||||
}
|
||||
|
||||
fn on_ws_event(
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
use super::AuthRequestTelemetryContext;
|
||||
use super::ModelClient;
|
||||
use super::PendingUnauthorizedRetry;
|
||||
use super::UnauthorizedRecoveryExecution;
|
||||
use codex_otel::SessionTelemetry;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::openai_models::ModelInfo;
|
||||
@@ -94,3 +97,22 @@ async fn summarize_memories_returns_empty_for_empty_input() {
|
||||
.expect("empty summarize request should succeed");
|
||||
assert_eq!(output.len(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn auth_request_telemetry_context_tracks_attached_auth_and_retry_phase() {
|
||||
let auth_context = AuthRequestTelemetryContext::new(
|
||||
Some(crate::auth::AuthMode::Chatgpt),
|
||||
&crate::api_bridge::CoreAuthProvider::for_test(Some("access-token"), Some("workspace-123")),
|
||||
PendingUnauthorizedRetry::from_recovery(UnauthorizedRecoveryExecution {
|
||||
mode: "managed",
|
||||
phase: "refresh_token",
|
||||
}),
|
||||
);
|
||||
|
||||
assert_eq!(auth_context.auth_mode, Some("Chatgpt"));
|
||||
assert!(auth_context.auth_header_attached);
|
||||
assert_eq!(auth_context.auth_header_name, Some("authorization"));
|
||||
assert!(auth_context.retry_after_unauthorized);
|
||||
assert_eq!(auth_context.recovery_mode, Some("managed"));
|
||||
assert_eq!(auth_context.recovery_phase, Some("refresh_token"));
|
||||
}
|
||||
|
||||
@@ -3936,12 +3936,13 @@ impl Session {
|
||||
server: &str,
|
||||
tool: &str,
|
||||
arguments: Option<serde_json::Value>,
|
||||
meta: Option<serde_json::Value>,
|
||||
) -> anyhow::Result<CallToolResult> {
|
||||
self.services
|
||||
.mcp_connection_manager
|
||||
.read()
|
||||
.await
|
||||
.call_tool(server, tool, arguments)
|
||||
.call_tool(server, tool, arguments, meta)
|
||||
.await
|
||||
}
|
||||
|
||||
|
||||
@@ -292,6 +292,8 @@ pub struct UnexpectedResponseError {
|
||||
pub url: Option<String>,
|
||||
pub cf_ray: Option<String>,
|
||||
pub request_id: Option<String>,
|
||||
pub identity_authorization_error: Option<String>,
|
||||
pub identity_error_code: Option<String>,
|
||||
}
|
||||
|
||||
const CLOUDFLARE_BLOCKED_MESSAGE: &str =
|
||||
@@ -346,6 +348,12 @@ impl UnexpectedResponseError {
|
||||
if let Some(id) = &self.request_id {
|
||||
message.push_str(&format!(", request id: {id}"));
|
||||
}
|
||||
if let Some(auth_error) = &self.identity_authorization_error {
|
||||
message.push_str(&format!(", auth error: {auth_error}"));
|
||||
}
|
||||
if let Some(error_code) = &self.identity_error_code {
|
||||
message.push_str(&format!(", auth error code: {error_code}"));
|
||||
}
|
||||
|
||||
Some(message)
|
||||
}
|
||||
@@ -368,6 +376,12 @@ impl std::fmt::Display for UnexpectedResponseError {
|
||||
if let Some(id) = &self.request_id {
|
||||
message.push_str(&format!(", request id: {id}"));
|
||||
}
|
||||
if let Some(auth_error) = &self.identity_authorization_error {
|
||||
message.push_str(&format!(", auth error: {auth_error}"));
|
||||
}
|
||||
if let Some(error_code) = &self.identity_error_code {
|
||||
message.push_str(&format!(", auth error code: {error_code}"));
|
||||
}
|
||||
write!(f, "{message}")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -328,6 +328,8 @@ fn unexpected_status_cloudflare_html_is_simplified() {
|
||||
url: Some("http://example.com/blocked".to_string()),
|
||||
cf_ray: Some("ray-id".to_string()),
|
||||
request_id: None,
|
||||
identity_authorization_error: None,
|
||||
identity_error_code: None,
|
||||
};
|
||||
let status = StatusCode::FORBIDDEN.to_string();
|
||||
let url = "http://example.com/blocked";
|
||||
@@ -345,6 +347,8 @@ fn unexpected_status_non_html_is_unchanged() {
|
||||
url: Some("http://example.com/plain".to_string()),
|
||||
cf_ray: None,
|
||||
request_id: None,
|
||||
identity_authorization_error: None,
|
||||
identity_error_code: None,
|
||||
};
|
||||
let status = StatusCode::FORBIDDEN.to_string();
|
||||
let url = "http://example.com/plain";
|
||||
@@ -363,6 +367,8 @@ fn unexpected_status_prefers_error_message_when_present() {
|
||||
url: Some("https://chatgpt.com/backend-api/codex/responses".to_string()),
|
||||
cf_ray: None,
|
||||
request_id: Some("req-123".to_string()),
|
||||
identity_authorization_error: None,
|
||||
identity_error_code: None,
|
||||
};
|
||||
let status = StatusCode::UNAUTHORIZED.to_string();
|
||||
assert_eq!(
|
||||
@@ -382,6 +388,8 @@ fn unexpected_status_truncates_long_body_with_ellipsis() {
|
||||
url: Some("http://example.com/long".to_string()),
|
||||
cf_ray: None,
|
||||
request_id: Some("req-long".to_string()),
|
||||
identity_authorization_error: None,
|
||||
identity_error_code: None,
|
||||
};
|
||||
let status = StatusCode::BAD_GATEWAY.to_string();
|
||||
let expected_body = format!("{}...", "x".repeat(UNEXPECTED_RESPONSE_BODY_MAX_BYTES));
|
||||
@@ -401,6 +409,8 @@ fn unexpected_status_includes_cf_ray_and_request_id() {
|
||||
url: Some("https://chatgpt.com/backend-api/codex/responses".to_string()),
|
||||
cf_ray: Some("9c81f9f18f2fa49d-LHR".to_string()),
|
||||
request_id: Some("req-xyz".to_string()),
|
||||
identity_authorization_error: None,
|
||||
identity_error_code: None,
|
||||
};
|
||||
let status = StatusCode::UNAUTHORIZED.to_string();
|
||||
assert_eq!(
|
||||
@@ -411,6 +421,26 @@ fn unexpected_status_includes_cf_ray_and_request_id() {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unexpected_status_includes_identity_auth_details() {
|
||||
let err = UnexpectedResponseError {
|
||||
status: StatusCode::UNAUTHORIZED,
|
||||
body: "plain text error".to_string(),
|
||||
url: Some("https://chatgpt.com/backend-api/codex/models".to_string()),
|
||||
cf_ray: Some("cf-ray-auth-401-test".to_string()),
|
||||
request_id: Some("req-auth".to_string()),
|
||||
identity_authorization_error: Some("missing_authorization_header".to_string()),
|
||||
identity_error_code: Some("token_expired".to_string()),
|
||||
};
|
||||
let status = StatusCode::UNAUTHORIZED.to_string();
|
||||
assert_eq!(
|
||||
err.to_string(),
|
||||
format!(
|
||||
"unexpected status {status}: plain text error, url: https://chatgpt.com/backend-api/codex/models, cf-ray: cf-ray-auth-401-test, request id: req-auth, auth error: missing_authorization_header, auth error code: token_expired"
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn usage_limit_reached_includes_hours_and_minutes() {
|
||||
let base = Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap();
|
||||
|
||||
@@ -154,8 +154,6 @@ pub enum Feature {
|
||||
Plugins,
|
||||
/// Allow the model to invoke the built-in image generation tool.
|
||||
ImageGeneration,
|
||||
/// Route apps MCP calls through the configured gateway.
|
||||
AppsMcpGateway,
|
||||
/// Allow prompting and installing missing MCP dependencies.
|
||||
SkillMcpDependencyInstall,
|
||||
/// Prompt for missing skill env var dependencies.
|
||||
@@ -753,12 +751,6 @@ pub const FEATURES: &[FeatureSpec] = &[
|
||||
stage: Stage::UnderDevelopment,
|
||||
default_enabled: false,
|
||||
},
|
||||
FeatureSpec {
|
||||
id: Feature::AppsMcpGateway,
|
||||
key: "apps_mcp_gateway",
|
||||
stage: Stage::UnderDevelopment,
|
||||
default_enabled: false,
|
||||
},
|
||||
FeatureSpec {
|
||||
id: Feature::SkillMcpDependencyInstall,
|
||||
key: "skill_mcp_dependency_install",
|
||||
|
||||
@@ -87,6 +87,7 @@ pub use model_provider_info::WireApi;
|
||||
pub use model_provider_info::built_in_model_providers;
|
||||
pub use model_provider_info::create_oss_provider_with_base_url;
|
||||
mod event_mapping;
|
||||
mod response_debug_context;
|
||||
pub mod review_format;
|
||||
pub mod review_prompts;
|
||||
mod seatbelt_permissions;
|
||||
|
||||
@@ -21,7 +21,6 @@ use crate::CodexAuth;
|
||||
use crate::config::Config;
|
||||
use crate::config::types::McpServerConfig;
|
||||
use crate::config::types::McpServerTransportConfig;
|
||||
use crate::features::Feature;
|
||||
use crate::mcp::auth::compute_auth_statuses;
|
||||
use crate::mcp_connection_manager::McpConnectionManager;
|
||||
use crate::mcp_connection_manager::SandboxState;
|
||||
@@ -33,8 +32,6 @@ const MCP_TOOL_NAME_PREFIX: &str = "mcp";
|
||||
const MCP_TOOL_NAME_DELIMITER: &str = "__";
|
||||
pub(crate) const CODEX_APPS_MCP_SERVER_NAME: &str = "codex_apps";
|
||||
const CODEX_CONNECTORS_TOKEN_ENV_VAR: &str = "CODEX_CONNECTORS_TOKEN";
|
||||
const OPENAI_CONNECTORS_MCP_BASE_URL: &str = "https://api.openai.com";
|
||||
const OPENAI_CONNECTORS_MCP_PATH: &str = "/v1/connectors/gateways/flat/mcp";
|
||||
|
||||
#[derive(Debug, Clone, Default, PartialEq, Eq)]
|
||||
pub struct ToolPluginProvenance {
|
||||
@@ -94,13 +91,6 @@ impl ToolPluginProvenance {
|
||||
}
|
||||
}
|
||||
|
||||
// Legacy vs new MCP gateway
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
enum CodexAppsMcpGateway {
|
||||
LegacyMCPGateway,
|
||||
MCPGateway,
|
||||
}
|
||||
|
||||
fn codex_apps_mcp_bearer_token_env_var() -> Option<String> {
|
||||
match env::var(CODEX_CONNECTORS_TOKEN_ENV_VAR) {
|
||||
Ok(value) if !value.trim().is_empty() => Some(CODEX_CONNECTORS_TOKEN_ENV_VAR.to_string()),
|
||||
@@ -135,14 +125,6 @@ fn codex_apps_mcp_http_headers(auth: Option<&CodexAuth>) -> Option<HashMap<Strin
|
||||
}
|
||||
}
|
||||
|
||||
fn selected_config_codex_apps_mcp_gateway(config: &Config) -> CodexAppsMcpGateway {
|
||||
if config.features.enabled(Feature::AppsMcpGateway) {
|
||||
CodexAppsMcpGateway::MCPGateway
|
||||
} else {
|
||||
CodexAppsMcpGateway::LegacyMCPGateway
|
||||
}
|
||||
}
|
||||
|
||||
fn normalize_codex_apps_base_url(base_url: &str) -> String {
|
||||
let mut base_url = base_url.trim_end_matches('/').to_string();
|
||||
if (base_url.starts_with("https://chatgpt.com")
|
||||
@@ -154,11 +136,7 @@ fn normalize_codex_apps_base_url(base_url: &str) -> String {
|
||||
base_url
|
||||
}
|
||||
|
||||
fn codex_apps_mcp_url_for_gateway(base_url: &str, gateway: CodexAppsMcpGateway) -> String {
|
||||
if gateway == CodexAppsMcpGateway::MCPGateway {
|
||||
return format!("{OPENAI_CONNECTORS_MCP_BASE_URL}{OPENAI_CONNECTORS_MCP_PATH}");
|
||||
}
|
||||
|
||||
fn codex_apps_mcp_url_for_base_url(base_url: &str) -> String {
|
||||
let base_url = normalize_codex_apps_base_url(base_url);
|
||||
if base_url.contains("/backend-api") {
|
||||
format!("{base_url}/wham/apps")
|
||||
@@ -170,10 +148,7 @@ fn codex_apps_mcp_url_for_gateway(base_url: &str, gateway: CodexAppsMcpGateway)
|
||||
}
|
||||
|
||||
pub(crate) fn codex_apps_mcp_url(config: &Config) -> String {
|
||||
codex_apps_mcp_url_for_gateway(
|
||||
&config.chatgpt_base_url,
|
||||
selected_config_codex_apps_mcp_gateway(config),
|
||||
)
|
||||
codex_apps_mcp_url_for_base_url(&config.chatgpt_base_url)
|
||||
}
|
||||
|
||||
fn codex_apps_mcp_server_config(config: &Config, auth: Option<&CodexAuth>) -> McpServerConfig {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use super::*;
|
||||
use crate::config::CONFIG_TOML_FILE;
|
||||
use crate::config::ConfigBuilder;
|
||||
use crate::features::Feature;
|
||||
use crate::plugins::AppConnectorId;
|
||||
use crate::plugins::PluginCapabilitySummary;
|
||||
use pretty_assertions::assert_eq;
|
||||
@@ -123,67 +124,27 @@ fn tool_plugin_provenance_collects_app_and_mcp_sources() {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn codex_apps_mcp_url_for_default_gateway_keeps_existing_paths() {
|
||||
fn codex_apps_mcp_url_for_base_url_keeps_existing_paths() {
|
||||
assert_eq!(
|
||||
codex_apps_mcp_url_for_gateway(
|
||||
"https://chatgpt.com/backend-api",
|
||||
CodexAppsMcpGateway::LegacyMCPGateway
|
||||
),
|
||||
codex_apps_mcp_url_for_base_url("https://chatgpt.com/backend-api"),
|
||||
"https://chatgpt.com/backend-api/wham/apps"
|
||||
);
|
||||
assert_eq!(
|
||||
codex_apps_mcp_url_for_gateway(
|
||||
"https://chat.openai.com",
|
||||
CodexAppsMcpGateway::LegacyMCPGateway
|
||||
),
|
||||
codex_apps_mcp_url_for_base_url("https://chat.openai.com"),
|
||||
"https://chat.openai.com/backend-api/wham/apps"
|
||||
);
|
||||
assert_eq!(
|
||||
codex_apps_mcp_url_for_gateway(
|
||||
"http://localhost:8080/api/codex",
|
||||
CodexAppsMcpGateway::LegacyMCPGateway
|
||||
),
|
||||
codex_apps_mcp_url_for_base_url("http://localhost:8080/api/codex"),
|
||||
"http://localhost:8080/api/codex/apps"
|
||||
);
|
||||
assert_eq!(
|
||||
codex_apps_mcp_url_for_gateway(
|
||||
"http://localhost:8080",
|
||||
CodexAppsMcpGateway::LegacyMCPGateway
|
||||
),
|
||||
codex_apps_mcp_url_for_base_url("http://localhost:8080"),
|
||||
"http://localhost:8080/api/codex/apps"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn codex_apps_mcp_url_for_gateway_uses_openai_connectors_gateway() {
|
||||
let expected_url = format!("{OPENAI_CONNECTORS_MCP_BASE_URL}{OPENAI_CONNECTORS_MCP_PATH}");
|
||||
|
||||
assert_eq!(
|
||||
codex_apps_mcp_url_for_gateway(
|
||||
"https://chatgpt.com/backend-api",
|
||||
CodexAppsMcpGateway::MCPGateway
|
||||
),
|
||||
expected_url.as_str()
|
||||
);
|
||||
assert_eq!(
|
||||
codex_apps_mcp_url_for_gateway("https://chat.openai.com", CodexAppsMcpGateway::MCPGateway),
|
||||
expected_url.as_str()
|
||||
);
|
||||
assert_eq!(
|
||||
codex_apps_mcp_url_for_gateway(
|
||||
"http://localhost:8080/api/codex",
|
||||
CodexAppsMcpGateway::MCPGateway
|
||||
),
|
||||
expected_url.as_str()
|
||||
);
|
||||
assert_eq!(
|
||||
codex_apps_mcp_url_for_gateway("http://localhost:8080", CodexAppsMcpGateway::MCPGateway),
|
||||
expected_url.as_str()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn codex_apps_mcp_url_uses_default_gateway_when_feature_is_disabled() {
|
||||
fn codex_apps_mcp_url_uses_legacy_codex_apps_path() {
|
||||
let mut config = crate::config::test_config();
|
||||
config.chatgpt_base_url = "https://chatgpt.com".to_string();
|
||||
|
||||
@@ -194,22 +155,7 @@ fn codex_apps_mcp_url_uses_default_gateway_when_feature_is_disabled() {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn codex_apps_mcp_url_uses_openai_connectors_gateway_when_feature_is_enabled() {
|
||||
let mut config = crate::config::test_config();
|
||||
config.chatgpt_base_url = "https://chatgpt.com".to_string();
|
||||
config
|
||||
.features
|
||||
.enable(Feature::AppsMcpGateway)
|
||||
.expect("test config should allow apps gateway");
|
||||
|
||||
assert_eq!(
|
||||
codex_apps_mcp_url(&config),
|
||||
format!("{OPENAI_CONNECTORS_MCP_BASE_URL}{OPENAI_CONNECTORS_MCP_PATH}")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn codex_apps_server_config_switches_gateway_with_flags() {
|
||||
fn codex_apps_server_config_uses_legacy_codex_apps_path() {
|
||||
let mut config = crate::config::test_config();
|
||||
config.chatgpt_base_url = "https://chatgpt.com".to_string();
|
||||
|
||||
@@ -231,22 +177,6 @@ fn codex_apps_server_config_switches_gateway_with_flags() {
|
||||
};
|
||||
|
||||
assert_eq!(url, "https://chatgpt.com/backend-api/wham/apps");
|
||||
|
||||
config
|
||||
.features
|
||||
.enable(Feature::AppsMcpGateway)
|
||||
.expect("test config should allow apps gateway");
|
||||
servers = with_codex_apps_mcp(servers, true, None, &config);
|
||||
let server = servers
|
||||
.get(CODEX_APPS_MCP_SERVER_NAME)
|
||||
.expect("codex apps should remain present when apps stays enabled");
|
||||
let url = match &server.transport {
|
||||
McpServerTransportConfig::StreamableHttp { url, .. } => url,
|
||||
_ => panic!("expected streamable http transport for codex apps"),
|
||||
};
|
||||
|
||||
let expected_url = format!("{OPENAI_CONNECTORS_MCP_BASE_URL}{OPENAI_CONNECTORS_MCP_PATH}");
|
||||
assert_eq!(url, &expected_url);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -1014,6 +1014,7 @@ impl McpConnectionManager {
|
||||
server: &str,
|
||||
tool: &str,
|
||||
arguments: Option<serde_json::Value>,
|
||||
meta: Option<serde_json::Value>,
|
||||
) -> Result<CallToolResult> {
|
||||
let client = self.client_by_name(server).await?;
|
||||
if !client.tool_filter.allows(tool) {
|
||||
@@ -1024,7 +1025,7 @@ impl McpConnectionManager {
|
||||
|
||||
let result: rmcp::model::CallToolResult = client
|
||||
.client
|
||||
.call_tool(tool.to_string(), arguments, client.tool_timeout)
|
||||
.call_tool(tool.to_string(), arguments, meta, client.tool_timeout)
|
||||
.await
|
||||
.with_context(|| format!("tool call failed for `{server}/{tool}`"))?;
|
||||
|
||||
|
||||
@@ -117,6 +117,7 @@ pub(crate) async fn handle_mcp_tool_call(
|
||||
.counter("codex.mcp.call", 1, &[("status", status)]);
|
||||
return CallToolResult::from_result(result);
|
||||
}
|
||||
let request_meta = build_mcp_tool_call_request_meta(&server, metadata.as_ref());
|
||||
|
||||
let tool_call_begin_event = EventMsg::McpToolCallBegin(McpToolCallBeginEvent {
|
||||
call_id: call_id.clone(),
|
||||
@@ -142,7 +143,12 @@ pub(crate) async fn handle_mcp_tool_call(
|
||||
|
||||
let start = Instant::now();
|
||||
let result = sess
|
||||
.call_tool(&server, &tool_name, arguments_value.clone())
|
||||
.call_tool(
|
||||
&server,
|
||||
&tool_name,
|
||||
arguments_value.clone(),
|
||||
request_meta.clone(),
|
||||
)
|
||||
.await
|
||||
.map_err(|e| format!("tool call error: {e:?}"));
|
||||
let result = sanitize_mcp_tool_result_for_model(
|
||||
@@ -226,7 +232,7 @@ pub(crate) async fn handle_mcp_tool_call(
|
||||
let start = Instant::now();
|
||||
// Perform the tool call.
|
||||
let result = sess
|
||||
.call_tool(&server, &tool_name, arguments_value.clone())
|
||||
.call_tool(&server, &tool_name, arguments_value.clone(), request_meta)
|
||||
.await
|
||||
.map_err(|e| format!("tool call error: {e:?}"));
|
||||
let result = sanitize_mcp_tool_result_for_model(
|
||||
@@ -374,6 +380,24 @@ pub(crate) struct McpToolApprovalMetadata {
|
||||
connector_description: Option<String>,
|
||||
tool_title: Option<String>,
|
||||
tool_description: Option<String>,
|
||||
codex_apps_meta: Option<serde_json::Map<String, serde_json::Value>>,
|
||||
}
|
||||
|
||||
const MCP_TOOL_CODEX_APPS_META_KEY: &str = "_codex_apps";
|
||||
|
||||
fn build_mcp_tool_call_request_meta(
|
||||
server: &str,
|
||||
metadata: Option<&McpToolApprovalMetadata>,
|
||||
) -> Option<serde_json::Value> {
|
||||
if server != CODEX_APPS_MCP_SERVER_NAME {
|
||||
return None;
|
||||
}
|
||||
|
||||
let codex_apps_meta = metadata.and_then(|metadata| metadata.codex_apps_meta.as_ref())?;
|
||||
|
||||
Some(serde_json::json!({
|
||||
MCP_TOOL_CODEX_APPS_META_KEY: codex_apps_meta,
|
||||
}))
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
@@ -750,6 +774,13 @@ pub(crate) async fn lookup_mcp_tool_metadata(
|
||||
connector_description,
|
||||
tool_title: tool_info.tool.title,
|
||||
tool_description: tool_info.tool.description.map(std::borrow::Cow::into_owned),
|
||||
codex_apps_meta: tool_info
|
||||
.tool
|
||||
.meta
|
||||
.as_ref()
|
||||
.and_then(|meta| meta.get(MCP_TOOL_CODEX_APPS_META_KEY))
|
||||
.and_then(serde_json::Value::as_object)
|
||||
.cloned(),
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -47,6 +47,7 @@ fn approval_metadata(
|
||||
connector_description: connector_description.map(str::to_string),
|
||||
tool_title: tool_title.map(str::to_string),
|
||||
tool_description: tool_description.map(str::to_string),
|
||||
codex_apps_meta: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -415,6 +416,39 @@ fn sanitize_mcp_tool_result_for_model_preserves_image_when_supported() {
|
||||
assert_eq!(got, original);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn codex_apps_tool_call_request_meta_includes_codex_apps_meta() {
|
||||
let metadata = McpToolApprovalMetadata {
|
||||
annotations: None,
|
||||
connector_id: Some("calendar".to_string()),
|
||||
connector_name: Some("Calendar".to_string()),
|
||||
connector_description: Some("Manage events".to_string()),
|
||||
tool_title: Some("Create Event".to_string()),
|
||||
tool_description: Some("Create a calendar event.".to_string()),
|
||||
codex_apps_meta: Some(
|
||||
serde_json::json!({
|
||||
"resource_uri": "connector://calendar/tools/calendar_create_event",
|
||||
"contains_mcp_source": true,
|
||||
"connector_id": "calendar",
|
||||
})
|
||||
.as_object()
|
||||
.cloned()
|
||||
.expect("_codex_apps metadata should be an object"),
|
||||
),
|
||||
};
|
||||
|
||||
assert_eq!(
|
||||
build_mcp_tool_call_request_meta(CODEX_APPS_MCP_SERVER_NAME, Some(&metadata)),
|
||||
Some(serde_json::json!({
|
||||
MCP_TOOL_CODEX_APPS_META_KEY: {
|
||||
"resource_uri": "connector://calendar/tools/calendar_create_event",
|
||||
"contains_mcp_source": true,
|
||||
"connector_id": "calendar",
|
||||
},
|
||||
}))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn accepted_elicitation_content_converts_to_request_user_input_response() {
|
||||
let response = request_user_input_response_from_elicitation_content(Some(serde_json::json!(
|
||||
@@ -535,6 +569,7 @@ fn guardian_mcp_review_request_includes_annotations_when_present() {
|
||||
connector_description: None,
|
||||
tool_title: None,
|
||||
tool_description: None,
|
||||
codex_apps_meta: None,
|
||||
};
|
||||
|
||||
let request = build_guardian_mcp_tool_review_request("call-1", &invocation, Some(&metadata));
|
||||
@@ -856,6 +891,7 @@ async fn approve_mode_skips_when_annotations_do_not_require_approval() {
|
||||
connector_description: None,
|
||||
tool_title: Some("Read Only Tool".to_string()),
|
||||
tool_description: None,
|
||||
codex_apps_meta: None,
|
||||
};
|
||||
|
||||
let decision = maybe_request_mcp_tool_approval(
|
||||
@@ -919,6 +955,7 @@ async fn approve_mode_blocks_when_arc_returns_interrupt_for_model() {
|
||||
connector_description: Some("Manage events".to_string()),
|
||||
tool_title: Some("Dangerous Tool".to_string()),
|
||||
tool_description: Some("Performs a risky action.".to_string()),
|
||||
codex_apps_meta: None,
|
||||
};
|
||||
|
||||
let decision = maybe_request_mcp_tool_approval(
|
||||
@@ -1021,6 +1058,7 @@ async fn approve_mode_routes_arc_ask_user_to_guardian_when_guardian_reviewer_is_
|
||||
connector_description: Some("Manage events".to_string()),
|
||||
tool_title: Some("Dangerous Tool".to_string()),
|
||||
tool_description: Some("Performs a risky action.".to_string()),
|
||||
codex_apps_meta: None,
|
||||
};
|
||||
|
||||
let decision = maybe_request_mcp_tool_approval(
|
||||
|
||||
@@ -3,6 +3,7 @@ use crate::api_bridge::auth_provider_from_auth;
|
||||
use crate::api_bridge::map_api_error;
|
||||
use crate::auth::AuthManager;
|
||||
use crate::auth::AuthMode;
|
||||
use crate::auth::CodexAuth;
|
||||
use crate::config::Config;
|
||||
use crate::default_client::build_reqwest_client;
|
||||
use crate::error::CodexErr;
|
||||
@@ -11,8 +12,15 @@ use crate::model_provider_info::ModelProviderInfo;
|
||||
use crate::models_manager::collaboration_mode_presets::CollaborationModesConfig;
|
||||
use crate::models_manager::collaboration_mode_presets::builtin_collaboration_mode_presets;
|
||||
use crate::models_manager::model_info;
|
||||
use crate::response_debug_context::extract_response_debug_context;
|
||||
use crate::response_debug_context::telemetry_transport_error_message;
|
||||
use crate::util::FeedbackRequestTags;
|
||||
use crate::util::emit_feedback_request_tags;
|
||||
use codex_api::ModelsClient;
|
||||
use codex_api::RequestTelemetry;
|
||||
use codex_api::ReqwestTransport;
|
||||
use codex_api::TransportError;
|
||||
use codex_otel::TelemetryAuthMode;
|
||||
use codex_protocol::config_types::CollaborationModeMask;
|
||||
use codex_protocol::openai_models::ModelInfo;
|
||||
use codex_protocol::openai_models::ModelPreset;
|
||||
@@ -32,6 +40,82 @@ use tracing::instrument;
|
||||
const MODEL_CACHE_FILE: &str = "models_cache.json";
|
||||
const DEFAULT_MODEL_CACHE_TTL: Duration = Duration::from_secs(300);
|
||||
const MODELS_REFRESH_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
const MODELS_ENDPOINT: &str = "/models";
|
||||
#[derive(Clone)]
|
||||
struct ModelsRequestTelemetry {
|
||||
auth_mode: Option<String>,
|
||||
auth_header_attached: bool,
|
||||
auth_header_name: Option<&'static str>,
|
||||
}
|
||||
|
||||
impl RequestTelemetry for ModelsRequestTelemetry {
|
||||
fn on_request(
|
||||
&self,
|
||||
attempt: u64,
|
||||
status: Option<http::StatusCode>,
|
||||
error: Option<&TransportError>,
|
||||
duration: Duration,
|
||||
) {
|
||||
let success = status.is_some_and(|code| code.is_success()) && error.is_none();
|
||||
let error_message = error.map(telemetry_transport_error_message);
|
||||
let response_debug = error
|
||||
.map(extract_response_debug_context)
|
||||
.unwrap_or_default();
|
||||
let status = status.map(|status| status.as_u16());
|
||||
tracing::event!(
|
||||
target: "codex_otel.log_only",
|
||||
tracing::Level::INFO,
|
||||
event.name = "codex.api_request",
|
||||
duration_ms = %duration.as_millis(),
|
||||
http.response.status_code = status,
|
||||
success = success,
|
||||
error.message = error_message.as_deref(),
|
||||
attempt = attempt,
|
||||
endpoint = MODELS_ENDPOINT,
|
||||
auth.header_attached = self.auth_header_attached,
|
||||
auth.header_name = self.auth_header_name,
|
||||
auth.request_id = response_debug.request_id.as_deref(),
|
||||
auth.cf_ray = response_debug.cf_ray.as_deref(),
|
||||
auth.error = response_debug.auth_error.as_deref(),
|
||||
auth.error_code = response_debug.auth_error_code.as_deref(),
|
||||
auth.mode = self.auth_mode.as_deref(),
|
||||
);
|
||||
tracing::event!(
|
||||
target: "codex_otel.trace_safe",
|
||||
tracing::Level::INFO,
|
||||
event.name = "codex.api_request",
|
||||
duration_ms = %duration.as_millis(),
|
||||
http.response.status_code = status,
|
||||
success = success,
|
||||
error.message = error_message.as_deref(),
|
||||
attempt = attempt,
|
||||
endpoint = MODELS_ENDPOINT,
|
||||
auth.header_attached = self.auth_header_attached,
|
||||
auth.header_name = self.auth_header_name,
|
||||
auth.request_id = response_debug.request_id.as_deref(),
|
||||
auth.cf_ray = response_debug.cf_ray.as_deref(),
|
||||
auth.error = response_debug.auth_error.as_deref(),
|
||||
auth.error_code = response_debug.auth_error_code.as_deref(),
|
||||
auth.mode = self.auth_mode.as_deref(),
|
||||
);
|
||||
emit_feedback_request_tags(&FeedbackRequestTags {
|
||||
endpoint: MODELS_ENDPOINT,
|
||||
auth_header_attached: self.auth_header_attached,
|
||||
auth_header_name: self.auth_header_name,
|
||||
auth_mode: self.auth_mode.as_deref(),
|
||||
auth_retry_after_unauthorized: None,
|
||||
auth_recovery_mode: None,
|
||||
auth_recovery_phase: None,
|
||||
auth_connection_reused: None,
|
||||
auth_request_id: response_debug.request_id.as_deref(),
|
||||
auth_cf_ray: response_debug.cf_ray.as_deref(),
|
||||
auth_error: response_debug.auth_error.as_deref(),
|
||||
auth_error_code: response_debug.auth_error_code.as_deref(),
|
||||
auth_recovery_followup_success: None,
|
||||
auth_recovery_followup_status: None,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Strategy for refreshing available models.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
@@ -330,11 +414,17 @@ impl ModelsManager {
|
||||
let _timer =
|
||||
codex_otel::start_global_timer("codex.remote_models.fetch_update.duration_ms", &[]);
|
||||
let auth = self.auth_manager.auth().await;
|
||||
let auth_mode = self.auth_manager.auth_mode();
|
||||
let auth_mode = auth.as_ref().map(CodexAuth::auth_mode);
|
||||
let api_provider = self.provider.to_api_provider(auth_mode)?;
|
||||
let api_auth = auth_provider_from_auth(auth.clone(), &self.provider)?;
|
||||
let transport = ReqwestTransport::new(build_reqwest_client());
|
||||
let client = ModelsClient::new(transport, api_provider, api_auth);
|
||||
let request_telemetry: Arc<dyn RequestTelemetry> = Arc::new(ModelsRequestTelemetry {
|
||||
auth_mode: auth_mode.map(|mode| TelemetryAuthMode::from(mode).to_string()),
|
||||
auth_header_attached: api_auth.auth_header_attached(),
|
||||
auth_header_name: api_auth.auth_header_name(),
|
||||
});
|
||||
let client = ModelsClient::new(transport, api_provider, api_auth)
|
||||
.with_telemetry(Some(request_telemetry));
|
||||
|
||||
let client_version = crate::models_manager::client_version_to_whole();
|
||||
let (models, etag) = timeout(
|
||||
|
||||
167
codex-rs/core/src/response_debug_context.rs
Normal file
167
codex-rs/core/src/response_debug_context.rs
Normal file
@@ -0,0 +1,167 @@
|
||||
use base64::Engine;
|
||||
use codex_api::TransportError;
|
||||
use codex_api::error::ApiError;
|
||||
|
||||
const REQUEST_ID_HEADER: &str = "x-request-id";
|
||||
const OAI_REQUEST_ID_HEADER: &str = "x-oai-request-id";
|
||||
const CF_RAY_HEADER: &str = "cf-ray";
|
||||
const AUTH_ERROR_HEADER: &str = "x-openai-authorization-error";
|
||||
const X_ERROR_JSON_HEADER: &str = "x-error-json";
|
||||
|
||||
#[derive(Debug, Default, Clone, PartialEq, Eq)]
|
||||
pub(crate) struct ResponseDebugContext {
|
||||
pub(crate) request_id: Option<String>,
|
||||
pub(crate) cf_ray: Option<String>,
|
||||
pub(crate) auth_error: Option<String>,
|
||||
pub(crate) auth_error_code: Option<String>,
|
||||
}
|
||||
|
||||
pub(crate) fn extract_response_debug_context(transport: &TransportError) -> ResponseDebugContext {
|
||||
let mut context = ResponseDebugContext::default();
|
||||
|
||||
let TransportError::Http {
|
||||
headers, body: _, ..
|
||||
} = transport
|
||||
else {
|
||||
return context;
|
||||
};
|
||||
|
||||
let extract_header = |name: &str| {
|
||||
headers
|
||||
.as_ref()
|
||||
.and_then(|headers| headers.get(name))
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.map(str::to_string)
|
||||
};
|
||||
|
||||
context.request_id =
|
||||
extract_header(REQUEST_ID_HEADER).or_else(|| extract_header(OAI_REQUEST_ID_HEADER));
|
||||
context.cf_ray = extract_header(CF_RAY_HEADER);
|
||||
context.auth_error = extract_header(AUTH_ERROR_HEADER);
|
||||
context.auth_error_code = extract_header(X_ERROR_JSON_HEADER).and_then(|encoded| {
|
||||
let decoded = base64::engine::general_purpose::STANDARD
|
||||
.decode(encoded)
|
||||
.ok()?;
|
||||
let parsed = serde_json::from_slice::<serde_json::Value>(&decoded).ok()?;
|
||||
parsed
|
||||
.get("error")
|
||||
.and_then(|error| error.get("code"))
|
||||
.and_then(serde_json::Value::as_str)
|
||||
.map(str::to_string)
|
||||
});
|
||||
|
||||
context
|
||||
}
|
||||
|
||||
pub(crate) fn extract_response_debug_context_from_api_error(
|
||||
error: &ApiError,
|
||||
) -> ResponseDebugContext {
|
||||
match error {
|
||||
ApiError::Transport(transport) => extract_response_debug_context(transport),
|
||||
_ => ResponseDebugContext::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn telemetry_transport_error_message(error: &TransportError) -> String {
|
||||
match error {
|
||||
TransportError::Http { status, .. } => format!("http {}", status.as_u16()),
|
||||
TransportError::RetryLimit => "retry limit reached".to_string(),
|
||||
TransportError::Timeout => "timeout".to_string(),
|
||||
TransportError::Network(err) => err.to_string(),
|
||||
TransportError::Build(err) => err.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn telemetry_api_error_message(error: &ApiError) -> String {
|
||||
match error {
|
||||
ApiError::Transport(transport) => telemetry_transport_error_message(transport),
|
||||
ApiError::Api { status, .. } => format!("api error {}", status.as_u16()),
|
||||
ApiError::Stream(err) => err.to_string(),
|
||||
ApiError::ContextWindowExceeded => "context window exceeded".to_string(),
|
||||
ApiError::QuotaExceeded => "quota exceeded".to_string(),
|
||||
ApiError::UsageNotIncluded => "usage not included".to_string(),
|
||||
ApiError::Retryable { .. } => "retryable error".to_string(),
|
||||
ApiError::RateLimit(_) => "rate limit".to_string(),
|
||||
ApiError::InvalidRequest { .. } => "invalid request".to_string(),
|
||||
ApiError::ServerOverloaded => "server overloaded".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::ResponseDebugContext;
|
||||
use super::extract_response_debug_context;
|
||||
use super::telemetry_api_error_message;
|
||||
use super::telemetry_transport_error_message;
|
||||
use codex_api::TransportError;
|
||||
use codex_api::error::ApiError;
|
||||
use http::HeaderMap;
|
||||
use http::HeaderValue;
|
||||
use http::StatusCode;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[test]
|
||||
fn extract_response_debug_context_decodes_identity_headers() {
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert("x-oai-request-id", HeaderValue::from_static("req-auth"));
|
||||
headers.insert("cf-ray", HeaderValue::from_static("ray-auth"));
|
||||
headers.insert(
|
||||
"x-openai-authorization-error",
|
||||
HeaderValue::from_static("missing_authorization_header"),
|
||||
);
|
||||
headers.insert(
|
||||
"x-error-json",
|
||||
HeaderValue::from_static("eyJlcnJvciI6eyJjb2RlIjoidG9rZW5fZXhwaXJlZCJ9fQ=="),
|
||||
);
|
||||
|
||||
let context = extract_response_debug_context(&TransportError::Http {
|
||||
status: StatusCode::UNAUTHORIZED,
|
||||
url: Some("https://chatgpt.com/backend-api/codex/models".to_string()),
|
||||
headers: Some(headers),
|
||||
body: Some(r#"{"error":{"message":"plain text error"},"status":401}"#.to_string()),
|
||||
});
|
||||
|
||||
assert_eq!(
|
||||
context,
|
||||
ResponseDebugContext {
|
||||
request_id: Some("req-auth".to_string()),
|
||||
cf_ray: Some("ray-auth".to_string()),
|
||||
auth_error: Some("missing_authorization_header".to_string()),
|
||||
auth_error_code: Some("token_expired".to_string()),
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn telemetry_error_messages_omit_http_bodies() {
|
||||
let transport = TransportError::Http {
|
||||
status: StatusCode::UNAUTHORIZED,
|
||||
url: Some("https://chatgpt.com/backend-api/codex/responses".to_string()),
|
||||
headers: None,
|
||||
body: Some(r#"{"error":{"message":"secret token leaked"}}"#.to_string()),
|
||||
};
|
||||
|
||||
assert_eq!(telemetry_transport_error_message(&transport), "http 401");
|
||||
assert_eq!(
|
||||
telemetry_api_error_message(&ApiError::Transport(transport)),
|
||||
"http 401"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn telemetry_error_messages_preserve_non_http_details() {
|
||||
let network = TransportError::Network("dns lookup failed".to_string());
|
||||
let build = TransportError::Build("invalid header value".to_string());
|
||||
let stream = ApiError::Stream("socket closed".to_string());
|
||||
|
||||
assert_eq!(
|
||||
telemetry_transport_error_message(&network),
|
||||
"dns lookup failed"
|
||||
);
|
||||
assert_eq!(
|
||||
telemetry_transport_error_message(&build),
|
||||
"invalid header value"
|
||||
);
|
||||
assert_eq!(telemetry_api_error_message(&stream), "socket closed");
|
||||
}
|
||||
}
|
||||
@@ -5,6 +5,7 @@ use crate::client_common::tools::ToolSpec;
|
||||
use crate::config::AgentRoleConfig;
|
||||
use crate::features::Feature;
|
||||
use crate::features::Features;
|
||||
use crate::mcp::CODEX_APPS_MCP_SERVER_NAME;
|
||||
use crate::mcp_connection_manager::ToolInfo;
|
||||
use crate::models_manager::collaboration_mode_presets::CollaborationModesConfig;
|
||||
use crate::original_image_detail::can_request_original_image_detail;
|
||||
@@ -1673,22 +1674,58 @@ fn create_tool_search_tool(app_tools: &HashMap<String, ToolInfo>) -> ToolSpec {
|
||||
},
|
||||
),
|
||||
]);
|
||||
let mut app_names = app_tools
|
||||
.values()
|
||||
.filter_map(|tool| tool.connector_name.clone())
|
||||
.collect::<Vec<_>>();
|
||||
app_names.sort();
|
||||
app_names.dedup();
|
||||
let app_names = app_names.join(", ");
|
||||
let mut app_descriptions = BTreeMap::new();
|
||||
for tool in app_tools.values() {
|
||||
if tool.server_name != CODEX_APPS_MCP_SERVER_NAME {
|
||||
continue;
|
||||
}
|
||||
|
||||
let description = if app_names.is_empty() {
|
||||
TOOL_SEARCH_DESCRIPTION_TEMPLATE
|
||||
.replace("({{app_names}})", "(None currently enabled)")
|
||||
.replace("{{app_names}}", "available apps")
|
||||
let Some(connector_name) = tool
|
||||
.connector_name
|
||||
.as_deref()
|
||||
.map(str::trim)
|
||||
.filter(|connector_name| !connector_name.is_empty())
|
||||
else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let connector_description = tool
|
||||
.connector_description
|
||||
.as_deref()
|
||||
.map(str::trim)
|
||||
.filter(|connector_description| !connector_description.is_empty())
|
||||
.map(str::to_string);
|
||||
|
||||
app_descriptions
|
||||
.entry(connector_name.to_string())
|
||||
.and_modify(|existing: &mut Option<String>| {
|
||||
if existing.is_none() {
|
||||
*existing = connector_description.clone();
|
||||
}
|
||||
})
|
||||
.or_insert(connector_description);
|
||||
}
|
||||
|
||||
let app_descriptions = if app_descriptions.is_empty() {
|
||||
"None currently enabled.".to_string()
|
||||
} else {
|
||||
TOOL_SEARCH_DESCRIPTION_TEMPLATE.replace("{{app_names}}", app_names.as_str())
|
||||
app_descriptions
|
||||
.into_iter()
|
||||
.map(
|
||||
|(connector_name, connector_description)| match connector_description {
|
||||
Some(connector_description) => {
|
||||
format!("- {connector_name}: {connector_description}")
|
||||
}
|
||||
None => format!("- {connector_name}"),
|
||||
},
|
||||
)
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n")
|
||||
};
|
||||
|
||||
let description =
|
||||
TOOL_SEARCH_DESCRIPTION_TEMPLATE.replace("{{app_descriptions}}", app_descriptions.as_str());
|
||||
|
||||
ToolSpec::ToolSearch {
|
||||
execution: "client".to_string(),
|
||||
description,
|
||||
|
||||
@@ -1690,7 +1690,7 @@ fn test_build_specs_mcp_tools_sorted_by_name() {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn search_tool_description_includes_only_codex_apps_connector_names() {
|
||||
fn search_tool_description_lists_each_codex_apps_connector_once() {
|
||||
let model_info = search_capable_model_info();
|
||||
let mut features = Features::with_defaults();
|
||||
features.enable(Feature::Apps);
|
||||
@@ -1736,7 +1736,45 @@ fn search_tool_description_includes_only_codex_apps_connector_names() {
|
||||
connector_id: Some("calendar".to_string()),
|
||||
connector_name: Some("Calendar".to_string()),
|
||||
plugin_display_names: Vec::new(),
|
||||
connector_description: None,
|
||||
connector_description: Some(
|
||||
"Plan events and manage your calendar.".to_string(),
|
||||
),
|
||||
},
|
||||
),
|
||||
(
|
||||
"mcp__codex_apps__calendar_list_events".to_string(),
|
||||
ToolInfo {
|
||||
server_name: crate::mcp::CODEX_APPS_MCP_SERVER_NAME.to_string(),
|
||||
tool_name: "_list_events".to_string(),
|
||||
tool_namespace: "mcp__codex_apps__calendar".to_string(),
|
||||
tool: mcp_tool(
|
||||
"calendar-list-events",
|
||||
"List calendar events",
|
||||
serde_json::json!({"type": "object"}),
|
||||
),
|
||||
connector_id: Some("calendar".to_string()),
|
||||
connector_name: Some("Calendar".to_string()),
|
||||
plugin_display_names: Vec::new(),
|
||||
connector_description: Some(
|
||||
"Plan events and manage your calendar.".to_string(),
|
||||
),
|
||||
},
|
||||
),
|
||||
(
|
||||
"mcp__codex_apps__gmail_search_threads".to_string(),
|
||||
ToolInfo {
|
||||
server_name: crate::mcp::CODEX_APPS_MCP_SERVER_NAME.to_string(),
|
||||
tool_name: "_search_threads".to_string(),
|
||||
tool_namespace: "mcp__codex_apps__gmail".to_string(),
|
||||
tool: mcp_tool(
|
||||
"gmail-search-threads",
|
||||
"Search email threads",
|
||||
serde_json::json!({"type": "object"}),
|
||||
),
|
||||
connector_id: Some("gmail".to_string()),
|
||||
connector_name: Some("Gmail".to_string()),
|
||||
plugin_display_names: Vec::new(),
|
||||
connector_description: Some("Find and summarize email threads.".to_string()),
|
||||
},
|
||||
),
|
||||
(
|
||||
@@ -1762,7 +1800,14 @@ fn search_tool_description_includes_only_codex_apps_connector_names() {
|
||||
panic!("expected tool_search tool");
|
||||
};
|
||||
let description = description.as_str();
|
||||
assert!(description.contains("Calendar"));
|
||||
assert!(description.contains("- Calendar: Plan events and manage your calendar."));
|
||||
assert!(description.contains("- Gmail: Find and summarize email threads."));
|
||||
assert_eq!(
|
||||
description
|
||||
.matches("- Calendar: Plan events and manage your calendar.")
|
||||
.count(),
|
||||
1
|
||||
);
|
||||
assert!(!description.contains("mcp__rmcp__echo"));
|
||||
}
|
||||
|
||||
@@ -1874,8 +1919,56 @@ fn search_tool_description_handles_no_enabled_apps() {
|
||||
panic!("expected tool_search tool");
|
||||
};
|
||||
|
||||
assert!(description.contains("(None currently enabled)"));
|
||||
assert!(!description.contains("{{app_names}}"));
|
||||
assert!(description.contains("None currently enabled."));
|
||||
assert!(!description.contains("{{app_descriptions}}"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn search_tool_description_falls_back_to_connector_name_without_description() {
|
||||
let model_info = search_capable_model_info();
|
||||
let mut features = Features::with_defaults();
|
||||
features.enable(Feature::Apps);
|
||||
let available_models = Vec::new();
|
||||
let tools_config = ToolsConfig::new(&ToolsConfigParams {
|
||||
model_info: &model_info,
|
||||
available_models: &available_models,
|
||||
features: &features,
|
||||
web_search_mode: Some(WebSearchMode::Cached),
|
||||
session_source: SessionSource::Cli,
|
||||
sandbox_policy: &SandboxPolicy::DangerFullAccess,
|
||||
windows_sandbox_level: WindowsSandboxLevel::Disabled,
|
||||
});
|
||||
|
||||
let (tools, _) = build_specs(
|
||||
&tools_config,
|
||||
None,
|
||||
Some(HashMap::from([(
|
||||
"mcp__codex_apps__calendar_create_event".to_string(),
|
||||
ToolInfo {
|
||||
server_name: crate::mcp::CODEX_APPS_MCP_SERVER_NAME.to_string(),
|
||||
tool_name: "_create_event".to_string(),
|
||||
tool_namespace: "mcp__codex_apps__calendar".to_string(),
|
||||
tool: mcp_tool(
|
||||
"calendar_create_event",
|
||||
"Create calendar event",
|
||||
serde_json::json!({"type": "object"}),
|
||||
),
|
||||
connector_id: Some("calendar".to_string()),
|
||||
connector_name: Some("Calendar".to_string()),
|
||||
plugin_display_names: Vec::new(),
|
||||
connector_description: None,
|
||||
},
|
||||
)])),
|
||||
&[],
|
||||
)
|
||||
.build();
|
||||
let search_tool = find_tool(&tools, TOOL_SEARCH_TOOL_NAME);
|
||||
let ToolSpec::ToolSearch { description, .. } = &search_tool.spec else {
|
||||
panic!("expected tool_search tool");
|
||||
};
|
||||
|
||||
assert!(description.contains("- Calendar"));
|
||||
assert!(!description.contains("- Calendar:"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -37,6 +37,111 @@ macro_rules! feedback_tags {
|
||||
};
|
||||
}
|
||||
|
||||
pub(crate) struct FeedbackRequestTags<'a> {
|
||||
pub endpoint: &'a str,
|
||||
pub auth_header_attached: bool,
|
||||
pub auth_header_name: Option<&'a str>,
|
||||
pub auth_mode: Option<&'a str>,
|
||||
pub auth_retry_after_unauthorized: Option<bool>,
|
||||
pub auth_recovery_mode: Option<&'a str>,
|
||||
pub auth_recovery_phase: Option<&'a str>,
|
||||
pub auth_connection_reused: Option<bool>,
|
||||
pub auth_request_id: Option<&'a str>,
|
||||
pub auth_cf_ray: Option<&'a str>,
|
||||
pub auth_error: Option<&'a str>,
|
||||
pub auth_error_code: Option<&'a str>,
|
||||
pub auth_recovery_followup_success: Option<bool>,
|
||||
pub auth_recovery_followup_status: Option<u16>,
|
||||
}
|
||||
|
||||
struct Auth401FeedbackSnapshot<'a> {
|
||||
request_id: &'a str,
|
||||
cf_ray: &'a str,
|
||||
error: &'a str,
|
||||
error_code: &'a str,
|
||||
}
|
||||
|
||||
impl<'a> Auth401FeedbackSnapshot<'a> {
|
||||
fn from_optional_fields(
|
||||
request_id: Option<&'a str>,
|
||||
cf_ray: Option<&'a str>,
|
||||
error: Option<&'a str>,
|
||||
error_code: Option<&'a str>,
|
||||
) -> Self {
|
||||
Self {
|
||||
request_id: request_id.unwrap_or(""),
|
||||
cf_ray: cf_ray.unwrap_or(""),
|
||||
error: error.unwrap_or(""),
|
||||
error_code: error_code.unwrap_or(""),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn emit_feedback_request_tags(tags: &FeedbackRequestTags<'_>) {
|
||||
let auth_header_name = tags.auth_header_name.unwrap_or("");
|
||||
let auth_mode = tags.auth_mode.unwrap_or("");
|
||||
let auth_retry_after_unauthorized = tags
|
||||
.auth_retry_after_unauthorized
|
||||
.map_or_else(String::new, |value| value.to_string());
|
||||
let auth_recovery_mode = tags.auth_recovery_mode.unwrap_or("");
|
||||
let auth_recovery_phase = tags.auth_recovery_phase.unwrap_or("");
|
||||
let auth_connection_reused = tags
|
||||
.auth_connection_reused
|
||||
.map_or_else(String::new, |value| value.to_string());
|
||||
let auth_request_id = tags.auth_request_id.unwrap_or("");
|
||||
let auth_cf_ray = tags.auth_cf_ray.unwrap_or("");
|
||||
let auth_error = tags.auth_error.unwrap_or("");
|
||||
let auth_error_code = tags.auth_error_code.unwrap_or("");
|
||||
let auth_recovery_followup_success = tags
|
||||
.auth_recovery_followup_success
|
||||
.map_or_else(String::new, |value| value.to_string());
|
||||
let auth_recovery_followup_status = tags
|
||||
.auth_recovery_followup_status
|
||||
.map_or_else(String::new, |value| value.to_string());
|
||||
feedback_tags!(
|
||||
endpoint = tags.endpoint,
|
||||
auth_header_attached = tags.auth_header_attached,
|
||||
auth_header_name = auth_header_name,
|
||||
auth_mode = auth_mode,
|
||||
auth_retry_after_unauthorized = auth_retry_after_unauthorized,
|
||||
auth_recovery_mode = auth_recovery_mode,
|
||||
auth_recovery_phase = auth_recovery_phase,
|
||||
auth_connection_reused = auth_connection_reused,
|
||||
auth_request_id = auth_request_id,
|
||||
auth_cf_ray = auth_cf_ray,
|
||||
auth_error = auth_error,
|
||||
auth_error_code = auth_error_code,
|
||||
auth_recovery_followup_success = auth_recovery_followup_success,
|
||||
auth_recovery_followup_status = auth_recovery_followup_status
|
||||
);
|
||||
}
|
||||
|
||||
pub(crate) fn emit_feedback_auth_recovery_tags(
|
||||
auth_recovery_mode: &str,
|
||||
auth_recovery_phase: &str,
|
||||
auth_recovery_outcome: &str,
|
||||
auth_request_id: Option<&str>,
|
||||
auth_cf_ray: Option<&str>,
|
||||
auth_error: Option<&str>,
|
||||
auth_error_code: Option<&str>,
|
||||
) {
|
||||
let auth_401 = Auth401FeedbackSnapshot::from_optional_fields(
|
||||
auth_request_id,
|
||||
auth_cf_ray,
|
||||
auth_error,
|
||||
auth_error_code,
|
||||
);
|
||||
feedback_tags!(
|
||||
auth_recovery_mode = auth_recovery_mode,
|
||||
auth_recovery_phase = auth_recovery_phase,
|
||||
auth_recovery_outcome = auth_recovery_outcome,
|
||||
auth_401_request_id = auth_401.request_id,
|
||||
auth_401_cf_ray = auth_401.cf_ray,
|
||||
auth_401_error = auth_401.error,
|
||||
auth_401_error_code = auth_401.error_code
|
||||
);
|
||||
}
|
||||
|
||||
pub fn backoff(attempt: u64) -> Duration {
|
||||
let exp = BACKOFF_FACTOR.powi(attempt.saturating_sub(1) as i32);
|
||||
let base = (INITIAL_DELAY_MS as f64 * exp) as u64;
|
||||
|
||||
@@ -1,4 +1,15 @@
|
||||
use super::*;
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use tracing::Event;
|
||||
use tracing::Subscriber;
|
||||
use tracing::field::Visit;
|
||||
use tracing_subscriber::Layer;
|
||||
use tracing_subscriber::layer::Context;
|
||||
use tracing_subscriber::layer::SubscriberExt;
|
||||
use tracing_subscriber::registry::LookupSpan;
|
||||
use tracing_subscriber::util::SubscriberInitExt;
|
||||
|
||||
#[test]
|
||||
fn test_try_parse_error_message() {
|
||||
@@ -32,6 +43,298 @@ fn feedback_tags_macro_compiles() {
|
||||
feedback_tags!(model = "gpt-5", cached = true, debug_only = OnlyDebug);
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct TagCollectorVisitor {
|
||||
tags: BTreeMap<String, String>,
|
||||
}
|
||||
|
||||
impl Visit for TagCollectorVisitor {
|
||||
fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
|
||||
self.tags
|
||||
.insert(field.name().to_string(), value.to_string());
|
||||
}
|
||||
|
||||
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
|
||||
self.tags
|
||||
.insert(field.name().to_string(), value.to_string());
|
||||
}
|
||||
|
||||
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
|
||||
self.tags
|
||||
.insert(field.name().to_string(), format!("{value:?}"));
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct TagCollectorLayer {
|
||||
tags: Arc<Mutex<BTreeMap<String, String>>>,
|
||||
}
|
||||
|
||||
impl<S> Layer<S> for TagCollectorLayer
|
||||
where
|
||||
S: Subscriber + for<'a> LookupSpan<'a>,
|
||||
{
|
||||
fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
|
||||
if event.metadata().target() != "feedback_tags" {
|
||||
return;
|
||||
}
|
||||
let mut visitor = TagCollectorVisitor::default();
|
||||
event.record(&mut visitor);
|
||||
self.tags.lock().unwrap().extend(visitor.tags);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn emit_feedback_request_tags_records_sentry_feedback_fields() {
|
||||
let tags = Arc::new(Mutex::new(BTreeMap::new()));
|
||||
let _guard = tracing_subscriber::registry()
|
||||
.with(TagCollectorLayer { tags: tags.clone() })
|
||||
.set_default();
|
||||
|
||||
emit_feedback_request_tags(&FeedbackRequestTags {
|
||||
endpoint: "/responses",
|
||||
auth_header_attached: true,
|
||||
auth_header_name: Some("authorization"),
|
||||
auth_mode: Some("chatgpt"),
|
||||
auth_retry_after_unauthorized: Some(false),
|
||||
auth_recovery_mode: Some("managed"),
|
||||
auth_recovery_phase: Some("refresh_token"),
|
||||
auth_connection_reused: Some(true),
|
||||
auth_request_id: Some("req-123"),
|
||||
auth_cf_ray: Some("ray-123"),
|
||||
auth_error: Some("missing_authorization_header"),
|
||||
auth_error_code: Some("token_expired"),
|
||||
auth_recovery_followup_success: Some(true),
|
||||
auth_recovery_followup_status: Some(200),
|
||||
});
|
||||
|
||||
let tags = tags.lock().unwrap().clone();
|
||||
assert_eq!(
|
||||
tags.get("endpoint").map(String::as_str),
|
||||
Some("\"/responses\"")
|
||||
);
|
||||
assert_eq!(
|
||||
tags.get("auth_header_attached").map(String::as_str),
|
||||
Some("true")
|
||||
);
|
||||
assert_eq!(
|
||||
tags.get("auth_header_name").map(String::as_str),
|
||||
Some("\"authorization\"")
|
||||
);
|
||||
assert_eq!(
|
||||
tags.get("auth_request_id").map(String::as_str),
|
||||
Some("\"req-123\"")
|
||||
);
|
||||
assert_eq!(
|
||||
tags.get("auth_error_code").map(String::as_str),
|
||||
Some("\"token_expired\"")
|
||||
);
|
||||
assert_eq!(
|
||||
tags.get("auth_recovery_followup_success")
|
||||
.map(String::as_str),
|
||||
Some("\"true\"")
|
||||
);
|
||||
assert_eq!(
|
||||
tags.get("auth_recovery_followup_status")
|
||||
.map(String::as_str),
|
||||
Some("\"200\"")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn emit_feedback_auth_recovery_tags_preserves_401_specific_fields() {
|
||||
let tags = Arc::new(Mutex::new(BTreeMap::new()));
|
||||
let _guard = tracing_subscriber::registry()
|
||||
.with(TagCollectorLayer { tags: tags.clone() })
|
||||
.set_default();
|
||||
|
||||
emit_feedback_auth_recovery_tags(
|
||||
"managed",
|
||||
"refresh_token",
|
||||
"recovery_succeeded",
|
||||
Some("req-401"),
|
||||
Some("ray-401"),
|
||||
Some("missing_authorization_header"),
|
||||
Some("token_expired"),
|
||||
);
|
||||
|
||||
let tags = tags.lock().unwrap().clone();
|
||||
assert_eq!(
|
||||
tags.get("auth_401_request_id").map(String::as_str),
|
||||
Some("\"req-401\"")
|
||||
);
|
||||
assert_eq!(
|
||||
tags.get("auth_401_cf_ray").map(String::as_str),
|
||||
Some("\"ray-401\"")
|
||||
);
|
||||
assert_eq!(
|
||||
tags.get("auth_401_error").map(String::as_str),
|
||||
Some("\"missing_authorization_header\"")
|
||||
);
|
||||
assert_eq!(
|
||||
tags.get("auth_401_error_code").map(String::as_str),
|
||||
Some("\"token_expired\"")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn emit_feedback_auth_recovery_tags_clears_stale_401_fields() {
|
||||
let tags = Arc::new(Mutex::new(BTreeMap::new()));
|
||||
let _guard = tracing_subscriber::registry()
|
||||
.with(TagCollectorLayer { tags: tags.clone() })
|
||||
.set_default();
|
||||
|
||||
emit_feedback_auth_recovery_tags(
|
||||
"managed",
|
||||
"refresh_token",
|
||||
"recovery_failed_transient",
|
||||
Some("req-401-a"),
|
||||
Some("ray-401-a"),
|
||||
Some("missing_authorization_header"),
|
||||
Some("token_expired"),
|
||||
);
|
||||
emit_feedback_auth_recovery_tags(
|
||||
"managed",
|
||||
"done",
|
||||
"recovery_not_run",
|
||||
Some("req-401-b"),
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
);
|
||||
|
||||
let tags = tags.lock().unwrap().clone();
|
||||
assert_eq!(
|
||||
tags.get("auth_401_request_id").map(String::as_str),
|
||||
Some("\"req-401-b\"")
|
||||
);
|
||||
assert_eq!(
|
||||
tags.get("auth_401_cf_ray").map(String::as_str),
|
||||
Some("\"\"")
|
||||
);
|
||||
assert_eq!(tags.get("auth_401_error").map(String::as_str), Some("\"\""));
|
||||
assert_eq!(
|
||||
tags.get("auth_401_error_code").map(String::as_str),
|
||||
Some("\"\"")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn emit_feedback_request_tags_preserves_latest_auth_fields_after_unauthorized() {
|
||||
let tags = Arc::new(Mutex::new(BTreeMap::new()));
|
||||
let _guard = tracing_subscriber::registry()
|
||||
.with(TagCollectorLayer { tags: tags.clone() })
|
||||
.set_default();
|
||||
|
||||
emit_feedback_request_tags(&FeedbackRequestTags {
|
||||
endpoint: "/responses",
|
||||
auth_header_attached: true,
|
||||
auth_header_name: Some("authorization"),
|
||||
auth_mode: Some("chatgpt"),
|
||||
auth_retry_after_unauthorized: Some(true),
|
||||
auth_recovery_mode: Some("managed"),
|
||||
auth_recovery_phase: Some("refresh_token"),
|
||||
auth_connection_reused: None,
|
||||
auth_request_id: Some("req-123"),
|
||||
auth_cf_ray: Some("ray-123"),
|
||||
auth_error: Some("missing_authorization_header"),
|
||||
auth_error_code: Some("token_expired"),
|
||||
auth_recovery_followup_success: Some(false),
|
||||
auth_recovery_followup_status: Some(401),
|
||||
});
|
||||
|
||||
let tags = tags.lock().unwrap().clone();
|
||||
assert_eq!(
|
||||
tags.get("auth_request_id").map(String::as_str),
|
||||
Some("\"req-123\"")
|
||||
);
|
||||
assert_eq!(
|
||||
tags.get("auth_cf_ray").map(String::as_str),
|
||||
Some("\"ray-123\"")
|
||||
);
|
||||
assert_eq!(
|
||||
tags.get("auth_error").map(String::as_str),
|
||||
Some("\"missing_authorization_header\"")
|
||||
);
|
||||
assert_eq!(
|
||||
tags.get("auth_error_code").map(String::as_str),
|
||||
Some("\"token_expired\"")
|
||||
);
|
||||
assert_eq!(
|
||||
tags.get("auth_recovery_followup_success")
|
||||
.map(String::as_str),
|
||||
Some("\"false\"")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn emit_feedback_request_tags_clears_stale_latest_auth_fields() {
|
||||
let tags = Arc::new(Mutex::new(BTreeMap::new()));
|
||||
let _guard = tracing_subscriber::registry()
|
||||
.with(TagCollectorLayer { tags: tags.clone() })
|
||||
.set_default();
|
||||
|
||||
emit_feedback_request_tags(&FeedbackRequestTags {
|
||||
endpoint: "/responses",
|
||||
auth_header_attached: true,
|
||||
auth_header_name: Some("authorization"),
|
||||
auth_mode: Some("chatgpt"),
|
||||
auth_retry_after_unauthorized: Some(false),
|
||||
auth_recovery_mode: Some("managed"),
|
||||
auth_recovery_phase: Some("refresh_token"),
|
||||
auth_connection_reused: Some(true),
|
||||
auth_request_id: Some("req-123"),
|
||||
auth_cf_ray: Some("ray-123"),
|
||||
auth_error: Some("missing_authorization_header"),
|
||||
auth_error_code: Some("token_expired"),
|
||||
auth_recovery_followup_success: Some(true),
|
||||
auth_recovery_followup_status: Some(200),
|
||||
});
|
||||
emit_feedback_request_tags(&FeedbackRequestTags {
|
||||
endpoint: "/responses",
|
||||
auth_header_attached: true,
|
||||
auth_header_name: None,
|
||||
auth_mode: None,
|
||||
auth_retry_after_unauthorized: None,
|
||||
auth_recovery_mode: None,
|
||||
auth_recovery_phase: None,
|
||||
auth_connection_reused: None,
|
||||
auth_request_id: None,
|
||||
auth_cf_ray: None,
|
||||
auth_error: None,
|
||||
auth_error_code: None,
|
||||
auth_recovery_followup_success: None,
|
||||
auth_recovery_followup_status: None,
|
||||
});
|
||||
|
||||
let tags = tags.lock().unwrap().clone();
|
||||
assert_eq!(
|
||||
tags.get("auth_header_name").map(String::as_str),
|
||||
Some("\"\"")
|
||||
);
|
||||
assert_eq!(tags.get("auth_mode").map(String::as_str), Some("\"\""));
|
||||
assert_eq!(
|
||||
tags.get("auth_request_id").map(String::as_str),
|
||||
Some("\"\"")
|
||||
);
|
||||
assert_eq!(tags.get("auth_cf_ray").map(String::as_str), Some("\"\""));
|
||||
assert_eq!(tags.get("auth_error").map(String::as_str), Some("\"\""));
|
||||
assert_eq!(
|
||||
tags.get("auth_error_code").map(String::as_str),
|
||||
Some("\"\"")
|
||||
);
|
||||
assert_eq!(
|
||||
tags.get("auth_recovery_followup_success")
|
||||
.map(String::as_str),
|
||||
Some("\"\"")
|
||||
);
|
||||
assert_eq!(
|
||||
tags.get("auth_recovery_followup_status")
|
||||
.map(String::as_str),
|
||||
Some("\"\"")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn normalize_thread_name_trims_and_rejects_empty() {
|
||||
assert_eq!(normalize_thread_name(" "), None);
|
||||
|
||||
@@ -2,5 +2,6 @@
|
||||
|
||||
Searches over apps/connectors tool metadata with BM25 and exposes matching tools for the next model call.
|
||||
|
||||
Tools of the apps ({{app_names}}) are hidden until you search for them with this tool (`tool_search`).
|
||||
When the request needs one of these connectors and you don't already have the required tools from it, use this tool to load them. For the apps mentioned above, always prefer `tool_search` over `list_mcp_resources` or `list_mcp_resource_templates` for tool discovery.
|
||||
You have access to all the tools of the following apps/connectors:
|
||||
{{app_descriptions}}
|
||||
Some of the tools may not have been provided to you upfront, and you should use this tool (`tool_search`) to search for the required tools and load them for the apps mentioned above. For the apps mentioned above, always use `tool_search` instead of `list_mcp_resources` or `list_mcp_resource_templates` for tool discovery.
|
||||
|
||||
@@ -18,6 +18,9 @@ const CONNECTOR_DESCRIPTION: &str = "Plan events and manage your calendar.";
|
||||
const PROTOCOL_VERSION: &str = "2025-11-25";
|
||||
const SERVER_NAME: &str = "codex-apps-test";
|
||||
const SERVER_VERSION: &str = "1.0.0";
|
||||
pub const CALENDAR_CREATE_EVENT_RESOURCE_URI: &str =
|
||||
"connector://calendar/tools/calendar_create_event";
|
||||
const CALENDAR_LIST_EVENTS_RESOURCE_URI: &str = "connector://calendar/tools/calendar_list_events";
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AppsTestServer {
|
||||
@@ -175,7 +178,12 @@ impl Respond for CodexAppsJsonRpcResponder {
|
||||
"_meta": {
|
||||
"connector_id": CONNECTOR_ID,
|
||||
"connector_name": self.connector_name.clone(),
|
||||
"connector_description": self.connector_description.clone()
|
||||
"connector_description": self.connector_description.clone(),
|
||||
"_codex_apps": {
|
||||
"resource_uri": CALENDAR_CREATE_EVENT_RESOURCE_URI,
|
||||
"contains_mcp_source": true,
|
||||
"connector_id": CONNECTOR_ID
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
@@ -192,7 +200,12 @@ impl Respond for CodexAppsJsonRpcResponder {
|
||||
"_meta": {
|
||||
"connector_id": CONNECTOR_ID,
|
||||
"connector_name": self.connector_name.clone(),
|
||||
"connector_description": self.connector_description.clone()
|
||||
"connector_description": self.connector_description.clone(),
|
||||
"_codex_apps": {
|
||||
"resource_uri": CALENDAR_LIST_EVENTS_RESOURCE_URI,
|
||||
"contains_mcp_source": true,
|
||||
"connector_id": CONNECTOR_ID
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
@@ -214,6 +227,7 @@ impl Respond for CodexAppsJsonRpcResponder {
|
||||
.pointer("/params/arguments/starts_at")
|
||||
.and_then(Value::as_str)
|
||||
.unwrap_or_default();
|
||||
let codex_apps_meta = body.pointer("/params/_meta/_codex_apps").cloned();
|
||||
|
||||
ResponseTemplate::new(200).set_body_json(json!({
|
||||
"jsonrpc": "2.0",
|
||||
@@ -223,6 +237,9 @@ impl Respond for CodexAppsJsonRpcResponder {
|
||||
"type": "text",
|
||||
"text": format!("called {tool_name} for {title} at {starts_at}")
|
||||
}],
|
||||
"structuredContent": {
|
||||
"_codex_apps": codex_apps_meta,
|
||||
},
|
||||
"isError": false
|
||||
}
|
||||
}))
|
||||
|
||||
@@ -943,10 +943,6 @@ async fn includes_apps_guidance_as_developer_message_for_chatgpt_auth() {
|
||||
.features
|
||||
.enable(Feature::Apps)
|
||||
.expect("test config should allow feature update");
|
||||
config
|
||||
.features
|
||||
.disable(Feature::AppsMcpGateway)
|
||||
.expect("test config should allow feature update");
|
||||
config.chatgpt_base_url = apps_base_url;
|
||||
});
|
||||
let codex = builder
|
||||
@@ -971,7 +967,8 @@ async fn includes_apps_guidance_as_developer_message_for_chatgpt_auth() {
|
||||
let request = resp_mock.single_request();
|
||||
let request_body = request.body_json();
|
||||
let input = request_body["input"].as_array().expect("input array");
|
||||
let apps_snippet = "Apps are mentioned in user messages in the format";
|
||||
let apps_snippet =
|
||||
"Apps (Connectors) can be explicitly triggered in user messages in the format";
|
||||
|
||||
let has_developer_apps_guidance = input.iter().any(|item| {
|
||||
item.get("role").and_then(|value| value.as_str()) == Some("developer")
|
||||
@@ -1034,10 +1031,6 @@ async fn omits_apps_guidance_for_api_key_auth_even_when_feature_enabled() {
|
||||
.features
|
||||
.enable(Feature::Apps)
|
||||
.expect("test config should allow feature update");
|
||||
config
|
||||
.features
|
||||
.disable(Feature::AppsMcpGateway)
|
||||
.expect("test config should allow feature update");
|
||||
config.chatgpt_base_url = apps_base_url;
|
||||
});
|
||||
let codex = builder
|
||||
@@ -1062,7 +1055,8 @@ async fn omits_apps_guidance_for_api_key_auth_even_when_feature_enabled() {
|
||||
let request = resp_mock.single_request();
|
||||
let request_body = request.body_json();
|
||||
let input = request_body["input"].as_array().expect("input array");
|
||||
let apps_snippet = "Apps are mentioned in the prompt in the format";
|
||||
let apps_snippet =
|
||||
"Apps (Connectors) can be explicitly triggered in user messages in the format";
|
||||
|
||||
let has_apps_guidance = input.iter().any(|item| {
|
||||
item.get("content")
|
||||
|
||||
@@ -142,10 +142,6 @@ async fn build_apps_enabled_plugin_test_codex(
|
||||
.features
|
||||
.enable(Feature::Apps)
|
||||
.expect("test config should allow feature update");
|
||||
config
|
||||
.features
|
||||
.disable(Feature::AppsMcpGateway)
|
||||
.expect("test config should allow feature update");
|
||||
config.chatgpt_base_url = chatgpt_base_url;
|
||||
});
|
||||
Ok(builder
|
||||
|
||||
@@ -13,6 +13,7 @@ use codex_protocol::protocol::Op;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use core_test_support::apps_test_server::AppsTestServer;
|
||||
use core_test_support::apps_test_server::CALENDAR_CREATE_EVENT_RESOURCE_URI;
|
||||
use core_test_support::responses::ResponsesRequest;
|
||||
use core_test_support::responses::ev_assistant_message;
|
||||
use core_test_support::responses::ev_completed;
|
||||
@@ -30,8 +31,9 @@ use pretty_assertions::assert_eq;
|
||||
use serde_json::Value;
|
||||
use serde_json::json;
|
||||
|
||||
const SEARCH_TOOL_DESCRIPTION_SNIPPETS: [&str; 1] = [
|
||||
"Tools of the apps (Calendar) are hidden until you search for them with this tool (`tool_search`).",
|
||||
const SEARCH_TOOL_DESCRIPTION_SNIPPETS: [&str; 2] = [
|
||||
"You have access to all the tools of the following apps/connectors",
|
||||
"- Calendar: Plan events and manage your calendar.",
|
||||
];
|
||||
const TOOL_SEARCH_TOOL_NAME: &str = "tool_search";
|
||||
const CALENDAR_CREATE_TOOL: &str = "mcp__codex_apps__calendar_create_event";
|
||||
@@ -89,10 +91,6 @@ fn configure_apps(config: &mut Config, apps_base_url: &str) {
|
||||
.features
|
||||
.enable(Feature::Apps)
|
||||
.expect("test config should allow feature update");
|
||||
config
|
||||
.features
|
||||
.disable(Feature::AppsMcpGateway)
|
||||
.expect("test config should allow feature update");
|
||||
config.chatgpt_base_url = apps_base_url.to_string();
|
||||
config.model = Some("gpt-5-codex".to_string());
|
||||
|
||||
@@ -404,6 +402,19 @@ async fn tool_search_returns_deferred_tools_without_follow_up_tool_injection() -
|
||||
})),
|
||||
}
|
||||
);
|
||||
assert_eq!(
|
||||
end.result
|
||||
.as_ref()
|
||||
.expect("tool call should succeed")
|
||||
.structured_content,
|
||||
Some(json!({
|
||||
"_codex_apps": {
|
||||
"resource_uri": CALENDAR_CREATE_EVENT_RESOURCE_URI,
|
||||
"contains_mcp_source": true,
|
||||
"connector_id": "calendar",
|
||||
},
|
||||
}))
|
||||
);
|
||||
|
||||
wait_for_event(&test.codex, |event| {
|
||||
matches!(event, EventMsg::TurnComplete(_))
|
||||
|
||||
@@ -340,17 +340,43 @@ impl SessionTelemetry {
|
||||
Ok(response) => (Some(response.status().as_u16()), None),
|
||||
Err(error) => (error.status().map(|s| s.as_u16()), Some(error.to_string())),
|
||||
};
|
||||
self.record_api_request(attempt, status, error.as_deref(), duration);
|
||||
self.record_api_request(
|
||||
attempt,
|
||||
status,
|
||||
error.as_deref(),
|
||||
duration,
|
||||
false,
|
||||
None,
|
||||
false,
|
||||
None,
|
||||
None,
|
||||
"unknown",
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
);
|
||||
|
||||
response
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn record_api_request(
|
||||
&self,
|
||||
attempt: u64,
|
||||
status: Option<u16>,
|
||||
error: Option<&str>,
|
||||
duration: Duration,
|
||||
auth_header_attached: bool,
|
||||
auth_header_name: Option<&str>,
|
||||
retry_after_unauthorized: bool,
|
||||
recovery_mode: Option<&str>,
|
||||
recovery_phase: Option<&str>,
|
||||
endpoint: &str,
|
||||
request_id: Option<&str>,
|
||||
cf_ray: Option<&str>,
|
||||
auth_error: Option<&str>,
|
||||
auth_error_code: Option<&str>,
|
||||
) {
|
||||
let success = status.is_some_and(|code| (200..=299).contains(&code)) && error.is_none();
|
||||
let success_str = if success { "true" } else { "false" };
|
||||
@@ -375,13 +401,76 @@ impl SessionTelemetry {
|
||||
http.response.status_code = status,
|
||||
error.message = error,
|
||||
attempt = attempt,
|
||||
auth.header_attached = auth_header_attached,
|
||||
auth.header_name = auth_header_name,
|
||||
auth.retry_after_unauthorized = retry_after_unauthorized,
|
||||
auth.recovery_mode = recovery_mode,
|
||||
auth.recovery_phase = recovery_phase,
|
||||
endpoint = endpoint,
|
||||
auth.request_id = request_id,
|
||||
auth.cf_ray = cf_ray,
|
||||
auth.error = auth_error,
|
||||
auth.error_code = auth_error_code,
|
||||
},
|
||||
log: {},
|
||||
trace: {},
|
||||
);
|
||||
}
|
||||
|
||||
pub fn record_websocket_request(&self, duration: Duration, error: Option<&str>) {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn record_websocket_connect(
|
||||
&self,
|
||||
duration: Duration,
|
||||
status: Option<u16>,
|
||||
error: Option<&str>,
|
||||
auth_header_attached: bool,
|
||||
auth_header_name: Option<&str>,
|
||||
retry_after_unauthorized: bool,
|
||||
recovery_mode: Option<&str>,
|
||||
recovery_phase: Option<&str>,
|
||||
endpoint: &str,
|
||||
connection_reused: bool,
|
||||
request_id: Option<&str>,
|
||||
cf_ray: Option<&str>,
|
||||
auth_error: Option<&str>,
|
||||
auth_error_code: Option<&str>,
|
||||
) {
|
||||
let success = error.is_none()
|
||||
&& status
|
||||
.map(|code| (200..=299).contains(&code))
|
||||
.unwrap_or(true);
|
||||
let success_str = if success { "true" } else { "false" };
|
||||
log_and_trace_event!(
|
||||
self,
|
||||
common: {
|
||||
event.name = "codex.websocket_connect",
|
||||
duration_ms = %duration.as_millis(),
|
||||
http.response.status_code = status,
|
||||
success = success_str,
|
||||
error.message = error,
|
||||
auth.header_attached = auth_header_attached,
|
||||
auth.header_name = auth_header_name,
|
||||
auth.retry_after_unauthorized = retry_after_unauthorized,
|
||||
auth.recovery_mode = recovery_mode,
|
||||
auth.recovery_phase = recovery_phase,
|
||||
endpoint = endpoint,
|
||||
auth.connection_reused = connection_reused,
|
||||
auth.request_id = request_id,
|
||||
auth.cf_ray = cf_ray,
|
||||
auth.error = auth_error,
|
||||
auth.error_code = auth_error_code,
|
||||
},
|
||||
log: {},
|
||||
trace: {},
|
||||
);
|
||||
}
|
||||
|
||||
pub fn record_websocket_request(
|
||||
&self,
|
||||
duration: Duration,
|
||||
error: Option<&str>,
|
||||
connection_reused: bool,
|
||||
) {
|
||||
let success_str = if error.is_none() { "true" } else { "false" };
|
||||
self.counter(
|
||||
WEBSOCKET_REQUEST_COUNT_METRIC,
|
||||
@@ -400,6 +489,39 @@ impl SessionTelemetry {
|
||||
duration_ms = %duration.as_millis(),
|
||||
success = success_str,
|
||||
error.message = error,
|
||||
auth.connection_reused = connection_reused,
|
||||
},
|
||||
log: {},
|
||||
trace: {},
|
||||
);
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn record_auth_recovery(
|
||||
&self,
|
||||
mode: &str,
|
||||
step: &str,
|
||||
outcome: &str,
|
||||
request_id: Option<&str>,
|
||||
cf_ray: Option<&str>,
|
||||
auth_error: Option<&str>,
|
||||
auth_error_code: Option<&str>,
|
||||
recovery_reason: Option<&str>,
|
||||
auth_state_changed: Option<bool>,
|
||||
) {
|
||||
log_and_trace_event!(
|
||||
self,
|
||||
common: {
|
||||
event.name = "codex.auth_recovery",
|
||||
auth.mode = mode,
|
||||
auth.step = step,
|
||||
auth.outcome = outcome,
|
||||
auth.request_id = request_id,
|
||||
auth.cf_ray = cf_ray,
|
||||
auth.error = auth_error,
|
||||
auth.error_code = auth_error_code,
|
||||
auth.recovery_reason = recovery_reason,
|
||||
auth.state_changed = auth_state_changed,
|
||||
},
|
||||
log: {},
|
||||
trace: {},
|
||||
|
||||
@@ -297,3 +297,462 @@ fn otel_export_routing_policy_routes_tool_result_log_and_trace_events() {
|
||||
assert!(!tool_trace_attrs.contains_key("mcp_server"));
|
||||
assert!(!tool_trace_attrs.contains_key("mcp_server_origin"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn otel_export_routing_policy_routes_auth_recovery_log_and_trace_events() {
|
||||
let log_exporter = InMemoryLogExporter::default();
|
||||
let logger_provider = SdkLoggerProvider::builder()
|
||||
.with_simple_exporter(log_exporter.clone())
|
||||
.build();
|
||||
let span_exporter = InMemorySpanExporter::default();
|
||||
let tracer_provider = SdkTracerProvider::builder()
|
||||
.with_simple_exporter(span_exporter.clone())
|
||||
.build();
|
||||
let tracer = tracer_provider.tracer("sink-split-test");
|
||||
|
||||
let subscriber = tracing_subscriber::registry()
|
||||
.with(
|
||||
opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(
|
||||
&logger_provider,
|
||||
)
|
||||
.with_filter(filter_fn(OtelProvider::log_export_filter)),
|
||||
)
|
||||
.with(
|
||||
tracing_opentelemetry::layer()
|
||||
.with_tracer(tracer)
|
||||
.with_filter(filter_fn(OtelProvider::trace_export_filter)),
|
||||
);
|
||||
|
||||
tracing::subscriber::with_default(subscriber, || {
|
||||
tracing::callsite::rebuild_interest_cache();
|
||||
let manager = SessionTelemetry::new(
|
||||
ThreadId::new(),
|
||||
"gpt-5.1",
|
||||
"gpt-5.1",
|
||||
Some("account-id".to_string()),
|
||||
Some("engineer@example.com".to_string()),
|
||||
Some(TelemetryAuthMode::Chatgpt),
|
||||
"codex_exec".to_string(),
|
||||
true,
|
||||
"tty".to_string(),
|
||||
SessionSource::Cli,
|
||||
);
|
||||
let root_span = tracing::info_span!("root");
|
||||
let _root_guard = root_span.enter();
|
||||
manager.record_auth_recovery(
|
||||
"managed",
|
||||
"reload",
|
||||
"recovery_succeeded",
|
||||
Some("req-401"),
|
||||
Some("ray-401"),
|
||||
Some("missing_authorization_header"),
|
||||
Some("token_expired"),
|
||||
None,
|
||||
Some(true),
|
||||
);
|
||||
});
|
||||
|
||||
logger_provider.force_flush().expect("flush logs");
|
||||
tracer_provider.force_flush().expect("flush traces");
|
||||
|
||||
let logs = log_exporter.get_emitted_logs().expect("log export");
|
||||
let recovery_log = find_log_by_event_name(&logs, "codex.auth_recovery");
|
||||
let recovery_log_attrs = log_attributes(&recovery_log.record);
|
||||
assert_eq!(
|
||||
recovery_log_attrs.get("auth.mode").map(String::as_str),
|
||||
Some("managed")
|
||||
);
|
||||
assert_eq!(
|
||||
recovery_log_attrs.get("auth.step").map(String::as_str),
|
||||
Some("reload")
|
||||
);
|
||||
assert_eq!(
|
||||
recovery_log_attrs.get("auth.outcome").map(String::as_str),
|
||||
Some("recovery_succeeded")
|
||||
);
|
||||
assert_eq!(
|
||||
recovery_log_attrs
|
||||
.get("auth.request_id")
|
||||
.map(String::as_str),
|
||||
Some("req-401")
|
||||
);
|
||||
assert_eq!(
|
||||
recovery_log_attrs.get("auth.cf_ray").map(String::as_str),
|
||||
Some("ray-401")
|
||||
);
|
||||
assert_eq!(
|
||||
recovery_log_attrs.get("auth.error").map(String::as_str),
|
||||
Some("missing_authorization_header")
|
||||
);
|
||||
assert_eq!(
|
||||
recovery_log_attrs
|
||||
.get("auth.error_code")
|
||||
.map(String::as_str),
|
||||
Some("token_expired")
|
||||
);
|
||||
assert_eq!(
|
||||
recovery_log_attrs
|
||||
.get("auth.state_changed")
|
||||
.map(String::as_str),
|
||||
Some("true")
|
||||
);
|
||||
|
||||
let spans = span_exporter.get_finished_spans().expect("span export");
|
||||
assert_eq!(spans.len(), 1);
|
||||
let span_events = &spans[0].events.events;
|
||||
assert_eq!(span_events.len(), 1);
|
||||
|
||||
let recovery_trace_event = find_span_event_by_name_attr(span_events, "codex.auth_recovery");
|
||||
let recovery_trace_attrs = span_event_attributes(recovery_trace_event);
|
||||
assert_eq!(
|
||||
recovery_trace_attrs.get("auth.mode").map(String::as_str),
|
||||
Some("managed")
|
||||
);
|
||||
assert_eq!(
|
||||
recovery_trace_attrs.get("auth.step").map(String::as_str),
|
||||
Some("reload")
|
||||
);
|
||||
assert_eq!(
|
||||
recovery_trace_attrs.get("auth.outcome").map(String::as_str),
|
||||
Some("recovery_succeeded")
|
||||
);
|
||||
assert_eq!(
|
||||
recovery_trace_attrs
|
||||
.get("auth.request_id")
|
||||
.map(String::as_str),
|
||||
Some("req-401")
|
||||
);
|
||||
assert_eq!(
|
||||
recovery_trace_attrs.get("auth.cf_ray").map(String::as_str),
|
||||
Some("ray-401")
|
||||
);
|
||||
assert_eq!(
|
||||
recovery_trace_attrs.get("auth.error").map(String::as_str),
|
||||
Some("missing_authorization_header")
|
||||
);
|
||||
assert_eq!(
|
||||
recovery_trace_attrs
|
||||
.get("auth.error_code")
|
||||
.map(String::as_str),
|
||||
Some("token_expired")
|
||||
);
|
||||
assert_eq!(
|
||||
recovery_trace_attrs
|
||||
.get("auth.state_changed")
|
||||
.map(String::as_str),
|
||||
Some("true")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn otel_export_routing_policy_routes_api_request_auth_observability() {
|
||||
let log_exporter = InMemoryLogExporter::default();
|
||||
let logger_provider = SdkLoggerProvider::builder()
|
||||
.with_simple_exporter(log_exporter.clone())
|
||||
.build();
|
||||
let span_exporter = InMemorySpanExporter::default();
|
||||
let tracer_provider = SdkTracerProvider::builder()
|
||||
.with_simple_exporter(span_exporter.clone())
|
||||
.build();
|
||||
let tracer = tracer_provider.tracer("sink-split-test");
|
||||
|
||||
let subscriber = tracing_subscriber::registry()
|
||||
.with(
|
||||
opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(
|
||||
&logger_provider,
|
||||
)
|
||||
.with_filter(filter_fn(OtelProvider::log_export_filter)),
|
||||
)
|
||||
.with(
|
||||
tracing_opentelemetry::layer()
|
||||
.with_tracer(tracer)
|
||||
.with_filter(filter_fn(OtelProvider::trace_export_filter)),
|
||||
);
|
||||
|
||||
tracing::subscriber::with_default(subscriber, || {
|
||||
tracing::callsite::rebuild_interest_cache();
|
||||
let manager = SessionTelemetry::new(
|
||||
ThreadId::new(),
|
||||
"gpt-5.1",
|
||||
"gpt-5.1",
|
||||
Some("account-id".to_string()),
|
||||
Some("engineer@example.com".to_string()),
|
||||
Some(TelemetryAuthMode::Chatgpt),
|
||||
"codex_exec".to_string(),
|
||||
true,
|
||||
"tty".to_string(),
|
||||
SessionSource::Cli,
|
||||
);
|
||||
let root_span = tracing::info_span!("root");
|
||||
let _root_guard = root_span.enter();
|
||||
manager.record_api_request(
|
||||
1,
|
||||
Some(401),
|
||||
Some("http 401"),
|
||||
std::time::Duration::from_millis(42),
|
||||
true,
|
||||
Some("authorization"),
|
||||
true,
|
||||
Some("managed"),
|
||||
Some("refresh_token"),
|
||||
"/responses",
|
||||
Some("req-401"),
|
||||
Some("ray-401"),
|
||||
Some("missing_authorization_header"),
|
||||
Some("token_expired"),
|
||||
);
|
||||
});
|
||||
|
||||
logger_provider.force_flush().expect("flush logs");
|
||||
tracer_provider.force_flush().expect("flush traces");
|
||||
|
||||
let logs = log_exporter.get_emitted_logs().expect("log export");
|
||||
let request_log = find_log_by_event_name(&logs, "codex.api_request");
|
||||
let request_log_attrs = log_attributes(&request_log.record);
|
||||
assert_eq!(
|
||||
request_log_attrs
|
||||
.get("auth.header_attached")
|
||||
.map(String::as_str),
|
||||
Some("true")
|
||||
);
|
||||
assert_eq!(
|
||||
request_log_attrs
|
||||
.get("auth.header_name")
|
||||
.map(String::as_str),
|
||||
Some("authorization")
|
||||
);
|
||||
assert_eq!(
|
||||
request_log_attrs
|
||||
.get("auth.retry_after_unauthorized")
|
||||
.map(String::as_str),
|
||||
Some("true")
|
||||
);
|
||||
assert_eq!(
|
||||
request_log_attrs
|
||||
.get("auth.recovery_mode")
|
||||
.map(String::as_str),
|
||||
Some("managed")
|
||||
);
|
||||
assert_eq!(
|
||||
request_log_attrs
|
||||
.get("auth.recovery_phase")
|
||||
.map(String::as_str),
|
||||
Some("refresh_token")
|
||||
);
|
||||
assert_eq!(
|
||||
request_log_attrs.get("endpoint").map(String::as_str),
|
||||
Some("/responses")
|
||||
);
|
||||
assert_eq!(
|
||||
request_log_attrs.get("auth.error").map(String::as_str),
|
||||
Some("missing_authorization_header")
|
||||
);
|
||||
|
||||
let spans = span_exporter.get_finished_spans().expect("span export");
|
||||
let request_trace_event =
|
||||
find_span_event_by_name_attr(&spans[0].events.events, "codex.api_request");
|
||||
let request_trace_attrs = span_event_attributes(request_trace_event);
|
||||
assert_eq!(
|
||||
request_trace_attrs
|
||||
.get("auth.header_attached")
|
||||
.map(String::as_str),
|
||||
Some("true")
|
||||
);
|
||||
assert_eq!(
|
||||
request_trace_attrs
|
||||
.get("auth.header_name")
|
||||
.map(String::as_str),
|
||||
Some("authorization")
|
||||
);
|
||||
assert_eq!(
|
||||
request_trace_attrs
|
||||
.get("auth.retry_after_unauthorized")
|
||||
.map(String::as_str),
|
||||
Some("true")
|
||||
);
|
||||
assert_eq!(
|
||||
request_trace_attrs.get("endpoint").map(String::as_str),
|
||||
Some("/responses")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn otel_export_routing_policy_routes_websocket_connect_auth_observability() {
|
||||
let log_exporter = InMemoryLogExporter::default();
|
||||
let logger_provider = SdkLoggerProvider::builder()
|
||||
.with_simple_exporter(log_exporter.clone())
|
||||
.build();
|
||||
let span_exporter = InMemorySpanExporter::default();
|
||||
let tracer_provider = SdkTracerProvider::builder()
|
||||
.with_simple_exporter(span_exporter.clone())
|
||||
.build();
|
||||
let tracer = tracer_provider.tracer("sink-split-test");
|
||||
|
||||
let subscriber = tracing_subscriber::registry()
|
||||
.with(
|
||||
opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(
|
||||
&logger_provider,
|
||||
)
|
||||
.with_filter(filter_fn(OtelProvider::log_export_filter)),
|
||||
)
|
||||
.with(
|
||||
tracing_opentelemetry::layer()
|
||||
.with_tracer(tracer)
|
||||
.with_filter(filter_fn(OtelProvider::trace_export_filter)),
|
||||
);
|
||||
|
||||
tracing::subscriber::with_default(subscriber, || {
|
||||
tracing::callsite::rebuild_interest_cache();
|
||||
let manager = SessionTelemetry::new(
|
||||
ThreadId::new(),
|
||||
"gpt-5.1",
|
||||
"gpt-5.1",
|
||||
Some("account-id".to_string()),
|
||||
Some("engineer@example.com".to_string()),
|
||||
Some(TelemetryAuthMode::Chatgpt),
|
||||
"codex_exec".to_string(),
|
||||
true,
|
||||
"tty".to_string(),
|
||||
SessionSource::Cli,
|
||||
);
|
||||
let root_span = tracing::info_span!("root");
|
||||
let _root_guard = root_span.enter();
|
||||
manager.record_websocket_connect(
|
||||
std::time::Duration::from_millis(17),
|
||||
Some(401),
|
||||
Some("http 401"),
|
||||
true,
|
||||
Some("authorization"),
|
||||
true,
|
||||
Some("managed"),
|
||||
Some("reload"),
|
||||
"/responses",
|
||||
false,
|
||||
Some("req-ws-401"),
|
||||
Some("ray-ws-401"),
|
||||
Some("missing_authorization_header"),
|
||||
Some("token_expired"),
|
||||
);
|
||||
});
|
||||
|
||||
logger_provider.force_flush().expect("flush logs");
|
||||
tracer_provider.force_flush().expect("flush traces");
|
||||
|
||||
let logs = log_exporter.get_emitted_logs().expect("log export");
|
||||
let connect_log = find_log_by_event_name(&logs, "codex.websocket_connect");
|
||||
let connect_log_attrs = log_attributes(&connect_log.record);
|
||||
assert_eq!(
|
||||
connect_log_attrs
|
||||
.get("auth.header_attached")
|
||||
.map(String::as_str),
|
||||
Some("true")
|
||||
);
|
||||
assert_eq!(
|
||||
connect_log_attrs
|
||||
.get("auth.header_name")
|
||||
.map(String::as_str),
|
||||
Some("authorization")
|
||||
);
|
||||
assert_eq!(
|
||||
connect_log_attrs.get("auth.error").map(String::as_str),
|
||||
Some("missing_authorization_header")
|
||||
);
|
||||
assert_eq!(
|
||||
connect_log_attrs.get("endpoint").map(String::as_str),
|
||||
Some("/responses")
|
||||
);
|
||||
assert_eq!(
|
||||
connect_log_attrs
|
||||
.get("auth.connection_reused")
|
||||
.map(String::as_str),
|
||||
Some("false")
|
||||
);
|
||||
|
||||
let spans = span_exporter.get_finished_spans().expect("span export");
|
||||
let connect_trace_event =
|
||||
find_span_event_by_name_attr(&spans[0].events.events, "codex.websocket_connect");
|
||||
let connect_trace_attrs = span_event_attributes(connect_trace_event);
|
||||
assert_eq!(
|
||||
connect_trace_attrs
|
||||
.get("auth.recovery_phase")
|
||||
.map(String::as_str),
|
||||
Some("reload")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn otel_export_routing_policy_routes_websocket_request_transport_observability() {
|
||||
let log_exporter = InMemoryLogExporter::default();
|
||||
let logger_provider = SdkLoggerProvider::builder()
|
||||
.with_simple_exporter(log_exporter.clone())
|
||||
.build();
|
||||
let span_exporter = InMemorySpanExporter::default();
|
||||
let tracer_provider = SdkTracerProvider::builder()
|
||||
.with_simple_exporter(span_exporter.clone())
|
||||
.build();
|
||||
let tracer = tracer_provider.tracer("sink-split-test");
|
||||
|
||||
let subscriber = tracing_subscriber::registry()
|
||||
.with(
|
||||
opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(
|
||||
&logger_provider,
|
||||
)
|
||||
.with_filter(filter_fn(OtelProvider::log_export_filter)),
|
||||
)
|
||||
.with(
|
||||
tracing_opentelemetry::layer()
|
||||
.with_tracer(tracer)
|
||||
.with_filter(filter_fn(OtelProvider::trace_export_filter)),
|
||||
);
|
||||
|
||||
tracing::subscriber::with_default(subscriber, || {
|
||||
tracing::callsite::rebuild_interest_cache();
|
||||
let manager = SessionTelemetry::new(
|
||||
ThreadId::new(),
|
||||
"gpt-5.1",
|
||||
"gpt-5.1",
|
||||
Some("account-id".to_string()),
|
||||
Some("engineer@example.com".to_string()),
|
||||
Some(TelemetryAuthMode::Chatgpt),
|
||||
"codex_exec".to_string(),
|
||||
true,
|
||||
"tty".to_string(),
|
||||
SessionSource::Cli,
|
||||
);
|
||||
let root_span = tracing::info_span!("root");
|
||||
let _root_guard = root_span.enter();
|
||||
manager.record_websocket_request(
|
||||
std::time::Duration::from_millis(23),
|
||||
Some("stream error"),
|
||||
true,
|
||||
);
|
||||
});
|
||||
|
||||
logger_provider.force_flush().expect("flush logs");
|
||||
tracer_provider.force_flush().expect("flush traces");
|
||||
|
||||
let logs = log_exporter.get_emitted_logs().expect("log export");
|
||||
let request_log = find_log_by_event_name(&logs, "codex.websocket_request");
|
||||
let request_log_attrs = log_attributes(&request_log.record);
|
||||
assert_eq!(
|
||||
request_log_attrs
|
||||
.get("auth.connection_reused")
|
||||
.map(String::as_str),
|
||||
Some("true")
|
||||
);
|
||||
assert_eq!(
|
||||
request_log_attrs.get("error.message").map(String::as_str),
|
||||
Some("stream error")
|
||||
);
|
||||
|
||||
let spans = span_exporter.get_finished_spans().expect("span export");
|
||||
let request_trace_event =
|
||||
find_span_event_by_name_attr(&spans[0].events.events, "codex.websocket_request");
|
||||
let request_trace_attrs = span_event_attributes(request_trace_event);
|
||||
assert_eq!(
|
||||
request_trace_attrs
|
||||
.get("auth.connection_reused")
|
||||
.map(String::as_str),
|
||||
Some("true")
|
||||
);
|
||||
}
|
||||
|
||||
@@ -47,8 +47,23 @@ fn runtime_metrics_summary_collects_tool_api_and_streaming_metrics() -> Result<(
|
||||
None,
|
||||
None,
|
||||
);
|
||||
manager.record_api_request(1, Some(200), None, Duration::from_millis(300));
|
||||
manager.record_websocket_request(Duration::from_millis(400), None);
|
||||
manager.record_api_request(
|
||||
1,
|
||||
Some(200),
|
||||
None,
|
||||
Duration::from_millis(300),
|
||||
false,
|
||||
None,
|
||||
false,
|
||||
None,
|
||||
None,
|
||||
"/responses",
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
);
|
||||
manager.record_websocket_request(Duration::from_millis(400), None, false);
|
||||
let sse_response: std::result::Result<
|
||||
Option<std::result::Result<StreamEvent, eventsource_stream::EventStreamError<&str>>>,
|
||||
tokio::time::error::Elapsed,
|
||||
|
||||
@@ -700,6 +700,7 @@ impl RmcpClient {
|
||||
&self,
|
||||
name: String,
|
||||
arguments: Option<serde_json::Value>,
|
||||
meta: Option<serde_json::Value>,
|
||||
timeout: Option<Duration>,
|
||||
) -> Result<CallToolResult> {
|
||||
self.refresh_oauth_if_needed().await;
|
||||
@@ -712,8 +713,17 @@ impl RmcpClient {
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
let meta = match meta {
|
||||
Some(Value::Object(map)) => Some(rmcp::model::Meta(map)),
|
||||
Some(other) => {
|
||||
return Err(anyhow!(
|
||||
"MCP tool request _meta must be a JSON object, got {other}"
|
||||
));
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
let rmcp_params = CallToolRequestParams {
|
||||
meta: None,
|
||||
meta,
|
||||
name: name.into(),
|
||||
arguments,
|
||||
task: None,
|
||||
|
||||
@@ -105,6 +105,7 @@ async fn call_echo_tool(client: &RmcpClient, message: &str) -> anyhow::Result<Ca
|
||||
.call_tool(
|
||||
"echo".to_string(),
|
||||
Some(json!({ "message": message })),
|
||||
None,
|
||||
Some(Duration::from_secs(5)),
|
||||
)
|
||||
.await
|
||||
|
||||
Reference in New Issue
Block a user