Compare commits

...

11 Commits

Author SHA1 Message Date
Ahmed Ibrahim
3036dee57a Merge branch 'main' into aibrahim/realtime-siwc-client-secrets 2026-03-20 15:53:47 -07:00
Ahmed Ibrahim
5f9ad86d93 fix 2026-03-19 19:53:06 -07:00
Ahmed Ibrahim
819b7a719e fix 2026-03-19 19:32:44 -07:00
Ahmed Ibrahim
56d9b9c268 Gate realtime client secrets to OpenAI providers
Co-authored-by: Codex <noreply@openai.com>
2026-03-19 17:37:52 -07:00
Ahmed Ibrahim
71be8598cf Fix wiremock shutdown in realtime tests
Co-authored-by: Codex <noreply@openai.com>
2026-03-19 17:31:37 -07:00
Ahmed Ibrahim
6151a0b447 codex: fix CI failure on PR #15237
Co-authored-by: Codex <noreply@openai.com>
2026-03-19 16:25:08 -07:00
Ahmed Ibrahim
035dcf8bc3 codex: fix CI failure on PR #15237
Co-authored-by: Codex <noreply@openai.com>
2026-03-19 16:21:52 -07:00
Ahmed Ibrahim
fd329382ae codex: fix CI failure on PR #15237
Co-authored-by: Codex <noreply@openai.com>
2026-03-19 16:15:10 -07:00
Ahmed Ibrahim
c145f6726a codex: fix CI failure on PR #15237
Co-authored-by: Codex <noreply@openai.com>
2026-03-19 16:11:51 -07:00
Ahmed Ibrahim
48188734e2 Use endpoint rails for realtime client secrets
Co-authored-by: Codex <noreply@openai.com>
2026-03-19 16:00:18 -07:00
Ahmed Ibrahim
475883b07b Use SIWC client secrets for realtime auth
Co-authored-by: Codex <noreply@openai.com>
2026-03-19 15:52:38 -07:00
11 changed files with 562 additions and 77 deletions

View File

