mirror of
https://github.com/openai/codex.git
synced 2026-04-23 22:24:57 +00:00
Compare commits
11 Commits
dev/bazel-
...
aibrahim/r
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3036dee57a | ||
|
|
5f9ad86d93 | ||
|
|
819b7a719e | ||
|
|
56d9b9c268 | ||
|
|
71be8598cf | ||
|
|
6151a0b447 | ||
|
|
035dcf8bc3 | ||
|
|
fd329382ae | ||
|
|
c145f6726a | ||
|
|
48188734e2 | ||
|
|
475883b07b |
@@ -1,7 +1,9 @@
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use app_test_support::ChatGptIdTokenClaims;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_mock_responses_server_sequence_unchecked;
|
||||
use app_test_support::encode_id_token;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::JSONRPCError;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
@@ -27,6 +29,8 @@ use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_features::FEATURES;
|
||||
use codex_features::Feature;
|
||||
use codex_protocol::protocol::RealtimeConversationVersion;
|
||||
use core_test_support::responses::mount_realtime_client_secret;
|
||||
use core_test_support::responses::start_chatgpt_mock_server;
|
||||
use core_test_support::responses::start_websocket_server;
|
||||
use core_test_support::skip_if_no_network;
|
||||
use pretty_assertions::assert_eq;
|
||||
@@ -36,6 +40,7 @@ use std::path::Path;
|
||||
use std::time::Duration;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
use wiremock::ResponseTemplate;
|
||||
|
||||
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
const STARTUP_CONTEXT_HEADER: &str = "Startup context from Codex.";
|
||||
@@ -99,6 +104,7 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> {
|
||||
codex_home.path(),
|
||||
&responses_server.uri(),
|
||||
realtime_server.uri(),
|
||||
None,
|
||||
true,
|
||||
)?;
|
||||
|
||||
@@ -306,6 +312,7 @@ async fn realtime_conversation_stop_emits_closed_notification() -> Result<()> {
|
||||
codex_home.path(),
|
||||
&responses_server.uri(),
|
||||
realtime_server.uri(),
|
||||
None,
|
||||
true,
|
||||
)?;
|
||||
|
||||
@@ -366,6 +373,93 @@ async fn realtime_conversation_stop_emits_closed_notification() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn realtime_conversation_uses_client_secret_with_external_chatgpt_auth() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let responses_server = create_mock_responses_server_sequence_unchecked(Vec::new()).await;
|
||||
let chatgpt_server = start_chatgpt_mock_server().await;
|
||||
mount_realtime_client_secret(
|
||||
&chatgpt_server,
|
||||
ResponseTemplate::new(200).set_body_json(json!({
|
||||
"value": "ek-app-server"
|
||||
})),
|
||||
)
|
||||
.await;
|
||||
let realtime_server = start_websocket_server(vec![vec![vec![json!({
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_external", "instructions": "backend prompt" }
|
||||
})]]])
|
||||
.await;
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(
|
||||
codex_home.path(),
|
||||
&responses_server.uri(),
|
||||
realtime_server.uri(),
|
||||
Some(chatgpt_server.uri().as_str()),
|
||||
true,
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
mcp.initialize().await?;
|
||||
login_with_chatgpt_auth_tokens(&mut mcp).await?;
|
||||
|
||||
let thread_start_request_id = mcp
|
||||
.send_thread_start_request(ThreadStartParams::default())
|
||||
.await?;
|
||||
let thread_start_response: JSONRPCResponse = timeout(
|
||||
DEFAULT_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(thread_start_request_id)),
|
||||
)
|
||||
.await??;
|
||||
let thread_start: ThreadStartResponse = to_response(thread_start_response)?;
|
||||
|
||||
let start_request_id = mcp
|
||||
.send_thread_realtime_start_request(ThreadRealtimeStartParams {
|
||||
thread_id: thread_start.thread.id.clone(),
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
})
|
||||
.await?;
|
||||
let start_response: JSONRPCResponse = timeout(
|
||||
DEFAULT_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(start_request_id)),
|
||||
)
|
||||
.await??;
|
||||
let _: ThreadRealtimeStartResponse = to_response(start_response)?;
|
||||
|
||||
let started =
|
||||
read_notification::<ThreadRealtimeStartedNotification>(&mut mcp, "thread/realtime/started")
|
||||
.await?;
|
||||
assert_eq!(started.thread_id, thread_start.thread.id);
|
||||
assert_eq!(started.version, RealtimeConversationVersion::V2);
|
||||
|
||||
let requests = chatgpt_server.received_requests().await.unwrap_or_default();
|
||||
assert_eq!(requests.len(), 1);
|
||||
assert_eq!(requests[0].url.path(), "/codex/realtime/client_secrets");
|
||||
assert_eq!(
|
||||
requests[0]
|
||||
.headers
|
||||
.get("chatgpt-account-id")
|
||||
.and_then(|value| value.to_str().ok()),
|
||||
Some("org-siwc")
|
||||
);
|
||||
let request_body: serde_json::Value = serde_json::from_slice(&requests[0].body)?;
|
||||
assert_eq!(request_body["session"]["type"], json!("realtime"));
|
||||
|
||||
assert_eq!(
|
||||
realtime_server.handshakes()[0]
|
||||
.header("authorization")
|
||||
.as_deref(),
|
||||
Some("Bearer ek-app-server")
|
||||
);
|
||||
|
||||
realtime_server.shutdown().await;
|
||||
drop(chatgpt_server);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn realtime_conversation_requires_feature_flag() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
@@ -378,6 +472,7 @@ async fn realtime_conversation_requires_feature_flag() -> Result<()> {
|
||||
codex_home.path(),
|
||||
&responses_server.uri(),
|
||||
realtime_server.uri(),
|
||||
None,
|
||||
false,
|
||||
)?;
|
||||
|
||||
@@ -443,10 +538,40 @@ async fn login_with_api_key(mcp: &mut McpProcess, api_key: &str) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn login_with_chatgpt_auth_tokens(mcp: &mut McpProcess) -> Result<()> {
|
||||
let access_token = encode_id_token(
|
||||
&ChatGptIdTokenClaims::new()
|
||||
.email("siwc@example.com")
|
||||
.plan_type("business")
|
||||
.chatgpt_account_id("org-siwc"),
|
||||
)?;
|
||||
let request_id = mcp
|
||||
.send_chatgpt_auth_tokens_login_request(
|
||||
access_token,
|
||||
"org-siwc".to_string(),
|
||||
Some("business".to_string()),
|
||||
)
|
||||
.await?;
|
||||
let response: JSONRPCResponse = timeout(
|
||||
DEFAULT_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
|
||||
)
|
||||
.await??;
|
||||
let login: LoginAccountResponse = to_response(response)?;
|
||||
assert_eq!(login, LoginAccountResponse::ChatgptAuthTokens {});
|
||||
let _updated = timeout(
|
||||
DEFAULT_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("account/updated"),
|
||||
)
|
||||
.await??;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn create_config_toml(
|
||||
codex_home: &Path,
|
||||
responses_server_uri: &str,
|
||||
realtime_server_uri: &str,
|
||||
chatgpt_base_url: Option<&str>,
|
||||
realtime_enabled: bool,
|
||||
) -> std::io::Result<()> {
|
||||
let realtime_feature_key = FEATURES
|
||||
@@ -454,6 +579,9 @@ fn create_config_toml(
|
||||
.find(|spec| spec.id == Feature::RealtimeConversation)
|
||||
.map(|spec| spec.key)
|
||||
.unwrap_or("realtime_conversation");
|
||||
let chatgpt_base_url = chatgpt_base_url
|
||||
.map(|url| format!("chatgpt_base_url = \"{url}\"\n"))
|
||||
.unwrap_or_default();
|
||||
|
||||
std::fs::write(
|
||||
codex_home.join("config.toml"),
|
||||
@@ -462,7 +590,7 @@ fn create_config_toml(
|
||||
model = "mock-model"
|
||||
approval_policy = "never"
|
||||
sandbox_mode = "read-only"
|
||||
model_provider = "mock_provider"
|
||||
{chatgpt_base_url}model_provider = "mock_provider"
|
||||
experimental_realtime_ws_base_url = "{realtime_server_uri}"
|
||||
|
||||
[realtime]
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
pub mod compact;
|
||||
pub mod memories;
|
||||
pub mod models;
|
||||
pub mod realtime_client_secrets;
|
||||
pub mod realtime_websocket;
|
||||
pub mod responses;
|
||||
pub mod responses_websocket;
|
||||
|
||||
201
codex-rs/codex-api/src/endpoint/realtime_client_secrets.rs
Normal file
201
codex-rs/codex-api/src/endpoint/realtime_client_secrets.rs
Normal file
@@ -0,0 +1,201 @@
|
||||
use crate::auth::AuthProvider;
|
||||
use crate::endpoint::realtime_websocket::RealtimeSessionConfig;
|
||||
use crate::endpoint::realtime_websocket::session_update_session_json;
|
||||
use crate::endpoint::session::EndpointSession;
|
||||
use crate::error::ApiError;
|
||||
use crate::provider::Provider;
|
||||
use codex_client::HttpTransport;
|
||||
use codex_client::RequestTelemetry;
|
||||
use http::HeaderMap;
|
||||
use http::Method;
|
||||
use serde::Deserialize;
|
||||
use serde_json::Value;
|
||||
use serde_json::json;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub struct RealtimeClientSecretsClient<T: HttpTransport, A: AuthProvider> {
|
||||
session: EndpointSession<T, A>,
|
||||
}
|
||||
|
||||
impl<T: HttpTransport, A: AuthProvider> RealtimeClientSecretsClient<T, A> {
|
||||
pub fn new(transport: T, provider: Provider, auth: A) -> Self {
|
||||
Self {
|
||||
session: EndpointSession::new(transport, provider, auth),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_telemetry(self, request: Option<Arc<dyn RequestTelemetry>>) -> Self {
|
||||
Self {
|
||||
session: self.session.with_request_telemetry(request),
|
||||
}
|
||||
}
|
||||
|
||||
fn path() -> &'static str {
|
||||
"codex/realtime/client_secrets"
|
||||
}
|
||||
|
||||
pub async fn create(
|
||||
&self,
|
||||
config: &RealtimeSessionConfig,
|
||||
extra_headers: HeaderMap,
|
||||
) -> Result<String, ApiError> {
|
||||
let body = realtime_client_secret_request_body(config)?;
|
||||
let resp = self
|
||||
.session
|
||||
.execute(Method::POST, Self::path(), extra_headers, Some(body))
|
||||
.await?;
|
||||
let parsed: RealtimeClientSecretResponse =
|
||||
serde_json::from_slice(&resp.body).map_err(|err| {
|
||||
ApiError::Stream(format!(
|
||||
"failed to decode realtime client secret response: {err}"
|
||||
))
|
||||
})?;
|
||||
if parsed.value.trim().is_empty() {
|
||||
return Err(ApiError::Stream(
|
||||
"realtime client secret response was missing a value".to_string(),
|
||||
));
|
||||
}
|
||||
Ok(parsed.value)
|
||||
}
|
||||
}
|
||||
|
||||
fn realtime_client_secret_request_body(config: &RealtimeSessionConfig) -> Result<Value, ApiError> {
|
||||
let mut session = session_update_session_json(
|
||||
config.event_parser,
|
||||
config.instructions.clone(),
|
||||
config.session_mode,
|
||||
)?;
|
||||
if let Some(model) = config.model.as_ref()
|
||||
&& let Some(session_object) = session.as_object_mut()
|
||||
{
|
||||
session_object.insert("model".to_string(), Value::String(model.clone()));
|
||||
}
|
||||
|
||||
Ok(json!({
|
||||
"session": session,
|
||||
}))
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct RealtimeClientSecretResponse {
|
||||
value: String,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::provider::RetryConfig;
|
||||
use async_trait::async_trait;
|
||||
use codex_client::Request;
|
||||
use codex_client::Response;
|
||||
use codex_client::StreamResponse;
|
||||
use codex_client::TransportError;
|
||||
use http::HeaderMap;
|
||||
use http::Method;
|
||||
use http::StatusCode;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct CapturingTransport {
|
||||
last_request: Arc<Mutex<Option<Request>>>,
|
||||
response_body: Arc<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl CapturingTransport {
|
||||
fn new(response_body: Vec<u8>) -> Self {
|
||||
Self {
|
||||
last_request: Arc::new(Mutex::new(None)),
|
||||
response_body: Arc::new(response_body),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl HttpTransport for CapturingTransport {
|
||||
async fn execute(&self, req: Request) -> Result<Response, TransportError> {
|
||||
*self.last_request.lock().expect("lock request store") = Some(req);
|
||||
Ok(Response {
|
||||
status: StatusCode::OK,
|
||||
headers: HeaderMap::new(),
|
||||
body: self.response_body.as_ref().clone().into(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn stream(&self, _req: Request) -> Result<StreamResponse, TransportError> {
|
||||
Err(TransportError::Build("stream should not run".to_string()))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
struct DummyAuth;
|
||||
|
||||
impl AuthProvider for DummyAuth {
|
||||
fn bearer_token(&self) -> Option<String> {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
fn provider(base_url: &str) -> Provider {
|
||||
Provider {
|
||||
name: "test".to_string(),
|
||||
base_url: base_url.to_string(),
|
||||
query_params: None,
|
||||
headers: HeaderMap::new(),
|
||||
retry: RetryConfig {
|
||||
max_attempts: 1,
|
||||
base_delay: Duration::from_millis(1),
|
||||
retry_429: false,
|
||||
retry_5xx: true,
|
||||
retry_transport: true,
|
||||
},
|
||||
stream_idle_timeout: Duration::from_secs(1),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn create_posts_expected_payload_and_parses_value() {
|
||||
let transport = CapturingTransport::new(
|
||||
serde_json::to_vec(&json!({
|
||||
"value": "ek-test-secret"
|
||||
}))
|
||||
.expect("serialize response"),
|
||||
);
|
||||
let client = RealtimeClientSecretsClient::new(
|
||||
transport.clone(),
|
||||
provider("https://example.com/backend-api"),
|
||||
DummyAuth,
|
||||
);
|
||||
let session = RealtimeSessionConfig {
|
||||
instructions: "Be helpful".to_string(),
|
||||
model: Some("gpt-realtime".to_string()),
|
||||
session_id: Some("session-1".to_string()),
|
||||
event_parser: crate::endpoint::realtime_websocket::RealtimeEventParser::RealtimeV2,
|
||||
session_mode: crate::endpoint::realtime_websocket::RealtimeSessionMode::Conversational,
|
||||
};
|
||||
|
||||
let value = client
|
||||
.create(&session, HeaderMap::new())
|
||||
.await
|
||||
.expect("client secret request should succeed");
|
||||
assert_eq!(value, "ek-test-secret");
|
||||
|
||||
let request = transport
|
||||
.last_request
|
||||
.lock()
|
||||
.expect("lock request store")
|
||||
.clone()
|
||||
.expect("request should be captured");
|
||||
assert_eq!(request.method, Method::POST);
|
||||
assert_eq!(
|
||||
request.url,
|
||||
"https://example.com/backend-api/codex/realtime/client_secrets"
|
||||
);
|
||||
let body = request.body.expect("request body should be present");
|
||||
assert_eq!(body["session"]["type"], "realtime");
|
||||
assert_eq!(body["session"]["model"], "gpt-realtime");
|
||||
}
|
||||
}
|
||||
@@ -10,11 +10,13 @@ use crate::endpoint::realtime_websocket::protocol::RealtimeEventParser;
|
||||
use crate::endpoint::realtime_websocket::protocol::RealtimeOutboundMessage;
|
||||
use crate::endpoint::realtime_websocket::protocol::RealtimeSessionMode;
|
||||
use crate::endpoint::realtime_websocket::protocol::SessionUpdateSession;
|
||||
use crate::error::ApiError;
|
||||
use serde_json::Value;
|
||||
|
||||
pub(super) const REALTIME_AUDIO_SAMPLE_RATE: u32 = 24_000;
|
||||
const AGENT_FINAL_MESSAGE_PREFIX: &str = "\"Agent Final Message\":\n\n";
|
||||
|
||||
pub(super) fn normalized_session_mode(
|
||||
pub(crate) fn normalized_session_mode(
|
||||
event_parser: RealtimeEventParser,
|
||||
session_mode: RealtimeSessionMode,
|
||||
) -> RealtimeSessionMode {
|
||||
@@ -48,7 +50,7 @@ pub(super) fn conversation_handoff_append_message(
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn session_update_session(
|
||||
pub(crate) fn session_update_session(
|
||||
event_parser: RealtimeEventParser,
|
||||
instructions: String,
|
||||
session_mode: RealtimeSessionMode,
|
||||
@@ -60,6 +62,19 @@ pub(super) fn session_update_session(
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn session_update_session_json(
|
||||
event_parser: RealtimeEventParser,
|
||||
instructions: String,
|
||||
session_mode: RealtimeSessionMode,
|
||||
) -> Result<Value, ApiError> {
|
||||
serde_json::to_value(session_update_session(
|
||||
event_parser,
|
||||
instructions,
|
||||
session_mode,
|
||||
))
|
||||
.map_err(|err| ApiError::Stream(format!("failed to encode realtime session config: {err}")))
|
||||
}
|
||||
|
||||
pub(super) fn websocket_intent(event_parser: RealtimeEventParser) -> Option<&'static str> {
|
||||
match event_parser {
|
||||
RealtimeEventParser::V1 => v1_websocket_intent(),
|
||||
|
||||
@@ -13,6 +13,7 @@ pub use methods::RealtimeWebsocketClient;
|
||||
pub use methods::RealtimeWebsocketConnection;
|
||||
pub use methods::RealtimeWebsocketEvents;
|
||||
pub use methods::RealtimeWebsocketWriter;
|
||||
pub(crate) use methods_common::session_update_session_json;
|
||||
pub use protocol::RealtimeEventParser;
|
||||
pub use protocol::RealtimeSessionConfig;
|
||||
pub use protocol::RealtimeSessionMode;
|
||||
|
||||
@@ -30,6 +30,7 @@ pub use crate::common::response_create_client_metadata;
|
||||
pub use crate::endpoint::compact::CompactClient;
|
||||
pub use crate::endpoint::memories::MemoriesClient;
|
||||
pub use crate::endpoint::models::ModelsClient;
|
||||
pub use crate::endpoint::realtime_client_secrets::RealtimeClientSecretsClient;
|
||||
pub use crate::endpoint::realtime_websocket::RealtimeEventParser;
|
||||
pub use crate::endpoint::realtime_websocket::RealtimeSessionConfig;
|
||||
pub use crate::endpoint::realtime_websocket::RealtimeSessionMode;
|
||||
|
||||
@@ -216,6 +216,13 @@ pub(crate) struct CoreAuthProvider {
|
||||
}
|
||||
|
||||
impl CoreAuthProvider {
|
||||
pub(crate) fn from_auth(auth: &CodexAuth) -> crate::error::Result<Self> {
|
||||
Ok(Self {
|
||||
token: Some(auth.get_token()?),
|
||||
account_id: auth.get_account_id(),
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn auth_header_attached(&self) -> bool {
|
||||
self.token
|
||||
.as_ref()
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
use crate::CodexAuth;
|
||||
use crate::api_bridge::CoreAuthProvider;
|
||||
use crate::api_bridge::map_api_error;
|
||||
use crate::auth::read_openai_api_key_from_env;
|
||||
use crate::codex::Session;
|
||||
use crate::config::RealtimeWsMode;
|
||||
use crate::config::RealtimeWsVersion;
|
||||
use crate::default_client::build_reqwest_client;
|
||||
use crate::default_client::default_headers;
|
||||
use crate::error::CodexErr;
|
||||
use crate::error::Result as CodexResult;
|
||||
@@ -15,11 +16,13 @@ use base64::Engine;
|
||||
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
|
||||
use codex_api::Provider as ApiProvider;
|
||||
use codex_api::RealtimeAudioFrame;
|
||||
use codex_api::RealtimeClientSecretsClient;
|
||||
use codex_api::RealtimeEvent;
|
||||
use codex_api::RealtimeEventParser;
|
||||
use codex_api::RealtimeSessionConfig;
|
||||
use codex_api::RealtimeSessionMode;
|
||||
use codex_api::RealtimeWebsocketClient;
|
||||
use codex_api::ReqwestTransport;
|
||||
use codex_api::endpoint::realtime_websocket::RealtimeWebsocketEvents;
|
||||
use codex_api::endpoint::realtime_websocket::RealtimeWebsocketWriter;
|
||||
use codex_protocol::protocol::CodexErrorInfo;
|
||||
@@ -453,7 +456,6 @@ async fn prepare_realtime_start(
|
||||
) -> CodexResult<PreparedRealtimeConversationStart> {
|
||||
let provider = sess.provider().await;
|
||||
let auth = sess.services.auth_manager.auth().await;
|
||||
let realtime_api_key = realtime_api_key(auth.as_ref(), &provider)?;
|
||||
let mut api_provider = provider.to_api_provider(Some(crate::auth::AuthMode::ApiKey))?;
|
||||
let config = sess.get_config().await;
|
||||
if let Some(realtime_ws_base_url) = &config.experimental_realtime_ws_base_url {
|
||||
@@ -494,8 +496,12 @@ async fn prepare_realtime_start(
|
||||
event_parser,
|
||||
session_mode,
|
||||
};
|
||||
let extra_headers =
|
||||
realtime_request_headers(requested_session_id.as_deref(), realtime_api_key.as_str())?;
|
||||
let realtime_bearer_token =
|
||||
realtime_bearer_token(auth.as_ref(), &provider, &config, &session_config).await?;
|
||||
let extra_headers = realtime_request_headers(
|
||||
requested_session_id.as_deref(),
|
||||
realtime_bearer_token.as_str(),
|
||||
)?;
|
||||
Ok(PreparedRealtimeConversationStart {
|
||||
api_provider,
|
||||
extra_headers,
|
||||
@@ -625,9 +631,11 @@ fn realtime_text_from_handoff_request(handoff: &RealtimeHandoffRequested) -> Opt
|
||||
.or((!handoff.input_transcript.is_empty()).then_some(handoff.input_transcript.clone()))
|
||||
}
|
||||
|
||||
fn realtime_api_key(
|
||||
async fn realtime_bearer_token(
|
||||
auth: Option<&CodexAuth>,
|
||||
provider: &crate::ModelProviderInfo,
|
||||
config: &crate::config::Config,
|
||||
session_config: &RealtimeSessionConfig,
|
||||
) -> CodexResult<String> {
|
||||
if let Some(api_key) = provider.api_key()? {
|
||||
return Ok(api_key);
|
||||
@@ -637,26 +645,51 @@ fn realtime_api_key(
|
||||
return Ok(token);
|
||||
}
|
||||
|
||||
if let Some(api_key) = auth.and_then(CodexAuth::api_key) {
|
||||
return Ok(api_key.to_string());
|
||||
}
|
||||
|
||||
// TODO(aibrahim): Remove this temporary fallback once realtime auth no longer
|
||||
// requires API key auth for ChatGPT/SIWC sessions.
|
||||
if provider.is_openai()
|
||||
&& let Some(api_key) = read_openai_api_key_from_env()
|
||||
{
|
||||
return Ok(api_key);
|
||||
if let Some(auth) = auth {
|
||||
if auth.is_chatgpt_auth() && provider.is_openai() {
|
||||
let auth_provider = CoreAuthProvider::from_auth(auth)?;
|
||||
let transport = ReqwestTransport::new(build_reqwest_client());
|
||||
let client = RealtimeClientSecretsClient::new(
|
||||
transport,
|
||||
realtime_client_secret_provider(config)?,
|
||||
auth_provider,
|
||||
);
|
||||
return client
|
||||
.create(session_config, HeaderMap::new())
|
||||
.await
|
||||
.map_err(map_api_error);
|
||||
}
|
||||
if let Some(api_key) = auth.api_key() {
|
||||
return Ok(api_key.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
Err(CodexErr::InvalidRequest(
|
||||
"realtime conversation requires API key auth".to_string(),
|
||||
"realtime conversation requires API key or ChatGPT auth".to_string(),
|
||||
))
|
||||
}
|
||||
|
||||
fn realtime_client_secret_provider(config: &crate::config::Config) -> CodexResult<ApiProvider> {
|
||||
crate::ModelProviderInfo::create_openai_provider(Some(normalized_chatgpt_base_url(
|
||||
&config.chatgpt_base_url,
|
||||
)))
|
||||
.to_api_provider(Some(crate::auth::AuthMode::Chatgpt))
|
||||
}
|
||||
|
||||
fn normalized_chatgpt_base_url(input: &str) -> String {
|
||||
let mut base_url = input.trim_end_matches('/').to_string();
|
||||
if (base_url.starts_with("https://chatgpt.com")
|
||||
|| base_url.starts_with("https://chat.openai.com"))
|
||||
&& !base_url.contains("/backend-api")
|
||||
{
|
||||
base_url = format!("{base_url}/backend-api");
|
||||
}
|
||||
base_url
|
||||
}
|
||||
|
||||
fn realtime_request_headers(
|
||||
session_id: Option<&str>,
|
||||
api_key: &str,
|
||||
bearer_token: &str,
|
||||
) -> CodexResult<Option<HeaderMap>> {
|
||||
let mut headers = HeaderMap::new();
|
||||
|
||||
@@ -666,8 +699,8 @@ fn realtime_request_headers(
|
||||
headers.insert("x-session-id", session_id);
|
||||
}
|
||||
|
||||
let auth_value = HeaderValue::from_str(&format!("Bearer {api_key}")).map_err(|err| {
|
||||
CodexErr::InvalidRequest(format!("invalid realtime api key header: {err}"))
|
||||
let auth_value = HeaderValue::from_str(&format!("Bearer {bearer_token}")).map_err(|err| {
|
||||
CodexErr::InvalidRequest(format!("invalid realtime bearer token header: {err}"))
|
||||
})?;
|
||||
headers.insert(AUTHORIZATION, auth_value);
|
||||
|
||||
|
||||
@@ -31,6 +31,7 @@ use wiremock::ResponseTemplate;
|
||||
use wiremock::http::HeaderName;
|
||||
use wiremock::http::HeaderValue;
|
||||
use wiremock::matchers::method;
|
||||
use wiremock::matchers::path;
|
||||
use wiremock::matchers::path_regex;
|
||||
|
||||
use crate::test_codex::ApplyPatchModelOutput;
|
||||
@@ -1177,6 +1178,33 @@ pub async fn start_mock_server() -> MockServer {
|
||||
server
|
||||
}
|
||||
|
||||
pub async fn start_chatgpt_mock_server() -> MockServer {
|
||||
let server = MockServer::builder()
|
||||
.body_print_limit(BodyPrintLimit::Limited(80_000))
|
||||
.start()
|
||||
.await;
|
||||
|
||||
mount_chatgpt_cloud_requirements_ok(&server).await;
|
||||
|
||||
server
|
||||
}
|
||||
|
||||
pub async fn mount_chatgpt_cloud_requirements_ok(server: &MockServer) {
|
||||
Mock::given(method("GET"))
|
||||
.and(path("/backend-api/wham/config/requirements"))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({})))
|
||||
.mount(server)
|
||||
.await;
|
||||
}
|
||||
|
||||
pub async fn mount_realtime_client_secret(server: &MockServer, response: ResponseTemplate) {
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/codex/realtime/client_secrets"))
|
||||
.respond_with(response)
|
||||
.mount(server)
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Starts a lightweight WebSocket server for `/v1/responses` tests.
|
||||
///
|
||||
/// Each connection consumes a queue of request/event sequences. For each
|
||||
|
||||
@@ -179,7 +179,7 @@ impl ActionKind {
|
||||
Ok((event, Some(command)))
|
||||
}
|
||||
ActionKind::RunCommand { command } => {
|
||||
let event = shell_event(call_id, command, 1_000, sandbox_permissions)?;
|
||||
let event = shell_event(call_id, command, 2_000, sandbox_permissions)?;
|
||||
Ok((event, Some(command.to_string())))
|
||||
}
|
||||
ActionKind::RunUnifiedExecCommand {
|
||||
|
||||
@@ -2,7 +2,6 @@ use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use chrono::Utc;
|
||||
use codex_core::CodexAuth;
|
||||
use codex_core::auth::OPENAI_API_KEY_ENV_VAR;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::CodexErrorInfo;
|
||||
use codex_protocol::protocol::ConversationAudioParams;
|
||||
@@ -18,6 +17,8 @@ use codex_protocol::protocol::RealtimeEvent;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use core_test_support::responses;
|
||||
use core_test_support::responses::mount_realtime_client_secret;
|
||||
use core_test_support::responses::start_chatgpt_mock_server;
|
||||
use core_test_support::responses::start_mock_server;
|
||||
use core_test_support::responses::start_websocket_server;
|
||||
use core_test_support::skip_if_no_network;
|
||||
@@ -31,16 +32,14 @@ use pretty_assertions::assert_eq;
|
||||
use serde_json::Value;
|
||||
use serde_json::json;
|
||||
use std::fs;
|
||||
use std::process::Command;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::time::timeout;
|
||||
use wiremock::ResponseTemplate;
|
||||
|
||||
const STARTUP_CONTEXT_HEADER: &str = "Startup context from Codex.";
|
||||
const MEMORY_PROMPT_PHRASE: &str =
|
||||
"You have access to a memory folder with guidance from prior runs.";
|
||||
const REALTIME_CONVERSATION_TEST_SUBPROCESS_ENV_VAR: &str =
|
||||
"CODEX_REALTIME_CONVERSATION_TEST_SUBPROCESS";
|
||||
fn websocket_request_text(
|
||||
request: &core_test_support::responses::WebSocketRequest,
|
||||
) -> Option<String> {
|
||||
@@ -85,32 +84,6 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
fn run_realtime_conversation_test_in_subprocess(
|
||||
test_name: &str,
|
||||
openai_api_key: Option<&str>,
|
||||
) -> Result<()> {
|
||||
let mut command = Command::new(std::env::current_exe()?);
|
||||
command
|
||||
.arg("--exact")
|
||||
.arg(test_name)
|
||||
.env(REALTIME_CONVERSATION_TEST_SUBPROCESS_ENV_VAR, "1");
|
||||
match openai_api_key {
|
||||
Some(openai_api_key) => {
|
||||
command.env(OPENAI_API_KEY_ENV_VAR, openai_api_key);
|
||||
}
|
||||
None => {
|
||||
command.env_remove(OPENAI_API_KEY_ENV_VAR);
|
||||
}
|
||||
}
|
||||
let output = command.output()?;
|
||||
assert!(
|
||||
output.status.success(),
|
||||
"subprocess test `{test_name}` failed\nstdout:\n{}\nstderr:\n{}",
|
||||
String::from_utf8_lossy(&output.stdout),
|
||||
String::from_utf8_lossy(&output.stderr),
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
async fn seed_recent_thread(
|
||||
test: &TestCodex,
|
||||
title: &str,
|
||||
@@ -289,17 +262,19 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> {
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn conversation_start_uses_openai_env_key_fallback_with_chatgpt_auth() -> Result<()> {
|
||||
if std::env::var_os(REALTIME_CONVERSATION_TEST_SUBPROCESS_ENV_VAR).is_none() {
|
||||
return run_realtime_conversation_test_in_subprocess(
|
||||
"suite::realtime_conversation::conversation_start_uses_openai_env_key_fallback_with_chatgpt_auth",
|
||||
Some("env-realtime-key"),
|
||||
);
|
||||
}
|
||||
|
||||
async fn conversation_start_mints_client_secret_with_chatgpt_auth() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_websocket_server(vec![
|
||||
let chatgpt_server = start_chatgpt_mock_server().await;
|
||||
mount_realtime_client_secret(
|
||||
&chatgpt_server,
|
||||
ResponseTemplate::new(200).set_body_json(json!({
|
||||
"value": "ek-test-secret"
|
||||
})),
|
||||
)
|
||||
.await;
|
||||
|
||||
let realtime_server = start_websocket_server(vec![
|
||||
vec![],
|
||||
vec![vec![json!({
|
||||
"type": "session.updated",
|
||||
@@ -308,9 +283,22 @@ async fn conversation_start_uses_openai_env_key_fallback_with_chatgpt_auth() ->
|
||||
])
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing());
|
||||
let test = builder.build_with_websocket_server(&server).await?;
|
||||
assert!(server.wait_for_handshakes(1, Duration::from_secs(2)).await);
|
||||
let mut builder = test_codex()
|
||||
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
|
||||
.with_config({
|
||||
let chatgpt_base_url = chatgpt_server.uri();
|
||||
move |config| {
|
||||
config.chatgpt_base_url = chatgpt_base_url;
|
||||
}
|
||||
});
|
||||
let test = builder
|
||||
.build_with_websocket_server(&realtime_server)
|
||||
.await?;
|
||||
assert!(
|
||||
realtime_server
|
||||
.wait_for_handshakes(1, Duration::from_secs(2))
|
||||
.await
|
||||
);
|
||||
|
||||
test.codex
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
@@ -338,9 +326,35 @@ async fn conversation_start_uses_openai_env_key_fallback_with_chatgpt_auth() ->
|
||||
assert_eq!(session_updated, "sess_env");
|
||||
|
||||
assert_eq!(
|
||||
server.handshakes()[1].header("authorization").as_deref(),
|
||||
Some("Bearer env-realtime-key")
|
||||
realtime_server.handshakes()[1]
|
||||
.header("authorization")
|
||||
.as_deref(),
|
||||
Some("Bearer ek-test-secret")
|
||||
);
|
||||
let requests = chatgpt_server.received_requests().await.unwrap_or_default();
|
||||
assert_eq!(requests.len(), 1);
|
||||
assert_eq!(requests[0].method.as_str(), "POST");
|
||||
assert_eq!(requests[0].url.path(), "/codex/realtime/client_secrets");
|
||||
assert_eq!(
|
||||
requests[0]
|
||||
.headers
|
||||
.get("authorization")
|
||||
.and_then(|value| value.to_str().ok()),
|
||||
Some("Bearer Access Token")
|
||||
);
|
||||
assert_eq!(
|
||||
requests[0]
|
||||
.headers
|
||||
.get("chatgpt-account-id")
|
||||
.and_then(|value| value.to_str().ok()),
|
||||
Some("account_id")
|
||||
);
|
||||
let request_body: Value = serde_json::from_slice(&requests[0].body)?;
|
||||
assert_eq!(
|
||||
request_body["session"]["model"],
|
||||
json!("realtime-test-model")
|
||||
);
|
||||
assert_eq!(request_body["session"]["type"], json!("quicksilver"));
|
||||
|
||||
test.codex.submit(Op::RealtimeConversationClose).await?;
|
||||
let _closed = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
@@ -349,7 +363,8 @@ async fn conversation_start_uses_openai_env_key_fallback_with_chatgpt_auth() ->
|
||||
})
|
||||
.await;
|
||||
|
||||
server.shutdown().await;
|
||||
realtime_server.shutdown().await;
|
||||
drop(chatgpt_server);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -437,17 +452,25 @@ async fn conversation_audio_before_start_emits_error() -> Result<()> {
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn conversation_start_preflight_failure_emits_realtime_error_only() -> Result<()> {
|
||||
if std::env::var_os(REALTIME_CONVERSATION_TEST_SUBPROCESS_ENV_VAR).is_none() {
|
||||
return run_realtime_conversation_test_in_subprocess(
|
||||
"suite::realtime_conversation::conversation_start_preflight_failure_emits_realtime_error_only",
|
||||
/*openai_api_key*/ None,
|
||||
);
|
||||
}
|
||||
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let chatgpt_server = start_chatgpt_mock_server().await;
|
||||
mount_realtime_client_secret(
|
||||
&chatgpt_server,
|
||||
ResponseTemplate::new(401).set_body_json(json!({
|
||||
"detail": "Could not parse your authentication token. Please try signing in again."
|
||||
})),
|
||||
)
|
||||
.await;
|
||||
let server = start_websocket_server(vec![]).await;
|
||||
let mut builder = test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing());
|
||||
let mut builder = test_codex()
|
||||
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
|
||||
.with_config({
|
||||
let chatgpt_base_url = chatgpt_server.uri();
|
||||
move |config| {
|
||||
config.chatgpt_base_url = chatgpt_base_url;
|
||||
}
|
||||
});
|
||||
let test = builder.build_with_websocket_server(&server).await?;
|
||||
|
||||
test.codex
|
||||
@@ -464,7 +487,7 @@ async fn conversation_start_preflight_failure_emits_realtime_error_only() -> Res
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
assert_eq!(err, "realtime conversation requires API key auth");
|
||||
assert!(err.contains("401 Unauthorized"), "unexpected error: {err}");
|
||||
|
||||
let closed = timeout(Duration::from_millis(200), async {
|
||||
wait_for_event_match(&test.codex, |msg| match msg {
|
||||
@@ -520,6 +543,53 @@ async fn conversation_start_connect_failure_emits_realtime_error_only() -> Resul
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn conversation_start_does_not_mint_client_secret_for_non_openai_provider() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let chatgpt_server = wiremock::MockServer::start().await;
|
||||
let realtime_server = start_websocket_server(vec![vec![]]).await;
|
||||
|
||||
let mut builder = test_codex()
|
||||
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
|
||||
.with_config({
|
||||
let chatgpt_base_url = chatgpt_server.uri();
|
||||
move |config| {
|
||||
config.chatgpt_base_url = chatgpt_base_url;
|
||||
config.model_provider.name = "Custom Provider".to_string();
|
||||
}
|
||||
});
|
||||
let test = builder
|
||||
.build_with_websocket_server(&realtime_server)
|
||||
.await?;
|
||||
|
||||
test.codex
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
}))
|
||||
.await?;
|
||||
|
||||
let err = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::Error(message),
|
||||
}) => Some(message.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
assert_eq!(
|
||||
err,
|
||||
"realtime conversation requires API key or ChatGPT auth"
|
||||
);
|
||||
|
||||
let requests = chatgpt_server.received_requests().await.unwrap_or_default();
|
||||
assert_eq!(requests.len(), 0);
|
||||
|
||||
realtime_server.shutdown().await;
|
||||
drop(chatgpt_server);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn conversation_text_before_start_emits_error() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
Reference in New Issue
Block a user