Compare commits

...

24 Commits

Author SHA1 Message Date
Ahmed Ibrahim
ab9c89e6e5 Fix realtime parser item_id handling and test callsites
Preserve item_id on v1 audio deltas and annotate opaque None arguments in calls URL tests so the argument-comment lint passes.

Co-authored-by: Codex <noreply@openai.com>
2026-04-05 13:01:43 -07:00
Ahmed Ibrahim
fbd2b534b8 Support ChatGPT realtime calls auth
Route realtime call auth through CoreAuthProvider and send JSON payloads for non-v1 realtime calls while preserving the v1 multipart request shape.

Co-authored-by: Codex <noreply@openai.com>
2026-04-05 13:01:43 -07:00
Ahmed Ibrahim
eb27a0773b Remove local realtime playback interruption
Co-authored-by: Codex <noreply@openai.com>
2026-04-05 13:00:45 -07:00
Ahmed Ibrahim
90f10e9ab9 Route TUI realtime audio through shared echo cancellation
Co-authored-by: Codex <noreply@openai.com>
2026-04-05 13:00:28 -07:00
Ahmed Ibrahim
8207ff6ead codex: fix realtime CI failures
Install the rustls crypto provider in the realtime WebRTC test server so the app-server realtime tests stop panicking, and update the core handshake assertion to match the current realtime calls URL.

Co-authored-by: Codex <noreply@openai.com>
2026-04-05 10:57:48 -07:00
Ahmed Ibrahim
99f72e27ae codex: fix CI failure on PR #16805
Keep the first realtime audio frame from being dropped during WebRTC startup.

Also give the realtime test server a tiny flush window before it closes the data channel so the last scripted events land reliably.

Co-authored-by: Codex <noreply@openai.com>
2026-04-05 10:44:06 -07:00
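
The first-frame fix above can be sketched as a startup buffer that queues frames submitted before the data channel opens and flushes them once it does. This is a plausible shape only, not the PR's actual implementation; the `StartupBuffer` type and its methods are illustrative:

```rust
use std::collections::VecDeque;

// Sketch: queue audio frames submitted before the transport is ready,
// so the first frame is not dropped during WebRTC startup.
struct StartupBuffer {
    open: bool,
    pending: VecDeque<Vec<u8>>,
    sent: Vec<Vec<u8>>,
}

impl StartupBuffer {
    fn new() -> Self {
        Self { open: false, pending: VecDeque::new(), sent: Vec::new() }
    }

    fn send(&mut self, frame: Vec<u8>) {
        if self.open {
            self.sent.push(frame);
        } else {
            // Queue instead of dropping while the data channel negotiates.
            self.pending.push_back(frame);
        }
    }

    fn on_open(&mut self) {
        self.open = true;
        // Flush queued frames in order before any post-open frames.
        while let Some(frame) = self.pending.pop_front() {
            self.sent.push(frame);
        }
    }
}

fn main() {
    let mut buf = StartupBuffer::new();
    buf.send(vec![1, 2]); // arrives before the channel opens
    buf.on_open();
    buf.send(vec![3, 4]);
    assert_eq!(buf.sent.len(), 2);
    assert_eq!(buf.sent[0], vec![1, 2]); // first frame preserved, in order
    println!("ok");
}
```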
Ahmed Ibrahim
5d5305c5d4 codex: fix CI failure on PR #16805
Stop the realtime test WebRTC server from hanging after the client closes.

Teach the scripted request loop to exit when the data channel or peer connection closes so compact-remote tests can unwind instead of timing out at shutdown.

Co-authored-by: Codex <noreply@openai.com>
2026-04-05 10:26:58 -07:00
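
The shutdown behavior described above (exiting the scripted loop when the peer closes, instead of timing out) can be illustrated with a std-only stand-in. The real server watches the WebRTC data channel and peer connection; here an mpsc channel plays that role, since `recv()` returns `Err` once every sender is dropped:

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;

// Scripted request loop that unwinds when the channel closes rather
// than blocking forever at shutdown.
fn drain_requests(rx: Receiver<String>) -> usize {
    let mut handled = 0;
    // recv() yields Err after every sender is dropped, ending the loop.
    while let Ok(_request) = rx.recv() {
        handled += 1;
    }
    handled
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let server = thread::spawn(move || drain_requests(rx));
    tx.send("offer".to_string()).unwrap();
    tx.send("append_audio".to_string()).unwrap();
    drop(tx); // the "client" closes the connection
    assert_eq!(server.join().unwrap(), 2);
    println!("ok");
}
```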
Ahmed Ibrahim
236891f5d3 Use non-silent realtime test audio fixtures
- replace silent PCM realtime test fixtures with a deterministic tone
- avoid codec paths optimizing away audio in WebRTC test flows

Co-authored-by: Codex <noreply@openai.com>
2026-04-05 09:48:50 -07:00
Ahmed Ibrahim
11180cefd9 Fix realtime test audio fixtures
- use valid 24 kHz mono PCM audio in realtime tests
- keep websocket/WebRTC request sequencing aligned with transport behavior

Co-authored-by: Codex <noreply@openai.com>
2026-04-05 01:38:42 -07:00
Ahmed Ibrahim
76397dbdd0 Use opus-rs for realtime transport
- switch realtime transport and test server to opus-rs
- drop native opus bazel and cmake plumbing

Co-authored-by: Codex <noreply@openai.com>
2026-04-05 00:43:41 -07:00
Ahmed Ibrahim
f9739d0178 Add git diff headers to Opus Bazel patch
Co-authored-by: Codex <noreply@openai.com>
2026-04-04 16:17:54 -07:00
Ahmed Ibrahim
bbff67a7b9 Fix Opus Bazel patch hunk headers
Co-authored-by: Codex <noreply@openai.com>
2026-04-04 16:13:38 -07:00
Ahmed Ibrahim
d887e0be7f Fix Opus Bazel patch paths for BCR overlays
Co-authored-by: Codex <noreply@openai.com>
2026-04-04 16:05:41 -07:00
Ahmed Ibrahim
1f9567d121 Disable Opus stack probes under Windows gnullvm Bazel builds
Co-authored-by: Codex <noreply@openai.com>
2026-04-04 15:58:17 -07:00
Ahmed Ibrahim
763dc66fb6 Return realtime test SDP answers before data channel setup
Co-authored-by: Codex <noreply@openai.com>
2026-04-04 15:30:07 -07:00
Ahmed Ibrahim
357140d3c9 Fix realtime parser item_id handling and test callsites
Preserve item_id on v1 audio deltas and annotate opaque None arguments in calls URL tests so the argument-comment lint passes.

Co-authored-by: Codex <noreply@openai.com>
2026-04-04 14:18:19 -07:00
Ahmed Ibrahim
1aa17fafcc Fix realtime test server helper imports
Use the workspace opus crate name directly and clone the request sender before moving it into the data-channel callback so the RTP track handler can still enqueue decoded packets.

Co-authored-by: Codex <noreply@openai.com>
2026-04-04 14:11:47 -07:00
Ahmed Ibrahim
5462954edd codex: fix CI failure on PR #16805
Explicitly include first-level response helper modules in the core_test_support Bazel target so realtime_webrtc_server.rs is available to macOS builds and lints.

Co-authored-by: Codex <noreply@openai.com>
2026-04-04 13:31:53 -07:00
Ahmed Ibrahim
da1ad103fa codex: fix CI failure on PR #16805
Include nested response helper modules in the core_test_support Bazel target so the new realtime WebRTC test server source is visible to macOS Bazel and argument-comment-lint jobs.

Co-authored-by: Codex <noreply@openai.com>
2026-04-04 13:26:19 -07:00
Ahmed Ibrahim
4abb01d268 codex: fix CI failure on PR #16805
Ignore the workspace `opus` dependency in cargo-shear for core_test_support because Rust imports that package as `audiopus`.

Co-authored-by: Codex <noreply@openai.com>
2026-04-04 13:20:32 -07:00
Ahmed Ibrahim
d6d8d6304d Add WebRTC support to realtime test server
Teach the shared test helper to accept realtime /calls POSTs, answer SDP offers, and relay scripted events over a data channel while logging incoming RTP audio packets as synthetic append requests. Update the one stale handshake-path assertion to the /realtime/calls URL.

Co-authored-by: Codex <noreply@openai.com>
2026-04-04 13:14:31 -07:00
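
The /calls handshake the helper performs can be illustrated with a std-only stand-in: accept one POST to the calls URL and reply with an SDP answer. The canned SDP body and exact request shape here are assumptions for illustration; the real helper negotiates an actual WebRTC session:

```rust
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;

// Build the HTTP response carrying an SDP answer, as a test helper
// might for a realtime /calls POST.
fn sdp_answer_response(answer: &str) -> String {
    format!(
        "HTTP/1.1 201 Created\r\nContent-Type: application/sdp\r\nContent-Length: {}\r\n\r\n{}",
        answer.len(),
        answer
    )
}

fn main() {
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = listener.local_addr().unwrap();
    let server = thread::spawn(move || {
        let (mut stream, _) = listener.accept().unwrap();
        let mut buf = [0u8; 4096];
        let n = stream.read(&mut buf).unwrap();
        let request = String::from_utf8_lossy(&buf[..n]).into_owned();
        assert!(request.contains("/realtime/calls"));
        // A canned answer stands in for real SDP negotiation.
        let response = sdp_answer_response("v=0\r\no=- 0 0 IN IP4 127.0.0.1\r\ns=-\r\n");
        stream.write_all(response.as_bytes()).unwrap();
    });
    let mut client = TcpStream::connect(addr).unwrap();
    let offer = "v=0\r\n";
    client
        .write_all(
            format!(
                "POST /realtime/calls HTTP/1.1\r\nContent-Type: application/sdp\r\nContent-Length: {}\r\n\r\n{}",
                offer.len(),
                offer
            )
            .as_bytes(),
        )
        .unwrap();
    let mut response = String::new();
    client.read_to_string(&mut response).unwrap();
    assert!(response.starts_with("HTTP/1.1 201"));
    assert!(response.contains("v=0"));
    server.join().unwrap();
    println!("ok");
}
```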
Ahmed Ibrahim
63c1223141 codex: fix CI failure on PR #16805
Link audiopus_sys against the Bazel opus module so remote Bazel builds do not depend on host cmake.

Co-authored-by: Codex <noreply@openai.com>
2026-04-04 12:56:37 -07:00
Ahmed Ibrahim
a40422b85f codex: fix CI failure on PR #16805
Install cmake on Linux and macOS Bazel runners so audiopus_sys build scripts can build bundled Opus.

Co-authored-by: Codex <noreply@openai.com>
2026-04-04 12:51:33 -07:00
Ahmed Ibrahim
175d831ff4 Replace realtime websocket transport with WebRTC
Move the realtime model transport implementation to WebRTC while keeping the core session/event interface intact for the TUI layer.

Co-authored-by: Codex <noreply@openai.com>
2026-04-04 12:43:42 -07:00
32 changed files with 3245 additions and 2370 deletions

View File

@@ -278,6 +278,74 @@ http_archive = use_repo_rule("@bazel_tools//tools/build_defs/repo:http.bzl", "ht
http_file = use_repo_rule("@bazel_tools//tools/build_defs/repo:http.bzl", "http_file")
new_local_repository = use_repo_rule("@bazel_tools//tools/build_defs/repo:local.bzl", "new_local_repository")
http_archive(
name = "ninja_1.11.0_linux",
urls = [
"https://github.com/ninja-build/ninja/releases/download/v1.11.0/ninja-linux.zip",
],
sha256 = "9726e730d5b8599f82654dc80265e64a10a8a817552c34153361ed0c017f9f02",
strip_prefix = "",
build_file_content = """\
package(default_visibility = ["//visibility:public"])
filegroup(
name = "ninja_bin",
srcs = ["ninja"],
)
""",
)
http_archive(
name = "ninja_1.12.1_linux_aarch64",
urls = [
"https://github.com/ninja-build/ninja/releases/download/v1.12.1/ninja-linux-aarch64.zip",
],
sha256 = "5c25c6570b0155e95fce5918cb95f1ad9870df5768653afe128db822301a05a1",
strip_prefix = "",
build_file_content = """\
package(default_visibility = ["//visibility:public"])
filegroup(
name = "ninja_bin",
srcs = ["ninja"],
)
""",
)
http_archive(
name = "ninja_1.11.0_mac",
urls = [
"https://github.com/ninja-build/ninja/releases/download/v1.11.0/ninja-mac.zip",
],
sha256 = "21915277db59756bfc61f6f281c1f5e3897760b63776fd3d360f77dd7364137f",
strip_prefix = "",
build_file_content = """\
package(default_visibility = ["//visibility:public"])
filegroup(
name = "ninja_bin",
srcs = ["ninja"],
)
""",
)
http_archive(
name = "ninja_1.11.0_win",
urls = [
"https://github.com/ninja-build/ninja/releases/download/v1.11.0/ninja-win.zip",
],
sha256 = "d0ee3da143211aa447e750085876c9b9d7bcdd637ab5b2c5b41349c617f22f3b",
strip_prefix = "",
build_file_content = """\
package(default_visibility = ["//visibility:public"])
filegroup(
name = "ninja_bin",
srcs = ["ninja.exe"],
)
""",
)
new_local_repository(
name = "v8_targets",
build_file = "//third_party/v8:BUILD.bazel",
@@ -345,6 +413,41 @@ crate.annotation(
inject_repo(crate, "alsa_lib")
bazel_dep(name = "meson", version = "1.5.1.bcr.1")
crate.annotation(
build_script_tools = [
"@meson//:meson",
"@ninja_1.11.0_linux//:ninja_bin",
"@ninja_1.12.1_linux_aarch64//:ninja_bin",
"@ninja_1.11.0_mac//:ninja_bin",
"@ninja_1.11.0_win//:ninja_bin",
],
build_script_env = {
"MESON": "$(execpath @meson//:meson)",
},
build_script_env_select = {
"aarch64-apple-darwin": "{\"NINJA_REAL\":\"$(execpath @ninja_1.12.1_linux_aarch64//:ninja_bin)\"}",
"x86_64-apple-darwin": "{\"NINJA_REAL\":\"$(execpath @ninja_1.12.1_linux_aarch64//:ninja_bin)\"}",
"aarch64-unknown-linux-gnu": "{\"NINJA_REAL\":\"$(execpath @ninja_1.11.0_linux//:ninja_bin)\"}",
"x86_64-unknown-linux-gnu": "{\"NINJA_REAL\":\"$(execpath @ninja_1.11.0_linux//:ninja_bin)\"}",
"aarch64-unknown-linux-musl": "{\"NINJA_REAL\":\"$(execpath @ninja_1.11.0_linux//:ninja_bin)\"}",
"x86_64-unknown-linux-musl": "{\"NINJA_REAL\":\"$(execpath @ninja_1.11.0_linux//:ninja_bin)\"}",
"aarch64-pc-windows-msvc": "{\"NINJA\":\"$(execpath @ninja_1.11.0_win//:ninja_bin)\"}",
"x86_64-pc-windows-msvc": "{\"NINJA\":\"$(execpath @ninja_1.11.0_win//:ninja_bin)\"}",
"aarch64-pc-windows-gnullvm": "{\"NINJA\":\"$(execpath @ninja_1.11.0_win//:ninja_bin)\"}",
"x86_64-pc-windows-gnullvm": "{\"NINJA\":\"$(execpath @ninja_1.11.0_win//:ninja_bin)\"}",
},
crate = "webrtc-audio-processing-sys",
gen_build_script = "on",
patch_args = ["-p1"],
patches = [
"//patches:webrtc-audio-processing-sys_meson_env.patch",
"//patches:webrtc-audio-processing-sys_logging_macos_sdk.patch",
],
)
inject_repo(crate, "meson", "ninja_1.11.0_linux", "ninja_1.12.1_linux_aarch64", "ninja_1.11.0_mac", "ninja_1.11.0_win")
bazel_dep(name = "v8", version = "14.6.202.9")
archive_override(
module_name = "v8",

MODULE.bazel.lock (generated) — 68 changed lines

File diff suppressed because one or more lines are too long

codex-rs/Cargo.lock (generated) — 852 changed lines

File diff suppressed because it is too large

View File

@@ -239,6 +239,7 @@ image = { version = "^0.25.9", default-features = false }
include_dir = "0.7.4"
indexmap = "2.12.0"
insta = "1.46.3"
interceptor = "0.17.1"
inventory = "0.3.19"
itertools = "0.14.0"
jsonwebtoken = "9.3.1"
@@ -255,6 +256,7 @@ notify = "8.2.0"
nucleo = { git = "https://github.com/helix-editor/nucleo.git", rev = "4253de9faabb4e5c6d81d946a5e35a90f87347ee" }
once_cell = "1.20.2"
openssl-sys = "*"
opus-rs = "0.1.11"
opentelemetry = "0.31.0"
opentelemetry-appender-tracing = "0.31.0"
opentelemetry-otlp = "0.31.0"
@@ -349,6 +351,8 @@ v8 = "=146.4.0"
vt100 = "0.16.2"
walkdir = "2.5.0"
webbrowser = "1.0"
webrtc = "0.17.1"
webrtc-audio-processing = { version = "~2.0", features = ["bundled"] }
which = "8"
wildmatch = "2.6.1"
zip = "2.4.2"

View File

@@ -3,6 +3,8 @@ use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_mock_responses_server_sequence_unchecked;
use app_test_support::to_response;
use base64::Engine;
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::LoginAccountResponse;
@@ -40,6 +42,16 @@ use tokio::time::timeout;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
const STARTUP_CONTEXT_HEADER: &str = "Startup context from Codex.";
fn realtime_pcm_test_tone_20ms_base64() -> String {
let pcm_bytes: Vec<u8> = (0..480)
.flat_map(|index| {
let sample = if index % 2 == 0 { 1024_i16 } else { -1024_i16 };
sample.to_le_bytes()
})
.collect();
BASE64_STANDARD.encode(pcm_bytes)
}
#[tokio::test]
async fn realtime_conversation_streams_v2_notifications() -> Result<()> {
skip_if_no_network!(Ok(()));
@@ -155,7 +167,7 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> {
.send_thread_realtime_append_audio_request(ThreadRealtimeAppendAudioParams {
thread_id: started.thread_id.clone(),
audio: ThreadRealtimeAudioChunk {
data: "BQYH".to_string(),
data: realtime_pcm_test_tone_20ms_base64(),
sample_rate: 24_000,
num_channels: 1,
samples_per_channel: Some(480),
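
The helper above replaces the opaque `"BQYH"` payload with a deterministic tone: 480 alternating ±1024 samples at 24 kHz mono give 20 ms of audio loud enough that Opus/WebRTC codec paths cannot optimize it away as silence. A self-contained sketch of the PCM generation (the base64 step is omitted here, since the real test uses the `base64` crate for it):

```rust
// 480 alternating ±1024 i16 samples = 20 ms of mono audio at 24 kHz.
// (The real helper base64-encodes these bytes; that step is omitted.)
fn realtime_pcm_test_tone_20ms() -> Vec<u8> {
    (0..480)
        .flat_map(|index| {
            let sample: i16 = if index % 2 == 0 { 1024 } else { -1024 };
            sample.to_le_bytes()
        })
        .collect()
}

fn main() {
    let pcm = realtime_pcm_test_tone_20ms();
    // 480 mono samples x 2 bytes each -> 960 bytes of little-endian PCM.
    assert_eq!(pcm.len(), 960);
    let first = i16::from_le_bytes([pcm[0], pcm[1]]);
    let second = i16::from_le_bytes([pcm[2], pcm[3]]);
    assert_eq!((first, second), (1024, -1024));
    println!("ok");
}
```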

View File

@@ -14,17 +14,21 @@ codex-protocol = { workspace = true }
codex-utils-rustls-provider = { workspace = true }
futures = { workspace = true }
http = { workspace = true }
interceptor = { workspace = true }
opus-rs = { workspace = true }
reqwest = { workspace = true, features = ["multipart"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["macros", "net", "rt", "sync", "time"] }
tokio-tungstenite = { workspace = true }
tungstenite = { workspace = true }
tracing = { workspace = true }
eventsource-stream = { workspace = true }
regex-lite = { workspace = true }
tokio-util = { workspace = true, features = ["codec"] }
tungstenite = { workspace = true }
url = { workspace = true }
webrtc = { workspace = true }
[dev-dependencies]
anyhow = { workspace = true }
@@ -32,7 +36,6 @@ assert_matches = { workspace = true }
pretty_assertions = { workspace = true }
tokio-test = { workspace = true }
wiremock = { workspace = true }
reqwest = { workspace = true }
[lints]
workspace = true

File diff suppressed because it is too large

View File

@@ -1,11 +1,11 @@
use crate::endpoint::realtime_websocket::methods_v1::conversation_handoff_append_message as v1_conversation_handoff_append_message;
use crate::endpoint::realtime_websocket::methods_v1::conversation_item_create_message as v1_conversation_item_create_message;
use crate::endpoint::realtime_websocket::methods_v1::session_update_session as v1_session_update_session;
use crate::endpoint::realtime_websocket::methods_v1::websocket_intent as v1_websocket_intent;
use crate::endpoint::realtime_websocket::methods_v1::webrtc_intent as v1_webrtc_intent;
use crate::endpoint::realtime_websocket::methods_v2::conversation_handoff_append_message as v2_conversation_handoff_append_message;
use crate::endpoint::realtime_websocket::methods_v2::conversation_item_create_message as v2_conversation_item_create_message;
use crate::endpoint::realtime_websocket::methods_v2::session_update_session as v2_session_update_session;
use crate::endpoint::realtime_websocket::methods_v2::websocket_intent as v2_websocket_intent;
use crate::endpoint::realtime_websocket::methods_v2::webrtc_intent as v2_webrtc_intent;
use crate::endpoint::realtime_websocket::protocol::RealtimeEventParser;
use crate::endpoint::realtime_websocket::protocol::RealtimeOutboundMessage;
use crate::endpoint::realtime_websocket::protocol::RealtimeSessionMode;
@@ -60,9 +60,9 @@ pub(super) fn session_update_session(
}
}
pub(super) fn websocket_intent(event_parser: RealtimeEventParser) -> Option<&'static str> {
pub(super) fn webrtc_intent(event_parser: RealtimeEventParser) -> Option<&'static str> {
match event_parser {
RealtimeEventParser::V1 => v1_websocket_intent(),
RealtimeEventParser::RealtimeV2 => v2_websocket_intent(),
RealtimeEventParser::V1 => v1_webrtc_intent(),
RealtimeEventParser::RealtimeV2 => v2_webrtc_intent(),
}
}

View File

@@ -62,6 +62,6 @@ pub(super) fn session_update_session(instructions: String) -> SessionUpdateSessi
}
}
pub(super) fn websocket_intent() -> Option<&'static str> {
pub(super) fn webrtc_intent() -> Option<&'static str> {
Some("quicksilver")
}

View File

@@ -127,6 +127,6 @@ pub(super) fn session_update_session(
}
}
pub(super) fn websocket_intent() -> Option<&'static str> {
pub(super) fn webrtc_intent() -> Option<&'static str> {
None
}

View File

@@ -9,10 +9,10 @@ mod protocol_v2;
pub use codex_protocol::protocol::RealtimeAudioFrame;
pub use codex_protocol::protocol::RealtimeEvent;
pub use methods::RealtimeWebsocketClient;
pub use methods::RealtimeWebsocketConnection;
pub use methods::RealtimeWebsocketEvents;
pub use methods::RealtimeWebsocketWriter;
pub use methods::RealtimeWebRtcClient;
pub use methods::RealtimeWebRtcConnection;
pub use methods::RealtimeWebRtcEvents;
pub use methods::RealtimeWebRtcWriter;
pub use protocol::RealtimeEventParser;
pub use protocol::RealtimeSessionConfig;
pub use protocol::RealtimeSessionMode;

View File

@@ -32,8 +32,6 @@ pub struct RealtimeSessionConfig {
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type")]
pub(super) enum RealtimeOutboundMessage {
#[serde(rename = "input_audio_buffer.append")]
InputAudioBufferAppend { audio: String },
#[serde(rename = "conversation.handoff.append")]
ConversationHandoffAppend {
handoff_id: String,
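
With `#[serde(tag = "type")]`, each remaining variant serializes as a JSON object whose `type` field carries the renamed wire name; the removed `input_audio_buffer.append` variant is no longer needed, presumably because audio now flows over the RTP track rather than as JSON. A hand-rolled sketch of that internally tagged wire shape (no serde; the `text` field is illustrative, since the variant's remaining fields are truncated above):

```rust
// Minimal stand-in for the internally tagged format that
// #[serde(tag = "type")] produces for RealtimeOutboundMessage.
enum RealtimeOutboundMessage {
    ConversationHandoffAppend { handoff_id: String, text: String },
}

fn to_wire_json(msg: &RealtimeOutboundMessage) -> String {
    match msg {
        RealtimeOutboundMessage::ConversationHandoffAppend { handoff_id, text } => format!(
            "{{\"type\":\"conversation.handoff.append\",\"handoff_id\":\"{handoff_id}\",\"text\":\"{text}\"}}"
        ),
    }
}

fn main() {
    let msg = RealtimeOutboundMessage::ConversationHandoffAppend {
        handoff_id: "handoff_1".to_string(),
        text: "hello".to_string(),
    };
    let wire = to_wire_json(&msg);
    // The tag rides inside the object, alongside the variant's fields.
    assert!(wire.contains("\"type\":\"conversation.handoff.append\""));
    assert!(wire.contains("\"handoff_id\":\"handoff_1\""));
    println!("{wire}");
}
```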

View File

@@ -35,7 +35,10 @@ pub(super) fn parse_realtime_event_v1(payload: &str) -> Option<RealtimeEvent> {
.get("samples_per_channel")
.and_then(Value::as_u64)
.and_then(|value| u32::try_from(value).ok()),
item_id: None,
item_id: parsed
.get("item_id")
.and_then(Value::as_str)
.map(str::to_string),
}))
}
"conversation.input_transcript.delta" => {

View File

@@ -34,8 +34,8 @@ pub use crate::endpoint::models::ModelsClient;
pub use crate::endpoint::realtime_websocket::RealtimeEventParser;
pub use crate::endpoint::realtime_websocket::RealtimeSessionConfig;
pub use crate::endpoint::realtime_websocket::RealtimeSessionMode;
pub use crate::endpoint::realtime_websocket::RealtimeWebsocketClient;
pub use crate::endpoint::realtime_websocket::RealtimeWebsocketConnection;
pub use crate::endpoint::realtime_websocket::RealtimeWebRtcClient;
pub use crate::endpoint::realtime_websocket::RealtimeWebRtcConnection;
pub use crate::endpoint::responses::ResponsesClient;
pub use crate::endpoint::responses::ResponsesOptions;
pub use crate::endpoint::responses_websocket::ResponsesWebsocketClient;

View File

@@ -1,460 +0,0 @@
use std::collections::HashMap;
use std::future::Future;
use std::time::Duration;
use codex_api::RealtimeAudioFrame;
use codex_api::RealtimeEvent;
use codex_api::RealtimeEventParser;
use codex_api::RealtimeSessionConfig;
use codex_api::RealtimeSessionMode;
use codex_api::RealtimeWebsocketClient;
use codex_api::provider::Provider;
use codex_api::provider::RetryConfig;
use codex_protocol::protocol::RealtimeHandoffRequested;
use futures::SinkExt;
use futures::StreamExt;
use http::HeaderMap;
use serde_json::Value;
use serde_json::json;
use tokio::net::TcpListener;
use tokio_tungstenite::accept_async;
use tokio_tungstenite::tungstenite::Message;
type RealtimeWsStream = tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>;
async fn spawn_realtime_ws_server<Handler, Fut>(
handler: Handler,
) -> (String, tokio::task::JoinHandle<()>)
where
Handler: FnOnce(RealtimeWsStream) -> Fut + Send + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
let listener = match TcpListener::bind("127.0.0.1:0").await {
Ok(listener) => listener,
Err(err) => panic!("failed to bind test websocket listener: {err}"),
};
let addr = match listener.local_addr() {
Ok(addr) => addr.to_string(),
Err(err) => panic!("failed to read local websocket listener address: {err}"),
};
let server = tokio::spawn(async move {
let (stream, _) = match listener.accept().await {
Ok(stream) => stream,
Err(err) => panic!("failed to accept test websocket connection: {err}"),
};
let ws = match accept_async(stream).await {
Ok(ws) => ws,
Err(err) => panic!("failed to complete websocket handshake: {err}"),
};
handler(ws).await;
});
(addr, server)
}
fn test_provider(base_url: String) -> Provider {
Provider {
name: "test".to_string(),
base_url,
query_params: Some(HashMap::new()),
headers: HeaderMap::new(),
retry: RetryConfig {
max_attempts: 1,
base_delay: Duration::from_millis(1),
retry_429: false,
retry_5xx: false,
retry_transport: false,
},
stream_idle_timeout: Duration::from_secs(5),
}
}
#[tokio::test]
async fn realtime_ws_e2e_session_create_and_event_flow() {
let (addr, server) = spawn_realtime_ws_server(|mut ws: RealtimeWsStream| async move {
let first = ws
.next()
.await
.expect("first msg")
.expect("first msg ok")
.into_text()
.expect("text");
let first_json: Value = serde_json::from_str(&first).expect("json");
assert_eq!(first_json["type"], "session.update");
assert_eq!(
first_json["session"]["type"],
Value::String("quicksilver".to_string())
);
assert_eq!(
first_json["session"]["instructions"],
Value::String("backend prompt".to_string())
);
assert_eq!(
first_json["session"]["audio"]["input"]["format"]["type"],
Value::String("audio/pcm".to_string())
);
assert_eq!(
first_json["session"]["audio"]["input"]["format"]["rate"],
Value::from(24_000)
);
ws.send(Message::Text(
json!({
"type": "session.updated",
"session": {"id": "sess_mock", "instructions": "backend prompt"}
})
.to_string()
.into(),
))
.await
.expect("send session.updated");
let second = ws
.next()
.await
.expect("second msg")
.expect("second msg ok")
.into_text()
.expect("text");
let second_json: Value = serde_json::from_str(&second).expect("json");
assert_eq!(second_json["type"], "input_audio_buffer.append");
ws.send(Message::Text(
json!({
"type": "conversation.output_audio.delta",
"delta": "AQID",
"sample_rate": 48000,
"channels": 1
})
.to_string()
.into(),
))
.await
.expect("send audio out");
})
.await;
let client = RealtimeWebsocketClient::new(test_provider(format!("http://{addr}")));
let connection = client
.connect(
RealtimeSessionConfig {
instructions: "backend prompt".to_string(),
model: Some("realtime-test-model".to_string()),
session_id: Some("conv_123".to_string()),
event_parser: RealtimeEventParser::V1,
session_mode: RealtimeSessionMode::Conversational,
},
HeaderMap::new(),
HeaderMap::new(),
)
.await
.expect("connect");
let created = connection
.next_event()
.await
.expect("next event")
.expect("event");
assert_eq!(
created,
RealtimeEvent::SessionUpdated {
session_id: "sess_mock".to_string(),
instructions: Some("backend prompt".to_string()),
}
);
connection
.send_audio_frame(RealtimeAudioFrame {
data: "AQID".to_string(),
sample_rate: 48000,
num_channels: 1,
samples_per_channel: Some(960),
item_id: None,
})
.await
.expect("send audio");
let audio_event = connection
.next_event()
.await
.expect("next event")
.expect("event");
assert_eq!(
audio_event,
RealtimeEvent::AudioOut(RealtimeAudioFrame {
data: "AQID".to_string(),
sample_rate: 48000,
num_channels: 1,
samples_per_channel: None,
item_id: None,
})
);
connection.close().await.expect("close");
server.await.expect("server task");
}
#[tokio::test]
async fn realtime_ws_e2e_send_while_next_event_waits() {
let (addr, server) = spawn_realtime_ws_server(|mut ws: RealtimeWsStream| async move {
let first = ws
.next()
.await
.expect("first msg")
.expect("first msg ok")
.into_text()
.expect("text");
let first_json: Value = serde_json::from_str(&first).expect("json");
assert_eq!(first_json["type"], "session.update");
let second = ws
.next()
.await
.expect("second msg")
.expect("second msg ok")
.into_text()
.expect("text");
let second_json: Value = serde_json::from_str(&second).expect("json");
assert_eq!(second_json["type"], "input_audio_buffer.append");
ws.send(Message::Text(
json!({
"type": "session.updated",
"session": {"id": "sess_after_send", "instructions": "backend prompt"}
})
.to_string()
.into(),
))
.await
.expect("send session.updated");
})
.await;
let client = RealtimeWebsocketClient::new(test_provider(format!("http://{addr}")));
let connection = client
.connect(
RealtimeSessionConfig {
instructions: "backend prompt".to_string(),
model: Some("realtime-test-model".to_string()),
session_id: Some("conv_123".to_string()),
event_parser: RealtimeEventParser::V1,
session_mode: RealtimeSessionMode::Conversational,
},
HeaderMap::new(),
HeaderMap::new(),
)
.await
.expect("connect");
let (send_result, next_result) = tokio::join!(
async {
tokio::time::timeout(
Duration::from_millis(200),
connection.send_audio_frame(RealtimeAudioFrame {
data: "AQID".to_string(),
sample_rate: 48000,
num_channels: 1,
samples_per_channel: Some(960),
item_id: None,
}),
)
.await
},
connection.next_event()
);
send_result
.expect("send should not block on next_event")
.expect("send audio");
let next_event = next_result.expect("next event").expect("event");
assert_eq!(
next_event,
RealtimeEvent::SessionUpdated {
session_id: "sess_after_send".to_string(),
instructions: Some("backend prompt".to_string()),
}
);
connection.close().await.expect("close");
server.await.expect("server task");
}
#[tokio::test]
async fn realtime_ws_e2e_disconnected_emitted_once() {
let (addr, server) = spawn_realtime_ws_server(|mut ws: RealtimeWsStream| async move {
let first = ws
.next()
.await
.expect("first msg")
.expect("first msg ok")
.into_text()
.expect("text");
let first_json: Value = serde_json::from_str(&first).expect("json");
assert_eq!(first_json["type"], "session.update");
ws.send(Message::Close(None)).await.expect("send close");
})
.await;
let client = RealtimeWebsocketClient::new(test_provider(format!("http://{addr}")));
let connection = client
.connect(
RealtimeSessionConfig {
instructions: "backend prompt".to_string(),
model: Some("realtime-test-model".to_string()),
session_id: Some("conv_123".to_string()),
event_parser: RealtimeEventParser::V1,
session_mode: RealtimeSessionMode::Conversational,
},
HeaderMap::new(),
HeaderMap::new(),
)
.await
.expect("connect");
let first = connection.next_event().await.expect("next event");
assert_eq!(first, None);
let second = connection.next_event().await.expect("next event");
assert_eq!(second, None);
server.await.expect("server task");
}
#[tokio::test]
async fn realtime_ws_e2e_ignores_unknown_text_events() {
let (addr, server) = spawn_realtime_ws_server(|mut ws: RealtimeWsStream| async move {
let first = ws
.next()
.await
.expect("first msg")
.expect("first msg ok")
.into_text()
.expect("text");
let first_json: Value = serde_json::from_str(&first).expect("json");
assert_eq!(first_json["type"], "session.update");
ws.send(Message::Text(
json!({
"type": "response.created",
"response": {"id": "resp_unknown"}
})
.to_string()
.into(),
))
.await
.expect("send unknown event");
ws.send(Message::Text(
json!({
"type": "session.updated",
"session": {"id": "sess_after_unknown", "instructions": "backend prompt"}
})
.to_string()
.into(),
))
.await
.expect("send session.updated");
})
.await;
let client = RealtimeWebsocketClient::new(test_provider(format!("http://{addr}")));
let connection = client
.connect(
RealtimeSessionConfig {
instructions: "backend prompt".to_string(),
model: Some("realtime-test-model".to_string()),
session_id: Some("conv_123".to_string()),
event_parser: RealtimeEventParser::V1,
session_mode: RealtimeSessionMode::Conversational,
},
HeaderMap::new(),
HeaderMap::new(),
)
.await
.expect("connect");
let event = connection
.next_event()
.await
.expect("next event")
.expect("event");
assert_eq!(
event,
RealtimeEvent::SessionUpdated {
session_id: "sess_after_unknown".to_string(),
instructions: Some("backend prompt".to_string()),
}
);
connection.close().await.expect("close");
server.await.expect("server task");
}
#[tokio::test]
async fn realtime_ws_e2e_realtime_v2_parser_emits_handoff_requested() {
let (addr, server) = spawn_realtime_ws_server(|mut ws: RealtimeWsStream| async move {
let first = ws
.next()
.await
.expect("first msg")
.expect("first msg ok")
.into_text()
.expect("text");
let first_json: Value = serde_json::from_str(&first).expect("json");
assert_eq!(first_json["type"], "session.update");
ws.send(Message::Text(
json!({
"type": "conversation.item.done",
"item": {
"id": "item_123",
"type": "function_call",
"name": "codex",
"call_id": "call_123",
"arguments": "{\"prompt\":\"delegate now\"}"
}
})
.to_string()
.into(),
))
.await
.expect("send function call");
})
.await;
let client = RealtimeWebsocketClient::new(test_provider(format!("http://{addr}")));
let connection = client
.connect(
RealtimeSessionConfig {
instructions: "backend prompt".to_string(),
model: Some("realtime-test-model".to_string()),
session_id: Some("conv_123".to_string()),
event_parser: RealtimeEventParser::RealtimeV2,
session_mode: RealtimeSessionMode::Conversational,
},
HeaderMap::new(),
HeaderMap::new(),
)
.await
.expect("connect");
let event = connection
.next_event()
.await
.expect("next event")
.expect("event");
assert_eq!(
event,
RealtimeEvent::HandoffRequested(RealtimeHandoffRequested {
handoff_id: "call_123".to_string(),
item_id: "item_123".to_string(),
input_transcript: "delegate now".to_string(),
active_transcript: Vec::new(),
})
);
connection.close().await.expect("close");
server.await.expect("server task");
}

View File

@@ -13,15 +13,14 @@ use codex_api::RealtimeEvent;
use codex_api::RealtimeEventParser;
use codex_api::RealtimeSessionConfig;
use codex_api::RealtimeSessionMode;
use codex_api::RealtimeWebsocketClient;
use codex_api::RealtimeWebRtcClient;
use codex_api::api_bridge::CoreAuthProvider;
use codex_api::api_bridge::map_api_error;
use codex_api::endpoint::realtime_websocket::RealtimeWebsocketEvents;
use codex_api::endpoint::realtime_websocket::RealtimeWebsocketWriter;
use codex_app_server_protocol::AuthMode;
use codex_api::endpoint::realtime_websocket::RealtimeWebRtcEvents;
use codex_api::endpoint::realtime_websocket::RealtimeWebRtcWriter;
use codex_login::CodexAuth;
use codex_login::api_bridge::auth_provider_from_auth;
use codex_login::default_client::default_headers;
use codex_login::read_openai_api_key_from_env;
use codex_model_provider_info::ModelProviderInfo;
use codex_protocol::error::CodexErr;
use codex_protocol::error::Result as CodexResult;
use codex_protocol::protocol::CodexErrorInfo;
@@ -37,7 +36,6 @@ use codex_protocol::protocol::RealtimeConversationStartedEvent;
use codex_protocol::protocol::RealtimeHandoffRequested;
use http::HeaderMap;
use http::HeaderValue;
use http::header::AUTHORIZATION;
use serde_json::Value;
use serde_json::json;
use std::sync::Arc;
@@ -107,8 +105,8 @@ struct OutputAudioState {
}
struct RealtimeInputTask {
writer: RealtimeWebsocketWriter,
events: RealtimeWebsocketEvents,
writer: RealtimeWebRtcWriter,
events: RealtimeWebRtcEvents,
user_text_rx: Receiver<String>,
handoff_output_rx: Receiver<HandoffOutput>,
audio_rx: Receiver<RealtimeAudioFrame>,
@@ -132,7 +130,7 @@ impl RealtimeHandoffState {
struct ConversationState {
audio_tx: Sender<RealtimeAudioFrame>,
user_text_tx: Sender<String>,
writer: RealtimeWebsocketWriter,
writer: RealtimeWebRtcWriter,
handoff: RealtimeHandoffState,
input_task: JoinHandle<()>,
fanout_task: Option<JoinHandle<()>>,
@@ -157,6 +155,7 @@ impl RealtimeConversationManager {
pub(crate) async fn start(
&self,
api_provider: ApiProvider,
api_auth: CoreAuthProvider,
extra_headers: Option<HeaderMap>,
session_config: RealtimeSessionConfig,
) -> CodexResult<(Receiver<RealtimeEvent>, Arc<AtomicBool>)> {
@@ -172,7 +171,7 @@ impl RealtimeConversationManager {
RealtimeEventParser::RealtimeV2 => RealtimeSessionKind::V2,
};
let client = RealtimeWebsocketClient::new(api_provider);
let client = RealtimeWebRtcClient::new(api_provider, api_auth);
let connection = client
.connect(
session_config,
@@ -392,6 +391,7 @@ async fn stop_conversation_state(
fanout_task_stop: RealtimeFanoutTaskStop,
) {
state.realtime_active.store(false, Ordering::Relaxed);
let _ = state.writer.close().await;
state.input_task.abort();
let _ = state.input_task.await;
@@ -443,6 +443,7 @@ pub(crate) async fn handle_start(
struct PreparedRealtimeConversationStart {
api_provider: ApiProvider,
api_auth: CoreAuthProvider,
extra_headers: Option<HeaderMap>,
requested_session_id: Option<String>,
version: RealtimeWsVersion,
@@ -460,8 +461,8 @@ async fn prepare_realtime_start(
.auth_manager()
.unwrap_or_else(|| Arc::clone(&sess.services.auth_manager));
let auth = auth_manager.auth().await;
let realtime_api_key = realtime_api_key(auth.as_ref(), &provider)?;
let mut api_provider = provider.to_api_provider(Some(AuthMode::ApiKey))?;
let api_auth = auth_provider_from_auth(auth.clone(), &provider)?;
let mut api_provider = provider.to_api_provider(auth.as_ref().map(CodexAuth::auth_mode))?;
let config = sess.get_config().await;
if let Some(realtime_ws_base_url) = &config.experimental_realtime_ws_base_url {
api_provider.base_url = realtime_ws_base_url.clone();
@@ -501,10 +502,10 @@ async fn prepare_realtime_start(
event_parser,
session_mode,
};
let extra_headers =
realtime_request_headers(requested_session_id.as_deref(), realtime_api_key.as_str())?;
let extra_headers = realtime_request_headers(requested_session_id.as_deref())?;
Ok(PreparedRealtimeConversationStart {
api_provider,
api_auth,
extra_headers,
requested_session_id,
version,
@@ -519,6 +520,7 @@ async fn handle_start_inner(
) -> CodexResult<()> {
let PreparedRealtimeConversationStart {
api_provider,
api_auth,
extra_headers,
requested_session_id,
version,
@@ -527,7 +529,7 @@ async fn handle_start_inner(
info!("starting realtime conversation");
let (events_rx, realtime_active) = sess
.conversation
.start(api_provider, extra_headers, session_config)
.start(api_provider, api_auth, extra_headers, session_config)
.await?;
info!("realtime conversation started");
@@ -632,36 +634,7 @@ fn realtime_text_from_handoff_request(handoff: &RealtimeHandoffRequested) -> Opt
.or((!handoff.input_transcript.is_empty()).then_some(handoff.input_transcript.clone()))
}
fn realtime_api_key(auth: Option<&CodexAuth>, provider: &ModelProviderInfo) -> CodexResult<String> {
if let Some(api_key) = provider.api_key()? {
return Ok(api_key);
}
if let Some(token) = provider.experimental_bearer_token.clone() {
return Ok(token);
}
if let Some(api_key) = auth.and_then(CodexAuth::api_key) {
return Ok(api_key.to_string());
}
// TODO(aibrahim): Remove this temporary fallback once realtime auth no longer
// requires API key auth for ChatGPT/SIWC sessions.
if provider.is_openai()
&& let Some(api_key) = read_openai_api_key_from_env()
{
return Ok(api_key);
}
Err(CodexErr::InvalidRequest(
"realtime conversation requires API key auth".to_string(),
))
}
fn realtime_request_headers(
session_id: Option<&str>,
api_key: &str,
) -> CodexResult<Option<HeaderMap>> {
fn realtime_request_headers(session_id: Option<&str>) -> CodexResult<Option<HeaderMap>> {
let mut headers = HeaderMap::new();
if let Some(session_id) = session_id
@@ -670,11 +643,6 @@ fn realtime_request_headers(
headers.insert("x-session-id", session_id);
}
let auth_value = HeaderValue::from_str(&format!("Bearer {api_key}")).map_err(|err| {
CodexErr::InvalidRequest(format!("invalid realtime api key header: {err}"))
})?;
headers.insert(AUTHORIZATION, auth_value);
Ok(Some(headers))
}


@@ -3,7 +3,11 @@ load("//:defs.bzl", "codex_rust_crate")
codex_rust_crate(
name = "common",
crate_name = "core_test_support",
crate_srcs = glob(["*.rs"]),
crate_srcs = glob([
"*.rs",
"responses/*.rs",
"responses/**/*.rs",
]),
lib_data_extra = [
"//codex-rs/core:model_availability_nux_fixtures",
],


@@ -24,11 +24,13 @@ codex-models-manager = { workspace = true }
codex-protocol = { workspace = true }
codex-utils-absolute-path = { workspace = true }
codex-utils-cargo-bin = { workspace = true }
codex-utils-rustls-provider = { workspace = true }
ctor = { workspace = true }
futures = { workspace = true }
notify = { workspace = true }
opentelemetry = { workspace = true }
opentelemetry_sdk = { workspace = true }
opus-rs = { workspace = true }
regex-lite = { workspace = true }
serde_json = { workspace = true }
tempfile = { workspace = true }
@@ -38,6 +40,7 @@ tracing = { workspace = true }
tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true }
walkdir = { workspace = true }
webrtc = { workspace = true }
wiremock = { workspace = true }
shlex = { workspace = true }
zstd = { workspace = true }


@@ -37,6 +37,8 @@ use wiremock::matchers::path_regex;
use crate::test_codex::ApplyPatchModelOutput;
mod realtime_webrtc_server;
#[derive(Debug, Clone)]
pub struct ResponseMock {
requests: Arc<Mutex<Vec<ResponsesRequest>>>,
@@ -1238,6 +1240,28 @@ pub async fn start_websocket_server_with_headers(
tokio::time::sleep(delay).await;
}
if realtime_webrtc_server::accept_is_http_post(&stream).await {
let connection_index = {
let mut log = requests.lock().unwrap();
log.push(Vec::new());
log.len() - 1
};
realtime_webrtc_server::serve_connection(
stream,
connection,
connection_index,
Arc::clone(&requests),
Arc::clone(&handshakes),
Arc::clone(&request_log),
)
.await;
if connections.lock().unwrap().is_empty() {
return;
}
continue;
}
let response_headers = connection.response_headers.clone();
let handshake_log = Arc::clone(&handshakes);
let callback = move |req: &Request, mut response: Response| {


@@ -0,0 +1,431 @@
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use base64::Engine;
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
use codex_utils_rustls_provider::ensure_rustls_crypto_provider;
use opus_rs::OpusDecoder;
use serde_json::Value;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use tokio::sync::Notify;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::time::timeout;
use tracing::debug;
use tracing::warn;
use webrtc::api::APIBuilder;
use webrtc::api::interceptor_registry::register_default_interceptors;
use webrtc::api::media_engine::MediaEngine;
use webrtc::data_channel::RTCDataChannel;
use webrtc::data_channel::data_channel_message::DataChannelMessage;
use webrtc::interceptor::registry::Registry;
use webrtc::peer_connection::RTCPeerConnection;
use webrtc::peer_connection::configuration::RTCConfiguration;
use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
use webrtc::rtp_transceiver::rtp_codec::RTPCodecType;
use webrtc::track::track_remote::TrackRemote;
use super::WebSocketConnectionConfig;
use super::WebSocketHandshake;
use super::WebSocketRequest;
const HTTP_HEADER_TERMINATOR: &[u8] = b"\r\n\r\n";
const REALTIME_AUDIO_CHANNELS: u8 = 1;
const REALTIME_AUDIO_SAMPLE_RATE: u32 = 24_000;
const REALTIME_DATA_CHANNEL_TIMEOUT: Duration = Duration::from_secs(10);
const REALTIME_DATA_CHANNEL_FLUSH_DELAY: Duration = Duration::from_millis(25);
const REALTIME_MAX_DECODED_SAMPLES_PER_CHANNEL: usize = 5760;
pub(super) async fn accept_is_http_post(stream: &TcpStream) -> bool {
let mut method = [0u8; 4];
matches!(stream.peek(&mut method).await, Ok(4)) && method == *b"POST"
}
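The POST sniff above can be illustrated without tokio: the same four-byte prefix check, applied to a plain byte slice instead of a peeked `TcpStream` (a minimal sketch; `is_http_post` is a hypothetical stand-in for the async helper).

```rust
// Same idea as accept_is_http_post: a realtime calls request starts with
// "POST", while a WebSocket upgrade handshake starts with "GET".
fn is_http_post(prefix: &[u8]) -> bool {
    prefix.len() >= 4 && &prefix[..4] == b"POST"
}

fn main() {
    assert!(is_http_post(b"POST /v1/realtime/calls HTTP/1.1"));
    assert!(!is_http_post(b"GET / HTTP/1.1")); // WebSocket upgrades begin with GET
    println!("ok");
}
```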
pub(super) async fn serve_connection(
mut stream: TcpStream,
connection: WebSocketConnectionConfig,
connection_index: usize,
requests: Arc<Mutex<Vec<Vec<WebSocketRequest>>>>,
handshakes: Arc<Mutex<Vec<WebSocketHandshake>>>,
request_log_updated: Arc<Notify>,
) {
let Some(request) = read_http_request(&mut stream).await else {
return;
};
handshakes.lock().unwrap().push(WebSocketHandshake {
uri: request.uri,
headers: request.headers,
});
let Some(offer_sdp) = parse_multipart_field(&request.body, &request.boundary, "sdp") else {
let _ = stream
.write_all(b"HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\n\r\n")
.await;
return;
};
let Some(session) = start_session(
offer_sdp,
connection,
connection_index,
requests,
request_log_updated,
)
.await
else {
let _ = stream
.write_all(b"HTTP/1.1 500 Internal Server Error\r\nContent-Length: 0\r\n\r\n")
.await;
return;
};
let response = format!(
"HTTP/1.1 200 OK\r\nContent-Type: application/sdp\r\nContent-Length: {}\r\n\r\n{}",
session.answer_sdp.len(),
session.answer_sdp
);
if stream.write_all(response.as_bytes()).await.is_err() {
return;
}
let _ = session.done_rx.await;
let _ = session.peer_connection.close().await;
}
struct HttpRealtimeRequest {
uri: String,
headers: Vec<(String, String)>,
boundary: String,
body: Vec<u8>,
}
struct RealtimeSession {
answer_sdp: String,
peer_connection: Arc<RTCPeerConnection>,
done_rx: oneshot::Receiver<()>,
}
async fn read_http_request(stream: &mut TcpStream) -> Option<HttpRealtimeRequest> {
let mut received = Vec::new();
let headers_end = loop {
if let Some(headers_end) = received
.windows(HTTP_HEADER_TERMINATOR.len())
.position(|window| window == HTTP_HEADER_TERMINATOR)
{
break headers_end + HTTP_HEADER_TERMINATOR.len();
}
let mut chunk = [0u8; 1024];
let read = stream.read(&mut chunk).await.ok()?;
if read == 0 {
return None;
}
received.extend_from_slice(&chunk[..read]);
};
let header_text = std::str::from_utf8(&received[..headers_end]).ok()?;
let mut lines = header_text.split("\r\n").filter(|line| !line.is_empty());
let request_line = lines.next()?;
let mut request_line_parts = request_line.split_whitespace();
if request_line_parts.next()? != "POST" {
return None;
}
let uri = request_line_parts.next()?.to_string();
let mut headers = Vec::new();
let mut content_length = None;
let mut boundary = None;
for line in lines {
let Some((name, value)) = line.split_once(':') else {
continue;
};
let name = name.trim().to_string();
let value = value.trim().to_string();
if name.eq_ignore_ascii_case("content-length") {
content_length = value.parse::<usize>().ok();
}
if name.eq_ignore_ascii_case("content-type") {
boundary = value
.split(';')
.map(str::trim)
.find_map(|part| part.strip_prefix("boundary="))
.map(|boundary| boundary.trim_matches('"').to_string());
}
headers.push((name, value));
}
let content_length = content_length?;
while received.len() - headers_end < content_length {
let mut chunk = [0u8; 1024];
let read = stream.read(&mut chunk).await.ok()?;
if read == 0 {
return None;
}
received.extend_from_slice(&chunk[..read]);
}
let body_end = headers_end + content_length;
let body = received[headers_end..body_end].to_vec();
Some(HttpRealtimeRequest {
uri,
headers,
boundary: boundary?,
body,
})
}
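The header-terminator scan above can be shown on a plain byte buffer: find the first `\r\n\r\n` with `windows`, and everything after it (up to `Content-Length` bytes) is the body. A std-only sketch, with `split_headers` as a hypothetical name for that step:

```rust
const HTTP_HEADER_TERMINATOR: &[u8] = b"\r\n\r\n";

// Return the index just past the first header terminator, as
// read_http_request does while accumulating chunks from the stream.
fn split_headers(received: &[u8]) -> Option<usize> {
    received
        .windows(HTTP_HEADER_TERMINATOR.len())
        .position(|window| window == HTTP_HEADER_TERMINATOR)
        .map(|pos| pos + HTTP_HEADER_TERMINATOR.len())
}

fn main() {
    let raw = b"POST /v1/realtime/calls HTTP/1.1\r\nContent-Length: 4\r\n\r\nbody";
    let headers_end = split_headers(raw).expect("terminator present");
    assert_eq!(&raw[headers_end..], b"body");
    assert_eq!(split_headers(b"no terminator yet"), None); // keep reading
    println!("ok");
}
```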
fn parse_multipart_field(body: &[u8], boundary: &str, field_name: &str) -> Option<String> {
let body = std::str::from_utf8(body).ok()?;
let delimiter = format!("--{boundary}");
body.split(&delimiter).find_map(|part| {
let (headers, value) = part.split_once("\r\n\r\n")?;
if !headers.contains(&format!("name=\"{field_name}\"")) {
return None;
}
Some(value.trim_end_matches("\r\n").to_string())
})
}
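The multipart extraction above works by splitting on the boundary delimiter and taking the first part whose headers carry a matching `name="…"`. The same logic, restated over `&str` so it runs standalone (assumes CRLF line endings, as multipart encoders produce):

```rust
// Standalone version of parse_multipart_field: split the body on --boundary,
// separate each part's headers from its value at the blank line, and match
// on the Content-Disposition field name.
fn parse_multipart_field(body: &str, boundary: &str, field_name: &str) -> Option<String> {
    let delimiter = format!("--{boundary}");
    body.split(&delimiter).find_map(|part| {
        let (headers, value) = part.split_once("\r\n\r\n")?;
        if !headers.contains(&format!("name=\"{field_name}\"")) {
            return None;
        }
        Some(value.trim_end_matches("\r\n").to_string())
    })
}

fn main() {
    let body = "--b\r\nContent-Disposition: form-data; name=\"sdp\"\r\n\r\nv=0\r\n--b--\r\n";
    assert_eq!(parse_multipart_field(body, "b", "sdp").as_deref(), Some("v=0"));
    assert_eq!(parse_multipart_field(body, "b", "model"), None);
    println!("ok");
}
```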
async fn start_session(
offer_sdp: String,
connection: WebSocketConnectionConfig,
connection_index: usize,
requests: Arc<Mutex<Vec<Vec<WebSocketRequest>>>>,
request_log_updated: Arc<Notify>,
) -> Option<RealtimeSession> {
let peer_connection = create_peer_connection().await?;
let (tx_request, rx_request) = mpsc::unbounded_channel::<Value>();
let (tx_data_channel, rx_data_channel) = oneshot::channel::<Arc<RTCDataChannel>>();
let tx_data_channel = Mutex::new(Some(tx_data_channel));
let tx_data_channel_request = tx_request.clone();
let connection_closed = Arc::new(Notify::new());
let on_data_channel_closed = Arc::clone(&connection_closed);
let on_peer_connection_closed = Arc::clone(&connection_closed);
peer_connection.on_data_channel(Box::new(move |data_channel| {
let tx_request = tx_data_channel_request.clone();
let on_data_channel_closed = Arc::clone(&on_data_channel_closed);
if let Ok(mut tx_data_channel) = tx_data_channel.lock()
&& let Some(tx_data_channel) = tx_data_channel.take()
{
let _ = tx_data_channel.send(Arc::clone(&data_channel));
}
data_channel.on_close(Box::new(move || {
let on_data_channel_closed = Arc::clone(&on_data_channel_closed);
Box::pin(async move {
on_data_channel_closed.notify_waiters();
})
}));
data_channel.on_message(Box::new(move |message: DataChannelMessage| {
let tx_request = tx_request.clone();
Box::pin(async move {
if !message.is_string {
return;
}
let Ok(text) = String::from_utf8(message.data.to_vec()) else {
return;
};
let Ok(body) = serde_json::from_str::<Value>(&text) else {
return;
};
let _ = tx_request.send(body);
})
}));
Box::pin(async {})
}));
peer_connection.on_peer_connection_state_change(Box::new(move |state| {
let on_peer_connection_closed = Arc::clone(&on_peer_connection_closed);
Box::pin(async move {
if matches!(
state,
RTCPeerConnectionState::Closed
| RTCPeerConnectionState::Disconnected
| RTCPeerConnectionState::Failed
) {
on_peer_connection_closed.notify_waiters();
}
})
}));
register_remote_audio_handler(&peer_connection, tx_request.clone());
let mut gather_complete = peer_connection.gathering_complete_promise().await;
let offer = RTCSessionDescription::offer(offer_sdp).ok()?;
peer_connection.set_remote_description(offer).await.ok()?;
let answer = peer_connection.create_answer(None).await.ok()?;
peer_connection.set_local_description(answer).await.ok()?;
let _ = gather_complete.recv().await;
let answer_sdp = peer_connection.local_description().await?.sdp;
let (done_tx, done_rx) = oneshot::channel();
tokio::spawn(async move {
serve_scripted_requests(
connection,
connection_index,
requests,
request_log_updated,
rx_request,
rx_data_channel,
connection_closed,
)
.await;
let _ = done_tx.send(());
});
Some(RealtimeSession {
answer_sdp,
peer_connection,
done_rx,
})
}
async fn create_peer_connection() -> Option<Arc<RTCPeerConnection>> {
ensure_rustls_crypto_provider();
let mut media_engine = MediaEngine::default();
media_engine.register_default_codecs().ok()?;
let registry = register_default_interceptors(Registry::new(), &mut media_engine).ok()?;
let api = APIBuilder::new()
.with_media_engine(media_engine)
.with_interceptor_registry(registry)
.build();
api.new_peer_connection(RTCConfiguration::default())
.await
.map(Arc::new)
.ok()
}
fn register_remote_audio_handler(
peer_connection: &Arc<RTCPeerConnection>,
tx_request: mpsc::UnboundedSender<Value>,
) {
peer_connection.on_track(Box::new(move |track, _, _| {
let tx_request = tx_request.clone();
Box::pin(async move {
if track.kind() != RTPCodecType::Audio {
return;
}
pump_remote_audio_track(track, tx_request).await;
})
}));
}
async fn pump_remote_audio_track(
track: Arc<TrackRemote>,
tx_request: mpsc::UnboundedSender<Value>,
) {
let mut decoder = match OpusDecoder::new(24_000, usize::from(REALTIME_AUDIO_CHANNELS)) {
Ok(decoder) => decoder,
Err(err) => {
warn!(%err, "failed to initialize realtime Opus decoder in test server");
return;
}
};
debug!("initialized realtime Opus decoder in test server");
let mut decoded = vec![0.0f32; REALTIME_MAX_DECODED_SAMPLES_PER_CHANNEL];
while let Ok((packet, _)) = track.read_rtp().await {
if packet.payload.is_empty() {
continue;
}
let samples_per_channel = match decoder.decode(
&packet.payload,
REALTIME_MAX_DECODED_SAMPLES_PER_CHANNEL,
&mut decoded,
) {
Ok(samples_per_channel) => samples_per_channel,
Err(err) => {
warn!(
%err,
payload_len = packet.payload.len(),
"failed to decode realtime Opus packet in test server"
);
return;
}
};
if samples_per_channel == 0 {
continue;
}
let mut pcm_bytes = Vec::with_capacity(samples_per_channel * 2);
for sample in &decoded[..samples_per_channel] {
pcm_bytes.extend_from_slice(&f32_to_i16(*sample).to_le_bytes());
}
let _ = tx_request.send(serde_json::json!({
"type": "input_audio_buffer.append",
"audio": BASE64_STANDARD.encode(pcm_bytes),
"sample_rate": REALTIME_AUDIO_SAMPLE_RATE,
"channels": REALTIME_AUDIO_CHANNELS,
"samples_per_channel": samples_per_channel,
}));
}
}
fn f32_to_i16(sample: f32) -> i16 {
(sample.clamp(-1.0, 1.0) * i16::MAX as f32) as i16
}
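The `f32_to_i16` conversion above clamps before scaling, so out-of-range float samples saturate at the PCM16 limits instead of wrapping. A small demonstration of that saturation behavior:

```rust
// Identical to the conversion in the test server: clamp to [-1.0, 1.0],
// then scale by i16::MAX.
fn f32_to_i16(sample: f32) -> i16 {
    (sample.clamp(-1.0, 1.0) * i16::MAX as f32) as i16
}

fn main() {
    assert_eq!(f32_to_i16(0.0), 0);
    assert_eq!(f32_to_i16(1.0), i16::MAX);
    assert_eq!(f32_to_i16(2.0), i16::MAX); // saturates, does not wrap
    assert_eq!(f32_to_i16(-2.0), -i16::MAX); // note: -32767, not i16::MIN
    println!("ok");
}
```

Because the negative extreme maps to `-i16::MAX` rather than `i16::MIN`, the mapping is symmetric around zero, which keeps decoded test audio DC-free.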
async fn serve_scripted_requests(
connection: WebSocketConnectionConfig,
connection_index: usize,
requests: Arc<Mutex<Vec<Vec<WebSocketRequest>>>>,
request_log_updated: Arc<Notify>,
mut rx_request: mpsc::UnboundedReceiver<Value>,
rx_data_channel: oneshot::Receiver<Arc<RTCDataChannel>>,
connection_closed: Arc<Notify>,
) {
let Ok(Ok(data_channel)) = timeout(REALTIME_DATA_CHANNEL_TIMEOUT, rx_data_channel).await else {
return;
};
let mut scripted_requests = VecDeque::from(connection.requests);
while let Some(request_events) = scripted_requests.pop_front() {
// WebRTC compact-remote tests often close the session before consuming every scripted
// request slot. Treat transport closure as end-of-script instead of waiting forever for
// another request that can no longer arrive.
let body = tokio::select! {
body = rx_request.recv() => body,
_ = connection_closed.notified() => None,
};
let Some(body) = body else {
break;
};
log_request(connection_index, body, &requests, &request_log_updated);
for event in &request_events {
let Ok(payload) = serde_json::to_string(event) else {
continue;
};
if data_channel.send_text(payload).await.is_err() {
return;
}
}
}
if connection.close_after_requests {
tokio::time::sleep(REALTIME_DATA_CHANNEL_FLUSH_DELAY).await;
let _ = data_channel.close().await;
}
}
fn log_request(
connection_index: usize,
body: Value,
requests: &Arc<Mutex<Vec<Vec<WebSocketRequest>>>>,
request_log_updated: &Arc<Notify>,
) {
let mut log = requests.lock().unwrap();
if log.len() <= connection_index {
log.resize_with(connection_index + 1, Vec::new);
}
if let Some(connection_log) = log.get_mut(connection_index) {
connection_log.push(WebSocketRequest { body });
}
drop(log);
request_log_updated.notify_waiters();
}
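The request log above grows on demand with `resize_with`, so a connection can write at its own index even if earlier slots were never populated. A simplified sketch using `String` bodies in place of `WebSocketRequest` (the `log_at` name is illustrative):

```rust
// Same growth rule as log_request: pad the outer vector with empty
// per-connection logs up to the requested index, then push.
fn log_at(log: &mut Vec<Vec<String>>, connection_index: usize, body: String) {
    if log.len() <= connection_index {
        log.resize_with(connection_index + 1, Vec::new);
    }
    if let Some(connection_log) = log.get_mut(connection_index) {
        connection_log.push(body);
    }
}

fn main() {
    let mut log: Vec<Vec<String>> = Vec::new();
    log_at(&mut log, 2, "session.update".to_string());
    assert_eq!(log.len(), 3); // indices 0 and 1 padded with empty logs
    assert!(log[0].is_empty() && log[1].is_empty());
    assert_eq!(log[2], vec!["session.update".to_string()]);
    println!("ok");
}
```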


@@ -1,5 +1,7 @@
use anyhow::Context;
use anyhow::Result;
use base64::Engine;
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
use chrono::Utc;
use codex_login::CodexAuth;
use codex_login::OPENAI_API_KEY_ENV_VAR;
@@ -41,6 +43,17 @@ const MEMORY_PROMPT_PHRASE: &str =
"You have access to a memory folder with guidance from prior runs.";
const REALTIME_CONVERSATION_TEST_SUBPROCESS_ENV_VAR: &str =
"CODEX_REALTIME_CONVERSATION_TEST_SUBPROCESS";
fn realtime_pcm_test_tone_20ms_base64() -> String {
let pcm_bytes: Vec<u8> = (0..480)
.flat_map(|index| {
let sample = if index % 2 == 0 { 1024_i16 } else { -1024_i16 };
sample.to_le_bytes()
})
.collect();
BASE64_STANDARD.encode(pcm_bytes)
}
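The fixture above replaces the old silent `"AQID"` payload with an audible square-ish tone: 480 alternating ±1024 samples of mono 24 kHz PCM16 is exactly 20 ms of audio, or 960 little-endian bytes before base64 encoding. A sketch of the raw PCM half (the base64 step is omitted since it needs the `base64` crate):

```rust
// Generate the same 20 ms test tone as the fixture: 480 i16 samples
// alternating between +1024 and -1024, serialized little-endian.
fn realtime_pcm_test_tone_20ms() -> Vec<u8> {
    (0..480)
        .flat_map(|index| {
            let sample = if index % 2 == 0 { 1024_i16 } else { -1024_i16 };
            sample.to_le_bytes()
        })
        .collect()
}

fn main() {
    let pcm = realtime_pcm_test_tone_20ms();
    assert_eq!(pcm.len(), 960); // 480 samples x 2 bytes; 480 / 24_000 Hz = 20 ms
    assert_eq!(&pcm[..4], &[0x00, 0x04, 0x00, 0xFC]); // +1024 then -1024, LE
    println!("ok");
}
```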
fn websocket_request_text(
request: &core_test_support::responses::WebSocketRequest,
) -> Option<String> {
@@ -207,7 +220,7 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> {
test.codex
.submit(Op::RealtimeConversationAudio(ConversationAudioParams {
frame: RealtimeAudioFrame {
data: "AQID".to_string(),
data: realtime_pcm_test_tone_20ms_base64(),
sample_rate: 24000,
num_channels: 1,
samples_per_channel: Some(480),
@@ -254,10 +267,7 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> {
server.handshakes()[1].header("authorization").as_deref(),
Some("Bearer dummy")
);
assert_eq!(
server.handshakes()[1].uri(),
"/v1/realtime?intent=quicksilver&model=realtime-test-model"
);
assert_eq!(server.handshakes()[1].uri(), "/v1/realtime/calls");
let mut request_types = [
connection[1].body_json()["type"]
.as_str()
@@ -426,7 +436,7 @@ async fn conversation_audio_before_start_emits_error() -> Result<()> {
test.codex
.submit(Op::RealtimeConversationAudio(ConversationAudioParams {
frame: RealtimeAudioFrame {
data: "AQID".to_string(),
data: realtime_pcm_test_tone_20ms_base64(),
sample_rate: 24000,
num_channels: 1,
samples_per_channel: Some(480),
@@ -625,7 +635,7 @@ async fn conversation_second_start_replaces_runtime() -> Result<()> {
test.codex
.submit(Op::RealtimeConversationAudio(ConversationAudioParams {
frame: RealtimeAudioFrame {
data: "AQID".to_string(),
data: realtime_pcm_test_tone_20ms_base64(),
sample_rate: 24000,
num_channels: 1,
samples_per_channel: Some(480),
@@ -1585,7 +1595,7 @@ async fn inbound_handoff_request_clears_active_transcript_after_each_handoff() -
test.codex
.submit(Op::RealtimeConversationAudio(ConversationAudioParams {
frame: RealtimeAudioFrame {
data: "AQID".to_string(),
data: realtime_pcm_test_tone_20ms_base64(),
sample_rate: 24000,
num_channels: 1,
samples_per_channel: Some(480),
@@ -2073,7 +2083,7 @@ async fn inbound_handoff_request_steers_active_turn() -> Result<()> {
test.codex
.submit(Op::RealtimeConversationAudio(ConversationAudioParams {
frame: RealtimeAudioFrame {
data: "AQID".to_string(),
data: realtime_pcm_test_tone_20ms_base64(),
sample_rate: 24000,
num_channels: 1,
samples_per_channel: Some(480),


@@ -113,6 +113,7 @@ tokio-util = { workspace = true, features = ["time"] }
[target.'cfg(not(target_os = "linux"))'.dependencies]
cpal = "0.15"
webrtc-audio-processing = { workspace = true }
[target.'cfg(unix)'.dependencies]
libc = { workspace = true }


@@ -155,11 +155,6 @@ impl AppServerSession {
}
}
pub(crate) fn with_remote_cwd_override(mut self, remote_cwd_override: Option<PathBuf>) -> Self {
self.remote_cwd_override = remote_cwd_override;
self
}
pub(crate) fn remote_cwd_override(&self) -> Option<&std::path::Path> {
self.remote_cwd_override.as_deref()
}


@@ -1,4 +1,6 @@
use super::*;
#[cfg(not(target_os = "linux"))]
use crate::realtime_audio_processing::RealtimeAudioProcessor;
use codex_protocol::protocol::ConversationStartParams;
use codex_protocol::protocol::RealtimeAudioFrame;
use codex_protocol::protocol::RealtimeConversationClosedEvent;
@@ -30,6 +32,8 @@ pub(super) struct RealtimeConversationUiState {
#[cfg(not(target_os = "linux"))]
capture_stop_flag: Option<Arc<AtomicBool>>,
#[cfg(not(target_os = "linux"))]
audio_processor: Option<RealtimeAudioProcessor>,
#[cfg(not(target_os = "linux"))]
capture: Option<crate::voice::VoiceCapture>,
#[cfg(not(target_os = "linux"))]
audio_player: Option<crate::voice::RealtimeAudioPlayer>,
@@ -298,11 +302,11 @@ impl ChatWidget {
RealtimeEvent::SessionUpdated { session_id, .. } => {
self.realtime_conversation.session_id = Some(session_id);
}
RealtimeEvent::InputAudioSpeechStarted(_) => self.interrupt_realtime_audio_playback(),
RealtimeEvent::InputAudioSpeechStarted(_) => {}
RealtimeEvent::InputTranscriptDelta(_) => {}
RealtimeEvent::OutputTranscriptDelta(_) => {}
RealtimeEvent::AudioOut(frame) => self.enqueue_realtime_audio_out(&frame),
RealtimeEvent::ResponseCancelled(_) => self.interrupt_realtime_audio_playback(),
RealtimeEvent::ResponseCancelled(_) => {}
RealtimeEvent::ConversationItemAdded(_item) => {}
RealtimeEvent::ConversationItemDone { .. } => {}
RealtimeEvent::HandoffRequested(_) => {}
@@ -331,9 +335,26 @@ impl ChatWidget {
fn enqueue_realtime_audio_out(&mut self, frame: &RealtimeAudioFrame) {
#[cfg(not(target_os = "linux"))]
{
if !self.realtime_conversation.is_active() {
return;
}
if self.realtime_conversation.audio_player.is_none() {
self.realtime_conversation.audio_player =
crate::voice::RealtimeAudioPlayer::start(&self.config).ok();
let Some(audio_processor) = self.realtime_conversation.audio_processor.clone()
else {
self.fail_realtime_conversation(
"Realtime audio processor was unavailable".to_string(),
);
return;
};
match crate::voice::RealtimeAudioPlayer::start(&self.config, audio_processor) {
Ok(player) => self.realtime_conversation.audio_player = Some(player),
Err(err) => {
self.fail_realtime_conversation(format!(
"Failed to start speaker output: {err}"
));
return;
}
}
}
if let Some(player) = &self.realtime_conversation.audio_player
&& let Err(err) = player.enqueue_frame(frame)
@@ -347,16 +368,6 @@ impl ChatWidget {
}
}
#[cfg(not(target_os = "linux"))]
fn interrupt_realtime_audio_playback(&mut self) {
if let Some(player) = &self.realtime_conversation.audio_player {
player.clear();
}
}
#[cfg(target_os = "linux")]
fn interrupt_realtime_audio_playback(&mut self) {}
#[cfg(not(target_os = "linux"))]
fn start_realtime_local_audio(&mut self) {
if self.realtime_conversation.capture_stop_flag.is_some() {
@@ -367,12 +378,42 @@ impl ChatWidget {
self.realtime_conversation.meter_placeholder_id = Some(placeholder_id.clone());
self.request_redraw();
let audio_processor = match RealtimeAudioProcessor::new() {
Ok(audio_processor) => audio_processor,
Err(err) => {
self.realtime_conversation.meter_placeholder_id = None;
self.remove_recording_meter_placeholder(&placeholder_id);
self.fail_realtime_conversation(format!(
"Failed to start realtime audio processor: {err}"
));
return;
}
};
self.realtime_conversation.audio_processor = Some(audio_processor.clone());
let audio_player =
match crate::voice::RealtimeAudioPlayer::start(&self.config, audio_processor.clone()) {
Ok(player) => player,
Err(err) => {
self.realtime_conversation.audio_processor = None;
self.realtime_conversation.meter_placeholder_id = None;
self.remove_recording_meter_placeholder(&placeholder_id);
self.fail_realtime_conversation(format!(
"Failed to start speaker output: {err}"
));
return;
}
};
let capture = match crate::voice::VoiceCapture::start_realtime(
&self.config,
self.app_event_tx.clone(),
audio_processor,
) {
Ok(capture) => capture,
Err(err) => {
drop(audio_player);
self.realtime_conversation.audio_processor = None;
self.realtime_conversation.meter_placeholder_id = None;
self.remove_recording_meter_placeholder(&placeholder_id);
self.fail_realtime_conversation(format!(
@@ -389,10 +430,7 @@ impl ChatWidget {
self.realtime_conversation.capture_stop_flag = Some(stop_flag.clone());
self.realtime_conversation.capture = Some(capture);
if self.realtime_conversation.audio_player.is_none() {
self.realtime_conversation.audio_player =
crate::voice::RealtimeAudioPlayer::start(&self.config).ok();
}
self.realtime_conversation.audio_player = Some(audio_player);
std::thread::spawn(move || {
let mut meter = crate::voice::RecordingMeterState::new();
@@ -423,23 +461,10 @@ impl ChatWidget {
}
match kind {
RealtimeAudioDeviceKind::Microphone => {
self.stop_realtime_microphone();
RealtimeAudioDeviceKind::Microphone | RealtimeAudioDeviceKind::Speaker => {
self.stop_realtime_local_audio();
self.start_realtime_local_audio();
}
RealtimeAudioDeviceKind::Speaker => {
self.stop_realtime_speaker();
match crate::voice::RealtimeAudioPlayer::start(&self.config) {
Ok(player) => {
self.realtime_conversation.audio_player = Some(player);
}
Err(err) => {
self.fail_realtime_conversation(format!(
"Failed to start speaker output: {err}"
));
}
}
}
}
self.request_redraw();
}
@@ -453,6 +478,7 @@ impl ChatWidget {
fn stop_realtime_local_audio(&mut self) {
self.stop_realtime_microphone();
self.stop_realtime_speaker();
self.realtime_conversation.audio_processor = None;
}
#[cfg(target_os = "linux")]


@@ -130,6 +130,8 @@ pub mod onboarding;
mod oss_selection;
mod pager_overlay;
pub mod public_widgets;
#[cfg(not(target_os = "linux"))]
mod realtime_audio_processing;
mod render;
mod resume_picker;
mod selection_list;
@@ -577,42 +579,15 @@ fn latest_session_lookup_params(
source_kinds: (!include_non_interactive)
.then_some(vec![ThreadSourceKind::Cli, ThreadSourceKind::VsCode]),
archived: Some(false),
cwd: cwd_filter.map(|cwd| cwd.to_string_lossy().to_string()),
cwd: if is_remote {
None
} else {
cwd_filter.map(|cwd| cwd.to_string_lossy().to_string())
},
search_term: None,
}
}
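The `cwd` branch above drops the local directory filter for remote sessions, since the client's filesystem paths are meaningless on the server. The rule in isolation, as a small sketch (the `lookup_cwd` helper is a hypothetical distillation, not code from the diff):

```rust
// Remote lookups never filter by the local cwd; local lookups pass the
// filter through unchanged.
fn lookup_cwd(is_remote: bool, cwd_filter: Option<&str>) -> Option<String> {
    if is_remote {
        None
    } else {
        cwd_filter.map(str::to_string)
    }
}

fn main() {
    assert_eq!(lookup_cwd(true, Some("/local/project")), None);
    assert_eq!(
        lookup_cwd(false, Some("/local/project")),
        Some("/local/project".to_string())
    );
    println!("ok");
}
```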
fn config_cwd_for_app_server_target(
cwd: Option<&Path>,
app_server_target: &AppServerTarget,
) -> std::io::Result<AbsolutePathBuf> {
if matches!(app_server_target, AppServerTarget::Remote { .. }) {
return AbsolutePathBuf::current_dir();
}
match cwd {
Some(path) => AbsolutePathBuf::from_absolute_path(path.canonicalize()?),
None => AbsolutePathBuf::current_dir(),
}
}
fn latest_session_cwd_filter<'a>(
remote_mode: bool,
remote_cwd_override: Option<&'a Path>,
config: &'a Config,
show_all: bool,
) -> Option<&'a Path> {
if show_all {
return None;
}
if remote_mode {
remote_cwd_override
} else {
Some(config.cwd.as_path())
}
}
pub async fn run_main(
mut cli: Cli,
arg0_paths: Arg0DispatchPaths,
@@ -631,10 +606,6 @@ pub async fn run_main(
auth_token: remote_auth_token.clone(),
})
.unwrap_or(AppServerTarget::Embedded);
let remote_cwd_override = cli
.cwd
.clone()
.filter(|_| matches!(app_server_target, AppServerTarget::Remote { .. }));
let (sandbox_mode, approval_policy) = if cli.full_auto {
(
Some(SandboxMode::WorkspaceWrite),
@@ -685,7 +656,10 @@ pub async fn run_main(
};
let cwd = cli.cwd.clone();
let config_cwd = config_cwd_for_app_server_target(cwd.as_deref(), &app_server_target)?;
let config_cwd = match cwd.as_deref() {
Some(path) => AbsolutePathBuf::from_absolute_path(path.canonicalize()?)?,
None => AbsolutePathBuf::current_dir()?,
};
#[allow(clippy::print_stderr)]
let config_toml = match load_config_as_toml_with_cli_overrides(
@@ -773,11 +747,7 @@ pub async fn run_main(
model,
approval_policy,
sandbox_mode,
cwd: if matches!(app_server_target, AppServerTarget::Remote { .. }) {
None
} else {
cwd
},
cwd,
model_provider: model_provider_override.clone(),
config_profile: cli.config_profile.clone(),
codex_self_exe: arg0_paths.codex_self_exe.clone(),
@@ -939,7 +909,6 @@ pub async fn run_main(
arg0_paths,
loader_overrides,
app_server_target,
remote_cwd_override,
config,
overrides,
cli_kv_overrides,
@@ -958,7 +927,6 @@ async fn run_ratatui_app(
arg0_paths: Arg0DispatchPaths,
loader_overrides: LoaderOverrides,
app_server_target: AppServerTarget,
remote_cwd_override: Option<PathBuf>,
initial_config: Config,
overrides: ConfigOverrides,
cli_kv_overrides: Vec<(String, toml::Value)>,
@@ -1017,21 +985,18 @@ async fn run_ratatui_app(
let needs_onboarding_app_server =
should_show_trust_screen_flag || initial_config.model_provider.requires_openai_auth;
let mut onboarding_app_server = if needs_onboarding_app_server {
Some(
AppServerSession::new(
start_app_server(
&app_server_target,
arg0_paths.clone(),
initial_config.clone(),
cli_kv_overrides.clone(),
loader_overrides.clone(),
cloud_requirements.clone(),
feedback.clone(),
)
.await?,
Some(AppServerSession::new(
start_app_server(
&app_server_target,
arg0_paths.clone(),
initial_config.clone(),
cli_kv_overrides.clone(),
loader_overrides.clone(),
cloud_requirements.clone(),
feedback.clone(),
)
.with_remote_cwd_override(remote_cwd_override.clone()),
)
.await?,
))
} else {
None
};
@@ -1134,21 +1099,18 @@ async fn run_ratatui_app(
|| cli.resume_picker
|| cli.fork_picker;
let mut session_lookup_app_server = if needs_app_server_session_lookup {
Some(
AppServerSession::new(
start_app_server(
&app_server_target,
arg0_paths.clone(),
config.clone(),
cli_kv_overrides.clone(),
loader_overrides.clone(),
cloud_requirements.clone(),
feedback.clone(),
)
.await?,
Some(AppServerSession::new(
start_app_server(
&app_server_target,
arg0_paths.clone(),
config.clone(),
cli_kv_overrides.clone(),
loader_overrides.clone(),
cloud_requirements.clone(),
feedback.clone(),
)
.with_remote_cwd_override(remote_cwd_override.clone()),
)
.await?,
))
} else {
None
};
@@ -1167,21 +1129,12 @@ async fn run_ratatui_app(
}
}
} else if cli.fork_last {
let filter_cwd = if remote_mode {
latest_session_cwd_filter(
remote_mode,
remote_cwd_override.as_deref(),
&config,
cli.fork_show_all,
)
} else {
None
};
let Some(app_server) = session_lookup_app_server.as_mut() else {
unreachable!("session lookup app server should be initialized for --fork --last");
};
match lookup_latest_session_target_with_app_server(
app_server, &config, filter_cwd, /*include_non_interactive*/ false,
app_server, &config, /*cwd_filter*/ None,
/*include_non_interactive*/ false,
)
.await?
{
@@ -1228,12 +1181,11 @@ async fn run_ratatui_app(
}
}
} else if cli.resume_last {
let filter_cwd = latest_session_cwd_filter(
remote_mode,
remote_cwd_override.as_deref(),
&config,
cli.resume_show_all,
);
let filter_cwd = if cli.resume_show_all {
None
} else {
Some(config.cwd.as_path())
};
let Some(app_server) = session_lookup_app_server.as_mut() else {
unreachable!("session lookup app server should be initialized for --resume --last");
};
@@ -1384,7 +1336,7 @@ async fn run_ratatui_app(
let app_result = App::run(
&mut tui,
AppServerSession::new(app_server).with_remote_cwd_override(remote_cwd_override),
AppServerSession::new(app_server),
config,
cli_kv_overrides.clone(),
overrides.clone(),
@@ -1845,9 +1797,12 @@ mod tests {
-> std::io::Result<()> {
let temp_dir = TempDir::new()?;
let config = build_config(&temp_dir).await?;
let cwd = temp_dir.path().join("project");
let params = latest_session_lookup_params(
/*is_remote*/ true, &config, /*cwd_filter*/ None,
/*is_remote*/ true,
&config,
Some(cwd.as_path()),
/*include_non_interactive*/ false,
);
@@ -1856,58 +1811,6 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn latest_session_lookup_params_keep_explicit_cwd_filter_for_remote_sessions()
-> std::io::Result<()> {
let temp_dir = TempDir::new()?;
let config = build_config(&temp_dir).await?;
let cwd = Path::new("repo/on/server");
let params = latest_session_lookup_params(
/*is_remote*/ true,
&config,
Some(cwd),
/*include_non_interactive*/ false,
);
assert_eq!(params.model_providers, None);
assert_eq!(params.cwd.as_deref(), Some("repo/on/server"));
Ok(())
}
#[test]
fn config_cwd_for_app_server_target_uses_current_dir_for_remote_sessions() -> std::io::Result<()>
{
let remote_only_cwd = if cfg!(windows) {
Path::new(r"C:\definitely\not\local\to\this\test")
} else {
Path::new("/definitely/not/local/to/this/test")
};
let target = AppServerTarget::Remote {
websocket_url: "ws://127.0.0.1:1234/".to_string(),
auth_token: None,
};
let config_cwd = config_cwd_for_app_server_target(Some(remote_only_cwd), &target)?;
assert_eq!(config_cwd, AbsolutePathBuf::current_dir()?);
Ok(())
}
#[test]
fn config_cwd_for_app_server_target_canonicalizes_embedded_cli_cwd() -> std::io::Result<()> {
let temp_dir = TempDir::new()?;
let target = AppServerTarget::Embedded;
let config_cwd = config_cwd_for_app_server_target(Some(temp_dir.path()), &target)?;
assert_eq!(
config_cwd,
AbsolutePathBuf::from_absolute_path(temp_dir.path().canonicalize()?)?
);
Ok(())
}
#[tokio::test]
async fn read_session_cwd_returns_none_without_sqlite_or_rollout_path() -> std::io::Result<()> {
let temp_dir = TempDir::new()?;

View File

@@ -0,0 +1,237 @@
use std::collections::VecDeque;
use std::sync::Arc;
use tracing::warn;
use webrtc_audio_processing::Config as AudioProcessingConfig;
use webrtc_audio_processing::Processor;
use webrtc_audio_processing::config::EchoCanceller;
use webrtc_audio_processing::config::GainController;
use webrtc_audio_processing::config::HighPassFilter;
use webrtc_audio_processing::config::NoiseSuppression;
use webrtc_audio_processing::config::NoiseSuppressionLevel;
use webrtc_audio_processing::config::Pipeline;
pub(crate) const AUDIO_PROCESSING_SAMPLE_RATE: u32 = 24_000;
pub(crate) const AUDIO_PROCESSING_CHANNELS: u16 = 1;
#[derive(Clone)]
pub(crate) struct RealtimeAudioProcessor {
processor: Arc<Processor>,
}
impl RealtimeAudioProcessor {
pub(crate) fn new() -> Result<Self, String> {
let processor = Processor::new(AUDIO_PROCESSING_SAMPLE_RATE)
.map_err(|err| format!("failed to initialize realtime audio processor: {err}"))?;
processor.set_config(AudioProcessingConfig {
pipeline: Pipeline {
multi_channel_capture: false,
multi_channel_render: false,
..Default::default()
},
echo_canceller: Some(EchoCanceller::Full {
stream_delay_ms: None,
}),
noise_suppression: Some(NoiseSuppression {
level: NoiseSuppressionLevel::High,
..Default::default()
}),
gain_controller: Some(GainController::GainController2(Default::default())),
high_pass_filter: Some(HighPassFilter::default()),
..Default::default()
});
processor.set_output_will_be_muted(true);
Ok(Self {
processor: Arc::new(processor),
})
}
pub(crate) fn capture_stage(
&self,
input_sample_rate: u32,
input_channels: u16,
) -> RealtimeCaptureAudioProcessor {
RealtimeCaptureAudioProcessor {
processor: self.processor.clone(),
input_sample_rate,
input_channels,
pending_samples: VecDeque::new(),
}
}
pub(crate) fn render_stage(
&self,
output_sample_rate: u32,
output_channels: u16,
) -> RealtimeRenderAudioProcessor {
RealtimeRenderAudioProcessor {
processor: self.processor.clone(),
output_sample_rate,
output_channels,
pending_samples: VecDeque::new(),
}
}
pub(crate) fn set_output_will_be_muted(&self, muted: bool) {
self.processor.set_output_will_be_muted(muted);
}
}
pub(crate) struct RealtimeCaptureAudioProcessor {
processor: Arc<Processor>,
input_sample_rate: u32,
input_channels: u16,
pending_samples: VecDeque<i16>,
}
impl RealtimeCaptureAudioProcessor {
pub(crate) fn process_samples(&mut self, samples: &[i16]) -> Vec<i16> {
let converted = convert_pcm16(
samples,
self.input_sample_rate,
self.input_channels,
AUDIO_PROCESSING_SAMPLE_RATE,
AUDIO_PROCESSING_CHANNELS,
);
self.pending_samples.extend(converted);
let mut processed = Vec::new();
while self.pending_samples.len() >= self.processor.num_samples_per_frame() {
let mut frame = self.pop_pending_frame();
if let Err(err) = self.processor.process_capture_frame([frame.as_mut_slice()]) {
warn!("failed to process realtime capture audio: {err}");
continue;
}
processed.extend(frame.into_iter().map(f32_to_i16));
}
processed
}
fn pop_pending_frame(&mut self) -> Vec<f32> {
self.pending_samples
.drain(..self.processor.num_samples_per_frame())
.map(i16_to_f32)
.collect()
}
}
pub(crate) struct RealtimeRenderAudioProcessor {
processor: Arc<Processor>,
output_sample_rate: u32,
output_channels: u16,
pending_samples: VecDeque<i16>,
}
impl RealtimeRenderAudioProcessor {
pub(crate) fn process_samples(&mut self, samples: &[i16]) {
self.processor
.set_output_will_be_muted(samples.iter().all(|sample| *sample == 0));
let converted = convert_pcm16(
samples,
self.output_sample_rate,
self.output_channels,
AUDIO_PROCESSING_SAMPLE_RATE,
AUDIO_PROCESSING_CHANNELS,
);
self.pending_samples.extend(converted);
while self.pending_samples.len() >= self.processor.num_samples_per_frame() {
let mut frame = self.pop_pending_frame();
if let Err(err) = self.processor.process_render_frame([frame.as_mut_slice()]) {
warn!("failed to process realtime render audio: {err}");
}
}
}
fn pop_pending_frame(&mut self) -> Vec<f32> {
self.pending_samples
.drain(..self.processor.num_samples_per_frame())
.map(i16_to_f32)
.collect()
}
}
pub(crate) fn convert_pcm16(
input: &[i16],
input_sample_rate: u32,
input_channels: u16,
output_sample_rate: u32,
output_channels: u16,
) -> Vec<i16> {
if input.is_empty() || input_channels == 0 || output_channels == 0 {
return Vec::new();
}
let in_channels = input_channels as usize;
let out_channels = output_channels as usize;
let in_frames = input.len() / in_channels;
if in_frames == 0 {
return Vec::new();
}
let out_frames = if input_sample_rate == output_sample_rate {
in_frames
} else {
(((in_frames as u64) * (output_sample_rate as u64)) / (input_sample_rate as u64)).max(1)
as usize
};
let mut out = Vec::with_capacity(out_frames.saturating_mul(out_channels));
for out_frame_idx in 0..out_frames {
let src_frame_idx = if out_frames <= 1 || in_frames <= 1 {
0
} else {
((out_frame_idx as u64) * ((in_frames - 1) as u64) / ((out_frames - 1) as u64)) as usize
};
let src_start = src_frame_idx.saturating_mul(in_channels);
let src = &input[src_start..src_start + in_channels];
match (in_channels, out_channels) {
(1, 1) => out.push(src[0]),
(1, n) => {
for _ in 0..n {
out.push(src[0]);
}
}
(n, 1) if n >= 2 => {
let sum: i32 = src.iter().map(|s| *s as i32).sum();
out.push((sum / (n as i32)) as i16);
}
(n, m) if n == m => out.extend_from_slice(src),
(n, m) if n > m => out.extend_from_slice(&src[..m]),
(n, m) => {
out.extend_from_slice(src);
let last = *src.last().unwrap_or(&0);
for _ in n..m {
out.push(last);
}
}
}
}
out
}
#[inline]
fn i16_to_f32(sample: i16) -> f32 {
(sample as f32) / (i16::MAX as f32)
}
#[inline]
fn f32_to_i16(sample: f32) -> i16 {
(sample.clamp(-1.0, 1.0) * i16::MAX as f32) as i16
}
#[cfg(test)]
mod tests {
use super::convert_pcm16;
use pretty_assertions::assert_eq;
#[test]
fn convert_pcm16_downmixes_and_resamples_for_model_input() {
let input = vec![100, 300, 200, 400, 500, 700, 600, 800];
let converted = convert_pcm16(
&input, /*input_sample_rate*/ 48_000, /*input_channels*/ 2,
/*output_sample_rate*/ 24_000, /*output_channels*/ 1,
);
assert_eq!(converted, vec![200, 700]);
}
}
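The `convert_pcm16` resampler above picks source frames by a nearest-neighbor index mapping. Below is a standalone sketch of the same formula reduced to mono; `resample_mono_nearest` is an illustrative name, not code from this change.

```rust
// Mono-only sketch of convert_pcm16's nearest-neighbor frame mapping:
// out index i maps to source index i * (in_frames - 1) / (out_frames - 1).
fn resample_mono_nearest(input: &[i16], in_rate: u32, out_rate: u32) -> Vec<i16> {
    let in_frames = input.len();
    if in_frames == 0 {
        return Vec::new();
    }
    let out_frames = if in_rate == out_rate {
        in_frames
    } else {
        ((in_frames as u64 * out_rate as u64 / in_rate as u64).max(1)) as usize
    };
    (0..out_frames)
        .map(|i| {
            let src = if out_frames <= 1 || in_frames <= 1 {
                0
            } else {
                (i as u64 * (in_frames - 1) as u64 / (out_frames - 1) as u64) as usize
            };
            input[src]
        })
        .collect()
}

fn main() {
    // 4 frames at 48 kHz become 2 frames at 24 kHz: source indices 0 and 3 survive.
    assert_eq!(resample_mono_nearest(&[10, 20, 30, 40], 48_000, 24_000), vec![10, 40]);
    // Upsampling 2 frames to 4 repeats the first endpoint: indices 0, 0, 0, 1.
    assert_eq!(resample_mono_nearest(&[7, 9], 24_000, 48_000), vec![7, 7, 7, 9]);
}
```

This mirrors the new file's own test, where stereo 48 kHz input downmixes and decimates to `[200, 700]`.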

View File

@@ -1,4 +1,10 @@
use crate::app_event_sender::AppEventSender;
use crate::realtime_audio_processing::AUDIO_PROCESSING_CHANNELS;
use crate::realtime_audio_processing::AUDIO_PROCESSING_SAMPLE_RATE;
use crate::realtime_audio_processing::RealtimeAudioProcessor;
use crate::realtime_audio_processing::RealtimeCaptureAudioProcessor;
use crate::realtime_audio_processing::RealtimeRenderAudioProcessor;
use crate::realtime_audio_processing::convert_pcm16;
use base64::Engine;
use codex_core::config::Config;
use codex_protocol::protocol::ConversationAudioParams;
@@ -23,7 +29,11 @@ pub struct VoiceCapture {
}
impl VoiceCapture {
pub fn start_realtime(config: &Config, tx: AppEventSender) -> Result<Self, String> {
pub fn start_realtime(
config: &Config,
tx: AppEventSender,
audio_processor: RealtimeAudioProcessor,
) -> Result<Self, String> {
let (device, config) = select_realtime_input_device_and_config(config)?;
let sample_rate = config.sample_rate().0;
@@ -34,9 +44,8 @@ impl VoiceCapture {
let stream = build_realtime_input_stream(
&device,
&config,
sample_rate,
channels,
tx,
audio_processor.capture_stage(sample_rate, channels),
last_peak.clone(),
)?;
stream
@@ -138,50 +147,76 @@ fn select_realtime_input_device_and_config(
fn build_realtime_input_stream(
device: &cpal::Device,
config: &cpal::SupportedStreamConfig,
sample_rate: u32,
channels: u16,
tx: AppEventSender,
capture_processor: RealtimeCaptureAudioProcessor,
last_peak: Arc<AtomicU16>,
) -> Result<cpal::Stream, String> {
match config.sample_format() {
cpal::SampleFormat::F32 => device
.build_input_stream(
&config.clone().into(),
move |input: &[f32], _| {
let peak = peak_f32(input);
last_peak.store(peak, Ordering::Relaxed);
let samples = input.iter().copied().map(f32_to_i16).collect::<Vec<_>>();
send_realtime_audio_chunk(&tx, samples, sample_rate, channels);
},
move |err| error!("audio input error: {err}"),
None,
)
.map_err(|e| format!("failed to build input stream: {e}")),
cpal::SampleFormat::I16 => device
.build_input_stream(
&config.clone().into(),
move |input: &[i16], _| {
let peak = peak_i16(input);
last_peak.store(peak, Ordering::Relaxed);
send_realtime_audio_chunk(&tx, input.to_vec(), sample_rate, channels);
},
move |err| error!("audio input error: {err}"),
None,
)
.map_err(|e| format!("failed to build input stream: {e}")),
cpal::SampleFormat::U16 => device
.build_input_stream(
&config.clone().into(),
move |input: &[u16], _| {
let mut samples = Vec::with_capacity(input.len());
let peak = convert_u16_to_i16_and_peak(input, &mut samples);
last_peak.store(peak, Ordering::Relaxed);
send_realtime_audio_chunk(&tx, samples, sample_rate, channels);
},
move |err| error!("audio input error: {err}"),
None,
)
.map_err(|e| format!("failed to build input stream: {e}")),
cpal::SampleFormat::F32 => {
let mut capture_processor = capture_processor;
device
.build_input_stream(
&config.clone().into(),
move |input: &[f32], _| {
let peak = peak_f32(input);
last_peak.store(peak, Ordering::Relaxed);
let samples = input.iter().copied().map(f32_to_i16).collect::<Vec<_>>();
let samples = capture_processor.process_samples(&samples);
send_realtime_audio_chunk(
&tx,
samples,
AUDIO_PROCESSING_SAMPLE_RATE,
AUDIO_PROCESSING_CHANNELS,
);
},
move |err| error!("audio input error: {err}"),
None,
)
.map_err(|e| format!("failed to build input stream: {e}"))
}
cpal::SampleFormat::I16 => {
let mut capture_processor = capture_processor;
device
.build_input_stream(
&config.clone().into(),
move |input: &[i16], _| {
let peak = peak_i16(input);
last_peak.store(peak, Ordering::Relaxed);
let samples = capture_processor.process_samples(input);
send_realtime_audio_chunk(
&tx,
samples,
AUDIO_PROCESSING_SAMPLE_RATE,
AUDIO_PROCESSING_CHANNELS,
);
},
move |err| error!("audio input error: {err}"),
None,
)
.map_err(|e| format!("failed to build input stream: {e}"))
}
cpal::SampleFormat::U16 => {
let mut capture_processor = capture_processor;
device
.build_input_stream(
&config.clone().into(),
move |input: &[u16], _| {
let mut samples = Vec::with_capacity(input.len());
let peak = convert_u16_to_i16_and_peak(input, &mut samples);
last_peak.store(peak, Ordering::Relaxed);
let samples = capture_processor.process_samples(&samples);
send_realtime_audio_chunk(
&tx,
samples,
AUDIO_PROCESSING_SAMPLE_RATE,
AUDIO_PROCESSING_CHANNELS,
);
},
move |err| error!("audio input error: {err}"),
None,
)
.map_err(|e| format!("failed to build input stream: {e}"))
}
_ => Err("unsupported input sample format".to_string()),
}
}
@@ -283,24 +318,34 @@ fn convert_u16_to_i16_and_peak(input: &[u16], out: &mut Vec<i16>) -> u16 {
pub(crate) struct RealtimeAudioPlayer {
_stream: cpal::Stream,
queue: Arc<Mutex<VecDeque<i16>>>,
audio_processor: RealtimeAudioProcessor,
output_sample_rate: u32,
output_channels: u16,
}
impl RealtimeAudioPlayer {
pub(crate) fn start(config: &Config) -> Result<Self, String> {
pub(crate) fn start(
config: &Config,
audio_processor: RealtimeAudioProcessor,
) -> Result<Self, String> {
let (device, config) =
crate::audio_device::select_configured_output_device_and_config(config)?;
let output_sample_rate = config.sample_rate().0;
let output_channels = config.channels();
let queue = Arc::new(Mutex::new(VecDeque::new()));
let stream = build_output_stream(&device, &config, Arc::clone(&queue))?;
let stream = build_output_stream(
&device,
&config,
Arc::clone(&queue),
audio_processor.render_stage(output_sample_rate, output_channels),
)?;
stream
.play()
.map_err(|e| format!("failed to start output stream: {e}"))?;
Ok(Self {
_stream: stream,
queue,
audio_processor,
output_sample_rate,
output_channels,
})
@@ -336,6 +381,8 @@ impl RealtimeAudioPlayer {
.map_err(|_| "failed to lock output audio queue".to_string())?;
// TODO(aibrahim): Cap or trim this queue if we observe producer bursts outrunning playback.
guard.extend(converted);
drop(guard);
self.audio_processor.set_output_will_be_muted(false);
Ok(())
}
@@ -343,6 +390,7 @@ impl RealtimeAudioPlayer {
if let Ok(mut guard) = self.queue.lock() {
guard.clear();
}
self.audio_processor.set_output_will_be_muted(true);
}
}
@@ -350,140 +398,94 @@ fn build_output_stream(
device: &cpal::Device,
config: &cpal::SupportedStreamConfig,
queue: Arc<Mutex<VecDeque<i16>>>,
render_processor: RealtimeRenderAudioProcessor,
) -> Result<cpal::Stream, String> {
let config_any: cpal::StreamConfig = config.clone().into();
match config.sample_format() {
cpal::SampleFormat::F32 => device
.build_output_stream(
&config_any,
move |output: &mut [f32], _| fill_output_f32(output, &queue),
move |err| error!("audio output error: {err}"),
None,
)
.map_err(|e| format!("failed to build f32 output stream: {e}")),
cpal::SampleFormat::I16 => device
.build_output_stream(
&config_any,
move |output: &mut [i16], _| fill_output_i16(output, &queue),
move |err| error!("audio output error: {err}"),
None,
)
.map_err(|e| format!("failed to build i16 output stream: {e}")),
cpal::SampleFormat::U16 => device
.build_output_stream(
&config_any,
move |output: &mut [u16], _| fill_output_u16(output, &queue),
move |err| error!("audio output error: {err}"),
None,
)
.map_err(|e| format!("failed to build u16 output stream: {e}")),
cpal::SampleFormat::F32 => {
let mut render_processor = render_processor;
device
.build_output_stream(
&config_any,
move |output: &mut [f32], _| {
fill_output_f32(output, &queue, &mut render_processor)
},
move |err| error!("audio output error: {err}"),
None,
)
.map_err(|e| format!("failed to build f32 output stream: {e}"))
}
cpal::SampleFormat::I16 => {
let mut render_processor = render_processor;
device
.build_output_stream(
&config_any,
move |output: &mut [i16], _| {
fill_output_i16(output, &queue, &mut render_processor)
},
move |err| error!("audio output error: {err}"),
None,
)
.map_err(|e| format!("failed to build i16 output stream: {e}"))
}
cpal::SampleFormat::U16 => {
let mut render_processor = render_processor;
device
.build_output_stream(
&config_any,
move |output: &mut [u16], _| {
fill_output_u16(output, &queue, &mut render_processor)
},
move |err| error!("audio output error: {err}"),
None,
)
.map_err(|e| format!("failed to build u16 output stream: {e}"))
}
other => Err(format!("unsupported output sample format: {other:?}")),
}
}
fn fill_output_i16(output: &mut [i16], queue: &Arc<Mutex<VecDeque<i16>>>) {
    if let Ok(mut guard) = queue.lock() {
        for sample in output {
            *sample = guard.pop_front().unwrap_or(0);
        }
        return;
    }
    output.fill(0);
}
fn fill_output_i16(
    output: &mut [i16],
    queue: &Arc<Mutex<VecDeque<i16>>>,
    render_processor: &mut RealtimeRenderAudioProcessor,
) {
    let samples = drain_output_samples(output.len(), queue);
    output.copy_from_slice(&samples);
    render_processor.process_samples(output);
}
fn fill_output_f32(output: &mut [f32], queue: &Arc<Mutex<VecDeque<i16>>>) {
    if let Ok(mut guard) = queue.lock() {
        for sample in output {
            let v = guard.pop_front().unwrap_or(0);
            *sample = (v as f32) / (i16::MAX as f32);
        }
        return;
    }
    output.fill(0.0);
}
fn fill_output_f32(
    output: &mut [f32],
    queue: &Arc<Mutex<VecDeque<i16>>>,
    render_processor: &mut RealtimeRenderAudioProcessor,
) {
    let samples = drain_output_samples(output.len(), queue);
    for (output_sample, sample) in output.iter_mut().zip(samples.iter()) {
        *output_sample = (*sample as f32) / (i16::MAX as f32);
    }
    render_processor.process_samples(&samples);
}
fn fill_output_u16(output: &mut [u16], queue: &Arc<Mutex<VecDeque<i16>>>) {
    if let Ok(mut guard) = queue.lock() {
        for sample in output {
            let v = guard.pop_front().unwrap_or(0);
            *sample = (v as i32 + 32768).clamp(0, u16::MAX as i32) as u16;
        }
        return;
    }
    output.fill(32768);
}
fn fill_output_u16(
    output: &mut [u16],
    queue: &Arc<Mutex<VecDeque<i16>>>,
    render_processor: &mut RealtimeRenderAudioProcessor,
) {
    let samples = drain_output_samples(output.len(), queue);
    for (output_sample, sample) in output.iter_mut().zip(samples.iter()) {
        *output_sample = (*sample as i32 + 32768).clamp(0, u16::MAX as i32) as u16;
    }
    render_processor.process_samples(&samples);
}
fn convert_pcm16(
    input: &[i16],
    input_sample_rate: u32,
    input_channels: u16,
    output_sample_rate: u32,
    output_channels: u16,
) -> Vec<i16> {
    if input.is_empty() || input_channels == 0 || output_channels == 0 {
        return Vec::new();
    }
    let in_channels = input_channels as usize;
    let out_channels = output_channels as usize;
    let in_frames = input.len() / in_channels;
    if in_frames == 0 {
        return Vec::new();
    }
    let out_frames = if input_sample_rate == output_sample_rate {
        in_frames
    } else {
        (((in_frames as u64) * (output_sample_rate as u64)) / (input_sample_rate as u64)).max(1)
            as usize
    };
    let mut out = Vec::with_capacity(out_frames.saturating_mul(out_channels));
    for out_frame_idx in 0..out_frames {
        let src_frame_idx = if out_frames <= 1 || in_frames <= 1 {
            0
        } else {
            ((out_frame_idx as u64) * ((in_frames - 1) as u64) / ((out_frames - 1) as u64)) as usize
        };
        let src_start = src_frame_idx.saturating_mul(in_channels);
        let src = &input[src_start..src_start + in_channels];
        match (in_channels, out_channels) {
            (1, 1) => out.push(src[0]),
            (1, n) => {
                for _ in 0..n {
                    out.push(src[0]);
                }
            }
            (n, 1) if n >= 2 => {
                let sum: i32 = src.iter().map(|s| *s as i32).sum();
                out.push((sum / (n as i32)) as i16);
            }
            (n, m) if n == m => out.extend_from_slice(src),
            (n, m) if n > m => out.extend_from_slice(&src[..m]),
            (n, m) => {
                out.extend_from_slice(src);
                let last = *src.last().unwrap_or(&0);
                for _ in n..m {
                    out.push(last);
                }
            }
        }
    }
    out
}
#[cfg(test)]
mod tests {
    use super::convert_pcm16;
    use pretty_assertions::assert_eq;
    #[test]
    fn convert_pcm16_downmixes_and_resamples_for_model_input() {
        let input = vec![100, 300, 200, 400, 500, 700, 600, 800];
        let converted = convert_pcm16(
            &input, /*input_sample_rate*/ 48_000, /*input_channels*/ 2,
            /*output_sample_rate*/ 24_000, /*output_channels*/ 1,
        );
        assert_eq!(converted, vec![200, 700]);
    }
}
fn drain_output_samples(output_len: usize, queue: &Arc<Mutex<VecDeque<i16>>>) -> Vec<i16> {
    let mut samples = vec![0; output_len];
    let Ok(mut guard) = queue.lock() else {
        return samples;
    };
    for sample in &mut samples {
        *sample = guard.pop_front().unwrap_or(0);
    }
    samples
}
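The u16 output path converts signed PCM to offset-binary by shifting up 32768, the same expression used in `fill_output_u16` and `convert_u16_to_i16_and_peak`. A minimal sketch (helper name illustrative):

```rust
// Offset-binary conversion for u16 PCM output: i16::MIN maps to 0,
// digital silence (0) maps to mid-range, i16::MAX maps to u16::MAX.
fn i16_to_u16(v: i16) -> u16 {
    (v as i32 + 32768).clamp(0, u16::MAX as i32) as u16
}

fn main() {
    assert_eq!(i16_to_u16(i16::MIN), 0);
    assert_eq!(i16_to_u16(0), 32_768); // silence sits at mid-range
    assert_eq!(i16_to_u16(i16::MAX), 65_535);
}
```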

View File

@@ -19,6 +19,8 @@ exports_files([
"v8_bazel_rules.patch",
"v8_module_deps.patch",
"v8_source_portability.patch",
"webrtc-audio-processing-sys_logging_macos_sdk.patch",
"webrtc-audio-processing-sys_meson_env.patch",
"windows-link.patch",
"zstd-sys_windows_msvc_include_dirs.patch",
])

View File

@@ -0,0 +1,12 @@
diff --git a/webrtc-audio-processing/webrtc/rtc_base/logging.cc b/webrtc-audio-processing/webrtc/rtc_base/logging.cc
--- a/webrtc-audio-processing/webrtc/rtc_base/logging.cc
+++ b/webrtc-audio-processing/webrtc/rtc_base/logging.cc
@@ -24,6 +24,6 @@
#undef ERROR // wingdi.h
#endif
#if defined(WEBRTC_MAC) && !defined(WEBRTC_IOS)
-#include <CoreServices/CoreServices.h>
+#include <CoreFoundation/CoreFoundation.h>
#elif defined(WEBRTC_ANDROID)
#include <android/log.h>

View File

@@ -0,0 +1,401 @@
diff --git a/build.rs b/build.rs
index 8578e9220a..bd5c9ba382 100644
--- a/build.rs
+++ b/build.rs
@@ -1,10 +1,14 @@
use anyhow::{bail, Context, Result};
use bindgen::callbacks::{AttributeInfo, DeriveInfo, ParseCallbacks};
+#[cfg(not(windows))]
+use std::os::unix::fs::symlink;
+#[cfg(windows)]
+use std::os::windows::fs::symlink_dir as symlink;
use std::{
env,
- fs::File,
+ fs::{self, File},
io::{BufWriter, Write},
- path::PathBuf,
+ path::{Path, PathBuf},
process::Command,
};
@@ -203,8 +207,136 @@ mod webrtc {
let webrtc_build_dir = build_dir.join(BUNDLED_SOURCE_PATH);
eprintln!("Building webrtc-audio-processing in {}", webrtc_build_dir.display());
- let mut meson = Command::new("meson");
+ let meson_launcher = env::var_os("MESON").expect("MESON is set by Bazel");
+ let cargo_manifest_dir =
+ env::var_os("CARGO_MANIFEST_DIR").expect("CARGO_MANIFEST_DIR is set by Bazel");
+ let runfiles_dir = Path::new(&cargo_manifest_dir)
+ .parent()
+ .expect("CARGO_MANIFEST_DIR points inside a Bazel runfiles tree");
+ let staged_meson = build_dir.join("meson");
+ fs::copy(&meson_launcher, &staged_meson).context("Failed to stage Meson launcher")?;
+ let mut meson_runfiles = staged_meson.as_os_str().to_owned();
+ meson_runfiles.push(".runfiles");
+ let _ = fs::remove_file(&meson_runfiles);
+ symlink(runfiles_dir, &meson_runfiles).context("Failed to link Meson runfiles")?;
+
+ let mut meson = Command::new(staged_meson);
+ let mut meson_cross_file = None;
+ if let (Ok(host), Ok(target)) = (env::var("HOST"), env::var("TARGET")) {
+ if host != target {
+ let mut target_parts = target.split('-');
+ let cpu_family = target_parts
+ .next()
+ .expect("TARGET triple has an architecture");
+ let system = target_parts
+ .next_back()
+ .expect("TARGET triple has an operating system");
+ let endian = match cpu_family {
+ "aarch64" | "x86_64" => "little",
+ _ => bail!("Unsupported TARGET architecture for Meson cross file: {cpu_family}"),
+ };
+ let cross_file = build_dir.join("meson-cross-file.ini");
+ fs::write(
+ &cross_file,
+ format!(
+ concat!(
+ "[host_machine]\n",
+ "system = '{system}'\n",
+ "cpu_family = '{cpu_family}'\n",
+ "cpu = '{cpu_family}'\n",
+ "endian = '{endian}'\n",
+ "\n",
+ "[properties]\n",
+ "needs_exe_wrapper = true\n",
+ ),
+ system = system,
+ cpu_family = cpu_family,
+ endian = endian,
+ ),
+ )
+ .context("Failed to write Meson cross file")?;
+ meson_cross_file = Some(cross_file);
+ }
+ }
+
+ let mut ninja_launcher = env::var_os("NINJA");
+ if ninja_launcher.is_none() {
+ for candidate in ["ninja", "ninja-build"] {
+ let can_execute = Command::new(candidate)
+ .arg("--version")
+ .status()
+ .map(|status| status.success())
+ .unwrap_or(false);
+ if can_execute {
+ ninja_launcher = Some(candidate.into());
+ break;
+ }
+ }
+ }
+ if let Some(ninja_real) = env::var_os("NINJA_REAL") {
+ let mut temp_dirs = Vec::new();
+ if let Some(dir) = env::var_os("CODEX_NINJA_TMPDIR") {
+ temp_dirs.push(PathBuf::from(dir));
+ }
+ temp_dirs.extend([
+ PathBuf::from("/private/var/tmp"),
+ PathBuf::from("/var/tmp"),
+ env::temp_dir(),
+ PathBuf::from("/tmp"),
+ ]);
+ if let Some(home) = env::var_os("HOME") {
+ temp_dirs.push(PathBuf::from(home));
+ }
+
+ let mut staged_ninja = None;
+ for (attempt, temp_dir) in temp_dirs.into_iter().enumerate() {
+ if !temp_dir.is_dir() {
+ continue;
+ }
+
+ let stage_dir =
+ temp_dir.join(format!("codex-ninja-{}-{attempt}", std::process::id()));
+ let _ = fs::remove_dir_all(&stage_dir);
+ if fs::create_dir_all(&stage_dir).is_err() {
+ continue;
+ }
+
+ let candidate_ninja = stage_dir.join("ninja");
+ let stage_result =
+ fs::copy(&ninja_real, &candidate_ninja).and_then(|_| ensure_executable(&candidate_ninja));
+ if stage_result.is_ok() {
+ staged_ninja = Some(candidate_ninja.into_os_string());
+ break;
+ }
+ }
+ ninja_launcher = staged_ninja.or_else(|| Some(ninja_real));
+ }
+ let ninja_launcher =
+ ninja_launcher.ok_or_else(|| anyhow::anyhow!("Failed to locate ninja launcher"))?;
+
+ if let Some(ldflags) = env::var("LDFLAGS").ok() {
+ let execroot = determine_execroot(runfiles_dir)?;
+ let ldflags = ldflags
+ .split_whitespace()
+ .map(|token| {
+ if let Some(path) = token.strip_prefix("-Lbazel-out") {
+ format!("-L{}", execroot.join(format!("bazel-out{path}")).display())
+ } else if token.starts_with("bazel-out") {
+ execroot.join(token).display().to_string()
+ } else {
+ token.to_owned()
+ }
+ })
+ .collect::<Vec<_>>()
+ .join(" ");
+ meson.env("LDFLAGS", ldflags);
+ }
meson.args(["setup", "--prefix", install_dir.to_str().unwrap()]);
+ meson.env("NINJA", &ninja_launcher);
+ if let Some(cross_file) = meson_cross_file {
+ meson.arg("--cross-file");
+ meson.arg(cross_file);
+ }
meson.arg("--reconfigure");
if cfg!(target_os = "macos") {
@@ -221,14 +353,14 @@ mod webrtc {
.context("Failed to execute meson. Do you have it installed?")?;
assert!(status.success(), "Command failed: {:?}", &meson);
- let mut ninja = Command::new("ninja");
+ let mut ninja = Command::new(ninja_launcher.clone());
let status = ninja
.current_dir(&webrtc_build_dir)
.status()
.context("Failed to execute ninja. Do you have it installed?")?;
assert!(status.success(), "Command failed: {:?}", &ninja);
- let mut install = Command::new("ninja");
+ let mut install = Command::new(ninja_launcher);
let status = install
.current_dir(&webrtc_build_dir)
.arg("install")
@@ -267,15 +399,32 @@ mod webrtc {
/// Extract defined (non-external) symbols from a static library using nm.
fn get_defined_symbols(archive_path: &std::path::Path) -> Result<Vec<String>> {
- let output = Command::new("nm")
+ let nm = determine_nm_path();
+ let output = match Command::new(&nm)
.arg("--defined-only")
.arg("--format=posix")
.arg(archive_path)
.output()
- .context("Failed to execute nm")?;
+ {
+ Ok(output) => output,
+ Err(error)
+ if env::var("CARGO_CFG_TARGET_OS")
+ .map(|target_os| target_os == "macos")
+ .or_else(|_| env::var("TARGET").map(|target| target.contains("apple-darwin")))
+ .unwrap_or(false)
+ && error.kind() == std::io::ErrorKind::NotFound =>
+ {
+ println!(
+ "cargo:warning=Skipping symbol prefixing because {:?} is unavailable",
+ nm
+ );
+ return Ok(Vec::new());
+ }
+ Err(error) => return Err(error).context(format!("Failed to execute {:?}", nm)),
+ };
if !output.status.success() {
- anyhow::bail!("nm failed: {}", String::from_utf8_lossy(&output.stderr));
+ anyhow::bail!("{:?} failed: {}", nm, String::from_utf8_lossy(&output.stderr));
}
let stdout = String::from_utf8_lossy(&output.stdout);
@@ -400,6 +549,7 @@ fn main() -> Result<()> {
}
let binding_file = out_dir().join("bindings.rs");
+ maybe_set_libclang_path();
let mut builder = bindgen::Builder::default()
.header("src/wrapper.hpp")
.clang_args(&["-x", "c++", "-std=c++17", "-fparse-all-comments"])
@@ -461,3 +611,180 @@ fn determine_objcopy_path() -> Result<PathBuf> {
Ok(objcopy)
}
+
+fn determine_execroot(path: &Path) -> Result<PathBuf> {
+ for ancestor in path.ancestors() {
+ if ancestor.file_name().and_then(|name| name.to_str()) == Some("bazel-out") {
+ return ancestor
+ .parent()
+ .map(Path::to_path_buf)
+ .context("bazel-out directory is missing its execroot parent");
+ }
+ }
+
+ env::current_dir().context("Failed to determine execroot")
+}
+
+#[cfg(not(windows))]
+fn ensure_executable(path: &Path) -> std::io::Result<()> {
+ use std::os::unix::fs::PermissionsExt;
+
+ let mut permissions = fs::metadata(path)?.permissions();
+ permissions.set_mode(0o755);
+ fs::set_permissions(path, permissions)
+}
+
+#[cfg(windows)]
+fn ensure_executable(_path: &Path) -> std::io::Result<()> {
+ Ok(())
+}
+
+fn determine_nm_path() -> PathBuf {
+ if let Some(nm) = env::var_os("NM") {
+ return nm.into();
+ }
+
+ if env::var("CARGO_CFG_TARGET_OS")
+ .map(|target_os| target_os == "macos")
+ .or_else(|_| env::var("TARGET").map(|target| target.contains("apple-darwin")))
+ .unwrap_or(false)
+ {
+ let xcrun_output = Command::new("/usr/bin/xcrun")
+ .args(["--find", "llvm-nm"])
+ .output();
+ if let Ok(output) = xcrun_output {
+ if output.status.success() {
+ let path = String::from_utf8_lossy(&output.stdout).trim().to_owned();
+ if !path.is_empty() {
+ return PathBuf::from(path);
+ }
+ }
+ }
+ }
+
+ for tool_env in ["AR", "CC", "CXX"] {
+ if let Some(tool_path) = env::var_os(tool_env) {
+ let tool_path = PathBuf::from(tool_path);
+ if let Some(parent) = tool_path.parent() {
+ for candidate_name in ["llvm-nm", "nm"] {
+ let candidate = parent.join(candidate_name);
+ if candidate.exists() {
+ return candidate;
+ }
+ }
+ }
+
+ if let Some(file_name) = tool_path.file_name().and_then(|name| name.to_str()) {
+ let candidate_name = file_name.replace("llvm-ar", "llvm-nm");
+ if candidate_name != file_name {
+ let candidate = tool_path.with_file_name(candidate_name);
+ if candidate.exists() {
+ return candidate;
+ }
+ }
+
+ if file_name != "llvm-nm" {
+ let candidate = tool_path.with_file_name("llvm-nm");
+ if candidate.exists() {
+ return candidate;
+ }
+ }
+ }
+ }
+ }
+
+ for candidate in ["llvm-nm", "nm"] {
+ if Command::new(candidate)
+ .arg("--version")
+ .status()
+ .map(|status| status.success())
+ .unwrap_or(false)
+ {
+ return PathBuf::from(candidate);
+ }
+ }
+
+ PathBuf::from("nm")
+}
+
+fn maybe_set_libclang_path() {
+ if env::var_os("LIBCLANG_PATH").is_some() {
+ return;
+ }
+
+ for tool_env in ["CC", "CXX"] {
+ let Some(tool_value) = env::var_os(tool_env) else {
+ continue;
+ };
+ let tool_path = PathBuf::from(tool_value);
+ let tool_path = if tool_path.is_absolute() {
+ tool_path
+ } else if let Ok(current_dir) = env::current_dir() {
+ let current_dir_candidate = current_dir.join(&tool_path);
+ if current_dir_candidate.exists() {
+ current_dir_candidate
+ } else if let Some(manifest_dir) = env::var_os("CARGO_MANIFEST_DIR").map(PathBuf::from) {
+ let mut resolved = tool_path.clone();
+ for ancestor in manifest_dir.ancestors() {
+ let ancestor_candidate = ancestor.join(&tool_path);
+ if ancestor_candidate.exists() {
+ resolved = ancestor_candidate;
+ break;
+ }
+ }
+ resolved
+ } else {
+ tool_path
+ }
+ } else {
+ tool_path
+ };
+
+ let Some(tool_dir) = tool_path.parent() else {
+ continue;
+ };
+ let Some(tool_root) = tool_dir.parent() else {
+ continue;
+ };
+
+ let mut directories = vec![tool_root.join("lib"), tool_root.join("lib64")];
+ if let Ok(read_dir) = fs::read_dir(tool_root) {
+ directories.extend(
+ read_dir
+ .filter_map(std::result::Result::ok)
+ .map(|entry| entry.path())
+ .filter(|path| path.is_dir()),
+ );
+ }
+
+ while let Some(directory) = directories.pop() {
+ let read_dir = match fs::read_dir(&directory) {
+ Ok(read_dir) => read_dir,
+ Err(_) => continue,
+ };
+
+ for entry in read_dir.filter_map(std::result::Result::ok) {
+ let path = entry.path();
+ if path.is_dir() {
+ directories.push(path);
+ continue;
+ }
+
+ let Some(file_name) = path.file_name().and_then(|name| name.to_str()) else {
+ continue;
+ };
+ let is_libclang = matches!(file_name, "libclang.so" | "libclang.dll" | "libclang.dylib")
+ || file_name.starts_with("libclang.so.")
+ || file_name.starts_with("libclang-");
+ if !is_libclang {
+ continue;
+ }
+
+ if let Some(parent) = path.parent() {
+ env::set_var("LIBCLANG_PATH", parent);
+ return;
+ }
+ }
+ }
+ }
+}
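The LDFLAGS handling in the build.rs patch re-roots relative `bazel-out` paths at the Bazel execroot so Meson's linker invocations resolve from any working directory. A standalone sketch of that token rewrite (function name and the paths in the example are illustrative):

```rust
use std::path::Path;

// Rewrite linker flags: `-Lbazel-out/...` and bare `bazel-out/...` tokens
// become absolute paths under the execroot; everything else passes through.
fn rewrite_ldflags(ldflags: &str, execroot: &Path) -> String {
    ldflags
        .split_whitespace()
        .map(|token| {
            if let Some(path) = token.strip_prefix("-Lbazel-out") {
                format!("-L{}", execroot.join(format!("bazel-out{path}")).display())
            } else if token.starts_with("bazel-out") {
                execroot.join(token).display().to_string()
            } else {
                token.to_owned()
            }
        })
        .collect::<Vec<_>>()
        .join(" ")
}

fn main() {
    let out = rewrite_ldflags("-Lbazel-out/k8-opt/bin -lfoo", Path::new("/execroot"));
    assert_eq!(out, "-L/execroot/bazel-out/k8-opt/bin -lfoo");
}
```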

View File

@@ -0,0 +1,4 @@
exports_files(
["ninja_wrapper.sh"],
visibility = ["//visibility:public"],
)

View File

@@ -0,0 +1,30 @@
#!/bin/sh
set -eu
real_ninja="${NINJA_REAL:?NINJA_REAL must point to the downloaded ninja binary}"
try_exec() {
    candidate_dir="$1"
    shift
    [ -n "${candidate_dir}" ] || return 1
    [ -d "${candidate_dir}" ] || return 1
    staged_dir="$(mktemp -d "${candidate_dir%/}/codex-ninja.XXXXXX" 2>/dev/null)" || return 1
    staged_ninja="${staged_dir}/ninja"
    if cp "${real_ninja}" "${staged_ninja}" 2>/dev/null \
        && chmod +x "${staged_ninja}" 2>/dev/null \
        && "${staged_ninja}" --version >/dev/null 2>&1; then
        # Forward the wrapper's own arguments to the staged binary.
        exec "${staged_ninja}" "$@"
    fi
    rm -rf "${staged_dir}"
    return 1
}
for candidate_dir in "${CODEX_NINJA_TMPDIR:-}" /private/var/tmp /var/tmp "${TMPDIR:-}" /tmp "${HOME:-}"; do
    # `|| true` keeps `set -eu` from aborting before the next candidate is tried.
    try_exec "${candidate_dir}" "$@" || true
done
echo "codex ninja wrapper: unable to stage an executable Ninja binary" >&2
exit 1
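The wrapper's core trick is staging a copy of the real binary into a writable, executable temp directory before exec'ing it, which sidesteps noexec mounts and read-only runfiles trees. A minimal sketch of that pattern with an arbitrary binary (names and paths illustrative):

```shell
# Stage a known-good binary into a fresh temp dir, mark it executable,
# and run the staged copy -- the same pattern the ninja wrapper uses.
stage_and_run() {
    staged_dir="$(mktemp -d "${TMPDIR:-/tmp}/codex-stage.XXXXXX")" || return 1
    cp "$(command -v sh)" "${staged_dir}/tool"
    chmod +x "${staged_dir}/tool"
    "${staged_dir}/tool" -c 'echo staged tool ran'
    rm -rf "${staged_dir}"
}
stage_and_run
```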