Compare commits

...

6 Commits

Author SHA1 Message Date
sdcoffey
65640e2918 ci: avoid publishing musl runtime wheels to PyPI 2026-03-15 09:26:18 -07:00
sdcoffey
fdb0f519a8 sdk/python: always regenerate types before staging 2026-03-15 09:26:18 -07:00
sdcoffey
e280d64d28 ci: publish python sdk releases to PyPI 2026-03-15 09:26:18 -07:00
sdcoffey
8975fe66ee sdk/python: split core and bundled packages 2026-03-15 09:26:17 -07:00
Matthew Zeng
49edf311ac [apps] Add tool call meta. (#14647)
- [x] Add resource_uri and other things to _meta to shortcut resource
lookup and speed things up.
2026-03-14 22:24:13 -07:00
Colin Young
d692b74007 Add auth 401 observability to client bug reports (#14611)
CXC-392

  [With
  401](https://openai.sentry.io/issues/7333870443/?project=4510195390611458&query=019ce8f8-560c-7f10-a00a-c59553740674&referrer=issue-stream)
  <img width="1909" height="555" alt="401 auth tags in Sentry"
  src="https://github.com/user-attachments/assets/412ea950-61c4-4780-9697-15c270971ee3"
  />


  - auth_401_*: preserved facts from the latest unauthorized response snapshot
  - auth_*: latest auth-related facts from the latest request attempt
  - auth_recovery_*: unauthorized recovery state and follow-up result


  Without 401
  <img width="1917" height="522" alt="happy-path auth tags in Sentry"
  src="https://github.com/user-attachments/assets/3381ed28-8022-43b0-b6c0-623a630e679f"
  />

  ###### Summary
  - Add client-visible 401 diagnostics for auth attachment, upstream auth classification, and 401 request id / cf-ray correlation.
  - Record unauthorized recovery mode, phase, outcome, and retry/follow-up status without changing auth behavior.
  - Surface the highest-signal auth and recovery fields on uploaded client bug reports so they are usable in Sentry.
  - Preserve original unauthorized evidence under `auth_401_*` while keeping follow-up result tags separate.

  ###### Rationale (from spec findings)
  - The dominant bucket needed proof of whether the client attached auth before send or upstream still classified the request as missing auth.
  - Client uploads needed to show whether unauthorized recovery ran and what the client tried next.
  - Request id and cf-ray needed to be preserved on the unauthorized response so server-side correlation is immediate.
  - The bug-report path needed the same auth evidence as the request telemetry path, otherwise the observability would not be operationally useful.

  ###### Scope
  - Add auth 401 and unauthorized-recovery observability in `codex-rs/core`, `codex-rs/codex-api`, and `codex-rs/otel`, including feedback-tag surfacing.
  - Keep auth semantics, refresh behavior, retry behavior, endpoint classification, and geo-denial follow-up work out of this PR.

  ###### Trade-offs
  - This exports only safe auth evidence: header presence/name, upstream auth classification, request ids, and recovery state. It does not export token values or raw upstream bodies.
  - This keeps websocket connection reuse as a transport clue because it can help distinguish stale reused sessions from fresh reconnects.
  - Misroute/base-url classification and geo-denial are intentionally deferred to a separate follow-up PR so this review stays focused on the dominant auth 401 bucket.

  ###### Client follow-up
  - PR 2 will add misroute/provider and geo-denial observability plus the matching feedback-tag surfacing.
  - A separate host/app-server PR should log auth-decision inputs so pre-send host auth state can be correlated with client request evidence.
  - `device_id` remains intentionally separate until there is a safe existing source on the feedback upload path.

  ###### Testing
  - `cargo test -p codex-core refresh_available_models_sorts_by_priority`
  - `cargo test -p codex-core emit_feedback_request_tags_`
  - `cargo test -p codex-core emit_feedback_auth_recovery_tags_`
  - `cargo test -p codex-core auth_request_telemetry_context_tracks_attached_auth_and_retry_phase`
  - `cargo test -p codex-core extract_response_debug_context_decodes_identity_headers`
  - `cargo test -p codex-core identity_auth_details`
  - `cargo test -p codex-core telemetry_error_messages_preserve_non_http_details`
  - `cargo test -p codex-core --all-features --no-run`
  - `cargo test -p codex-otel otel_export_routing_policy_routes_api_request_auth_observability`
  - `cargo test -p codex-otel otel_export_routing_policy_routes_websocket_connect_auth_observability`
  - `cargo test -p codex-otel otel_export_routing_policy_routes_websocket_request_transport_observability`
2026-03-14 15:38:51 -07:00
45 changed files with 2744 additions and 258 deletions

View File

@@ -181,6 +181,41 @@ jobs:
account-name: ${{ secrets.AZURE_TRUSTED_SIGNING_ACCOUNT_NAME }}
certificate-profile-name: ${{ secrets.AZURE_TRUSTED_SIGNING_CERTIFICATE_PROFILE_NAME }}
- name: Setup Python for runtime packaging
uses: actions/setup-python@v6
with:
python-version: "3.13"
- name: Build Python runtime wheel
shell: bash
working-directory: ${{ github.workspace }}
env:
RELEASE_TAG: ${{ github.ref_name }}
TARGET: ${{ matrix.target }}
run: |
set -euo pipefail
version="${RELEASE_TAG#rust-v}"
staging_dir="${RUNNER_TEMP}/codex-cli-bin-${TARGET}"
out_dir="${GITHUB_WORKSPACE}/dist-python/${TARGET}"
python -m pip install --upgrade pip
python -m pip install build hatchling
python sdk/python/scripts/update_sdk_artifacts.py \
stage-runtime \
"${staging_dir}" \
"codex-rs/target/${TARGET}/release/codex.exe" \
--runtime-version "${version}"
python -m build --wheel --outdir "${out_dir}" "${staging_dir}"
- name: Upload Python runtime wheel
uses: actions/upload-artifact@v7
with:
name: python-runtime-${{ matrix.target }}
path: dist-python/${{ matrix.target }}/*
- name: Stage artifacts
shell: bash
run: |

View File

@@ -302,6 +302,41 @@ jobs:
apple-notarization-key-id: ${{ secrets.APPLE_NOTARIZATION_KEY_ID }}
apple-notarization-issuer-id: ${{ secrets.APPLE_NOTARIZATION_ISSUER_ID }}
- name: Setup Python for runtime packaging
uses: actions/setup-python@v6
with:
python-version: "3.13"
- name: Build Python runtime wheel
shell: bash
working-directory: ${{ github.workspace }}
env:
RELEASE_TAG: ${{ github.ref_name }}
TARGET: ${{ matrix.target }}
run: |
set -euo pipefail
version="${RELEASE_TAG#rust-v}"
staging_dir="${RUNNER_TEMP}/codex-cli-bin-${TARGET}"
out_dir="${GITHUB_WORKSPACE}/dist-python/${TARGET}"
python -m pip install --upgrade pip
python -m pip install build hatchling
python sdk/python/scripts/update_sdk_artifacts.py \
stage-runtime \
"${staging_dir}" \
"codex-rs/target/${TARGET}/release/codex" \
--runtime-version "${version}"
python -m build --wheel --outdir "${out_dir}" "${staging_dir}"
- name: Upload Python runtime wheel
uses: actions/upload-artifact@v7
with:
name: python-runtime-${{ matrix.target }}
path: dist-python/${{ matrix.target }}/*
- name: Stage artifacts
shell: bash
run: |
@@ -384,6 +419,7 @@ jobs:
needs:
- build
- build-windows
- build-python-sdk
- shell-tool-mcp
name: release
runs-on: ubuntu-latest
@@ -533,6 +569,190 @@ jobs:
exit 1
fi
build-python-sdk:
name: build-python-sdk
needs:
- tag-check
runs-on: ubuntu-latest
permissions:
contents: read
steps:
- name: Checkout repository
uses: actions/checkout@v6
- name: Setup Python
uses: actions/setup-python@v6
with:
python-version: "3.13"
- name: Build Python SDK artifacts
shell: bash
env:
RELEASE_TAG: ${{ github.ref_name }}
run: |
set -euo pipefail
version="${RELEASE_TAG#rust-v}"
core_staging_dir="${RUNNER_TEMP}/codex-app-server-sdk-core"
core_out_dir="${GITHUB_WORKSPACE}/dist-python/sdk-core"
bundled_staging_dir="${RUNNER_TEMP}/codex-app-server-sdk"
bundled_out_dir="${GITHUB_WORKSPACE}/dist-python/sdk"
python -m pip install --upgrade pip
python -m pip install \
build \
hatchling \
"datamodel-code-generator==0.31.2" \
"ruff==0.11.13"
python sdk/python/scripts/update_sdk_artifacts.py generate-types
python sdk/python/scripts/update_sdk_artifacts.py \
stage-sdk-core \
"${core_staging_dir}" \
--sdk-version "${version}"
python -m build --outdir "${core_out_dir}" "${core_staging_dir}"
python sdk/python/scripts/update_sdk_artifacts.py \
stage-sdk \
"${bundled_staging_dir}" \
--sdk-version "${version}" \
--runtime-version "${version}"
python -m build --outdir "${bundled_out_dir}" "${bundled_staging_dir}"
- name: Upload Python core SDK artifacts
uses: actions/upload-artifact@v7
with:
name: python-sdk-core
path: dist-python/sdk-core/*
- name: Upload Python SDK artifacts
uses: actions/upload-artifact@v7
with:
name: python-sdk
path: dist-python/sdk/*
publish-pypi-runtime:
name: publish-pypi-runtime
needs:
- build
- build-windows
- release
runs-on: ubuntu-latest
permissions:
id-token: write
contents: read
environment:
name: pypi
steps:
# Do not publish musl runtime wheels to PyPI yet. GNU and musl builds for
# the same architecture currently infer the same wheel tag, which can make
# the published Linux runtime nondeterministic.
- name: Download macOS arm64 runtime wheel
uses: actions/download-artifact@v8
with:
name: python-runtime-aarch64-apple-darwin
path: dist-pypi/runtime
- name: Download macOS x64 runtime wheel
uses: actions/download-artifact@v8
with:
name: python-runtime-x86_64-apple-darwin
path: dist-pypi/runtime
- name: Download Linux GNU arm64 runtime wheel
uses: actions/download-artifact@v8
with:
name: python-runtime-aarch64-unknown-linux-gnu
path: dist-pypi/runtime
- name: Download Linux GNU x64 runtime wheel
uses: actions/download-artifact@v8
with:
name: python-runtime-x86_64-unknown-linux-gnu
path: dist-pypi/runtime
- name: Download Windows arm64 runtime wheel
uses: actions/download-artifact@v8
with:
name: python-runtime-aarch64-pc-windows-msvc
path: dist-pypi/runtime
- name: Download Windows x64 runtime wheel
uses: actions/download-artifact@v8
with:
name: python-runtime-x86_64-pc-windows-msvc
path: dist-pypi/runtime
- name: List runtime wheels
shell: bash
run: ls -R dist-pypi/runtime
- name: Publish Python runtime wheels to PyPI
uses: pypa/gh-action-pypi-publish@release/v1
with:
packages-dir: dist-pypi/runtime/
publish-pypi-sdk-core:
name: publish-pypi-sdk-core
needs:
- build-python-sdk
- publish-pypi-runtime
runs-on: ubuntu-latest
permissions:
id-token: write
contents: read
environment:
name: pypi
steps:
- name: Download Python core SDK artifacts
uses: actions/download-artifact@v8
with:
name: python-sdk-core
path: dist-pypi/sdk-core
- name: List core SDK artifacts
shell: bash
run: ls -R dist-pypi/sdk-core
- name: Publish Python core SDK to PyPI
uses: pypa/gh-action-pypi-publish@release/v1
with:
packages-dir: dist-pypi/sdk-core/
publish-pypi-sdk:
name: publish-pypi-sdk
needs:
- build-python-sdk
- publish-pypi-sdk-core
runs-on: ubuntu-latest
permissions:
id-token: write
contents: read
environment:
name: pypi
steps:
- name: Download bundled Python SDK artifacts
uses: actions/download-artifact@v8
with:
name: python-sdk
path: dist-pypi/sdk
- name: List bundled SDK artifacts
shell: bash
run: ls -R dist-pypi/sdk
- name: Publish bundled Python SDK to PyPI
uses: pypa/gh-action-pypi-publish@release/v1
with:
packages-dir: dist-pypi/sdk/
# Publish to npm using OIDC authentication.
# July 31, 2025: https://github.blog/changelog/2025-07-31-npm-trusted-publishing-with-oidc-is-generally-available/
# npm docs: https://docs.npmjs.com/trusted-publishers

View File

@@ -399,7 +399,7 @@ impl CloudRequirementsService {
"Cloud requirements request was unauthorized; attempting auth recovery"
);
match auth_recovery.next().await {
Ok(()) => {
Ok(_) => {
let Some(refreshed_auth) = self.auth_manager.auth().await else {
tracing::error!(
"Auth recovery succeeded but no auth is available for cloud requirements"

View File

@@ -214,6 +214,7 @@ impl ResponsesWebsocketConnection {
pub async fn stream_request(
&self,
request: ResponsesWsRequest,
connection_reused: bool,
) -> Result<ResponseStream, ApiError> {
let (tx_event, rx_event) =
mpsc::channel::<std::result::Result<ResponseEvent, ApiError>>(1600);
@@ -258,6 +259,7 @@ impl ResponsesWebsocketConnection {
request_body,
idle_timeout,
telemetry,
connection_reused,
)
.await
};
@@ -534,6 +536,7 @@ async fn run_websocket_response_stream(
request_body: Value,
idle_timeout: Duration,
telemetry: Option<Arc<dyn WebsocketTelemetry>>,
connection_reused: bool,
) -> Result<(), ApiError> {
let mut last_server_model: Option<String> = None;
let request_text = match serde_json::to_string(&request_body) {
@@ -553,7 +556,11 @@ async fn run_websocket_response_stream(
.map_err(|err| ApiError::Stream(format!("failed to send websocket request: {err}")));
if let Some(t) = telemetry.as_ref() {
t.on_ws_request(request_start.elapsed(), result.as_ref().err());
t.on_ws_request(
request_start.elapsed(),
result.as_ref().err(),
connection_reused,
);
}
result?;

View File

@@ -33,7 +33,7 @@ pub trait SseTelemetry: Send + Sync {
/// Telemetry for Responses WebSocket transport.
pub trait WebsocketTelemetry: Send + Sync {
fn on_ws_request(&self, duration: Duration, error: Option<&ApiError>);
fn on_ws_request(&self, duration: Duration, error: Option<&ApiError>, connection_reused: bool);
fn on_ws_event(
&self,

View File

@@ -338,9 +338,6 @@
"apps": {
"type": "boolean"
},
"apps_mcp_gateway": {
"type": "boolean"
},
"artifact": {
"type": "boolean"
},
@@ -1890,9 +1887,6 @@
"apps": {
"type": "boolean"
},
"apps_mcp_gateway": {
"type": "boolean"
},
"artifact": {
"type": "boolean"
},

View File

@@ -1,3 +1,4 @@
use base64::Engine;
use chrono::DateTime;
use chrono::Utc;
use codex_api::AuthProvider as ApiAuthProvider;
@@ -7,6 +8,7 @@ use codex_api::rate_limits::parse_promo_message;
use codex_api::rate_limits::parse_rate_limit_for_limit;
use http::HeaderMap;
use serde::Deserialize;
use serde_json::Value;
use crate::auth::CodexAuth;
use crate::error::CodexErr;
@@ -30,6 +32,8 @@ pub(crate) fn map_api_error(err: ApiError) -> CodexErr {
url: None,
cf_ray: None,
request_id: None,
identity_authorization_error: None,
identity_error_code: None,
}),
ApiError::InvalidRequest { message } => CodexErr::InvalidRequest(message),
ApiError::Transport(transport) => match transport {
@@ -98,6 +102,11 @@ pub(crate) fn map_api_error(err: ApiError) -> CodexErr {
url,
cf_ray: extract_header(headers.as_ref(), CF_RAY_HEADER),
request_id: extract_request_id(headers.as_ref()),
identity_authorization_error: extract_header(
headers.as_ref(),
X_OPENAI_AUTHORIZATION_ERROR_HEADER,
),
identity_error_code: extract_x_error_json_code(headers.as_ref()),
})
}
}
@@ -118,6 +127,8 @@ const ACTIVE_LIMIT_HEADER: &str = "x-codex-active-limit";
const REQUEST_ID_HEADER: &str = "x-request-id";
const OAI_REQUEST_ID_HEADER: &str = "x-oai-request-id";
const CF_RAY_HEADER: &str = "cf-ray";
const X_OPENAI_AUTHORIZATION_ERROR_HEADER: &str = "x-openai-authorization-error";
const X_ERROR_JSON_HEADER: &str = "x-error-json";
#[cfg(test)]
#[path = "api_bridge_tests.rs"]
@@ -140,6 +151,19 @@ fn extract_header(headers: Option<&HeaderMap>, name: &str) -> Option<String> {
})
}
fn extract_x_error_json_code(headers: Option<&HeaderMap>) -> Option<String> {
let encoded = extract_header(headers, X_ERROR_JSON_HEADER)?;
let decoded = base64::engine::general_purpose::STANDARD
.decode(encoded)
.ok()?;
let parsed = serde_json::from_slice::<Value>(&decoded).ok()?;
parsed
.get("error")
.and_then(|error| error.get("code"))
.and_then(Value::as_str)
.map(str::to_string)
}
pub(crate) fn auth_provider_from_auth(
auth: Option<CodexAuth>,
provider: &ModelProviderInfo,
@@ -191,6 +215,26 @@ pub(crate) struct CoreAuthProvider {
account_id: Option<String>,
}
impl CoreAuthProvider {
pub(crate) fn auth_header_attached(&self) -> bool {
self.token
.as_ref()
.is_some_and(|token| http::HeaderValue::from_str(&format!("Bearer {token}")).is_ok())
}
pub(crate) fn auth_header_name(&self) -> Option<&'static str> {
self.auth_header_attached().then_some("authorization")
}
#[cfg(test)]
pub(crate) fn for_test(token: Option<&str>, account_id: Option<&str>) -> Self {
Self {
token: token.map(str::to_string),
account_id: account_id.map(str::to_string),
}
}
}
impl ApiAuthProvider for CoreAuthProvider {
fn bearer_token(&self) -> Option<String> {
self.token.clone()

View File

@@ -1,4 +1,5 @@
use super::*;
use base64::Engine;
use pretty_assertions::assert_eq;
#[test]
@@ -94,3 +95,49 @@ fn map_api_error_does_not_fallback_limit_name_to_limit_id() {
None
);
}
#[test]
fn map_api_error_extracts_identity_auth_details_from_headers() {
let mut headers = HeaderMap::new();
headers.insert(REQUEST_ID_HEADER, http::HeaderValue::from_static("req-401"));
headers.insert(CF_RAY_HEADER, http::HeaderValue::from_static("ray-401"));
headers.insert(
X_OPENAI_AUTHORIZATION_ERROR_HEADER,
http::HeaderValue::from_static("missing_authorization_header"),
);
let x_error_json =
base64::engine::general_purpose::STANDARD.encode(r#"{"error":{"code":"token_expired"}}"#);
headers.insert(
X_ERROR_JSON_HEADER,
http::HeaderValue::from_str(&x_error_json).expect("valid x-error-json header"),
);
let err = map_api_error(ApiError::Transport(TransportError::Http {
status: http::StatusCode::UNAUTHORIZED,
url: Some("https://chatgpt.com/backend-api/codex/models".to_string()),
headers: Some(headers),
body: Some(r#"{"detail":"Unauthorized"}"#.to_string()),
}));
let CodexErr::UnexpectedStatus(err) = err else {
panic!("expected CodexErr::UnexpectedStatus, got {err:?}");
};
assert_eq!(err.request_id.as_deref(), Some("req-401"));
assert_eq!(err.cf_ray.as_deref(), Some("ray-401"));
assert_eq!(
err.identity_authorization_error.as_deref(),
Some("missing_authorization_header")
);
assert_eq!(err.identity_error_code.as_deref(), Some("token_expired"));
}
#[test]
fn core_auth_provider_reports_when_auth_header_will_attach() {
let auth = CoreAuthProvider {
token: Some("access-token".to_string()),
account_id: None,
};
assert!(auth.auth_header_attached());
assert_eq!(auth.auth_header_name(), Some("authorization"));
}

View File

@@ -4,7 +4,7 @@ use codex_protocol::protocol::APPS_INSTRUCTIONS_OPEN_TAG;
pub(crate) fn render_apps_section() -> String {
let body = format!(
"## Apps\nApps are mentioned in user messages in the format `[$app-name](app://{{connector_id}})`.\nAn app is equivalent to a set of MCP tools within the `{CODEX_APPS_MCP_SERVER_NAME}` MCP.\nWhen you see an app mention, the app's MCP tools are either available tools in the `{CODEX_APPS_MCP_SERVER_NAME}` MCP server, or the tools do not exist because the user has not installed the app.\nDo not additionally call list_mcp_resources for apps that are already mentioned."
"## Apps (Connectors)\nApps (Connectors) can be explicitly triggered in user messages in the format `[$app-name](app://{{connector_id}})`. Apps can also be implicitly triggered as long as the context suggests usage of available apps, the available apps will be listed by the `tool_search` tool.\nAn app is equivalent to a set of MCP tools within the `{CODEX_APPS_MCP_SERVER_NAME}` MCP.\nAn installed app's MCP tools are either provided to you already, or can be lazy-loaded through the `tool_search` tool.\nDo not additionally call list_mcp_resources or list_mcp_resource_templates for apps."
);
format!("{APPS_INSTRUCTIONS_OPEN_TAG}\n{body}\n{APPS_INSTRUCTIONS_CLOSE_TAG}")
}

View File

@@ -874,6 +874,17 @@ pub struct UnauthorizedRecovery {
mode: UnauthorizedRecoveryMode,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct UnauthorizedRecoveryStepResult {
auth_state_changed: Option<bool>,
}
impl UnauthorizedRecoveryStepResult {
pub fn auth_state_changed(&self) -> Option<bool> {
self.auth_state_changed
}
}
impl UnauthorizedRecovery {
fn new(manager: Arc<AuthManager>) -> Self {
let cached_auth = manager.auth_cached();
@@ -917,7 +928,46 @@ impl UnauthorizedRecovery {
!matches!(self.step, UnauthorizedRecoveryStep::Done)
}
pub async fn next(&mut self) -> Result<(), RefreshTokenError> {
pub fn unavailable_reason(&self) -> &'static str {
if !self
.manager
.auth_cached()
.as_ref()
.is_some_and(CodexAuth::is_chatgpt_auth)
{
return "not_chatgpt_auth";
}
if self.mode == UnauthorizedRecoveryMode::External
&& !self.manager.has_external_auth_refresher()
{
return "no_external_refresher";
}
if matches!(self.step, UnauthorizedRecoveryStep::Done) {
return "recovery_exhausted";
}
"ready"
}
pub fn mode_name(&self) -> &'static str {
match self.mode {
UnauthorizedRecoveryMode::Managed => "managed",
UnauthorizedRecoveryMode::External => "external",
}
}
pub fn step_name(&self) -> &'static str {
match self.step {
UnauthorizedRecoveryStep::Reload => "reload",
UnauthorizedRecoveryStep::RefreshToken => "refresh_token",
UnauthorizedRecoveryStep::ExternalRefresh => "external_refresh",
UnauthorizedRecoveryStep::Done => "done",
}
}
pub async fn next(&mut self) -> Result<UnauthorizedRecoveryStepResult, RefreshTokenError> {
if !self.has_next() {
return Err(RefreshTokenError::Permanent(RefreshTokenFailedError::new(
RefreshTokenFailedReason::Other,
@@ -931,8 +981,17 @@ impl UnauthorizedRecovery {
.manager
.reload_if_account_id_matches(self.expected_account_id.as_deref())
{
ReloadOutcome::ReloadedChanged | ReloadOutcome::ReloadedNoChange => {
ReloadOutcome::ReloadedChanged => {
self.step = UnauthorizedRecoveryStep::RefreshToken;
return Ok(UnauthorizedRecoveryStepResult {
auth_state_changed: Some(true),
});
}
ReloadOutcome::ReloadedNoChange => {
self.step = UnauthorizedRecoveryStep::RefreshToken;
return Ok(UnauthorizedRecoveryStepResult {
auth_state_changed: Some(false),
});
}
ReloadOutcome::Skipped => {
self.step = UnauthorizedRecoveryStep::Done;
@@ -946,16 +1005,24 @@ impl UnauthorizedRecovery {
UnauthorizedRecoveryStep::RefreshToken => {
self.manager.refresh_token_from_authority().await?;
self.step = UnauthorizedRecoveryStep::Done;
return Ok(UnauthorizedRecoveryStepResult {
auth_state_changed: Some(true),
});
}
UnauthorizedRecoveryStep::ExternalRefresh => {
self.manager
.refresh_external_auth(ExternalAuthRefreshReason::Unauthorized)
.await?;
self.step = UnauthorizedRecoveryStep::Done;
return Ok(UnauthorizedRecoveryStepResult {
auth_state_changed: Some(true),
});
}
UnauthorizedRecoveryStep::Done => {}
}
Ok(())
Ok(UnauthorizedRecoveryStepResult {
auth_state_changed: None,
})
}
}

View File

@@ -13,6 +13,7 @@ use codex_protocol::config_types::ForcedLoginMethod;
use pretty_assertions::assert_eq;
use serde::Serialize;
use serde_json::json;
use std::sync::Arc;
use tempfile::tempdir;
#[tokio::test]
@@ -171,6 +172,33 @@ fn logout_removes_auth_file() -> Result<(), std::io::Error> {
Ok(())
}
#[test]
fn unauthorized_recovery_reports_mode_and_step_names() {
let dir = tempdir().unwrap();
let manager = AuthManager::shared(
dir.path().to_path_buf(),
false,
AuthCredentialsStoreMode::File,
);
let managed = UnauthorizedRecovery {
manager: Arc::clone(&manager),
step: UnauthorizedRecoveryStep::Reload,
expected_account_id: None,
mode: UnauthorizedRecoveryMode::Managed,
};
assert_eq!(managed.mode_name(), "managed");
assert_eq!(managed.step_name(), "reload");
let external = UnauthorizedRecovery {
manager,
step: UnauthorizedRecoveryStep::ExternalRefresh,
expected_account_id: None,
mode: UnauthorizedRecoveryMode::External,
};
assert_eq!(external.mode_name(), "external");
assert_eq!(external.step_name(), "external_refresh");
}
struct AuthFileParams {
openai_api_key: Option<String>,
chatgpt_plan_type: Option<String>,

View File

@@ -75,6 +75,7 @@ use http::HeaderValue;
use http::StatusCode as HttpStatusCode;
use reqwest::StatusCode;
use std::time::Duration;
use std::time::Instant;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
@@ -85,6 +86,7 @@ use tracing::trace;
use tracing::warn;
use crate::AuthManager;
use crate::auth::AuthMode;
use crate::auth::CodexAuth;
use crate::auth::RefreshTokenError;
use crate::client_common::Prompt;
@@ -97,7 +99,14 @@ use crate::error::Result;
use crate::flags::CODEX_RS_SSE_FIXTURE;
use crate::model_provider_info::ModelProviderInfo;
use crate::model_provider_info::WireApi;
use crate::response_debug_context::extract_response_debug_context;
use crate::response_debug_context::extract_response_debug_context_from_api_error;
use crate::response_debug_context::telemetry_api_error_message;
use crate::response_debug_context::telemetry_transport_error_message;
use crate::tools::spec::create_tools_json_for_responses_api;
use crate::util::FeedbackRequestTags;
use crate::util::emit_feedback_auth_recovery_tags;
use crate::util::emit_feedback_request_tags;
pub const OPENAI_BETA_HEADER: &str = "OpenAI-Beta";
pub const X_CODEX_TURN_STATE_HEADER: &str = "x-codex-turn-state";
@@ -105,7 +114,9 @@ pub const X_CODEX_TURN_METADATA_HEADER: &str = "x-codex-turn-metadata";
pub const X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER: &str =
"x-responsesapi-include-timing-metrics";
const RESPONSES_WEBSOCKETS_V2_BETA_HEADER_VALUE: &str = "responses_websockets=2026-02-06";
const RESPONSES_ENDPOINT: &str = "/responses";
const RESPONSES_COMPACT_ENDPOINT: &str = "/responses/compact";
const MEMORIES_SUMMARIZE_ENDPOINT: &str = "/memories/trace_summarize";
pub fn ws_version_from_features(config: &Config) -> bool {
config
.features
@@ -144,6 +155,17 @@ struct CurrentClientSetup {
api_auth: CoreAuthProvider,
}
#[derive(Clone, Copy)]
struct RequestRouteTelemetry {
endpoint: &'static str,
}
impl RequestRouteTelemetry {
fn for_endpoint(endpoint: &'static str) -> Self {
Self { endpoint }
}
}
/// A session-scoped client for model-provider API calls.
///
/// This holds configuration and state that should be shared across turns within a Codex session
@@ -201,6 +223,23 @@ struct WebsocketSession {
connection: Option<ApiWebSocketConnection>,
last_request: Option<ResponsesApiRequest>,
last_response_rx: Option<oneshot::Receiver<LastResponse>>,
connection_reused: StdMutex<bool>,
}
impl WebsocketSession {
fn set_connection_reused(&self, connection_reused: bool) {
*self
.connection_reused
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner) = connection_reused;
}
fn connection_reused(&self) -> bool {
*self
.connection_reused
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
}
}
enum WebsocketStreamOutcome {
@@ -291,7 +330,15 @@ impl ModelClient {
}
let client_setup = self.current_client_setup().await?;
let transport = ReqwestTransport::new(build_reqwest_client());
let request_telemetry = Self::build_request_telemetry(session_telemetry);
let request_telemetry = Self::build_request_telemetry(
session_telemetry,
AuthRequestTelemetryContext::new(
client_setup.auth.as_ref().map(CodexAuth::auth_mode),
&client_setup.api_auth,
PendingUnauthorizedRetry::default(),
),
RequestRouteTelemetry::for_endpoint(RESPONSES_COMPACT_ENDPOINT),
);
let client =
ApiCompactClient::new(transport, client_setup.api_provider, client_setup.api_auth)
.with_telemetry(Some(request_telemetry));
@@ -351,7 +398,15 @@ impl ModelClient {
let client_setup = self.current_client_setup().await?;
let transport = ReqwestTransport::new(build_reqwest_client());
let request_telemetry = Self::build_request_telemetry(session_telemetry);
let request_telemetry = Self::build_request_telemetry(
session_telemetry,
AuthRequestTelemetryContext::new(
client_setup.auth.as_ref().map(CodexAuth::auth_mode),
&client_setup.api_auth,
PendingUnauthorizedRetry::default(),
),
RequestRouteTelemetry::for_endpoint(MEMORIES_SUMMARIZE_ENDPOINT),
);
let client =
ApiMemoriesClient::new(transport, client_setup.api_provider, client_setup.api_auth)
.with_telemetry(Some(request_telemetry));
@@ -391,8 +446,16 @@ impl ModelClient {
}
/// Builds request telemetry for unary API calls (e.g., Compact endpoint).
fn build_request_telemetry(session_telemetry: &SessionTelemetry) -> Arc<dyn RequestTelemetry> {
let telemetry = Arc::new(ApiTelemetry::new(session_telemetry.clone()));
fn build_request_telemetry(
session_telemetry: &SessionTelemetry,
auth_context: AuthRequestTelemetryContext,
request_route_telemetry: RequestRouteTelemetry,
) -> Arc<dyn RequestTelemetry> {
let telemetry = Arc::new(ApiTelemetry::new(
session_telemetry.clone(),
auth_context,
request_route_telemetry,
));
let request_telemetry: Arc<dyn RequestTelemetry> = telemetry;
request_telemetry
}
@@ -458,6 +521,7 @@ impl ModelClient {
///
/// Both startup prewarm and in-turn `needs_new` reconnects call this path so handshake
/// behavior remains consistent across both flows.
#[allow(clippy::too_many_arguments)]
async fn connect_websocket(
&self,
session_telemetry: &SessionTelemetry,
@@ -465,17 +529,69 @@ impl ModelClient {
api_auth: CoreAuthProvider,
turn_state: Option<Arc<OnceLock<String>>>,
turn_metadata_header: Option<&str>,
auth_context: AuthRequestTelemetryContext,
request_route_telemetry: RequestRouteTelemetry,
) -> std::result::Result<ApiWebSocketConnection, ApiError> {
let headers = self.build_websocket_headers(turn_state.as_ref(), turn_metadata_header);
let websocket_telemetry = ModelClientSession::build_websocket_telemetry(session_telemetry);
ApiWebSocketResponsesClient::new(api_provider, api_auth)
let websocket_telemetry = ModelClientSession::build_websocket_telemetry(
session_telemetry,
auth_context,
request_route_telemetry,
);
let start = Instant::now();
let result = ApiWebSocketResponsesClient::new(api_provider, api_auth)
.connect(
headers,
crate::default_client::default_headers(),
turn_state,
Some(websocket_telemetry),
)
.await
.await;
let error_message = result.as_ref().err().map(telemetry_api_error_message);
let response_debug = result
.as_ref()
.err()
.map(extract_response_debug_context_from_api_error)
.unwrap_or_default();
let status = result.as_ref().err().and_then(api_error_http_status);
session_telemetry.record_websocket_connect(
start.elapsed(),
status,
error_message.as_deref(),
auth_context.auth_header_attached,
auth_context.auth_header_name,
auth_context.retry_after_unauthorized,
auth_context.recovery_mode,
auth_context.recovery_phase,
request_route_telemetry.endpoint,
false,
response_debug.request_id.as_deref(),
response_debug.cf_ray.as_deref(),
response_debug.auth_error.as_deref(),
response_debug.auth_error_code.as_deref(),
);
emit_feedback_request_tags(&FeedbackRequestTags {
endpoint: request_route_telemetry.endpoint,
auth_header_attached: auth_context.auth_header_attached,
auth_header_name: auth_context.auth_header_name,
auth_mode: auth_context.auth_mode,
auth_retry_after_unauthorized: Some(auth_context.retry_after_unauthorized),
auth_recovery_mode: auth_context.recovery_mode,
auth_recovery_phase: auth_context.recovery_phase,
auth_connection_reused: Some(false),
auth_request_id: response_debug.request_id.as_deref(),
auth_cf_ray: response_debug.cf_ray.as_deref(),
auth_error: response_debug.auth_error.as_deref(),
auth_error_code: response_debug.auth_error_code.as_deref(),
auth_recovery_followup_success: auth_context
.retry_after_unauthorized
.then_some(result.is_ok()),
auth_recovery_followup_status: auth_context
.retry_after_unauthorized
.then_some(status)
.flatten(),
});
result
}
/// Builds websocket handshake headers for both prewarm and turn-time reconnect.
@@ -718,7 +834,11 @@ impl ModelClientSession {
"failed to build websocket prewarm client setup: {err}"
))
})?;
let auth_context = AuthRequestTelemetryContext::new(
client_setup.auth.as_ref().map(CodexAuth::auth_mode),
&client_setup.api_auth,
PendingUnauthorizedRetry::default(),
);
let connection = self
.client
.connect_websocket(
@@ -727,9 +847,12 @@ impl ModelClientSession {
client_setup.api_auth,
Some(Arc::clone(&self.turn_state)),
None,
auth_context,
RequestRouteTelemetry::for_endpoint(RESPONSES_ENDPOINT),
)
.await?;
self.websocket_session.connection = Some(connection);
self.websocket_session.set_connection_reused(false);
Ok(())
}
/// Returns a websocket connection for this turn.
@@ -742,17 +865,22 @@ impl ModelClientSession {
wire_api = %self.client.state.provider.wire_api,
transport = "responses_websocket",
api.path = "responses",
turn.has_metadata_header = turn_metadata_header.is_some()
turn.has_metadata_header = params.turn_metadata_header.is_some()
)
)]
async fn websocket_connection(
&mut self,
session_telemetry: &SessionTelemetry,
api_provider: codex_api::Provider,
api_auth: CoreAuthProvider,
turn_metadata_header: Option<&str>,
options: &ApiResponsesOptions,
params: WebsocketConnectParams<'_>,
) -> std::result::Result<&ApiWebSocketConnection, ApiError> {
let WebsocketConnectParams {
session_telemetry,
api_provider,
api_auth,
turn_metadata_header,
options,
auth_context,
request_route_telemetry,
} = params;
let needs_new = match self.websocket_session.connection.as_ref() {
Some(conn) => conn.is_closed().await,
None => true,
@@ -773,9 +901,14 @@ impl ModelClientSession {
api_auth,
Some(turn_state),
turn_metadata_header,
auth_context,
request_route_telemetry,
)
.await?;
self.websocket_session.connection = Some(new_conn);
self.websocket_session.set_connection_reused(false);
} else {
self.websocket_session.set_connection_reused(true);
}
self.websocket_session
@@ -840,11 +973,20 @@ impl ModelClientSession {
let mut auth_recovery = auth_manager
.as_ref()
.map(super::auth::AuthManager::unauthorized_recovery);
let mut pending_retry = PendingUnauthorizedRetry::default();
loop {
let client_setup = self.client.current_client_setup().await?;
let transport = ReqwestTransport::new(build_reqwest_client());
let (request_telemetry, sse_telemetry) =
Self::build_streaming_telemetry(session_telemetry);
let request_auth_context = AuthRequestTelemetryContext::new(
client_setup.auth.as_ref().map(CodexAuth::auth_mode),
&client_setup.api_auth,
pending_retry,
);
let (request_telemetry, sse_telemetry) = Self::build_streaming_telemetry(
session_telemetry,
request_auth_context,
RequestRouteTelemetry::for_endpoint(RESPONSES_ENDPOINT),
);
let compression = self.responses_request_compression(client_setup.auth.as_ref());
let options = self.build_responses_options(turn_metadata_header, compression);
@@ -872,7 +1014,14 @@ impl ModelClientSession {
Err(ApiError::Transport(
unauthorized_transport @ TransportError::Http { status, .. },
)) if status == StatusCode::UNAUTHORIZED => {
handle_unauthorized(unauthorized_transport, &mut auth_recovery).await?;
pending_retry = PendingUnauthorizedRetry::from_recovery(
handle_unauthorized(
unauthorized_transport,
&mut auth_recovery,
session_telemetry,
)
.await?,
);
continue;
}
Err(err) => return Err(map_api_error(err)),
@@ -911,8 +1060,14 @@ impl ModelClientSession {
let mut auth_recovery = auth_manager
.as_ref()
.map(super::auth::AuthManager::unauthorized_recovery);
let mut pending_retry = PendingUnauthorizedRetry::default();
loop {
let client_setup = self.client.current_client_setup().await?;
let request_auth_context = AuthRequestTelemetryContext::new(
client_setup.auth.as_ref().map(CodexAuth::auth_mode),
&client_setup.api_auth,
pending_retry,
);
let compression = self.responses_request_compression(client_setup.auth.as_ref());
let options = self.build_responses_options(turn_metadata_header, compression);
@@ -933,13 +1088,17 @@ impl ModelClientSession {
}
match self
.websocket_connection(
.websocket_connection(WebsocketConnectParams {
session_telemetry,
client_setup.api_provider,
client_setup.api_auth,
api_provider: client_setup.api_provider,
api_auth: client_setup.api_auth,
turn_metadata_header,
&options,
)
options: &options,
auth_context: request_auth_context,
request_route_telemetry: RequestRouteTelemetry::for_endpoint(
RESPONSES_ENDPOINT,
),
})
.await
{
Ok(_) => {}
@@ -951,7 +1110,14 @@ impl ModelClientSession {
Err(ApiError::Transport(
unauthorized_transport @ TransportError::Http { status, .. },
)) if status == StatusCode::UNAUTHORIZED => {
handle_unauthorized(unauthorized_transport, &mut auth_recovery).await?;
pending_retry = PendingUnauthorizedRetry::from_recovery(
handle_unauthorized(
unauthorized_transport,
&mut auth_recovery,
session_telemetry,
)
.await?,
);
continue;
}
Err(err) => return Err(map_api_error(err)),
@@ -968,7 +1134,7 @@ impl ModelClientSession {
"websocket connection is unavailable".to_string(),
))
})?
.stream_request(ws_request)
.stream_request(ws_request, self.websocket_session.connection_reused())
.await
.map_err(map_api_error)?;
let (stream, last_request_rx) =
@@ -981,8 +1147,14 @@ impl ModelClientSession {
/// Builds request and SSE telemetry for streaming API calls.
fn build_streaming_telemetry(
session_telemetry: &SessionTelemetry,
auth_context: AuthRequestTelemetryContext,
request_route_telemetry: RequestRouteTelemetry,
) -> (Arc<dyn RequestTelemetry>, Arc<dyn SseTelemetry>) {
let telemetry = Arc::new(ApiTelemetry::new(session_telemetry.clone()));
let telemetry = Arc::new(ApiTelemetry::new(
session_telemetry.clone(),
auth_context,
request_route_telemetry,
));
let request_telemetry: Arc<dyn RequestTelemetry> = telemetry.clone();
let sse_telemetry: Arc<dyn SseTelemetry> = telemetry;
(request_telemetry, sse_telemetry)
@@ -991,8 +1163,14 @@ impl ModelClientSession {
/// Builds telemetry for the Responses API WebSocket transport.
fn build_websocket_telemetry(
session_telemetry: &SessionTelemetry,
auth_context: AuthRequestTelemetryContext,
request_route_telemetry: RequestRouteTelemetry,
) -> Arc<dyn WebsocketTelemetry> {
let telemetry = Arc::new(ApiTelemetry::new(session_telemetry.clone()));
let telemetry = Arc::new(ApiTelemetry::new(
session_telemetry.clone(),
auth_context,
request_route_telemetry,
));
let websocket_telemetry: Arc<dyn WebsocketTelemetry> = telemetry;
websocket_telemetry
}
@@ -1126,6 +1304,7 @@ impl ModelClientSession {
self.websocket_session.connection = None;
self.websocket_session.last_request = None;
self.websocket_session.last_response_rx = None;
self.websocket_session.set_connection_reused(false);
}
activated
}
@@ -1264,30 +1443,209 @@ where
///
/// When refresh succeeds, the caller should retry the API call; otherwise
/// the mapped `CodexErr` is returned to the caller.
#[derive(Clone, Copy, Debug)]
struct UnauthorizedRecoveryExecution {
mode: &'static str,
phase: &'static str,
}
#[derive(Clone, Copy, Debug, Default)]
struct PendingUnauthorizedRetry {
retry_after_unauthorized: bool,
recovery_mode: Option<&'static str>,
recovery_phase: Option<&'static str>,
}
impl PendingUnauthorizedRetry {
fn from_recovery(recovery: UnauthorizedRecoveryExecution) -> Self {
Self {
retry_after_unauthorized: true,
recovery_mode: Some(recovery.mode),
recovery_phase: Some(recovery.phase),
}
}
}
#[derive(Clone, Copy, Debug, Default)]
struct AuthRequestTelemetryContext {
auth_mode: Option<&'static str>,
auth_header_attached: bool,
auth_header_name: Option<&'static str>,
retry_after_unauthorized: bool,
recovery_mode: Option<&'static str>,
recovery_phase: Option<&'static str>,
}
impl AuthRequestTelemetryContext {
fn new(
auth_mode: Option<AuthMode>,
api_auth: &CoreAuthProvider,
retry: PendingUnauthorizedRetry,
) -> Self {
Self {
auth_mode: auth_mode.map(|mode| match mode {
AuthMode::ApiKey => "ApiKey",
AuthMode::Chatgpt => "Chatgpt",
}),
auth_header_attached: api_auth.auth_header_attached(),
auth_header_name: api_auth.auth_header_name(),
retry_after_unauthorized: retry.retry_after_unauthorized,
recovery_mode: retry.recovery_mode,
recovery_phase: retry.recovery_phase,
}
}
}
struct WebsocketConnectParams<'a> {
session_telemetry: &'a SessionTelemetry,
api_provider: codex_api::Provider,
api_auth: CoreAuthProvider,
turn_metadata_header: Option<&'a str>,
options: &'a ApiResponsesOptions,
auth_context: AuthRequestTelemetryContext,
request_route_telemetry: RequestRouteTelemetry,
}
async fn handle_unauthorized(
transport: TransportError,
auth_recovery: &mut Option<UnauthorizedRecovery>,
) -> Result<()> {
session_telemetry: &SessionTelemetry,
) -> Result<UnauthorizedRecoveryExecution> {
let debug = extract_response_debug_context(&transport);
if let Some(recovery) = auth_recovery
&& recovery.has_next()
{
let mode = recovery.mode_name();
let phase = recovery.step_name();
return match recovery.next().await {
Ok(_) => Ok(()),
Err(RefreshTokenError::Permanent(failed)) => Err(CodexErr::RefreshTokenFailed(failed)),
Err(RefreshTokenError::Transient(other)) => Err(CodexErr::Io(other)),
Ok(step_result) => {
session_telemetry.record_auth_recovery(
mode,
phase,
"recovery_succeeded",
debug.request_id.as_deref(),
debug.cf_ray.as_deref(),
debug.auth_error.as_deref(),
debug.auth_error_code.as_deref(),
None,
step_result.auth_state_changed(),
);
emit_feedback_auth_recovery_tags(
mode,
phase,
"recovery_succeeded",
debug.request_id.as_deref(),
debug.cf_ray.as_deref(),
debug.auth_error.as_deref(),
debug.auth_error_code.as_deref(),
);
Ok(UnauthorizedRecoveryExecution { mode, phase })
}
Err(RefreshTokenError::Permanent(failed)) => {
session_telemetry.record_auth_recovery(
mode,
phase,
"recovery_failed_permanent",
debug.request_id.as_deref(),
debug.cf_ray.as_deref(),
debug.auth_error.as_deref(),
debug.auth_error_code.as_deref(),
None,
None,
);
emit_feedback_auth_recovery_tags(
mode,
phase,
"recovery_failed_permanent",
debug.request_id.as_deref(),
debug.cf_ray.as_deref(),
debug.auth_error.as_deref(),
debug.auth_error_code.as_deref(),
);
Err(CodexErr::RefreshTokenFailed(failed))
}
Err(RefreshTokenError::Transient(other)) => {
session_telemetry.record_auth_recovery(
mode,
phase,
"recovery_failed_transient",
debug.request_id.as_deref(),
debug.cf_ray.as_deref(),
debug.auth_error.as_deref(),
debug.auth_error_code.as_deref(),
None,
None,
);
emit_feedback_auth_recovery_tags(
mode,
phase,
"recovery_failed_transient",
debug.request_id.as_deref(),
debug.cf_ray.as_deref(),
debug.auth_error.as_deref(),
debug.auth_error_code.as_deref(),
);
Err(CodexErr::Io(other))
}
};
}
let (mode, phase, recovery_reason) = match auth_recovery.as_ref() {
Some(recovery) => (
recovery.mode_name(),
recovery.step_name(),
Some(recovery.unavailable_reason()),
),
None => ("none", "none", Some("auth_manager_missing")),
};
session_telemetry.record_auth_recovery(
mode,
phase,
"recovery_not_run",
debug.request_id.as_deref(),
debug.cf_ray.as_deref(),
debug.auth_error.as_deref(),
debug.auth_error_code.as_deref(),
recovery_reason,
None,
);
emit_feedback_auth_recovery_tags(
mode,
phase,
"recovery_not_run",
debug.request_id.as_deref(),
debug.cf_ray.as_deref(),
debug.auth_error.as_deref(),
debug.auth_error_code.as_deref(),
);
Err(map_api_error(ApiError::Transport(transport)))
}
fn api_error_http_status(error: &ApiError) -> Option<u16> {
match error {
ApiError::Transport(TransportError::Http { status, .. }) => Some(status.as_u16()),
_ => None,
}
}
struct ApiTelemetry {
session_telemetry: SessionTelemetry,
auth_context: AuthRequestTelemetryContext,
request_route_telemetry: RequestRouteTelemetry,
}
impl ApiTelemetry {
fn new(session_telemetry: SessionTelemetry) -> Self {
Self { session_telemetry }
fn new(
session_telemetry: SessionTelemetry,
auth_context: AuthRequestTelemetryContext,
request_route_telemetry: RequestRouteTelemetry,
) -> Self {
Self {
session_telemetry,
auth_context,
request_route_telemetry,
}
}
}
@@ -1299,13 +1657,50 @@ impl RequestTelemetry for ApiTelemetry {
error: Option<&TransportError>,
duration: Duration,
) {
let error_message = error.map(std::string::ToString::to_string);
let error_message = error.map(telemetry_transport_error_message);
let status = status.map(|s| s.as_u16());
let debug = error
.map(extract_response_debug_context)
.unwrap_or_default();
self.session_telemetry.record_api_request(
attempt,
status.map(|s| s.as_u16()),
status,
error_message.as_deref(),
duration,
self.auth_context.auth_header_attached,
self.auth_context.auth_header_name,
self.auth_context.retry_after_unauthorized,
self.auth_context.recovery_mode,
self.auth_context.recovery_phase,
self.request_route_telemetry.endpoint,
debug.request_id.as_deref(),
debug.cf_ray.as_deref(),
debug.auth_error.as_deref(),
debug.auth_error_code.as_deref(),
);
emit_feedback_request_tags(&FeedbackRequestTags {
endpoint: self.request_route_telemetry.endpoint,
auth_header_attached: self.auth_context.auth_header_attached,
auth_header_name: self.auth_context.auth_header_name,
auth_mode: self.auth_context.auth_mode,
auth_retry_after_unauthorized: Some(self.auth_context.retry_after_unauthorized),
auth_recovery_mode: self.auth_context.recovery_mode,
auth_recovery_phase: self.auth_context.recovery_phase,
auth_connection_reused: None,
auth_request_id: debug.request_id.as_deref(),
auth_cf_ray: debug.cf_ray.as_deref(),
auth_error: debug.auth_error.as_deref(),
auth_error_code: debug.auth_error_code.as_deref(),
auth_recovery_followup_success: self
.auth_context
.retry_after_unauthorized
.then_some(error.is_none()),
auth_recovery_followup_status: self
.auth_context
.retry_after_unauthorized
.then_some(status)
.flatten(),
});
}
}
@@ -1323,10 +1718,40 @@ impl SseTelemetry for ApiTelemetry {
}
impl WebsocketTelemetry for ApiTelemetry {
fn on_ws_request(&self, duration: Duration, error: Option<&ApiError>) {
let error_message = error.map(std::string::ToString::to_string);
self.session_telemetry
.record_websocket_request(duration, error_message.as_deref());
fn on_ws_request(&self, duration: Duration, error: Option<&ApiError>, connection_reused: bool) {
let error_message = error.map(telemetry_api_error_message);
let status = error.and_then(api_error_http_status);
let debug = error
.map(extract_response_debug_context_from_api_error)
.unwrap_or_default();
self.session_telemetry.record_websocket_request(
duration,
error_message.as_deref(),
connection_reused,
);
emit_feedback_request_tags(&FeedbackRequestTags {
endpoint: self.request_route_telemetry.endpoint,
auth_header_attached: self.auth_context.auth_header_attached,
auth_header_name: self.auth_context.auth_header_name,
auth_mode: self.auth_context.auth_mode,
auth_retry_after_unauthorized: Some(self.auth_context.retry_after_unauthorized),
auth_recovery_mode: self.auth_context.recovery_mode,
auth_recovery_phase: self.auth_context.recovery_phase,
auth_connection_reused: Some(connection_reused),
auth_request_id: debug.request_id.as_deref(),
auth_cf_ray: debug.cf_ray.as_deref(),
auth_error: debug.auth_error.as_deref(),
auth_error_code: debug.auth_error_code.as_deref(),
auth_recovery_followup_success: self
.auth_context
.retry_after_unauthorized
.then_some(error.is_none()),
auth_recovery_followup_status: self
.auth_context
.retry_after_unauthorized
.then_some(status)
.flatten(),
});
}
fn on_ws_event(

View File

@@ -1,4 +1,7 @@
use super::AuthRequestTelemetryContext;
use super::ModelClient;
use super::PendingUnauthorizedRetry;
use super::UnauthorizedRecoveryExecution;
use codex_otel::SessionTelemetry;
use codex_protocol::ThreadId;
use codex_protocol::openai_models::ModelInfo;
@@ -94,3 +97,22 @@ async fn summarize_memories_returns_empty_for_empty_input() {
.expect("empty summarize request should succeed");
assert_eq!(output.len(), 0);
}
#[test]
fn auth_request_telemetry_context_tracks_attached_auth_and_retry_phase() {
let auth_context = AuthRequestTelemetryContext::new(
Some(crate::auth::AuthMode::Chatgpt),
&crate::api_bridge::CoreAuthProvider::for_test(Some("access-token"), Some("workspace-123")),
PendingUnauthorizedRetry::from_recovery(UnauthorizedRecoveryExecution {
mode: "managed",
phase: "refresh_token",
}),
);
assert_eq!(auth_context.auth_mode, Some("Chatgpt"));
assert!(auth_context.auth_header_attached);
assert_eq!(auth_context.auth_header_name, Some("authorization"));
assert!(auth_context.retry_after_unauthorized);
assert_eq!(auth_context.recovery_mode, Some("managed"));
assert_eq!(auth_context.recovery_phase, Some("refresh_token"));
}

View File

@@ -3936,12 +3936,13 @@ impl Session {
server: &str,
tool: &str,
arguments: Option<serde_json::Value>,
meta: Option<serde_json::Value>,
) -> anyhow::Result<CallToolResult> {
self.services
.mcp_connection_manager
.read()
.await
.call_tool(server, tool, arguments)
.call_tool(server, tool, arguments, meta)
.await
}

View File

@@ -292,6 +292,8 @@ pub struct UnexpectedResponseError {
pub url: Option<String>,
pub cf_ray: Option<String>,
pub request_id: Option<String>,
pub identity_authorization_error: Option<String>,
pub identity_error_code: Option<String>,
}
const CLOUDFLARE_BLOCKED_MESSAGE: &str =
@@ -346,6 +348,12 @@ impl UnexpectedResponseError {
if let Some(id) = &self.request_id {
message.push_str(&format!(", request id: {id}"));
}
if let Some(auth_error) = &self.identity_authorization_error {
message.push_str(&format!(", auth error: {auth_error}"));
}
if let Some(error_code) = &self.identity_error_code {
message.push_str(&format!(", auth error code: {error_code}"));
}
Some(message)
}
@@ -368,6 +376,12 @@ impl std::fmt::Display for UnexpectedResponseError {
if let Some(id) = &self.request_id {
message.push_str(&format!(", request id: {id}"));
}
if let Some(auth_error) = &self.identity_authorization_error {
message.push_str(&format!(", auth error: {auth_error}"));
}
if let Some(error_code) = &self.identity_error_code {
message.push_str(&format!(", auth error code: {error_code}"));
}
write!(f, "{message}")
}
}

View File

@@ -328,6 +328,8 @@ fn unexpected_status_cloudflare_html_is_simplified() {
url: Some("http://example.com/blocked".to_string()),
cf_ray: Some("ray-id".to_string()),
request_id: None,
identity_authorization_error: None,
identity_error_code: None,
};
let status = StatusCode::FORBIDDEN.to_string();
let url = "http://example.com/blocked";
@@ -345,6 +347,8 @@ fn unexpected_status_non_html_is_unchanged() {
url: Some("http://example.com/plain".to_string()),
cf_ray: None,
request_id: None,
identity_authorization_error: None,
identity_error_code: None,
};
let status = StatusCode::FORBIDDEN.to_string();
let url = "http://example.com/plain";
@@ -363,6 +367,8 @@ fn unexpected_status_prefers_error_message_when_present() {
url: Some("https://chatgpt.com/backend-api/codex/responses".to_string()),
cf_ray: None,
request_id: Some("req-123".to_string()),
identity_authorization_error: None,
identity_error_code: None,
};
let status = StatusCode::UNAUTHORIZED.to_string();
assert_eq!(
@@ -382,6 +388,8 @@ fn unexpected_status_truncates_long_body_with_ellipsis() {
url: Some("http://example.com/long".to_string()),
cf_ray: None,
request_id: Some("req-long".to_string()),
identity_authorization_error: None,
identity_error_code: None,
};
let status = StatusCode::BAD_GATEWAY.to_string();
let expected_body = format!("{}...", "x".repeat(UNEXPECTED_RESPONSE_BODY_MAX_BYTES));
@@ -401,6 +409,8 @@ fn unexpected_status_includes_cf_ray_and_request_id() {
url: Some("https://chatgpt.com/backend-api/codex/responses".to_string()),
cf_ray: Some("9c81f9f18f2fa49d-LHR".to_string()),
request_id: Some("req-xyz".to_string()),
identity_authorization_error: None,
identity_error_code: None,
};
let status = StatusCode::UNAUTHORIZED.to_string();
assert_eq!(
@@ -411,6 +421,26 @@ fn unexpected_status_includes_cf_ray_and_request_id() {
);
}
#[test]
fn unexpected_status_includes_identity_auth_details() {
let err = UnexpectedResponseError {
status: StatusCode::UNAUTHORIZED,
body: "plain text error".to_string(),
url: Some("https://chatgpt.com/backend-api/codex/models".to_string()),
cf_ray: Some("cf-ray-auth-401-test".to_string()),
request_id: Some("req-auth".to_string()),
identity_authorization_error: Some("missing_authorization_header".to_string()),
identity_error_code: Some("token_expired".to_string()),
};
let status = StatusCode::UNAUTHORIZED.to_string();
assert_eq!(
err.to_string(),
format!(
"unexpected status {status}: plain text error, url: https://chatgpt.com/backend-api/codex/models, cf-ray: cf-ray-auth-401-test, request id: req-auth, auth error: missing_authorization_header, auth error code: token_expired"
)
);
}
#[test]
fn usage_limit_reached_includes_hours_and_minutes() {
let base = Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap();

View File

@@ -154,8 +154,6 @@ pub enum Feature {
Plugins,
/// Allow the model to invoke the built-in image generation tool.
ImageGeneration,
/// Route apps MCP calls through the configured gateway.
AppsMcpGateway,
/// Allow prompting and installing missing MCP dependencies.
SkillMcpDependencyInstall,
/// Prompt for missing skill env var dependencies.
@@ -753,12 +751,6 @@ pub const FEATURES: &[FeatureSpec] = &[
stage: Stage::UnderDevelopment,
default_enabled: false,
},
FeatureSpec {
id: Feature::AppsMcpGateway,
key: "apps_mcp_gateway",
stage: Stage::UnderDevelopment,
default_enabled: false,
},
FeatureSpec {
id: Feature::SkillMcpDependencyInstall,
key: "skill_mcp_dependency_install",

View File

@@ -87,6 +87,7 @@ pub use model_provider_info::WireApi;
pub use model_provider_info::built_in_model_providers;
pub use model_provider_info::create_oss_provider_with_base_url;
mod event_mapping;
mod response_debug_context;
pub mod review_format;
pub mod review_prompts;
mod seatbelt_permissions;

View File

@@ -21,7 +21,6 @@ use crate::CodexAuth;
use crate::config::Config;
use crate::config::types::McpServerConfig;
use crate::config::types::McpServerTransportConfig;
use crate::features::Feature;
use crate::mcp::auth::compute_auth_statuses;
use crate::mcp_connection_manager::McpConnectionManager;
use crate::mcp_connection_manager::SandboxState;
@@ -33,8 +32,6 @@ const MCP_TOOL_NAME_PREFIX: &str = "mcp";
const MCP_TOOL_NAME_DELIMITER: &str = "__";
pub(crate) const CODEX_APPS_MCP_SERVER_NAME: &str = "codex_apps";
const CODEX_CONNECTORS_TOKEN_ENV_VAR: &str = "CODEX_CONNECTORS_TOKEN";
const OPENAI_CONNECTORS_MCP_BASE_URL: &str = "https://api.openai.com";
const OPENAI_CONNECTORS_MCP_PATH: &str = "/v1/connectors/gateways/flat/mcp";
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ToolPluginProvenance {
@@ -94,13 +91,6 @@ impl ToolPluginProvenance {
}
}
// Legacy vs new MCP gateway
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CodexAppsMcpGateway {
LegacyMCPGateway,
MCPGateway,
}
fn codex_apps_mcp_bearer_token_env_var() -> Option<String> {
match env::var(CODEX_CONNECTORS_TOKEN_ENV_VAR) {
Ok(value) if !value.trim().is_empty() => Some(CODEX_CONNECTORS_TOKEN_ENV_VAR.to_string()),
@@ -135,14 +125,6 @@ fn codex_apps_mcp_http_headers(auth: Option<&CodexAuth>) -> Option<HashMap<Strin
}
}
fn selected_config_codex_apps_mcp_gateway(config: &Config) -> CodexAppsMcpGateway {
if config.features.enabled(Feature::AppsMcpGateway) {
CodexAppsMcpGateway::MCPGateway
} else {
CodexAppsMcpGateway::LegacyMCPGateway
}
}
fn normalize_codex_apps_base_url(base_url: &str) -> String {
let mut base_url = base_url.trim_end_matches('/').to_string();
if (base_url.starts_with("https://chatgpt.com")
@@ -154,11 +136,7 @@ fn normalize_codex_apps_base_url(base_url: &str) -> String {
base_url
}
fn codex_apps_mcp_url_for_gateway(base_url: &str, gateway: CodexAppsMcpGateway) -> String {
if gateway == CodexAppsMcpGateway::MCPGateway {
return format!("{OPENAI_CONNECTORS_MCP_BASE_URL}{OPENAI_CONNECTORS_MCP_PATH}");
}
fn codex_apps_mcp_url_for_base_url(base_url: &str) -> String {
let base_url = normalize_codex_apps_base_url(base_url);
if base_url.contains("/backend-api") {
format!("{base_url}/wham/apps")
@@ -170,10 +148,7 @@ fn codex_apps_mcp_url_for_gateway(base_url: &str, gateway: CodexAppsMcpGateway)
}
pub(crate) fn codex_apps_mcp_url(config: &Config) -> String {
codex_apps_mcp_url_for_gateway(
&config.chatgpt_base_url,
selected_config_codex_apps_mcp_gateway(config),
)
codex_apps_mcp_url_for_base_url(&config.chatgpt_base_url)
}
fn codex_apps_mcp_server_config(config: &Config, auth: Option<&CodexAuth>) -> McpServerConfig {

View File

@@ -1,6 +1,7 @@
use super::*;
use crate::config::CONFIG_TOML_FILE;
use crate::config::ConfigBuilder;
use crate::features::Feature;
use crate::plugins::AppConnectorId;
use crate::plugins::PluginCapabilitySummary;
use pretty_assertions::assert_eq;
@@ -123,67 +124,27 @@ fn tool_plugin_provenance_collects_app_and_mcp_sources() {
}
#[test]
fn codex_apps_mcp_url_for_default_gateway_keeps_existing_paths() {
fn codex_apps_mcp_url_for_base_url_keeps_existing_paths() {
assert_eq!(
codex_apps_mcp_url_for_gateway(
"https://chatgpt.com/backend-api",
CodexAppsMcpGateway::LegacyMCPGateway
),
codex_apps_mcp_url_for_base_url("https://chatgpt.com/backend-api"),
"https://chatgpt.com/backend-api/wham/apps"
);
assert_eq!(
codex_apps_mcp_url_for_gateway(
"https://chat.openai.com",
CodexAppsMcpGateway::LegacyMCPGateway
),
codex_apps_mcp_url_for_base_url("https://chat.openai.com"),
"https://chat.openai.com/backend-api/wham/apps"
);
assert_eq!(
codex_apps_mcp_url_for_gateway(
"http://localhost:8080/api/codex",
CodexAppsMcpGateway::LegacyMCPGateway
),
codex_apps_mcp_url_for_base_url("http://localhost:8080/api/codex"),
"http://localhost:8080/api/codex/apps"
);
assert_eq!(
codex_apps_mcp_url_for_gateway(
"http://localhost:8080",
CodexAppsMcpGateway::LegacyMCPGateway
),
codex_apps_mcp_url_for_base_url("http://localhost:8080"),
"http://localhost:8080/api/codex/apps"
);
}
#[test]
fn codex_apps_mcp_url_for_gateway_uses_openai_connectors_gateway() {
let expected_url = format!("{OPENAI_CONNECTORS_MCP_BASE_URL}{OPENAI_CONNECTORS_MCP_PATH}");
assert_eq!(
codex_apps_mcp_url_for_gateway(
"https://chatgpt.com/backend-api",
CodexAppsMcpGateway::MCPGateway
),
expected_url.as_str()
);
assert_eq!(
codex_apps_mcp_url_for_gateway("https://chat.openai.com", CodexAppsMcpGateway::MCPGateway),
expected_url.as_str()
);
assert_eq!(
codex_apps_mcp_url_for_gateway(
"http://localhost:8080/api/codex",
CodexAppsMcpGateway::MCPGateway
),
expected_url.as_str()
);
assert_eq!(
codex_apps_mcp_url_for_gateway("http://localhost:8080", CodexAppsMcpGateway::MCPGateway),
expected_url.as_str()
);
}
#[test]
fn codex_apps_mcp_url_uses_default_gateway_when_feature_is_disabled() {
fn codex_apps_mcp_url_uses_legacy_codex_apps_path() {
let mut config = crate::config::test_config();
config.chatgpt_base_url = "https://chatgpt.com".to_string();
@@ -194,22 +155,7 @@ fn codex_apps_mcp_url_uses_default_gateway_when_feature_is_disabled() {
}
#[test]
fn codex_apps_mcp_url_uses_openai_connectors_gateway_when_feature_is_enabled() {
let mut config = crate::config::test_config();
config.chatgpt_base_url = "https://chatgpt.com".to_string();
config
.features
.enable(Feature::AppsMcpGateway)
.expect("test config should allow apps gateway");
assert_eq!(
codex_apps_mcp_url(&config),
format!("{OPENAI_CONNECTORS_MCP_BASE_URL}{OPENAI_CONNECTORS_MCP_PATH}")
);
}
#[test]
fn codex_apps_server_config_switches_gateway_with_flags() {
fn codex_apps_server_config_uses_legacy_codex_apps_path() {
let mut config = crate::config::test_config();
config.chatgpt_base_url = "https://chatgpt.com".to_string();
@@ -231,22 +177,6 @@ fn codex_apps_server_config_switches_gateway_with_flags() {
};
assert_eq!(url, "https://chatgpt.com/backend-api/wham/apps");
config
.features
.enable(Feature::AppsMcpGateway)
.expect("test config should allow apps gateway");
servers = with_codex_apps_mcp(servers, true, None, &config);
let server = servers
.get(CODEX_APPS_MCP_SERVER_NAME)
.expect("codex apps should remain present when apps stays enabled");
let url = match &server.transport {
McpServerTransportConfig::StreamableHttp { url, .. } => url,
_ => panic!("expected streamable http transport for codex apps"),
};
let expected_url = format!("{OPENAI_CONNECTORS_MCP_BASE_URL}{OPENAI_CONNECTORS_MCP_PATH}");
assert_eq!(url, &expected_url);
}
#[tokio::test]

View File

@@ -1014,6 +1014,7 @@ impl McpConnectionManager {
server: &str,
tool: &str,
arguments: Option<serde_json::Value>,
meta: Option<serde_json::Value>,
) -> Result<CallToolResult> {
let client = self.client_by_name(server).await?;
if !client.tool_filter.allows(tool) {
@@ -1024,7 +1025,7 @@ impl McpConnectionManager {
let result: rmcp::model::CallToolResult = client
.client
.call_tool(tool.to_string(), arguments, client.tool_timeout)
.call_tool(tool.to_string(), arguments, meta, client.tool_timeout)
.await
.with_context(|| format!("tool call failed for `{server}/{tool}`"))?;

View File

@@ -117,6 +117,7 @@ pub(crate) async fn handle_mcp_tool_call(
.counter("codex.mcp.call", 1, &[("status", status)]);
return CallToolResult::from_result(result);
}
let request_meta = build_mcp_tool_call_request_meta(&server, metadata.as_ref());
let tool_call_begin_event = EventMsg::McpToolCallBegin(McpToolCallBeginEvent {
call_id: call_id.clone(),
@@ -142,7 +143,12 @@ pub(crate) async fn handle_mcp_tool_call(
let start = Instant::now();
let result = sess
.call_tool(&server, &tool_name, arguments_value.clone())
.call_tool(
&server,
&tool_name,
arguments_value.clone(),
request_meta.clone(),
)
.await
.map_err(|e| format!("tool call error: {e:?}"));
let result = sanitize_mcp_tool_result_for_model(
@@ -226,7 +232,7 @@ pub(crate) async fn handle_mcp_tool_call(
let start = Instant::now();
// Perform the tool call.
let result = sess
.call_tool(&server, &tool_name, arguments_value.clone())
.call_tool(&server, &tool_name, arguments_value.clone(), request_meta)
.await
.map_err(|e| format!("tool call error: {e:?}"));
let result = sanitize_mcp_tool_result_for_model(
@@ -374,6 +380,24 @@ pub(crate) struct McpToolApprovalMetadata {
connector_description: Option<String>,
tool_title: Option<String>,
tool_description: Option<String>,
codex_apps_meta: Option<serde_json::Map<String, serde_json::Value>>,
}
const MCP_TOOL_CODEX_APPS_META_KEY: &str = "_codex_apps";
fn build_mcp_tool_call_request_meta(
server: &str,
metadata: Option<&McpToolApprovalMetadata>,
) -> Option<serde_json::Value> {
if server != CODEX_APPS_MCP_SERVER_NAME {
return None;
}
let codex_apps_meta = metadata.and_then(|metadata| metadata.codex_apps_meta.as_ref())?;
Some(serde_json::json!({
MCP_TOOL_CODEX_APPS_META_KEY: codex_apps_meta,
}))
}
#[derive(Clone, Copy)]
@@ -750,6 +774,13 @@ pub(crate) async fn lookup_mcp_tool_metadata(
connector_description,
tool_title: tool_info.tool.title,
tool_description: tool_info.tool.description.map(std::borrow::Cow::into_owned),
codex_apps_meta: tool_info
.tool
.meta
.as_ref()
.and_then(|meta| meta.get(MCP_TOOL_CODEX_APPS_META_KEY))
.and_then(serde_json::Value::as_object)
.cloned(),
})
}

View File

@@ -47,6 +47,7 @@ fn approval_metadata(
connector_description: connector_description.map(str::to_string),
tool_title: tool_title.map(str::to_string),
tool_description: tool_description.map(str::to_string),
codex_apps_meta: None,
}
}
@@ -415,6 +416,39 @@ fn sanitize_mcp_tool_result_for_model_preserves_image_when_supported() {
assert_eq!(got, original);
}
#[test]
fn codex_apps_tool_call_request_meta_includes_codex_apps_meta() {
let metadata = McpToolApprovalMetadata {
annotations: None,
connector_id: Some("calendar".to_string()),
connector_name: Some("Calendar".to_string()),
connector_description: Some("Manage events".to_string()),
tool_title: Some("Create Event".to_string()),
tool_description: Some("Create a calendar event.".to_string()),
codex_apps_meta: Some(
serde_json::json!({
"resource_uri": "connector://calendar/tools/calendar_create_event",
"contains_mcp_source": true,
"connector_id": "calendar",
})
.as_object()
.cloned()
.expect("_codex_apps metadata should be an object"),
),
};
assert_eq!(
build_mcp_tool_call_request_meta(CODEX_APPS_MCP_SERVER_NAME, Some(&metadata)),
Some(serde_json::json!({
MCP_TOOL_CODEX_APPS_META_KEY: {
"resource_uri": "connector://calendar/tools/calendar_create_event",
"contains_mcp_source": true,
"connector_id": "calendar",
},
}))
);
}
#[test]
fn accepted_elicitation_content_converts_to_request_user_input_response() {
let response = request_user_input_response_from_elicitation_content(Some(serde_json::json!(
@@ -535,6 +569,7 @@ fn guardian_mcp_review_request_includes_annotations_when_present() {
connector_description: None,
tool_title: None,
tool_description: None,
codex_apps_meta: None,
};
let request = build_guardian_mcp_tool_review_request("call-1", &invocation, Some(&metadata));
@@ -856,6 +891,7 @@ async fn approve_mode_skips_when_annotations_do_not_require_approval() {
connector_description: None,
tool_title: Some("Read Only Tool".to_string()),
tool_description: None,
codex_apps_meta: None,
};
let decision = maybe_request_mcp_tool_approval(
@@ -919,6 +955,7 @@ async fn approve_mode_blocks_when_arc_returns_interrupt_for_model() {
connector_description: Some("Manage events".to_string()),
tool_title: Some("Dangerous Tool".to_string()),
tool_description: Some("Performs a risky action.".to_string()),
codex_apps_meta: None,
};
let decision = maybe_request_mcp_tool_approval(
@@ -1021,6 +1058,7 @@ async fn approve_mode_routes_arc_ask_user_to_guardian_when_guardian_reviewer_is_
connector_description: Some("Manage events".to_string()),
tool_title: Some("Dangerous Tool".to_string()),
tool_description: Some("Performs a risky action.".to_string()),
codex_apps_meta: None,
};
let decision = maybe_request_mcp_tool_approval(

View File

@@ -3,6 +3,7 @@ use crate::api_bridge::auth_provider_from_auth;
use crate::api_bridge::map_api_error;
use crate::auth::AuthManager;
use crate::auth::AuthMode;
use crate::auth::CodexAuth;
use crate::config::Config;
use crate::default_client::build_reqwest_client;
use crate::error::CodexErr;
@@ -11,8 +12,15 @@ use crate::model_provider_info::ModelProviderInfo;
use crate::models_manager::collaboration_mode_presets::CollaborationModesConfig;
use crate::models_manager::collaboration_mode_presets::builtin_collaboration_mode_presets;
use crate::models_manager::model_info;
use crate::response_debug_context::extract_response_debug_context;
use crate::response_debug_context::telemetry_transport_error_message;
use crate::util::FeedbackRequestTags;
use crate::util::emit_feedback_request_tags;
use codex_api::ModelsClient;
use codex_api::RequestTelemetry;
use codex_api::ReqwestTransport;
use codex_api::TransportError;
use codex_otel::TelemetryAuthMode;
use codex_protocol::config_types::CollaborationModeMask;
use codex_protocol::openai_models::ModelInfo;
use codex_protocol::openai_models::ModelPreset;
@@ -32,6 +40,82 @@ use tracing::instrument;
const MODEL_CACHE_FILE: &str = "models_cache.json";
const DEFAULT_MODEL_CACHE_TTL: Duration = Duration::from_secs(300);
const MODELS_REFRESH_TIMEOUT: Duration = Duration::from_secs(5);
const MODELS_ENDPOINT: &str = "/models";
#[derive(Clone)]
struct ModelsRequestTelemetry {
auth_mode: Option<String>,
auth_header_attached: bool,
auth_header_name: Option<&'static str>,
}
impl RequestTelemetry for ModelsRequestTelemetry {
fn on_request(
&self,
attempt: u64,
status: Option<http::StatusCode>,
error: Option<&TransportError>,
duration: Duration,
) {
let success = status.is_some_and(|code| code.is_success()) && error.is_none();
let error_message = error.map(telemetry_transport_error_message);
let response_debug = error
.map(extract_response_debug_context)
.unwrap_or_default();
let status = status.map(|status| status.as_u16());
tracing::event!(
target: "codex_otel.log_only",
tracing::Level::INFO,
event.name = "codex.api_request",
duration_ms = %duration.as_millis(),
http.response.status_code = status,
success = success,
error.message = error_message.as_deref(),
attempt = attempt,
endpoint = MODELS_ENDPOINT,
auth.header_attached = self.auth_header_attached,
auth.header_name = self.auth_header_name,
auth.request_id = response_debug.request_id.as_deref(),
auth.cf_ray = response_debug.cf_ray.as_deref(),
auth.error = response_debug.auth_error.as_deref(),
auth.error_code = response_debug.auth_error_code.as_deref(),
auth.mode = self.auth_mode.as_deref(),
);
tracing::event!(
target: "codex_otel.trace_safe",
tracing::Level::INFO,
event.name = "codex.api_request",
duration_ms = %duration.as_millis(),
http.response.status_code = status,
success = success,
error.message = error_message.as_deref(),
attempt = attempt,
endpoint = MODELS_ENDPOINT,
auth.header_attached = self.auth_header_attached,
auth.header_name = self.auth_header_name,
auth.request_id = response_debug.request_id.as_deref(),
auth.cf_ray = response_debug.cf_ray.as_deref(),
auth.error = response_debug.auth_error.as_deref(),
auth.error_code = response_debug.auth_error_code.as_deref(),
auth.mode = self.auth_mode.as_deref(),
);
emit_feedback_request_tags(&FeedbackRequestTags {
endpoint: MODELS_ENDPOINT,
auth_header_attached: self.auth_header_attached,
auth_header_name: self.auth_header_name,
auth_mode: self.auth_mode.as_deref(),
auth_retry_after_unauthorized: None,
auth_recovery_mode: None,
auth_recovery_phase: None,
auth_connection_reused: None,
auth_request_id: response_debug.request_id.as_deref(),
auth_cf_ray: response_debug.cf_ray.as_deref(),
auth_error: response_debug.auth_error.as_deref(),
auth_error_code: response_debug.auth_error_code.as_deref(),
auth_recovery_followup_success: None,
auth_recovery_followup_status: None,
});
}
}
/// Strategy for refreshing available models.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -330,11 +414,17 @@ impl ModelsManager {
let _timer =
codex_otel::start_global_timer("codex.remote_models.fetch_update.duration_ms", &[]);
let auth = self.auth_manager.auth().await;
let auth_mode = self.auth_manager.auth_mode();
let auth_mode = auth.as_ref().map(CodexAuth::auth_mode);
let api_provider = self.provider.to_api_provider(auth_mode)?;
let api_auth = auth_provider_from_auth(auth.clone(), &self.provider)?;
let transport = ReqwestTransport::new(build_reqwest_client());
let client = ModelsClient::new(transport, api_provider, api_auth);
let request_telemetry: Arc<dyn RequestTelemetry> = Arc::new(ModelsRequestTelemetry {
auth_mode: auth_mode.map(|mode| TelemetryAuthMode::from(mode).to_string()),
auth_header_attached: api_auth.auth_header_attached(),
auth_header_name: api_auth.auth_header_name(),
});
let client = ModelsClient::new(transport, api_provider, api_auth)
.with_telemetry(Some(request_telemetry));
let client_version = crate::models_manager::client_version_to_whole();
let (models, etag) = timeout(

View File

@@ -0,0 +1,167 @@
use base64::Engine;
use codex_api::TransportError;
use codex_api::error::ApiError;
const REQUEST_ID_HEADER: &str = "x-request-id";
const OAI_REQUEST_ID_HEADER: &str = "x-oai-request-id";
const CF_RAY_HEADER: &str = "cf-ray";
const AUTH_ERROR_HEADER: &str = "x-openai-authorization-error";
const X_ERROR_JSON_HEADER: &str = "x-error-json";
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub(crate) struct ResponseDebugContext {
pub(crate) request_id: Option<String>,
pub(crate) cf_ray: Option<String>,
pub(crate) auth_error: Option<String>,
pub(crate) auth_error_code: Option<String>,
}
pub(crate) fn extract_response_debug_context(transport: &TransportError) -> ResponseDebugContext {
let mut context = ResponseDebugContext::default();
let TransportError::Http {
headers, body: _, ..
} = transport
else {
return context;
};
let extract_header = |name: &str| {
headers
.as_ref()
.and_then(|headers| headers.get(name))
.and_then(|value| value.to_str().ok())
.map(str::to_string)
};
context.request_id =
extract_header(REQUEST_ID_HEADER).or_else(|| extract_header(OAI_REQUEST_ID_HEADER));
context.cf_ray = extract_header(CF_RAY_HEADER);
context.auth_error = extract_header(AUTH_ERROR_HEADER);
context.auth_error_code = extract_header(X_ERROR_JSON_HEADER).and_then(|encoded| {
let decoded = base64::engine::general_purpose::STANDARD
.decode(encoded)
.ok()?;
let parsed = serde_json::from_slice::<serde_json::Value>(&decoded).ok()?;
parsed
.get("error")
.and_then(|error| error.get("code"))
.and_then(serde_json::Value::as_str)
.map(str::to_string)
});
context
}
pub(crate) fn extract_response_debug_context_from_api_error(
error: &ApiError,
) -> ResponseDebugContext {
match error {
ApiError::Transport(transport) => extract_response_debug_context(transport),
_ => ResponseDebugContext::default(),
}
}
pub(crate) fn telemetry_transport_error_message(error: &TransportError) -> String {
match error {
TransportError::Http { status, .. } => format!("http {}", status.as_u16()),
TransportError::RetryLimit => "retry limit reached".to_string(),
TransportError::Timeout => "timeout".to_string(),
TransportError::Network(err) => err.to_string(),
TransportError::Build(err) => err.to_string(),
}
}
pub(crate) fn telemetry_api_error_message(error: &ApiError) -> String {
match error {
ApiError::Transport(transport) => telemetry_transport_error_message(transport),
ApiError::Api { status, .. } => format!("api error {}", status.as_u16()),
ApiError::Stream(err) => err.to_string(),
ApiError::ContextWindowExceeded => "context window exceeded".to_string(),
ApiError::QuotaExceeded => "quota exceeded".to_string(),
ApiError::UsageNotIncluded => "usage not included".to_string(),
ApiError::Retryable { .. } => "retryable error".to_string(),
ApiError::RateLimit(_) => "rate limit".to_string(),
ApiError::InvalidRequest { .. } => "invalid request".to_string(),
ApiError::ServerOverloaded => "server overloaded".to_string(),
}
}
#[cfg(test)]
mod tests {
use super::ResponseDebugContext;
use super::extract_response_debug_context;
use super::telemetry_api_error_message;
use super::telemetry_transport_error_message;
use codex_api::TransportError;
use codex_api::error::ApiError;
use http::HeaderMap;
use http::HeaderValue;
use http::StatusCode;
use pretty_assertions::assert_eq;
#[test]
fn extract_response_debug_context_decodes_identity_headers() {
let mut headers = HeaderMap::new();
headers.insert("x-oai-request-id", HeaderValue::from_static("req-auth"));
headers.insert("cf-ray", HeaderValue::from_static("ray-auth"));
headers.insert(
"x-openai-authorization-error",
HeaderValue::from_static("missing_authorization_header"),
);
headers.insert(
"x-error-json",
HeaderValue::from_static("eyJlcnJvciI6eyJjb2RlIjoidG9rZW5fZXhwaXJlZCJ9fQ=="),
);
let context = extract_response_debug_context(&TransportError::Http {
status: StatusCode::UNAUTHORIZED,
url: Some("https://chatgpt.com/backend-api/codex/models".to_string()),
headers: Some(headers),
body: Some(r#"{"error":{"message":"plain text error"},"status":401}"#.to_string()),
});
assert_eq!(
context,
ResponseDebugContext {
request_id: Some("req-auth".to_string()),
cf_ray: Some("ray-auth".to_string()),
auth_error: Some("missing_authorization_header".to_string()),
auth_error_code: Some("token_expired".to_string()),
}
);
}
#[test]
fn telemetry_error_messages_omit_http_bodies() {
let transport = TransportError::Http {
status: StatusCode::UNAUTHORIZED,
url: Some("https://chatgpt.com/backend-api/codex/responses".to_string()),
headers: None,
body: Some(r#"{"error":{"message":"secret token leaked"}}"#.to_string()),
};
assert_eq!(telemetry_transport_error_message(&transport), "http 401");
assert_eq!(
telemetry_api_error_message(&ApiError::Transport(transport)),
"http 401"
);
}
#[test]
fn telemetry_error_messages_preserve_non_http_details() {
let network = TransportError::Network("dns lookup failed".to_string());
let build = TransportError::Build("invalid header value".to_string());
let stream = ApiError::Stream("socket closed".to_string());
assert_eq!(
telemetry_transport_error_message(&network),
"dns lookup failed"
);
assert_eq!(
telemetry_transport_error_message(&build),
"invalid header value"
);
assert_eq!(telemetry_api_error_message(&stream), "socket closed");
}
}

View File

@@ -5,6 +5,7 @@ use crate::client_common::tools::ToolSpec;
use crate::config::AgentRoleConfig;
use crate::features::Feature;
use crate::features::Features;
use crate::mcp::CODEX_APPS_MCP_SERVER_NAME;
use crate::mcp_connection_manager::ToolInfo;
use crate::models_manager::collaboration_mode_presets::CollaborationModesConfig;
use crate::original_image_detail::can_request_original_image_detail;
@@ -1673,22 +1674,58 @@ fn create_tool_search_tool(app_tools: &HashMap<String, ToolInfo>) -> ToolSpec {
},
),
]);
let mut app_names = app_tools
.values()
.filter_map(|tool| tool.connector_name.clone())
.collect::<Vec<_>>();
app_names.sort();
app_names.dedup();
let app_names = app_names.join(", ");
let mut app_descriptions = BTreeMap::new();
for tool in app_tools.values() {
if tool.server_name != CODEX_APPS_MCP_SERVER_NAME {
continue;
}
let description = if app_names.is_empty() {
TOOL_SEARCH_DESCRIPTION_TEMPLATE
.replace("({{app_names}})", "(None currently enabled)")
.replace("{{app_names}}", "available apps")
let Some(connector_name) = tool
.connector_name
.as_deref()
.map(str::trim)
.filter(|connector_name| !connector_name.is_empty())
else {
continue;
};
let connector_description = tool
.connector_description
.as_deref()
.map(str::trim)
.filter(|connector_description| !connector_description.is_empty())
.map(str::to_string);
app_descriptions
.entry(connector_name.to_string())
.and_modify(|existing: &mut Option<String>| {
if existing.is_none() {
*existing = connector_description.clone();
}
})
.or_insert(connector_description);
}
let app_descriptions = if app_descriptions.is_empty() {
"None currently enabled.".to_string()
} else {
TOOL_SEARCH_DESCRIPTION_TEMPLATE.replace("{{app_names}}", app_names.as_str())
app_descriptions
.into_iter()
.map(
|(connector_name, connector_description)| match connector_description {
Some(connector_description) => {
format!("- {connector_name}: {connector_description}")
}
None => format!("- {connector_name}"),
},
)
.collect::<Vec<_>>()
.join("\n")
};
let description =
TOOL_SEARCH_DESCRIPTION_TEMPLATE.replace("{{app_descriptions}}", app_descriptions.as_str());
ToolSpec::ToolSearch {
execution: "client".to_string(),
description,

View File

@@ -1690,7 +1690,7 @@ fn test_build_specs_mcp_tools_sorted_by_name() {
}
#[test]
fn search_tool_description_includes_only_codex_apps_connector_names() {
fn search_tool_description_lists_each_codex_apps_connector_once() {
let model_info = search_capable_model_info();
let mut features = Features::with_defaults();
features.enable(Feature::Apps);
@@ -1736,7 +1736,45 @@ fn search_tool_description_includes_only_codex_apps_connector_names() {
connector_id: Some("calendar".to_string()),
connector_name: Some("Calendar".to_string()),
plugin_display_names: Vec::new(),
connector_description: None,
connector_description: Some(
"Plan events and manage your calendar.".to_string(),
),
},
),
(
"mcp__codex_apps__calendar_list_events".to_string(),
ToolInfo {
server_name: crate::mcp::CODEX_APPS_MCP_SERVER_NAME.to_string(),
tool_name: "_list_events".to_string(),
tool_namespace: "mcp__codex_apps__calendar".to_string(),
tool: mcp_tool(
"calendar-list-events",
"List calendar events",
serde_json::json!({"type": "object"}),
),
connector_id: Some("calendar".to_string()),
connector_name: Some("Calendar".to_string()),
plugin_display_names: Vec::new(),
connector_description: Some(
"Plan events and manage your calendar.".to_string(),
),
},
),
(
"mcp__codex_apps__gmail_search_threads".to_string(),
ToolInfo {
server_name: crate::mcp::CODEX_APPS_MCP_SERVER_NAME.to_string(),
tool_name: "_search_threads".to_string(),
tool_namespace: "mcp__codex_apps__gmail".to_string(),
tool: mcp_tool(
"gmail-search-threads",
"Search email threads",
serde_json::json!({"type": "object"}),
),
connector_id: Some("gmail".to_string()),
connector_name: Some("Gmail".to_string()),
plugin_display_names: Vec::new(),
connector_description: Some("Find and summarize email threads.".to_string()),
},
),
(
@@ -1762,7 +1800,14 @@ fn search_tool_description_includes_only_codex_apps_connector_names() {
panic!("expected tool_search tool");
};
let description = description.as_str();
assert!(description.contains("Calendar"));
assert!(description.contains("- Calendar: Plan events and manage your calendar."));
assert!(description.contains("- Gmail: Find and summarize email threads."));
assert_eq!(
description
.matches("- Calendar: Plan events and manage your calendar.")
.count(),
1
);
assert!(!description.contains("mcp__rmcp__echo"));
}
@@ -1874,8 +1919,56 @@ fn search_tool_description_handles_no_enabled_apps() {
panic!("expected tool_search tool");
};
assert!(description.contains("(None currently enabled)"));
assert!(!description.contains("{{app_names}}"));
assert!(description.contains("None currently enabled."));
assert!(!description.contains("{{app_descriptions}}"));
}
#[test]
fn search_tool_description_falls_back_to_connector_name_without_description() {
let model_info = search_capable_model_info();
let mut features = Features::with_defaults();
features.enable(Feature::Apps);
let available_models = Vec::new();
let tools_config = ToolsConfig::new(&ToolsConfigParams {
model_info: &model_info,
available_models: &available_models,
features: &features,
web_search_mode: Some(WebSearchMode::Cached),
session_source: SessionSource::Cli,
sandbox_policy: &SandboxPolicy::DangerFullAccess,
windows_sandbox_level: WindowsSandboxLevel::Disabled,
});
let (tools, _) = build_specs(
&tools_config,
None,
Some(HashMap::from([(
"mcp__codex_apps__calendar_create_event".to_string(),
ToolInfo {
server_name: crate::mcp::CODEX_APPS_MCP_SERVER_NAME.to_string(),
tool_name: "_create_event".to_string(),
tool_namespace: "mcp__codex_apps__calendar".to_string(),
tool: mcp_tool(
"calendar_create_event",
"Create calendar event",
serde_json::json!({"type": "object"}),
),
connector_id: Some("calendar".to_string()),
connector_name: Some("Calendar".to_string()),
plugin_display_names: Vec::new(),
connector_description: None,
},
)])),
&[],
)
.build();
let search_tool = find_tool(&tools, TOOL_SEARCH_TOOL_NAME);
let ToolSpec::ToolSearch { description, .. } = &search_tool.spec else {
panic!("expected tool_search tool");
};
assert!(description.contains("- Calendar"));
assert!(!description.contains("- Calendar:"));
}
#[test]

View File

@@ -37,6 +37,111 @@ macro_rules! feedback_tags {
};
}
pub(crate) struct FeedbackRequestTags<'a> {
pub endpoint: &'a str,
pub auth_header_attached: bool,
pub auth_header_name: Option<&'a str>,
pub auth_mode: Option<&'a str>,
pub auth_retry_after_unauthorized: Option<bool>,
pub auth_recovery_mode: Option<&'a str>,
pub auth_recovery_phase: Option<&'a str>,
pub auth_connection_reused: Option<bool>,
pub auth_request_id: Option<&'a str>,
pub auth_cf_ray: Option<&'a str>,
pub auth_error: Option<&'a str>,
pub auth_error_code: Option<&'a str>,
pub auth_recovery_followup_success: Option<bool>,
pub auth_recovery_followup_status: Option<u16>,
}
struct Auth401FeedbackSnapshot<'a> {
request_id: &'a str,
cf_ray: &'a str,
error: &'a str,
error_code: &'a str,
}
impl<'a> Auth401FeedbackSnapshot<'a> {
fn from_optional_fields(
request_id: Option<&'a str>,
cf_ray: Option<&'a str>,
error: Option<&'a str>,
error_code: Option<&'a str>,
) -> Self {
Self {
request_id: request_id.unwrap_or(""),
cf_ray: cf_ray.unwrap_or(""),
error: error.unwrap_or(""),
error_code: error_code.unwrap_or(""),
}
}
}
pub(crate) fn emit_feedback_request_tags(tags: &FeedbackRequestTags<'_>) {
let auth_header_name = tags.auth_header_name.unwrap_or("");
let auth_mode = tags.auth_mode.unwrap_or("");
let auth_retry_after_unauthorized = tags
.auth_retry_after_unauthorized
.map_or_else(String::new, |value| value.to_string());
let auth_recovery_mode = tags.auth_recovery_mode.unwrap_or("");
let auth_recovery_phase = tags.auth_recovery_phase.unwrap_or("");
let auth_connection_reused = tags
.auth_connection_reused
.map_or_else(String::new, |value| value.to_string());
let auth_request_id = tags.auth_request_id.unwrap_or("");
let auth_cf_ray = tags.auth_cf_ray.unwrap_or("");
let auth_error = tags.auth_error.unwrap_or("");
let auth_error_code = tags.auth_error_code.unwrap_or("");
let auth_recovery_followup_success = tags
.auth_recovery_followup_success
.map_or_else(String::new, |value| value.to_string());
let auth_recovery_followup_status = tags
.auth_recovery_followup_status
.map_or_else(String::new, |value| value.to_string());
feedback_tags!(
endpoint = tags.endpoint,
auth_header_attached = tags.auth_header_attached,
auth_header_name = auth_header_name,
auth_mode = auth_mode,
auth_retry_after_unauthorized = auth_retry_after_unauthorized,
auth_recovery_mode = auth_recovery_mode,
auth_recovery_phase = auth_recovery_phase,
auth_connection_reused = auth_connection_reused,
auth_request_id = auth_request_id,
auth_cf_ray = auth_cf_ray,
auth_error = auth_error,
auth_error_code = auth_error_code,
auth_recovery_followup_success = auth_recovery_followup_success,
auth_recovery_followup_status = auth_recovery_followup_status
);
}
pub(crate) fn emit_feedback_auth_recovery_tags(
auth_recovery_mode: &str,
auth_recovery_phase: &str,
auth_recovery_outcome: &str,
auth_request_id: Option<&str>,
auth_cf_ray: Option<&str>,
auth_error: Option<&str>,
auth_error_code: Option<&str>,
) {
let auth_401 = Auth401FeedbackSnapshot::from_optional_fields(
auth_request_id,
auth_cf_ray,
auth_error,
auth_error_code,
);
feedback_tags!(
auth_recovery_mode = auth_recovery_mode,
auth_recovery_phase = auth_recovery_phase,
auth_recovery_outcome = auth_recovery_outcome,
auth_401_request_id = auth_401.request_id,
auth_401_cf_ray = auth_401.cf_ray,
auth_401_error = auth_401.error,
auth_401_error_code = auth_401.error_code
);
}
pub fn backoff(attempt: u64) -> Duration {
let exp = BACKOFF_FACTOR.powi(attempt.saturating_sub(1) as i32);
let base = (INITIAL_DELAY_MS as f64 * exp) as u64;

View File

@@ -1,4 +1,15 @@
use super::*;
use std::collections::BTreeMap;
use std::sync::Arc;
use std::sync::Mutex;
use tracing::Event;
use tracing::Subscriber;
use tracing::field::Visit;
use tracing_subscriber::Layer;
use tracing_subscriber::layer::Context;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::util::SubscriberInitExt;
#[test]
fn test_try_parse_error_message() {
@@ -32,6 +43,298 @@ fn feedback_tags_macro_compiles() {
feedback_tags!(model = "gpt-5", cached = true, debug_only = OnlyDebug);
}
#[derive(Default)]
struct TagCollectorVisitor {
tags: BTreeMap<String, String>,
}
impl Visit for TagCollectorVisitor {
fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
self.tags
.insert(field.name().to_string(), value.to_string());
}
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
self.tags
.insert(field.name().to_string(), value.to_string());
}
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
self.tags
.insert(field.name().to_string(), format!("{value:?}"));
}
}
#[derive(Clone)]
struct TagCollectorLayer {
tags: Arc<Mutex<BTreeMap<String, String>>>,
}
impl<S> Layer<S> for TagCollectorLayer
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
if event.metadata().target() != "feedback_tags" {
return;
}
let mut visitor = TagCollectorVisitor::default();
event.record(&mut visitor);
self.tags.lock().unwrap().extend(visitor.tags);
}
}
#[test]
fn emit_feedback_request_tags_records_sentry_feedback_fields() {
let tags = Arc::new(Mutex::new(BTreeMap::new()));
let _guard = tracing_subscriber::registry()
.with(TagCollectorLayer { tags: tags.clone() })
.set_default();
emit_feedback_request_tags(&FeedbackRequestTags {
endpoint: "/responses",
auth_header_attached: true,
auth_header_name: Some("authorization"),
auth_mode: Some("chatgpt"),
auth_retry_after_unauthorized: Some(false),
auth_recovery_mode: Some("managed"),
auth_recovery_phase: Some("refresh_token"),
auth_connection_reused: Some(true),
auth_request_id: Some("req-123"),
auth_cf_ray: Some("ray-123"),
auth_error: Some("missing_authorization_header"),
auth_error_code: Some("token_expired"),
auth_recovery_followup_success: Some(true),
auth_recovery_followup_status: Some(200),
});
let tags = tags.lock().unwrap().clone();
assert_eq!(
tags.get("endpoint").map(String::as_str),
Some("\"/responses\"")
);
assert_eq!(
tags.get("auth_header_attached").map(String::as_str),
Some("true")
);
assert_eq!(
tags.get("auth_header_name").map(String::as_str),
Some("\"authorization\"")
);
assert_eq!(
tags.get("auth_request_id").map(String::as_str),
Some("\"req-123\"")
);
assert_eq!(
tags.get("auth_error_code").map(String::as_str),
Some("\"token_expired\"")
);
assert_eq!(
tags.get("auth_recovery_followup_success")
.map(String::as_str),
Some("\"true\"")
);
assert_eq!(
tags.get("auth_recovery_followup_status")
.map(String::as_str),
Some("\"200\"")
);
}
#[test]
fn emit_feedback_auth_recovery_tags_preserves_401_specific_fields() {
let tags = Arc::new(Mutex::new(BTreeMap::new()));
let _guard = tracing_subscriber::registry()
.with(TagCollectorLayer { tags: tags.clone() })
.set_default();
emit_feedback_auth_recovery_tags(
"managed",
"refresh_token",
"recovery_succeeded",
Some("req-401"),
Some("ray-401"),
Some("missing_authorization_header"),
Some("token_expired"),
);
let tags = tags.lock().unwrap().clone();
assert_eq!(
tags.get("auth_401_request_id").map(String::as_str),
Some("\"req-401\"")
);
assert_eq!(
tags.get("auth_401_cf_ray").map(String::as_str),
Some("\"ray-401\"")
);
assert_eq!(
tags.get("auth_401_error").map(String::as_str),
Some("\"missing_authorization_header\"")
);
assert_eq!(
tags.get("auth_401_error_code").map(String::as_str),
Some("\"token_expired\"")
);
}
#[test]
fn emit_feedback_auth_recovery_tags_clears_stale_401_fields() {
let tags = Arc::new(Mutex::new(BTreeMap::new()));
let _guard = tracing_subscriber::registry()
.with(TagCollectorLayer { tags: tags.clone() })
.set_default();
emit_feedback_auth_recovery_tags(
"managed",
"refresh_token",
"recovery_failed_transient",
Some("req-401-a"),
Some("ray-401-a"),
Some("missing_authorization_header"),
Some("token_expired"),
);
emit_feedback_auth_recovery_tags(
"managed",
"done",
"recovery_not_run",
Some("req-401-b"),
None,
None,
None,
);
let tags = tags.lock().unwrap().clone();
assert_eq!(
tags.get("auth_401_request_id").map(String::as_str),
Some("\"req-401-b\"")
);
assert_eq!(
tags.get("auth_401_cf_ray").map(String::as_str),
Some("\"\"")
);
assert_eq!(tags.get("auth_401_error").map(String::as_str), Some("\"\""));
assert_eq!(
tags.get("auth_401_error_code").map(String::as_str),
Some("\"\"")
);
}
#[test]
fn emit_feedback_request_tags_preserves_latest_auth_fields_after_unauthorized() {
let tags = Arc::new(Mutex::new(BTreeMap::new()));
let _guard = tracing_subscriber::registry()
.with(TagCollectorLayer { tags: tags.clone() })
.set_default();
emit_feedback_request_tags(&FeedbackRequestTags {
endpoint: "/responses",
auth_header_attached: true,
auth_header_name: Some("authorization"),
auth_mode: Some("chatgpt"),
auth_retry_after_unauthorized: Some(true),
auth_recovery_mode: Some("managed"),
auth_recovery_phase: Some("refresh_token"),
auth_connection_reused: None,
auth_request_id: Some("req-123"),
auth_cf_ray: Some("ray-123"),
auth_error: Some("missing_authorization_header"),
auth_error_code: Some("token_expired"),
auth_recovery_followup_success: Some(false),
auth_recovery_followup_status: Some(401),
});
let tags = tags.lock().unwrap().clone();
assert_eq!(
tags.get("auth_request_id").map(String::as_str),
Some("\"req-123\"")
);
assert_eq!(
tags.get("auth_cf_ray").map(String::as_str),
Some("\"ray-123\"")
);
assert_eq!(
tags.get("auth_error").map(String::as_str),
Some("\"missing_authorization_header\"")
);
assert_eq!(
tags.get("auth_error_code").map(String::as_str),
Some("\"token_expired\"")
);
assert_eq!(
tags.get("auth_recovery_followup_success")
.map(String::as_str),
Some("\"false\"")
);
}
#[test]
fn emit_feedback_request_tags_clears_stale_latest_auth_fields() {
let tags = Arc::new(Mutex::new(BTreeMap::new()));
let _guard = tracing_subscriber::registry()
.with(TagCollectorLayer { tags: tags.clone() })
.set_default();
emit_feedback_request_tags(&FeedbackRequestTags {
endpoint: "/responses",
auth_header_attached: true,
auth_header_name: Some("authorization"),
auth_mode: Some("chatgpt"),
auth_retry_after_unauthorized: Some(false),
auth_recovery_mode: Some("managed"),
auth_recovery_phase: Some("refresh_token"),
auth_connection_reused: Some(true),
auth_request_id: Some("req-123"),
auth_cf_ray: Some("ray-123"),
auth_error: Some("missing_authorization_header"),
auth_error_code: Some("token_expired"),
auth_recovery_followup_success: Some(true),
auth_recovery_followup_status: Some(200),
});
emit_feedback_request_tags(&FeedbackRequestTags {
endpoint: "/responses",
auth_header_attached: true,
auth_header_name: None,
auth_mode: None,
auth_retry_after_unauthorized: None,
auth_recovery_mode: None,
auth_recovery_phase: None,
auth_connection_reused: None,
auth_request_id: None,
auth_cf_ray: None,
auth_error: None,
auth_error_code: None,
auth_recovery_followup_success: None,
auth_recovery_followup_status: None,
});
let tags = tags.lock().unwrap().clone();
assert_eq!(
tags.get("auth_header_name").map(String::as_str),
Some("\"\"")
);
assert_eq!(tags.get("auth_mode").map(String::as_str), Some("\"\""));
assert_eq!(
tags.get("auth_request_id").map(String::as_str),
Some("\"\"")
);
assert_eq!(tags.get("auth_cf_ray").map(String::as_str), Some("\"\""));
assert_eq!(tags.get("auth_error").map(String::as_str), Some("\"\""));
assert_eq!(
tags.get("auth_error_code").map(String::as_str),
Some("\"\"")
);
assert_eq!(
tags.get("auth_recovery_followup_success")
.map(String::as_str),
Some("\"\"")
);
assert_eq!(
tags.get("auth_recovery_followup_status")
.map(String::as_str),
Some("\"\"")
);
}
#[test]
fn normalize_thread_name_trims_and_rejects_empty() {
assert_eq!(normalize_thread_name(" "), None);

View File

@@ -2,5 +2,6 @@
Searches over apps/connectors tool metadata with BM25 and exposes matching tools for the next model call.
Tools of the apps ({{app_names}}) are hidden until you search for them with this tool (`tool_search`).
When the request needs one of these connectors and you don't already have the required tools from it, use this tool to load them. For the apps mentioned above, always prefer `tool_search` over `list_mcp_resources` or `list_mcp_resource_templates` for tool discovery.
You have access to all the tools of the following apps/connectors:
{{app_descriptions}}
Some of the tools may not have been provided to you upfront, and you should use this tool (`tool_search`) to search for the required tools and load them for the apps mentioned above. For the apps mentioned above, always use `tool_search` instead of `list_mcp_resources` or `list_mcp_resource_templates` for tool discovery.

View File

@@ -18,6 +18,9 @@ const CONNECTOR_DESCRIPTION: &str = "Plan events and manage your calendar.";
const PROTOCOL_VERSION: &str = "2025-11-25";
const SERVER_NAME: &str = "codex-apps-test";
const SERVER_VERSION: &str = "1.0.0";
pub const CALENDAR_CREATE_EVENT_RESOURCE_URI: &str =
"connector://calendar/tools/calendar_create_event";
const CALENDAR_LIST_EVENTS_RESOURCE_URI: &str = "connector://calendar/tools/calendar_list_events";
#[derive(Clone)]
pub struct AppsTestServer {
@@ -175,7 +178,12 @@ impl Respond for CodexAppsJsonRpcResponder {
"_meta": {
"connector_id": CONNECTOR_ID,
"connector_name": self.connector_name.clone(),
"connector_description": self.connector_description.clone()
"connector_description": self.connector_description.clone(),
"_codex_apps": {
"resource_uri": CALENDAR_CREATE_EVENT_RESOURCE_URI,
"contains_mcp_source": true,
"connector_id": CONNECTOR_ID
}
}
},
{
@@ -192,7 +200,12 @@ impl Respond for CodexAppsJsonRpcResponder {
"_meta": {
"connector_id": CONNECTOR_ID,
"connector_name": self.connector_name.clone(),
"connector_description": self.connector_description.clone()
"connector_description": self.connector_description.clone(),
"_codex_apps": {
"resource_uri": CALENDAR_LIST_EVENTS_RESOURCE_URI,
"contains_mcp_source": true,
"connector_id": CONNECTOR_ID
}
}
}
],
@@ -214,6 +227,7 @@ impl Respond for CodexAppsJsonRpcResponder {
.pointer("/params/arguments/starts_at")
.and_then(Value::as_str)
.unwrap_or_default();
let codex_apps_meta = body.pointer("/params/_meta/_codex_apps").cloned();
ResponseTemplate::new(200).set_body_json(json!({
"jsonrpc": "2.0",
@@ -223,6 +237,9 @@ impl Respond for CodexAppsJsonRpcResponder {
"type": "text",
"text": format!("called {tool_name} for {title} at {starts_at}")
}],
"structuredContent": {
"_codex_apps": codex_apps_meta,
},
"isError": false
}
}))

View File

@@ -943,10 +943,6 @@ async fn includes_apps_guidance_as_developer_message_for_chatgpt_auth() {
.features
.enable(Feature::Apps)
.expect("test config should allow feature update");
config
.features
.disable(Feature::AppsMcpGateway)
.expect("test config should allow feature update");
config.chatgpt_base_url = apps_base_url;
});
let codex = builder
@@ -971,7 +967,8 @@ async fn includes_apps_guidance_as_developer_message_for_chatgpt_auth() {
let request = resp_mock.single_request();
let request_body = request.body_json();
let input = request_body["input"].as_array().expect("input array");
let apps_snippet = "Apps are mentioned in user messages in the format";
let apps_snippet =
"Apps (Connectors) can be explicitly triggered in user messages in the format";
let has_developer_apps_guidance = input.iter().any(|item| {
item.get("role").and_then(|value| value.as_str()) == Some("developer")
@@ -1034,10 +1031,6 @@ async fn omits_apps_guidance_for_api_key_auth_even_when_feature_enabled() {
.features
.enable(Feature::Apps)
.expect("test config should allow feature update");
config
.features
.disable(Feature::AppsMcpGateway)
.expect("test config should allow feature update");
config.chatgpt_base_url = apps_base_url;
});
let codex = builder
@@ -1062,7 +1055,8 @@ async fn omits_apps_guidance_for_api_key_auth_even_when_feature_enabled() {
let request = resp_mock.single_request();
let request_body = request.body_json();
let input = request_body["input"].as_array().expect("input array");
let apps_snippet = "Apps are mentioned in the prompt in the format";
let apps_snippet =
"Apps (Connectors) can be explicitly triggered in user messages in the format";
let has_apps_guidance = input.iter().any(|item| {
item.get("content")

View File

@@ -142,10 +142,6 @@ async fn build_apps_enabled_plugin_test_codex(
.features
.enable(Feature::Apps)
.expect("test config should allow feature update");
config
.features
.disable(Feature::AppsMcpGateway)
.expect("test config should allow feature update");
config.chatgpt_base_url = chatgpt_base_url;
});
Ok(builder

View File

@@ -13,6 +13,7 @@ use codex_protocol::protocol::Op;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::user_input::UserInput;
use core_test_support::apps_test_server::AppsTestServer;
use core_test_support::apps_test_server::CALENDAR_CREATE_EVENT_RESOURCE_URI;
use core_test_support::responses::ResponsesRequest;
use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
@@ -30,8 +31,9 @@ use pretty_assertions::assert_eq;
use serde_json::Value;
use serde_json::json;
const SEARCH_TOOL_DESCRIPTION_SNIPPETS: [&str; 1] = [
"Tools of the apps (Calendar) are hidden until you search for them with this tool (`tool_search`).",
const SEARCH_TOOL_DESCRIPTION_SNIPPETS: [&str; 2] = [
"You have access to all the tools of the following apps/connectors",
"- Calendar: Plan events and manage your calendar.",
];
const TOOL_SEARCH_TOOL_NAME: &str = "tool_search";
const CALENDAR_CREATE_TOOL: &str = "mcp__codex_apps__calendar_create_event";
@@ -89,10 +91,6 @@ fn configure_apps(config: &mut Config, apps_base_url: &str) {
.features
.enable(Feature::Apps)
.expect("test config should allow feature update");
config
.features
.disable(Feature::AppsMcpGateway)
.expect("test config should allow feature update");
config.chatgpt_base_url = apps_base_url.to_string();
config.model = Some("gpt-5-codex".to_string());
@@ -404,6 +402,19 @@ async fn tool_search_returns_deferred_tools_without_follow_up_tool_injection() -
})),
}
);
assert_eq!(
end.result
.as_ref()
.expect("tool call should succeed")
.structured_content,
Some(json!({
"_codex_apps": {
"resource_uri": CALENDAR_CREATE_EVENT_RESOURCE_URI,
"contains_mcp_source": true,
"connector_id": "calendar",
},
}))
);
wait_for_event(&test.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))

View File

@@ -340,17 +340,43 @@ impl SessionTelemetry {
Ok(response) => (Some(response.status().as_u16()), None),
Err(error) => (error.status().map(|s| s.as_u16()), Some(error.to_string())),
};
self.record_api_request(attempt, status, error.as_deref(), duration);
self.record_api_request(
attempt,
status,
error.as_deref(),
duration,
false,
None,
false,
None,
None,
"unknown",
None,
None,
None,
None,
);
response
}
#[allow(clippy::too_many_arguments)]
pub fn record_api_request(
&self,
attempt: u64,
status: Option<u16>,
error: Option<&str>,
duration: Duration,
auth_header_attached: bool,
auth_header_name: Option<&str>,
retry_after_unauthorized: bool,
recovery_mode: Option<&str>,
recovery_phase: Option<&str>,
endpoint: &str,
request_id: Option<&str>,
cf_ray: Option<&str>,
auth_error: Option<&str>,
auth_error_code: Option<&str>,
) {
let success = status.is_some_and(|code| (200..=299).contains(&code)) && error.is_none();
let success_str = if success { "true" } else { "false" };
@@ -375,13 +401,76 @@ impl SessionTelemetry {
http.response.status_code = status,
error.message = error,
attempt = attempt,
auth.header_attached = auth_header_attached,
auth.header_name = auth_header_name,
auth.retry_after_unauthorized = retry_after_unauthorized,
auth.recovery_mode = recovery_mode,
auth.recovery_phase = recovery_phase,
endpoint = endpoint,
auth.request_id = request_id,
auth.cf_ray = cf_ray,
auth.error = auth_error,
auth.error_code = auth_error_code,
},
log: {},
trace: {},
);
}
pub fn record_websocket_request(&self, duration: Duration, error: Option<&str>) {
#[allow(clippy::too_many_arguments)]
pub fn record_websocket_connect(
&self,
duration: Duration,
status: Option<u16>,
error: Option<&str>,
auth_header_attached: bool,
auth_header_name: Option<&str>,
retry_after_unauthorized: bool,
recovery_mode: Option<&str>,
recovery_phase: Option<&str>,
endpoint: &str,
connection_reused: bool,
request_id: Option<&str>,
cf_ray: Option<&str>,
auth_error: Option<&str>,
auth_error_code: Option<&str>,
) {
let success = error.is_none()
&& status
.map(|code| (200..=299).contains(&code))
.unwrap_or(true);
let success_str = if success { "true" } else { "false" };
log_and_trace_event!(
self,
common: {
event.name = "codex.websocket_connect",
duration_ms = %duration.as_millis(),
http.response.status_code = status,
success = success_str,
error.message = error,
auth.header_attached = auth_header_attached,
auth.header_name = auth_header_name,
auth.retry_after_unauthorized = retry_after_unauthorized,
auth.recovery_mode = recovery_mode,
auth.recovery_phase = recovery_phase,
endpoint = endpoint,
auth.connection_reused = connection_reused,
auth.request_id = request_id,
auth.cf_ray = cf_ray,
auth.error = auth_error,
auth.error_code = auth_error_code,
},
log: {},
trace: {},
);
}
pub fn record_websocket_request(
&self,
duration: Duration,
error: Option<&str>,
connection_reused: bool,
) {
let success_str = if error.is_none() { "true" } else { "false" };
self.counter(
WEBSOCKET_REQUEST_COUNT_METRIC,
@@ -400,6 +489,39 @@ impl SessionTelemetry {
duration_ms = %duration.as_millis(),
success = success_str,
error.message = error,
auth.connection_reused = connection_reused,
},
log: {},
trace: {},
);
}
#[allow(clippy::too_many_arguments)]
pub fn record_auth_recovery(
&self,
mode: &str,
step: &str,
outcome: &str,
request_id: Option<&str>,
cf_ray: Option<&str>,
auth_error: Option<&str>,
auth_error_code: Option<&str>,
recovery_reason: Option<&str>,
auth_state_changed: Option<bool>,
) {
log_and_trace_event!(
self,
common: {
event.name = "codex.auth_recovery",
auth.mode = mode,
auth.step = step,
auth.outcome = outcome,
auth.request_id = request_id,
auth.cf_ray = cf_ray,
auth.error = auth_error,
auth.error_code = auth_error_code,
auth.recovery_reason = recovery_reason,
auth.state_changed = auth_state_changed,
},
log: {},
trace: {},

View File

@@ -297,3 +297,462 @@ fn otel_export_routing_policy_routes_tool_result_log_and_trace_events() {
assert!(!tool_trace_attrs.contains_key("mcp_server"));
assert!(!tool_trace_attrs.contains_key("mcp_server_origin"));
}
#[test]
fn otel_export_routing_policy_routes_auth_recovery_log_and_trace_events() {
let log_exporter = InMemoryLogExporter::default();
let logger_provider = SdkLoggerProvider::builder()
.with_simple_exporter(log_exporter.clone())
.build();
let span_exporter = InMemorySpanExporter::default();
let tracer_provider = SdkTracerProvider::builder()
.with_simple_exporter(span_exporter.clone())
.build();
let tracer = tracer_provider.tracer("sink-split-test");
let subscriber = tracing_subscriber::registry()
.with(
opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(
&logger_provider,
)
.with_filter(filter_fn(OtelProvider::log_export_filter)),
)
.with(
tracing_opentelemetry::layer()
.with_tracer(tracer)
.with_filter(filter_fn(OtelProvider::trace_export_filter)),
);
tracing::subscriber::with_default(subscriber, || {
tracing::callsite::rebuild_interest_cache();
let manager = SessionTelemetry::new(
ThreadId::new(),
"gpt-5.1",
"gpt-5.1",
Some("account-id".to_string()),
Some("engineer@example.com".to_string()),
Some(TelemetryAuthMode::Chatgpt),
"codex_exec".to_string(),
true,
"tty".to_string(),
SessionSource::Cli,
);
let root_span = tracing::info_span!("root");
let _root_guard = root_span.enter();
manager.record_auth_recovery(
"managed",
"reload",
"recovery_succeeded",
Some("req-401"),
Some("ray-401"),
Some("missing_authorization_header"),
Some("token_expired"),
None,
Some(true),
);
});
logger_provider.force_flush().expect("flush logs");
tracer_provider.force_flush().expect("flush traces");
let logs = log_exporter.get_emitted_logs().expect("log export");
let recovery_log = find_log_by_event_name(&logs, "codex.auth_recovery");
let recovery_log_attrs = log_attributes(&recovery_log.record);
assert_eq!(
recovery_log_attrs.get("auth.mode").map(String::as_str),
Some("managed")
);
assert_eq!(
recovery_log_attrs.get("auth.step").map(String::as_str),
Some("reload")
);
assert_eq!(
recovery_log_attrs.get("auth.outcome").map(String::as_str),
Some("recovery_succeeded")
);
assert_eq!(
recovery_log_attrs
.get("auth.request_id")
.map(String::as_str),
Some("req-401")
);
assert_eq!(
recovery_log_attrs.get("auth.cf_ray").map(String::as_str),
Some("ray-401")
);
assert_eq!(
recovery_log_attrs.get("auth.error").map(String::as_str),
Some("missing_authorization_header")
);
assert_eq!(
recovery_log_attrs
.get("auth.error_code")
.map(String::as_str),
Some("token_expired")
);
assert_eq!(
recovery_log_attrs
.get("auth.state_changed")
.map(String::as_str),
Some("true")
);
let spans = span_exporter.get_finished_spans().expect("span export");
assert_eq!(spans.len(), 1);
let span_events = &spans[0].events.events;
assert_eq!(span_events.len(), 1);
let recovery_trace_event = find_span_event_by_name_attr(span_events, "codex.auth_recovery");
let recovery_trace_attrs = span_event_attributes(recovery_trace_event);
assert_eq!(
recovery_trace_attrs.get("auth.mode").map(String::as_str),
Some("managed")
);
assert_eq!(
recovery_trace_attrs.get("auth.step").map(String::as_str),
Some("reload")
);
assert_eq!(
recovery_trace_attrs.get("auth.outcome").map(String::as_str),
Some("recovery_succeeded")
);
assert_eq!(
recovery_trace_attrs
.get("auth.request_id")
.map(String::as_str),
Some("req-401")
);
assert_eq!(
recovery_trace_attrs.get("auth.cf_ray").map(String::as_str),
Some("ray-401")
);
assert_eq!(
recovery_trace_attrs.get("auth.error").map(String::as_str),
Some("missing_authorization_header")
);
assert_eq!(
recovery_trace_attrs
.get("auth.error_code")
.map(String::as_str),
Some("token_expired")
);
assert_eq!(
recovery_trace_attrs
.get("auth.state_changed")
.map(String::as_str),
Some("true")
);
}
#[test]
fn otel_export_routing_policy_routes_api_request_auth_observability() {
let log_exporter = InMemoryLogExporter::default();
let logger_provider = SdkLoggerProvider::builder()
.with_simple_exporter(log_exporter.clone())
.build();
let span_exporter = InMemorySpanExporter::default();
let tracer_provider = SdkTracerProvider::builder()
.with_simple_exporter(span_exporter.clone())
.build();
let tracer = tracer_provider.tracer("sink-split-test");
let subscriber = tracing_subscriber::registry()
.with(
opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(
&logger_provider,
)
.with_filter(filter_fn(OtelProvider::log_export_filter)),
)
.with(
tracing_opentelemetry::layer()
.with_tracer(tracer)
.with_filter(filter_fn(OtelProvider::trace_export_filter)),
);
tracing::subscriber::with_default(subscriber, || {
tracing::callsite::rebuild_interest_cache();
let manager = SessionTelemetry::new(
ThreadId::new(),
"gpt-5.1",
"gpt-5.1",
Some("account-id".to_string()),
Some("engineer@example.com".to_string()),
Some(TelemetryAuthMode::Chatgpt),
"codex_exec".to_string(),
true,
"tty".to_string(),
SessionSource::Cli,
);
let root_span = tracing::info_span!("root");
let _root_guard = root_span.enter();
manager.record_api_request(
1,
Some(401),
Some("http 401"),
std::time::Duration::from_millis(42),
true,
Some("authorization"),
true,
Some("managed"),
Some("refresh_token"),
"/responses",
Some("req-401"),
Some("ray-401"),
Some("missing_authorization_header"),
Some("token_expired"),
);
});
logger_provider.force_flush().expect("flush logs");
tracer_provider.force_flush().expect("flush traces");
let logs = log_exporter.get_emitted_logs().expect("log export");
let request_log = find_log_by_event_name(&logs, "codex.api_request");
let request_log_attrs = log_attributes(&request_log.record);
assert_eq!(
request_log_attrs
.get("auth.header_attached")
.map(String::as_str),
Some("true")
);
assert_eq!(
request_log_attrs
.get("auth.header_name")
.map(String::as_str),
Some("authorization")
);
assert_eq!(
request_log_attrs
.get("auth.retry_after_unauthorized")
.map(String::as_str),
Some("true")
);
assert_eq!(
request_log_attrs
.get("auth.recovery_mode")
.map(String::as_str),
Some("managed")
);
assert_eq!(
request_log_attrs
.get("auth.recovery_phase")
.map(String::as_str),
Some("refresh_token")
);
assert_eq!(
request_log_attrs.get("endpoint").map(String::as_str),
Some("/responses")
);
assert_eq!(
request_log_attrs.get("auth.error").map(String::as_str),
Some("missing_authorization_header")
);
let spans = span_exporter.get_finished_spans().expect("span export");
let request_trace_event =
find_span_event_by_name_attr(&spans[0].events.events, "codex.api_request");
let request_trace_attrs = span_event_attributes(request_trace_event);
assert_eq!(
request_trace_attrs
.get("auth.header_attached")
.map(String::as_str),
Some("true")
);
assert_eq!(
request_trace_attrs
.get("auth.header_name")
.map(String::as_str),
Some("authorization")
);
assert_eq!(
request_trace_attrs
.get("auth.retry_after_unauthorized")
.map(String::as_str),
Some("true")
);
assert_eq!(
request_trace_attrs.get("endpoint").map(String::as_str),
Some("/responses")
);
}
#[test]
fn otel_export_routing_policy_routes_websocket_connect_auth_observability() {
let log_exporter = InMemoryLogExporter::default();
let logger_provider = SdkLoggerProvider::builder()
.with_simple_exporter(log_exporter.clone())
.build();
let span_exporter = InMemorySpanExporter::default();
let tracer_provider = SdkTracerProvider::builder()
.with_simple_exporter(span_exporter.clone())
.build();
let tracer = tracer_provider.tracer("sink-split-test");
let subscriber = tracing_subscriber::registry()
.with(
opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(
&logger_provider,
)
.with_filter(filter_fn(OtelProvider::log_export_filter)),
)
.with(
tracing_opentelemetry::layer()
.with_tracer(tracer)
.with_filter(filter_fn(OtelProvider::trace_export_filter)),
);
tracing::subscriber::with_default(subscriber, || {
tracing::callsite::rebuild_interest_cache();
let manager = SessionTelemetry::new(
ThreadId::new(),
"gpt-5.1",
"gpt-5.1",
Some("account-id".to_string()),
Some("engineer@example.com".to_string()),
Some(TelemetryAuthMode::Chatgpt),
"codex_exec".to_string(),
true,
"tty".to_string(),
SessionSource::Cli,
);
let root_span = tracing::info_span!("root");
let _root_guard = root_span.enter();
manager.record_websocket_connect(
std::time::Duration::from_millis(17),
Some(401),
Some("http 401"),
true,
Some("authorization"),
true,
Some("managed"),
Some("reload"),
"/responses",
false,
Some("req-ws-401"),
Some("ray-ws-401"),
Some("missing_authorization_header"),
Some("token_expired"),
);
});
logger_provider.force_flush().expect("flush logs");
tracer_provider.force_flush().expect("flush traces");
let logs = log_exporter.get_emitted_logs().expect("log export");
let connect_log = find_log_by_event_name(&logs, "codex.websocket_connect");
let connect_log_attrs = log_attributes(&connect_log.record);
assert_eq!(
connect_log_attrs
.get("auth.header_attached")
.map(String::as_str),
Some("true")
);
assert_eq!(
connect_log_attrs
.get("auth.header_name")
.map(String::as_str),
Some("authorization")
);
assert_eq!(
connect_log_attrs.get("auth.error").map(String::as_str),
Some("missing_authorization_header")
);
assert_eq!(
connect_log_attrs.get("endpoint").map(String::as_str),
Some("/responses")
);
assert_eq!(
connect_log_attrs
.get("auth.connection_reused")
.map(String::as_str),
Some("false")
);
let spans = span_exporter.get_finished_spans().expect("span export");
let connect_trace_event =
find_span_event_by_name_attr(&spans[0].events.events, "codex.websocket_connect");
let connect_trace_attrs = span_event_attributes(connect_trace_event);
assert_eq!(
connect_trace_attrs
.get("auth.recovery_phase")
.map(String::as_str),
Some("reload")
);
}
#[test]
fn otel_export_routing_policy_routes_websocket_request_transport_observability() {
let log_exporter = InMemoryLogExporter::default();
let logger_provider = SdkLoggerProvider::builder()
.with_simple_exporter(log_exporter.clone())
.build();
let span_exporter = InMemorySpanExporter::default();
let tracer_provider = SdkTracerProvider::builder()
.with_simple_exporter(span_exporter.clone())
.build();
let tracer = tracer_provider.tracer("sink-split-test");
let subscriber = tracing_subscriber::registry()
.with(
opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(
&logger_provider,
)
.with_filter(filter_fn(OtelProvider::log_export_filter)),
)
.with(
tracing_opentelemetry::layer()
.with_tracer(tracer)
.with_filter(filter_fn(OtelProvider::trace_export_filter)),
);
tracing::subscriber::with_default(subscriber, || {
tracing::callsite::rebuild_interest_cache();
let manager = SessionTelemetry::new(
ThreadId::new(),
"gpt-5.1",
"gpt-5.1",
Some("account-id".to_string()),
Some("engineer@example.com".to_string()),
Some(TelemetryAuthMode::Chatgpt),
"codex_exec".to_string(),
true,
"tty".to_string(),
SessionSource::Cli,
);
let root_span = tracing::info_span!("root");
let _root_guard = root_span.enter();
manager.record_websocket_request(
std::time::Duration::from_millis(23),
Some("stream error"),
true,
);
});
logger_provider.force_flush().expect("flush logs");
tracer_provider.force_flush().expect("flush traces");
let logs = log_exporter.get_emitted_logs().expect("log export");
let request_log = find_log_by_event_name(&logs, "codex.websocket_request");
let request_log_attrs = log_attributes(&request_log.record);
assert_eq!(
request_log_attrs
.get("auth.connection_reused")
.map(String::as_str),
Some("true")
);
assert_eq!(
request_log_attrs.get("error.message").map(String::as_str),
Some("stream error")
);
let spans = span_exporter.get_finished_spans().expect("span export");
let request_trace_event =
find_span_event_by_name_attr(&spans[0].events.events, "codex.websocket_request");
let request_trace_attrs = span_event_attributes(request_trace_event);
assert_eq!(
request_trace_attrs
.get("auth.connection_reused")
.map(String::as_str),
Some("true")
);
}

View File

@@ -47,8 +47,23 @@ fn runtime_metrics_summary_collects_tool_api_and_streaming_metrics() -> Result<(
None,
None,
);
manager.record_api_request(1, Some(200), None, Duration::from_millis(300));
manager.record_websocket_request(Duration::from_millis(400), None);
manager.record_api_request(
1,
Some(200),
None,
Duration::from_millis(300),
false,
None,
false,
None,
None,
"/responses",
None,
None,
None,
None,
);
manager.record_websocket_request(Duration::from_millis(400), None, false);
let sse_response: std::result::Result<
Option<std::result::Result<StreamEvent, eventsource_stream::EventStreamError<&str>>>,
tokio::time::error::Elapsed,

View File

@@ -700,6 +700,7 @@ impl RmcpClient {
&self,
name: String,
arguments: Option<serde_json::Value>,
meta: Option<serde_json::Value>,
timeout: Option<Duration>,
) -> Result<CallToolResult> {
self.refresh_oauth_if_needed().await;
@@ -712,8 +713,17 @@ impl RmcpClient {
}
None => None,
};
let meta = match meta {
Some(Value::Object(map)) => Some(rmcp::model::Meta(map)),
Some(other) => {
return Err(anyhow!(
"MCP tool request _meta must be a JSON object, got {other}"
));
}
None => None,
};
let rmcp_params = CallToolRequestParams {
meta: None,
meta,
name: name.into(),
arguments,
task: None,

View File

@@ -105,6 +105,7 @@ async fn call_echo_tool(client: &RmcpClient, message: &str) -> anyhow::Result<Ca
.call_tool(
"echo".to_string(),
Some(json!({ "message": message })),
None,
Some(Duration::from_secs(5)),
)
.await

View File

@@ -11,9 +11,16 @@ cd sdk/python
python -m pip install -e .
```
Published SDK builds pin an exact `codex-cli-bin` runtime dependency. For local
repo development, pass `AppServerConfig(codex_bin=...)` to point at a local
build explicitly.
This checked-in package is the runtime-free core distribution:
`codex-app-server-sdk-core`.
Published releases expose two Python package names:
- `codex-app-server-sdk-core`: the actual Python SDK code, without bundled binaries
- `codex-app-server-sdk`: a bundled metapackage that depends on `codex-app-server-sdk-core` and `codex-cli-bin`
For local repo development, pass `AppServerConfig(codex_bin=...)` to point at
a local build explicitly.
## Quickstart
@@ -48,9 +55,9 @@ python examples/01_quickstart_constructor/async.py
The repo no longer checks `codex` binaries into `sdk/python`.
Published SDK builds are pinned to an exact `codex-cli-bin` package version,
and that runtime package carries the platform-specific binary for the target
wheel.
Published `codex-app-server-sdk` builds depend on an exact
`codex-app-server-sdk-core` version plus an exact `codex-cli-bin` version, and
that runtime package carries the platform-specific binary for the target wheel.
For local repo development, the checked-in `sdk/python-runtime` package is only
a template for staged release artifacts. Editable installs should use an
@@ -61,9 +68,14 @@ explicit `codex_bin` override instead.
```bash
cd sdk/python
python scripts/update_sdk_artifacts.py generate-types
python scripts/update_sdk_artifacts.py \
stage-sdk-core \
/tmp/codex-python-release/codex-app-server-sdk-core \
--sdk-version 1.2.3
python scripts/update_sdk_artifacts.py \
stage-sdk \
/tmp/codex-python-release/codex-app-server-sdk \
--sdk-version 1.2.3 \
--runtime-version 1.2.3
python scripts/update_sdk_artifacts.py \
stage-runtime \
@@ -75,17 +87,22 @@ python scripts/update_sdk_artifacts.py \
This supports the CI release flow:
- run `generate-types` before packaging
- stage `codex-app-server-sdk` once with an exact `codex-cli-bin==...` dependency
- stage `codex-app-server-sdk-core` with the release tag version as `--sdk-version`
- stage `codex-app-server-sdk` as a bundled metapackage pinned to exact `codex-app-server-sdk-core==...` and `codex-cli-bin==...`
- stage `codex-cli-bin` on each supported platform runner with the same pinned runtime version
- build and publish `codex-cli-bin` as platform wheels only; do not publish an sdist
- publish `codex-app-server-sdk-core` to PyPI after the runtime wheels land
- publish `codex-app-server-sdk` to PyPI last, using the same release version
## Compatibility and versioning
- Package: `codex-app-server-sdk`
- Core package: `codex-app-server-sdk-core`
- Bundled package: `codex-app-server-sdk`
- Runtime package: `codex-cli-bin`
- Current SDK version in this repo: `0.2.0`
- Python: `>=3.10`
- Target protocol: Codex `app-server` JSON-RPC v2
- Release policy: published Python package versions should match the `rust-v...` release tag
- Recommendation: keep SDK and `codex` CLI reasonably up to date together
## Notes

View File

@@ -38,16 +38,23 @@ Common causes:
- local auth/session is missing
- incompatible/old app-server
Maintainers stage releases by building the SDK once and the runtime once per
platform with the same pinned runtime version. Publish `codex-cli-bin` as
platform wheels only; do not publish an sdist:
Maintainers stage releases by building the core SDK once, the bundled SDK
metapackage once, and the runtime once per platform with the same pinned
runtime version. Publish `codex-cli-bin` as platform wheels only; do not
publish an sdist. Published Python package versions should match the
`rust-v...` release tag:
```bash
cd sdk/python
python scripts/update_sdk_artifacts.py generate-types
python scripts/update_sdk_artifacts.py \
stage-sdk-core \
/tmp/codex-python-release/codex-app-server-sdk-core \
--sdk-version 1.2.3
python scripts/update_sdk_artifacts.py \
stage-sdk \
/tmp/codex-python-release/codex-app-server-sdk \
--sdk-version 1.2.3 \
--runtime-version 1.2.3
python scripts/update_sdk_artifacts.py \
stage-runtime \

View File

@@ -3,9 +3,9 @@ requires = ["hatchling>=1.24.0"]
build-backend = "hatchling.build"
[project]
name = "codex-app-server-sdk"
name = "codex-app-server-sdk-core"
version = "0.2.0"
description = "Python SDK for Codex app-server v2"
description = "Core Python SDK for Codex app-server v2"
readme = "README.md"
requires-python = ">=3.10"
license = { text = "Apache-2.0" }

View File

@@ -15,8 +15,13 @@ import types
import typing
from dataclasses import dataclass
from pathlib import Path
from textwrap import dedent
from typing import Any, Callable, Sequence, get_args, get_origin
CORE_SDK_PKG_NAME = "codex-app-server-sdk-core"
BUNDLED_SDK_PKG_NAME = "codex-app-server-sdk"
RUNTIME_PKG_NAME = "codex-cli-bin"
def repo_root() -> Path:
return Path(__file__).resolve().parents[3]
@@ -110,23 +115,7 @@ def _rewrite_project_version(pyproject_text: str, version: str) -> str:
return updated
def _rewrite_sdk_runtime_dependency(pyproject_text: str, runtime_version: str) -> str:
match = re.search(r"^dependencies = \[(.*?)\]$", pyproject_text, flags=re.MULTILINE)
if match is None:
raise RuntimeError(
"Could not find dependencies array in sdk/python/pyproject.toml"
)
raw_items = [item.strip() for item in match.group(1).split(",") if item.strip()]
raw_items = [item for item in raw_items if "codex-cli-bin" not in item]
raw_items.append(f'"codex-cli-bin=={runtime_version}"')
replacement = "dependencies = [\n " + ",\n ".join(raw_items) + ",\n]"
return pyproject_text[: match.start()] + replacement + pyproject_text[match.end() :]
def stage_python_sdk_package(
staging_dir: Path, sdk_version: str, runtime_version: str
) -> Path:
def stage_python_core_sdk_package(staging_dir: Path, sdk_version: str) -> Path:
_copy_package_tree(sdk_root(), staging_dir)
sdk_bin_dir = staging_dir / "src" / "codex_app_server" / "bin"
if sdk_bin_dir.exists():
@@ -135,11 +124,73 @@ def stage_python_sdk_package(
pyproject_path = staging_dir / "pyproject.toml"
pyproject_text = pyproject_path.read_text()
pyproject_text = _rewrite_project_version(pyproject_text, sdk_version)
pyproject_text = _rewrite_sdk_runtime_dependency(pyproject_text, runtime_version)
pyproject_path.write_text(pyproject_text)
return staging_dir
def stage_python_sdk_package(
staging_dir: Path, sdk_version: str, runtime_version: str
) -> Path:
if staging_dir.exists():
if staging_dir.is_dir():
shutil.rmtree(staging_dir)
else:
staging_dir.unlink()
package_dir = staging_dir / "src" / "codex_app_server_sdk_meta"
package_dir.mkdir(parents=True, exist_ok=True)
(package_dir / "__init__.py").write_text(
'"""Bundled Codex app-server SDK package metadata."""\n'
)
pyproject = dedent(
f"""
[build-system]
requires = ["hatchling>=1.24.0"]
build-backend = "hatchling.build"
[project]
name = "{BUNDLED_SDK_PKG_NAME}"
version = "{sdk_version}"
description = "Bundled Python SDK for Codex app-server v2"
readme = "README.md"
requires-python = ">=3.10"
license = {{ text = "Apache-2.0" }}
authors = [{{ name = "OpenClaw Assistant" }}]
dependencies = [
"{CORE_SDK_PKG_NAME}=={sdk_version}",
"{RUNTIME_PKG_NAME}=={runtime_version}",
]
[project.urls]
Homepage = "https://github.com/openai/codex"
Repository = "https://github.com/openai/codex"
Issues = "https://github.com/openai/codex/issues"
[tool.hatch.build.targets.wheel]
packages = ["src/codex_app_server_sdk_meta"]
[tool.hatch.build.targets.sdist]
include = ["src/codex_app_server_sdk_meta/**", "README.md", "pyproject.toml"]
"""
).lstrip()
(staging_dir / "pyproject.toml").write_text(pyproject)
(staging_dir / "README.md").write_text(
"\n".join(
[
"# Codex App Server Python SDK",
"",
"Bundled metapackage for the Codex app-server Python SDK.",
f"It depends on `{CORE_SDK_PKG_NAME}` and `{RUNTIME_PKG_NAME}`",
"at the same version so a regular install includes both the SDK",
"and the packaged Codex runtime binary.",
"",
]
)
)
return staging_dir
def stage_python_runtime_package(
staging_dir: Path, runtime_version: str, binary_path: Path
) -> Path:
@@ -558,6 +609,7 @@ class PublicFieldSpec:
@dataclass(frozen=True)
class CliOps:
generate_types: Callable[[], None]
stage_python_core_sdk_package: Callable[[Path, str], Path]
stage_python_sdk_package: Callable[[Path, str, str], Path]
stage_python_runtime_package: Callable[[Path, str, Path], Path]
current_sdk_version: Callable[[], str]
@@ -916,23 +968,37 @@ def build_parser() -> argparse.ArgumentParser:
"generate-types", help="Regenerate Python protocol-derived types"
)
stage_sdk_core_parser = subparsers.add_parser(
"stage-sdk-core",
help="Stage a releasable core SDK package without a bundled runtime",
)
stage_sdk_core_parser.add_argument(
"staging_dir",
type=Path,
help="Output directory for the staged core SDK package",
)
stage_sdk_core_parser.add_argument(
"--sdk-version",
help="Version to write into the staged core SDK package (defaults to sdk/python current version)",
)
stage_sdk_parser = subparsers.add_parser(
"stage-sdk",
help="Stage a releasable SDK package pinned to a runtime version",
help="Stage a releasable bundled SDK metapackage pinned to a runtime version",
)
stage_sdk_parser.add_argument(
"staging_dir",
type=Path,
help="Output directory for the staged SDK package",
help="Output directory for the staged bundled SDK package",
)
stage_sdk_parser.add_argument(
"--runtime-version",
required=True,
help="Pinned codex-cli-bin version for the staged SDK package",
help="Pinned codex-cli-bin version for the staged bundled SDK package",
)
stage_sdk_parser.add_argument(
"--sdk-version",
help="Version to write into the staged SDK package (defaults to sdk/python current version)",
help="Version to write into the staged bundled SDK package (defaults to sdk/python current version)",
)
stage_runtime_parser = subparsers.add_parser(
@@ -964,6 +1030,7 @@ def parse_args(argv: Sequence[str] | None = None) -> argparse.Namespace:
def default_cli_ops() -> CliOps:
return CliOps(
generate_types=generate_types,
stage_python_core_sdk_package=stage_python_core_sdk_package,
stage_python_sdk_package=stage_python_sdk_package,
stage_python_runtime_package=stage_python_runtime_package,
current_sdk_version=current_sdk_version,
@@ -973,6 +1040,12 @@ def default_cli_ops() -> CliOps:
def run_command(args: argparse.Namespace, ops: CliOps) -> None:
if args.command == "generate-types":
ops.generate_types()
elif args.command == "stage-sdk-core":
ops.generate_types()
ops.stage_python_core_sdk_package(
args.staging_dir,
args.sdk_version or ops.current_sdk_version(),
)
elif args.command == "stage-sdk":
ops.generate_types()
ops.stage_python_sdk_package(

View File

@@ -47,6 +47,7 @@ from .retry import retry_on_overload
ModelT = TypeVar("ModelT", bound=BaseModel)
ApprovalHandler = Callable[[str, JsonObject | None], JsonObject]
BUNDLED_SDK_PKG_NAME = "codex-app-server-sdk"
RUNTIME_PKG_NAME = "codex-cli-bin"
@@ -82,9 +83,9 @@ def _installed_codex_path() -> Path:
from codex_cli_bin import bundled_codex_path
except ImportError as exc:
raise FileNotFoundError(
"Unable to locate the pinned Codex runtime. Install the published SDK build "
f"with its {RUNTIME_PKG_NAME} dependency, or set AppServerConfig.codex_bin "
"explicitly."
"Unable to locate the Codex runtime. Install the published "
f"{BUNDLED_SDK_PKG_NAME} package, install {RUNTIME_PKG_NAME} "
"alongside the core SDK, or set AppServerConfig.codex_bin explicitly."
) from exc
return bundled_codex_path()

View File

@@ -238,17 +238,31 @@ def test_stage_runtime_release_replaces_existing_staging_dir(tmp_path: Path) ->
assert script.staged_runtime_bin_path(staged).read_text() == "fake codex\n"
def test_stage_sdk_release_injects_exact_runtime_pin(tmp_path: Path) -> None:
def test_stage_core_sdk_release_sets_version_without_runtime_pin(tmp_path: Path) -> None:
script = _load_update_script_module()
staged = script.stage_python_core_sdk_package(tmp_path / "sdk-core-stage", "0.2.1")
pyproject = (staged / "pyproject.toml").read_text()
assert 'version = "0.2.1"' in pyproject
assert 'name = "codex-app-server-sdk-core"' in pyproject
assert "codex-cli-bin" not in pyproject
assert not any((staged / "src" / "codex_app_server").glob("bin/**"))
def test_stage_bundled_sdk_release_injects_exact_core_and_runtime_pins(
tmp_path: Path,
) -> None:
script = _load_update_script_module()
staged = script.stage_python_sdk_package(tmp_path / "sdk-stage", "0.2.1", "1.2.3")
pyproject = (staged / "pyproject.toml").read_text()
assert 'name = "codex-app-server-sdk"' in pyproject
assert 'version = "0.2.1"' in pyproject
assert '"codex-app-server-sdk-core==0.2.1"' in pyproject
assert '"codex-cli-bin==1.2.3"' in pyproject
assert not any((staged / "src" / "codex_app_server").glob("bin/**"))
def test_stage_sdk_release_replaces_existing_staging_dir(tmp_path: Path) -> None:
def test_stage_bundled_sdk_release_replaces_existing_staging_dir(tmp_path: Path) -> None:
script = _load_update_script_module()
staging_dir = tmp_path / "sdk-stage"
old_file = staging_dir / "stale.txt"
@@ -261,6 +275,49 @@ def test_stage_sdk_release_replaces_existing_staging_dir(tmp_path: Path) -> None
assert not old_file.exists()
def test_stage_sdk_core_runs_type_generation_before_staging(tmp_path: Path) -> None:
script = _load_update_script_module()
calls: list[str] = []
args = script.parse_args(
[
"stage-sdk-core",
str(tmp_path / "sdk-core-stage"),
]
)
def fake_generate_types() -> None:
calls.append("generate_types")
def fake_stage_core_sdk_package(_staging_dir: Path, _sdk_version: str) -> Path:
calls.append("stage_sdk_core")
return tmp_path / "sdk-core-stage"
def fake_stage_sdk_package(
_staging_dir: Path, _sdk_version: str, _runtime_version: str
) -> Path:
raise AssertionError("bundled sdk staging should not run for stage-sdk-core")
def fake_stage_runtime_package(
_staging_dir: Path, _runtime_version: str, _runtime_binary: Path
) -> Path:
raise AssertionError("runtime staging should not run for stage-sdk-core")
def fake_current_sdk_version() -> str:
return "0.2.0"
ops = script.CliOps(
generate_types=fake_generate_types,
stage_python_core_sdk_package=fake_stage_core_sdk_package,
stage_python_sdk_package=fake_stage_sdk_package,
stage_python_runtime_package=fake_stage_runtime_package,
current_sdk_version=fake_current_sdk_version,
)
script.run_command(args, ops)
assert calls == ["generate_types", "stage_sdk_core"]
def test_stage_sdk_runs_type_generation_before_staging(tmp_path: Path) -> None:
script = _load_update_script_module()
calls: list[str] = []
@@ -276,6 +333,9 @@ def test_stage_sdk_runs_type_generation_before_staging(tmp_path: Path) -> None:
def fake_generate_types() -> None:
calls.append("generate_types")
def fake_stage_core_sdk_package(_staging_dir: Path, _sdk_version: str) -> Path:
raise AssertionError("core sdk staging should not run for stage-sdk")
def fake_stage_sdk_package(
_staging_dir: Path, _sdk_version: str, _runtime_version: str
) -> Path:
@@ -292,6 +352,7 @@ def test_stage_sdk_runs_type_generation_before_staging(tmp_path: Path) -> None:
ops = script.CliOps(
generate_types=fake_generate_types,
stage_python_core_sdk_package=fake_stage_core_sdk_package,
stage_python_sdk_package=fake_stage_sdk_package,
stage_python_runtime_package=fake_stage_runtime_package,
current_sdk_version=fake_current_sdk_version,
@@ -320,6 +381,9 @@ def test_stage_runtime_stages_binary_without_type_generation(tmp_path: Path) ->
def fake_generate_types() -> None:
calls.append("generate_types")
def fake_stage_core_sdk_package(_staging_dir: Path, _sdk_version: str) -> Path:
raise AssertionError("core sdk staging should not run for stage-runtime")
def fake_stage_sdk_package(
_staging_dir: Path, _sdk_version: str, _runtime_version: str
) -> Path:
@@ -336,6 +400,7 @@ def test_stage_runtime_stages_binary_without_type_generation(tmp_path: Path) ->
ops = script.CliOps(
generate_types=fake_generate_types,
stage_python_core_sdk_package=fake_stage_core_sdk_package,
stage_python_sdk_package=fake_stage_sdk_package,
stage_python_runtime_package=fake_stage_runtime_package,
current_sdk_version=fake_current_sdk_version,