@@ -1,7 +1,9 @@
use anyhow::Context;
use anyhow::Result;
use app_test_support::ChatGptIdTokenClaims;
use app_test_support::McpProcess;
use app_test_support::create_mock_responses_server_sequence_unchecked;
use app_test_support::encode_id_token;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCResponse;
@@ -27,6 +29,8 @@ use codex_app_server_protocol::ThreadStartResponse;
use codex_features::FEATURES;
use codex_features::Feature;
use codex_protocol::protocol::RealtimeConversationVersion;
use core_test_support::responses::mount_realtime_client_secret;
use core_test_support::responses::start_chatgpt_mock_server;
use core_test_support::responses::start_websocket_server;
use core_test_support::skip_if_no_network;
use pretty_assertions::assert_eq;
@@ -36,6 +40,7 @@ use std::path::Path;
use std::time::Duration;
use tempfile::TempDir;
use tokio::time::timeout;
use wiremock::ResponseTemplate;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
const STARTUP_CONTEXT_HEADER: &str = "Startup context from Codex.";
@@ -99,6 +104,7 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> {
codex_home.path(),
&responses_server.uri(),
realtime_server.uri(),
None,
true,
)?;
@@ -306,6 +312,7 @@ async fn realtime_conversation_stop_emits_closed_notification() -> Result<()> {
codex_home.path(),
&responses_server.uri(),
realtime_server.uri(),
None,
true,
)?;
@@ -366,6 +373,93 @@ async fn realtime_conversation_stop_emits_closed_notification() -> Result<()> {
Ok(())
}
/// Checks the realtime start flow when the app server is logged in with
/// externally supplied ChatGPT auth tokens: exactly one request is made to the
/// ChatGPT mock server's `/codex/realtime/client_secrets` route, and the value
/// it returns is used as the bearer token on the realtime websocket handshake.
#[tokio::test]
async fn realtime_conversation_uses_client_secret_with_external_chatgpt_auth() -> Result<()> {
skip_if_no_network!(Ok(()));
// No model responses are queued; this test only exercises the realtime start path.
let responses_server = create_mock_responses_server_sequence_unchecked(Vec::new()).await;
// ChatGPT backend double that returns a fixed client secret.
let chatgpt_server = start_chatgpt_mock_server().await;
mount_realtime_client_secret(
&chatgpt_server,
ResponseTemplate::new(200).set_body_json(json!({
"value": "ek-app-server"
})),
)
.await;
// Realtime websocket double that replies with a single `session.updated` event.
let realtime_server = start_websocket_server(vec![vec![vec![json!({
"type": "session.updated",
"session": { "id": "sess_external", "instructions": "backend prompt" }
})]]])
.await;
let codex_home = TempDir::new()?;
// Point `chatgpt_base_url` at the mock so the client-secret request lands on it.
create_config_toml(
codex_home.path(),
&responses_server.uri(),
realtime_server.uri(),
Some(chatgpt_server.uri().as_str()),
true,
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
mcp.initialize().await?;
login_with_chatgpt_auth_tokens(&mut mcp).await?;
// Start a thread; its id is needed for the realtime start request.
let thread_start_request_id = mcp
.send_thread_start_request(ThreadStartParams::default())
.await?;
let thread_start_response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_start_request_id)),
)
.await??;
let thread_start: ThreadStartResponse = to_response(thread_start_response)?;
// Start the realtime conversation on that thread.
let start_request_id = mcp
.send_thread_realtime_start_request(ThreadRealtimeStartParams {
thread_id: thread_start.thread.id.clone(),
prompt: "backend prompt".to_string(),
session_id: None,
})
.await?;
let start_response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_request_id)),
)
.await??;
let _: ThreadRealtimeStartResponse = to_response(start_response)?;
let started =
read_notification::<ThreadRealtimeStartedNotification>(&mut mcp, "thread/realtime/started")
.await?;
assert_eq!(started.thread_id, thread_start.thread.id);
assert_eq!(started.version, RealtimeConversationVersion::V2);
// The ChatGPT mock must have seen exactly one request: the client-secret call,
// carrying the account id used at login.
let requests = chatgpt_server.received_requests().await.unwrap_or_default();
assert_eq!(requests.len(), 1);
assert_eq!(requests[0].url.path(), "/codex/realtime/client_secrets");
assert_eq!(
requests[0]
.headers
.get("chatgpt-account-id")
.and_then(|value| value.to_str().ok()),
Some("org-siwc")
);
let request_body: serde_json::Value = serde_json::from_slice(&requests[0].body)?;
assert_eq!(request_body["session"]["type"], json!("realtime"));
// The websocket handshake must authenticate with the returned secret.
assert_eq!(
realtime_server.handshakes()[0]
.header("authorization")
.as_deref(),
Some("Bearer ek-app-server")
);
realtime_server.shutdown().await;
drop(chatgpt_server);
Ok(())
}
#[tokio::test]
async fn realtime_conversation_requires_feature_flag() -> Result<()> {
skip_if_no_network!(Ok(()));
@@ -378,6 +472,7 @@ async fn realtime_conversation_requires_feature_flag() -> Result<()> {
codex_home.path(),
&responses_server.uri(),
realtime_server.uri(),
None,
false,
)?;
@@ -443,10 +538,40 @@ async fn login_with_api_key(mcp: &mut McpProcess, api_key: &str) -> Result<()> {
Ok(())
}
/// Logs the test app-server process in with externally supplied ChatGPT auth tokens.
///
/// Encodes an ID token for `siwc@example.com` with plan type `business` and
/// account id `org-siwc`, sends the chatgpt-auth-tokens login request, asserts
/// the login response, and then waits for the `account/updated` notification.
///
/// # Errors
///
/// Returns an error if token encoding, sending, reading, or decoding fails, or
/// if either read exceeds `DEFAULT_TIMEOUT`.
async fn login_with_chatgpt_auth_tokens(mcp: &mut McpProcess) -> Result<()> {
    let id_token = encode_id_token(
        &ChatGptIdTokenClaims::new()
            .email("siwc@example.com")
            .plan_type("business")
            .chatgpt_account_id("org-siwc"),
    )?;

    let login_request_id = mcp
        .send_chatgpt_auth_tokens_login_request(
            id_token,
            "org-siwc".to_string(),
            Some("business".to_string()),
        )
        .await?;

    let pending_response =
        mcp.read_stream_until_response_message(RequestId::Integer(login_request_id));
    let login_response: JSONRPCResponse = timeout(DEFAULT_TIMEOUT, pending_response).await??;
    let login_result: LoginAccountResponse = to_response(login_response)?;
    assert_eq!(login_result, LoginAccountResponse::ChatgptAuthTokens {});

    // Wait for the `account/updated` notification before returning.
    let pending_notification = mcp.read_stream_until_notification_message("account/updated");
    let _account_updated = timeout(DEFAULT_TIMEOUT, pending_notification).await??;
    Ok(())
}
fn create_config_toml(
codex_home: &Path,
responses_server_uri: &str,
realtime_server_uri: &str,
chatgpt_base_url: Option<&str>,
realtime_enabled: bool,
) -> std::io::Result<()> {
let realtime_feature_key = FEATURES
@@ -454,6 +579,9 @@ fn create_config_toml(
.find(|spec| spec.id == Feature::RealtimeConversation)
.map(|spec| spec.key)
.unwrap_or("realtime_conversation");
let chatgpt_base_url = chatgpt_base_url
.map(|url| format!("chatgpt_base_url = \"{url}\"\n"))
.unwrap_or_default();
std::fs::write(
codex_home.join("config.toml"),
@@ -462,7 +590,7 @@ fn create_config_toml(
model = "mock-model"
approval_policy = "never"
sandbox_mode = "read-only"
model_provider = "mock_provider"
{chatgpt_base_url}model_provider = "mock_provider"
experimental_realtime_ws_base_url = "{realtime_server_uri}"
[realtime]

View File

@@ -1,6 +1,7 @@
pub mod compact;
pub mod memories;
pub mod models;
pub mod realtime_client_secrets;
pub mod realtime_websocket;
pub mod responses;
pub mod responses_websocket;

View File

@@ -0,0 +1,201 @@
use crate::auth::AuthProvider;
use crate::endpoint::realtime_websocket::RealtimeSessionConfig;
use crate::endpoint::realtime_websocket::session_update_session_json;
use crate::endpoint::session::EndpointSession;
use crate::error::ApiError;
use crate::provider::Provider;
use codex_client::HttpTransport;
use codex_client::RequestTelemetry;
use http::HeaderMap;
use http::Method;
use serde::Deserialize;
use serde_json::Value;
use serde_json::json;
use std::sync::Arc;
/// Client for the `codex/realtime/client_secrets` endpoint.
///
/// Wraps an [`EndpointSession`] built from a transport, a provider, and an
/// auth provider.
pub struct RealtimeClientSecretsClient<T: HttpTransport, A: AuthProvider> {
// Session through which requests are executed.
session: EndpointSession<T, A>,
}
impl<T: HttpTransport, A: AuthProvider> RealtimeClientSecretsClient<T, A> {
pub fn new(transport: T, provider: Provider, auth: A) -> Self {
Self {
session: EndpointSession::new(transport, provider, auth),
}
}
pub fn with_telemetry(self, request: Option<Arc<dyn RequestTelemetry>>) -> Self {
Self {
session: self.session.with_request_telemetry(request),
}
}
fn path() -> &'static str {
"codex/realtime/client_secrets"
}
pub async fn create(
&self,
config: &RealtimeSessionConfig,
extra_headers: HeaderMap,
) -> Result<String, ApiError> {
let body = realtime_client_secret_request_body(config)?;
let resp = self
.session
.execute(Method::POST, Self::path(), extra_headers, Some(body))
.await?;
let parsed: RealtimeClientSecretResponse =
serde_json::from_slice(&resp.body).map_err(|err| {
ApiError::Stream(format!(
"failed to decode realtime client secret response: {err}"
))
})?;
if parsed.value.trim().is_empty() {
return Err(ApiError::Stream(
"realtime client secret response was missing a value".to_string(),
));
}
Ok(parsed.value)
}
}
/// Builds the JSON body for a client-secret request.
///
/// The body is `{"session": <session>}`, where `<session>` is the JSON produced
/// by `session_update_session_json` for the config's event parser, instructions,
/// and session mode. When the config has a model and the session JSON is an
/// object, the model is added under the `"model"` key.
///
/// # Errors
///
/// Propagates the error from `session_update_session_json`.
fn realtime_client_secret_request_body(config: &RealtimeSessionConfig) -> Result<Value, ApiError> {
    let mut session_json = session_update_session_json(
        config.event_parser,
        config.instructions.clone(),
        config.session_mode,
    )?;

    if let (Some(model), Some(fields)) = (config.model.as_ref(), session_json.as_object_mut()) {
        fields.insert("model".to_string(), Value::String(model.clone()));
    }

    Ok(json!({ "session": session_json }))
}
/// Response body of the client-secrets endpoint as decoded by this module;
/// only the `value` field is read.
#[derive(Debug, Deserialize)]
struct RealtimeClientSecretResponse {
// The client secret string returned to callers of `create`.
value: String,
}
// Unit tests for `RealtimeClientSecretsClient` using an in-memory transport double.
#[cfg(test)]
mod tests {
use super::*;
use crate::provider::RetryConfig;
use async_trait::async_trait;
use codex_client::Request;
use codex_client::Response;
use codex_client::StreamResponse;
use codex_client::TransportError;
use http::HeaderMap;
use http::Method;
use http::StatusCode;
use pretty_assertions::assert_eq;
use serde_json::json;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
/// Transport double that stores the most recent request and answers every
/// `execute` call with a 200 response carrying a fixed body.
#[derive(Clone)]
struct CapturingTransport {
// Last request passed to `execute`; shared so clones observe the same slot.
last_request: Arc<Mutex<Option<Request>>>,
// Bytes returned as the body of every response.
response_body: Arc<Vec<u8>>,
}
impl CapturingTransport {
/// Creates a transport that replies with `response_body` and has no captured request yet.
fn new(response_body: Vec<u8>) -> Self {
Self {
last_request: Arc::new(Mutex::new(None)),
response_body: Arc::new(response_body),
}
}
}
#[async_trait]
impl HttpTransport for CapturingTransport {
/// Records `req` and returns a 200 response with empty headers and the stored body.
async fn execute(&self, req: Request) -> Result<Response, TransportError> {
*self.last_request.lock().expect("lock request store") = Some(req);
Ok(Response {
status: StatusCode::OK,
headers: HeaderMap::new(),
body: self.response_body.as_ref().clone().into(),
})
}
/// Always fails: the test in this module does not expect the streaming path to be used.
async fn stream(&self, _req: Request) -> Result<StreamResponse, TransportError> {
Err(TransportError::Build("stream should not run".to_string()))
}
}
/// Auth provider that supplies no bearer token.
#[derive(Clone, Default)]
struct DummyAuth;
impl AuthProvider for DummyAuth {
fn bearer_token(&self) -> Option<String> {
None
}
}
/// Builds a `Provider` named `test` for `base_url` with no extra headers or
/// query params and a single-attempt retry config.
fn provider(base_url: &str) -> Provider {
Provider {
name: "test".to_string(),
base_url: base_url.to_string(),
query_params: None,
headers: HeaderMap::new(),
retry: RetryConfig {
max_attempts: 1,
base_delay: Duration::from_millis(1),
retry_429: false,
retry_5xx: true,
retry_transport: true,
},
stream_idle_timeout: Duration::from_secs(1),
}
}
/// `create` should POST to `<base_url>/codex/realtime/client_secrets` with a
/// session body containing the type and model, and return the response's `value`.
#[tokio::test]
async fn create_posts_expected_payload_and_parses_value() {
let transport = CapturingTransport::new(
serde_json::to_vec(&json!({
"value": "ek-test-secret"
}))
.expect("serialize response"),
);
// Keep a clone of the transport so the captured request can be inspected afterwards.
let client = RealtimeClientSecretsClient::new(
transport.clone(),
provider("https://example.com/backend-api"),
DummyAuth,
);
let session = RealtimeSessionConfig {
instructions: "Be helpful".to_string(),
model: Some("gpt-realtime".to_string()),
session_id: Some("session-1".to_string()),
event_parser: crate::endpoint::realtime_websocket::RealtimeEventParser::RealtimeV2,
session_mode: crate::endpoint::realtime_websocket::RealtimeSessionMode::Conversational,
};
let value = client
.create(&session, HeaderMap::new())
.await
.expect("client secret request should succeed");
assert_eq!(value, "ek-test-secret");
let request = transport
.last_request
.lock()
.expect("lock request store")
.clone()
.expect("request should be captured");
assert_eq!(request.method, Method::POST);
// The endpoint path is appended to the provider base URL.
assert_eq!(
request.url,
"https://example.com/backend-api/codex/realtime/client_secrets"
);
let body = request.body.expect("request body should be present");
assert_eq!(body["session"]["type"], "realtime");
assert_eq!(body["session"]["model"], "gpt-realtime");
}
}

View File

@@ -10,11 +10,13 @@ use crate::endpoint::realtime_websocket::protocol::RealtimeEventParser;
use crate::endpoint::realtime_websocket::protocol::RealtimeOutboundMessage;
use crate::endpoint::realtime_websocket::protocol::RealtimeSessionMode;
use crate::endpoint::realtime_websocket::protocol::SessionUpdateSession;
use crate::error::ApiError;
use serde_json::Value;
pub(super) const REALTIME_AUDIO_SAMPLE_RATE: u32 = 24_000;
const AGENT_FINAL_MESSAGE_PREFIX: &str = "\"Agent Final Message\":\n\n";
pub(super) fn normalized_session_mode(
pub(crate) fn normalized_session_mode(
event_parser: RealtimeEventParser,
session_mode: RealtimeSessionMode,
) -> RealtimeSessionMode {
@@ -48,7 +50,7 @@ pub(super) fn conversation_handoff_append_message(
}
}
pub(super) fn session_update_session(
pub(crate) fn session_update_session(
event_parser: RealtimeEventParser,
instructions: String,
session_mode: RealtimeSessionMode,
@@ -60,6 +62,19 @@ pub(super) fn session_update_session(
}
}
/// Serializes the session produced by `session_update_session` into a JSON value.
///
/// # Errors
///
/// Returns [`ApiError::Stream`] if the session cannot be converted to JSON.
pub(crate) fn session_update_session_json(
    event_parser: RealtimeEventParser,
    instructions: String,
    session_mode: RealtimeSessionMode,
) -> Result<Value, ApiError> {
    let session = session_update_session(event_parser, instructions, session_mode);
    match serde_json::to_value(session) {
        Ok(encoded) => Ok(encoded),
        Err(err) => Err(ApiError::Stream(format!(
            "failed to encode realtime session config: {err}"
        ))),
    }
}
pub(super) fn websocket_intent(event_parser: RealtimeEventParser) -> Option<&'static str> {
match event_parser {
RealtimeEventParser::V1 => v1_websocket_intent(),

View File

@@ -13,6 +13,7 @@ pub use methods::RealtimeWebsocketClient;
pub use methods::RealtimeWebsocketConnection;
pub use methods::RealtimeWebsocketEvents;
pub use methods::RealtimeWebsocketWriter;
pub(crate) use methods_common::session_update_session_json;
pub use protocol::RealtimeEventParser;
pub use protocol::RealtimeSessionConfig;
pub use protocol::RealtimeSessionMode;

View File

@@ -30,6 +30,7 @@ pub use crate::common::response_create_client_metadata;
pub use crate::endpoint::compact::CompactClient;
pub use crate::endpoint::memories::MemoriesClient;
pub use crate::endpoint::models::ModelsClient;
pub use crate::endpoint::realtime_client_secrets::RealtimeClientSecretsClient;
pub use crate::endpoint::realtime_websocket::RealtimeEventParser;
pub use crate::endpoint::realtime_websocket::RealtimeSessionConfig;
pub use crate::endpoint::realtime_websocket::RealtimeSessionMode;

View File

@@ -216,6 +216,13 @@ pub(crate) struct CoreAuthProvider {
}
impl CoreAuthProvider {
/// Builds a provider from `auth`, taking its token and account id.
///
/// # Errors
///
/// Propagates the error from `auth.get_token()`.
pub(crate) fn from_auth(auth: &CodexAuth) -> crate::error::Result<Self> {
    // Fetch the token first so a token failure returns before anything else is read.
    let token = auth.get_token()?;
    let account_id = auth.get_account_id();
    Ok(Self {
        token: Some(token),
        account_id,
    })
}
pub(crate) fn auth_header_attached(&self) -> bool {
self.token
.as_ref()

View File

@@ -1,9 +1,10 @@
use crate::CodexAuth;
use crate::api_bridge::CoreAuthProvider;
use crate::api_bridge::map_api_error;
use crate::auth::read_openai_api_key_from_env;
use crate::codex::Session;
use crate::config::RealtimeWsMode;
use crate::config::RealtimeWsVersion;
use crate::default_client::build_reqwest_client;
use crate::default_client::default_headers;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
@@ -15,11 +16,13 @@ use base64::Engine;
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
use codex_api::Provider as ApiProvider;
use codex_api::RealtimeAudioFrame;
use codex_api::RealtimeClientSecretsClient;
use codex_api::RealtimeEvent;
use codex_api::RealtimeEventParser;
use codex_api::RealtimeSessionConfig;
use codex_api::RealtimeSessionMode;
use codex_api::RealtimeWebsocketClient;
use codex_api::ReqwestTransport;
use codex_api::endpoint::realtime_websocket::RealtimeWebsocketEvents;
use codex_api::endpoint::realtime_websocket::RealtimeWebsocketWriter;
use codex_protocol::protocol::CodexErrorInfo;
@@ -453,7 +456,6 @@ async fn prepare_realtime_start(
) -> CodexResult<PreparedRealtimeConversationStart> {
let provider = sess.provider().await;
let auth = sess.services.auth_manager.auth().await;
let realtime_api_key = realtime_api_key(auth.as_ref(), &provider)?;
let mut api_provider = provider.to_api_provider(Some(crate::auth::AuthMode::ApiKey))?;
let config = sess.get_config().await;
if let Some(realtime_ws_base_url) = &config.experimental_realtime_ws_base_url {
@@ -494,8 +496,12 @@ async fn prepare_realtime_start(
event_parser,
session_mode,
};
let extra_headers =
realtime_request_headers(requested_session_id.as_deref(), realtime_api_key.as_str())?;
let realtime_bearer_token =
realtime_bearer_token(auth.as_ref(), &provider, &config, &session_config).await?;
let extra_headers = realtime_request_headers(
requested_session_id.as_deref(),
realtime_bearer_token.as_str(),
)?;
Ok(PreparedRealtimeConversationStart {
api_provider,
extra_headers,
@@ -625,9 +631,11 @@ fn realtime_text_from_handoff_request(handoff: &RealtimeHandoffRequested) -> Opt
.or((!handoff.input_transcript.is_empty()).then_some(handoff.input_transcript.clone()))
}
fn realtime_api_key(
async fn realtime_bearer_token(
auth: Option<&CodexAuth>,
provider: &crate::ModelProviderInfo,
config: &crate::config::Config,
session_config: &RealtimeSessionConfig,
) -> CodexResult<String> {
if let Some(api_key) = provider.api_key()? {
return Ok(api_key);
@@ -637,26 +645,51 @@ fn realtime_api_key(
return Ok(token);
}
if let Some(api_key) = auth.and_then(CodexAuth::api_key) {
return Ok(api_key.to_string());
}
// TODO(aibrahim): Remove this temporary fallback once realtime auth no longer
// requires API key auth for ChatGPT/SIWC sessions.
if provider.is_openai()
&& let Some(api_key) = read_openai_api_key_from_env()
{
return Ok(api_key);
if let Some(auth) = auth {
if auth.is_chatgpt_auth() && provider.is_openai() {
let auth_provider = CoreAuthProvider::from_auth(auth)?;
let transport = ReqwestTransport::new(build_reqwest_client());
let client = RealtimeClientSecretsClient::new(
transport,
realtime_client_secret_provider(config)?,
auth_provider,
);
return client
.create(session_config, HeaderMap::new())
.await
.map_err(map_api_error);
}
if let Some(api_key) = auth.api_key() {
return Ok(api_key.to_string());
}
}
Err(CodexErr::InvalidRequest(
"realtime conversation requires API key auth".to_string(),
"realtime conversation requires API key or ChatGPT auth".to_string(),
))
}
/// Builds the API provider used for client-secret requests.
///
/// Creates an OpenAI provider based at the normalized `config.chatgpt_base_url`
/// and converts it to an API provider in ChatGPT auth mode.
///
/// # Errors
///
/// Propagates the error from `to_api_provider`.
fn realtime_client_secret_provider(config: &crate::config::Config) -> CodexResult<ApiProvider> {
    let base_url = normalized_chatgpt_base_url(&config.chatgpt_base_url);
    let chatgpt_provider = crate::ModelProviderInfo::create_openai_provider(Some(base_url));
    chatgpt_provider.to_api_provider(Some(crate::auth::AuthMode::Chatgpt))
}
/// Normalizes a ChatGPT base URL.
///
/// Trailing `/` characters are removed. If the result starts with
/// `https://chatgpt.com` or `https://chat.openai.com` and does not already
/// contain `/backend-api`, that suffix is appended. Any other input is
/// returned with only the trailing slashes removed.
fn normalized_chatgpt_base_url(input: &str) -> String {
    const CHATGPT_ORIGINS: [&str; 2] = ["https://chatgpt.com", "https://chat.openai.com"];

    let trimmed = input.trim_end_matches('/');
    let is_chatgpt_origin = CHATGPT_ORIGINS
        .iter()
        .any(|origin| trimmed.starts_with(*origin));

    if is_chatgpt_origin && !trimmed.contains("/backend-api") {
        format!("{trimmed}/backend-api")
    } else {
        trimmed.to_string()
    }
}
fn realtime_request_headers(
session_id: Option<&str>,
api_key: &str,
bearer_token: &str,
) -> CodexResult<Option<HeaderMap>> {
let mut headers = HeaderMap::new();
@@ -666,8 +699,8 @@ fn realtime_request_headers(
headers.insert("x-session-id", session_id);
}
let auth_value = HeaderValue::from_str(&format!("Bearer {api_key}")).map_err(|err| {
CodexErr::InvalidRequest(format!("invalid realtime api key header: {err}"))
let auth_value = HeaderValue::from_str(&format!("Bearer {bearer_token}")).map_err(|err| {
CodexErr::InvalidRequest(format!("invalid realtime bearer token header: {err}"))
})?;
headers.insert(AUTHORIZATION, auth_value);

View File

@@ -31,6 +31,7 @@ use wiremock::ResponseTemplate;
use wiremock::http::HeaderName;
use wiremock::http::HeaderValue;
use wiremock::matchers::method;
use wiremock::matchers::path;
use wiremock::matchers::path_regex;
use crate::test_codex::ApplyPatchModelOutput;
@@ -1177,6 +1178,33 @@ pub async fn start_mock_server() -> MockServer {
server
}
/// Starts a mock server with an 80,000 body print limit and the
/// cloud-requirements route from [`mount_chatgpt_cloud_requirements_ok`]
/// already mounted.
pub async fn start_chatgpt_mock_server() -> MockServer {
    let builder = MockServer::builder().body_print_limit(BodyPrintLimit::Limited(80_000));
    let chatgpt_server = builder.start().await;
    mount_chatgpt_cloud_requirements_ok(&chatgpt_server).await;
    chatgpt_server
}
/// Mounts a `GET /backend-api/wham/config/requirements` route on `server`
/// that answers 200 with an empty JSON object.
pub async fn mount_chatgpt_cloud_requirements_ok(server: &MockServer) {
    let empty_requirements = ResponseTemplate::new(200).set_body_json(serde_json::json!({}));
    let requirements_mock = Mock::given(method("GET"))
        .and(path("/backend-api/wham/config/requirements"))
        .respond_with(empty_requirements);
    requirements_mock.mount(server).await;
}
/// Mounts a `POST /codex/realtime/client_secrets` route on `server` that
/// answers with `response`.
pub async fn mount_realtime_client_secret(server: &MockServer, response: ResponseTemplate) {
    let client_secret_mock = Mock::given(method("POST"))
        .and(path("/codex/realtime/client_secrets"))
        .respond_with(response);
    client_secret_mock.mount(server).await;
}
/// Starts a lightweight WebSocket server for `/v1/responses` tests.
///
/// Each connection consumes a queue of request/event sequences. For each

View File

@@ -179,7 +179,7 @@ impl ActionKind {
Ok((event, Some(command)))
}
ActionKind::RunCommand { command } => {
let event = shell_event(call_id, command, 1_000, sandbox_permissions)?;
let event = shell_event(call_id, command, 2_000, sandbox_permissions)?;
Ok((event, Some(command.to_string())))
}
ActionKind::RunUnifiedExecCommand {

View File

@@ -2,7 +2,6 @@ use anyhow::Context;
use anyhow::Result;
use chrono::Utc;
use codex_core::CodexAuth;
use codex_core::auth::OPENAI_API_KEY_ENV_VAR;
use codex_protocol::ThreadId;
use codex_protocol::protocol::CodexErrorInfo;
use codex_protocol::protocol::ConversationAudioParams;
@@ -18,6 +17,8 @@ use codex_protocol::protocol::RealtimeEvent;
use codex_protocol::protocol::SessionSource;
use codex_protocol::user_input::UserInput;
use core_test_support::responses;
use core_test_support::responses::mount_realtime_client_secret;
use core_test_support::responses::start_chatgpt_mock_server;
use core_test_support::responses::start_mock_server;
use core_test_support::responses::start_websocket_server;
use core_test_support::skip_if_no_network;
@@ -31,16 +32,14 @@ use pretty_assertions::assert_eq;
use serde_json::Value;
use serde_json::json;
use std::fs;
use std::process::Command;
use std::time::Duration;
use tokio::sync::oneshot;
use tokio::time::timeout;
use wiremock::ResponseTemplate;
const STARTUP_CONTEXT_HEADER: &str = "Startup context from Codex.";
const MEMORY_PROMPT_PHRASE: &str =
"You have access to a memory folder with guidance from prior runs.";
const REALTIME_CONVERSATION_TEST_SUBPROCESS_ENV_VAR: &str =
"CODEX_REALTIME_CONVERSATION_TEST_SUBPROCESS";
fn websocket_request_text(
request: &core_test_support::responses::WebSocketRequest,
) -> Option<String> {
@@ -85,32 +84,6 @@ where
}
}
/// Re-runs the current test binary for exactly `test_name` in a child process.
///
/// The child has `REALTIME_CONVERSATION_TEST_SUBPROCESS_ENV_VAR` set to `1`.
/// `OPENAI_API_KEY_ENV_VAR` is set to `openai_api_key` when one is given and
/// removed from the child's environment otherwise.
///
/// # Errors
///
/// Returns an error if the current executable cannot be located or the child
/// cannot be run.
///
/// # Panics
///
/// Panics with the child's stdout and stderr if it exits unsuccessfully.
fn run_realtime_conversation_test_in_subprocess(
    test_name: &str,
    openai_api_key: Option<&str>,
) -> Result<()> {
    let test_binary = std::env::current_exe()?;
    let mut child = Command::new(test_binary);
    child
        .arg("--exact")
        .arg(test_name)
        .env(REALTIME_CONVERSATION_TEST_SUBPROCESS_ENV_VAR, "1");

    if let Some(key) = openai_api_key {
        child.env(OPENAI_API_KEY_ENV_VAR, key);
    } else {
        child.env_remove(OPENAI_API_KEY_ENV_VAR);
    }

    let finished = child.output()?;
    let stdout = String::from_utf8_lossy(&finished.stdout);
    let stderr = String::from_utf8_lossy(&finished.stderr);
    assert!(
        finished.status.success(),
        "subprocess test `{test_name}` failed\nstdout:\n{}\nstderr:\n{}",
        stdout,
        stderr,
    );
    Ok(())
}
async fn seed_recent_thread(
test: &TestCodex,
title: &str,
@@ -289,17 +262,19 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> {
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_start_uses_openai_env_key_fallback_with_chatgpt_auth() -> Result<()> {
if std::env::var_os(REALTIME_CONVERSATION_TEST_SUBPROCESS_ENV_VAR).is_none() {
return run_realtime_conversation_test_in_subprocess(
"suite::realtime_conversation::conversation_start_uses_openai_env_key_fallback_with_chatgpt_auth",
Some("env-realtime-key"),
);
}
async fn conversation_start_mints_client_secret_with_chatgpt_auth() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_websocket_server(vec![
let chatgpt_server = start_chatgpt_mock_server().await;
mount_realtime_client_secret(
&chatgpt_server,
ResponseTemplate::new(200).set_body_json(json!({
"value": "ek-test-secret"
})),
)
.await;
let realtime_server = start_websocket_server(vec![
vec![],
vec![vec![json!({
"type": "session.updated",
@@ -308,9 +283,22 @@ async fn conversation_start_uses_openai_env_key_fallback_with_chatgpt_auth() ->
])
.await;
let mut builder = test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let test = builder.build_with_websocket_server(&server).await?;
assert!(server.wait_for_handshakes(1, Duration::from_secs(2)).await);
let mut builder = test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config({
let chatgpt_base_url = chatgpt_server.uri();
move |config| {
config.chatgpt_base_url = chatgpt_base_url;
}
});
let test = builder
.build_with_websocket_server(&realtime_server)
.await?;
assert!(
realtime_server
.wait_for_handshakes(1, Duration::from_secs(2))
.await
);
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
@@ -338,9 +326,35 @@ async fn conversation_start_uses_openai_env_key_fallback_with_chatgpt_auth() ->
assert_eq!(session_updated, "sess_env");
assert_eq!(
server.handshakes()[1].header("authorization").as_deref(),
Some("Bearer env-realtime-key")
realtime_server.handshakes()[1]
.header("authorization")
.as_deref(),
Some("Bearer ek-test-secret")
);
let requests = chatgpt_server.received_requests().await.unwrap_or_default();
assert_eq!(requests.len(), 1);
assert_eq!(requests[0].method.as_str(), "POST");
assert_eq!(requests[0].url.path(), "/codex/realtime/client_secrets");
assert_eq!(
requests[0]
.headers
.get("authorization")
.and_then(|value| value.to_str().ok()),
Some("Bearer Access Token")
);
assert_eq!(
requests[0]
.headers
.get("chatgpt-account-id")
.and_then(|value| value.to_str().ok()),
Some("account_id")
);
let request_body: Value = serde_json::from_slice(&requests[0].body)?;
assert_eq!(
request_body["session"]["model"],
json!("realtime-test-model")
);
assert_eq!(request_body["session"]["type"], json!("quicksilver"));
test.codex.submit(Op::RealtimeConversationClose).await?;
let _closed = wait_for_event_match(&test.codex, |msg| match msg {
@@ -349,7 +363,8 @@ async fn conversation_start_uses_openai_env_key_fallback_with_chatgpt_auth() ->
})
.await;
server.shutdown().await;
realtime_server.shutdown().await;
drop(chatgpt_server);
Ok(())
}
@@ -437,17 +452,25 @@ async fn conversation_audio_before_start_emits_error() -> Result<()> {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_start_preflight_failure_emits_realtime_error_only() -> Result<()> {
if std::env::var_os(REALTIME_CONVERSATION_TEST_SUBPROCESS_ENV_VAR).is_none() {
return run_realtime_conversation_test_in_subprocess(
"suite::realtime_conversation::conversation_start_preflight_failure_emits_realtime_error_only",
/*openai_api_key*/ None,
);
}
skip_if_no_network!(Ok(()));
let chatgpt_server = start_chatgpt_mock_server().await;
mount_realtime_client_secret(
&chatgpt_server,
ResponseTemplate::new(401).set_body_json(json!({
"detail": "Could not parse your authentication token. Please try signing in again."
})),
)
.await;
let server = start_websocket_server(vec![]).await;
let mut builder = test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let mut builder = test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config({
let chatgpt_base_url = chatgpt_server.uri();
move |config| {
config.chatgpt_base_url = chatgpt_base_url;
}
});
let test = builder.build_with_websocket_server(&server).await?;
test.codex
@@ -464,7 +487,7 @@ async fn conversation_start_preflight_failure_emits_realtime_error_only() -> Res
_ => None,
})
.await;
assert_eq!(err, "realtime conversation requires API key auth");
assert!(err.contains("401 Unauthorized"), "unexpected error: {err}");
let closed = timeout(Duration::from_millis(200), async {
wait_for_event_match(&test.codex, |msg| match msg {
@@ -520,6 +543,53 @@ async fn conversation_start_connect_failure_emits_realtime_error_only() -> Resul
Ok(())
}
/// Checks that starting a realtime conversation with ChatGPT auth and a
/// provider renamed to "Custom Provider" fails with the generic auth error and
/// sends no request to the ChatGPT mock server.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_start_does_not_mint_client_secret_for_non_openai_provider() -> Result<()> {
skip_if_no_network!(Ok(()));
// Plain mock server with nothing mounted; it is only used to count incoming requests.
let chatgpt_server = wiremock::MockServer::start().await;
let realtime_server = start_websocket_server(vec![vec![]]).await;
let mut builder = test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config({
let chatgpt_base_url = chatgpt_server.uri();
move |config| {
config.chatgpt_base_url = chatgpt_base_url;
// NOTE(review): presumably the provider name is what makes this a
// non-OpenAI provider for the client-secret gate; confirm against `is_openai`.
config.model_provider.name = "Custom Provider".to_string();
}
});
let test = builder
.build_with_websocket_server(&realtime_server)
.await?;
test.codex
.submit(Op::RealtimeConversationStart(ConversationStartParams {
prompt: "backend prompt".to_string(),
session_id: None,
}))
.await?;
// Wait for the realtime error event emitted for the failed start.
let err = wait_for_event_match(&test.codex, |msg| match msg {
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
payload: RealtimeEvent::Error(message),
}) => Some(message.clone()),
_ => None,
})
.await;
assert_eq!(
err,
"realtime conversation requires API key or ChatGPT auth"
);
// No client-secret request (or any other request) may have reached the ChatGPT mock.
let requests = chatgpt_server.received_requests().await.unwrap_or_default();
assert_eq!(requests.len(), 0);
realtime_server.shutdown().await;
drop(chatgpt_server);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_text_before_start_emits_error() -> Result<()> {
skip_if_no_network!(Ok(()));