Compare commits

...

3 Commits

Author SHA1 Message Date
aibrahim-oai
0d07d794ca Update models.json 2026-03-15 08:09:29 +00:00
Matthew Zeng
49edf311ac [apps] Add tool call meta. (#14647)
- [x] Add resource_uri and other things to _meta to shortcut resource
lookup and speed things up.
2026-03-14 22:24:13 -07:00
Colin Young
d692b74007 Add auth 401 observability to client bug reports (#14611)
CXC-392

  [With
  401](https://openai.sentry.io/issues/7333870443/?project=4510195390611458&query=019ce8f8-560c-7f10-a00a-c59553740674&referrer=issue-stream)
  <img width="1909" height="555" alt="401 auth tags in Sentry"
  src="https://github.com/user-attachments/assets/412ea950-61c4-4780-9697-15c270971ee3"
  />


  - auth_401_*: preserved facts from the latest unauthorized response snapshot
  - auth_*: latest auth-related facts from the latest request attempt
  - auth_recovery_*: unauthorized recovery state and follow-up result


  Without 401
  <img width="1917" height="522" alt="happy-path auth tags in Sentry"
  src="https://github.com/user-attachments/assets/3381ed28-8022-43b0-b6c0-623a630e679f"
  />

  ###### Summary
  - Add client-visible 401 diagnostics for auth attachment, upstream auth classification, and 401 request id / cf-ray correlation.
  - Record unauthorized recovery mode, phase, outcome, and retry/follow-up status without changing auth behavior.
  - Surface the highest-signal auth and recovery fields on uploaded client bug reports so they are usable in Sentry.
  - Preserve original unauthorized evidence under `auth_401_*` while keeping follow-up result tags separate.

  ###### Rationale (from spec findings)
  - The dominant bucket needed proof of whether the client attached auth before send or upstream still classified the request as missing auth.
  - Client uploads needed to show whether unauthorized recovery ran and what the client tried next.
  - Request id and cf-ray needed to be preserved on the unauthorized response so server-side correlation is immediate.
  - The bug-report path needed the same auth evidence as the request telemetry path, otherwise the observability would not be operationally useful.

  ###### Scope
  - Add auth 401 and unauthorized-recovery observability in `codex-rs/core`, `codex-rs/codex-api`, and `codex-rs/otel`, including feedback-tag surfacing.
  - Keep auth semantics, refresh behavior, retry behavior, endpoint classification, and geo-denial follow-up work out of this PR.

  ###### Trade-offs
  - This exports only safe auth evidence: header presence/name, upstream auth classification, request ids, and recovery state. It does not export token values or raw upstream bodies.
  - This keeps websocket connection reuse as a transport clue because it can help distinguish stale reused sessions from fresh reconnects.
  - Misroute/base-url classification and geo-denial are intentionally deferred to a separate follow-up PR so this review stays focused on the dominant auth 401 bucket.

  ###### Client follow-up
  - PR 2 will add misroute/provider and geo-denial observability plus the matching feedback-tag surfacing.
  - A separate host/app-server PR should log auth-decision inputs so pre-send host auth state can be correlated with client request evidence.
  - `device_id` remains intentionally separate until there is a safe existing source on the feedback upload path.

  ###### Testing
  - `cargo test -p codex-core refresh_available_models_sorts_by_priority`
  - `cargo test -p codex-core emit_feedback_request_tags_`
  - `cargo test -p codex-core emit_feedback_auth_recovery_tags_`
  - `cargo test -p codex-core auth_request_telemetry_context_tracks_attached_auth_and_retry_phase`
  - `cargo test -p codex-core extract_response_debug_context_decodes_identity_headers`
  - `cargo test -p codex-core identity_auth_details`
  - `cargo test -p codex-core telemetry_error_messages_preserve_non_http_details`
  - `cargo test -p codex-core --all-features --no-run`
  - `cargo test -p codex-otel otel_export_routing_policy_routes_api_request_auth_observability`
  - `cargo test -p codex-otel otel_export_routing_policy_routes_websocket_connect_auth_observability`
  - `cargo test -p codex-otel otel_export_routing_policy_routes_websocket_request_transport_observability`
2026-03-14 15:38:51 -07:00
38 changed files with 2489 additions and 508 deletions

View File

@@ -399,7 +399,7 @@ impl CloudRequirementsService {
"Cloud requirements request was unauthorized; attempting auth recovery"
);
match auth_recovery.next().await {
Ok(()) => {
Ok(_) => {
let Some(refreshed_auth) = self.auth_manager.auth().await else {
tracing::error!(
"Auth recovery succeeded but no auth is available for cloud requirements"

View File

@@ -214,6 +214,7 @@ impl ResponsesWebsocketConnection {
pub async fn stream_request(
&self,
request: ResponsesWsRequest,
connection_reused: bool,
) -> Result<ResponseStream, ApiError> {
let (tx_event, rx_event) =
mpsc::channel::<std::result::Result<ResponseEvent, ApiError>>(1600);
@@ -258,6 +259,7 @@ impl ResponsesWebsocketConnection {
request_body,
idle_timeout,
telemetry,
connection_reused,
)
.await
};
@@ -534,6 +536,7 @@ async fn run_websocket_response_stream(
request_body: Value,
idle_timeout: Duration,
telemetry: Option<Arc<dyn WebsocketTelemetry>>,
connection_reused: bool,
) -> Result<(), ApiError> {
let mut last_server_model: Option<String> = None;
let request_text = match serde_json::to_string(&request_body) {
@@ -553,7 +556,11 @@ async fn run_websocket_response_stream(
.map_err(|err| ApiError::Stream(format!("failed to send websocket request: {err}")));
if let Some(t) = telemetry.as_ref() {
t.on_ws_request(request_start.elapsed(), result.as_ref().err());
t.on_ws_request(
request_start.elapsed(),
result.as_ref().err(),
connection_reused,
);
}
result?;

View File

@@ -33,7 +33,7 @@ pub trait SseTelemetry: Send + Sync {
/// Telemetry for Responses WebSocket transport.
pub trait WebsocketTelemetry: Send + Sync {
fn on_ws_request(&self, duration: Duration, error: Option<&ApiError>);
fn on_ws_request(&self, duration: Duration, error: Option<&ApiError>, connection_reused: bool);
fn on_ws_event(
&self,

View File

@@ -338,9 +338,6 @@
"apps": {
"type": "boolean"
},
"apps_mcp_gateway": {
"type": "boolean"
},
"artifact": {
"type": "boolean"
},
@@ -1890,9 +1887,6 @@
"apps": {
"type": "boolean"
},
"apps_mcp_gateway": {
"type": "boolean"
},
"artifact": {
"type": "boolean"
},

File diff suppressed because one or more lines are too long

View File

@@ -1,3 +1,4 @@
use base64::Engine;
use chrono::DateTime;
use chrono::Utc;
use codex_api::AuthProvider as ApiAuthProvider;
@@ -7,6 +8,7 @@ use codex_api::rate_limits::parse_promo_message;
use codex_api::rate_limits::parse_rate_limit_for_limit;
use http::HeaderMap;
use serde::Deserialize;
use serde_json::Value;
use crate::auth::CodexAuth;
use crate::error::CodexErr;
@@ -30,6 +32,8 @@ pub(crate) fn map_api_error(err: ApiError) -> CodexErr {
url: None,
cf_ray: None,
request_id: None,
identity_authorization_error: None,
identity_error_code: None,
}),
ApiError::InvalidRequest { message } => CodexErr::InvalidRequest(message),
ApiError::Transport(transport) => match transport {
@@ -98,6 +102,11 @@ pub(crate) fn map_api_error(err: ApiError) -> CodexErr {
url,
cf_ray: extract_header(headers.as_ref(), CF_RAY_HEADER),
request_id: extract_request_id(headers.as_ref()),
identity_authorization_error: extract_header(
headers.as_ref(),
X_OPENAI_AUTHORIZATION_ERROR_HEADER,
),
identity_error_code: extract_x_error_json_code(headers.as_ref()),
})
}
}
@@ -118,6 +127,8 @@ const ACTIVE_LIMIT_HEADER: &str = "x-codex-active-limit";
const REQUEST_ID_HEADER: &str = "x-request-id";
const OAI_REQUEST_ID_HEADER: &str = "x-oai-request-id";
const CF_RAY_HEADER: &str = "cf-ray";
const X_OPENAI_AUTHORIZATION_ERROR_HEADER: &str = "x-openai-authorization-error";
const X_ERROR_JSON_HEADER: &str = "x-error-json";
#[cfg(test)]
#[path = "api_bridge_tests.rs"]
@@ -140,6 +151,19 @@ fn extract_header(headers: Option<&HeaderMap>, name: &str) -> Option<String> {
})
}
fn extract_x_error_json_code(headers: Option<&HeaderMap>) -> Option<String> {
let encoded = extract_header(headers, X_ERROR_JSON_HEADER)?;
let decoded = base64::engine::general_purpose::STANDARD
.decode(encoded)
.ok()?;
let parsed = serde_json::from_slice::<Value>(&decoded).ok()?;
parsed
.get("error")
.and_then(|error| error.get("code"))
.and_then(Value::as_str)
.map(str::to_string)
}
pub(crate) fn auth_provider_from_auth(
auth: Option<CodexAuth>,
provider: &ModelProviderInfo,
@@ -191,6 +215,26 @@ pub(crate) struct CoreAuthProvider {
account_id: Option<String>,
}
impl CoreAuthProvider {
pub(crate) fn auth_header_attached(&self) -> bool {
self.token
.as_ref()
.is_some_and(|token| http::HeaderValue::from_str(&format!("Bearer {token}")).is_ok())
}
pub(crate) fn auth_header_name(&self) -> Option<&'static str> {
self.auth_header_attached().then_some("authorization")
}
#[cfg(test)]
pub(crate) fn for_test(token: Option<&str>, account_id: Option<&str>) -> Self {
Self {
token: token.map(str::to_string),
account_id: account_id.map(str::to_string),
}
}
}
impl ApiAuthProvider for CoreAuthProvider {
fn bearer_token(&self) -> Option<String> {
self.token.clone()

View File

@@ -1,4 +1,5 @@
use super::*;
use base64::Engine;
use pretty_assertions::assert_eq;
#[test]
@@ -94,3 +95,49 @@ fn map_api_error_does_not_fallback_limit_name_to_limit_id() {
None
);
}
#[test]
fn map_api_error_extracts_identity_auth_details_from_headers() {
let mut headers = HeaderMap::new();
headers.insert(REQUEST_ID_HEADER, http::HeaderValue::from_static("req-401"));
headers.insert(CF_RAY_HEADER, http::HeaderValue::from_static("ray-401"));
headers.insert(
X_OPENAI_AUTHORIZATION_ERROR_HEADER,
http::HeaderValue::from_static("missing_authorization_header"),
);
let x_error_json =
base64::engine::general_purpose::STANDARD.encode(r#"{"error":{"code":"token_expired"}}"#);
headers.insert(
X_ERROR_JSON_HEADER,
http::HeaderValue::from_str(&x_error_json).expect("valid x-error-json header"),
);
let err = map_api_error(ApiError::Transport(TransportError::Http {
status: http::StatusCode::UNAUTHORIZED,
url: Some("https://chatgpt.com/backend-api/codex/models".to_string()),
headers: Some(headers),
body: Some(r#"{"detail":"Unauthorized"}"#.to_string()),
}));
let CodexErr::UnexpectedStatus(err) = err else {
panic!("expected CodexErr::UnexpectedStatus, got {err:?}");
};
assert_eq!(err.request_id.as_deref(), Some("req-401"));
assert_eq!(err.cf_ray.as_deref(), Some("ray-401"));
assert_eq!(
err.identity_authorization_error.as_deref(),
Some("missing_authorization_header")
);
assert_eq!(err.identity_error_code.as_deref(), Some("token_expired"));
}
#[test]
fn core_auth_provider_reports_when_auth_header_will_attach() {
let auth = CoreAuthProvider {
token: Some("access-token".to_string()),
account_id: None,
};
assert!(auth.auth_header_attached());
assert_eq!(auth.auth_header_name(), Some("authorization"));
}

View File

@@ -4,7 +4,7 @@ use codex_protocol::protocol::APPS_INSTRUCTIONS_OPEN_TAG;
pub(crate) fn render_apps_section() -> String {
let body = format!(
"## Apps\nApps are mentioned in user messages in the format `[$app-name](app://{{connector_id}})`.\nAn app is equivalent to a set of MCP tools within the `{CODEX_APPS_MCP_SERVER_NAME}` MCP.\nWhen you see an app mention, the app's MCP tools are either available tools in the `{CODEX_APPS_MCP_SERVER_NAME}` MCP server, or the tools do not exist because the user has not installed the app.\nDo not additionally call list_mcp_resources for apps that are already mentioned."
"## Apps (Connectors)\nApps (Connectors) can be explicitly triggered in user messages in the format `[$app-name](app://{{connector_id}})`. Apps can also be implicitly triggered as long as the context suggests usage of available apps, the available apps will be listed by the `tool_search` tool.\nAn app is equivalent to a set of MCP tools within the `{CODEX_APPS_MCP_SERVER_NAME}` MCP.\nAn installed app's MCP tools are either provided to you already, or can be lazy-loaded through the `tool_search` tool.\nDo not additionally call list_mcp_resources or list_mcp_resource_templates for apps."
);
format!("{APPS_INSTRUCTIONS_OPEN_TAG}\n{body}\n{APPS_INSTRUCTIONS_CLOSE_TAG}")
}

View File

@@ -874,6 +874,17 @@ pub struct UnauthorizedRecovery {
mode: UnauthorizedRecoveryMode,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct UnauthorizedRecoveryStepResult {
auth_state_changed: Option<bool>,
}
impl UnauthorizedRecoveryStepResult {
pub fn auth_state_changed(&self) -> Option<bool> {
self.auth_state_changed
}
}
impl UnauthorizedRecovery {
fn new(manager: Arc<AuthManager>) -> Self {
let cached_auth = manager.auth_cached();
@@ -917,7 +928,46 @@ impl UnauthorizedRecovery {
!matches!(self.step, UnauthorizedRecoveryStep::Done)
}
pub async fn next(&mut self) -> Result<(), RefreshTokenError> {
pub fn unavailable_reason(&self) -> &'static str {
if !self
.manager
.auth_cached()
.as_ref()
.is_some_and(CodexAuth::is_chatgpt_auth)
{
return "not_chatgpt_auth";
}
if self.mode == UnauthorizedRecoveryMode::External
&& !self.manager.has_external_auth_refresher()
{
return "no_external_refresher";
}
if matches!(self.step, UnauthorizedRecoveryStep::Done) {
return "recovery_exhausted";
}
"ready"
}
pub fn mode_name(&self) -> &'static str {
match self.mode {
UnauthorizedRecoveryMode::Managed => "managed",
UnauthorizedRecoveryMode::External => "external",
}
}
pub fn step_name(&self) -> &'static str {
match self.step {
UnauthorizedRecoveryStep::Reload => "reload",
UnauthorizedRecoveryStep::RefreshToken => "refresh_token",
UnauthorizedRecoveryStep::ExternalRefresh => "external_refresh",
UnauthorizedRecoveryStep::Done => "done",
}
}
pub async fn next(&mut self) -> Result<UnauthorizedRecoveryStepResult, RefreshTokenError> {
if !self.has_next() {
return Err(RefreshTokenError::Permanent(RefreshTokenFailedError::new(
RefreshTokenFailedReason::Other,
@@ -931,8 +981,17 @@ impl UnauthorizedRecovery {
.manager
.reload_if_account_id_matches(self.expected_account_id.as_deref())
{
ReloadOutcome::ReloadedChanged | ReloadOutcome::ReloadedNoChange => {
ReloadOutcome::ReloadedChanged => {
self.step = UnauthorizedRecoveryStep::RefreshToken;
return Ok(UnauthorizedRecoveryStepResult {
auth_state_changed: Some(true),
});
}
ReloadOutcome::ReloadedNoChange => {
self.step = UnauthorizedRecoveryStep::RefreshToken;
return Ok(UnauthorizedRecoveryStepResult {
auth_state_changed: Some(false),
});
}
ReloadOutcome::Skipped => {
self.step = UnauthorizedRecoveryStep::Done;
@@ -946,16 +1005,24 @@ impl UnauthorizedRecovery {
UnauthorizedRecoveryStep::RefreshToken => {
self.manager.refresh_token_from_authority().await?;
self.step = UnauthorizedRecoveryStep::Done;
return Ok(UnauthorizedRecoveryStepResult {
auth_state_changed: Some(true),
});
}
UnauthorizedRecoveryStep::ExternalRefresh => {
self.manager
.refresh_external_auth(ExternalAuthRefreshReason::Unauthorized)
.await?;
self.step = UnauthorizedRecoveryStep::Done;
return Ok(UnauthorizedRecoveryStepResult {
auth_state_changed: Some(true),
});
}
UnauthorizedRecoveryStep::Done => {}
}
Ok(())
Ok(UnauthorizedRecoveryStepResult {
auth_state_changed: None,
})
}
}

View File

@@ -13,6 +13,7 @@ use codex_protocol::config_types::ForcedLoginMethod;
use pretty_assertions::assert_eq;
use serde::Serialize;
use serde_json::json;
use std::sync::Arc;
use tempfile::tempdir;
#[tokio::test]
@@ -171,6 +172,33 @@ fn logout_removes_auth_file() -> Result<(), std::io::Error> {
Ok(())
}
#[test]
fn unauthorized_recovery_reports_mode_and_step_names() {
let dir = tempdir().unwrap();
let manager = AuthManager::shared(
dir.path().to_path_buf(),
false,
AuthCredentialsStoreMode::File,
);
let managed = UnauthorizedRecovery {
manager: Arc::clone(&manager),
step: UnauthorizedRecoveryStep::Reload,
expected_account_id: None,
mode: UnauthorizedRecoveryMode::Managed,
};
assert_eq!(managed.mode_name(), "managed");
assert_eq!(managed.step_name(), "reload");
let external = UnauthorizedRecovery {
manager,
step: UnauthorizedRecoveryStep::ExternalRefresh,
expected_account_id: None,
mode: UnauthorizedRecoveryMode::External,
};
assert_eq!(external.mode_name(), "external");
assert_eq!(external.step_name(), "external_refresh");
}
struct AuthFileParams {
openai_api_key: Option<String>,
chatgpt_plan_type: Option<String>,

View File

@@ -75,6 +75,7 @@ use http::HeaderValue;
use http::StatusCode as HttpStatusCode;
use reqwest::StatusCode;
use std::time::Duration;
use std::time::Instant;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
@@ -85,6 +86,7 @@ use tracing::trace;
use tracing::warn;
use crate::AuthManager;
use crate::auth::AuthMode;
use crate::auth::CodexAuth;
use crate::auth::RefreshTokenError;
use crate::client_common::Prompt;
@@ -97,7 +99,14 @@ use crate::error::Result;
use crate::flags::CODEX_RS_SSE_FIXTURE;
use crate::model_provider_info::ModelProviderInfo;
use crate::model_provider_info::WireApi;
use crate::response_debug_context::extract_response_debug_context;
use crate::response_debug_context::extract_response_debug_context_from_api_error;
use crate::response_debug_context::telemetry_api_error_message;
use crate::response_debug_context::telemetry_transport_error_message;
use crate::tools::spec::create_tools_json_for_responses_api;
use crate::util::FeedbackRequestTags;
use crate::util::emit_feedback_auth_recovery_tags;
use crate::util::emit_feedback_request_tags;
pub const OPENAI_BETA_HEADER: &str = "OpenAI-Beta";
pub const X_CODEX_TURN_STATE_HEADER: &str = "x-codex-turn-state";
@@ -105,7 +114,9 @@ pub const X_CODEX_TURN_METADATA_HEADER: &str = "x-codex-turn-metadata";
pub const X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER: &str =
"x-responsesapi-include-timing-metrics";
const RESPONSES_WEBSOCKETS_V2_BETA_HEADER_VALUE: &str = "responses_websockets=2026-02-06";
const RESPONSES_ENDPOINT: &str = "/responses";
const RESPONSES_COMPACT_ENDPOINT: &str = "/responses/compact";
const MEMORIES_SUMMARIZE_ENDPOINT: &str = "/memories/trace_summarize";
pub fn ws_version_from_features(config: &Config) -> bool {
config
.features
@@ -144,6 +155,17 @@ struct CurrentClientSetup {
api_auth: CoreAuthProvider,
}
#[derive(Clone, Copy)]
struct RequestRouteTelemetry {
endpoint: &'static str,
}
impl RequestRouteTelemetry {
fn for_endpoint(endpoint: &'static str) -> Self {
Self { endpoint }
}
}
/// A session-scoped client for model-provider API calls.
///
/// This holds configuration and state that should be shared across turns within a Codex session
@@ -201,6 +223,23 @@ struct WebsocketSession {
connection: Option<ApiWebSocketConnection>,
last_request: Option<ResponsesApiRequest>,
last_response_rx: Option<oneshot::Receiver<LastResponse>>,
connection_reused: StdMutex<bool>,
}
impl WebsocketSession {
fn set_connection_reused(&self, connection_reused: bool) {
*self
.connection_reused
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner) = connection_reused;
}
fn connection_reused(&self) -> bool {
*self
.connection_reused
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
}
}
enum WebsocketStreamOutcome {
@@ -291,7 +330,15 @@ impl ModelClient {
}
let client_setup = self.current_client_setup().await?;
let transport = ReqwestTransport::new(build_reqwest_client());
let request_telemetry = Self::build_request_telemetry(session_telemetry);
let request_telemetry = Self::build_request_telemetry(
session_telemetry,
AuthRequestTelemetryContext::new(
client_setup.auth.as_ref().map(CodexAuth::auth_mode),
&client_setup.api_auth,
PendingUnauthorizedRetry::default(),
),
RequestRouteTelemetry::for_endpoint(RESPONSES_COMPACT_ENDPOINT),
);
let client =
ApiCompactClient::new(transport, client_setup.api_provider, client_setup.api_auth)
.with_telemetry(Some(request_telemetry));
@@ -351,7 +398,15 @@ impl ModelClient {
let client_setup = self.current_client_setup().await?;
let transport = ReqwestTransport::new(build_reqwest_client());
let request_telemetry = Self::build_request_telemetry(session_telemetry);
let request_telemetry = Self::build_request_telemetry(
session_telemetry,
AuthRequestTelemetryContext::new(
client_setup.auth.as_ref().map(CodexAuth::auth_mode),
&client_setup.api_auth,
PendingUnauthorizedRetry::default(),
),
RequestRouteTelemetry::for_endpoint(MEMORIES_SUMMARIZE_ENDPOINT),
);
let client =
ApiMemoriesClient::new(transport, client_setup.api_provider, client_setup.api_auth)
.with_telemetry(Some(request_telemetry));
@@ -391,8 +446,16 @@ impl ModelClient {
}
/// Builds request telemetry for unary API calls (e.g., Compact endpoint).
fn build_request_telemetry(session_telemetry: &SessionTelemetry) -> Arc<dyn RequestTelemetry> {
let telemetry = Arc::new(ApiTelemetry::new(session_telemetry.clone()));
fn build_request_telemetry(
session_telemetry: &SessionTelemetry,
auth_context: AuthRequestTelemetryContext,
request_route_telemetry: RequestRouteTelemetry,
) -> Arc<dyn RequestTelemetry> {
let telemetry = Arc::new(ApiTelemetry::new(
session_telemetry.clone(),
auth_context,
request_route_telemetry,
));
let request_telemetry: Arc<dyn RequestTelemetry> = telemetry;
request_telemetry
}
@@ -458,6 +521,7 @@ impl ModelClient {
///
/// Both startup prewarm and in-turn `needs_new` reconnects call this path so handshake
/// behavior remains consistent across both flows.
#[allow(clippy::too_many_arguments)]
async fn connect_websocket(
&self,
session_telemetry: &SessionTelemetry,
@@ -465,17 +529,69 @@ impl ModelClient {
api_auth: CoreAuthProvider,
turn_state: Option<Arc<OnceLock<String>>>,
turn_metadata_header: Option<&str>,
auth_context: AuthRequestTelemetryContext,
request_route_telemetry: RequestRouteTelemetry,
) -> std::result::Result<ApiWebSocketConnection, ApiError> {
let headers = self.build_websocket_headers(turn_state.as_ref(), turn_metadata_header);
let websocket_telemetry = ModelClientSession::build_websocket_telemetry(session_telemetry);
ApiWebSocketResponsesClient::new(api_provider, api_auth)
let websocket_telemetry = ModelClientSession::build_websocket_telemetry(
session_telemetry,
auth_context,
request_route_telemetry,
);
let start = Instant::now();
let result = ApiWebSocketResponsesClient::new(api_provider, api_auth)
.connect(
headers,
crate::default_client::default_headers(),
turn_state,
Some(websocket_telemetry),
)
.await
.await;
let error_message = result.as_ref().err().map(telemetry_api_error_message);
let response_debug = result
.as_ref()
.err()
.map(extract_response_debug_context_from_api_error)
.unwrap_or_default();
let status = result.as_ref().err().and_then(api_error_http_status);
session_telemetry.record_websocket_connect(
start.elapsed(),
status,
error_message.as_deref(),
auth_context.auth_header_attached,
auth_context.auth_header_name,
auth_context.retry_after_unauthorized,
auth_context.recovery_mode,
auth_context.recovery_phase,
request_route_telemetry.endpoint,
false,
response_debug.request_id.as_deref(),
response_debug.cf_ray.as_deref(),
response_debug.auth_error.as_deref(),
response_debug.auth_error_code.as_deref(),
);
emit_feedback_request_tags(&FeedbackRequestTags {
endpoint: request_route_telemetry.endpoint,
auth_header_attached: auth_context.auth_header_attached,
auth_header_name: auth_context.auth_header_name,
auth_mode: auth_context.auth_mode,
auth_retry_after_unauthorized: Some(auth_context.retry_after_unauthorized),
auth_recovery_mode: auth_context.recovery_mode,
auth_recovery_phase: auth_context.recovery_phase,
auth_connection_reused: Some(false),
auth_request_id: response_debug.request_id.as_deref(),
auth_cf_ray: response_debug.cf_ray.as_deref(),
auth_error: response_debug.auth_error.as_deref(),
auth_error_code: response_debug.auth_error_code.as_deref(),
auth_recovery_followup_success: auth_context
.retry_after_unauthorized
.then_some(result.is_ok()),
auth_recovery_followup_status: auth_context
.retry_after_unauthorized
.then_some(status)
.flatten(),
});
result
}
/// Builds websocket handshake headers for both prewarm and turn-time reconnect.
@@ -718,7 +834,11 @@ impl ModelClientSession {
"failed to build websocket prewarm client setup: {err}"
))
})?;
let auth_context = AuthRequestTelemetryContext::new(
client_setup.auth.as_ref().map(CodexAuth::auth_mode),
&client_setup.api_auth,
PendingUnauthorizedRetry::default(),
);
let connection = self
.client
.connect_websocket(
@@ -727,9 +847,12 @@ impl ModelClientSession {
client_setup.api_auth,
Some(Arc::clone(&self.turn_state)),
None,
auth_context,
RequestRouteTelemetry::for_endpoint(RESPONSES_ENDPOINT),
)
.await?;
self.websocket_session.connection = Some(connection);
self.websocket_session.set_connection_reused(false);
Ok(())
}
/// Returns a websocket connection for this turn.
@@ -742,17 +865,22 @@ impl ModelClientSession {
wire_api = %self.client.state.provider.wire_api,
transport = "responses_websocket",
api.path = "responses",
turn.has_metadata_header = turn_metadata_header.is_some()
turn.has_metadata_header = params.turn_metadata_header.is_some()
)
)]
async fn websocket_connection(
&mut self,
session_telemetry: &SessionTelemetry,
api_provider: codex_api::Provider,
api_auth: CoreAuthProvider,
turn_metadata_header: Option<&str>,
options: &ApiResponsesOptions,
params: WebsocketConnectParams<'_>,
) -> std::result::Result<&ApiWebSocketConnection, ApiError> {
let WebsocketConnectParams {
session_telemetry,
api_provider,
api_auth,
turn_metadata_header,
options,
auth_context,
request_route_telemetry,
} = params;
let needs_new = match self.websocket_session.connection.as_ref() {
Some(conn) => conn.is_closed().await,
None => true,
@@ -773,9 +901,14 @@ impl ModelClientSession {
api_auth,
Some(turn_state),
turn_metadata_header,
auth_context,
request_route_telemetry,
)
.await?;
self.websocket_session.connection = Some(new_conn);
self.websocket_session.set_connection_reused(false);
} else {
self.websocket_session.set_connection_reused(true);
}
self.websocket_session
@@ -840,11 +973,20 @@ impl ModelClientSession {
let mut auth_recovery = auth_manager
.as_ref()
.map(super::auth::AuthManager::unauthorized_recovery);
let mut pending_retry = PendingUnauthorizedRetry::default();
loop {
let client_setup = self.client.current_client_setup().await?;
let transport = ReqwestTransport::new(build_reqwest_client());
let (request_telemetry, sse_telemetry) =
Self::build_streaming_telemetry(session_telemetry);
let request_auth_context = AuthRequestTelemetryContext::new(
client_setup.auth.as_ref().map(CodexAuth::auth_mode),
&client_setup.api_auth,
pending_retry,
);
let (request_telemetry, sse_telemetry) = Self::build_streaming_telemetry(
session_telemetry,
request_auth_context,
RequestRouteTelemetry::for_endpoint(RESPONSES_ENDPOINT),
);
let compression = self.responses_request_compression(client_setup.auth.as_ref());
let options = self.build_responses_options(turn_metadata_header, compression);
@@ -872,7 +1014,14 @@ impl ModelClientSession {
Err(ApiError::Transport(
unauthorized_transport @ TransportError::Http { status, .. },
)) if status == StatusCode::UNAUTHORIZED => {
handle_unauthorized(unauthorized_transport, &mut auth_recovery).await?;
pending_retry = PendingUnauthorizedRetry::from_recovery(
handle_unauthorized(
unauthorized_transport,
&mut auth_recovery,
session_telemetry,
)
.await?,
);
continue;
}
Err(err) => return Err(map_api_error(err)),
@@ -911,8 +1060,14 @@ impl ModelClientSession {
let mut auth_recovery = auth_manager
.as_ref()
.map(super::auth::AuthManager::unauthorized_recovery);
let mut pending_retry = PendingUnauthorizedRetry::default();
loop {
let client_setup = self.client.current_client_setup().await?;
let request_auth_context = AuthRequestTelemetryContext::new(
client_setup.auth.as_ref().map(CodexAuth::auth_mode),
&client_setup.api_auth,
pending_retry,
);
let compression = self.responses_request_compression(client_setup.auth.as_ref());
let options = self.build_responses_options(turn_metadata_header, compression);
@@ -933,13 +1088,17 @@ impl ModelClientSession {
}
match self
.websocket_connection(
.websocket_connection(WebsocketConnectParams {
session_telemetry,
client_setup.api_provider,
client_setup.api_auth,
api_provider: client_setup.api_provider,
api_auth: client_setup.api_auth,
turn_metadata_header,
&options,
)
options: &options,
auth_context: request_auth_context,
request_route_telemetry: RequestRouteTelemetry::for_endpoint(
RESPONSES_ENDPOINT,
),
})
.await
{
Ok(_) => {}
@@ -951,7 +1110,14 @@ impl ModelClientSession {
Err(ApiError::Transport(
unauthorized_transport @ TransportError::Http { status, .. },
)) if status == StatusCode::UNAUTHORIZED => {
handle_unauthorized(unauthorized_transport, &mut auth_recovery).await?;
pending_retry = PendingUnauthorizedRetry::from_recovery(
handle_unauthorized(
unauthorized_transport,
&mut auth_recovery,
session_telemetry,
)
.await?,
);
continue;
}
Err(err) => return Err(map_api_error(err)),
@@ -968,7 +1134,7 @@ impl ModelClientSession {
"websocket connection is unavailable".to_string(),
))
})?
.stream_request(ws_request)
.stream_request(ws_request, self.websocket_session.connection_reused())
.await
.map_err(map_api_error)?;
let (stream, last_request_rx) =
@@ -981,8 +1147,14 @@ impl ModelClientSession {
/// Builds request and SSE telemetry for streaming API calls.
fn build_streaming_telemetry(
session_telemetry: &SessionTelemetry,
auth_context: AuthRequestTelemetryContext,
request_route_telemetry: RequestRouteTelemetry,
) -> (Arc<dyn RequestTelemetry>, Arc<dyn SseTelemetry>) {
let telemetry = Arc::new(ApiTelemetry::new(session_telemetry.clone()));
let telemetry = Arc::new(ApiTelemetry::new(
session_telemetry.clone(),
auth_context,
request_route_telemetry,
));
let request_telemetry: Arc<dyn RequestTelemetry> = telemetry.clone();
let sse_telemetry: Arc<dyn SseTelemetry> = telemetry;
(request_telemetry, sse_telemetry)
@@ -991,8 +1163,14 @@ impl ModelClientSession {
/// Builds telemetry for the Responses API WebSocket transport.
fn build_websocket_telemetry(
session_telemetry: &SessionTelemetry,
auth_context: AuthRequestTelemetryContext,
request_route_telemetry: RequestRouteTelemetry,
) -> Arc<dyn WebsocketTelemetry> {
let telemetry = Arc::new(ApiTelemetry::new(session_telemetry.clone()));
let telemetry = Arc::new(ApiTelemetry::new(
session_telemetry.clone(),
auth_context,
request_route_telemetry,
));
let websocket_telemetry: Arc<dyn WebsocketTelemetry> = telemetry;
websocket_telemetry
}
@@ -1126,6 +1304,7 @@ impl ModelClientSession {
self.websocket_session.connection = None;
self.websocket_session.last_request = None;
self.websocket_session.last_response_rx = None;
self.websocket_session.set_connection_reused(false);
}
activated
}
@@ -1264,30 +1443,209 @@ where
///
/// When refresh succeeds, the caller should retry the API call; otherwise
/// the mapped `CodexErr` is returned to the caller.
#[derive(Clone, Copy, Debug)]
struct UnauthorizedRecoveryExecution {
mode: &'static str,
phase: &'static str,
}
#[derive(Clone, Copy, Debug, Default)]
struct PendingUnauthorizedRetry {
retry_after_unauthorized: bool,
recovery_mode: Option<&'static str>,
recovery_phase: Option<&'static str>,
}
impl PendingUnauthorizedRetry {
fn from_recovery(recovery: UnauthorizedRecoveryExecution) -> Self {
Self {
retry_after_unauthorized: true,
recovery_mode: Some(recovery.mode),
recovery_phase: Some(recovery.phase),
}
}
}
#[derive(Clone, Copy, Debug, Default)]
struct AuthRequestTelemetryContext {
auth_mode: Option<&'static str>,
auth_header_attached: bool,
auth_header_name: Option<&'static str>,
retry_after_unauthorized: bool,
recovery_mode: Option<&'static str>,
recovery_phase: Option<&'static str>,
}
impl AuthRequestTelemetryContext {
fn new(
auth_mode: Option<AuthMode>,
api_auth: &CoreAuthProvider,
retry: PendingUnauthorizedRetry,
) -> Self {
Self {
auth_mode: auth_mode.map(|mode| match mode {
AuthMode::ApiKey => "ApiKey",
AuthMode::Chatgpt => "Chatgpt",
}),
auth_header_attached: api_auth.auth_header_attached(),
auth_header_name: api_auth.auth_header_name(),
retry_after_unauthorized: retry.retry_after_unauthorized,
recovery_mode: retry.recovery_mode,
recovery_phase: retry.recovery_phase,
}
}
}
struct WebsocketConnectParams<'a> {
session_telemetry: &'a SessionTelemetry,
api_provider: codex_api::Provider,
api_auth: CoreAuthProvider,
turn_metadata_header: Option<&'a str>,
options: &'a ApiResponsesOptions,
auth_context: AuthRequestTelemetryContext,
request_route_telemetry: RequestRouteTelemetry,
}
async fn handle_unauthorized(
transport: TransportError,
auth_recovery: &mut Option<UnauthorizedRecovery>,
) -> Result<()> {
session_telemetry: &SessionTelemetry,
) -> Result<UnauthorizedRecoveryExecution> {
let debug = extract_response_debug_context(&transport);
if let Some(recovery) = auth_recovery
&& recovery.has_next()
{
let mode = recovery.mode_name();
let phase = recovery.step_name();
return match recovery.next().await {
Ok(_) => Ok(()),
Err(RefreshTokenError::Permanent(failed)) => Err(CodexErr::RefreshTokenFailed(failed)),
Err(RefreshTokenError::Transient(other)) => Err(CodexErr::Io(other)),
Ok(step_result) => {
session_telemetry.record_auth_recovery(
mode,
phase,
"recovery_succeeded",
debug.request_id.as_deref(),
debug.cf_ray.as_deref(),
debug.auth_error.as_deref(),
debug.auth_error_code.as_deref(),
None,
step_result.auth_state_changed(),
);
emit_feedback_auth_recovery_tags(
mode,
phase,
"recovery_succeeded",
debug.request_id.as_deref(),
debug.cf_ray.as_deref(),
debug.auth_error.as_deref(),
debug.auth_error_code.as_deref(),
);
Ok(UnauthorizedRecoveryExecution { mode, phase })
}
Err(RefreshTokenError::Permanent(failed)) => {
session_telemetry.record_auth_recovery(
mode,
phase,
"recovery_failed_permanent",
debug.request_id.as_deref(),
debug.cf_ray.as_deref(),
debug.auth_error.as_deref(),
debug.auth_error_code.as_deref(),
None,
None,
);
emit_feedback_auth_recovery_tags(
mode,
phase,
"recovery_failed_permanent",
debug.request_id.as_deref(),
debug.cf_ray.as_deref(),
debug.auth_error.as_deref(),
debug.auth_error_code.as_deref(),
);
Err(CodexErr::RefreshTokenFailed(failed))
}
Err(RefreshTokenError::Transient(other)) => {
session_telemetry.record_auth_recovery(
mode,
phase,
"recovery_failed_transient",
debug.request_id.as_deref(),
debug.cf_ray.as_deref(),
debug.auth_error.as_deref(),
debug.auth_error_code.as_deref(),
None,
None,
);
emit_feedback_auth_recovery_tags(
mode,
phase,
"recovery_failed_transient",
debug.request_id.as_deref(),
debug.cf_ray.as_deref(),
debug.auth_error.as_deref(),
debug.auth_error_code.as_deref(),
);
Err(CodexErr::Io(other))
}
};
}
let (mode, phase, recovery_reason) = match auth_recovery.as_ref() {
Some(recovery) => (
recovery.mode_name(),
recovery.step_name(),
Some(recovery.unavailable_reason()),
),
None => ("none", "none", Some("auth_manager_missing")),
};
session_telemetry.record_auth_recovery(
mode,
phase,
"recovery_not_run",
debug.request_id.as_deref(),
debug.cf_ray.as_deref(),
debug.auth_error.as_deref(),
debug.auth_error_code.as_deref(),
recovery_reason,
None,
);
emit_feedback_auth_recovery_tags(
mode,
phase,
"recovery_not_run",
debug.request_id.as_deref(),
debug.cf_ray.as_deref(),
debug.auth_error.as_deref(),
debug.auth_error_code.as_deref(),
);
Err(map_api_error(ApiError::Transport(transport)))
}
fn api_error_http_status(error: &ApiError) -> Option<u16> {
match error {
ApiError::Transport(TransportError::Http { status, .. }) => Some(status.as_u16()),
_ => None,
}
}
struct ApiTelemetry {
session_telemetry: SessionTelemetry,
auth_context: AuthRequestTelemetryContext,
request_route_telemetry: RequestRouteTelemetry,
}
impl ApiTelemetry {
fn new(session_telemetry: SessionTelemetry) -> Self {
Self { session_telemetry }
fn new(
session_telemetry: SessionTelemetry,
auth_context: AuthRequestTelemetryContext,
request_route_telemetry: RequestRouteTelemetry,
) -> Self {
Self {
session_telemetry,
auth_context,
request_route_telemetry,
}
}
}
@@ -1299,13 +1657,50 @@ impl RequestTelemetry for ApiTelemetry {
error: Option<&TransportError>,
duration: Duration,
) {
let error_message = error.map(std::string::ToString::to_string);
let error_message = error.map(telemetry_transport_error_message);
let status = status.map(|s| s.as_u16());
let debug = error
.map(extract_response_debug_context)
.unwrap_or_default();
self.session_telemetry.record_api_request(
attempt,
status.map(|s| s.as_u16()),
status,
error_message.as_deref(),
duration,
self.auth_context.auth_header_attached,
self.auth_context.auth_header_name,
self.auth_context.retry_after_unauthorized,
self.auth_context.recovery_mode,
self.auth_context.recovery_phase,
self.request_route_telemetry.endpoint,
debug.request_id.as_deref(),
debug.cf_ray.as_deref(),
debug.auth_error.as_deref(),
debug.auth_error_code.as_deref(),
);
emit_feedback_request_tags(&FeedbackRequestTags {
endpoint: self.request_route_telemetry.endpoint,
auth_header_attached: self.auth_context.auth_header_attached,
auth_header_name: self.auth_context.auth_header_name,
auth_mode: self.auth_context.auth_mode,
auth_retry_after_unauthorized: Some(self.auth_context.retry_after_unauthorized),
auth_recovery_mode: self.auth_context.recovery_mode,
auth_recovery_phase: self.auth_context.recovery_phase,
auth_connection_reused: None,
auth_request_id: debug.request_id.as_deref(),
auth_cf_ray: debug.cf_ray.as_deref(),
auth_error: debug.auth_error.as_deref(),
auth_error_code: debug.auth_error_code.as_deref(),
auth_recovery_followup_success: self
.auth_context
.retry_after_unauthorized
.then_some(error.is_none()),
auth_recovery_followup_status: self
.auth_context
.retry_after_unauthorized
.then_some(status)
.flatten(),
});
}
}
@@ -1323,10 +1718,40 @@ impl SseTelemetry for ApiTelemetry {
}
impl WebsocketTelemetry for ApiTelemetry {
fn on_ws_request(&self, duration: Duration, error: Option<&ApiError>) {
let error_message = error.map(std::string::ToString::to_string);
self.session_telemetry
.record_websocket_request(duration, error_message.as_deref());
fn on_ws_request(&self, duration: Duration, error: Option<&ApiError>, connection_reused: bool) {
let error_message = error.map(telemetry_api_error_message);
let status = error.and_then(api_error_http_status);
let debug = error
.map(extract_response_debug_context_from_api_error)
.unwrap_or_default();
self.session_telemetry.record_websocket_request(
duration,
error_message.as_deref(),
connection_reused,
);
emit_feedback_request_tags(&FeedbackRequestTags {
endpoint: self.request_route_telemetry.endpoint,
auth_header_attached: self.auth_context.auth_header_attached,
auth_header_name: self.auth_context.auth_header_name,
auth_mode: self.auth_context.auth_mode,
auth_retry_after_unauthorized: Some(self.auth_context.retry_after_unauthorized),
auth_recovery_mode: self.auth_context.recovery_mode,
auth_recovery_phase: self.auth_context.recovery_phase,
auth_connection_reused: Some(connection_reused),
auth_request_id: debug.request_id.as_deref(),
auth_cf_ray: debug.cf_ray.as_deref(),
auth_error: debug.auth_error.as_deref(),
auth_error_code: debug.auth_error_code.as_deref(),
auth_recovery_followup_success: self
.auth_context
.retry_after_unauthorized
.then_some(error.is_none()),
auth_recovery_followup_status: self
.auth_context
.retry_after_unauthorized
.then_some(status)
.flatten(),
});
}
fn on_ws_event(

View File

@@ -1,4 +1,7 @@
use super::AuthRequestTelemetryContext;
use super::ModelClient;
use super::PendingUnauthorizedRetry;
use super::UnauthorizedRecoveryExecution;
use codex_otel::SessionTelemetry;
use codex_protocol::ThreadId;
use codex_protocol::openai_models::ModelInfo;
@@ -94,3 +97,22 @@ async fn summarize_memories_returns_empty_for_empty_input() {
.expect("empty summarize request should succeed");
assert_eq!(output.len(), 0);
}
#[test]
fn auth_request_telemetry_context_tracks_attached_auth_and_retry_phase() {
let auth_context = AuthRequestTelemetryContext::new(
Some(crate::auth::AuthMode::Chatgpt),
&crate::api_bridge::CoreAuthProvider::for_test(Some("access-token"), Some("workspace-123")),
PendingUnauthorizedRetry::from_recovery(UnauthorizedRecoveryExecution {
mode: "managed",
phase: "refresh_token",
}),
);
assert_eq!(auth_context.auth_mode, Some("Chatgpt"));
assert!(auth_context.auth_header_attached);
assert_eq!(auth_context.auth_header_name, Some("authorization"));
assert!(auth_context.retry_after_unauthorized);
assert_eq!(auth_context.recovery_mode, Some("managed"));
assert_eq!(auth_context.recovery_phase, Some("refresh_token"));
}

View File

@@ -3936,12 +3936,13 @@ impl Session {
server: &str,
tool: &str,
arguments: Option<serde_json::Value>,
meta: Option<serde_json::Value>,
) -> anyhow::Result<CallToolResult> {
self.services
.mcp_connection_manager
.read()
.await
.call_tool(server, tool, arguments)
.call_tool(server, tool, arguments, meta)
.await
}

View File

@@ -292,6 +292,8 @@ pub struct UnexpectedResponseError {
pub url: Option<String>,
pub cf_ray: Option<String>,
pub request_id: Option<String>,
pub identity_authorization_error: Option<String>,
pub identity_error_code: Option<String>,
}
const CLOUDFLARE_BLOCKED_MESSAGE: &str =
@@ -346,6 +348,12 @@ impl UnexpectedResponseError {
if let Some(id) = &self.request_id {
message.push_str(&format!(", request id: {id}"));
}
if let Some(auth_error) = &self.identity_authorization_error {
message.push_str(&format!(", auth error: {auth_error}"));
}
if let Some(error_code) = &self.identity_error_code {
message.push_str(&format!(", auth error code: {error_code}"));
}
Some(message)
}
@@ -368,6 +376,12 @@ impl std::fmt::Display for UnexpectedResponseError {
if let Some(id) = &self.request_id {
message.push_str(&format!(", request id: {id}"));
}
if let Some(auth_error) = &self.identity_authorization_error {
message.push_str(&format!(", auth error: {auth_error}"));
}
if let Some(error_code) = &self.identity_error_code {
message.push_str(&format!(", auth error code: {error_code}"));
}
write!(f, "{message}")
}
}

View File

@@ -328,6 +328,8 @@ fn unexpected_status_cloudflare_html_is_simplified() {
url: Some("http://example.com/blocked".to_string()),
cf_ray: Some("ray-id".to_string()),
request_id: None,
identity_authorization_error: None,
identity_error_code: None,
};
let status = StatusCode::FORBIDDEN.to_string();
let url = "http://example.com/blocked";
@@ -345,6 +347,8 @@ fn unexpected_status_non_html_is_unchanged() {
url: Some("http://example.com/plain".to_string()),
cf_ray: None,
request_id: None,
identity_authorization_error: None,
identity_error_code: None,
};
let status = StatusCode::FORBIDDEN.to_string();
let url = "http://example.com/plain";
@@ -363,6 +367,8 @@ fn unexpected_status_prefers_error_message_when_present() {
url: Some("https://chatgpt.com/backend-api/codex/responses".to_string()),
cf_ray: None,
request_id: Some("req-123".to_string()),
identity_authorization_error: None,
identity_error_code: None,
};
let status = StatusCode::UNAUTHORIZED.to_string();
assert_eq!(
@@ -382,6 +388,8 @@ fn unexpected_status_truncates_long_body_with_ellipsis() {
url: Some("http://example.com/long".to_string()),
cf_ray: None,
request_id: Some("req-long".to_string()),
identity_authorization_error: None,
identity_error_code: None,
};
let status = StatusCode::BAD_GATEWAY.to_string();
let expected_body = format!("{}...", "x".repeat(UNEXPECTED_RESPONSE_BODY_MAX_BYTES));
@@ -401,6 +409,8 @@ fn unexpected_status_includes_cf_ray_and_request_id() {
url: Some("https://chatgpt.com/backend-api/codex/responses".to_string()),
cf_ray: Some("9c81f9f18f2fa49d-LHR".to_string()),
request_id: Some("req-xyz".to_string()),
identity_authorization_error: None,
identity_error_code: None,
};
let status = StatusCode::UNAUTHORIZED.to_string();
assert_eq!(
@@ -411,6 +421,26 @@ fn unexpected_status_includes_cf_ray_and_request_id() {
);
}
#[test]
fn unexpected_status_includes_identity_auth_details() {
let err = UnexpectedResponseError {
status: StatusCode::UNAUTHORIZED,
body: "plain text error".to_string(),
url: Some("https://chatgpt.com/backend-api/codex/models".to_string()),
cf_ray: Some("cf-ray-auth-401-test".to_string()),
request_id: Some("req-auth".to_string()),
identity_authorization_error: Some("missing_authorization_header".to_string()),
identity_error_code: Some("token_expired".to_string()),
};
let status = StatusCode::UNAUTHORIZED.to_string();
assert_eq!(
err.to_string(),
format!(
"unexpected status {status}: plain text error, url: https://chatgpt.com/backend-api/codex/models, cf-ray: cf-ray-auth-401-test, request id: req-auth, auth error: missing_authorization_header, auth error code: token_expired"
)
);
}
#[test]
fn usage_limit_reached_includes_hours_and_minutes() {
let base = Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap();

View File

@@ -154,8 +154,6 @@ pub enum Feature {
Plugins,
/// Allow the model to invoke the built-in image generation tool.
ImageGeneration,
/// Route apps MCP calls through the configured gateway.
AppsMcpGateway,
/// Allow prompting and installing missing MCP dependencies.
SkillMcpDependencyInstall,
/// Prompt for missing skill env var dependencies.
@@ -753,12 +751,6 @@ pub const FEATURES: &[FeatureSpec] = &[
stage: Stage::UnderDevelopment,
default_enabled: false,
},
FeatureSpec {
id: Feature::AppsMcpGateway,
key: "apps_mcp_gateway",
stage: Stage::UnderDevelopment,
default_enabled: false,
},
FeatureSpec {
id: Feature::SkillMcpDependencyInstall,
key: "skill_mcp_dependency_install",

View File

@@ -87,6 +87,7 @@ pub use model_provider_info::WireApi;
pub use model_provider_info::built_in_model_providers;
pub use model_provider_info::create_oss_provider_with_base_url;
mod event_mapping;
mod response_debug_context;
pub mod review_format;
pub mod review_prompts;
mod seatbelt_permissions;

View File

@@ -21,7 +21,6 @@ use crate::CodexAuth;
use crate::config::Config;
use crate::config::types::McpServerConfig;
use crate::config::types::McpServerTransportConfig;
use crate::features::Feature;
use crate::mcp::auth::compute_auth_statuses;
use crate::mcp_connection_manager::McpConnectionManager;
use crate::mcp_connection_manager::SandboxState;
@@ -33,8 +32,6 @@ const MCP_TOOL_NAME_PREFIX: &str = "mcp";
const MCP_TOOL_NAME_DELIMITER: &str = "__";
pub(crate) const CODEX_APPS_MCP_SERVER_NAME: &str = "codex_apps";
const CODEX_CONNECTORS_TOKEN_ENV_VAR: &str = "CODEX_CONNECTORS_TOKEN";
const OPENAI_CONNECTORS_MCP_BASE_URL: &str = "https://api.openai.com";
const OPENAI_CONNECTORS_MCP_PATH: &str = "/v1/connectors/gateways/flat/mcp";
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ToolPluginProvenance {
@@ -94,13 +91,6 @@ impl ToolPluginProvenance {
}
}
// Legacy vs new MCP gateway
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CodexAppsMcpGateway {
LegacyMCPGateway,
MCPGateway,
}
fn codex_apps_mcp_bearer_token_env_var() -> Option<String> {
match env::var(CODEX_CONNECTORS_TOKEN_ENV_VAR) {
Ok(value) if !value.trim().is_empty() => Some(CODEX_CONNECTORS_TOKEN_ENV_VAR.to_string()),
@@ -135,14 +125,6 @@ fn codex_apps_mcp_http_headers(auth: Option<&CodexAuth>) -> Option<HashMap<Strin
}
}
fn selected_config_codex_apps_mcp_gateway(config: &Config) -> CodexAppsMcpGateway {
if config.features.enabled(Feature::AppsMcpGateway) {
CodexAppsMcpGateway::MCPGateway
} else {
CodexAppsMcpGateway::LegacyMCPGateway
}
}
fn normalize_codex_apps_base_url(base_url: &str) -> String {
let mut base_url = base_url.trim_end_matches('/').to_string();
if (base_url.starts_with("https://chatgpt.com")
@@ -154,11 +136,7 @@ fn normalize_codex_apps_base_url(base_url: &str) -> String {
base_url
}
fn codex_apps_mcp_url_for_gateway(base_url: &str, gateway: CodexAppsMcpGateway) -> String {
if gateway == CodexAppsMcpGateway::MCPGateway {
return format!("{OPENAI_CONNECTORS_MCP_BASE_URL}{OPENAI_CONNECTORS_MCP_PATH}");
}
fn codex_apps_mcp_url_for_base_url(base_url: &str) -> String {
let base_url = normalize_codex_apps_base_url(base_url);
if base_url.contains("/backend-api") {
format!("{base_url}/wham/apps")
@@ -170,10 +148,7 @@ fn codex_apps_mcp_url_for_gateway(base_url: &str, gateway: CodexAppsMcpGateway)
}
pub(crate) fn codex_apps_mcp_url(config: &Config) -> String {
codex_apps_mcp_url_for_gateway(
&config.chatgpt_base_url,
selected_config_codex_apps_mcp_gateway(config),
)
codex_apps_mcp_url_for_base_url(&config.chatgpt_base_url)
}
fn codex_apps_mcp_server_config(config: &Config, auth: Option<&CodexAuth>) -> McpServerConfig {

View File

@@ -1,6 +1,7 @@
use super::*;
use crate::config::CONFIG_TOML_FILE;
use crate::config::ConfigBuilder;
use crate::features::Feature;
use crate::plugins::AppConnectorId;
use crate::plugins::PluginCapabilitySummary;
use pretty_assertions::assert_eq;
@@ -123,67 +124,27 @@ fn tool_plugin_provenance_collects_app_and_mcp_sources() {
}
#[test]
fn codex_apps_mcp_url_for_default_gateway_keeps_existing_paths() {
fn codex_apps_mcp_url_for_base_url_keeps_existing_paths() {
assert_eq!(
codex_apps_mcp_url_for_gateway(
"https://chatgpt.com/backend-api",
CodexAppsMcpGateway::LegacyMCPGateway
),
codex_apps_mcp_url_for_base_url("https://chatgpt.com/backend-api"),
"https://chatgpt.com/backend-api/wham/apps"
);
assert_eq!(
codex_apps_mcp_url_for_gateway(
"https://chat.openai.com",
CodexAppsMcpGateway::LegacyMCPGateway
),
codex_apps_mcp_url_for_base_url("https://chat.openai.com"),
"https://chat.openai.com/backend-api/wham/apps"
);
assert_eq!(
codex_apps_mcp_url_for_gateway(
"http://localhost:8080/api/codex",
CodexAppsMcpGateway::LegacyMCPGateway
),
codex_apps_mcp_url_for_base_url("http://localhost:8080/api/codex"),
"http://localhost:8080/api/codex/apps"
);
assert_eq!(
codex_apps_mcp_url_for_gateway(
"http://localhost:8080",
CodexAppsMcpGateway::LegacyMCPGateway
),
codex_apps_mcp_url_for_base_url("http://localhost:8080"),
"http://localhost:8080/api/codex/apps"
);
}
#[test]
fn codex_apps_mcp_url_for_gateway_uses_openai_connectors_gateway() {
let expected_url = format!("{OPENAI_CONNECTORS_MCP_BASE_URL}{OPENAI_CONNECTORS_MCP_PATH}");
assert_eq!(
codex_apps_mcp_url_for_gateway(
"https://chatgpt.com/backend-api",
CodexAppsMcpGateway::MCPGateway
),
expected_url.as_str()
);
assert_eq!(
codex_apps_mcp_url_for_gateway("https://chat.openai.com", CodexAppsMcpGateway::MCPGateway),
expected_url.as_str()
);
assert_eq!(
codex_apps_mcp_url_for_gateway(
"http://localhost:8080/api/codex",
CodexAppsMcpGateway::MCPGateway
),
expected_url.as_str()
);
assert_eq!(
codex_apps_mcp_url_for_gateway("http://localhost:8080", CodexAppsMcpGateway::MCPGateway),
expected_url.as_str()
);
}
#[test]
fn codex_apps_mcp_url_uses_default_gateway_when_feature_is_disabled() {
fn codex_apps_mcp_url_uses_legacy_codex_apps_path() {
let mut config = crate::config::test_config();
config.chatgpt_base_url = "https://chatgpt.com".to_string();
@@ -194,22 +155,7 @@ fn codex_apps_mcp_url_uses_default_gateway_when_feature_is_disabled() {
}
#[test]
fn codex_apps_mcp_url_uses_openai_connectors_gateway_when_feature_is_enabled() {
let mut config = crate::config::test_config();
config.chatgpt_base_url = "https://chatgpt.com".to_string();
config
.features
.enable(Feature::AppsMcpGateway)
.expect("test config should allow apps gateway");
assert_eq!(
codex_apps_mcp_url(&config),
format!("{OPENAI_CONNECTORS_MCP_BASE_URL}{OPENAI_CONNECTORS_MCP_PATH}")
);
}
#[test]
fn codex_apps_server_config_switches_gateway_with_flags() {
fn codex_apps_server_config_uses_legacy_codex_apps_path() {
let mut config = crate::config::test_config();
config.chatgpt_base_url = "https://chatgpt.com".to_string();
@@ -231,22 +177,6 @@ fn codex_apps_server_config_switches_gateway_with_flags() {
};
assert_eq!(url, "https://chatgpt.com/backend-api/wham/apps");
config
.features
.enable(Feature::AppsMcpGateway)
.expect("test config should allow apps gateway");
servers = with_codex_apps_mcp(servers, true, None, &config);
let server = servers
.get(CODEX_APPS_MCP_SERVER_NAME)
.expect("codex apps should remain present when apps stays enabled");
let url = match &server.transport {
McpServerTransportConfig::StreamableHttp { url, .. } => url,
_ => panic!("expected streamable http transport for codex apps"),
};
let expected_url = format!("{OPENAI_CONNECTORS_MCP_BASE_URL}{OPENAI_CONNECTORS_MCP_PATH}");
assert_eq!(url, &expected_url);
}
#[tokio::test]

View File

@@ -1014,6 +1014,7 @@ impl McpConnectionManager {
server: &str,
tool: &str,
arguments: Option<serde_json::Value>,
meta: Option<serde_json::Value>,
) -> Result<CallToolResult> {
let client = self.client_by_name(server).await?;
if !client.tool_filter.allows(tool) {
@@ -1024,7 +1025,7 @@ impl McpConnectionManager {
let result: rmcp::model::CallToolResult = client
.client
.call_tool(tool.to_string(), arguments, client.tool_timeout)
.call_tool(tool.to_string(), arguments, meta, client.tool_timeout)
.await
.with_context(|| format!("tool call failed for `{server}/{tool}`"))?;

View File

@@ -117,6 +117,7 @@ pub(crate) async fn handle_mcp_tool_call(
.counter("codex.mcp.call", 1, &[("status", status)]);
return CallToolResult::from_result(result);
}
let request_meta = build_mcp_tool_call_request_meta(&server, metadata.as_ref());
let tool_call_begin_event = EventMsg::McpToolCallBegin(McpToolCallBeginEvent {
call_id: call_id.clone(),
@@ -142,7 +143,12 @@ pub(crate) async fn handle_mcp_tool_call(
let start = Instant::now();
let result = sess
.call_tool(&server, &tool_name, arguments_value.clone())
.call_tool(
&server,
&tool_name,
arguments_value.clone(),
request_meta.clone(),
)
.await
.map_err(|e| format!("tool call error: {e:?}"));
let result = sanitize_mcp_tool_result_for_model(
@@ -226,7 +232,7 @@ pub(crate) async fn handle_mcp_tool_call(
let start = Instant::now();
// Perform the tool call.
let result = sess
.call_tool(&server, &tool_name, arguments_value.clone())
.call_tool(&server, &tool_name, arguments_value.clone(), request_meta)
.await
.map_err(|e| format!("tool call error: {e:?}"));
let result = sanitize_mcp_tool_result_for_model(
@@ -374,6 +380,24 @@ pub(crate) struct McpToolApprovalMetadata {
connector_description: Option<String>,
tool_title: Option<String>,
tool_description: Option<String>,
codex_apps_meta: Option<serde_json::Map<String, serde_json::Value>>,
}
const MCP_TOOL_CODEX_APPS_META_KEY: &str = "_codex_apps";
fn build_mcp_tool_call_request_meta(
server: &str,
metadata: Option<&McpToolApprovalMetadata>,
) -> Option<serde_json::Value> {
if server != CODEX_APPS_MCP_SERVER_NAME {
return None;
}
let codex_apps_meta = metadata.and_then(|metadata| metadata.codex_apps_meta.as_ref())?;
Some(serde_json::json!({
MCP_TOOL_CODEX_APPS_META_KEY: codex_apps_meta,
}))
}
#[derive(Clone, Copy)]
@@ -750,6 +774,13 @@ pub(crate) async fn lookup_mcp_tool_metadata(
connector_description,
tool_title: tool_info.tool.title,
tool_description: tool_info.tool.description.map(std::borrow::Cow::into_owned),
codex_apps_meta: tool_info
.tool
.meta
.as_ref()
.and_then(|meta| meta.get(MCP_TOOL_CODEX_APPS_META_KEY))
.and_then(serde_json::Value::as_object)
.cloned(),
})
}

View File

@@ -47,6 +47,7 @@ fn approval_metadata(
connector_description: connector_description.map(str::to_string),
tool_title: tool_title.map(str::to_string),
tool_description: tool_description.map(str::to_string),
codex_apps_meta: None,
}
}
@@ -415,6 +416,39 @@ fn sanitize_mcp_tool_result_for_model_preserves_image_when_supported() {
assert_eq!(got, original);
}
#[test]
fn codex_apps_tool_call_request_meta_includes_codex_apps_meta() {
let metadata = McpToolApprovalMetadata {
annotations: None,
connector_id: Some("calendar".to_string()),
connector_name: Some("Calendar".to_string()),
connector_description: Some("Manage events".to_string()),
tool_title: Some("Create Event".to_string()),
tool_description: Some("Create a calendar event.".to_string()),
codex_apps_meta: Some(
serde_json::json!({
"resource_uri": "connector://calendar/tools/calendar_create_event",
"contains_mcp_source": true,
"connector_id": "calendar",
})
.as_object()
.cloned()
.expect("_codex_apps metadata should be an object"),
),
};
assert_eq!(
build_mcp_tool_call_request_meta(CODEX_APPS_MCP_SERVER_NAME, Some(&metadata)),
Some(serde_json::json!({
MCP_TOOL_CODEX_APPS_META_KEY: {
"resource_uri": "connector://calendar/tools/calendar_create_event",
"contains_mcp_source": true,
"connector_id": "calendar",
},
}))
);
}
#[test]
fn accepted_elicitation_content_converts_to_request_user_input_response() {
let response = request_user_input_response_from_elicitation_content(Some(serde_json::json!(
@@ -535,6 +569,7 @@ fn guardian_mcp_review_request_includes_annotations_when_present() {
connector_description: None,
tool_title: None,
tool_description: None,
codex_apps_meta: None,
};
let request = build_guardian_mcp_tool_review_request("call-1", &invocation, Some(&metadata));
@@ -856,6 +891,7 @@ async fn approve_mode_skips_when_annotations_do_not_require_approval() {
connector_description: None,
tool_title: Some("Read Only Tool".to_string()),
tool_description: None,
codex_apps_meta: None,
};
let decision = maybe_request_mcp_tool_approval(
@@ -919,6 +955,7 @@ async fn approve_mode_blocks_when_arc_returns_interrupt_for_model() {
connector_description: Some("Manage events".to_string()),
tool_title: Some("Dangerous Tool".to_string()),
tool_description: Some("Performs a risky action.".to_string()),
codex_apps_meta: None,
};
let decision = maybe_request_mcp_tool_approval(
@@ -1021,6 +1058,7 @@ async fn approve_mode_routes_arc_ask_user_to_guardian_when_guardian_reviewer_is_
connector_description: Some("Manage events".to_string()),
tool_title: Some("Dangerous Tool".to_string()),
tool_description: Some("Performs a risky action.".to_string()),
codex_apps_meta: None,
};
let decision = maybe_request_mcp_tool_approval(

View File

@@ -3,6 +3,7 @@ use crate::api_bridge::auth_provider_from_auth;
use crate::api_bridge::map_api_error;
use crate::auth::AuthManager;
use crate::auth::AuthMode;
use crate::auth::CodexAuth;
use crate::config::Config;
use crate::default_client::build_reqwest_client;
use crate::error::CodexErr;
@@ -11,8 +12,15 @@ use crate::model_provider_info::ModelProviderInfo;
use crate::models_manager::collaboration_mode_presets::CollaborationModesConfig;
use crate::models_manager::collaboration_mode_presets::builtin_collaboration_mode_presets;
use crate::models_manager::model_info;
use crate::response_debug_context::extract_response_debug_context;
use crate::response_debug_context::telemetry_transport_error_message;
use crate::util::FeedbackRequestTags;
use crate::util::emit_feedback_request_tags;
use codex_api::ModelsClient;
use codex_api::RequestTelemetry;
use codex_api::ReqwestTransport;
use codex_api::TransportError;
use codex_otel::TelemetryAuthMode;
use codex_protocol::config_types::CollaborationModeMask;
use codex_protocol::openai_models::ModelInfo;
use codex_protocol::openai_models::ModelPreset;
@@ -32,6 +40,82 @@ use tracing::instrument;
const MODEL_CACHE_FILE: &str = "models_cache.json";
const DEFAULT_MODEL_CACHE_TTL: Duration = Duration::from_secs(300);
const MODELS_REFRESH_TIMEOUT: Duration = Duration::from_secs(5);
const MODELS_ENDPOINT: &str = "/models";
#[derive(Clone)]
struct ModelsRequestTelemetry {
auth_mode: Option<String>,
auth_header_attached: bool,
auth_header_name: Option<&'static str>,
}
impl RequestTelemetry for ModelsRequestTelemetry {
fn on_request(
&self,
attempt: u64,
status: Option<http::StatusCode>,
error: Option<&TransportError>,
duration: Duration,
) {
let success = status.is_some_and(|code| code.is_success()) && error.is_none();
let error_message = error.map(telemetry_transport_error_message);
let response_debug = error
.map(extract_response_debug_context)
.unwrap_or_default();
let status = status.map(|status| status.as_u16());
tracing::event!(
target: "codex_otel.log_only",
tracing::Level::INFO,
event.name = "codex.api_request",
duration_ms = %duration.as_millis(),
http.response.status_code = status,
success = success,
error.message = error_message.as_deref(),
attempt = attempt,
endpoint = MODELS_ENDPOINT,
auth.header_attached = self.auth_header_attached,
auth.header_name = self.auth_header_name,
auth.request_id = response_debug.request_id.as_deref(),
auth.cf_ray = response_debug.cf_ray.as_deref(),
auth.error = response_debug.auth_error.as_deref(),
auth.error_code = response_debug.auth_error_code.as_deref(),
auth.mode = self.auth_mode.as_deref(),
);
tracing::event!(
target: "codex_otel.trace_safe",
tracing::Level::INFO,
event.name = "codex.api_request",
duration_ms = %duration.as_millis(),
http.response.status_code = status,
success = success,
error.message = error_message.as_deref(),
attempt = attempt,
endpoint = MODELS_ENDPOINT,
auth.header_attached = self.auth_header_attached,
auth.header_name = self.auth_header_name,
auth.request_id = response_debug.request_id.as_deref(),
auth.cf_ray = response_debug.cf_ray.as_deref(),
auth.error = response_debug.auth_error.as_deref(),
auth.error_code = response_debug.auth_error_code.as_deref(),
auth.mode = self.auth_mode.as_deref(),
);
emit_feedback_request_tags(&FeedbackRequestTags {
endpoint: MODELS_ENDPOINT,
auth_header_attached: self.auth_header_attached,
auth_header_name: self.auth_header_name,
auth_mode: self.auth_mode.as_deref(),
auth_retry_after_unauthorized: None,
auth_recovery_mode: None,
auth_recovery_phase: None,
auth_connection_reused: None,
auth_request_id: response_debug.request_id.as_deref(),
auth_cf_ray: response_debug.cf_ray.as_deref(),
auth_error: response_debug.auth_error.as_deref(),
auth_error_code: response_debug.auth_error_code.as_deref(),
auth_recovery_followup_success: None,
auth_recovery_followup_status: None,
});
}
}
/// Strategy for refreshing available models.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -330,11 +414,17 @@ impl ModelsManager {
let _timer =
codex_otel::start_global_timer("codex.remote_models.fetch_update.duration_ms", &[]);
let auth = self.auth_manager.auth().await;
let auth_mode = self.auth_manager.auth_mode();
let auth_mode = auth.as_ref().map(CodexAuth::auth_mode);
let api_provider = self.provider.to_api_provider(auth_mode)?;
let api_auth = auth_provider_from_auth(auth.clone(), &self.provider)?;
let transport = ReqwestTransport::new(build_reqwest_client());
let client = ModelsClient::new(transport, api_provider, api_auth);
let request_telemetry: Arc<dyn RequestTelemetry> = Arc::new(ModelsRequestTelemetry {
auth_mode: auth_mode.map(|mode| TelemetryAuthMode::from(mode).to_string()),
auth_header_attached: api_auth.auth_header_attached(),
auth_header_name: api_auth.auth_header_name(),
});
let client = ModelsClient::new(transport, api_provider, api_auth)
.with_telemetry(Some(request_telemetry));
let client_version = crate::models_manager::client_version_to_whole();
let (models, etag) = timeout(

View File

@@ -0,0 +1,167 @@
use base64::Engine;
use codex_api::TransportError;
use codex_api::error::ApiError;
const REQUEST_ID_HEADER: &str = "x-request-id";
const OAI_REQUEST_ID_HEADER: &str = "x-oai-request-id";
const CF_RAY_HEADER: &str = "cf-ray";
const AUTH_ERROR_HEADER: &str = "x-openai-authorization-error";
const X_ERROR_JSON_HEADER: &str = "x-error-json";
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub(crate) struct ResponseDebugContext {
pub(crate) request_id: Option<String>,
pub(crate) cf_ray: Option<String>,
pub(crate) auth_error: Option<String>,
pub(crate) auth_error_code: Option<String>,
}
pub(crate) fn extract_response_debug_context(transport: &TransportError) -> ResponseDebugContext {
let mut context = ResponseDebugContext::default();
let TransportError::Http {
headers, body: _, ..
} = transport
else {
return context;
};
let extract_header = |name: &str| {
headers
.as_ref()
.and_then(|headers| headers.get(name))
.and_then(|value| value.to_str().ok())
.map(str::to_string)
};
context.request_id =
extract_header(REQUEST_ID_HEADER).or_else(|| extract_header(OAI_REQUEST_ID_HEADER));
context.cf_ray = extract_header(CF_RAY_HEADER);
context.auth_error = extract_header(AUTH_ERROR_HEADER);
context.auth_error_code = extract_header(X_ERROR_JSON_HEADER).and_then(|encoded| {
let decoded = base64::engine::general_purpose::STANDARD
.decode(encoded)
.ok()?;
let parsed = serde_json::from_slice::<serde_json::Value>(&decoded).ok()?;
parsed
.get("error")
.and_then(|error| error.get("code"))
.and_then(serde_json::Value::as_str)
.map(str::to_string)
});
context
}
pub(crate) fn extract_response_debug_context_from_api_error(
error: &ApiError,
) -> ResponseDebugContext {
match error {
ApiError::Transport(transport) => extract_response_debug_context(transport),
_ => ResponseDebugContext::default(),
}
}
pub(crate) fn telemetry_transport_error_message(error: &TransportError) -> String {
match error {
TransportError::Http { status, .. } => format!("http {}", status.as_u16()),
TransportError::RetryLimit => "retry limit reached".to_string(),
TransportError::Timeout => "timeout".to_string(),
TransportError::Network(err) => err.to_string(),
TransportError::Build(err) => err.to_string(),
}
}
pub(crate) fn telemetry_api_error_message(error: &ApiError) -> String {
match error {
ApiError::Transport(transport) => telemetry_transport_error_message(transport),
ApiError::Api { status, .. } => format!("api error {}", status.as_u16()),
ApiError::Stream(err) => err.to_string(),
ApiError::ContextWindowExceeded => "context window exceeded".to_string(),
ApiError::QuotaExceeded => "quota exceeded".to_string(),
ApiError::UsageNotIncluded => "usage not included".to_string(),
ApiError::Retryable { .. } => "retryable error".to_string(),
ApiError::RateLimit(_) => "rate limit".to_string(),
ApiError::InvalidRequest { .. } => "invalid request".to_string(),
ApiError::ServerOverloaded => "server overloaded".to_string(),
}
}
#[cfg(test)]
mod tests {
use super::ResponseDebugContext;
use super::extract_response_debug_context;
use super::telemetry_api_error_message;
use super::telemetry_transport_error_message;
use codex_api::TransportError;
use codex_api::error::ApiError;
use http::HeaderMap;
use http::HeaderValue;
use http::StatusCode;
use pretty_assertions::assert_eq;
#[test]
fn extract_response_debug_context_decodes_identity_headers() {
let mut headers = HeaderMap::new();
headers.insert("x-oai-request-id", HeaderValue::from_static("req-auth"));
headers.insert("cf-ray", HeaderValue::from_static("ray-auth"));
headers.insert(
"x-openai-authorization-error",
HeaderValue::from_static("missing_authorization_header"),
);
headers.insert(
"x-error-json",
HeaderValue::from_static("eyJlcnJvciI6eyJjb2RlIjoidG9rZW5fZXhwaXJlZCJ9fQ=="),
);
let context = extract_response_debug_context(&TransportError::Http {
status: StatusCode::UNAUTHORIZED,
url: Some("https://chatgpt.com/backend-api/codex/models".to_string()),
headers: Some(headers),
body: Some(r#"{"error":{"message":"plain text error"},"status":401}"#.to_string()),
});
assert_eq!(
context,
ResponseDebugContext {
request_id: Some("req-auth".to_string()),
cf_ray: Some("ray-auth".to_string()),
auth_error: Some("missing_authorization_header".to_string()),
auth_error_code: Some("token_expired".to_string()),
}
);
}
#[test]
fn telemetry_error_messages_omit_http_bodies() {
let transport = TransportError::Http {
status: StatusCode::UNAUTHORIZED,
url: Some("https://chatgpt.com/backend-api/codex/responses".to_string()),
headers: None,
body: Some(r#"{"error":{"message":"secret token leaked"}}"#.to_string()),
};
assert_eq!(telemetry_transport_error_message(&transport), "http 401");
assert_eq!(
telemetry_api_error_message(&ApiError::Transport(transport)),
"http 401"
);
}
#[test]
fn telemetry_error_messages_preserve_non_http_details() {
let network = TransportError::Network("dns lookup failed".to_string());
let build = TransportError::Build("invalid header value".to_string());
let stream = ApiError::Stream("socket closed".to_string());
assert_eq!(
telemetry_transport_error_message(&network),
"dns lookup failed"
);
assert_eq!(
telemetry_transport_error_message(&build),
"invalid header value"
);
assert_eq!(telemetry_api_error_message(&stream), "socket closed");
}
}

View File

@@ -5,6 +5,7 @@ use crate::client_common::tools::ToolSpec;
use crate::config::AgentRoleConfig;
use crate::features::Feature;
use crate::features::Features;
use crate::mcp::CODEX_APPS_MCP_SERVER_NAME;
use crate::mcp_connection_manager::ToolInfo;
use crate::models_manager::collaboration_mode_presets::CollaborationModesConfig;
use crate::original_image_detail::can_request_original_image_detail;
@@ -1673,22 +1674,58 @@ fn create_tool_search_tool(app_tools: &HashMap<String, ToolInfo>) -> ToolSpec {
},
),
]);
let mut app_names = app_tools
.values()
.filter_map(|tool| tool.connector_name.clone())
.collect::<Vec<_>>();
app_names.sort();
app_names.dedup();
let app_names = app_names.join(", ");
let mut app_descriptions = BTreeMap::new();
for tool in app_tools.values() {
if tool.server_name != CODEX_APPS_MCP_SERVER_NAME {
continue;
}
let description = if app_names.is_empty() {
TOOL_SEARCH_DESCRIPTION_TEMPLATE
.replace("({{app_names}})", "(None currently enabled)")
.replace("{{app_names}}", "available apps")
let Some(connector_name) = tool
.connector_name
.as_deref()
.map(str::trim)
.filter(|connector_name| !connector_name.is_empty())
else {
continue;
};
let connector_description = tool
.connector_description
.as_deref()
.map(str::trim)
.filter(|connector_description| !connector_description.is_empty())
.map(str::to_string);
app_descriptions
.entry(connector_name.to_string())
.and_modify(|existing: &mut Option<String>| {
if existing.is_none() {
*existing = connector_description.clone();
}
})
.or_insert(connector_description);
}
let app_descriptions = if app_descriptions.is_empty() {
"None currently enabled.".to_string()
} else {
TOOL_SEARCH_DESCRIPTION_TEMPLATE.replace("{{app_names}}", app_names.as_str())
app_descriptions
.into_iter()
.map(
|(connector_name, connector_description)| match connector_description {
Some(connector_description) => {
format!("- {connector_name}: {connector_description}")
}
None => format!("- {connector_name}"),
},
)
.collect::<Vec<_>>()
.join("\n")
};
let description =
TOOL_SEARCH_DESCRIPTION_TEMPLATE.replace("{{app_descriptions}}", app_descriptions.as_str());
ToolSpec::ToolSearch {
execution: "client".to_string(),
description,

View File

@@ -1690,7 +1690,7 @@ fn test_build_specs_mcp_tools_sorted_by_name() {
}
#[test]
fn search_tool_description_includes_only_codex_apps_connector_names() {
fn search_tool_description_lists_each_codex_apps_connector_once() {
let model_info = search_capable_model_info();
let mut features = Features::with_defaults();
features.enable(Feature::Apps);
@@ -1736,7 +1736,45 @@ fn search_tool_description_includes_only_codex_apps_connector_names() {
connector_id: Some("calendar".to_string()),
connector_name: Some("Calendar".to_string()),
plugin_display_names: Vec::new(),
connector_description: None,
connector_description: Some(
"Plan events and manage your calendar.".to_string(),
),
},
),
(
"mcp__codex_apps__calendar_list_events".to_string(),
ToolInfo {
server_name: crate::mcp::CODEX_APPS_MCP_SERVER_NAME.to_string(),
tool_name: "_list_events".to_string(),
tool_namespace: "mcp__codex_apps__calendar".to_string(),
tool: mcp_tool(
"calendar-list-events",
"List calendar events",
serde_json::json!({"type": "object"}),
),
connector_id: Some("calendar".to_string()),
connector_name: Some("Calendar".to_string()),
plugin_display_names: Vec::new(),
connector_description: Some(
"Plan events and manage your calendar.".to_string(),
),
},
),
(
"mcp__codex_apps__gmail_search_threads".to_string(),
ToolInfo {
server_name: crate::mcp::CODEX_APPS_MCP_SERVER_NAME.to_string(),
tool_name: "_search_threads".to_string(),
tool_namespace: "mcp__codex_apps__gmail".to_string(),
tool: mcp_tool(
"gmail-search-threads",
"Search email threads",
serde_json::json!({"type": "object"}),
),
connector_id: Some("gmail".to_string()),
connector_name: Some("Gmail".to_string()),
plugin_display_names: Vec::new(),
connector_description: Some("Find and summarize email threads.".to_string()),
},
),
(
@@ -1762,7 +1800,14 @@ fn search_tool_description_includes_only_codex_apps_connector_names() {
panic!("expected tool_search tool");
};
let description = description.as_str();
assert!(description.contains("Calendar"));
assert!(description.contains("- Calendar: Plan events and manage your calendar."));
assert!(description.contains("- Gmail: Find and summarize email threads."));
assert_eq!(
description
.matches("- Calendar: Plan events and manage your calendar.")
.count(),
1
);
assert!(!description.contains("mcp__rmcp__echo"));
}
@@ -1874,8 +1919,56 @@ fn search_tool_description_handles_no_enabled_apps() {
panic!("expected tool_search tool");
};
assert!(description.contains("(None currently enabled)"));
assert!(!description.contains("{{app_names}}"));
assert!(description.contains("None currently enabled."));
assert!(!description.contains("{{app_descriptions}}"));
}
#[test]
fn search_tool_description_falls_back_to_connector_name_without_description() {
let model_info = search_capable_model_info();
let mut features = Features::with_defaults();
features.enable(Feature::Apps);
let available_models = Vec::new();
let tools_config = ToolsConfig::new(&ToolsConfigParams {
model_info: &model_info,
available_models: &available_models,
features: &features,
web_search_mode: Some(WebSearchMode::Cached),
session_source: SessionSource::Cli,
sandbox_policy: &SandboxPolicy::DangerFullAccess,
windows_sandbox_level: WindowsSandboxLevel::Disabled,
});
let (tools, _) = build_specs(
&tools_config,
None,
Some(HashMap::from([(
"mcp__codex_apps__calendar_create_event".to_string(),
ToolInfo {
server_name: crate::mcp::CODEX_APPS_MCP_SERVER_NAME.to_string(),
tool_name: "_create_event".to_string(),
tool_namespace: "mcp__codex_apps__calendar".to_string(),
tool: mcp_tool(
"calendar_create_event",
"Create calendar event",
serde_json::json!({"type": "object"}),
),
connector_id: Some("calendar".to_string()),
connector_name: Some("Calendar".to_string()),
plugin_display_names: Vec::new(),
connector_description: None,
},
)])),
&[],
)
.build();
let search_tool = find_tool(&tools, TOOL_SEARCH_TOOL_NAME);
let ToolSpec::ToolSearch { description, .. } = &search_tool.spec else {
panic!("expected tool_search tool");
};
assert!(description.contains("- Calendar"));
assert!(!description.contains("- Calendar:"));
}
#[test]

View File

@@ -37,6 +37,111 @@ macro_rules! feedback_tags {
};
}
pub(crate) struct FeedbackRequestTags<'a> {
pub endpoint: &'a str,
pub auth_header_attached: bool,
pub auth_header_name: Option<&'a str>,
pub auth_mode: Option<&'a str>,
pub auth_retry_after_unauthorized: Option<bool>,
pub auth_recovery_mode: Option<&'a str>,
pub auth_recovery_phase: Option<&'a str>,
pub auth_connection_reused: Option<bool>,
pub auth_request_id: Option<&'a str>,
pub auth_cf_ray: Option<&'a str>,
pub auth_error: Option<&'a str>,
pub auth_error_code: Option<&'a str>,
pub auth_recovery_followup_success: Option<bool>,
pub auth_recovery_followup_status: Option<u16>,
}
struct Auth401FeedbackSnapshot<'a> {
request_id: &'a str,
cf_ray: &'a str,
error: &'a str,
error_code: &'a str,
}
impl<'a> Auth401FeedbackSnapshot<'a> {
fn from_optional_fields(
request_id: Option<&'a str>,
cf_ray: Option<&'a str>,
error: Option<&'a str>,
error_code: Option<&'a str>,
) -> Self {
Self {
request_id: request_id.unwrap_or(""),
cf_ray: cf_ray.unwrap_or(""),
error: error.unwrap_or(""),
error_code: error_code.unwrap_or(""),
}
}
}
pub(crate) fn emit_feedback_request_tags(tags: &FeedbackRequestTags<'_>) {
let auth_header_name = tags.auth_header_name.unwrap_or("");
let auth_mode = tags.auth_mode.unwrap_or("");
let auth_retry_after_unauthorized = tags
.auth_retry_after_unauthorized
.map_or_else(String::new, |value| value.to_string());
let auth_recovery_mode = tags.auth_recovery_mode.unwrap_or("");
let auth_recovery_phase = tags.auth_recovery_phase.unwrap_or("");
let auth_connection_reused = tags
.auth_connection_reused
.map_or_else(String::new, |value| value.to_string());
let auth_request_id = tags.auth_request_id.unwrap_or("");
let auth_cf_ray = tags.auth_cf_ray.unwrap_or("");
let auth_error = tags.auth_error.unwrap_or("");
let auth_error_code = tags.auth_error_code.unwrap_or("");
let auth_recovery_followup_success = tags
.auth_recovery_followup_success
.map_or_else(String::new, |value| value.to_string());
let auth_recovery_followup_status = tags
.auth_recovery_followup_status
.map_or_else(String::new, |value| value.to_string());
feedback_tags!(
endpoint = tags.endpoint,
auth_header_attached = tags.auth_header_attached,
auth_header_name = auth_header_name,
auth_mode = auth_mode,
auth_retry_after_unauthorized = auth_retry_after_unauthorized,
auth_recovery_mode = auth_recovery_mode,
auth_recovery_phase = auth_recovery_phase,
auth_connection_reused = auth_connection_reused,
auth_request_id = auth_request_id,
auth_cf_ray = auth_cf_ray,
auth_error = auth_error,
auth_error_code = auth_error_code,
auth_recovery_followup_success = auth_recovery_followup_success,
auth_recovery_followup_status = auth_recovery_followup_status
);
}
pub(crate) fn emit_feedback_auth_recovery_tags(
auth_recovery_mode: &str,
auth_recovery_phase: &str,
auth_recovery_outcome: &str,
auth_request_id: Option<&str>,
auth_cf_ray: Option<&str>,
auth_error: Option<&str>,
auth_error_code: Option<&str>,
) {
let auth_401 = Auth401FeedbackSnapshot::from_optional_fields(
auth_request_id,
auth_cf_ray,
auth_error,
auth_error_code,
);
feedback_tags!(
auth_recovery_mode = auth_recovery_mode,
auth_recovery_phase = auth_recovery_phase,
auth_recovery_outcome = auth_recovery_outcome,
auth_401_request_id = auth_401.request_id,
auth_401_cf_ray = auth_401.cf_ray,
auth_401_error = auth_401.error,
auth_401_error_code = auth_401.error_code
);
}
pub fn backoff(attempt: u64) -> Duration {
let exp = BACKOFF_FACTOR.powi(attempt.saturating_sub(1) as i32);
let base = (INITIAL_DELAY_MS as f64 * exp) as u64;

View File

@@ -1,4 +1,15 @@
use super::*;
use std::collections::BTreeMap;
use std::sync::Arc;
use std::sync::Mutex;
use tracing::Event;
use tracing::Subscriber;
use tracing::field::Visit;
use tracing_subscriber::Layer;
use tracing_subscriber::layer::Context;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::util::SubscriberInitExt;
#[test]
fn test_try_parse_error_message() {
@@ -32,6 +43,298 @@ fn feedback_tags_macro_compiles() {
feedback_tags!(model = "gpt-5", cached = true, debug_only = OnlyDebug);
}
#[derive(Default)]
struct TagCollectorVisitor {
tags: BTreeMap<String, String>,
}
impl Visit for TagCollectorVisitor {
fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
self.tags
.insert(field.name().to_string(), value.to_string());
}
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
self.tags
.insert(field.name().to_string(), value.to_string());
}
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
self.tags
.insert(field.name().to_string(), format!("{value:?}"));
}
}
#[derive(Clone)]
struct TagCollectorLayer {
tags: Arc<Mutex<BTreeMap<String, String>>>,
}
impl<S> Layer<S> for TagCollectorLayer
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
if event.metadata().target() != "feedback_tags" {
return;
}
let mut visitor = TagCollectorVisitor::default();
event.record(&mut visitor);
self.tags.lock().unwrap().extend(visitor.tags);
}
}
#[test]
fn emit_feedback_request_tags_records_sentry_feedback_fields() {
let tags = Arc::new(Mutex::new(BTreeMap::new()));
let _guard = tracing_subscriber::registry()
.with(TagCollectorLayer { tags: tags.clone() })
.set_default();
emit_feedback_request_tags(&FeedbackRequestTags {
endpoint: "/responses",
auth_header_attached: true,
auth_header_name: Some("authorization"),
auth_mode: Some("chatgpt"),
auth_retry_after_unauthorized: Some(false),
auth_recovery_mode: Some("managed"),
auth_recovery_phase: Some("refresh_token"),
auth_connection_reused: Some(true),
auth_request_id: Some("req-123"),
auth_cf_ray: Some("ray-123"),
auth_error: Some("missing_authorization_header"),
auth_error_code: Some("token_expired"),
auth_recovery_followup_success: Some(true),
auth_recovery_followup_status: Some(200),
});
let tags = tags.lock().unwrap().clone();
assert_eq!(
tags.get("endpoint").map(String::as_str),
Some("\"/responses\"")
);
assert_eq!(
tags.get("auth_header_attached").map(String::as_str),
Some("true")
);
assert_eq!(
tags.get("auth_header_name").map(String::as_str),
Some("\"authorization\"")
);
assert_eq!(
tags.get("auth_request_id").map(String::as_str),
Some("\"req-123\"")
);
assert_eq!(
tags.get("auth_error_code").map(String::as_str),
Some("\"token_expired\"")
);
assert_eq!(
tags.get("auth_recovery_followup_success")
.map(String::as_str),
Some("\"true\"")
);
assert_eq!(
tags.get("auth_recovery_followup_status")
.map(String::as_str),
Some("\"200\"")
);
}
#[test]
fn emit_feedback_auth_recovery_tags_preserves_401_specific_fields() {
let tags = Arc::new(Mutex::new(BTreeMap::new()));
let _guard = tracing_subscriber::registry()
.with(TagCollectorLayer { tags: tags.clone() })
.set_default();
emit_feedback_auth_recovery_tags(
"managed",
"refresh_token",
"recovery_succeeded",
Some("req-401"),
Some("ray-401"),
Some("missing_authorization_header"),
Some("token_expired"),
);
let tags = tags.lock().unwrap().clone();
assert_eq!(
tags.get("auth_401_request_id").map(String::as_str),
Some("\"req-401\"")
);
assert_eq!(
tags.get("auth_401_cf_ray").map(String::as_str),
Some("\"ray-401\"")
);
assert_eq!(
tags.get("auth_401_error").map(String::as_str),
Some("\"missing_authorization_header\"")
);
assert_eq!(
tags.get("auth_401_error_code").map(String::as_str),
Some("\"token_expired\"")
);
}
#[test]
fn emit_feedback_auth_recovery_tags_clears_stale_401_fields() {
let tags = Arc::new(Mutex::new(BTreeMap::new()));
let _guard = tracing_subscriber::registry()
.with(TagCollectorLayer { tags: tags.clone() })
.set_default();
emit_feedback_auth_recovery_tags(
"managed",
"refresh_token",
"recovery_failed_transient",
Some("req-401-a"),
Some("ray-401-a"),
Some("missing_authorization_header"),
Some("token_expired"),
);
emit_feedback_auth_recovery_tags(
"managed",
"done",
"recovery_not_run",
Some("req-401-b"),
None,
None,
None,
);
let tags = tags.lock().unwrap().clone();
assert_eq!(
tags.get("auth_401_request_id").map(String::as_str),
Some("\"req-401-b\"")
);
assert_eq!(
tags.get("auth_401_cf_ray").map(String::as_str),
Some("\"\"")
);
assert_eq!(tags.get("auth_401_error").map(String::as_str), Some("\"\""));
assert_eq!(
tags.get("auth_401_error_code").map(String::as_str),
Some("\"\"")
);
}
#[test]
fn emit_feedback_request_tags_preserves_latest_auth_fields_after_unauthorized() {
let tags = Arc::new(Mutex::new(BTreeMap::new()));
let _guard = tracing_subscriber::registry()
.with(TagCollectorLayer { tags: tags.clone() })
.set_default();
emit_feedback_request_tags(&FeedbackRequestTags {
endpoint: "/responses",
auth_header_attached: true,
auth_header_name: Some("authorization"),
auth_mode: Some("chatgpt"),
auth_retry_after_unauthorized: Some(true),
auth_recovery_mode: Some("managed"),
auth_recovery_phase: Some("refresh_token"),
auth_connection_reused: None,
auth_request_id: Some("req-123"),
auth_cf_ray: Some("ray-123"),
auth_error: Some("missing_authorization_header"),
auth_error_code: Some("token_expired"),
auth_recovery_followup_success: Some(false),
auth_recovery_followup_status: Some(401),
});
let tags = tags.lock().unwrap().clone();
assert_eq!(
tags.get("auth_request_id").map(String::as_str),
Some("\"req-123\"")
);
assert_eq!(
tags.get("auth_cf_ray").map(String::as_str),
Some("\"ray-123\"")
);
assert_eq!(
tags.get("auth_error").map(String::as_str),
Some("\"missing_authorization_header\"")
);
assert_eq!(
tags.get("auth_error_code").map(String::as_str),
Some("\"token_expired\"")
);
assert_eq!(
tags.get("auth_recovery_followup_success")
.map(String::as_str),
Some("\"false\"")
);
}
#[test]
fn emit_feedback_request_tags_clears_stale_latest_auth_fields() {
let tags = Arc::new(Mutex::new(BTreeMap::new()));
let _guard = tracing_subscriber::registry()
.with(TagCollectorLayer { tags: tags.clone() })
.set_default();
emit_feedback_request_tags(&FeedbackRequestTags {
endpoint: "/responses",
auth_header_attached: true,
auth_header_name: Some("authorization"),
auth_mode: Some("chatgpt"),
auth_retry_after_unauthorized: Some(false),
auth_recovery_mode: Some("managed"),
auth_recovery_phase: Some("refresh_token"),
auth_connection_reused: Some(true),
auth_request_id: Some("req-123"),
auth_cf_ray: Some("ray-123"),
auth_error: Some("missing_authorization_header"),
auth_error_code: Some("token_expired"),
auth_recovery_followup_success: Some(true),
auth_recovery_followup_status: Some(200),
});
emit_feedback_request_tags(&FeedbackRequestTags {
endpoint: "/responses",
auth_header_attached: true,
auth_header_name: None,
auth_mode: None,
auth_retry_after_unauthorized: None,
auth_recovery_mode: None,
auth_recovery_phase: None,
auth_connection_reused: None,
auth_request_id: None,
auth_cf_ray: None,
auth_error: None,
auth_error_code: None,
auth_recovery_followup_success: None,
auth_recovery_followup_status: None,
});
let tags = tags.lock().unwrap().clone();
assert_eq!(
tags.get("auth_header_name").map(String::as_str),
Some("\"\"")
);
assert_eq!(tags.get("auth_mode").map(String::as_str), Some("\"\""));
assert_eq!(
tags.get("auth_request_id").map(String::as_str),
Some("\"\"")
);
assert_eq!(tags.get("auth_cf_ray").map(String::as_str), Some("\"\""));
assert_eq!(tags.get("auth_error").map(String::as_str), Some("\"\""));
assert_eq!(
tags.get("auth_error_code").map(String::as_str),
Some("\"\"")
);
assert_eq!(
tags.get("auth_recovery_followup_success")
.map(String::as_str),
Some("\"\"")
);
assert_eq!(
tags.get("auth_recovery_followup_status")
.map(String::as_str),
Some("\"\"")
);
}
#[test]
fn normalize_thread_name_trims_and_rejects_empty() {
assert_eq!(normalize_thread_name(" "), None);

View File

@@ -2,5 +2,6 @@
Searches over apps/connectors tool metadata with BM25 and exposes matching tools for the next model call.
Tools of the apps ({{app_names}}) are hidden until you search for them with this tool (`tool_search`).
When the request needs one of these connectors and you don't already have the required tools from it, use this tool to load them. For the apps mentioned above, always prefer `tool_search` over `list_mcp_resources` or `list_mcp_resource_templates` for tool discovery.
You have access to all the tools of the following apps/connectors:
{{app_descriptions}}
Some of the tools may not have been provided to you upfront, and you should use this tool (`tool_search`) to search for the required tools and load them for the apps mentioned above. For the apps mentioned above, always use `tool_search` instead of `list_mcp_resources` or `list_mcp_resource_templates` for tool discovery.

View File

@@ -18,6 +18,9 @@ const CONNECTOR_DESCRIPTION: &str = "Plan events and manage your calendar.";
const PROTOCOL_VERSION: &str = "2025-11-25";
const SERVER_NAME: &str = "codex-apps-test";
const SERVER_VERSION: &str = "1.0.0";
pub const CALENDAR_CREATE_EVENT_RESOURCE_URI: &str =
"connector://calendar/tools/calendar_create_event";
const CALENDAR_LIST_EVENTS_RESOURCE_URI: &str = "connector://calendar/tools/calendar_list_events";
#[derive(Clone)]
pub struct AppsTestServer {
@@ -175,7 +178,12 @@ impl Respond for CodexAppsJsonRpcResponder {
"_meta": {
"connector_id": CONNECTOR_ID,
"connector_name": self.connector_name.clone(),
"connector_description": self.connector_description.clone()
"connector_description": self.connector_description.clone(),
"_codex_apps": {
"resource_uri": CALENDAR_CREATE_EVENT_RESOURCE_URI,
"contains_mcp_source": true,
"connector_id": CONNECTOR_ID
}
}
},
{
@@ -192,7 +200,12 @@ impl Respond for CodexAppsJsonRpcResponder {
"_meta": {
"connector_id": CONNECTOR_ID,
"connector_name": self.connector_name.clone(),
"connector_description": self.connector_description.clone()
"connector_description": self.connector_description.clone(),
"_codex_apps": {
"resource_uri": CALENDAR_LIST_EVENTS_RESOURCE_URI,
"contains_mcp_source": true,
"connector_id": CONNECTOR_ID
}
}
}
],
@@ -214,6 +227,7 @@ impl Respond for CodexAppsJsonRpcResponder {
.pointer("/params/arguments/starts_at")
.and_then(Value::as_str)
.unwrap_or_default();
let codex_apps_meta = body.pointer("/params/_meta/_codex_apps").cloned();
ResponseTemplate::new(200).set_body_json(json!({
"jsonrpc": "2.0",
@@ -223,6 +237,9 @@ impl Respond for CodexAppsJsonRpcResponder {
"type": "text",
"text": format!("called {tool_name} for {title} at {starts_at}")
}],
"structuredContent": {
"_codex_apps": codex_apps_meta,
},
"isError": false
}
}))

View File

@@ -943,10 +943,6 @@ async fn includes_apps_guidance_as_developer_message_for_chatgpt_auth() {
.features
.enable(Feature::Apps)
.expect("test config should allow feature update");
config
.features
.disable(Feature::AppsMcpGateway)
.expect("test config should allow feature update");
config.chatgpt_base_url = apps_base_url;
});
let codex = builder
@@ -971,7 +967,8 @@ async fn includes_apps_guidance_as_developer_message_for_chatgpt_auth() {
let request = resp_mock.single_request();
let request_body = request.body_json();
let input = request_body["input"].as_array().expect("input array");
let apps_snippet = "Apps are mentioned in user messages in the format";
let apps_snippet =
"Apps (Connectors) can be explicitly triggered in user messages in the format";
let has_developer_apps_guidance = input.iter().any(|item| {
item.get("role").and_then(|value| value.as_str()) == Some("developer")
@@ -1034,10 +1031,6 @@ async fn omits_apps_guidance_for_api_key_auth_even_when_feature_enabled() {
.features
.enable(Feature::Apps)
.expect("test config should allow feature update");
config
.features
.disable(Feature::AppsMcpGateway)
.expect("test config should allow feature update");
config.chatgpt_base_url = apps_base_url;
});
let codex = builder
@@ -1062,7 +1055,8 @@ async fn omits_apps_guidance_for_api_key_auth_even_when_feature_enabled() {
let request = resp_mock.single_request();
let request_body = request.body_json();
let input = request_body["input"].as_array().expect("input array");
let apps_snippet = "Apps are mentioned in the prompt in the format";
let apps_snippet =
"Apps (Connectors) can be explicitly triggered in user messages in the format";
let has_apps_guidance = input.iter().any(|item| {
item.get("content")

View File

@@ -142,10 +142,6 @@ async fn build_apps_enabled_plugin_test_codex(
.features
.enable(Feature::Apps)
.expect("test config should allow feature update");
config
.features
.disable(Feature::AppsMcpGateway)
.expect("test config should allow feature update");
config.chatgpt_base_url = chatgpt_base_url;
});
Ok(builder

View File

@@ -13,6 +13,7 @@ use codex_protocol::protocol::Op;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::user_input::UserInput;
use core_test_support::apps_test_server::AppsTestServer;
use core_test_support::apps_test_server::CALENDAR_CREATE_EVENT_RESOURCE_URI;
use core_test_support::responses::ResponsesRequest;
use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
@@ -30,8 +31,9 @@ use pretty_assertions::assert_eq;
use serde_json::Value;
use serde_json::json;
const SEARCH_TOOL_DESCRIPTION_SNIPPETS: [&str; 1] = [
"Tools of the apps (Calendar) are hidden until you search for them with this tool (`tool_search`).",
const SEARCH_TOOL_DESCRIPTION_SNIPPETS: [&str; 2] = [
"You have access to all the tools of the following apps/connectors",
"- Calendar: Plan events and manage your calendar.",
];
const TOOL_SEARCH_TOOL_NAME: &str = "tool_search";
const CALENDAR_CREATE_TOOL: &str = "mcp__codex_apps__calendar_create_event";
@@ -89,10 +91,6 @@ fn configure_apps(config: &mut Config, apps_base_url: &str) {
.features
.enable(Feature::Apps)
.expect("test config should allow feature update");
config
.features
.disable(Feature::AppsMcpGateway)
.expect("test config should allow feature update");
config.chatgpt_base_url = apps_base_url.to_string();
config.model = Some("gpt-5-codex".to_string());
@@ -404,6 +402,19 @@ async fn tool_search_returns_deferred_tools_without_follow_up_tool_injection() -
})),
}
);
assert_eq!(
end.result
.as_ref()
.expect("tool call should succeed")
.structured_content,
Some(json!({
"_codex_apps": {
"resource_uri": CALENDAR_CREATE_EVENT_RESOURCE_URI,
"contains_mcp_source": true,
"connector_id": "calendar",
},
}))
);
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))

View File

@@ -340,17 +340,43 @@ impl SessionTelemetry {
Ok(response) => (Some(response.status().as_u16()), None),
Err(error) => (error.status().map(|s| s.as_u16()), Some(error.to_string())),
};
self.record_api_request(attempt, status, error.as_deref(), duration);
self.record_api_request(
attempt,
status,
error.as_deref(),
duration,
false,
None,
false,
None,
None,
"unknown",
None,
None,
None,
None,
);
response
}
#[allow(clippy::too_many_arguments)]
pub fn record_api_request(
&self,
attempt: u64,
status: Option<u16>,
error: Option<&str>,
duration: Duration,
auth_header_attached: bool,
auth_header_name: Option<&str>,
retry_after_unauthorized: bool,
recovery_mode: Option<&str>,
recovery_phase: Option<&str>,
endpoint: &str,
request_id: Option<&str>,
cf_ray: Option<&str>,
auth_error: Option<&str>,
auth_error_code: Option<&str>,
) {
let success = status.is_some_and(|code| (200..=299).contains(&code)) && error.is_none();
let success_str = if success { "true" } else { "false" };
@@ -375,13 +401,76 @@ impl SessionTelemetry {
http.response.status_code = status,
error.message = error,
attempt = attempt,
auth.header_attached = auth_header_attached,
auth.header_name = auth_header_name,
auth.retry_after_unauthorized = retry_after_unauthorized,
auth.recovery_mode = recovery_mode,
auth.recovery_phase = recovery_phase,
endpoint = endpoint,
auth.request_id = request_id,
auth.cf_ray = cf_ray,
auth.error = auth_error,
auth.error_code = auth_error_code,
},
log: {},
trace: {},
);
}
pub fn record_websocket_request(&self, duration: Duration, error: Option<&str>) {
#[allow(clippy::too_many_arguments)]
pub fn record_websocket_connect(
&self,
duration: Duration,
status: Option<u16>,
error: Option<&str>,
auth_header_attached: bool,
auth_header_name: Option<&str>,
retry_after_unauthorized: bool,
recovery_mode: Option<&str>,
recovery_phase: Option<&str>,
endpoint: &str,
connection_reused: bool,
request_id: Option<&str>,
cf_ray: Option<&str>,
auth_error: Option<&str>,
auth_error_code: Option<&str>,
) {
let success = error.is_none()
&& status
.map(|code| (200..=299).contains(&code))
.unwrap_or(true);
let success_str = if success { "true" } else { "false" };
log_and_trace_event!(
self,
common: {
event.name = "codex.websocket_connect",
duration_ms = %duration.as_millis(),
http.response.status_code = status,
success = success_str,
error.message = error,
auth.header_attached = auth_header_attached,
auth.header_name = auth_header_name,
auth.retry_after_unauthorized = retry_after_unauthorized,
auth.recovery_mode = recovery_mode,
auth.recovery_phase = recovery_phase,
endpoint = endpoint,
auth.connection_reused = connection_reused,
auth.request_id = request_id,
auth.cf_ray = cf_ray,
auth.error = auth_error,
auth.error_code = auth_error_code,
},
log: {},
trace: {},
);
}
pub fn record_websocket_request(
&self,
duration: Duration,
error: Option<&str>,
connection_reused: bool,
) {
let success_str = if error.is_none() { "true" } else { "false" };
self.counter(
WEBSOCKET_REQUEST_COUNT_METRIC,
@@ -400,6 +489,39 @@ impl SessionTelemetry {
duration_ms = %duration.as_millis(),
success = success_str,
error.message = error,
auth.connection_reused = connection_reused,
},
log: {},
trace: {},
);
}
#[allow(clippy::too_many_arguments)]
pub fn record_auth_recovery(
&self,
mode: &str,
step: &str,
outcome: &str,
request_id: Option<&str>,
cf_ray: Option<&str>,
auth_error: Option<&str>,
auth_error_code: Option<&str>,
recovery_reason: Option<&str>,
auth_state_changed: Option<bool>,
) {
log_and_trace_event!(
self,
common: {
event.name = "codex.auth_recovery",
auth.mode = mode,
auth.step = step,
auth.outcome = outcome,
auth.request_id = request_id,
auth.cf_ray = cf_ray,
auth.error = auth_error,
auth.error_code = auth_error_code,
auth.recovery_reason = recovery_reason,
auth.state_changed = auth_state_changed,
},
log: {},
trace: {},

View File

@@ -297,3 +297,462 @@ fn otel_export_routing_policy_routes_tool_result_log_and_trace_events() {
assert!(!tool_trace_attrs.contains_key("mcp_server"));
assert!(!tool_trace_attrs.contains_key("mcp_server_origin"));
}
#[test]
fn otel_export_routing_policy_routes_auth_recovery_log_and_trace_events() {
let log_exporter = InMemoryLogExporter::default();
let logger_provider = SdkLoggerProvider::builder()
.with_simple_exporter(log_exporter.clone())
.build();
let span_exporter = InMemorySpanExporter::default();
let tracer_provider = SdkTracerProvider::builder()
.with_simple_exporter(span_exporter.clone())
.build();
let tracer = tracer_provider.tracer("sink-split-test");
let subscriber = tracing_subscriber::registry()
.with(
opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(
&logger_provider,
)
.with_filter(filter_fn(OtelProvider::log_export_filter)),
)
.with(
tracing_opentelemetry::layer()
.with_tracer(tracer)
.with_filter(filter_fn(OtelProvider::trace_export_filter)),
);
tracing::subscriber::with_default(subscriber, || {
tracing::callsite::rebuild_interest_cache();
let manager = SessionTelemetry::new(
ThreadId::new(),
"gpt-5.1",
"gpt-5.1",
Some("account-id".to_string()),
Some("engineer@example.com".to_string()),
Some(TelemetryAuthMode::Chatgpt),
"codex_exec".to_string(),
true,
"tty".to_string(),
SessionSource::Cli,
);
let root_span = tracing::info_span!("root");
let _root_guard = root_span.enter();
manager.record_auth_recovery(
"managed",
"reload",
"recovery_succeeded",
Some("req-401"),
Some("ray-401"),
Some("missing_authorization_header"),
Some("token_expired"),
None,
Some(true),
);
});
logger_provider.force_flush().expect("flush logs");
tracer_provider.force_flush().expect("flush traces");
let logs = log_exporter.get_emitted_logs().expect("log export");
let recovery_log = find_log_by_event_name(&logs, "codex.auth_recovery");
let recovery_log_attrs = log_attributes(&recovery_log.record);
assert_eq!(
recovery_log_attrs.get("auth.mode").map(String::as_str),
Some("managed")
);
assert_eq!(
recovery_log_attrs.get("auth.step").map(String::as_str),
Some("reload")
);
assert_eq!(
recovery_log_attrs.get("auth.outcome").map(String::as_str),
Some("recovery_succeeded")
);
assert_eq!(
recovery_log_attrs
.get("auth.request_id")
.map(String::as_str),
Some("req-401")
);
assert_eq!(
recovery_log_attrs.get("auth.cf_ray").map(String::as_str),
Some("ray-401")
);
assert_eq!(
recovery_log_attrs.get("auth.error").map(String::as_str),
Some("missing_authorization_header")
);
assert_eq!(
recovery_log_attrs
.get("auth.error_code")
.map(String::as_str),
Some("token_expired")
);
assert_eq!(
recovery_log_attrs
.get("auth.state_changed")
.map(String::as_str),
Some("true")
);
let spans = span_exporter.get_finished_spans().expect("span export");
assert_eq!(spans.len(), 1);
let span_events = &spans[0].events.events;
assert_eq!(span_events.len(), 1);
let recovery_trace_event = find_span_event_by_name_attr(span_events, "codex.auth_recovery");
let recovery_trace_attrs = span_event_attributes(recovery_trace_event);
assert_eq!(
recovery_trace_attrs.get("auth.mode").map(String::as_str),
Some("managed")
);
assert_eq!(
recovery_trace_attrs.get("auth.step").map(String::as_str),
Some("reload")
);
assert_eq!(
recovery_trace_attrs.get("auth.outcome").map(String::as_str),
Some("recovery_succeeded")
);
assert_eq!(
recovery_trace_attrs
.get("auth.request_id")
.map(String::as_str),
Some("req-401")
);
assert_eq!(
recovery_trace_attrs.get("auth.cf_ray").map(String::as_str),
Some("ray-401")
);
assert_eq!(
recovery_trace_attrs.get("auth.error").map(String::as_str),
Some("missing_authorization_header")
);
assert_eq!(
recovery_trace_attrs
.get("auth.error_code")
.map(String::as_str),
Some("token_expired")
);
assert_eq!(
recovery_trace_attrs
.get("auth.state_changed")
.map(String::as_str),
Some("true")
);
}
#[test]
fn otel_export_routing_policy_routes_api_request_auth_observability() {
let log_exporter = InMemoryLogExporter::default();
let logger_provider = SdkLoggerProvider::builder()
.with_simple_exporter(log_exporter.clone())
.build();
let span_exporter = InMemorySpanExporter::default();
let tracer_provider = SdkTracerProvider::builder()
.with_simple_exporter(span_exporter.clone())
.build();
let tracer = tracer_provider.tracer("sink-split-test");
let subscriber = tracing_subscriber::registry()
.with(
opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(
&logger_provider,
)
.with_filter(filter_fn(OtelProvider::log_export_filter)),
)
.with(
tracing_opentelemetry::layer()
.with_tracer(tracer)
.with_filter(filter_fn(OtelProvider::trace_export_filter)),
);
tracing::subscriber::with_default(subscriber, || {
tracing::callsite::rebuild_interest_cache();
let manager = SessionTelemetry::new(
ThreadId::new(),
"gpt-5.1",
"gpt-5.1",
Some("account-id".to_string()),
Some("engineer@example.com".to_string()),
Some(TelemetryAuthMode::Chatgpt),
"codex_exec".to_string(),
true,
"tty".to_string(),
SessionSource::Cli,
);
let root_span = tracing::info_span!("root");
let _root_guard = root_span.enter();
manager.record_api_request(
1,
Some(401),
Some("http 401"),
std::time::Duration::from_millis(42),
true,
Some("authorization"),
true,
Some("managed"),
Some("refresh_token"),
"/responses",
Some("req-401"),
Some("ray-401"),
Some("missing_authorization_header"),
Some("token_expired"),
);
});
logger_provider.force_flush().expect("flush logs");
tracer_provider.force_flush().expect("flush traces");
let logs = log_exporter.get_emitted_logs().expect("log export");
let request_log = find_log_by_event_name(&logs, "codex.api_request");
let request_log_attrs = log_attributes(&request_log.record);
assert_eq!(
request_log_attrs
.get("auth.header_attached")
.map(String::as_str),
Some("true")
);
assert_eq!(
request_log_attrs
.get("auth.header_name")
.map(String::as_str),
Some("authorization")
);
assert_eq!(
request_log_attrs
.get("auth.retry_after_unauthorized")
.map(String::as_str),
Some("true")
);
assert_eq!(
request_log_attrs
.get("auth.recovery_mode")
.map(String::as_str),
Some("managed")
);
assert_eq!(
request_log_attrs
.get("auth.recovery_phase")
.map(String::as_str),
Some("refresh_token")
);
assert_eq!(
request_log_attrs.get("endpoint").map(String::as_str),
Some("/responses")
);
assert_eq!(
request_log_attrs.get("auth.error").map(String::as_str),
Some("missing_authorization_header")
);
let spans = span_exporter.get_finished_spans().expect("span export");
let request_trace_event =
find_span_event_by_name_attr(&spans[0].events.events, "codex.api_request");
let request_trace_attrs = span_event_attributes(request_trace_event);
assert_eq!(
request_trace_attrs
.get("auth.header_attached")
.map(String::as_str),
Some("true")
);
assert_eq!(
request_trace_attrs
.get("auth.header_name")
.map(String::as_str),
Some("authorization")
);
assert_eq!(
request_trace_attrs
.get("auth.retry_after_unauthorized")
.map(String::as_str),
Some("true")
);
assert_eq!(
request_trace_attrs.get("endpoint").map(String::as_str),
Some("/responses")
);
}
#[test]
fn otel_export_routing_policy_routes_websocket_connect_auth_observability() {
let log_exporter = InMemoryLogExporter::default();
let logger_provider = SdkLoggerProvider::builder()
.with_simple_exporter(log_exporter.clone())
.build();
let span_exporter = InMemorySpanExporter::default();
let tracer_provider = SdkTracerProvider::builder()
.with_simple_exporter(span_exporter.clone())
.build();
let tracer = tracer_provider.tracer("sink-split-test");
let subscriber = tracing_subscriber::registry()
.with(
opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(
&logger_provider,
)
.with_filter(filter_fn(OtelProvider::log_export_filter)),
)
.with(
tracing_opentelemetry::layer()
.with_tracer(tracer)
.with_filter(filter_fn(OtelProvider::trace_export_filter)),
);
tracing::subscriber::with_default(subscriber, || {
tracing::callsite::rebuild_interest_cache();
let manager = SessionTelemetry::new(
ThreadId::new(),
"gpt-5.1",
"gpt-5.1",
Some("account-id".to_string()),
Some("engineer@example.com".to_string()),
Some(TelemetryAuthMode::Chatgpt),
"codex_exec".to_string(),
true,
"tty".to_string(),
SessionSource::Cli,
);
let root_span = tracing::info_span!("root");
let _root_guard = root_span.enter();
manager.record_websocket_connect(
std::time::Duration::from_millis(17),
Some(401),
Some("http 401"),
true,
Some("authorization"),
true,
Some("managed"),
Some("reload"),
"/responses",
false,
Some("req-ws-401"),
Some("ray-ws-401"),
Some("missing_authorization_header"),
Some("token_expired"),
);
});
logger_provider.force_flush().expect("flush logs");
tracer_provider.force_flush().expect("flush traces");
let logs = log_exporter.get_emitted_logs().expect("log export");
let connect_log = find_log_by_event_name(&logs, "codex.websocket_connect");
let connect_log_attrs = log_attributes(&connect_log.record);
assert_eq!(
connect_log_attrs
.get("auth.header_attached")
.map(String::as_str),
Some("true")
);
assert_eq!(
connect_log_attrs
.get("auth.header_name")
.map(String::as_str),
Some("authorization")
);
assert_eq!(
connect_log_attrs.get("auth.error").map(String::as_str),
Some("missing_authorization_header")
);
assert_eq!(
connect_log_attrs.get("endpoint").map(String::as_str),
Some("/responses")
);
assert_eq!(
connect_log_attrs
.get("auth.connection_reused")
.map(String::as_str),
Some("false")
);
let spans = span_exporter.get_finished_spans().expect("span export");
let connect_trace_event =
find_span_event_by_name_attr(&spans[0].events.events, "codex.websocket_connect");
let connect_trace_attrs = span_event_attributes(connect_trace_event);
assert_eq!(
connect_trace_attrs
.get("auth.recovery_phase")
.map(String::as_str),
Some("reload")
);
}
#[test]
fn otel_export_routing_policy_routes_websocket_request_transport_observability() {
let log_exporter = InMemoryLogExporter::default();
let logger_provider = SdkLoggerProvider::builder()
.with_simple_exporter(log_exporter.clone())
.build();
let span_exporter = InMemorySpanExporter::default();
let tracer_provider = SdkTracerProvider::builder()
.with_simple_exporter(span_exporter.clone())
.build();
let tracer = tracer_provider.tracer("sink-split-test");
let subscriber = tracing_subscriber::registry()
.with(
opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(
&logger_provider,
)
.with_filter(filter_fn(OtelProvider::log_export_filter)),
)
.with(
tracing_opentelemetry::layer()
.with_tracer(tracer)
.with_filter(filter_fn(OtelProvider::trace_export_filter)),
);
tracing::subscriber::with_default(subscriber, || {
tracing::callsite::rebuild_interest_cache();
let manager = SessionTelemetry::new(
ThreadId::new(),
"gpt-5.1",
"gpt-5.1",
Some("account-id".to_string()),
Some("engineer@example.com".to_string()),
Some(TelemetryAuthMode::Chatgpt),
"codex_exec".to_string(),
true,
"tty".to_string(),
SessionSource::Cli,
);
let root_span = tracing::info_span!("root");
let _root_guard = root_span.enter();
manager.record_websocket_request(
std::time::Duration::from_millis(23),
Some("stream error"),
true,
);
});
logger_provider.force_flush().expect("flush logs");
tracer_provider.force_flush().expect("flush traces");
let logs = log_exporter.get_emitted_logs().expect("log export");
let request_log = find_log_by_event_name(&logs, "codex.websocket_request");
let request_log_attrs = log_attributes(&request_log.record);
assert_eq!(
request_log_attrs
.get("auth.connection_reused")
.map(String::as_str),
Some("true")
);
assert_eq!(
request_log_attrs.get("error.message").map(String::as_str),
Some("stream error")
);
let spans = span_exporter.get_finished_spans().expect("span export");
let request_trace_event =
find_span_event_by_name_attr(&spans[0].events.events, "codex.websocket_request");
let request_trace_attrs = span_event_attributes(request_trace_event);
assert_eq!(
request_trace_attrs
.get("auth.connection_reused")
.map(String::as_str),
Some("true")
);
}

View File

@@ -47,8 +47,23 @@ fn runtime_metrics_summary_collects_tool_api_and_streaming_metrics() -> Result<(
None,
None,
);
manager.record_api_request(1, Some(200), None, Duration::from_millis(300));
manager.record_websocket_request(Duration::from_millis(400), None);
manager.record_api_request(
1,
Some(200),
None,
Duration::from_millis(300),
false,
None,
false,
None,
None,
"/responses",
None,
None,
None,
None,
);
manager.record_websocket_request(Duration::from_millis(400), None, false);
let sse_response: std::result::Result<
Option<std::result::Result<StreamEvent, eventsource_stream::EventStreamError<&str>>>,
tokio::time::error::Elapsed,

View File

@@ -700,6 +700,7 @@ impl RmcpClient {
&self,
name: String,
arguments: Option<serde_json::Value>,
meta: Option<serde_json::Value>,
timeout: Option<Duration>,
) -> Result<CallToolResult> {
self.refresh_oauth_if_needed().await;
@@ -712,8 +713,17 @@ impl RmcpClient {
}
None => None,
};
let meta = match meta {
Some(Value::Object(map)) => Some(rmcp::model::Meta(map)),
Some(other) => {
return Err(anyhow!(
"MCP tool request _meta must be a JSON object, got {other}"
));
}
None => None,
};
let rmcp_params = CallToolRequestParams {
meta: None,
meta,
name: name.into(),
arguments,
task: None,

View File

@@ -105,6 +105,7 @@ async fn call_echo_tool(client: &RmcpClient, message: &str) -> anyhow::Result<Ca
.call_tool(
"echo".to_string(),
Some(json!({ "message": message })),
None,
Some(Duration::from_secs(5)),
)
.await