Compare commits

...

12 Commits

Author SHA1 Message Date
Jeremy Lewi
a050c5ee91 refactor: add wasm-safe shims for host-only runtime crates 2026-04-14 14:42:28 -07:00
Jeremy Lewi
dfbd21e294 refactor: make codex-tools rmcp-optional on wasm 2026-04-14 14:24:13 -07:00
Jeremy Lewi
e8cdf68bdd refactor: make rollout and app protocol wasm-tolerant 2026-04-14 14:21:15 -07:00
Jeremy Lewi
54d765635a refactor: split wasm harness into reusable library surface
Expose a pure Rust EmbeddedHarness API in codex-wasm-harness so downstream browser apps can depend on the crate directly without going through the demo-specific wasm_bindgen wrapper. Introduce trait seams for Responses transport, tool execution, and event delivery, move shared response/event types into dedicated modules, and keep BrowserCodex as a thin compatibility adapter so the existing browser demo continues to build and run unchanged.
2026-04-14 09:25:53 -07:00
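The trait seams this commit describes can be sketched roughly as follows. This is an illustrative sketch only: `ResponsesTransport`, `ToolExecutor`, and the `EmbeddedHarness` shape here are assumptions for demonstration, not the actual codex-wasm-harness API, and the stub implementations exist only so the sketch compiles and runs anywhere.

```rust
// Illustrative seams: a Responses transport and a tool executor that a
// reusable harness can be generic over, so browser apps can supply their
// own implementations instead of going through a wasm_bindgen wrapper.
trait ResponsesTransport {
    fn send(&self, request_json: &str) -> Result<String, String>;
}

trait ToolExecutor {
    fn execute(&self, tool_name: &str, args_json: &str) -> Result<String, String>;
}

struct EmbeddedHarness<T, E> {
    transport: T,
    tools: E,
}

impl<T: ResponsesTransport, E: ToolExecutor> EmbeddedHarness<T, E> {
    fn sample(&self, request: &str) -> Result<String, String> {
        self.transport.send(request)
    }

    fn run_tool(&self, name: &str, args: &str) -> Result<String, String> {
        self.tools.execute(name, args)
    }
}

// Stubs so the sketch is self-contained.
struct EchoTransport;
impl ResponsesTransport for EchoTransport {
    fn send(&self, request_json: &str) -> Result<String, String> {
        Ok(request_json.to_string())
    }
}

struct NoTools;
impl ToolExecutor for NoTools {
    fn execute(&self, tool_name: &str, _args_json: &str) -> Result<String, String> {
        Err(format!("no executor registered for {tool_name}"))
    }
}

fn main() {
    let harness = EmbeddedHarness { transport: EchoTransport, tools: NoTools };
    assert_eq!(harness.sample("ping").unwrap(), "ping");
    assert!(harness.run_tool("exec", "{}").is_err());
    println!("harness seams ok");
}
```

A demo-specific wrapper (like the `BrowserCodex` adapter mentioned above) would then be one concrete choice of `T` and `E` rather than the only entry point.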
Jeremy Lewi
11e16ef2e8 feat: run a real browser-side responses tool loop
Upgrade the wasm harness from a one-shot sampler to a real Responses API loop that uses the Codex prompt, handles function_call / function_call_output turns, and exposes a browser exec_js callback. The example page now provisions a sandboxed iframe kernel, persists the API key in localStorage, surfaces browser errors inline, and has been verified in Chrome for Testing with a live prompt that executes JavaScript and completes the turn.
2026-04-13 10:18:03 -07:00
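The function_call / function_call_output loop described above can be sketched like this. The `Model` trait, `TurnOutput` enum, and scripted model below are stand-ins invented for illustration; the real harness talks to the Responses API rather than an in-memory script.

```rust
// A turn keeps sampling until the model stops emitting function calls;
// each call's output is fed back as a function_call_output item.
enum TurnOutput {
    FunctionCall { call_id: String, arguments: String },
    Final(String),
}

trait Model {
    fn sample(&mut self, last_tool_output: Option<(String, String)>) -> TurnOutput;
}

fn run_turn<M: Model>(model: &mut M, exec_js: impl Fn(&str) -> String) -> String {
    let mut pending: Option<(String, String)> = None;
    loop {
        match model.sample(pending.take()) {
            TurnOutput::FunctionCall { call_id, arguments } => {
                // Execute the tool (e.g. a browser-side exec_js callback)
                // and queue its output for the next sampling round.
                let output = exec_js(&arguments);
                pending = Some((call_id, output));
            }
            TurnOutput::Final(text) => return text,
        }
    }
}

// Scripted stand-in model: one tool call, then a final answer.
struct Scripted(Vec<TurnOutput>);
impl Model for Scripted {
    fn sample(&mut self, _last: Option<(String, String)>) -> TurnOutput {
        self.0.remove(0)
    }
}

fn main() {
    let mut model = Scripted(vec![
        TurnOutput::FunctionCall { call_id: "c1".into(), arguments: "1 + 1".into() },
        TurnOutput::Final("done".into()),
    ]);
    let result = run_turn(&mut model, |js| format!("evaluated: {js}"));
    assert_eq!(result, "done");
    println!("{result}");
}
```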
Jeremy Lewi
bb02604501 refactor: gate codex-core websocket transport on native targets
Move the direct tokio-tungstenite dependency behind a non-wasm cfg and teach ModelClientSession to compile without websocket transport on wasm. The browser path now cleanly defaults to HTTP-only Responses usage while native builds retain the existing websocket flow.
2026-04-13 10:17:54 -07:00
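The gating pattern this commit applies can be reduced to a minimal compile-time switch. The function name here is illustrative, not from the codebase: native targets keep the websocket path while wasm builds resolve to the HTTP-only alternative without any websocket dependency being compiled in.

```rust
// Native builds keep the tokio-tungstenite websocket flow.
#[cfg(not(target_arch = "wasm32"))]
fn responses_transport() -> &'static str {
    "websocket"
}

// wasm builds compile a fallback instead; the websocket crates are
// absent from the dependency graph entirely on this target.
#[cfg(target_arch = "wasm32")]
fn responses_transport() -> &'static str {
    "http-only"
}

fn main() {
    println!("transport: {}", responses_transport());
}
```

Pairing this with a `[target.'cfg(not(target_arch = "wasm32"))'.dependencies]` section in Cargo.toml (as in the diff below) keeps both the code and the dependency out of wasm builds.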
Jeremy Lewi
609b6168b3 refactor: make shared path and cache utilities wasm-safe
Move native-only path and Tokio dependencies behind cfg gates so shared crates can compile for wasm32. Replace the absolute-path implementation with a lexical normalization fallback on wasm, use a std mutex for the cache helper on wasm, and degrade git symlink creation to an unsupported-platform error instead of a compile-time failure.
2026-04-13 09:58:43 -07:00
Jeremy Lewi
5bc4c5c997 refactor: trim wasm blockers from shared api dependencies
Compile websocket-only exports out of codex-api on wasm and stop enabling Tokio net there by default. Also cut the Tokio process dependency out of wasm builds of codex-git-utils by gating the async git-info module, move codex-utils-image's Tokio dependency to tests, and enable uuid's js RNG feature for wasm in codex-protocol. Native cargo checks passed for codex-api, codex-git-utils, and codex-protocol after the refactor.
2026-04-13 09:55:19 -07:00
Jeremy Lewi
ee4138c363 refactor: make code-mode crate wasm-safe
Gate the native V8 dependency out of wasm builds and provide a wasm-specific default CodeModeService that preserves the shared request/response/runtime trait surface while returning clear errors unless a browser runtime is injected. Keep the public code-mode types available on wasm and mark the V8-backed runtime internals as native-only so codex-code-mode now checks cleanly for wasm32-unknown-unknown.
2026-04-13 09:51:19 -07:00
Jeremy Lewi
3ffd99cd3d refactor: inject code mode runtime into codex sessions
Extract a CodeModeRuntime trait from codex-code-mode and make the core code-mode wrapper hold an injected runtime instead of constructing the native implementation inline. Thread an optional runtime override through Codex spawn/session startup and expose a ThreadManager entrypoint that can start a thread with a caller-provided runtime. Re-export the runtime trait from codex-core so downstream browser harness code can consume the seam without reaching into transitive dependencies.
2026-04-13 09:40:19 -07:00
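The injection seam this commit introduces can be sketched as an optional runtime override at session startup. The `Session` and runtime types below are hypothetical simplifications of the `CodeModeRuntime` seam, invented for illustration; the real trait is async and lives in codex-code-mode.

```rust
use std::sync::Arc;

// Simplified stand-in for the injected runtime seam.
trait CodeModeRuntime: Send + Sync {
    fn execute(&self, code: &str) -> Result<String, String>;
}

// Default implementation a native build would construct.
struct NativeRuntime;
impl CodeModeRuntime for NativeRuntime {
    fn execute(&self, code: &str) -> Result<String, String> {
        Ok(format!("native executed: {code}"))
    }
}

struct Session {
    runtime: Arc<dyn CodeModeRuntime>,
}

impl Session {
    // Callers (e.g. a browser harness) may inject their own runtime;
    // otherwise the session falls back to the native default.
    fn start(runtime_override: Option<Arc<dyn CodeModeRuntime>>) -> Self {
        Session {
            runtime: runtime_override.unwrap_or_else(|| Arc::new(NativeRuntime)),
        }
    }
}

fn main() {
    let default_session = Session::start(None);
    assert_eq!(
        default_session.runtime.execute("1+1").unwrap(),
        "native executed: 1+1"
    );

    struct BrowserRuntime;
    impl CodeModeRuntime for BrowserRuntime {
        fn execute(&self, code: &str) -> Result<String, String> {
            Ok(format!("browser executed: {code}"))
        }
    }
    let injected = Session::start(Some(Arc::new(BrowserRuntime)));
    assert_eq!(
        injected.runtime.execute("1+1").unwrap(),
        "browser executed: 1+1"
    );
    println!("injection ok");
}
```

Re-exporting the trait from the core crate (as the message notes) lets downstream harness code implement it without depending on the transitive crate directly.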
Jeremy Lewi
fa5d14a81f feat: move wasm sampling into rust 2026-04-10 17:09:07 -07:00
Jeremy Lewi
032b8dddb6 feat: add codex wasm harness prototype 2026-04-10 16:41:22 -07:00
66 changed files with 4738 additions and 289 deletions

codex-rs/Cargo.lock generated
View File

@@ -2923,6 +2923,22 @@ dependencies = [
"v8",
]
[[package]]
name = "codex-wasm-harness"
version = "0.0.0"
dependencies = [
"async-trait",
"codex-core",
"js-sys",
"pretty_assertions",
"serde",
"serde_json",
"tokio",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
]
[[package]]
name = "codex-windows-sandbox"
version = "0.0.0"

View File

@@ -52,6 +52,7 @@ members = [
"tui",
"tools",
"v8-poc",
"wasm-harness",
"utils/absolute-path",
"utils/cargo-bin",
"git-utils",
@@ -229,6 +230,7 @@ insta = "1.46.3"
inventory = "0.3.19"
itertools = "0.14.0"
jsonwebtoken = "9.3.1"
js-sys = "0.3.85"
keyring = { version = "3.6", default-features = false }
landlock = "0.4.4"
lazy_static = "1"
@@ -335,6 +337,9 @@ urlencoding = "2.1"
uuid = "1"
vt100 = "0.16.2"
walkdir = "2.5.0"
wasm-bindgen = "0.2.108"
wasm-bindgen-futures = "0.4.58"
web-sys = "0.3.85"
webbrowser = "1.0"
which = "8"
wildmatch = "2.6.1"

View File

@@ -15,16 +15,18 @@ workspace = true
[dependencies]
codex-app-server-protocol = { workspace = true }
codex-git-utils = { workspace = true }
codex-login = { workspace = true }
codex-plugin = { workspace = true }
codex-protocol = { workspace = true }
serde = { workspace = true, features = ["derive"] }
sha1 = { workspace = true }
tracing = { workspace = true, features = ["log"] }
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
codex-login = { workspace = true }
tokio = { workspace = true, features = [
"macros",
"rt-multi-thread",
] }
tracing = { workspace = true, features = ["log"] }
[dev-dependencies]
pretty_assertions = { workspace = true }

View File

@@ -1,17 +1,7 @@
mod analytics_client;
#[cfg(not(target_arch = "wasm32"))]
include!("native.rs");
pub use analytics_client::AnalyticsEventsClient;
pub use analytics_client::AnalyticsFact;
pub use analytics_client::AnalyticsReducer;
pub use analytics_client::AppInvocation;
pub use analytics_client::AppMentionedInput;
pub use analytics_client::AppUsedInput;
pub use analytics_client::CustomAnalyticsFact;
pub use analytics_client::InvocationType;
pub use analytics_client::PluginState;
pub use analytics_client::PluginStateChangedInput;
pub use analytics_client::PluginUsedInput;
pub use analytics_client::SkillInvocation;
pub use analytics_client::SkillInvokedInput;
pub use analytics_client::TrackEventsContext;
pub use analytics_client::build_track_events_context;
#[cfg(target_arch = "wasm32")]
mod wasm;
#[cfg(target_arch = "wasm32")]
pub use wasm::*;

View File

@@ -0,0 +1,17 @@
mod analytics_client;
pub use analytics_client::AnalyticsEventsClient;
pub use analytics_client::AnalyticsFact;
pub use analytics_client::AnalyticsReducer;
pub use analytics_client::AppInvocation;
pub use analytics_client::AppMentionedInput;
pub use analytics_client::AppUsedInput;
pub use analytics_client::CustomAnalyticsFact;
pub use analytics_client::InvocationType;
pub use analytics_client::PluginState;
pub use analytics_client::PluginStateChangedInput;
pub use analytics_client::PluginUsedInput;
pub use analytics_client::SkillInvocation;
pub use analytics_client::SkillInvokedInput;
pub use analytics_client::TrackEventsContext;
pub use analytics_client::build_track_events_context;

View File

@@ -0,0 +1,170 @@
use codex_plugin::PluginTelemetryMetadata;
use codex_protocol::protocol::SkillScope;
use serde::Serialize;
use std::path::PathBuf;
use std::sync::Arc;
#[derive(Clone)]
pub struct TrackEventsContext {
pub model_slug: String,
pub thread_id: String,
pub turn_id: String,
}
pub fn build_track_events_context(
model_slug: String,
thread_id: String,
turn_id: String,
) -> TrackEventsContext {
TrackEventsContext {
model_slug,
thread_id,
turn_id,
}
}
#[derive(Clone, Debug)]
pub struct SkillInvocation {
pub skill_name: String,
pub skill_scope: SkillScope,
pub skill_path: PathBuf,
pub invocation_type: InvocationType,
}
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum InvocationType {
Explicit,
Implicit,
}
pub struct AppInvocation {
pub connector_id: Option<String>,
pub app_name: Option<String>,
pub invocation_type: Option<InvocationType>,
}
pub enum AnalyticsFact {
Initialize {
connection_id: u64,
params: codex_app_server_protocol::InitializeParams,
},
Request {
connection_id: u64,
request_id: codex_app_server_protocol::RequestId,
request: Box<codex_app_server_protocol::ClientRequest>,
},
Response {
connection_id: u64,
response: Box<codex_app_server_protocol::ClientResponse>,
},
Notification(Box<codex_app_server_protocol::ServerNotification>),
Custom(CustomAnalyticsFact),
}
pub enum CustomAnalyticsFact {
SkillInvoked(SkillInvokedInput),
AppMentioned(AppMentionedInput),
AppUsed(AppUsedInput),
PluginUsed(PluginUsedInput),
PluginStateChanged(PluginStateChangedInput),
}
pub struct SkillInvokedInput {
pub tracking: TrackEventsContext,
pub invocations: Vec<SkillInvocation>,
}
pub struct AppMentionedInput {
pub tracking: TrackEventsContext,
pub mentions: Vec<AppInvocation>,
}
pub struct AppUsedInput {
pub tracking: TrackEventsContext,
pub app: AppInvocation,
}
pub struct PluginUsedInput {
pub tracking: TrackEventsContext,
pub plugin: PluginTelemetryMetadata,
}
pub struct PluginStateChangedInput {
pub plugin: PluginTelemetryMetadata,
pub state: PluginState,
}
#[derive(Clone, Copy)]
pub enum PluginState {
Installed,
Uninstalled,
Enabled,
Disabled,
}
#[derive(Default)]
pub struct AnalyticsReducer;
#[derive(Clone, Default)]
pub struct AnalyticsEventsClient {
analytics_enabled: Option<bool>,
_marker: Arc<()>,
}
impl AnalyticsEventsClient {
pub fn new<T>(
_auth_manager: Arc<T>,
_base_url: String,
analytics_enabled: Option<bool>,
) -> Self {
Self {
analytics_enabled,
_marker: Arc::new(()),
}
}
pub fn track_skill_invocations(
&self,
_tracking: TrackEventsContext,
_invocations: Vec<SkillInvocation>,
) {
let _ = self.analytics_enabled;
}
pub fn track_app_mentioned(
&self,
_tracking: TrackEventsContext,
_mentions: Vec<AppInvocation>,
) {
let _ = self.analytics_enabled;
}
pub fn track_app_used(&self, _tracking: TrackEventsContext, _app: AppInvocation) {
let _ = self.analytics_enabled;
}
pub fn track_plugin_used(
&self,
_tracking: TrackEventsContext,
_plugin: PluginTelemetryMetadata,
) {
let _ = self.analytics_enabled;
}
pub fn track_plugin_installed(&self, _plugin: PluginTelemetryMetadata) {
let _ = self.analytics_enabled;
}
pub fn track_plugin_uninstalled(&self, _plugin: PluginTelemetryMetadata) {
let _ = self.analytics_enabled;
}
pub fn track_plugin_enabled(&self, _plugin: PluginTelemetryMetadata) {
let _ = self.analytics_enabled;
}
pub fn track_plugin_disabled(&self, _plugin: PluginTelemetryMetadata) {
let _ = self.analytics_enabled;
}
}

View File

@@ -25,16 +25,18 @@ serde_with = { workspace = true }
shlex = { workspace = true }
strum_macros = { workspace = true }
thiserror = { workspace = true }
ts-rs = { workspace = true }
inventory = { workspace = true }
tracing = { workspace = true }
uuid = { workspace = true, features = ["serde", "v7"] }
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
rmcp = { workspace = true, default-features = false, features = [
"base64",
"macros",
"schemars",
"server",
] }
ts-rs = { workspace = true }
inventory = { workspace = true }
tracing = { workspace = true }
uuid = { workspace = true, features = ["serde", "v7"] }
[dev-dependencies]
anyhow = { workspace = true }

View File

@@ -5295,6 +5295,7 @@ impl McpServerElicitationAction {
}
}
#[cfg(not(target_arch = "wasm32"))]
impl From<McpServerElicitationAction> for rmcp::model::ElicitationAction {
fn from(value: McpServerElicitationAction) -> Self {
match value {
@@ -5305,6 +5306,7 @@ impl From<McpServerElicitationAction> for rmcp::model::ElicitationAction {
}
}
#[cfg(not(target_arch = "wasm32"))]
impl From<rmcp::model::ElicitationAction> for McpServerElicitationAction {
fn from(value: rmcp::model::ElicitationAction) -> Self {
match value {
@@ -5722,6 +5724,7 @@ pub struct McpServerElicitationRequestResponse {
pub meta: Option<JsonValue>,
}
#[cfg(not(target_arch = "wasm32"))]
impl From<McpServerElicitationRequestResponse> for rmcp::model::CreateElicitationResult {
fn from(value: McpServerElicitationRequestResponse) -> Self {
Self {
@@ -5731,6 +5734,7 @@ impl From<McpServerElicitationRequestResponse> for rmcp::model::CreateElicitatio
}
}
#[cfg(not(target_arch = "wasm32"))]
impl From<rmcp::model::CreateElicitationResult> for McpServerElicitationRequestResponse {
fn from(value: rmcp::model::CreateElicitationResult) -> Self {
Self {
@@ -7186,6 +7190,7 @@ mod tests {
assert_eq!(reason, Some("askForApproval.granular"));
}
#[cfg(not(target_arch = "wasm32"))]
#[test]
fn mcp_server_elicitation_response_round_trips_rmcp_result() {
let rmcp_result = rmcp::model::CreateElicitationResult {

View File

@@ -19,7 +19,9 @@ serde_json = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt", "sync", "time"] }
tokio-util = { workspace = true, features = ["rt"] }
tracing = { workspace = true }
v8 = { workspace = true }
[dev-dependencies]
pretty_assertions = { workspace = true }
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
v8 = { workspace = true }

View File

@@ -225,6 +225,7 @@ pub fn augment_tool_definition(mut definition: ToolDefinition) -> ToolDefinition
definition
}
#[cfg(not(target_arch = "wasm32"))]
pub fn enabled_tool_metadata(definition: &ToolDefinition) -> EnabledToolMetadata {
EnabledToolMetadata {
tool_name: definition.name.clone(),
@@ -234,6 +235,7 @@ pub fn enabled_tool_metadata(definition: &ToolDefinition) -> EnabledToolMetadata
}
}
#[cfg(not(target_arch = "wasm32"))]
#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
pub struct EnabledToolMetadata {
pub tool_name: String,

View File

@@ -1,6 +1,7 @@
mod description;
mod response;
mod runtime;
#[cfg_attr(target_arch = "wasm32", path = "service_wasm.rs")]
mod service;
pub use description::CODE_MODE_PRAGMA_PREFIX;
@@ -22,9 +23,11 @@ pub use runtime::DEFAULT_WAIT_YIELD_TIME_MS;
pub use runtime::ExecuteRequest;
pub use runtime::RuntimeResponse;
pub use runtime::WaitRequest;
pub use service::CodeModeRuntime;
pub use service::CodeModeService;
pub use service::CodeModeTurnHost;
pub use service::CodeModeTurnWorker;
pub use service::CodeModeTurnWorkerHandle;
pub const PUBLIC_TOOL_NAME: &str = "exec";
pub const WAIT_TOOL_NAME: &str = "wait";

View File

@@ -1,24 +1,35 @@
#[cfg(not(target_arch = "wasm32"))]
mod callbacks;
#[cfg(not(target_arch = "wasm32"))]
mod globals;
#[cfg(not(target_arch = "wasm32"))]
mod module_loader;
#[cfg(not(target_arch = "wasm32"))]
mod value;
use std::collections::HashMap;
use std::sync::OnceLock;
use std::sync::mpsc as std_mpsc;
use std::thread;
use serde_json::Value as JsonValue;
use tokio::sync::mpsc;
#[cfg(not(target_arch = "wasm32"))]
use crate::description::EnabledToolMetadata;
use crate::description::ToolDefinition;
#[cfg(not(target_arch = "wasm32"))]
use crate::description::enabled_tool_metadata;
use crate::response::FunctionCallOutputContentItem;
#[cfg(not(target_arch = "wasm32"))]
use std::sync::OnceLock;
#[cfg(not(target_arch = "wasm32"))]
use std::sync::mpsc as std_mpsc;
#[cfg(not(target_arch = "wasm32"))]
use std::thread;
#[cfg(not(target_arch = "wasm32"))]
use tokio::sync::mpsc;
pub const DEFAULT_EXEC_YIELD_TIME_MS: u64 = 10_000;
pub const DEFAULT_WAIT_YIELD_TIME_MS: u64 = 10_000;
pub const DEFAULT_MAX_OUTPUT_TOKENS_PER_EXEC_CALL: usize = 10_000;
#[cfg(not(target_arch = "wasm32"))]
const EXIT_SENTINEL: &str = "__codex_code_mode_exit__";
#[derive(Clone, Debug)]
@@ -56,6 +67,7 @@ pub enum RuntimeResponse {
},
}
#[cfg(not(target_arch = "wasm32"))]
#[derive(Debug)]
pub(crate) enum TurnMessage {
ToolCall {
@@ -71,6 +83,7 @@ pub(crate) enum TurnMessage {
},
}
#[cfg(not(target_arch = "wasm32"))]
#[derive(Debug)]
pub(crate) enum RuntimeCommand {
ToolResponse { id: String, result: JsonValue },
@@ -78,6 +91,7 @@ pub(crate) enum RuntimeCommand {
Terminate,
}
#[cfg(not(target_arch = "wasm32"))]
#[derive(Debug)]
pub(crate) enum RuntimeEvent {
Started,
@@ -98,6 +112,7 @@ pub(crate) enum RuntimeEvent {
},
}
#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn spawn_runtime(
request: ExecuteRequest,
event_tx: mpsc::UnboundedSender<RuntimeEvent>,
@@ -126,6 +141,7 @@ pub(crate) fn spawn_runtime(
Ok((command_tx, isolate_handle))
}
#[cfg(not(target_arch = "wasm32"))]
#[derive(Clone)]
struct RuntimeConfig {
tool_call_id: String,
@@ -134,6 +150,7 @@ struct RuntimeConfig {
stored_values: HashMap<String, JsonValue>,
}
#[cfg(not(target_arch = "wasm32"))]
pub(super) struct RuntimeState {
event_tx: mpsc::UnboundedSender<RuntimeEvent>,
pending_tool_calls: HashMap<String, v8::Global<v8::PromiseResolver>>,
@@ -144,6 +161,7 @@ pub(super) struct RuntimeState {
exit_requested: bool,
}
#[cfg(not(target_arch = "wasm32"))]
pub(super) enum CompletionState {
Pending,
Completed {
@@ -152,6 +170,7 @@ pub(super) enum CompletionState {
},
}
#[cfg(not(target_arch = "wasm32"))]
fn initialize_v8() {
static PLATFORM: OnceLock<v8::SharedRef<v8::Platform>> = OnceLock::new();
@@ -163,6 +182,7 @@ fn initialize_v8() {
});
}
#[cfg(not(target_arch = "wasm32"))]
fn run_runtime(
config: RuntimeConfig,
event_tx: mpsc::UnboundedSender<RuntimeEvent>,
@@ -264,6 +284,7 @@ fn run_runtime(
}
}
#[cfg(not(target_arch = "wasm32"))]
fn capture_scope_send_error(
scope: &mut v8::PinScope<'_, '_>,
event_tx: &mpsc::UnboundedSender<RuntimeEvent>,
@@ -277,6 +298,7 @@ fn capture_scope_send_error(
send_result(event_tx, stored_values, error_text);
}
#[cfg(not(target_arch = "wasm32"))]
fn send_result(
event_tx: &mpsc::UnboundedSender<RuntimeEvent>,
stored_values: HashMap<String, JsonValue>,
@@ -288,7 +310,7 @@ fn send_result(
});
}
#[cfg(test)]
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
use std::collections::HashMap;
use std::time::Duration;

View File

@@ -34,6 +34,22 @@ pub trait CodeModeTurnHost: Send + Sync {
async fn notify(&self, call_id: String, cell_id: String, text: String) -> Result<(), String>;
}
#[async_trait]
pub trait CodeModeRuntime: Send + Sync {
async fn stored_values(&self) -> HashMap<String, JsonValue>;
async fn replace_stored_values(&self, values: HashMap<String, JsonValue>);
async fn execute(&self, request: ExecuteRequest) -> Result<RuntimeResponse, String>;
async fn wait(&self, request: WaitRequest) -> Result<RuntimeResponse, String>;
fn start_turn_worker(
&self,
host: Arc<dyn CodeModeTurnHost>,
) -> Box<dyn CodeModeTurnWorkerHandle>;
}
#[derive(Clone)]
struct SessionHandle {
control_tx: mpsc::UnboundedSender<SessionControlCommand>,
@@ -219,6 +235,10 @@ pub struct CodeModeTurnWorker {
shutdown_tx: Option<oneshot::Sender<()>>,
}
pub trait CodeModeTurnWorkerHandle: Send {}
impl CodeModeTurnWorkerHandle for CodeModeTurnWorker {}
impl Drop for CodeModeTurnWorker {
fn drop(&mut self) {
if let Some(shutdown_tx) = self.shutdown_tx.take() {
@@ -227,6 +247,32 @@ impl Drop for CodeModeTurnWorker {
}
}
#[async_trait]
impl CodeModeRuntime for CodeModeService {
async fn stored_values(&self) -> HashMap<String, JsonValue> {
CodeModeService::stored_values(self).await
}
async fn replace_stored_values(&self, values: HashMap<String, JsonValue>) {
CodeModeService::replace_stored_values(self, values).await;
}
async fn execute(&self, request: ExecuteRequest) -> Result<RuntimeResponse, String> {
CodeModeService::execute(self, request).await
}
async fn wait(&self, request: WaitRequest) -> Result<RuntimeResponse, String> {
CodeModeService::wait(self, request).await
}
fn start_turn_worker(
&self,
host: Arc<dyn CodeModeTurnHost>,
) -> Box<dyn CodeModeTurnWorkerHandle> {
Box::new(CodeModeService::start_turn_worker(self, host))
}
}
enum SessionControlCommand {
Poll {
yield_time_ms: u64,

View File

@@ -0,0 +1,119 @@
use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use serde_json::Value as JsonValue;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;
use crate::ExecuteRequest;
use crate::RuntimeResponse;
use crate::WaitRequest;
#[async_trait]
pub trait CodeModeTurnHost: Send + Sync {
async fn invoke_tool(
&self,
tool_name: String,
input: Option<JsonValue>,
cancellation_token: CancellationToken,
) -> Result<JsonValue, String>;
async fn notify(&self, call_id: String, cell_id: String, text: String) -> Result<(), String>;
}
#[async_trait]
pub trait CodeModeRuntime: Send + Sync {
async fn stored_values(&self) -> HashMap<String, JsonValue>;
async fn replace_stored_values(&self, values: HashMap<String, JsonValue>);
async fn execute(&self, request: ExecuteRequest) -> Result<RuntimeResponse, String>;
async fn wait(&self, request: WaitRequest) -> Result<RuntimeResponse, String>;
fn start_turn_worker(
&self,
host: Arc<dyn CodeModeTurnHost>,
) -> Box<dyn CodeModeTurnWorkerHandle>;
}
pub struct CodeModeService {
stored_values: Mutex<HashMap<String, JsonValue>>,
}
impl CodeModeService {
pub fn new() -> Self {
Self {
stored_values: Mutex::new(HashMap::new()),
}
}
pub async fn stored_values(&self) -> HashMap<String, JsonValue> {
self.stored_values.lock().await.clone()
}
pub async fn replace_stored_values(&self, values: HashMap<String, JsonValue>) {
*self.stored_values.lock().await = values;
}
pub async fn execute(&self, _request: ExecuteRequest) -> Result<RuntimeResponse, String> {
Err(
"native code mode runtime is unavailable on wasm32; inject a browser CodeModeRuntime"
.to_string(),
)
}
pub async fn wait(&self, request: WaitRequest) -> Result<RuntimeResponse, String> {
Ok(RuntimeResponse::Result {
cell_id: request.cell_id,
content_items: Vec::new(),
stored_values: self.stored_values().await,
error_text: Some(
"code mode wait is unavailable on wasm32 without an injected runtime".to_string(),
),
})
}
pub fn start_turn_worker(&self, _host: Arc<dyn CodeModeTurnHost>) -> CodeModeTurnWorker {
CodeModeTurnWorker
}
}
impl Default for CodeModeService {
fn default() -> Self {
Self::new()
}
}
pub struct CodeModeTurnWorker;
pub trait CodeModeTurnWorkerHandle: Send {}
impl CodeModeTurnWorkerHandle for CodeModeTurnWorker {}
#[async_trait]
impl CodeModeRuntime for CodeModeService {
async fn stored_values(&self) -> HashMap<String, JsonValue> {
CodeModeService::stored_values(self).await
}
async fn replace_stored_values(&self, values: HashMap<String, JsonValue>) {
CodeModeService::replace_stored_values(self, values).await;
}
async fn execute(&self, request: ExecuteRequest) -> Result<RuntimeResponse, String> {
CodeModeService::execute(self, request).await
}
async fn wait(&self, request: WaitRequest) -> Result<RuntimeResponse, String> {
CodeModeService::wait(self, request).await
}
fn start_turn_worker(
&self,
host: Arc<dyn CodeModeTurnHost>,
) -> Box<dyn CodeModeTurnWorkerHandle> {
Box::new(CodeModeService::start_turn_worker(self, host))
}
}

View File

@@ -15,9 +15,7 @@ http = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["macros", "net", "rt", "sync", "time"] }
tokio-tungstenite = { workspace = true }
tungstenite = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt", "sync", "time"] }
tracing = { workspace = true }
eventsource-stream = { workspace = true }
regex-lite = { workspace = true }
@@ -32,5 +30,10 @@ tokio-test = { workspace = true }
wiremock = { workspace = true }
reqwest = { workspace = true }
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio = { workspace = true, features = ["net"] }
tokio-tungstenite = { workspace = true }
tungstenite = { workspace = true }
[lints]
workspace = true

View File

@@ -1,7 +1,9 @@
pub mod compact;
pub mod memories;
pub mod models;
#[cfg(not(target_arch = "wasm32"))]
pub mod realtime_websocket;
pub mod responses;
#[cfg(not(target_arch = "wasm32"))]
pub mod responses_websocket;
mod session;

View File

@@ -30,20 +30,30 @@ pub use crate::common::response_create_client_metadata;
pub use crate::endpoint::compact::CompactClient;
pub use crate::endpoint::memories::MemoriesClient;
pub use crate::endpoint::models::ModelsClient;
#[cfg(not(target_arch = "wasm32"))]
pub use crate::endpoint::realtime_websocket::RealtimeEventParser;
#[cfg(not(target_arch = "wasm32"))]
pub use crate::endpoint::realtime_websocket::RealtimeSessionConfig;
#[cfg(not(target_arch = "wasm32"))]
pub use crate::endpoint::realtime_websocket::RealtimeSessionMode;
#[cfg(not(target_arch = "wasm32"))]
pub use crate::endpoint::realtime_websocket::RealtimeWebsocketClient;
#[cfg(not(target_arch = "wasm32"))]
pub use crate::endpoint::realtime_websocket::RealtimeWebsocketConnection;
pub use crate::endpoint::responses::ResponsesClient;
pub use crate::endpoint::responses::ResponsesOptions;
#[cfg(not(target_arch = "wasm32"))]
pub use crate::endpoint::responses_websocket::ResponsesWebsocketClient;
#[cfg(not(target_arch = "wasm32"))]
pub use crate::endpoint::responses_websocket::ResponsesWebsocketConnection;
pub use crate::error::ApiError;
pub use crate::provider::Provider;
pub use crate::provider::is_azure_responses_wire_base_url;
pub use crate::sse::stream_from_fixture;
pub use crate::telemetry::SseTelemetry;
#[cfg(not(target_arch = "wasm32"))]
pub use crate::telemetry::WebsocketTelemetry;
#[cfg(not(target_arch = "wasm32"))]
pub use codex_protocol::protocol::RealtimeAudioFrame;
#[cfg(not(target_arch = "wasm32"))]
pub use codex_protocol::protocol::RealtimeEvent;

View File

@@ -11,7 +11,9 @@ use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Instant;
#[cfg(not(target_arch = "wasm32"))]
use tokio_tungstenite::tungstenite::Error;
#[cfg(not(target_arch = "wasm32"))]
use tokio_tungstenite::tungstenite::Message;
/// Generic telemetry.
@@ -31,6 +33,7 @@ pub trait SseTelemetry: Send + Sync {
);
}
#[cfg(not(target_arch = "wasm32"))]
/// Telemetry for Responses WebSocket transport.
pub trait WebsocketTelemetry: Send + Sync {
fn on_ws_request(&self, duration: Duration, error: Option<&ApiError>, connection_reused: bool);

View File

@@ -47,10 +47,8 @@ codex-network-proxy = { workspace = true }
codex-otel = { workspace = true }
codex-plugin = { workspace = true }
codex-protocol = { workspace = true }
codex-rollout = { workspace = true }
codex-rmcp-client = { workspace = true }
codex-sandboxing = { workspace = true }
codex-state = { workspace = true }
codex-terminal-detection = { workspace = true }
codex-tools = { workspace = true }
codex-utils-absolute-path = { workspace = true }
@@ -107,7 +105,6 @@ tokio = { workspace = true, features = [
"signal",
] }
tokio-util = { workspace = true, features = ["rt"] }
tokio-tungstenite = { workspace = true }
toml = { workspace = true }
toml_edit = { workspace = true }
tracing = { workspace = true, features = ["log"] }
@@ -124,6 +121,11 @@ seccompiler = { workspace = true }
[target.'cfg(target_os = "macos")'.dependencies]
core-foundation = "0.9"
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
codex-rollout = { workspace = true }
codex-state = { workspace = true }
tokio-tungstenite = { workspace = true }
# Build OpenSSL from source for musl builds.
[target.x86_64-unknown-linux-musl.dependencies]
openssl-sys = { workspace = true, features = ["vendored"] }

View File

@@ -48,10 +48,13 @@ use codex_api::ResponseCreateWsRequest;
use codex_api::ResponsesApiRequest;
use codex_api::ResponsesClient as ApiResponsesClient;
use codex_api::ResponsesOptions as ApiResponsesOptions;
#[cfg(not(target_arch = "wasm32"))]
use codex_api::ResponsesWebsocketClient as ApiWebSocketResponsesClient;
#[cfg(not(target_arch = "wasm32"))]
use codex_api::ResponsesWebsocketConnection as ApiWebSocketConnection;
use codex_api::SseTelemetry;
use codex_api::TransportError;
#[cfg(not(target_arch = "wasm32"))]
use codex_api::WebsocketTelemetry;
use codex_api::build_conversation_headers;
use codex_api::common::Reasoning;
@@ -84,8 +87,11 @@ use std::time::Duration;
use std::time::Instant;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
#[cfg(not(target_arch = "wasm32"))]
use tokio::sync::oneshot::error::TryRecvError;
#[cfg(not(target_arch = "wasm32"))]
use tokio_tungstenite::tungstenite::Error;
#[cfg(not(target_arch = "wasm32"))]
use tokio_tungstenite::tungstenite::Message;
use tracing::instrument;
use tracing::trace;
@@ -118,6 +124,7 @@ pub const X_CODEX_TURN_STATE_HEADER: &str = "x-codex-turn-state";
pub const X_CODEX_TURN_METADATA_HEADER: &str = "x-codex-turn-metadata";
pub const X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER: &str =
"x-responsesapi-include-timing-metrics";
#[cfg(not(target_arch = "wasm32"))]
const RESPONSES_WEBSOCKETS_V2_BETA_HEADER_VALUE: &str = "responses_websockets=2026-02-06";
const RESPONSES_ENDPOINT: &str = "/responses";
const RESPONSES_COMPACT_ENDPOINT: &str = "/responses/compact";
@@ -219,6 +226,7 @@ struct LastResponse {
#[derive(Debug, Default)]
struct WebsocketSession {
#[cfg(not(target_arch = "wasm32"))]
connection: Option<ApiWebSocketConnection>,
last_request: Option<ResponsesApiRequest>,
last_response_rx: Option<oneshot::Receiver<LastResponse>>,
@@ -241,6 +249,7 @@ impl WebsocketSession {
}
}
#[cfg(not(target_arch = "wasm32"))]
enum WebsocketStreamOutcome {
Stream(ResponseStream),
FallbackToHttp,
@@ -514,6 +523,15 @@ impl ModelClient {
/// Returns whether the Responses-over-WebSocket transport is active for this session.
///
/// WebSocket use is controlled by provider capability and session-scoped fallback state.
#[cfg(target_arch = "wasm32")]
pub fn responses_websocket_enabled(&self) -> bool {
false
}
/// Returns whether the Responses-over-WebSocket transport is active for this session.
///
/// WebSocket use is controlled by provider capability and session-scoped fallback state.
#[cfg(not(target_arch = "wasm32"))]
pub fn responses_websocket_enabled(&self) -> bool {
if !self.state.provider.supports_websockets
|| self.state.disable_websockets.load(Ordering::Relaxed)
@@ -551,6 +569,7 @@ impl ModelClient {
/// Both startup prewarm and in-turn `needs_new` reconnects call this path so handshake
/// behavior remains consistent across both flows.
#[allow(clippy::too_many_arguments)]
#[cfg(not(target_arch = "wasm32"))]
async fn connect_websocket(
&self,
session_telemetry: &SessionTelemetry,
@@ -638,6 +657,7 @@ impl ModelClient {
///
/// Callers should pass the current turn-state lock when available so sticky-routing state is
/// replayed on reconnect within the same turn.
#[cfg(not(target_arch = "wasm32"))]
fn build_websocket_headers(
&self,
turn_state: Option<&Arc<OnceLock<String>>>,
@@ -678,13 +698,21 @@ impl Drop for ModelClientSession {
impl ModelClientSession {
fn reset_websocket_session(&mut self) {
self.websocket_session.connection = None;
self.reset_websocket_connection();
self.websocket_session.last_request = None;
self.websocket_session.last_response_rx = None;
self.websocket_session
.set_connection_reused(/*connection_reused*/ false);
}
#[cfg(not(target_arch = "wasm32"))]
fn reset_websocket_connection(&mut self) {
self.websocket_session.connection = None;
}
#[cfg(target_arch = "wasm32")]
fn reset_websocket_connection(&mut self) {}
fn build_responses_request(
&self,
provider: &codex_api::Provider,
@@ -817,6 +845,7 @@ impl ModelClientSession {
}
}
#[cfg(not(target_arch = "wasm32"))]
fn get_last_response(&mut self) -> Option<LastResponse> {
self.websocket_session
.last_response_rx
@@ -827,6 +856,7 @@ impl ModelClientSession {
})
}
#[cfg(not(target_arch = "wasm32"))]
fn prepare_websocket_request(
&mut self,
payload: ResponseCreateWsRequest,
@@ -858,6 +888,7 @@ impl ModelClientSession {
/// Opportunistically preconnects a websocket for this turn-scoped client session.
///
/// This performs only connection setup; it never sends prompt payloads.
#[cfg(not(target_arch = "wasm32"))]
pub async fn preconnect_websocket(
&mut self,
session_telemetry: &SessionTelemetry,
@@ -897,7 +928,18 @@ impl ModelClientSession {
.set_connection_reused(/*connection_reused*/ false);
Ok(())
}
#[cfg(target_arch = "wasm32")]
pub async fn preconnect_websocket(
&mut self,
_session_telemetry: &SessionTelemetry,
_model_info: &ModelInfo,
) -> std::result::Result<(), ApiError> {
Ok(())
}
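The no-op `preconnect_websocket` above follows a general cfg-shim pattern. A minimal sketch with hypothetical types: the native build performs real connection setup, while the wasm build keeps the same signature but succeeds without doing anything, so call sites compile unchanged on both targets.

```rust
struct Session {
    connected: bool,
}

impl Session {
    #[cfg(not(target_arch = "wasm32"))]
    fn preconnect(&mut self) -> Result<(), String> {
        // Native path: stands in for opening the websocket transport.
        self.connected = true;
        Ok(())
    }

    #[cfg(target_arch = "wasm32")]
    fn preconnect(&mut self) -> Result<(), String> {
        // wasm path: no websocket transport exists; report success so the
        // caller falls through to the HTTP-only flow.
        Ok(())
    }
}
```

Because both versions share one signature, callers never need their own `cfg` branches.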
/// Returns a websocket connection for this turn.
#[cfg(not(target_arch = "wasm32"))]
#[instrument(
name = "model_client.websocket_connection",
level = "info",
@@ -1085,6 +1127,7 @@ impl ModelClientSession {
/// Streams a turn via the Responses API over WebSocket transport.
#[allow(clippy::too_many_arguments)]
#[cfg(not(target_arch = "wasm32"))]
#[instrument(
name = "model_client.stream_responses_websocket",
level = "info",
@@ -1218,6 +1261,7 @@ impl ModelClientSession {
}
/// Builds telemetry for the Responses API WebSocket transport.
#[cfg(not(target_arch = "wasm32"))]
fn build_websocket_telemetry(
session_telemetry: &SessionTelemetry,
auth_context: AuthRequestTelemetryContext,
@@ -1235,6 +1279,7 @@ impl ModelClientSession {
}
#[allow(clippy::too_many_arguments)]
#[cfg(not(target_arch = "wasm32"))]
pub async fn prewarm_websocket(
&mut self,
prompt: &Prompt,
@@ -1285,6 +1330,21 @@ impl ModelClientSession {
}
}
#[allow(clippy::too_many_arguments)]
#[cfg(target_arch = "wasm32")]
pub async fn prewarm_websocket(
&mut self,
_prompt: &Prompt,
_model_info: &ModelInfo,
_session_telemetry: &SessionTelemetry,
_effort: Option<ReasoningEffortConfig>,
_summary: ReasoningSummaryConfig,
_service_tier: Option<ServiceTier>,
_turn_metadata_header: Option<&str>,
) -> Result<()> {
Ok(())
}
#[allow(clippy::too_many_arguments)]
/// Streams a single model request within the current turn.
///
@@ -1305,6 +1365,7 @@ impl ModelClientSession {
let wire_api = self.client.state.provider.wire_api;
match wire_api {
WireApi::Responses => {
#[cfg(not(target_arch = "wasm32"))]
if self.client.responses_websocket_enabled() {
let request_trace = current_span_w3c_trace_context();
match self
@@ -1369,6 +1430,7 @@ fn parse_turn_metadata_header(turn_metadata_header: Option<&str>) -> Option<Head
turn_metadata_header.and_then(|value| HeaderValue::from_str(value).ok())
}
#[cfg(not(target_arch = "wasm32"))]
fn build_ws_client_metadata(turn_metadata_header: Option<&str>) -> Option<HashMap<String, String>> {
let turn_metadata_header = parse_turn_metadata_header(turn_metadata_header)?;
let turn_metadata = turn_metadata_header.to_str().ok()?.to_string();
@@ -1547,6 +1609,7 @@ impl AuthRequestTelemetryContext {
}
}
#[cfg(not(target_arch = "wasm32"))]
struct WebsocketConnectParams<'a> {
session_telemetry: &'a SessionTelemetry,
api_provider: codex_api::Provider,
@@ -1774,6 +1837,7 @@ impl SseTelemetry for ApiTelemetry {
}
}
#[cfg(not(target_arch = "wasm32"))]
impl WebsocketTelemetry for ApiTelemetry {
fn on_ws_request(&self, duration: Duration, error: Option<&ApiError>, connection_reused: bool) {
let error_message = error.map(telemetry_api_error_message);

View File

@@ -422,6 +422,7 @@ pub(crate) struct CodexSpawnArgs {
pub(crate) inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
pub(crate) inherited_exec_policy: Option<Arc<ExecPolicyManager>>,
pub(crate) user_shell_override: Option<shell::Shell>,
pub(crate) code_mode_runtime: Option<Arc<dyn codex_code_mode::CodeModeRuntime>>,
pub(crate) parent_trace: Option<W3cTraceContext>,
}
@@ -476,6 +477,7 @@ impl Codex {
inherited_shell_snapshot,
user_shell_override,
inherited_exec_policy,
code_mode_runtime,
parent_trace: _,
} = args;
let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
@@ -660,6 +662,7 @@ impl Codex {
mcp_manager.clone(),
skills_watcher,
agent_control,
code_mode_runtime,
)
.await
.map_err(|e| {
@@ -1478,6 +1481,7 @@ impl Session {
mcp_manager: Arc<McpManager>,
skills_watcher: Arc<SkillsWatcher>,
agent_control: AgentControl,
code_mode_runtime: Option<Arc<dyn codex_code_mode::CodeModeRuntime>>,
) -> anyhow::Result<Arc<Self>> {
debug!(
"Configuring session: model={}; provider={:?}",
@@ -1903,8 +1907,9 @@ impl Session {
config.features.enabled(Feature::RuntimeMetrics),
Self::build_model_client_beta_features_header(config.as_ref()),
),
code_mode_service: code_mode_runtime.map_or_else(
|| crate::tools::code_mode::CodeModeService::new(config.js_repl_node_path.clone()),
crate::tools::code_mode::CodeModeService::from_runtime,
),
environment: environment_manager.current().await?,
};
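The `map_or_else` in the hunk above is a compact way to inject an optional runtime while keeping a default. A sketch of the same pattern with illustrative names (not the real `CodeModeRuntime` API): callers may pass a trait-object runtime; otherwise the service falls back to the host implementation.

```rust
use std::sync::Arc;

trait Runtime {
    fn name(&self) -> &'static str;
}

struct DefaultRuntime;
impl Runtime for DefaultRuntime {
    fn name(&self) -> &'static str {
        "default"
    }
}

struct FixedRuntime(&'static str);
impl Runtime for FixedRuntime {
    fn name(&self) -> &'static str {
        self.0
    }
}

struct Service {
    inner: Arc<dyn Runtime>,
}

impl Service {
    fn new() -> Self {
        Self::from_runtime(Arc::new(DefaultRuntime))
    }
    fn from_runtime(inner: Arc<dyn Runtime>) -> Self {
        Self { inner }
    }
}

fn build(injected: Option<Arc<dyn Runtime>>) -> Service {
    // Mirrors the diff: construct the default when None, wrap the injected
    // runtime when Some.
    injected.map_or_else(Service::new, Service::from_runtime)
}
```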

View File

@@ -93,6 +93,7 @@ pub(crate) async fn run_codex_thread_interactive(
inherited_shell_snapshot: None,
user_shell_override: None,
inherited_exec_policy: Some(Arc::clone(&parent_session.services.exec_policy)),
code_mode_runtime: None,
parent_trace: None,
})
.await?;

View File

@@ -456,6 +456,7 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() {
inherited_shell_snapshot: None,
inherited_exec_policy: Some(Arc::new(parent_exec_policy)),
user_shell_override: None,
code_mode_runtime: None,
parent_trace: None,
})
.await

View File

@@ -153,7 +153,9 @@ pub mod shell;
pub mod shell_snapshot;
pub mod spawn;
pub mod state_db_bridge;
pub mod state_db {
pub use crate::state_db_bridge::*;
}
mod thread_rollout_truncation;
mod tools;
pub mod turn_diff_tracker;
@@ -201,6 +203,7 @@ pub use client_common::Prompt;
pub use client_common::REVIEW_PROMPT;
pub use client_common::ResponseEvent;
pub use client_common::ResponseStream;
pub use codex_code_mode::CodeModeRuntime;
pub use codex_sandboxing::get_platform_sandbox;
pub use codex_tools::parse_tool_input_schema;
pub use compact::content_items_to_text;

View File

@@ -1,63 +1,476 @@
#[cfg(not(target_arch = "wasm32"))]
mod native {
use crate::config::Config;
pub use codex_rollout::ARCHIVED_SESSIONS_SUBDIR;
pub use codex_rollout::INTERACTIVE_SESSION_SOURCES;
pub use codex_rollout::RolloutRecorder;
pub use codex_rollout::RolloutRecorderParams;
pub use codex_rollout::SESSIONS_SUBDIR;
pub use codex_rollout::SessionMeta;
pub use codex_rollout::append_thread_name;
pub use codex_rollout::find_archived_thread_path_by_id_str;
#[deprecated(note = "use find_thread_path_by_id_str")]
pub use codex_rollout::find_conversation_path_by_id_str;
pub use codex_rollout::find_thread_name_by_id;
pub use codex_rollout::find_thread_path_by_id_str;
pub use codex_rollout::find_thread_path_by_name_str;
pub use codex_rollout::rollout_date_parts;
impl codex_rollout::RolloutConfigView for Config {
fn codex_home(&self) -> &std::path::Path {
self.codex_home.as_path()
}
fn sqlite_home(&self) -> &std::path::Path {
self.sqlite_home.as_path()
}
fn cwd(&self) -> &std::path::Path {
self.cwd.as_path()
}
fn model_provider_id(&self) -> &str {
self.model_provider_id.as_str()
}
fn generate_memories(&self) -> bool {
self.memories.generate_memories
}
}
pub mod list {
pub use codex_rollout::list::*;
}
pub(crate) mod metadata {
pub(crate) use codex_rollout::metadata::builder_from_items;
}
pub mod policy {
pub use codex_rollout::policy::*;
}
pub mod recorder {
pub use codex_rollout::recorder::*;
}
pub mod session_index {
pub use codex_rollout::session_index::*;
}
pub(crate) use crate::session_rollout_init_error::map_session_init_error;
pub(crate) mod truncation {
pub(crate) use crate::thread_rollout_truncation::*;
}
}
#[cfg(target_arch = "wasm32")]
mod wasm {
use std::collections::HashMap;
use std::collections::HashSet;
use std::ffi::OsStr;
use std::io;
use std::path::Path;
use std::path::PathBuf;
use std::sync::LazyLock;
use codex_protocol::ThreadId;
use codex_protocol::dynamic_tools::DynamicToolSpec;
use codex_protocol::models::BaseInstructions;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SessionMeta;
use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::SessionSource;
use crate::config::Config;
use crate::state_db::StateDbHandle;
pub const SESSIONS_SUBDIR: &str = "sessions";
pub const ARCHIVED_SESSIONS_SUBDIR: &str = "archived_sessions";
pub static INTERACTIVE_SESSION_SOURCES: LazyLock<Vec<SessionSource>> = LazyLock::new(|| {
vec![
SessionSource::Cli,
SessionSource::VSCode,
SessionSource::Custom("atlas".to_string()),
SessionSource::Custom("chatgpt".to_string()),
]
});
pub use session_index::append_thread_name;
pub use session_index::find_thread_name_by_id;
pub use session_index::find_thread_path_by_name_str;
#[deprecated(note = "use find_thread_path_by_id_str")]
pub async fn find_conversation_path_by_id_str(
codex_home: &Path,
id_str: &str,
) -> io::Result<Option<PathBuf>> {
find_thread_path_by_id_str(codex_home, id_str).await
}
pub async fn find_thread_path_by_id_str(
_codex_home: &Path,
_id_str: &str,
) -> io::Result<Option<PathBuf>> {
Ok(None)
}
pub async fn find_archived_thread_path_by_id_str(
_codex_home: &Path,
_id_str: &str,
) -> io::Result<Option<PathBuf>> {
Ok(None)
}
pub fn rollout_date_parts(_file_name: &OsStr) -> Option<(String, String, String)> {
None
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum EventPersistenceMode {
#[default]
Limited,
Extended,
}
pub mod policy {
pub use super::EventPersistenceMode;
}
#[derive(Clone)]
pub struct RolloutRecorder {
rollout_path: PathBuf,
state_db: Option<StateDbHandle>,
_event_persistence_mode: EventPersistenceMode,
}
#[derive(Clone)]
pub enum RolloutRecorderParams {
Create {
conversation_id: ThreadId,
forked_from_id: Option<ThreadId>,
source: SessionSource,
base_instructions: BaseInstructions,
dynamic_tools: Vec<DynamicToolSpec>,
event_persistence_mode: EventPersistenceMode,
},
Resume {
path: PathBuf,
event_persistence_mode: EventPersistenceMode,
},
}
impl RolloutRecorderParams {
pub fn new(
conversation_id: ThreadId,
forked_from_id: Option<ThreadId>,
source: SessionSource,
base_instructions: BaseInstructions,
dynamic_tools: Vec<DynamicToolSpec>,
event_persistence_mode: EventPersistenceMode,
) -> Self {
Self::Create {
conversation_id,
forked_from_id,
source,
base_instructions,
dynamic_tools,
event_persistence_mode,
}
}
pub fn resume(path: PathBuf, event_persistence_mode: EventPersistenceMode) -> Self {
Self::Resume {
path,
event_persistence_mode,
}
}
}
impl RolloutRecorder {
#[allow(clippy::too_many_arguments)]
pub async fn list_threads(
_config: &Config,
_page_size: usize,
_cursor: Option<&list::Cursor>,
_sort_key: list::ThreadSortKey,
_allowed_sources: &[SessionSource],
_model_providers: Option<&[String]>,
_default_provider: &str,
_search_term: Option<&str>,
) -> io::Result<list::ThreadsPage> {
Ok(list::ThreadsPage::default())
}
#[allow(clippy::too_many_arguments)]
pub async fn list_archived_threads(
_config: &Config,
_page_size: usize,
_cursor: Option<&list::Cursor>,
_sort_key: list::ThreadSortKey,
_allowed_sources: &[SessionSource],
_model_providers: Option<&[String]>,
_default_provider: &str,
_search_term: Option<&str>,
) -> io::Result<list::ThreadsPage> {
Ok(list::ThreadsPage::default())
}
pub async fn new(
config: &Config,
params: RolloutRecorderParams,
state_db: Option<StateDbHandle>,
_state_builder: Option<metadata::ThreadMetadataBuilder>,
) -> io::Result<Self> {
let rollout_path = match params {
RolloutRecorderParams::Create {
conversation_id, ..
} => config
.codex_home
.join(SESSIONS_SUBDIR)
.join(format!("browser-{}.jsonl", conversation_id)),
RolloutRecorderParams::Resume { path, .. } => path,
};
Ok(Self {
rollout_path,
state_db,
_event_persistence_mode: EventPersistenceMode::Limited,
})
}
pub fn rollout_path(&self) -> &Path {
self.rollout_path.as_path()
}
pub fn state_db(&self) -> Option<StateDbHandle> {
self.state_db.clone()
}
pub async fn record_items(&self, _items: &[RolloutItem]) -> io::Result<()> {
Ok(())
}
pub async fn persist(&self) -> io::Result<()> {
Ok(())
}
pub async fn flush(&self) -> io::Result<()> {
Ok(())
}
pub async fn load_rollout_items(
_path: &Path,
) -> io::Result<(Vec<RolloutItem>, Option<ThreadId>, usize)> {
Err(io::Error::other(
"rollout loading is unavailable on wasm32 without a browser persistence backend",
))
}
pub async fn get_rollout_history(_path: &Path) -> io::Result<InitialHistory> {
Err(io::Error::other(
"rollout history is unavailable on wasm32 without a browser persistence backend",
))
}
pub async fn shutdown(&self) -> io::Result<()> {
Ok(())
}
}
pub mod recorder {
pub use super::RolloutRecorder;
pub use super::RolloutRecorderParams;
}
pub mod metadata {
use std::path::Path;
use codex_protocol::protocol::RolloutItem;
#[derive(Clone, Debug)]
pub struct ThreadMetadataBuilder;
pub fn builder_from_items(
_items: &[RolloutItem],
_rollout_path: &Path,
) -> Option<ThreadMetadataBuilder> {
None
}
}
pub mod list {
use std::io;
use std::path::Path;
use std::path::PathBuf;
use codex_protocol::ThreadId;
use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::SessionSource;
#[derive(Debug, Default, PartialEq)]
pub struct ThreadsPage {
pub items: Vec<ThreadItem>,
pub next_cursor: Option<Cursor>,
pub num_scanned_files: usize,
pub reached_scan_cap: bool,
}
#[derive(Debug, PartialEq, Default)]
pub struct ThreadItem {
pub path: PathBuf,
pub thread_id: Option<ThreadId>,
pub first_user_message: Option<String>,
pub cwd: Option<PathBuf>,
pub git_branch: Option<String>,
pub git_sha: Option<String>,
pub git_origin_url: Option<String>,
pub source: Option<SessionSource>,
pub agent_nickname: Option<String>,
pub agent_role: Option<String>,
pub model_provider: Option<String>,
pub cli_version: Option<String>,
pub created_at: Option<String>,
pub updated_at: Option<String>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ThreadSortKey {
CreatedAt,
UpdatedAt,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ThreadListLayout {
NestedByDate,
Flat,
}
pub struct ThreadListConfig<'a> {
pub allowed_sources: &'a [SessionSource],
pub model_providers: Option<&'a [String]>,
pub default_provider: &'a str,
pub layout: ThreadListLayout,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Cursor(String);
pub fn parse_cursor(cursor: &str) -> Option<Cursor> {
if cursor.is_empty() {
None
} else {
Some(Cursor(cursor.to_string()))
}
}
pub async fn get_threads(
_codex_home: &Path,
_page_size: usize,
_cursor: Option<&Cursor>,
_sort_key: ThreadSortKey,
_allowed_sources: &[SessionSource],
_model_providers: Option<&[String]>,
_default_provider: &str,
) -> io::Result<ThreadsPage> {
Ok(ThreadsPage::default())
}
pub async fn get_threads_in_root(
_root: PathBuf,
_page_size: usize,
_cursor: Option<&Cursor>,
_sort_key: ThreadSortKey,
_config: ThreadListConfig<'_>,
) -> io::Result<ThreadsPage> {
Ok(ThreadsPage::default())
}
pub async fn read_head_for_summary(_path: &Path) -> io::Result<Vec<serde_json::Value>> {
Ok(Vec::new())
}
pub async fn read_session_meta_line(_path: &Path) -> io::Result<SessionMetaLine> {
Ok(SessionMetaLine {
meta: Default::default(),
git: None,
})
}
pub async fn find_thread_path_by_id_str(
_codex_home: &Path,
_id_str: &str,
) -> io::Result<Option<PathBuf>> {
Ok(None)
}
pub async fn find_archived_thread_path_by_id_str(
_codex_home: &Path,
_id_str: &str,
) -> io::Result<Option<PathBuf>> {
Ok(None)
}
pub fn rollout_date_parts(
_file_name: &std::ffi::OsStr,
) -> Option<(String, String, String)> {
None
}
}
pub mod session_index {
use std::collections::HashMap;
use std::collections::HashSet;
use std::io;
use std::path::Path;
use std::path::PathBuf;
use codex_protocol::ThreadId;
pub async fn append_thread_name(
_codex_home: &Path,
_thread_id: ThreadId,
_name: &str,
) -> io::Result<()> {
Ok(())
}
pub async fn find_thread_name_by_id(
_codex_home: &Path,
_thread_id: &ThreadId,
) -> io::Result<Option<String>> {
Ok(None)
}
pub async fn find_thread_names_by_ids(
_codex_home: &Path,
_thread_ids: &HashSet<ThreadId>,
) -> io::Result<HashMap<ThreadId, String>> {
Ok(HashMap::new())
}
pub async fn find_thread_path_by_name_str(
_codex_home: &Path,
_name: &str,
) -> io::Result<Option<PathBuf>> {
Ok(None)
}
}
pub(crate) fn map_session_init_error(
err: &anyhow::Error,
_codex_home: &Path,
) -> crate::error::CodexErr {
crate::error::CodexErr::Fatal(err.to_string())
}
pub(crate) mod truncation {}
}
#[cfg(not(target_arch = "wasm32"))]
pub use native::*;
#[cfg(target_arch = "wasm32")]
pub use wasm::*;
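The file above ends with the per-target facade this refactor relies on throughout: a `native` and a `wasm` module export the same item names, and the crate re-exports exactly one of them. A small sketch of the idea, with illustrative names rather than the real rollout API:

```rust
#[cfg(not(target_arch = "wasm32"))]
mod native_impl {
    // Real host-backed implementation.
    pub fn backend_name() -> &'static str {
        "filesystem rollout"
    }
}

#[cfg(target_arch = "wasm32")]
mod wasm_impl {
    // Same public surface, stubbed behavior for the browser.
    pub fn backend_name() -> &'static str {
        "browser stub"
    }
}

#[cfg(not(target_arch = "wasm32"))]
pub use native_impl::backend_name;
#[cfg(target_arch = "wasm32")]
pub use wasm_impl::backend_name;
```

Downstream code calls `backend_name()` without any target awareness; only this module knows there are two implementations.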

View File

@@ -1,21 +1,189 @@
#[cfg(not(target_arch = "wasm32"))]
use codex_rollout::state_db as rollout_state_db;
use crate::config::Config;
#[cfg(not(target_arch = "wasm32"))]
pub use codex_rollout::state_db::StateDbHandle;
#[cfg(not(target_arch = "wasm32"))]
pub use codex_rollout::state_db::apply_rollout_items;
#[cfg(not(target_arch = "wasm32"))]
pub use codex_rollout::state_db::find_rollout_path_by_id;
#[cfg(not(target_arch = "wasm32"))]
pub use codex_rollout::state_db::get_dynamic_tools;
#[cfg(not(target_arch = "wasm32"))]
pub use codex_rollout::state_db::init;
#[cfg(not(target_arch = "wasm32"))]
pub use codex_rollout::state_db::list_thread_ids_db;
#[cfg(not(target_arch = "wasm32"))]
pub use codex_rollout::state_db::list_threads_db;
#[cfg(not(target_arch = "wasm32"))]
pub use codex_rollout::state_db::mark_thread_memory_mode_polluted;
#[cfg(not(target_arch = "wasm32"))]
pub use codex_rollout::state_db::normalize_cwd_for_state_db;
#[cfg(not(target_arch = "wasm32"))]
pub use codex_rollout::state_db::open_if_present;
#[cfg(not(target_arch = "wasm32"))]
pub use codex_rollout::state_db::persist_dynamic_tools;
#[cfg(not(target_arch = "wasm32"))]
pub use codex_rollout::state_db::read_repair_rollout_path;
#[cfg(not(target_arch = "wasm32"))]
pub use codex_rollout::state_db::reconcile_rollout;
#[cfg(not(target_arch = "wasm32"))]
pub use codex_rollout::state_db::touch_thread_updated_at;
#[cfg(not(target_arch = "wasm32"))]
pub use codex_state::LogEntry;
#[cfg(not(target_arch = "wasm32"))]
pub async fn get_state_db(config: &Config) -> Option<StateDbHandle> {
rollout_state_db::get_state_db(config).await
}
#[cfg(target_arch = "wasm32")]
pub mod wasm {
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use chrono::DateTime;
use chrono::Utc;
use codex_protocol::ThreadId;
use codex_protocol::dynamic_tools::DynamicToolSpec;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SessionSource;
use crate::config::Config;
use crate::rollout::list::Cursor;
use crate::rollout::list::ThreadSortKey;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StateRuntimeStub;
pub type StateDbHandle = Arc<StateRuntimeStub>;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LogEntry;
pub async fn init(_config: &Config) -> Option<StateDbHandle> {
None
}
pub async fn get_state_db(_config: &Config) -> Option<StateDbHandle> {
None
}
pub async fn open_if_present(
_codex_home: &Path,
_default_provider: &str,
) -> Option<StateDbHandle> {
None
}
pub async fn find_rollout_path_by_id(
_context: Option<&StateRuntimeStub>,
_thread_id: ThreadId,
_archived_only: Option<bool>,
_stage: &str,
) -> Option<PathBuf> {
None
}
pub async fn get_dynamic_tools(
_context: Option<&StateRuntimeStub>,
_thread_id: ThreadId,
_stage: &str,
) -> Option<Vec<DynamicToolSpec>> {
None
}
pub async fn list_thread_ids_db(
_context: Option<&StateRuntimeStub>,
_codex_home: &Path,
_page_size: usize,
_cursor: Option<&Cursor>,
_sort_key: ThreadSortKey,
_allowed_sources: &[SessionSource],
_model_providers: Option<&[String]>,
_archived_only: bool,
_stage: &str,
) -> Option<Vec<ThreadId>> {
None
}
pub async fn list_threads_db(
_context: Option<&StateRuntimeStub>,
_codex_home: &Path,
_page_size: usize,
_cursor: Option<&Cursor>,
_sort_key: ThreadSortKey,
_allowed_sources: &[SessionSource],
_model_providers: Option<&[String]>,
_archived: bool,
_search_term: Option<&str>,
) -> Option<crate::rollout::list::ThreadsPage> {
None
}
pub fn normalize_cwd_for_state_db(cwd: &Path) -> PathBuf {
cwd.to_path_buf()
}
pub async fn persist_dynamic_tools(
_context: Option<&StateRuntimeStub>,
_thread_id: ThreadId,
_tools: Option<&[DynamicToolSpec]>,
_stage: &str,
) {
}
pub async fn mark_thread_memory_mode_polluted(
_context: Option<&StateRuntimeStub>,
_thread_id: ThreadId,
_stage: &str,
) {
}
pub async fn reconcile_rollout(
_context: Option<&StateRuntimeStub>,
_rollout_path: &Path,
_default_provider: &str,
_builder: Option<&()>,
_items: &[RolloutItem],
_archived_only: Option<bool>,
_new_thread_memory_mode: Option<&str>,
) {
}
pub async fn read_repair_rollout_path(
_context: Option<&StateRuntimeStub>,
_thread_id: Option<ThreadId>,
_archived_only: Option<bool>,
_rollout_path: &Path,
) {
}
#[allow(clippy::too_many_arguments)]
pub async fn apply_rollout_items(
_context: Option<&StateRuntimeStub>,
_rollout_path: &Path,
_default_provider: &str,
_builder: Option<&()>,
_items: &[RolloutItem],
_stage: &str,
_new_thread_memory_mode: Option<&str>,
_updated_at_override: Option<DateTime<Utc>>,
) {
}
pub async fn touch_thread_updated_at(
_context: Option<&StateRuntimeStub>,
_thread_id: Option<ThreadId>,
_updated_at: DateTime<Utc>,
_stage: &str,
) -> bool {
false
}
}
#[cfg(target_arch = "wasm32")]
pub use wasm::*;
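The wasm `state_db` stubs above share one design: the handle type still exists so shared signatures type-check, but every open path reports that no database is present. A sync sketch of that shape (the real helpers are async; names are illustrative):

```rust
use std::sync::Arc;

#[derive(Debug, Clone, PartialEq, Eq)]
struct StateRuntimeStub;
type StateDbHandle = Arc<StateRuntimeStub>;

fn open_if_present(_codex_home: &str, _default_provider: &str) -> Option<StateDbHandle> {
    // No browser persistence backend yet, so there is never a database.
    None
}

fn touch_thread_updated_at(_handle: Option<&StateRuntimeStub>) -> bool {
    // Nothing to update; report that no row was touched.
    false
}
```

Callers already branch on `Option`/`bool` results, so the stubs degrade gracefully without new error paths.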

View File

@@ -447,6 +447,27 @@ impl ThreadManager {
persist_extended_history,
metrics_service_name,
parent_trace,
/*code_mode_runtime*/ None,
/*user_shell_override*/ None,
))
.await
}
pub async fn start_thread_with_code_mode_runtime(
&self,
config: Config,
code_mode_runtime: Arc<dyn crate::CodeModeRuntime>,
) -> CodexResult<NewThread> {
Box::pin(self.state.spawn_thread(
config,
InitialHistory::New,
Arc::clone(&self.state.auth_manager),
self.agent_control(),
Vec::new(),
/*persist_extended_history*/ false,
/*metrics_service_name*/ None,
/*parent_trace*/ None,
Some(code_mode_runtime),
/*user_shell_override*/ None,
))
.await
@@ -487,6 +508,7 @@ impl ThreadManager {
persist_extended_history,
/*metrics_service_name*/ None,
parent_trace,
/*code_mode_runtime*/ None,
/*user_shell_override*/ None,
))
.await
@@ -506,6 +528,7 @@ impl ThreadManager {
/*persist_extended_history*/ false,
/*metrics_service_name*/ None,
/*parent_trace*/ None,
/*code_mode_runtime*/ None,
/*user_shell_override*/ Some(user_shell_override),
))
.await
@@ -528,6 +551,7 @@ impl ThreadManager {
/*persist_extended_history*/ false,
/*metrics_service_name*/ None,
/*parent_trace*/ None,
/*code_mode_runtime*/ None,
/*user_shell_override*/ Some(user_shell_override),
))
.await
@@ -635,6 +659,7 @@ impl ThreadManager {
persist_extended_history,
/*metrics_service_name*/ None,
parent_trace,
/*code_mode_runtime*/ None,
/*user_shell_override*/ None,
))
.await
@@ -736,6 +761,7 @@ impl ThreadManagerState {
inherited_shell_snapshot,
inherited_exec_policy,
/*parent_trace*/ None,
/*code_mode_runtime*/ None,
/*user_shell_override*/ None,
))
.await
@@ -763,6 +789,7 @@ impl ThreadManagerState {
inherited_shell_snapshot,
inherited_exec_policy,
/*parent_trace*/ None,
/*code_mode_runtime*/ None,
/*user_shell_override*/ None,
))
.await
@@ -791,6 +818,7 @@ impl ThreadManagerState {
inherited_shell_snapshot,
inherited_exec_policy,
/*parent_trace*/ None,
/*code_mode_runtime*/ None,
/*user_shell_override*/ None,
))
.await
@@ -808,6 +836,7 @@ impl ThreadManagerState {
persist_extended_history: bool,
metrics_service_name: Option<String>,
parent_trace: Option<W3cTraceContext>,
code_mode_runtime: Option<Arc<dyn codex_code_mode::CodeModeRuntime>>,
user_shell_override: Option<crate::shell::Shell>,
) -> CodexResult<NewThread> {
Box::pin(self.spawn_thread_with_source(
@@ -822,6 +851,7 @@ impl ThreadManagerState {
/*inherited_shell_snapshot*/ None,
/*inherited_exec_policy*/ None,
parent_trace,
code_mode_runtime,
user_shell_override,
))
.await
@@ -841,6 +871,7 @@ impl ThreadManagerState {
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
inherited_exec_policy: Option<Arc<crate::exec_policy::ExecPolicyManager>>,
parent_trace: Option<W3cTraceContext>,
code_mode_runtime: Option<Arc<dyn codex_code_mode::CodeModeRuntime>>,
user_shell_override: Option<crate::shell::Shell>,
) -> CodexResult<NewThread> {
let watch_registration = self.skills_watcher.register_config(
@@ -868,6 +899,7 @@ impl ThreadManagerState {
inherited_shell_snapshot,
inherited_exec_policy,
user_shell_override,
code_mode_runtime,
parent_trace,
})
.await?;

View File

@@ -6,7 +6,9 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use codex_code_mode::CodeModeRuntime;
use codex_code_mode::CodeModeTurnHost;
use codex_code_mode::CodeModeTurnWorkerHandle;
use codex_code_mode::RuntimeResponse;
use codex_protocol::models::FunctionCallOutputContentItem;
use codex_protocol::models::FunctionCallOutputPayload;
@@ -48,14 +50,16 @@ pub(crate) struct ExecContext {
}
pub(crate) struct CodeModeService {
inner: Arc<dyn CodeModeRuntime>,
}
impl CodeModeService {
pub(crate) fn new(_js_repl_node_path: Option<PathBuf>) -> Self {
Self::from_runtime(Arc::new(codex_code_mode::CodeModeService::new()))
}
pub(crate) fn from_runtime(inner: Arc<dyn CodeModeRuntime>) -> Self {
Self { inner }
}
pub(crate) async fn stored_values(&self) -> std::collections::HashMap<String, JsonValue> {
@@ -89,7 +93,7 @@ impl CodeModeService {
turn: &Arc<TurnContext>,
router: Arc<ToolRouter>,
tracker: SharedTurnDiffTracker,
) -> Option<Box<dyn CodeModeTurnWorkerHandle>> {
if !turn.features.enabled(Feature::CodeMode) {
return None;
}
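Returning `Option<Box<dyn CodeModeTurnWorkerHandle>>` instead of a concrete worker type is what lets browser runtimes slot in. A sketch of the feature-gated trait-object return, with illustrative names: when the feature is off the caller gets no worker, otherwise a boxed handle it can drive without knowing the concrete type.

```rust
trait TurnWorkerHandle {
    fn poll_once(&self) -> u32;
}

struct LocalWorker;
impl TurnWorkerHandle for LocalWorker {
    fn poll_once(&self) -> u32 {
        // Stand-in for advancing the worker by one step.
        1
    }
}

fn maybe_spawn_worker(feature_enabled: bool) -> Option<Box<dyn TurnWorkerHandle>> {
    if !feature_enabled {
        return None;
    }
    // Any runtime's worker can be boxed here, native or browser-backed.
    Some(Box::new(LocalWorker))
}
```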

View File

@@ -15,17 +15,20 @@ path = "src/bin/codex-exec-server.rs"
workspace = true
[dependencies]
async-trait = { workspace = true }
base64 = { workspace = true }
codex-app-server-protocol = { workspace = true }
codex-utils-absolute-path = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
thiserror = { workspace = true }
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
arc-swap = { workspace = true }
clap = { workspace = true, features = ["derive"] }
codex-utils-pty = { workspace = true }
futures = { workspace = true }
tokio = { workspace = true, features = [
"fs",
"io-std",
@@ -38,7 +41,9 @@ tokio = { workspace = true, features = [
"time",
] }
tokio-tungstenite = { workspace = true }
tracing = { workspace = true }
[target.'cfg(target_arch = "wasm32")'.dependencies]
tokio = { workspace = true, features = ["io-util", "macros", "rt", "sync"] }
[dev-dependencies]
anyhow = { workspace = true }

View File

@@ -1,5 +1,7 @@
#[cfg(not(target_arch = "wasm32"))]
use clap::Parser;
#[cfg(not(target_arch = "wasm32"))]
#[derive(Debug, Parser)]
struct ExecServerArgs {
/// Transport endpoint URL. Supported values: `ws://IP:PORT` (default).
@@ -11,8 +13,14 @@ struct ExecServerArgs {
listen: String,
}
#[cfg(not(target_arch = "wasm32"))]
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let args = ExecServerArgs::parse();
codex_exec_server::run_main_with_listen_url(&args.listen).await
}
#[cfg(target_arch = "wasm32")]
fn main() {
panic!("codex-exec-server binary is unavailable on wasm32");
}

View File

@@ -1,67 +1,7 @@
#[cfg(not(target_arch = "wasm32"))]
include!("native.rs");
#[cfg(target_arch = "wasm32")]
mod wasm;
#[cfg(target_arch = "wasm32")]
pub use wasm::*;

View File

@@ -0,0 +1,67 @@
mod client;
mod client_api;
mod connection;
mod environment;
mod file_system;
mod local_file_system;
mod local_process;
mod process;
mod process_id;
mod protocol;
mod remote_file_system;
mod remote_process;
mod rpc;
mod server;
pub use client::ExecServerClient;
pub use client::ExecServerError;
pub use client_api::ExecServerClientConnectOptions;
pub use client_api::RemoteExecServerConnectArgs;
pub use codex_app_server_protocol::FsCopyParams;
pub use codex_app_server_protocol::FsCopyResponse;
pub use codex_app_server_protocol::FsCreateDirectoryParams;
pub use codex_app_server_protocol::FsCreateDirectoryResponse;
pub use codex_app_server_protocol::FsGetMetadataParams;
pub use codex_app_server_protocol::FsGetMetadataResponse;
pub use codex_app_server_protocol::FsReadDirectoryParams;
pub use codex_app_server_protocol::FsReadDirectoryResponse;
pub use codex_app_server_protocol::FsReadFileParams;
pub use codex_app_server_protocol::FsReadFileResponse;
pub use codex_app_server_protocol::FsRemoveParams;
pub use codex_app_server_protocol::FsRemoveResponse;
pub use codex_app_server_protocol::FsWriteFileParams;
pub use codex_app_server_protocol::FsWriteFileResponse;
pub use environment::CODEX_EXEC_SERVER_URL_ENV_VAR;
pub use environment::Environment;
pub use environment::EnvironmentManager;
pub use environment::ExecutorEnvironment;
pub use file_system::CopyOptions;
pub use file_system::CreateDirectoryOptions;
pub use file_system::ExecutorFileSystem;
pub use file_system::FileMetadata;
pub use file_system::FileSystemResult;
pub use file_system::ReadDirectoryEntry;
pub use file_system::RemoveOptions;
pub use process::ExecBackend;
pub use process::ExecProcess;
pub use process::StartedExecProcess;
pub use process_id::ProcessId;
pub use protocol::ExecClosedNotification;
pub use protocol::ExecExitedNotification;
pub use protocol::ExecOutputDeltaNotification;
pub use protocol::ExecOutputStream;
pub use protocol::ExecParams;
pub use protocol::ExecResponse;
pub use protocol::InitializeParams;
pub use protocol::InitializeResponse;
pub use protocol::ReadParams;
pub use protocol::ReadResponse;
pub use protocol::TerminateParams;
pub use protocol::TerminateResponse;
pub use protocol::WriteParams;
pub use protocol::WriteResponse;
pub use protocol::WriteStatus;
pub use server::DEFAULT_LISTEN_URL;
pub use server::ExecServerListenUrlParseError;
pub use server::run_main;
pub use server::run_main_with_listen_url;


@@ -0,0 +1,415 @@
#[path = "file_system.rs"]
mod file_system;
#[path = "process_id.rs"]
mod process_id;
#[path = "protocol.rs"]
mod protocol;
use std::io;
use std::sync::Arc;
use async_trait::async_trait;
use tokio::sync::OnceCell;
use tokio::sync::watch;
pub use codex_app_server_protocol::FsCopyParams;
pub use codex_app_server_protocol::FsCopyResponse;
pub use codex_app_server_protocol::FsCreateDirectoryParams;
pub use codex_app_server_protocol::FsCreateDirectoryResponse;
pub use codex_app_server_protocol::FsGetMetadataParams;
pub use codex_app_server_protocol::FsGetMetadataResponse;
pub use codex_app_server_protocol::FsReadDirectoryParams;
pub use codex_app_server_protocol::FsReadDirectoryResponse;
pub use codex_app_server_protocol::FsReadFileParams;
pub use codex_app_server_protocol::FsReadFileResponse;
pub use codex_app_server_protocol::FsRemoveParams;
pub use codex_app_server_protocol::FsRemoveResponse;
pub use codex_app_server_protocol::FsWriteFileParams;
pub use codex_app_server_protocol::FsWriteFileResponse;
pub use file_system::CopyOptions;
pub use file_system::CreateDirectoryOptions;
pub use file_system::ExecutorFileSystem;
pub use file_system::FileMetadata;
pub use file_system::FileSystemResult;
pub use file_system::ReadDirectoryEntry;
pub use file_system::RemoveOptions;
pub use process_id::ProcessId;
pub use protocol::ExecClosedNotification;
pub use protocol::ExecExitedNotification;
pub use protocol::ExecOutputDeltaNotification;
pub use protocol::ExecOutputStream;
pub use protocol::ExecParams;
pub use protocol::ExecResponse;
pub use protocol::InitializeParams;
pub use protocol::InitializeResponse;
pub use protocol::ReadParams;
pub use protocol::ReadResponse;
pub use protocol::TerminateParams;
pub use protocol::TerminateResponse;
pub use protocol::WriteParams;
pub use protocol::WriteResponse;
pub use protocol::WriteStatus;
pub struct StartedExecProcess {
pub process: Arc<dyn ExecProcess>,
}
#[async_trait]
pub trait ExecProcess: Send + Sync {
fn process_id(&self) -> &ProcessId;
fn subscribe_wake(&self) -> watch::Receiver<u64>;
async fn read(
&self,
after_seq: Option<u64>,
max_bytes: Option<usize>,
wait_ms: Option<u64>,
) -> Result<ReadResponse, ExecServerError>;
async fn write(&self, chunk: Vec<u8>) -> Result<WriteResponse, ExecServerError>;
async fn terminate(&self) -> Result<(), ExecServerError>;
}
#[async_trait]
pub trait ExecBackend: Send + Sync {
async fn start(&self, params: ExecParams) -> Result<StartedExecProcess, ExecServerError>;
}
pub const CODEX_EXEC_SERVER_URL_ENV_VAR: &str = "CODEX_EXEC_SERVER_URL";
pub const DEFAULT_LISTEN_URL: &str = "ws://127.0.0.1:0";
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecServerClientConnectOptions {
pub client_name: String,
pub initialize_timeout: std::time::Duration,
}
impl Default for ExecServerClientConnectOptions {
fn default() -> Self {
Self {
client_name: "codex-core".to_string(),
initialize_timeout: std::time::Duration::from_secs(10),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RemoteExecServerConnectArgs {
pub websocket_url: String,
pub client_name: String,
pub connect_timeout: std::time::Duration,
pub initialize_timeout: std::time::Duration,
}
impl RemoteExecServerConnectArgs {
pub fn new(websocket_url: String, client_name: String) -> Self {
Self {
websocket_url,
client_name,
connect_timeout: std::time::Duration::from_secs(10),
initialize_timeout: std::time::Duration::from_secs(10),
}
}
}
impl From<RemoteExecServerConnectArgs> for ExecServerClientConnectOptions {
fn from(value: RemoteExecServerConnectArgs) -> Self {
Self {
client_name: value.client_name,
initialize_timeout: value.initialize_timeout,
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum ExecServerError {
#[error("exec-server is unavailable on wasm32")]
Unsupported,
#[error("exec-server transport closed")]
Closed,
#[error("failed to serialize or deserialize exec-server JSON: {0}")]
Json(#[from] serde_json::Error),
#[error("exec-server protocol error: {0}")]
Protocol(String),
#[error("exec-server rejected request ({code}): {message}")]
Server { code: i64, message: String },
}
#[derive(Debug, thiserror::Error)]
pub enum ExecServerListenUrlParseError {
#[error("exec-server listen URLs are unavailable on wasm32: {0}")]
UnsupportedListenUrl(String),
}
#[derive(Clone, Default)]
pub struct ExecServerClient;
impl ExecServerClient {
pub async fn connect_websocket(
_args: RemoteExecServerConnectArgs,
) -> Result<Self, ExecServerError> {
Err(ExecServerError::Unsupported)
}
pub async fn initialize(
&self,
_options: ExecServerClientConnectOptions,
) -> Result<InitializeResponse, ExecServerError> {
Err(ExecServerError::Unsupported)
}
pub async fn exec(&self, _params: ExecParams) -> Result<ExecResponse, ExecServerError> {
Err(ExecServerError::Unsupported)
}
pub async fn read(&self, _params: ReadParams) -> Result<ReadResponse, ExecServerError> {
Err(ExecServerError::Unsupported)
}
pub async fn write(
&self,
_process_id: &ProcessId,
_chunk: Vec<u8>,
) -> Result<WriteResponse, ExecServerError> {
Err(ExecServerError::Unsupported)
}
pub async fn terminate(
&self,
_process_id: &ProcessId,
) -> Result<TerminateResponse, ExecServerError> {
Err(ExecServerError::Unsupported)
}
}
pub trait ExecutorEnvironment: Send + Sync {
fn get_exec_backend(&self) -> Arc<dyn ExecBackend>;
}
#[derive(Default)]
pub struct EnvironmentManager {
exec_server_url: Option<String>,
current_environment: OnceCell<Arc<Environment>>,
}
impl EnvironmentManager {
pub fn new(exec_server_url: Option<String>) -> Self {
Self {
exec_server_url: normalize_exec_server_url(exec_server_url),
current_environment: OnceCell::new(),
}
}
pub fn from_env() -> Self {
Self::new(std::env::var(CODEX_EXEC_SERVER_URL_ENV_VAR).ok())
}
pub fn exec_server_url(&self) -> Option<&str> {
self.exec_server_url.as_deref()
}
pub async fn current(&self) -> Result<Arc<Environment>, ExecServerError> {
self.current_environment
.get_or_init(|| async { Arc::new(Environment::default()) })
.await;
self.current_environment
.get()
.cloned()
.ok_or(ExecServerError::Unsupported)
}
}
#[derive(Clone)]
pub struct Environment {
exec_server_url: Option<String>,
exec_backend: Arc<dyn ExecBackend>,
filesystem: Arc<dyn ExecutorFileSystem>,
}
impl Default for Environment {
fn default() -> Self {
Self {
exec_server_url: None,
exec_backend: Arc::new(NoopExecBackend),
filesystem: Arc::new(UnsupportedFileSystem),
}
}
}
impl Environment {
pub async fn create(exec_server_url: Option<String>) -> Result<Self, ExecServerError> {
Ok(Self {
exec_server_url: normalize_exec_server_url(exec_server_url),
..Self::default()
})
}
pub fn exec_server_url(&self) -> Option<&str> {
self.exec_server_url.as_deref()
}
pub fn get_exec_backend(&self) -> Arc<dyn ExecBackend> {
Arc::clone(&self.exec_backend)
}
pub fn get_filesystem(&self) -> Arc<dyn ExecutorFileSystem> {
Arc::clone(&self.filesystem)
}
}
impl ExecutorEnvironment for Environment {
fn get_exec_backend(&self) -> Arc<dyn ExecBackend> {
Arc::clone(&self.exec_backend)
}
}
#[derive(Debug, Default)]
struct NoopExecBackend;
#[async_trait]
impl ExecBackend for NoopExecBackend {
async fn start(&self, params: ExecParams) -> Result<StartedExecProcess, ExecServerError> {
Ok(StartedExecProcess {
process: Arc::new(NoopExecProcess::new(params.process_id)),
})
}
}
#[derive(Debug)]
struct NoopExecProcess {
process_id: ProcessId,
wake_tx: watch::Sender<u64>,
}
impl NoopExecProcess {
fn new(process_id: ProcessId) -> Self {
let (wake_tx, _wake_rx) = watch::channel(0);
Self {
process_id,
wake_tx,
}
}
}
#[async_trait]
impl ExecProcess for NoopExecProcess {
fn process_id(&self) -> &ProcessId {
&self.process_id
}
fn subscribe_wake(&self) -> watch::Receiver<u64> {
self.wake_tx.subscribe()
}
async fn read(
&self,
_after_seq: Option<u64>,
_max_bytes: Option<usize>,
_wait_ms: Option<u64>,
) -> Result<ReadResponse, ExecServerError> {
Ok(ReadResponse {
chunks: Vec::new(),
next_seq: 0,
exited: true,
exit_code: Some(1),
closed: true,
failure: Some("exec-server is unavailable on wasm32".to_string()),
})
}
async fn write(&self, _chunk: Vec<u8>) -> Result<WriteResponse, ExecServerError> {
Ok(WriteResponse {
status: WriteStatus::UnknownProcess,
})
}
async fn terminate(&self) -> Result<(), ExecServerError> {
Ok(())
}
}
#[derive(Debug, Default)]
struct UnsupportedFileSystem;
#[async_trait]
impl ExecutorFileSystem for UnsupportedFileSystem {
async fn read_file(
&self,
_path: &codex_utils_absolute_path::AbsolutePathBuf,
) -> FileSystemResult<Vec<u8>> {
Err(unsupported_io_error())
}
async fn write_file(
&self,
_path: &codex_utils_absolute_path::AbsolutePathBuf,
_contents: Vec<u8>,
) -> FileSystemResult<()> {
Err(unsupported_io_error())
}
async fn create_directory(
&self,
_path: &codex_utils_absolute_path::AbsolutePathBuf,
_options: CreateDirectoryOptions,
) -> FileSystemResult<()> {
Err(unsupported_io_error())
}
async fn get_metadata(
&self,
_path: &codex_utils_absolute_path::AbsolutePathBuf,
) -> FileSystemResult<FileMetadata> {
Err(unsupported_io_error())
}
async fn read_directory(
&self,
_path: &codex_utils_absolute_path::AbsolutePathBuf,
) -> FileSystemResult<Vec<ReadDirectoryEntry>> {
Err(unsupported_io_error())
}
async fn remove(
&self,
_path: &codex_utils_absolute_path::AbsolutePathBuf,
_options: RemoveOptions,
) -> FileSystemResult<()> {
Err(unsupported_io_error())
}
async fn copy(
&self,
_source_path: &codex_utils_absolute_path::AbsolutePathBuf,
_destination_path: &codex_utils_absolute_path::AbsolutePathBuf,
_options: CopyOptions,
) -> FileSystemResult<()> {
Err(unsupported_io_error())
}
}
pub async fn run_main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
Err(Box::new(ExecServerError::Unsupported))
}
pub async fn run_main_with_listen_url(
_listen_url: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
Err(Box::new(ExecServerError::Unsupported))
}
fn normalize_exec_server_url(exec_server_url: Option<String>) -> Option<String> {
exec_server_url.and_then(|url| {
let url = url.trim();
(!url.is_empty()).then(|| url.to_string())
})
}
fn unsupported_io_error() -> io::Error {
io::Error::new(
io::ErrorKind::Unsupported,
"exec-server filesystem is unavailable on wasm32",
)
}


@@ -17,7 +17,6 @@ schemars = { workspace = true }
serde = { workspace = true, features = ["derive"] }
tempfile = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["macros", "process", "rt", "time"] }
ts-rs = { workspace = true, features = [
"uuid-impl",
"serde-json-impl",
@@ -28,3 +27,6 @@ walkdir = { workspace = true }
[dev-dependencies]
assert_matches = { workspace = true }
pretty_assertions = { workspace = true }
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio = { workspace = true, features = ["macros", "process", "rt", "time"] }


@@ -30,6 +30,8 @@ pub enum GitToolingError {
PathPrefix(#[from] std::path::StripPrefixError),
#[error(transparent)]
Walkdir(#[from] WalkdirError),
#[error("{0}")]
UnsupportedPlatform(String),
#[error(transparent)]
Io(#[from] std::io::Error),
}


@@ -5,6 +5,7 @@ mod apply;
mod branch;
mod errors;
mod ghost_commits;
#[cfg(not(target_arch = "wasm32"))]
mod info;
mod operations;
mod platform;
@@ -29,20 +30,35 @@ pub use ghost_commits::create_ghost_commit_with_report;
pub use ghost_commits::restore_ghost_commit;
pub use ghost_commits::restore_ghost_commit_with_options;
pub use ghost_commits::restore_to_commit;
#[cfg(not(target_arch = "wasm32"))]
pub use info::CommitLogEntry;
#[cfg(not(target_arch = "wasm32"))]
pub use info::GitDiffToRemote;
#[cfg(not(target_arch = "wasm32"))]
pub use info::GitInfo;
#[cfg(not(target_arch = "wasm32"))]
pub use info::collect_git_info;
#[cfg(not(target_arch = "wasm32"))]
pub use info::current_branch_name;
#[cfg(not(target_arch = "wasm32"))]
pub use info::default_branch_name;
#[cfg(not(target_arch = "wasm32"))]
pub use info::get_git_remote_urls;
#[cfg(not(target_arch = "wasm32"))]
pub use info::get_git_remote_urls_assume_git_repo;
#[cfg(not(target_arch = "wasm32"))]
pub use info::get_git_repo_root;
#[cfg(not(target_arch = "wasm32"))]
pub use info::get_has_changes;
#[cfg(not(target_arch = "wasm32"))]
pub use info::get_head_commit_hash;
#[cfg(not(target_arch = "wasm32"))]
pub use info::git_diff_to_remote;
#[cfg(not(target_arch = "wasm32"))]
pub use info::local_git_branches;
#[cfg(not(target_arch = "wasm32"))]
pub use info::recent_commits;
#[cfg(not(target_arch = "wasm32"))]
pub use info::resolve_root_git_project_for_trust;
pub use platform::create_symlink;
use schemars::JsonSchema;


@@ -34,4 +34,12 @@ pub fn create_symlink(
}
#[cfg(not(any(unix, windows)))]
compile_error!("codex-git symlink support is only implemented for Unix and Windows");
pub fn create_symlink(
_source: &Path,
_link_target: &Path,
_destination: &Path,
) -> Result<(), GitToolingError> {
Err(GitToolingError::UnsupportedPlatform(
"symlink creation is unavailable on this target".to_string(),
))
}


@@ -18,15 +18,17 @@ clap = { workspace = true, features = ["derive"] }
chrono = { workspace = true }
codex-utils-absolute-path = { workspace = true }
codex-utils-home-dir = { workspace = true }
codex-utils-rustls-provider = { workspace = true }
globset = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
thiserror = { workspace = true }
time = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }
url = { workspace = true }
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
codex-utils-rustls-provider = { workspace = true }
time = { workspace = true }
tokio = { workspace = true, features = ["full"] }
rama-core = { version = "=0.3.0-alpha.4" }
rama-http = { version = "=0.3.0-alpha.4" }
rama-http-backend = { version = "=0.3.0-alpha.4", features = ["tls"] }
@@ -35,9 +37,12 @@ rama-socks5 = { version = "=0.3.0-alpha.4" }
rama-tcp = { version = "=0.3.0-alpha.4", features = ["http"] }
rama-tls-rustls = { version = "=0.3.0-alpha.4", features = ["http"] }
[target.'cfg(target_arch = "wasm32")'.dependencies]
tokio = { workspace = true, features = ["macros", "rt", "sync"] }
[dev-dependencies]
pretty_assertions = { workspace = true }
tempfile = { workspace = true }
[target.'cfg(target_family = "unix")'.dependencies]
[target.'cfg(all(target_family = "unix", not(target_arch = "wasm32")))'.dependencies]
rama-unix = { version = "=0.3.0-alpha.4" }


@@ -1,56 +1,9 @@
#![deny(clippy::print_stdout, clippy::print_stderr)]
mod certs;
mod config;
mod http_proxy;
mod mitm;
mod network_policy;
mod policy;
mod proxy;
mod reasons;
mod responses;
mod runtime;
mod socks5;
mod state;
mod upstream;
#[cfg(not(target_arch = "wasm32"))]
include!("native.rs");
pub use config::NetworkDomainPermission;
pub use config::NetworkDomainPermissionEntry;
pub use config::NetworkDomainPermissions;
pub use config::NetworkMode;
pub use config::NetworkProxyConfig;
pub use config::NetworkUnixSocketPermission;
pub use config::NetworkUnixSocketPermissions;
pub use config::host_and_port_from_network_addr;
pub use network_policy::NetworkDecision;
pub use network_policy::NetworkDecisionSource;
pub use network_policy::NetworkPolicyDecider;
pub use network_policy::NetworkPolicyDecision;
pub use network_policy::NetworkPolicyRequest;
pub use network_policy::NetworkPolicyRequestArgs;
pub use network_policy::NetworkProtocol;
pub use policy::normalize_host;
pub use proxy::ALL_PROXY_ENV_KEYS;
pub use proxy::ALLOW_LOCAL_BINDING_ENV_KEY;
pub use proxy::Args;
pub use proxy::DEFAULT_NO_PROXY_VALUE;
pub use proxy::NO_PROXY_ENV_KEYS;
pub use proxy::NetworkProxy;
pub use proxy::NetworkProxyBuilder;
pub use proxy::NetworkProxyHandle;
pub use proxy::PROXY_URL_ENV_KEYS;
pub use proxy::has_proxy_url_env_vars;
pub use proxy::proxy_url_env_value;
pub use runtime::BlockedRequest;
pub use runtime::BlockedRequestArgs;
pub use runtime::BlockedRequestObserver;
pub use runtime::ConfigReloader;
pub use runtime::ConfigState;
pub use runtime::NetworkProxyState;
pub use state::NetworkProxyAuditMetadata;
pub use state::NetworkProxyConstraintError;
pub use state::NetworkProxyConstraints;
pub use state::PartialNetworkConfig;
pub use state::PartialNetworkProxyConfig;
pub use state::build_config_state;
pub use state::validate_policy_against_constraints;
#[cfg(target_arch = "wasm32")]
mod wasm;
#[cfg(target_arch = "wasm32")]
pub use wasm::*;


@@ -0,0 +1,54 @@
mod certs;
mod config;
mod http_proxy;
mod mitm;
mod network_policy;
mod policy;
mod proxy;
mod reasons;
mod responses;
mod runtime;
mod socks5;
mod state;
mod upstream;
pub use config::NetworkDomainPermission;
pub use config::NetworkDomainPermissionEntry;
pub use config::NetworkDomainPermissions;
pub use config::NetworkMode;
pub use config::NetworkProxyConfig;
pub use config::NetworkUnixSocketPermission;
pub use config::NetworkUnixSocketPermissions;
pub use config::host_and_port_from_network_addr;
pub use network_policy::NetworkDecision;
pub use network_policy::NetworkDecisionSource;
pub use network_policy::NetworkPolicyDecider;
pub use network_policy::NetworkPolicyDecision;
pub use network_policy::NetworkPolicyRequest;
pub use network_policy::NetworkPolicyRequestArgs;
pub use network_policy::NetworkProtocol;
pub use policy::normalize_host;
pub use proxy::ALL_PROXY_ENV_KEYS;
pub use proxy::ALLOW_LOCAL_BINDING_ENV_KEY;
pub use proxy::Args;
pub use proxy::DEFAULT_NO_PROXY_VALUE;
pub use proxy::NO_PROXY_ENV_KEYS;
pub use proxy::NetworkProxy;
pub use proxy::NetworkProxyBuilder;
pub use proxy::NetworkProxyHandle;
pub use proxy::PROXY_URL_ENV_KEYS;
pub use proxy::has_proxy_url_env_vars;
pub use proxy::proxy_url_env_value;
pub use runtime::BlockedRequest;
pub use runtime::BlockedRequestArgs;
pub use runtime::BlockedRequestObserver;
pub use runtime::ConfigReloader;
pub use runtime::ConfigState;
pub use runtime::NetworkProxyState;
pub use state::NetworkProxyAuditMetadata;
pub use state::NetworkProxyConstraintError;
pub use state::NetworkProxyConstraints;
pub use state::PartialNetworkConfig;
pub use state::PartialNetworkProxyConfig;
pub use state::build_config_state;
pub use state::validate_policy_against_constraints;


@@ -0,0 +1,863 @@
#[path = "config.rs"]
mod config;
use anyhow::Context;
use anyhow::Result;
use async_trait::async_trait;
use serde::Deserialize;
use serde::Serialize;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::future::Future;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::RwLock;
pub use config::NetworkDomainPermission;
pub use config::NetworkDomainPermissionEntry;
pub use config::NetworkDomainPermissions;
pub use config::NetworkMode;
pub use config::NetworkProxyConfig;
pub use config::NetworkUnixSocketPermission;
pub use config::NetworkUnixSocketPermissions;
pub use config::host_and_port_from_network_addr;
pub const PROXY_URL_ENV_KEYS: &[&str] = &[
"HTTP_PROXY",
"HTTPS_PROXY",
"WS_PROXY",
"WSS_PROXY",
"ALL_PROXY",
"FTP_PROXY",
"YARN_HTTP_PROXY",
"YARN_HTTPS_PROXY",
"NPM_CONFIG_HTTP_PROXY",
"NPM_CONFIG_HTTPS_PROXY",
"NPM_CONFIG_PROXY",
"BUNDLE_HTTP_PROXY",
"BUNDLE_HTTPS_PROXY",
"PIP_PROXY",
"DOCKER_HTTP_PROXY",
"DOCKER_HTTPS_PROXY",
];
pub const ALL_PROXY_ENV_KEYS: &[&str] = &["ALL_PROXY", "all_proxy"];
pub const ALLOW_LOCAL_BINDING_ENV_KEY: &str = "CODEX_NETWORK_ALLOW_LOCAL_BINDING";
pub const NO_PROXY_ENV_KEYS: &[&str] = &[
"NO_PROXY",
"no_proxy",
"npm_config_noproxy",
"NPM_CONFIG_NOPROXY",
"YARN_NO_PROXY",
"BUNDLE_NO_PROXY",
];
pub const DEFAULT_NO_PROXY_VALUE: &str = concat!(
"localhost,127.0.0.1,::1,",
"*.local,.local,",
"169.254.0.0/16,10.0.0.0/8,172.16.0.0/12,192.168.0.0/16"
);
const FTP_PROXY_ENV_KEYS: &[&str] = &["FTP_PROXY", "ftp_proxy"];
const WEBSOCKET_PROXY_ENV_KEYS: &[&str] = &["WS_PROXY", "WSS_PROXY", "ws_proxy", "wss_proxy"];
#[derive(Debug, Clone, clap::Parser)]
#[command(name = "codex-network-proxy", about = "Codex network sandbox proxy")]
pub struct Args {}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NetworkProtocol {
Http,
HttpsConnect,
Socks5Tcp,
Socks5Udp,
}
impl NetworkProtocol {
pub const fn as_policy_protocol(self) -> &'static str {
match self {
Self::Http => "http",
Self::HttpsConnect => "https_connect",
Self::Socks5Tcp => "socks5_tcp",
Self::Socks5Udp => "socks5_udp",
}
}
}
#[derive(Clone, Copy, Debug, Deserialize, Serialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum NetworkPolicyDecision {
Deny,
Ask,
}
impl NetworkPolicyDecision {
pub const fn as_str(self) -> &'static str {
match self {
Self::Deny => "deny",
Self::Ask => "ask",
}
}
}
#[derive(Clone, Copy, Debug, Deserialize, Serialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum NetworkDecisionSource {
BaselinePolicy,
ModeGuard,
ProxyState,
Decider,
}
impl NetworkDecisionSource {
pub const fn as_str(self) -> &'static str {
match self {
Self::BaselinePolicy => "baseline_policy",
Self::ModeGuard => "mode_guard",
Self::ProxyState => "proxy_state",
Self::Decider => "decider",
}
}
}
#[derive(Clone, Debug)]
pub struct NetworkPolicyRequest {
pub protocol: NetworkProtocol,
pub host: String,
pub port: u16,
pub client_addr: Option<String>,
pub method: Option<String>,
pub command: Option<String>,
pub exec_policy_hint: Option<String>,
}
pub struct NetworkPolicyRequestArgs {
pub protocol: NetworkProtocol,
pub host: String,
pub port: u16,
pub client_addr: Option<String>,
pub method: Option<String>,
pub command: Option<String>,
pub exec_policy_hint: Option<String>,
}
impl NetworkPolicyRequest {
pub fn new(args: NetworkPolicyRequestArgs) -> Self {
let NetworkPolicyRequestArgs {
protocol,
host,
port,
client_addr,
method,
command,
exec_policy_hint,
} = args;
Self {
protocol,
host,
port,
client_addr,
method,
command,
exec_policy_hint,
}
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum NetworkDecision {
Allow,
Deny {
reason: String,
source: NetworkDecisionSource,
decision: NetworkPolicyDecision,
},
}
impl NetworkDecision {
pub fn deny(reason: impl Into<String>) -> Self {
Self::deny_with_source(reason, NetworkDecisionSource::Decider)
}
pub fn ask(reason: impl Into<String>) -> Self {
Self::ask_with_source(reason, NetworkDecisionSource::Decider)
}
pub fn deny_with_source(reason: impl Into<String>, source: NetworkDecisionSource) -> Self {
Self::Deny {
reason: reason.into(),
source,
decision: NetworkPolicyDecision::Deny,
}
}
pub fn ask_with_source(reason: impl Into<String>, source: NetworkDecisionSource) -> Self {
Self::Deny {
reason: reason.into(),
source,
decision: NetworkPolicyDecision::Ask,
}
}
}
#[async_trait]
pub trait NetworkPolicyDecider: Send + Sync + 'static {
async fn decide(&self, request: NetworkPolicyRequest) -> Result<NetworkDecision>;
}
#[async_trait]
impl<D: NetworkPolicyDecider + ?Sized> NetworkPolicyDecider for Arc<D> {
async fn decide(&self, request: NetworkPolicyRequest) -> Result<NetworkDecision> {
(**self).decide(request).await
}
}
#[async_trait]
impl<F, Fut> NetworkPolicyDecider for F
where
F: Fn(NetworkPolicyRequest) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<NetworkDecision>> + Send,
{
async fn decide(&self, request: NetworkPolicyRequest) -> Result<NetworkDecision> {
(self)(request).await
}
}
pub fn normalize_host(host: &str) -> String {
let host = host.trim();
if host.starts_with('[')
&& let Some(end) = host.find(']')
{
return host[1..end]
.to_ascii_lowercase()
.trim_end_matches('.')
.to_string();
}
if host.bytes().filter(|b| *b == b':').count() == 1 {
let trimmed = host.split(':').next().unwrap_or_default();
return trimmed
.to_ascii_lowercase()
.trim_end_matches('.')
.to_string();
}
host.to_ascii_lowercase().trim_end_matches('.').to_string()
}
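The `normalize_host` rules above (strip `[IPv6]` brackets, drop a single `:port` suffix, lowercase, trim trailing dots) can be sketched standalone. This mirror avoids the let-chain syntax so it compiles on older editions; the logic is otherwise the same:

```rust
// Standalone mirror of normalize_host from the wasm network-proxy shim.
fn normalize_host(host: &str) -> String {
    let host = host.trim();
    // Bracketed IPv6 literal: keep only what is inside the brackets.
    if host.starts_with('[') {
        if let Some(end) = host.find(']') {
            return host[1..end]
                .to_ascii_lowercase()
                .trim_end_matches('.')
                .to_string();
        }
    }
    // Exactly one colon means a host:port pair; drop the port.
    if host.bytes().filter(|b| *b == b':').count() == 1 {
        let trimmed = host.split(':').next().unwrap_or_default();
        return trimmed
            .to_ascii_lowercase()
            .trim_end_matches('.')
            .to_string();
    }
    host.to_ascii_lowercase().trim_end_matches('.').to_string()
}

fn main() {
    assert_eq!(normalize_host("Example.COM."), "example.com");
    assert_eq!(normalize_host("example.com:8080"), "example.com");
    assert_eq!(normalize_host("[2001:DB8::1]"), "2001:db8::1");
    // A bare IPv6 address has more than one ':' and is left intact.
    assert_eq!(normalize_host("2001:db8::1"), "2001:db8::1");
    println!("ok");
}
```

The single-colon check is what keeps bare IPv6 addresses (which contain multiple colons) from being misinterpreted as host:port pairs.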
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct NetworkProxyConstraints {
pub enabled: Option<bool>,
pub mode: Option<NetworkMode>,
pub allow_upstream_proxy: Option<bool>,
pub dangerously_allow_non_loopback_proxy: Option<bool>,
pub dangerously_allow_all_unix_sockets: Option<bool>,
pub allowed_domains: Option<Vec<String>>,
pub allowlist_expansion_enabled: Option<bool>,
pub denied_domains: Option<Vec<String>>,
pub denylist_expansion_enabled: Option<bool>,
pub allow_unix_sockets: Option<Vec<String>>,
pub allow_local_binding: Option<bool>,
}
#[derive(Debug, Clone, Deserialize)]
pub struct PartialNetworkProxyConfig {
#[serde(default)]
pub network: PartialNetworkConfig,
}
#[derive(Debug, Default, Clone, Deserialize)]
pub struct PartialNetworkConfig {
pub enabled: Option<bool>,
pub mode: Option<NetworkMode>,
pub allow_upstream_proxy: Option<bool>,
pub dangerously_allow_non_loopback_proxy: Option<bool>,
pub dangerously_allow_all_unix_sockets: Option<bool>,
#[serde(default)]
pub domains: Option<NetworkDomainPermissions>,
#[serde(default)]
pub unix_sockets: Option<NetworkUnixSocketPermissions>,
pub allow_local_binding: Option<bool>,
}
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum NetworkProxyConstraintError {
#[error("invalid value for {field_name}: {candidate} (allowed {allowed})")]
InvalidValue {
field_name: &'static str,
candidate: String,
allowed: String,
},
}
impl NetworkProxyConstraintError {
pub fn into_anyhow(self) -> anyhow::Error {
anyhow::anyhow!(self)
}
}
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct NetworkProxyAuditMetadata {
pub conversation_id: Option<String>,
pub app_version: Option<String>,
pub user_account_id: Option<String>,
pub auth_mode: Option<String>,
pub originator: Option<String>,
pub user_email: Option<String>,
pub terminal_type: Option<String>,
pub model: Option<String>,
pub slug: Option<String>,
}
#[derive(Clone, Debug, serde::Serialize)]
pub struct BlockedRequest {
pub host: String,
pub reason: String,
pub client: Option<String>,
pub method: Option<String>,
pub mode: Option<NetworkMode>,
pub protocol: String,
pub decision: Option<String>,
pub source: Option<String>,
pub port: Option<u16>,
pub timestamp: i64,
}
pub struct BlockedRequestArgs {
pub host: String,
pub reason: String,
pub client: Option<String>,
pub method: Option<String>,
pub mode: Option<NetworkMode>,
pub protocol: String,
pub decision: Option<String>,
pub source: Option<String>,
pub port: Option<u16>,
}
impl BlockedRequest {
pub fn new(args: BlockedRequestArgs) -> Self {
let BlockedRequestArgs {
host,
reason,
client,
method,
mode,
protocol,
decision,
source,
port,
} = args;
Self {
host,
reason,
client,
method,
mode,
protocol,
decision,
source,
port,
timestamp: chrono::Utc::now().timestamp(),
}
}
}
#[derive(Clone)]
pub struct ConfigState {
pub config: NetworkProxyConfig,
pub constraints: NetworkProxyConstraints,
pub blocked: VecDeque<BlockedRequest>,
pub blocked_total: u64,
}
#[async_trait]
pub trait ConfigReloader: Send + Sync {
fn source_label(&self) -> String;
async fn maybe_reload(&self) -> Result<Option<ConfigState>>;
async fn reload_now(&self) -> Result<ConfigState>;
}
#[async_trait]
pub trait BlockedRequestObserver: Send + Sync + 'static {
async fn on_blocked_request(&self, request: BlockedRequest);
}
#[async_trait]
impl<O: BlockedRequestObserver + ?Sized> BlockedRequestObserver for Arc<O> {
async fn on_blocked_request(&self, request: BlockedRequest) {
(**self).on_blocked_request(request).await
}
}
#[async_trait]
impl<F, Fut> BlockedRequestObserver for F
where
F: Fn(BlockedRequest) -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send,
{
async fn on_blocked_request(&self, request: BlockedRequest) {
(self)(request).await;
}
}
pub fn build_config_state(
config: NetworkProxyConfig,
constraints: NetworkProxyConstraints,
) -> anyhow::Result<ConfigState> {
validate_policy_against_constraints(&config, &constraints)?;
Ok(ConfigState {
config,
constraints,
blocked: VecDeque::new(),
blocked_total: 0,
})
}
pub fn validate_policy_against_constraints(
config: &NetworkProxyConfig,
constraints: &NetworkProxyConstraints,
) -> Result<(), NetworkProxyConstraintError> {
if let Some(false) = constraints.enabled
&& config.network.enabled
{
return Err(NetworkProxyConstraintError::InvalidValue {
field_name: "network.enabled",
candidate: "true".to_string(),
allowed: "false (disabled by managed config)".to_string(),
});
}
if let Some(false) = constraints.allow_local_binding
&& config.network.allow_local_binding
{
return Err(NetworkProxyConstraintError::InvalidValue {
field_name: "network.allow_local_binding",
candidate: "true".to_string(),
allowed: "false (disabled by managed config)".to_string(),
});
}
Ok(())
}
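The constraint check above rejects a config that enables something a managed constraint has pinned to `false`. A minimal standalone sketch of the same shape, using simplified stand-in types (the real `NetworkProxyConfig` and `NetworkProxyConstraints` carry many more fields):

```rust
// Simplified stand-ins for illustration only; not the real crate types.
#[derive(Debug, PartialEq)]
enum ConstraintError {
    InvalidValue { field_name: &'static str },
}

struct Constraints {
    enabled: Option<bool>,
    allow_local_binding: Option<bool>,
}

struct Config {
    enabled: bool,
    allow_local_binding: bool,
}

// Mirrors validate_policy_against_constraints: a Some(false) constraint
// forbids the corresponding config flag from being true.
fn validate(config: &Config, constraints: &Constraints) -> Result<(), ConstraintError> {
    if constraints.enabled == Some(false) && config.enabled {
        return Err(ConstraintError::InvalidValue { field_name: "network.enabled" });
    }
    if constraints.allow_local_binding == Some(false) && config.allow_local_binding {
        return Err(ConstraintError::InvalidValue { field_name: "network.allow_local_binding" });
    }
    Ok(())
}

fn main() {
    let managed = Constraints { enabled: Some(false), allow_local_binding: None };
    assert!(validate(&Config { enabled: true, allow_local_binding: false }, &managed).is_err());
    assert!(validate(&Config { enabled: false, allow_local_binding: false }, &managed).is_ok());
    println!("ok");
}
```

Note the asymmetry: a `None` constraint imposes nothing, and a `Some(true)` constraint never fails validation, so only explicit managed denials are enforced.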
pub struct NetworkProxyState {
state: Arc<RwLock<ConfigState>>,
reloader: Arc<dyn ConfigReloader>,
blocked_request_observer: Arc<RwLock<Option<Arc<dyn BlockedRequestObserver>>>>,
audit_metadata: NetworkProxyAuditMetadata,
}
impl std::fmt::Debug for NetworkProxyState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("NetworkProxyState").finish_non_exhaustive()
}
}
impl Clone for NetworkProxyState {
fn clone(&self) -> Self {
Self {
state: self.state.clone(),
reloader: self.reloader.clone(),
blocked_request_observer: self.blocked_request_observer.clone(),
audit_metadata: self.audit_metadata.clone(),
}
}
}
impl NetworkProxyState {
pub fn with_reloader(state: ConfigState, reloader: Arc<dyn ConfigReloader>) -> Self {
Self::with_reloader_and_audit_metadata(
state,
reloader,
NetworkProxyAuditMetadata::default(),
)
}
pub fn with_reloader_and_blocked_observer(
state: ConfigState,
reloader: Arc<dyn ConfigReloader>,
blocked_request_observer: Option<Arc<dyn BlockedRequestObserver>>,
) -> Self {
Self::with_reloader_and_audit_metadata_and_blocked_observer(
state,
reloader,
NetworkProxyAuditMetadata::default(),
blocked_request_observer,
)
}
pub fn with_reloader_and_audit_metadata(
state: ConfigState,
reloader: Arc<dyn ConfigReloader>,
audit_metadata: NetworkProxyAuditMetadata,
) -> Self {
Self::with_reloader_and_audit_metadata_and_blocked_observer(
state,
reloader,
audit_metadata,
None,
)
}
pub fn with_reloader_and_audit_metadata_and_blocked_observer(
state: ConfigState,
reloader: Arc<dyn ConfigReloader>,
audit_metadata: NetworkProxyAuditMetadata,
blocked_request_observer: Option<Arc<dyn BlockedRequestObserver>>,
) -> Self {
Self {
state: Arc::new(RwLock::new(state)),
reloader,
blocked_request_observer: Arc::new(RwLock::new(blocked_request_observer)),
audit_metadata,
}
}
pub async fn set_blocked_request_observer(
&self,
blocked_request_observer: Option<Arc<dyn BlockedRequestObserver>>,
) {
let mut observer = self.blocked_request_observer.write().await;
*observer = blocked_request_observer;
}
pub fn audit_metadata(&self) -> &NetworkProxyAuditMetadata {
&self.audit_metadata
}
pub async fn current_cfg(&self) -> Result<NetworkProxyConfig> {
self.reload_if_needed().await?;
let guard = self.state.read().await;
Ok(guard.config.clone())
}
pub async fn add_allowed_domain(&self, host: &str) -> Result<()> {
let mut guard = self.state.write().await;
guard.config.network.upsert_domain_permission(
host.to_string(),
NetworkDomainPermission::Allow,
normalize_host,
);
Ok(())
}
pub async fn add_denied_domain(&self, host: &str) -> Result<()> {
let mut guard = self.state.write().await;
guard.config.network.upsert_domain_permission(
host.to_string(),
NetworkDomainPermission::Deny,
normalize_host,
);
Ok(())
}
pub async fn record_blocked(&self, entry: BlockedRequest) -> Result<()> {
let blocked_for_observer = entry.clone();
let blocked_request_observer = self.blocked_request_observer.read().await.clone();
let mut guard = self.state.write().await;
guard.blocked.push_back(entry);
guard.blocked_total = guard.blocked_total.saturating_add(1);
// Retain only the most recent 200 blocked requests.
while guard.blocked.len() > 200 {
guard.blocked.pop_front();
}
drop(guard);
if let Some(observer) = blocked_request_observer {
observer.on_blocked_request(blocked_for_observer).await;
}
Ok(())
}
async fn reload_if_needed(&self) -> Result<()> {
if let Some(mut new_state) = self.reloader.maybe_reload().await? {
let blocked = {
let guard = self.state.read().await;
(guard.blocked.clone(), guard.blocked_total)
};
new_state.blocked = blocked.0;
new_state.blocked_total = blocked.1;
let mut guard = self.state.write().await;
*guard = new_state;
}
Ok(())
}
}
#[derive(Clone)]
pub struct NetworkProxyBuilder {
state: Option<Arc<NetworkProxyState>>,
http_addr: Option<SocketAddr>,
socks_addr: Option<SocketAddr>,
_managed_by_codex: bool,
policy_decider: Option<Arc<dyn NetworkPolicyDecider>>,
blocked_request_observer: Option<Arc<dyn BlockedRequestObserver>>,
}
impl Default for NetworkProxyBuilder {
fn default() -> Self {
Self {
state: None,
http_addr: None,
socks_addr: None,
_managed_by_codex: true,
policy_decider: None,
blocked_request_observer: None,
}
}
}
impl NetworkProxyBuilder {
pub fn state(mut self, state: Arc<NetworkProxyState>) -> Self {
self.state = Some(state);
self
}
pub fn http_addr(mut self, addr: SocketAddr) -> Self {
self.http_addr = Some(addr);
self
}
pub fn socks_addr(mut self, addr: SocketAddr) -> Self {
self.socks_addr = Some(addr);
self
}
pub fn managed_by_codex(mut self, managed_by_codex: bool) -> Self {
self._managed_by_codex = managed_by_codex;
self
}
pub fn policy_decider<D>(mut self, decider: D) -> Self
where
D: NetworkPolicyDecider,
{
self.policy_decider = Some(Arc::new(decider));
self
}
pub fn policy_decider_arc(mut self, decider: Arc<dyn NetworkPolicyDecider>) -> Self {
self.policy_decider = Some(decider);
self
}
pub fn blocked_request_observer<O>(mut self, observer: O) -> Self
where
O: BlockedRequestObserver,
{
self.blocked_request_observer = Some(Arc::new(observer));
self
}
pub fn blocked_request_observer_arc(
mut self,
observer: Arc<dyn BlockedRequestObserver>,
) -> Self {
self.blocked_request_observer = Some(observer);
self
}
pub async fn build(self) -> Result<NetworkProxy> {
let state = self.state.context("NetworkProxyBuilder requires a state")?;
state
.set_blocked_request_observer(self.blocked_request_observer.clone())
.await;
let current_cfg = state.current_cfg().await?;
let http_addr = self
.http_addr
.unwrap_or(SocketAddr::from(([127, 0, 0, 1], 0)));
let socks_addr = self
.socks_addr
.unwrap_or(SocketAddr::from(([127, 0, 0, 1], 0)));
Ok(NetworkProxy {
state,
http_addr,
socks_addr,
socks_enabled: current_cfg.network.enable_socks5,
allow_local_binding: current_cfg.network.allow_local_binding,
allow_unix_sockets: current_cfg.network.allow_unix_sockets(),
dangerously_allow_all_unix_sockets: current_cfg
.network
.dangerously_allow_all_unix_sockets,
_policy_decider: self.policy_decider,
})
}
}
#[derive(Clone)]
pub struct NetworkProxy {
state: Arc<NetworkProxyState>,
http_addr: SocketAddr,
socks_addr: SocketAddr,
socks_enabled: bool,
allow_local_binding: bool,
allow_unix_sockets: Vec<String>,
dangerously_allow_all_unix_sockets: bool,
_policy_decider: Option<Arc<dyn NetworkPolicyDecider>>,
}
impl std::fmt::Debug for NetworkProxy {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("NetworkProxy")
.field("http_addr", &self.http_addr)
.field("socks_addr", &self.socks_addr)
.finish_non_exhaustive()
}
}
impl PartialEq for NetworkProxy {
fn eq(&self, other: &Self) -> bool {
self.http_addr == other.http_addr
&& self.socks_addr == other.socks_addr
&& self.allow_local_binding == other.allow_local_binding
}
}
impl Eq for NetworkProxy {}
pub fn proxy_url_env_value<'a>(
env: &'a HashMap<String, String>,
canonical_key: &str,
) -> Option<&'a str> {
if let Some(value) = env.get(canonical_key) {
return Some(value.as_str());
}
let lower_key = canonical_key.to_ascii_lowercase();
env.get(lower_key.as_str()).map(String::as_str)
}
pub fn has_proxy_url_env_vars(env: &HashMap<String, String>) -> bool {
PROXY_URL_ENV_KEYS
.iter()
.any(|key| proxy_url_env_value(env, key).is_some_and(|value| !value.trim().is_empty()))
}
fn set_env_keys(env: &mut HashMap<String, String>, keys: &[&str], value: &str) {
for key in keys {
env.insert((*key).to_string(), value.to_string());
}
}
fn apply_proxy_env_overrides(
env: &mut HashMap<String, String>,
http_addr: SocketAddr,
socks_addr: SocketAddr,
socks_enabled: bool,
allow_local_binding: bool,
) {
let http_proxy_url = format!("http://{http_addr}");
let socks_proxy_url = format!("socks5h://{socks_addr}");
env.insert(
ALLOW_LOCAL_BINDING_ENV_KEY.to_string(),
if allow_local_binding { "1" } else { "0" }.to_string(),
);
set_env_keys(
env,
&[
"HTTP_PROXY",
"HTTPS_PROXY",
"http_proxy",
"https_proxy",
"YARN_HTTP_PROXY",
"YARN_HTTPS_PROXY",
"npm_config_http_proxy",
"npm_config_https_proxy",
"npm_config_proxy",
"NPM_CONFIG_HTTP_PROXY",
"NPM_CONFIG_HTTPS_PROXY",
"NPM_CONFIG_PROXY",
"BUNDLE_HTTP_PROXY",
"BUNDLE_HTTPS_PROXY",
"PIP_PROXY",
"DOCKER_HTTP_PROXY",
"DOCKER_HTTPS_PROXY",
],
&http_proxy_url,
);
set_env_keys(env, WEBSOCKET_PROXY_ENV_KEYS, &http_proxy_url);
set_env_keys(env, NO_PROXY_ENV_KEYS, DEFAULT_NO_PROXY_VALUE);
env.insert("ELECTRON_GET_USE_PROXY".to_string(), "true".to_string());
if socks_enabled {
set_env_keys(env, ALL_PROXY_ENV_KEYS, &socks_proxy_url);
set_env_keys(env, FTP_PROXY_ENV_KEYS, &socks_proxy_url);
} else {
set_env_keys(env, ALL_PROXY_ENV_KEYS, &http_proxy_url);
set_env_keys(env, FTP_PROXY_ENV_KEYS, &http_proxy_url);
}
}
impl NetworkProxy {
pub fn builder() -> NetworkProxyBuilder {
NetworkProxyBuilder::default()
}
pub fn http_addr(&self) -> SocketAddr {
self.http_addr
}
pub fn socks_addr(&self) -> SocketAddr {
self.socks_addr
}
pub async fn current_cfg(&self) -> Result<NetworkProxyConfig> {
self.state.current_cfg().await
}
pub async fn add_allowed_domain(&self, host: &str) -> Result<()> {
self.state.add_allowed_domain(host).await
}
pub async fn add_denied_domain(&self, host: &str) -> Result<()> {
self.state.add_denied_domain(host).await
}
pub fn allow_local_binding(&self) -> bool {
self.allow_local_binding
}
pub fn allow_unix_sockets(&self) -> &[String] {
&self.allow_unix_sockets
}
pub fn dangerously_allow_all_unix_sockets(&self) -> bool {
self.dangerously_allow_all_unix_sockets
}
pub fn apply_to_env(&self, env: &mut HashMap<String, String>) {
apply_proxy_env_overrides(
env,
self.http_addr,
self.socks_addr,
self.socks_enabled,
self.allow_local_binding,
);
}
pub async fn run(&self) -> Result<NetworkProxyHandle> {
Ok(NetworkProxyHandle::noop())
}
}
pub struct NetworkProxyHandle {
completed: bool,
}
impl NetworkProxyHandle {
fn noop() -> Self {
Self { completed: true }
}
pub async fn wait(mut self) -> Result<()> {
self.completed = true;
Ok(())
}
pub async fn shutdown(mut self) -> Result<()> {
self.completed = true;
Ok(())
}
}
impl Drop for NetworkProxyHandle {
fn drop(&mut self) {
let _ = self.completed;
}
}
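The `proxy_url_env_value` helper in the listing above prefers the canonical (upper-case) key and falls back to the lower-case spelling. A minimal, self-contained sketch of that lookup order (map contents here are illustrative):

```rust
use std::collections::HashMap;

/// Look up a proxy env var, preferring the canonical key and falling
/// back to its lower-case form (mirrors `proxy_url_env_value` above).
fn proxy_url_env_value<'a>(
    env: &'a HashMap<String, String>,
    canonical_key: &str,
) -> Option<&'a str> {
    if let Some(value) = env.get(canonical_key) {
        return Some(value.as_str());
    }
    let lower_key = canonical_key.to_ascii_lowercase();
    env.get(lower_key.as_str()).map(String::as_str)
}

fn main() {
    let mut env = HashMap::new();
    env.insert("http_proxy".to_string(), "http://127.0.0.1:3128".to_string());
    // Canonical key absent, so the lower-case fallback wins.
    assert_eq!(
        proxy_url_env_value(&env, "HTTP_PROXY"),
        Some("http://127.0.0.1:3128")
    );
    // Once the canonical key is present, it takes precedence.
    env.insert("HTTP_PROXY".to_string(), "http://127.0.0.1:8080".to_string());
    assert_eq!(
        proxy_url_env_value(&env, "HTTP_PROXY"),
        Some("http://127.0.0.1:8080")
    );
}
```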

View File

@@ -42,6 +42,9 @@ anyhow = { workspace = true }
pretty_assertions = { workspace = true }
tempfile = { workspace = true }
[target.'cfg(target_arch = "wasm32")'.dependencies]
uuid = { workspace = true, features = ["js"] }
[package.metadata.cargo-shear]
# Required because:
# `icu_provider`: contains a required `sync` feature for `icu_decimal`

View File

@@ -3,22 +3,54 @@ name = "codex-rmcp-client"
version.workspace = true
edition.workspace = true
license.workspace = true
autobins = false
[lints]
workspace = true
[features]
native-bin = []
[[bin]]
name = "rmcp_test_server"
path = "src/bin/rmcp_test_server.rs"
required-features = ["native-bin"]
[[bin]]
name = "test_stdio_server"
path = "src/bin/test_stdio_server.rs"
required-features = ["native-bin"]
[[bin]]
name = "test_streamable_http_server"
path = "src/bin/test_streamable_http_server.rs"
required-features = ["native-bin"]
[dependencies]
anyhow = "1"
codex-protocol = { workspace = true }
futures = { workspace = true, default-features = false, features = ["std"] }
schemars = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true, features = ["log"] }
[dev-dependencies]
codex-utils-cargo-bin = { workspace = true }
pretty_assertions = { workspace = true }
serial_test = { workspace = true }
tempfile = { workspace = true }
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
axum = { workspace = true, default-features = false, features = [
"http1",
"tokio",
] }
codex-client = { workspace = true }
codex-keyring-store = { workspace = true }
codex-protocol = { workspace = true }
codex-utils-pty = { workspace = true }
codex-utils-home-dir = { workspace = true }
futures = { workspace = true, default-features = false, features = ["std"] }
codex-utils-pty = { workspace = true }
keyring = { workspace = true, features = ["crypto-rust"] }
oauth2 = "5"
reqwest = { version = "0.12", default-features = false, features = [
@@ -37,12 +69,8 @@ rmcp = { workspace = true, default-features = false, features = [
"transport-streamable-http-client-reqwest",
"transport-streamable-http-server",
] }
schemars = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
sha2 = { workspace = true }
sse-stream = "0.2.1"
thiserror = { workspace = true }
tiny_http = { workspace = true }
tokio = { workspace = true, features = [
"io-util",
@@ -53,24 +81,18 @@ tokio = { workspace = true, features = [
"io-std",
"time",
] }
tracing = { workspace = true, features = ["log"] }
urlencoding = { workspace = true }
webbrowser = { workspace = true }
which = { workspace = true }
[dev-dependencies]
codex-utils-cargo-bin = { workspace = true }
pretty_assertions = { workspace = true }
serial_test = { workspace = true }
tempfile = { workspace = true }
[target.'cfg(target_os = "linux")'.dependencies]
[target.'cfg(all(not(target_arch = "wasm32"), target_os = "linux"))'.dependencies]
keyring = { workspace = true, features = ["linux-native-async-persistent"] }
[target.'cfg(target_os = "macos")'.dependencies]
[target.'cfg(all(not(target_arch = "wasm32"), target_os = "macos"))'.dependencies]
keyring = { workspace = true, features = ["apple-native"] }
[target.'cfg(target_os = "windows")'.dependencies]
[target.'cfg(all(not(target_arch = "wasm32"), target_os = "windows"))'.dependencies]
keyring = { workspace = true, features = ["windows-native"] }
[target.'cfg(any(target_os = "freebsd", target_os = "openbsd"))'.dependencies]
[target.'cfg(all(not(target_arch = "wasm32"), any(target_os = "freebsd", target_os = "openbsd")))'.dependencies]
keyring = { workspace = true, features = ["sync-secret-service"] }

View File

@@ -1,31 +1,7 @@
mod auth_status;
mod logging_client_handler;
mod oauth;
mod perform_oauth_login;
mod program_resolver;
mod rmcp_client;
mod utils;
#[cfg(not(target_arch = "wasm32"))]
include!("native.rs");
pub use auth_status::StreamableHttpOAuthDiscovery;
pub use auth_status::determine_streamable_http_auth_status;
pub use auth_status::discover_streamable_http_oauth;
pub use auth_status::supports_oauth_login;
pub use codex_protocol::protocol::McpAuthStatus;
pub use oauth::OAuthCredentialsStoreMode;
pub use oauth::StoredOAuthTokens;
pub use oauth::WrappedOAuthTokenResponse;
pub use oauth::delete_oauth_tokens;
pub(crate) use oauth::load_oauth_tokens;
pub use oauth::save_oauth_tokens;
pub use perform_oauth_login::OAuthProviderError;
pub use perform_oauth_login::OauthLoginHandle;
pub use perform_oauth_login::perform_oauth_login;
pub use perform_oauth_login::perform_oauth_login_return_url;
pub use perform_oauth_login::perform_oauth_login_silent;
pub use rmcp::model::ElicitationAction;
pub use rmcp_client::Elicitation;
pub use rmcp_client::ElicitationResponse;
pub use rmcp_client::ListToolsWithConnectorIdResult;
pub use rmcp_client::RmcpClient;
pub use rmcp_client::SendElicitation;
pub use rmcp_client::ToolWithConnectorId;
#[cfg(target_arch = "wasm32")]
mod wasm;
#[cfg(target_arch = "wasm32")]
pub use wasm::*;

View File

@@ -0,0 +1,31 @@
mod auth_status;
mod logging_client_handler;
mod oauth;
mod perform_oauth_login;
mod program_resolver;
mod rmcp_client;
mod utils;
pub use auth_status::StreamableHttpOAuthDiscovery;
pub use auth_status::determine_streamable_http_auth_status;
pub use auth_status::discover_streamable_http_oauth;
pub use auth_status::supports_oauth_login;
pub use codex_protocol::protocol::McpAuthStatus;
pub use oauth::OAuthCredentialsStoreMode;
pub use oauth::StoredOAuthTokens;
pub use oauth::WrappedOAuthTokenResponse;
pub use oauth::delete_oauth_tokens;
pub(crate) use oauth::load_oauth_tokens;
pub use oauth::save_oauth_tokens;
pub use perform_oauth_login::OAuthProviderError;
pub use perform_oauth_login::OauthLoginHandle;
pub use perform_oauth_login::perform_oauth_login;
pub use perform_oauth_login::perform_oauth_login_return_url;
pub use perform_oauth_login::perform_oauth_login_silent;
pub use rmcp::model::ElicitationAction;
pub use rmcp_client::Elicitation;
pub use rmcp_client::ElicitationResponse;
pub use rmcp_client::ListToolsWithConnectorIdResult;
pub use rmcp_client::RmcpClient;
pub use rmcp_client::SendElicitation;
pub use rmcp_client::ToolWithConnectorId;

View File

@@ -0,0 +1,340 @@
use std::collections::HashMap;
use std::ffi::OsString;
use std::path::PathBuf;
use std::time::Duration;
use anyhow::Result;
use anyhow::anyhow;
use futures::future::BoxFuture;
use schemars::JsonSchema;
use serde::Deserialize;
use serde::Serialize;
pub use codex_protocol::protocol::McpAuthStatus;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum ElicitationAction {
Accept,
Decline,
Cancel,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamableHttpOAuthDiscovery {
pub scopes_supported: Option<Vec<String>>,
}
pub async fn determine_streamable_http_auth_status(
_server_name: &str,
_url: &str,
bearer_token_env_var: Option<&str>,
http_headers: Option<HashMap<String, String>>,
env_http_headers: Option<HashMap<String, String>>,
_store_mode: OAuthCredentialsStoreMode,
) -> Result<McpAuthStatus> {
if bearer_token_env_var.is_some() {
return Ok(McpAuthStatus::BearerToken);
}
let has_auth_header = http_headers
.into_iter()
.flatten()
.chain(env_http_headers.into_iter().flatten())
.any(|(key, _)| key.eq_ignore_ascii_case("authorization"));
if has_auth_header {
return Ok(McpAuthStatus::BearerToken);
}
Ok(McpAuthStatus::Unsupported)
}
pub async fn supports_oauth_login(_url: &str) -> Result<bool> {
Ok(false)
}
pub async fn discover_streamable_http_oauth(
_url: &str,
_http_headers: Option<HashMap<String, String>>,
_env_http_headers: Option<HashMap<String, String>>,
) -> Result<Option<StreamableHttpOAuthDiscovery>> {
Ok(None)
}
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "lowercase")]
pub enum OAuthCredentialsStoreMode {
#[default]
Auto,
File,
Keyring,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct StoredOAuthTokens {
pub server_name: String,
pub url: String,
pub client_id: String,
pub token_response: WrappedOAuthTokenResponse,
#[serde(default)]
pub expires_at: Option<u64>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct WrappedOAuthTokenResponse(pub serde_json::Value);
pub fn delete_oauth_tokens(
_server_name: &str,
_url: &str,
_store_mode: OAuthCredentialsStoreMode,
) -> Result<bool> {
Ok(false)
}
pub fn save_oauth_tokens(
_server_name: &str,
_tokens: &StoredOAuthTokens,
_store_mode: OAuthCredentialsStoreMode,
) -> Result<()> {
Ok(())
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OAuthProviderError {
error: Option<String>,
error_description: Option<String>,
}
impl OAuthProviderError {
pub fn new(error: Option<String>, error_description: Option<String>) -> Self {
Self {
error,
error_description,
}
}
}
impl std::fmt::Display for OAuthProviderError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match (self.error.as_deref(), self.error_description.as_deref()) {
(Some(error), Some(error_description)) => {
write!(f, "OAuth provider returned `{error}`: {error_description}")
}
(Some(error), None) => write!(f, "OAuth provider returned `{error}`"),
(None, Some(error_description)) => write!(f, "OAuth error: {error_description}"),
(None, None) => write!(f, "OAuth provider returned an error"),
}
}
}
impl std::error::Error for OAuthProviderError {}
#[allow(clippy::too_many_arguments)]
pub async fn perform_oauth_login(
_server_name: &str,
_server_url: &str,
_store_mode: OAuthCredentialsStoreMode,
_http_headers: Option<HashMap<String, String>>,
_env_http_headers: Option<HashMap<String, String>>,
_scopes: &[String],
_oauth_resource: Option<&str>,
_callback_port: Option<u16>,
_callback_url: Option<&str>,
) -> Result<()> {
Err(anyhow!("MCP OAuth login is unavailable on wasm32"))
}
#[allow(clippy::too_many_arguments)]
pub async fn perform_oauth_login_silent(
_server_name: &str,
_server_url: &str,
_store_mode: OAuthCredentialsStoreMode,
_http_headers: Option<HashMap<String, String>>,
_env_http_headers: Option<HashMap<String, String>>,
_scopes: &[String],
_oauth_resource: Option<&str>,
_callback_port: Option<u16>,
_callback_url: Option<&str>,
) -> Result<()> {
Err(anyhow!("MCP OAuth login is unavailable on wasm32"))
}
#[allow(clippy::too_many_arguments)]
pub async fn perform_oauth_login_return_url(
_server_name: &str,
_server_url: &str,
_store_mode: OAuthCredentialsStoreMode,
_http_headers: Option<HashMap<String, String>>,
_env_http_headers: Option<HashMap<String, String>>,
_scopes: &[String],
_oauth_resource: Option<&str>,
_callback_port: Option<u16>,
_callback_url: Option<&str>,
) -> Result<OauthLoginHandle> {
Err(anyhow!("MCP OAuth login is unavailable on wasm32"))
}
pub struct OauthLoginHandle {
authorization_url: String,
}
impl OauthLoginHandle {
pub fn authorization_url(&self) -> &str {
&self.authorization_url
}
pub fn into_parts(self) -> (String, futures::channel::oneshot::Receiver<Result<()>>) {
let (_tx, rx) = futures::channel::oneshot::channel();
(self.authorization_url, rx)
}
pub async fn wait(self) -> Result<()> {
Err(anyhow!("MCP OAuth login is unavailable on wasm32"))
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct RequestId(pub String);
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct Elicitation {
#[serde(default)]
pub message: Option<String>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ElicitationResponse {
pub action: ElicitationAction,
pub content: Option<serde_json::Value>,
#[serde(rename = "_meta")]
pub meta: Option<serde_json::Value>,
}
pub type SendElicitation = Box<
dyn Fn(RequestId, Elicitation) -> BoxFuture<'static, Result<ElicitationResponse>> + Send + Sync,
>;
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct ToolDefinition {
#[serde(default)]
pub name: String,
}
pub struct ToolWithConnectorId {
pub tool: ToolDefinition,
pub connector_id: Option<String>,
pub connector_name: Option<String>,
pub connector_description: Option<String>,
}
pub struct ListToolsWithConnectorIdResult {
pub next_cursor: Option<String>,
pub tools: Vec<ToolWithConnectorId>,
}
pub struct RmcpClient;
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct InitializeRequestParams;
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct InitializeResult;
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct PaginatedRequestParams {
#[serde(default)]
pub cursor: Option<String>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct ListResourcesResult;
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct ListResourceTemplatesResult;
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct ReadResourceRequestParams;
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct ReadResourceResult;
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct CallToolResult;
impl RmcpClient {
pub async fn new_stdio_client(
_program: OsString,
_args: Vec<OsString>,
_env: Option<HashMap<OsString, OsString>>,
_env_vars: &[String],
_cwd: Option<PathBuf>,
) -> std::io::Result<Self> {
Err(std::io::Error::other(
"MCP stdio transport is unavailable on wasm32",
))
}
#[allow(clippy::too_many_arguments)]
pub async fn new_streamable_http_client(
_server_name: &str,
_url: &str,
_bearer_token: Option<String>,
_http_headers: Option<HashMap<String, String>>,
_env_http_headers: Option<HashMap<String, String>>,
_store_mode: OAuthCredentialsStoreMode,
) -> Result<Self> {
Err(anyhow!("MCP HTTP transport is unavailable on wasm32"))
}
pub async fn initialize(
&self,
_params: InitializeRequestParams,
_timeout: Option<Duration>,
_send_elicitation: SendElicitation,
) -> Result<InitializeResult> {
Err(anyhow!("MCP initialize is unavailable on wasm32"))
}
pub async fn list_tools_with_connector_ids(
&self,
_params: Option<PaginatedRequestParams>,
_timeout: Option<Duration>,
) -> Result<ListToolsWithConnectorIdResult> {
Err(anyhow!("MCP tools are unavailable on wasm32"))
}
pub async fn list_resources(
&self,
_params: Option<PaginatedRequestParams>,
_timeout: Option<Duration>,
) -> Result<ListResourcesResult> {
Err(anyhow!("MCP resources are unavailable on wasm32"))
}
pub async fn list_resource_templates(
&self,
_params: Option<PaginatedRequestParams>,
_timeout: Option<Duration>,
) -> Result<ListResourceTemplatesResult> {
Err(anyhow!("MCP resource templates are unavailable on wasm32"))
}
pub async fn read_resource(
&self,
_params: ReadResourceRequestParams,
_timeout: Option<Duration>,
) -> Result<ReadResourceResult> {
Err(anyhow!("MCP resources are unavailable on wasm32"))
}
pub async fn call_tool(
&self,
_name: String,
_arguments: Option<serde_json::Value>,
_meta: Option<serde_json::Value>,
_timeout: Option<Duration>,
) -> Result<CallToolResult> {
Err(anyhow!("MCP tools are unavailable on wasm32"))
}
}
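The wasm shim above keeps the full `RmcpClient` API surface compiling on wasm32 while deferring every unsupported operation to a runtime error. A stripped-down sketch of that pattern (the one-argument constructor here is a simplification of the real signatures):

```rust
use std::io;

// Illustrative only: the real crate selects this stub with
// `#[cfg(target_arch = "wasm32")]` and uses `include!("native.rs")`
// on native targets.
#[derive(Debug)]
pub struct RmcpClient;

impl RmcpClient {
    /// Stub constructor: call sites compile unchanged on wasm32, and
    /// the missing transport surfaces as a runtime error instead of a
    /// compile-time failure.
    pub fn new_stdio_client(_program: &str) -> io::Result<Self> {
        Err(io::Error::other(
            "MCP stdio transport is unavailable on wasm32",
        ))
    }
}

fn main() {
    let err = RmcpClient::new_stdio_client("server").unwrap_err();
    assert!(err.to_string().contains("unavailable on wasm32"));
}
```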

View File

@@ -11,14 +11,16 @@ workspace = true
codex-app-server-protocol = { workspace = true }
codex-code-mode = { workspace = true }
codex-protocol = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
rmcp = { workspace = true, default-features = false, features = [
"base64",
"macros",
"schemars",
"server",
] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
[dev-dependencies]
pretty_assertions = { workspace = true }

View File

@@ -54,6 +54,7 @@ pub use mcp_resource_tool::create_list_mcp_resource_templates_tool;
pub use mcp_resource_tool::create_list_mcp_resources_tool;
pub use mcp_resource_tool::create_read_mcp_resource_tool;
pub use mcp_tool::mcp_call_tool_result_output_schema;
#[cfg(not(target_arch = "wasm32"))]
pub use mcp_tool::parse_mcp_tool;
pub use request_user_input_tool::create_request_user_input_tool;
pub use responses_api::FreeformTool;
@@ -63,7 +64,9 @@ pub use responses_api::ResponsesApiNamespaceTool;
pub use responses_api::ResponsesApiTool;
pub use responses_api::ToolSearchOutputTool;
pub use responses_api::dynamic_tool_to_responses_api_tool;
#[cfg(not(target_arch = "wasm32"))]
pub use responses_api::mcp_tool_to_deferred_responses_api_tool;
#[cfg(not(target_arch = "wasm32"))]
pub use responses_api::mcp_tool_to_responses_api_tool;
pub use responses_api::tool_definition_to_responses_api_tool;
pub use tool_definition::ToolDefinition;

View File

@@ -1,8 +1,11 @@
#[cfg(not(target_arch = "wasm32"))]
use crate::ToolDefinition;
#[cfg(not(target_arch = "wasm32"))]
use crate::parse_tool_input_schema;
use serde_json::Value as JsonValue;
use serde_json::json;
#[cfg(not(target_arch = "wasm32"))]
pub fn parse_mcp_tool(tool: &rmcp::model::Tool) -> Result<ToolDefinition, serde_json::Error> {
let mut serialized_input_schema = serde_json::Value::Object(tool.input_schema.as_ref().clone());
@@ -55,6 +58,6 @@ pub fn mcp_call_tool_result_output_schema(structured_content_schema: JsonValue)
})
}
#[cfg(test)]
#[cfg(all(test, not(target_arch = "wasm32")))]
#[path = "mcp_tool_tests.rs"]
mod tests;

View File

@@ -1,6 +1,7 @@
use crate::JsonSchema;
use crate::ToolDefinition;
use crate::parse_dynamic_tool;
#[cfg(not(target_arch = "wasm32"))]
use crate::parse_mcp_tool;
use codex_protocol::dynamic_tools::DynamicToolSpec;
use serde::Deserialize;
@@ -68,6 +69,7 @@ pub fn dynamic_tool_to_responses_api_tool(
)?))
}
#[cfg(not(target_arch = "wasm32"))]
pub fn mcp_tool_to_responses_api_tool(
name: String,
tool: &rmcp::model::Tool,
@@ -77,6 +79,7 @@ pub fn mcp_tool_to_responses_api_tool(
))
}
#[cfg(not(target_arch = "wasm32"))]
pub fn mcp_tool_to_deferred_responses_api_tool(
name: String,
tool: &rmcp::model::Tool,
@@ -97,6 +100,6 @@ pub fn tool_definition_to_responses_api_tool(tool_definition: ToolDefinition) ->
}
}
#[cfg(test)]
#[cfg(all(test, not(target_arch = "wasm32")))]
#[path = "responses_api_tests.rs"]
mod tests;

View File

@@ -10,7 +10,6 @@ workspace = true
[dependencies]
dirs = { workspace = true }
path-absolutize = { workspace = true }
schemars = { workspace = true }
serde = { workspace = true, features = ["derive"] }
ts-rs = { workspace = true, features = [
@@ -22,3 +21,6 @@ ts-rs = { workspace = true, features = [
pretty_assertions = { workspace = true }
serde_json = { workspace = true }
tempfile = { workspace = true }
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
path-absolutize = { workspace = true }

View File

@@ -1,4 +1,5 @@
use dirs::home_dir;
#[cfg(not(target_arch = "wasm32"))]
use path_absolutize::Absolutize;
use schemars::JsonSchema;
use serde::Deserialize;
@@ -6,6 +7,10 @@ use serde::Deserializer;
use serde::Serialize;
use serde::de::Error as SerdeError;
use std::cell::RefCell;
#[cfg(target_arch = "wasm32")]
use std::ffi::OsString;
#[cfg(target_arch = "wasm32")]
use std::path::Component;
use std::path::Display;
use std::path::Path;
use std::path::PathBuf;
@@ -45,13 +50,24 @@ impl AbsolutePathBuf {
base_path: B,
) -> std::io::Result<Self> {
let expanded = Self::maybe_expand_home_directory(path.as_ref());
#[cfg(not(target_arch = "wasm32"))]
let absolute_path = expanded.absolutize_from(base_path.as_ref())?;
#[cfg(target_arch = "wasm32")]
let absolute_path =
normalize_absolute_path(join_against_base(expanded, base_path.as_ref())?);
Ok(Self(absolute_path.into_owned()))
}
pub fn from_absolute_path<P: AsRef<Path>>(path: P) -> std::io::Result<Self> {
let expanded = Self::maybe_expand_home_directory(path.as_ref());
#[cfg(not(target_arch = "wasm32"))]
let absolute_path = expanded.absolutize()?;
#[cfg(target_arch = "wasm32")]
let absolute_path = normalize_absolute_path(if expanded.is_absolute() {
expanded
} else {
join_against_base(expanded, &std::env::current_dir()?)?
});
Ok(Self(absolute_path.into_owned()))
}
@@ -101,6 +117,56 @@ impl AbsolutePathBuf {
}
}
#[cfg(target_arch = "wasm32")]
fn join_against_base(path: PathBuf, base_path: &Path) -> std::io::Result<PathBuf> {
if path.is_absolute() {
return Ok(path);
}
if !base_path.is_absolute() {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"base path must be absolute",
));
}
Ok(base_path.join(path))
}
#[cfg(target_arch = "wasm32")]
fn normalize_absolute_path(path: PathBuf) -> std::borrow::Cow<'static, Path> {
let mut prefix: Option<OsString> = None;
let mut has_root = false;
let mut parts: Vec<OsString> = Vec::new();
for component in path.components() {
match component {
Component::Prefix(value) => {
prefix = Some(value.as_os_str().to_os_string());
}
Component::RootDir => {
has_root = true;
}
Component::CurDir => {}
Component::ParentDir => {
let _ = parts.pop();
}
Component::Normal(part) => parts.push(part.to_os_string()),
}
}
let mut normalized = PathBuf::new();
if let Some(prefix) = prefix {
normalized.push(prefix);
}
if has_root {
normalized.push(Path::new(std::path::MAIN_SEPARATOR_STR));
}
for part in parts {
normalized.push(part);
}
std::borrow::Cow::Owned(normalized)
}
impl AsRef<Path> for AbsolutePathBuf {
fn as_ref(&self) -> &Path {
&self.0

View File

@@ -10,7 +10,9 @@ workspace = true
[dependencies]
lru = { workspace = true }
sha1 = { workspace = true }
tokio = { workspace = true, features = ["sync", "rt", "rt-multi-thread"] }
[dev-dependencies]
tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread"] }
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio = { workspace = true, features = ["sync", "rt", "rt-multi-thread"] }

View File

@@ -5,7 +5,13 @@ use std::num::NonZeroUsize;
use lru::LruCache;
use sha1::Digest;
use sha1::Sha1;
#[cfg(target_arch = "wasm32")]
use std::sync::Mutex;
#[cfg(target_arch = "wasm32")]
use std::sync::MutexGuard;
#[cfg(not(target_arch = "wasm32"))]
use tokio::sync::Mutex;
#[cfg(not(target_arch = "wasm32"))]
use tokio::sync::MutexGuard;
/// A minimal LRU cache protected by a mutex (Tokio on native targets, std on wasm32).
@@ -119,6 +125,7 @@ where
}
}
#[cfg(not(target_arch = "wasm32"))]
fn lock_if_runtime<K, V>(m: &Mutex<LruCache<K, V>>) -> Option<MutexGuard<'_, LruCache<K, V>>>
where
K: Eq + Hash,
@@ -127,6 +134,14 @@ where
Some(tokio::task::block_in_place(|| m.blocking_lock()))
}
#[cfg(target_arch = "wasm32")]
fn lock_if_runtime<K, V>(m: &Mutex<LruCache<K, V>>) -> Option<MutexGuard<'_, LruCache<K, V>>>
where
K: Eq + Hash,
{
m.lock().ok()
}
/// Computes the SHA-1 digest of `bytes`.
///
/// Useful for content-based cache keys when you want to avoid staleness
@@ -141,6 +156,7 @@ pub fn sha1_digest(bytes: &[u8]) -> [u8; 20] {
out
}
#[cfg(not(target_arch = "wasm32"))]
#[cfg(test)]
mod tests {
use super::BlockingLruCache;
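On wasm32 the cache helper above has no Tokio runtime to block on, so `lock_if_runtime` is just `m.lock().ok()`: a plain std mutex whose poisoned state degrades to a cache miss rather than a panic. A standalone sketch of that fallback (`lock_if_available` is an illustrative name for the same shape):

```rust
use std::sync::{Mutex, MutexGuard};

/// Wasm-style fallback lock: a poisoned mutex yields `None`, which the
/// caller treats as a cache miss instead of propagating a panic.
fn lock_if_available<T>(m: &Mutex<T>) -> Option<MutexGuard<'_, T>> {
    m.lock().ok()
}

fn main() {
    let cache = Mutex::new(vec![1, 2, 3]);
    if let Some(mut guard) = lock_if_available(&cache) {
        guard.push(4);
    }
    assert_eq!(lock_if_available(&cache).map(|g| g.len()), Some(4));
}
```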

View File

@@ -13,7 +13,7 @@ image = { workspace = true, features = ["jpeg", "png", "gif", "webp"] }
codex-utils-cache = { workspace = true }
mime_guess = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["fs", "rt", "rt-multi-thread", "macros"] }
[dev-dependencies]
image = { workspace = true, features = ["jpeg", "png", "gif", "webp"] }
tokio = { workspace = true, features = ["fs", "rt", "rt-multi-thread", "macros"] }

codex-rs/wasm-harness/.gitignore (vendored, new file)
View File
View File

@@ -0,0 +1 @@
/examples/pkg/

View File

@@ -0,0 +1,6 @@
load("//:defs.bzl", "codex_rust_crate")
codex_rust_crate(
name = "wasm-harness",
crate_name = "codex_wasm_harness",
)

View File

@@ -0,0 +1,36 @@
[package]
edition.workspace = true
license.workspace = true
name = "codex-wasm-harness"
version.workspace = true
[lib]
crate-type = ["cdylib", "rlib"]
name = "codex_wasm_harness"
path = "src/lib.rs"
[lints]
workspace = true
[features]
real-core = ["dep:codex-core"]
[dependencies]
async-trait = { workspace = true }
codex-core = { workspace = true, optional = true }
js-sys = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
wasm-bindgen = { workspace = true }
wasm-bindgen-futures = { workspace = true }
web-sys = { workspace = true, features = [
"Headers",
"RequestInit",
"RequestMode",
"Response",
"Window",
] }
[dev-dependencies]
pretty_assertions = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt"] }


@@ -0,0 +1,78 @@
# Codex WASM Harness Prototype
This crate is the first browser-facing seam for a Codex harness prototype.
It now exposes two layers:
- `EmbeddedHarness`: a pure Rust library API that downstream Rust/web repos can
depend on directly.
- `BrowserCodex`: a `wasm_bindgen` adapter that preserves the current browser
demo API.
It does not yet call `codex-core`'s `run_turn` or `RegularTask::run`. Instead, it
establishes the intended browser API shape:
- submit a prompt from JavaScript;
- stream Codex-shaped turn events back to the page; and
- resolve after a `turn_complete` event.
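For reference, the events delivered to the page are plain JSON objects tagged by `type` (field names follow the crate's `HarnessEvent` enum); a completed turn ends with a shape like:

```json
{
  "type": "turn_complete",
  "turn_id": "browser-turn-1",
  "last_agent_message": "Done."
}
```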
The sampler is currently implemented in Rust/WASM. By default the demo page uses
a deterministic local response; if the user supplies an API key, the page passes
it into the WASM facade so Rust can make a direct browser `fetch` request to the
Responses API.
The API key field is for local prototype use only: it stores the key in browser
`localStorage` and sends it directly from the page. A production browser
integration should use a proxy or an ephemeral-token flow instead of persisting
long-lived API keys in the page origin.
The next step is to replace the callback boundary with a real model transport
and then wire the facade to the Codex turn loop after host services are
injectable.
## Library Boundary
The intended downstream integration point is the pure Rust API:
- `EmbeddedHarness`
- `ResponsesClient`
- `ToolExecutor`
- `EventSink`
- `HarnessConfig`
This lets downstream web apps:
- depend on `codex-wasm-harness` from a Git branch or local path;
- supply their own browser/runtime implementations for transport, tools, event
rendering, or persistence; and
- keep app-specific browser glue out of the Codex repo.
The current `BrowserCodex` type remains a thin compatibility wrapper around
that library API so the demo page keeps working.
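As a concrete sketch of the seam shape, the snippet below mirrors the crate's `EventSink` trait in self-contained form. This is an illustration, not the crate's code: the real trait takes `&HarnessEvent` and returns `Result<(), HarnessError>`; plain strings stand in for both so the example compiles on its own.

```rust
// Standalone mirror of the `EventSink` seam. In the real crate the event type
// is `HarnessEvent` and the error type is `HarnessError`; strings stand in
// for both here so this sketch runs without the crate.
use std::cell::RefCell;

trait EventSink {
    fn emit(&self, event: &str) -> Result<(), String>;
}

// A downstream app supplies its own sink, e.g. one that buffers events for a
// UI layer to render.
struct BufferingSink {
    events: RefCell<Vec<String>>,
}

impl EventSink for BufferingSink {
    fn emit(&self, event: &str) -> Result<(), String> {
        self.events.borrow_mut().push(event.to_string());
        Ok(())
    }
}

fn main() {
    let sink = BufferingSink { events: RefCell::new(Vec::new()) };
    sink.emit("turn_started").unwrap();
    sink.emit("turn_complete").unwrap();
    assert_eq!(sink.events.borrow().len(), 2);
}
```

In the real crate the same pattern extends to `ResponsesClient` and `ToolExecutor`, which are `async_trait(?Send)` traits so browser implementations can await JavaScript futures.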
The `real-core` feature is an explicit compile probe for depending on
`codex-core`. It currently does not build for `wasm32-unknown-unknown`; the
first blocker is native Tokio/Mio networking pulled through the host-heavy
`codex-core` dependency graph.
## Current Limitations
This is a boundary prototype, not a port of `codex-core` yet.
- It does not construct a `Session` or `TurnContext`.
- It does not call `RegularTask::run` or `run_turn`.
- It does not expose native Codex tools.
- It emits Codex-shaped events, but not the full protocol event set.
The immediate value is that the browser API and demo page can be iterated on
independently while the host-heavy Codex services are moved behind
browser-compatible traits.
## Build Sketch
```sh
rustup target add wasm32-unknown-unknown
codex-rs/wasm-harness/scripts/build-browser-demo.sh
```
Then serve `codex-rs/wasm-harness/examples` and open `/browser/index.html`.


@@ -0,0 +1,411 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<title>Codex WASM Harness Prototype</title>
<style>
:root {
color-scheme: light;
font-family:
ui-serif, "Iowan Old Style", "Palatino Linotype", Georgia, serif;
background: #f2eddf;
color: #1f2a24;
}
body {
margin: 0;
min-height: 100vh;
background:
radial-gradient(circle at 20% 10%, #ffc96b 0 9rem, transparent 9rem),
linear-gradient(135deg, #f8f2e3 0%, #dfecdc 48%, #c7dde7 100%);
}
main {
box-sizing: border-box;
max-width: 960px;
margin: 0 auto;
padding: 48px 20px;
}
h1 {
max-width: 760px;
margin: 0 0 12px;
font-size: clamp(2.4rem, 7vw, 5.5rem);
line-height: 0.92;
letter-spacing: -0.06em;
}
p {
max-width: 720px;
font-size: 1.1rem;
}
form {
display: grid;
grid-template-columns: 1fr auto;
gap: 12px;
margin: 32px 0;
}
input,
button {
border: 2px solid #1f2a24;
border-radius: 999px;
font: inherit;
font-size: 1.05rem;
padding: 14px 18px;
}
input {
background: #fffaf0;
}
button {
background: #1f2a24;
color: #fffaf0;
cursor: pointer;
}
label {
display: grid;
gap: 6px;
font-family: ui-monospace, SFMono-Regular, Menlo, monospace;
font-size: 0.82rem;
letter-spacing: 0.08em;
text-transform: uppercase;
}
.api-key-panel {
display: grid;
grid-template-columns: minmax(0, 1fr) auto;
gap: 14px;
align-items: end;
margin: 28px 0 0;
border: 2px solid rgb(31 42 36 / 0.26);
border-radius: 28px;
background: rgb(255 250 240 / 0.68);
padding: 18px;
}
.api-key-panel h2 {
margin: 0 0 8px;
font-size: 1.1rem;
letter-spacing: -0.03em;
}
.api-key-panel p {
margin: 0;
font-size: 0.95rem;
}
.api-key-panel label {
grid-column: 1 / -1;
}
.api-key-actions {
display: flex;
flex-wrap: wrap;
grid-column: 1 / -1;
gap: 10px;
align-items: center;
}
.secondary-button {
background: transparent;
color: #1f2a24;
}
.api-key-status {
font-family: ui-monospace, SFMono-Regular, Menlo, monospace;
font-size: 0.85rem;
}
#events {
display: grid;
gap: 12px;
}
.event {
border: 2px solid rgb(31 42 36 / 0.22);
border-radius: 20px;
background: rgb(255 250 240 / 0.75);
padding: 14px 16px;
box-shadow: 0 16px 40px rgb(31 42 36 / 0.08);
}
.event strong {
display: block;
margin-bottom: 6px;
font-family: ui-monospace, SFMono-Regular, Menlo, monospace;
font-size: 0.82rem;
letter-spacing: 0.08em;
text-transform: uppercase;
}
pre {
margin: 0;
overflow: auto;
white-space: pre-wrap;
}
@media (max-width: 640px) {
form,
.api-key-panel {
grid-template-columns: 1fr;
}
}
</style>
</head>
<body>
<main>
<h1>Codex WASM harness prototype</h1>
<p>
This page exercises the browser boundary: submit a prompt, stream
Codex-shaped events from WASM, and resolve when the turn completes.
</p>
<section class="api-key-panel" aria-labelledby="api-key-title">
<div>
<h2 id="api-key-title">Responses API key</h2>
<p>
Local prototype only. The key is stored in this browser's
localStorage and is sent directly from this page to the Responses
API when you run a turn.
</p>
</div>
<label>
API key
<input
id="api-key"
type="password"
placeholder="sk-..."
autocomplete="off"
spellcheck="false"
/>
</label>
<div class="api-key-actions">
<button id="clear-api-key" class="secondary-button" type="button">
Forget key
</button>
<output id="api-key-status" class="api-key-status"></output>
</div>
</section>
<form id="prompt-form">
<input
id="prompt"
value="write hello world in JavaScript, execute it with exec_js, and tell me the result"
autocomplete="off"
/>
<button type="submit">Run turn</button>
</form>
<section id="events" aria-live="polite"></section>
</main>
<script type="module">
import init, { BrowserCodex } from "../pkg/codex_wasm_harness.js";
await init();
const form = document.querySelector("#prompt-form");
const input = document.querySelector("#prompt");
const events = document.querySelector("#events");
const apiKeyInput = document.querySelector("#api-key");
const apiKeyStatus = document.querySelector("#api-key-status");
const clearApiKeyButton = document.querySelector("#clear-api-key");
const API_KEY_STORAGE_KEY = "codexWasmHarness.apiKey";
const codex = new BrowserCodex(readStoredApiKey());
const sandbox = createSandboxKernel();
apiKeyInput.value = readStoredApiKey();
updateApiKeyStatus();
codex.set_code_executor(async (code) => sandbox.execute(code));
apiKeyInput.addEventListener("input", () => {
const apiKey = apiKeyInput.value.trim();
writeStoredApiKey(apiKey);
codex.set_api_key(apiKey);
updateApiKeyStatus();
});
clearApiKeyButton.addEventListener("click", () => {
apiKeyInput.value = "";
writeStoredApiKey("");
codex.set_api_key("");
updateApiKeyStatus();
});
form.addEventListener("submit", async (event) => {
event.preventDefault();
events.replaceChildren();
try {
await codex.submit_turn(input.value, renderEvent);
} catch (error) {
renderEvent({
type: "page_error",
message: error instanceof Error ? error.message : String(error),
});
}
});
window.addEventListener("error", (event) => {
renderEvent({
type: "window_error",
message: event.message,
});
});
window.addEventListener("unhandledrejection", (event) => {
renderEvent({
type: "unhandled_rejection",
message:
event.reason instanceof Error
? event.reason.message
: String(event.reason),
});
});
function readStoredApiKey() {
try {
return localStorage.getItem(API_KEY_STORAGE_KEY) ?? "";
} catch {
return "";
}
}
function writeStoredApiKey(apiKey) {
try {
if (apiKey) {
localStorage.setItem(API_KEY_STORAGE_KEY, apiKey);
} else {
localStorage.removeItem(API_KEY_STORAGE_KEY);
}
} catch {
apiKeyStatus.textContent =
"localStorage is unavailable in this browser context.";
}
}
function updateApiKeyStatus() {
apiKeyStatus.textContent = apiKeyInput.value
? "API key stored in localStorage for this browser."
: "No API key stored. The deterministic demo sampler will be used.";
}
function renderEvent(event) {
const card = document.createElement("article");
card.className = "event";
const title = document.createElement("strong");
title.textContent = event.type;
const body = document.createElement("pre");
body.textContent = JSON.stringify(event, null, 2);
card.append(title, body);
events.append(card);
}
function createSandboxKernel() {
const iframe = document.createElement("iframe");
iframe.setAttribute("sandbox", "allow-scripts allow-same-origin");
iframe.style.display = "none";
iframe.srcdoc = `
<!doctype html>
<html>
<body>
<script>
function formatValue(value) {
if (typeof value === "string") {
return value;
}
if (value === undefined || value === null) {
return "";
}
try {
return JSON.stringify(value);
} catch {
return String(value);
}
}
window.addEventListener("message", async (event) => {
const data = event.data;
if (!data || data.type !== "exec_js") {
return;
}
const logs = [];
const consoleProxy = {
log: (...args) => logs.push(args.map(formatValue).join(" ")),
error: (...args) => logs.push(args.map(formatValue).join(" ")),
};
try {
const runner = new Function(
"console",
"return (async () => {\\n" + data.code + "\\n})();",
);
const result = await runner(consoleProxy);
const outputParts = logs.filter(Boolean);
const formattedResult = formatValue(result);
if (formattedResult) {
outputParts.push(formattedResult);
}
parent.postMessage(
{
type: "exec_js_result",
requestId: data.requestId,
ok: true,
output: outputParts.join("\\n"),
},
"*",
);
} catch (error) {
parent.postMessage(
{
type: "exec_js_result",
requestId: data.requestId,
ok: false,
error: error instanceof Error ? error.message : String(error),
},
"*",
);
}
});
<\/script>
</body>
</html>`;
document.body.append(iframe);
let nextRequestId = 0;
const pending = new Map();
window.addEventListener("message", (event) => {
const data = event.data;
if (!data || data.type !== "exec_js_result") {
return;
}
const entry = pending.get(data.requestId);
if (!entry) {
return;
}
pending.delete(data.requestId);
if (data.ok) {
entry.resolve(data.output ?? "");
} else {
entry.reject(new Error(data.error ?? "sandbox execution failed"));
}
});
return {
execute(code) {
return new Promise((resolve, reject) => {
nextRequestId += 1;
pending.set(nextRequestId, { resolve, reject });
iframe.contentWindow.postMessage(
{ type: "exec_js", requestId: nextRequestId, code },
"*",
);
});
},
};
}
</script>
</body>
</html>


@@ -0,0 +1,16 @@
#!/usr/bin/env bash
set -euo pipefail
crate_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
workspace_dir="$(cd "$crate_dir/.." && pwd)"
cd "$workspace_dir"
cargo build \
-p codex-wasm-harness \
--target wasm32-unknown-unknown
wasm-bindgen \
--target web \
--out-dir "$crate_dir/examples/pkg" \
"$workspace_dir/target/wasm32-unknown-unknown/debug/codex_wasm_harness.wasm"


@@ -0,0 +1,271 @@
use crate::EXEC_JS_TOOL_NAME;
use crate::EmbeddedHarness;
use crate::EventSink;
use crate::HarnessConfig;
use crate::HarnessError;
use crate::ResponsesClient;
use crate::ResponsesFunctionCall;
use crate::ResponsesRequest;
use crate::ResponsesResponse;
use crate::ResponsesTool;
use crate::ToolExecutor;
use async_trait::async_trait;
use js_sys::Function;
use js_sys::Promise;
use serde::Deserialize;
use wasm_bindgen::JsCast;
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::JsFuture;
use web_sys::Headers;
use web_sys::RequestInit;
use web_sys::RequestMode;
use web_sys::Response;
const RESPONSES_API_URL: &str = "https://api.openai.com/v1/responses";
#[derive(Default)]
struct BrowserResponsesClient {
api_key: String,
}
impl BrowserResponsesClient {
fn set_api_key(&mut self, api_key: String) {
self.api_key = api_key;
}
}
#[async_trait(?Send)]
impl ResponsesClient for BrowserResponsesClient {
async fn create_response(
&self,
request: ResponsesRequest,
) -> Result<ResponsesResponse, HarnessError> {
if self.api_key.trim().is_empty() {
return Ok(default_demo_response(&request.input.to_string()));
}
let body = serde_json::to_string(&request)?;
let headers = Headers::new().map_err(js_exception)?;
headers
.append("Authorization", &format!("Bearer {}", self.api_key.trim()))
.map_err(js_exception)?;
headers
.append("Content-Type", "application/json")
.map_err(js_exception)?;
let request_init = RequestInit::new();
request_init.set_method("POST");
request_init.set_mode(RequestMode::Cors);
request_init.set_headers(&headers);
request_init.set_body(&JsValue::from_str(&body));
let window = web_sys::window().ok_or_else(|| HarnessError::new("window is unavailable"))?;
let response_value =
JsFuture::from(window.fetch_with_str_and_init(RESPONSES_API_URL, &request_init))
.await
.map_err(js_fetch_error)?;
let response: Response = response_value.dyn_into().map_err(js_exception)?;
let status = response.status();
let ok = response.ok();
let json = JsFuture::from(response.json().map_err(js_exception)?)
.await
.map_err(js_fetch_error)?;
let response_body = parse_response_body(json)?;
if !ok {
let message = response_body
.error
.as_ref()
.and_then(|err| err.message.clone())
.unwrap_or_else(|| format!("Responses API returned {status}"));
return Err(HarnessError::new(message));
}
Ok(response_body)
}
}
#[derive(Default)]
struct BrowserToolExecutor {
code_executor: Option<Function>,
}
impl BrowserToolExecutor {
fn set_code_executor(&mut self, executor: Function) {
self.code_executor = Some(executor);
}
fn clear_code_executor(&mut self) {
self.code_executor = None;
}
}
#[async_trait(?Send)]
impl ToolExecutor for BrowserToolExecutor {
fn tools(&self) -> Vec<ResponsesTool> {
self.code_executor
.as_ref()
.map(|_| vec![ResponsesTool::exec_js()])
.unwrap_or_default()
}
async fn execute(&self, function_call: &ResponsesFunctionCall) -> Result<String, HarnessError> {
if function_call.name != EXEC_JS_TOOL_NAME {
return Err(HarnessError::new(format!(
"browser prototype does not implement tool `{}`",
function_call.name
)));
}
let executor = self.code_executor.as_ref().ok_or_else(|| {
HarnessError::new("`exec_js` was requested but no browser executor is registered")
})?;
let args: ExecJsArguments = serde_json::from_str(&function_call.arguments)?;
let value = executor
.call1(&JsValue::NULL, &JsValue::from_str(&args.code))
.map_err(js_exception)?;
let value = await_possible_promise(value).await?;
js_value_to_string(value)
}
}
struct JsEventSink<'a> {
on_event: &'a Function,
}
impl EventSink for JsEventSink<'_> {
fn emit(&self, event: &crate::HarnessEvent) -> Result<(), HarnessError> {
let json = serde_json::to_string(event)?;
let value = js_sys::JSON::parse(&json).map_err(js_exception)?;
self.on_event
.call1(&JsValue::NULL, &value)
.map_err(js_exception)?;
Ok(())
}
}
#[derive(Debug, Deserialize)]
struct ExecJsArguments {
code: String,
}
/// Browser entrypoint for the prototype harness.
#[wasm_bindgen]
pub struct BrowserCodex {
harness: EmbeddedHarness<BrowserResponsesClient, BrowserToolExecutor>,
}
#[wasm_bindgen]
impl BrowserCodex {
#[wasm_bindgen(constructor)]
#[must_use]
pub fn new(api_key: String) -> Self {
let mut client = BrowserResponsesClient::default();
client.set_api_key(api_key);
let tool_executor = BrowserToolExecutor::default();
let harness = EmbeddedHarness::new(HarnessConfig::default(), client, tool_executor);
Self { harness }
}
pub fn set_api_key(&mut self, api_key: String) {
self.harness.responses_client_mut().set_api_key(api_key);
}
pub fn set_code_executor(&mut self, executor: Function) {
self.harness.tool_executor_mut().set_code_executor(executor);
}
pub fn clear_code_executor(&mut self) {
self.harness.tool_executor_mut().clear_code_executor();
}
pub async fn submit_turn(
&mut self,
prompt: String,
on_event: Function,
) -> Result<JsValue, JsValue> {
let sink = JsEventSink {
on_event: &on_event,
};
let agent_message = self
.harness
.submit_turn(prompt, &sink)
.await
.map_err(harness_error_to_js)?;
Ok(JsValue::from_str(&agent_message))
}
}
fn parse_response_body(value: JsValue) -> Result<ResponsesResponse, HarnessError> {
let json = js_sys::JSON::stringify(&value)
.map_err(js_exception)?
.as_string()
.ok_or_else(|| HarnessError::new("Responses API returned non-JSON output"))?;
serde_json::from_str(&json).map_err(HarnessError::from)
}
async fn await_possible_promise(value: JsValue) -> Result<JsValue, HarnessError> {
if let Ok(promise) = value.clone().dyn_into::<Promise>() {
JsFuture::from(promise).await.map_err(js_exception)
} else {
Ok(value)
}
}
fn js_value_to_string(value: JsValue) -> Result<String, HarnessError> {
if let Some(text) = value.as_string() {
return Ok(text);
}
if value.is_undefined() || value.is_null() {
return Ok(String::new());
}
let json = js_sys::JSON::stringify(&value).map_err(js_exception)?;
Ok(json
.as_string()
.unwrap_or_else(|| "[non-string value]".to_string()))
}
fn js_exception(error: JsValue) -> HarnessError {
HarnessError::new(js_value_to_string_lossy(&error))
}
fn js_fetch_error(error: JsValue) -> HarnessError {
HarnessError::new(format!(
"browser fetch failed: {}",
js_value_to_string_lossy(&error)
))
}
fn js_value_to_string_lossy(value: &JsValue) -> String {
if let Some(text) = value.as_string() {
return text;
}
js_sys::JSON::stringify(value)
.ok()
.and_then(|text| text.as_string())
.unwrap_or_else(|| "[non-string javascript error]".to_string())
}
fn harness_error_to_js(error: HarnessError) -> JsValue {
JsValue::from_str(error.message())
}
fn default_demo_response(input: &str) -> ResponsesResponse {
let prompt = serde_json::from_str::<String>(input).unwrap_or_else(|_| input.to_string());
let output_text = if prompt.to_ascii_lowercase().contains("hello world") {
"Here is a minimal hello world example:\n\n```js\nconsole.log(\"hello world\");\n```"
.to_string()
} else {
format!("Demo mode is active because no API key was provided. Prompt received:\n\n{prompt}")
};
ResponsesResponse {
id: Some("demo-response".to_string()),
output_text: Some(output_text),
output: Some(Vec::new()),
error: None,
}
}


@@ -0,0 +1,43 @@
use std::error::Error;
use std::fmt;
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct HarnessError(String);
impl HarnessError {
#[must_use]
pub fn new(message: impl Into<String>) -> Self {
Self(message.into())
}
#[must_use]
pub fn message(&self) -> &str {
&self.0
}
}
impl fmt::Display for HarnessError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
impl Error for HarnessError {}
impl From<&str> for HarnessError {
fn from(value: &str) -> Self {
Self::new(value)
}
}
impl From<String> for HarnessError {
fn from(value: String) -> Self {
Self::new(value)
}
}
impl From<serde_json::Error> for HarnessError {
fn from(value: serde_json::Error) -> Self {
Self::new(value.to_string())
}
}


@@ -0,0 +1,374 @@
use crate::HarnessError;
use crate::responses::HarnessEvent;
use crate::responses::ResponsesFunctionCall;
use crate::responses::ResponsesRequest;
use crate::responses::ResponsesResponse;
use crate::responses::ResponsesTool;
use crate::responses::build_browser_instructions;
use crate::responses::tool_output_item;
use async_trait::async_trait;
use serde_json::Value;
const DEFAULT_MODEL: &str = "gpt-5.1";
const DEFAULT_MAX_TOOL_ROUNDS: usize = 8;
#[derive(Clone, Debug)]
pub struct HarnessConfig {
pub model: String,
pub instructions: String,
pub max_tool_rounds: usize,
}
impl Default for HarnessConfig {
fn default() -> Self {
Self {
model: DEFAULT_MODEL.to_string(),
instructions: build_browser_instructions(),
max_tool_rounds: DEFAULT_MAX_TOOL_ROUNDS,
}
}
}
pub trait EventSink {
fn emit(&self, event: &HarnessEvent) -> Result<(), HarnessError>;
}
#[async_trait(?Send)]
pub trait ResponsesClient {
async fn create_response(
&self,
request: ResponsesRequest,
) -> Result<ResponsesResponse, HarnessError>;
}
#[async_trait(?Send)]
pub trait ToolExecutor {
fn tools(&self) -> Vec<ResponsesTool>;
async fn execute(&self, function_call: &ResponsesFunctionCall) -> Result<String, HarnessError>;
}
pub struct EmbeddedHarness<C, T> {
config: HarnessConfig,
responses_client: C,
tool_executor: T,
next_turn_id: u32,
}
impl<C, T> EmbeddedHarness<C, T> {
#[must_use]
pub fn new(config: HarnessConfig, responses_client: C, tool_executor: T) -> Self {
Self {
config,
responses_client,
tool_executor,
next_turn_id: 0,
}
}
pub fn responses_client_mut(&mut self) -> &mut C {
&mut self.responses_client
}
pub fn tool_executor_mut(&mut self) -> &mut T {
&mut self.tool_executor
}
pub fn config_mut(&mut self) -> &mut HarnessConfig {
&mut self.config
}
}
impl<C, T> EmbeddedHarness<C, T>
where
C: ResponsesClient,
T: ToolExecutor,
{
pub async fn submit_turn<E: EventSink>(
&mut self,
prompt: impl Into<String>,
event_sink: &E,
) -> Result<String, HarnessError> {
self.next_turn_id += 1;
let turn_id = format!("browser-turn-{}", self.next_turn_id);
let prompt = prompt.into();
event_sink.emit(&HarnessEvent::TurnStarted {
turn_id: turn_id.clone(),
model_context_window: None,
collaboration_mode_kind: "default".to_string(),
})?;
event_sink.emit(&HarnessEvent::UserMessage {
turn_id: turn_id.clone(),
message: prompt.clone(),
})?;
let result = self.run_turn(&turn_id, &prompt, event_sink).await;
match result {
Ok(last_agent_message) => {
event_sink.emit(&HarnessEvent::TurnComplete {
turn_id,
last_agent_message: Some(last_agent_message.clone()),
})?;
Ok(last_agent_message)
}
Err(err) => {
event_sink.emit(&HarnessEvent::TurnError {
turn_id: turn_id.clone(),
message: err.to_string(),
})?;
event_sink.emit(&HarnessEvent::TurnComplete {
turn_id,
last_agent_message: None,
})?;
Err(err)
}
}
}
async fn run_turn<E: EventSink>(
&self,
turn_id: &str,
prompt: &str,
event_sink: &E,
) -> Result<String, HarnessError> {
let mut previous_response_id: Option<String> = None;
let mut input = Value::String(prompt.to_string());
let mut last_agent_message: Option<String> = None;
for round in 0..self.config.max_tool_rounds {
let tools = self.tool_executor.tools();
let request = ResponsesRequest {
model: self.config.model.clone(),
instructions: self.config.instructions.clone(),
input,
previous_response_id: previous_response_id.clone(),
tools: (!tools.is_empty()).then_some(tools),
parallel_tool_calls: false,
};
let response = self.responses_client.create_response(request).await?;
previous_response_id = response.id.clone();
let agent_message = response.response_text();
if !agent_message.is_empty() {
event_sink.emit(&HarnessEvent::AgentMessageDelta {
turn_id: turn_id.to_string(),
delta: agent_message.clone(),
})?;
event_sink.emit(&HarnessEvent::AgentMessage {
turn_id: turn_id.to_string(),
message: agent_message.clone(),
})?;
last_agent_message = Some(agent_message);
}
let function_calls = response.function_calls()?;
if function_calls.is_empty() {
return Ok(last_agent_message.unwrap_or_else(|| {
"Responses API returned no assistant message.".to_string()
}));
}
let response_id = previous_response_id.clone().ok_or_else(|| {
HarnessError::new("Responses API omitted response.id for a tool-calling turn")
})?;
let mut tool_outputs = Vec::with_capacity(function_calls.len());
for function_call in function_calls {
event_sink.emit(&HarnessEvent::ToolCallStarted {
turn_id: turn_id.to_string(),
response_id: response_id.clone(),
call_id: function_call.call_id.clone(),
name: function_call.name.clone(),
arguments: function_call.arguments.clone(),
})?;
let output = self.tool_executor.execute(&function_call).await?;
event_sink.emit(&HarnessEvent::ToolCallCompleted {
turn_id: turn_id.to_string(),
response_id: response_id.clone(),
call_id: function_call.call_id.clone(),
name: function_call.name.clone(),
output: output.clone(),
})?;
tool_outputs.push(tool_output_item(&function_call.call_id, output));
}
input = Value::Array(tool_outputs);
if round + 1 == self.config.max_tool_rounds {
return Err(HarnessError::new(
"turn exceeded the browser tool-round limit",
));
}
}
Err(HarnessError::new("browser turn loop exited unexpectedly"))
}
}
#[cfg(test)]
mod tests {
use super::EmbeddedHarness;
use super::EventSink;
use super::HarnessConfig;
use super::ResponsesClient;
use super::ToolExecutor;
use crate::HarnessError;
use crate::responses::HarnessEvent;
use crate::responses::ResponsesFunctionCall;
use crate::responses::ResponsesRequest;
use crate::responses::ResponsesResponse;
use crate::responses::ResponsesTool;
use async_trait::async_trait;
use pretty_assertions::assert_eq;
use std::cell::RefCell;
use std::collections::VecDeque;
struct RecordingEventSink {
events: RefCell<Vec<HarnessEvent>>,
}
impl RecordingEventSink {
fn new() -> Self {
Self {
events: RefCell::new(Vec::new()),
}
}
fn events(&self) -> Vec<HarnessEvent> {
self.events.borrow().clone()
}
}
impl EventSink for RecordingEventSink {
fn emit(&self, event: &HarnessEvent) -> Result<(), HarnessError> {
self.events.borrow_mut().push(event.clone());
Ok(())
}
}
struct FakeResponsesClient {
responses: RefCell<VecDeque<ResponsesResponse>>,
requests: RefCell<Vec<ResponsesRequest>>,
}
impl FakeResponsesClient {
fn new(responses: Vec<ResponsesResponse>) -> Self {
Self {
responses: RefCell::new(responses.into()),
requests: RefCell::new(Vec::new()),
}
}
}
#[async_trait(?Send)]
impl ResponsesClient for FakeResponsesClient {
async fn create_response(
&self,
request: ResponsesRequest,
) -> Result<ResponsesResponse, HarnessError> {
self.requests.borrow_mut().push(request);
self.responses
.borrow_mut()
.pop_front()
.ok_or_else(|| HarnessError::new("no fake response available"))
}
}
struct FakeToolExecutor;
#[async_trait(?Send)]
impl ToolExecutor for FakeToolExecutor {
fn tools(&self) -> Vec<ResponsesTool> {
vec![ResponsesTool::exec_js()]
}
async fn execute(
&self,
function_call: &ResponsesFunctionCall,
) -> Result<String, HarnessError> {
assert_eq!(function_call.name, "exec_js");
Ok("Hello, world!".to_string())
}
}
#[tokio::test]
async fn embedded_harness_completes_tool_turn() {
let responses = vec![
serde_json::from_str(
r#"{
"id": "resp_1",
"output": [
{
"type": "function_call",
"call_id": "call_1",
"name": "exec_js",
"arguments": "{\"code\":\"console.log('Hello, world!')\"}"
}
]
}"#,
)
.expect("response should deserialize"),
serde_json::from_str(
r#"{
"id": "resp_2",
"output_text": "Done."
}"#,
)
.expect("response should deserialize"),
];
let client = FakeResponsesClient::new(responses);
let tool_executor = FakeToolExecutor;
let sink = RecordingEventSink::new();
let mut harness = EmbeddedHarness::new(HarnessConfig::default(), client, tool_executor);
let result = harness
.submit_turn("write hello world", &sink)
.await
.expect("turn should succeed");
assert_eq!(result, "Done.");
assert_eq!(
sink.events(),
vec![
HarnessEvent::TurnStarted {
turn_id: "browser-turn-1".to_string(),
model_context_window: None,
collaboration_mode_kind: "default".to_string(),
},
HarnessEvent::UserMessage {
turn_id: "browser-turn-1".to_string(),
message: "write hello world".to_string(),
},
HarnessEvent::ToolCallStarted {
turn_id: "browser-turn-1".to_string(),
response_id: "resp_1".to_string(),
call_id: "call_1".to_string(),
name: "exec_js".to_string(),
arguments: r#"{"code":"console.log('Hello, world!')"}"#.to_string(),
},
HarnessEvent::ToolCallCompleted {
turn_id: "browser-turn-1".to_string(),
response_id: "resp_1".to_string(),
call_id: "call_1".to_string(),
name: "exec_js".to_string(),
output: "Hello, world!".to_string(),
},
HarnessEvent::AgentMessageDelta {
turn_id: "browser-turn-1".to_string(),
delta: "Done.".to_string(),
},
HarnessEvent::AgentMessage {
turn_id: "browser-turn-1".to_string(),
message: "Done.".to_string(),
},
HarnessEvent::TurnComplete {
turn_id: "browser-turn-1".to_string(),
last_agent_message: Some("Done.".to_string()),
},
]
);
}
}


@@ -0,0 +1,18 @@
mod browser;
mod error;
mod harness;
mod responses;
pub use browser::BrowserCodex;
pub use error::HarnessError;
pub use harness::EmbeddedHarness;
pub use harness::EventSink;
pub use harness::HarnessConfig;
pub use harness::ResponsesClient;
pub use harness::ToolExecutor;
pub use responses::EXEC_JS_TOOL_NAME;
pub use responses::HarnessEvent;
pub use responses::ResponsesFunctionCall;
pub use responses::ResponsesRequest;
pub use responses::ResponsesResponse;
pub use responses::ResponsesTool;


@@ -0,0 +1,276 @@
use crate::HarnessError;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value;
const BASE_INSTRUCTIONS: &str = include_str!("../../core/prompt.md");
const BROWSER_TOOLING_NOTE: &str = concat!(
"Browser prototype note:\n",
"- Only one host tool is available: `exec_js`.\n",
"- `exec_js` runs JavaScript inside a browser-managed sandbox and returns stdout-like text.\n",
"- Native shell, filesystem, MCP, and plugin tools are unavailable in this prototype."
);
pub const EXEC_JS_TOOL_NAME: &str = "exec_js";
#[derive(Clone, Debug, Serialize)]
pub struct ResponsesRequest {
pub model: String,
pub instructions: String,
pub input: Value,
#[serde(skip_serializing_if = "Option::is_none")]
pub previous_response_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub tools: Option<Vec<ResponsesTool>>,
pub parallel_tool_calls: bool,
}
#[derive(Clone, Debug, Serialize, PartialEq)]
pub struct ResponsesTool {
#[serde(rename = "type")]
pub kind: String,
pub name: String,
pub description: String,
pub parameters: Value,
}
impl ResponsesTool {
#[must_use]
pub fn function(
name: impl Into<String>,
description: impl Into<String>,
parameters: Value,
) -> Self {
Self {
kind: "function".to_string(),
name: name.into(),
description: description.into(),
parameters,
}
}
#[must_use]
pub fn exec_js() -> Self {
Self::function(
EXEC_JS_TOOL_NAME,
"Execute JavaScript inside the browser sandbox and return the textual result.",
serde_json::json!({
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "JavaScript source code to execute in the browser sandbox."
}
},
"required": ["code"],
"additionalProperties": false
}),
)
}
}
#[derive(Clone, Debug, Serialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum HarnessEvent {
TurnStarted {
turn_id: String,
model_context_window: Option<i64>,
collaboration_mode_kind: String,
},
UserMessage {
turn_id: String,
message: String,
},
AgentMessageDelta {
turn_id: String,
delta: String,
},
AgentMessage {
turn_id: String,
message: String,
},
ToolCallStarted {
turn_id: String,
response_id: String,
call_id: String,
name: String,
arguments: String,
},
ToolCallCompleted {
turn_id: String,
response_id: String,
call_id: String,
name: String,
output: String,
},
TurnError {
turn_id: String,
message: String,
},
TurnComplete {
turn_id: String,
last_agent_message: Option<String>,
},
}
#[derive(Clone, Debug, Deserialize)]
pub struct ResponsesResponse {
pub id: Option<String>,
pub output_text: Option<String>,
pub output: Option<Vec<ResponsesOutputItem>>,
pub error: Option<ResponsesError>,
}
#[derive(Clone, Debug, Deserialize)]
pub struct ResponsesError {
pub message: Option<String>,
}
#[derive(Clone, Debug, Deserialize)]
pub struct ResponsesOutputItem {
#[serde(rename = "type")]
pub kind: String,
pub call_id: Option<String>,
pub name: Option<String>,
pub arguments: Option<String>,
pub content: Option<Vec<ResponsesContentItem>>,
}
#[derive(Clone, Debug, Deserialize)]
pub struct ResponsesContentItem {
pub text: Option<String>,
pub output_text: Option<String>,
}
/// A `function_call` item extracted from a response, ready for tool dispatch.
#[derive(Clone, Debug, PartialEq)]
pub struct ResponsesFunctionCall {
pub call_id: String,
pub name: String,
pub arguments: String,
}
impl ResponsesResponse {
/// Returns the aggregated assistant text, preferring the top-level
/// `output_text` and falling back to joining each output item's content text.
#[must_use]
pub fn response_text(&self) -> String {
if let Some(output_text) = &self.output_text
&& !output_text.is_empty()
{
return output_text.clone();
}
let mut chunks = Vec::new();
for item in self.output.as_ref().into_iter().flatten() {
for content in item.content.as_ref().into_iter().flatten() {
if let Some(text) = &content.text {
chunks.push(text.clone());
} else if let Some(output_text) = &content.output_text {
chunks.push(output_text.clone());
}
}
}
chunks.join("\n")
}
/// Collects the `function_call` output items, erroring if a required field is missing.
pub fn function_calls(&self) -> Result<Vec<ResponsesFunctionCall>, HarnessError> {
let mut function_calls = Vec::new();
for item in self.output.as_ref().into_iter().flatten() {
if item.kind != "function_call" {
continue;
}
let call_id = item.call_id.clone().ok_or_else(|| {
HarnessError::new("Responses API function_call item omitted call_id")
})?;
let name = item.name.clone().ok_or_else(|| {
HarnessError::new("Responses API function_call item omitted name")
})?;
let arguments = item.arguments.clone().ok_or_else(|| {
HarnessError::new("Responses API function_call item omitted arguments")
})?;
function_calls.push(ResponsesFunctionCall {
call_id,
name,
arguments,
});
}
Ok(function_calls)
}
}
/// Builds the `function_call_output` item that reports a tool result back to the model.
#[must_use]
pub fn tool_output_item(call_id: &str, output: String) -> Value {
serde_json::json!({
"type": "function_call_output",
"call_id": call_id,
"output": output,
})
}
/// Returns the base Codex instructions with the browser tooling note appended.
#[must_use]
pub fn build_browser_instructions() -> String {
format!("{BASE_INSTRUCTIONS}\n\n{BROWSER_TOOLING_NOTE}")
}
#[cfg(test)]
mod tests {
use super::ResponsesResponse;
use super::ResponsesTool;
use super::build_browser_instructions;
use pretty_assertions::assert_eq;
#[test]
fn browser_instructions_append_browser_note() {
let instructions = build_browser_instructions();
assert!(instructions.contains("Only one host tool is available: `exec_js`"));
assert!(instructions.contains("You are a coding agent running in the Codex CLI"));
}
#[test]
fn extracts_output_text_when_present() {
let response: ResponsesResponse = serde_json::from_str(
r#"{
"id": "resp_123",
"output_text": "final answer",
"output": []
}"#,
)
.expect("response should deserialize");
assert_eq!(response.response_text(), "final answer");
}
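// Hedged sketch of a companion test: exercises the fallback path in
// `response_text`, which joins per-item `content` text when the top-level
// `output_text` is absent. The payload shape here is illustrative.
#[test]
fn joins_content_text_when_output_text_missing() {
let response: ResponsesResponse = serde_json::from_str(
r#"{
"id": "resp_456",
"output": [
{
"type": "message",
"content": [
{ "text": "first" },
{ "output_text": "second" }
]
}
]
}"#,
)
.expect("response should deserialize");
assert_eq!(response.response_text(), "first\nsecond");
}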
#[test]
fn extracts_function_calls_from_output_items() {
let response: ResponsesResponse = serde_json::from_str(
r#"{
"id": "resp_123",
"output": [
{
"type": "function_call",
"call_id": "call_123",
"name": "exec_js",
"arguments": "{\"code\":\"console.log('hi')\"}"
}
]
}"#,
)
.expect("response should deserialize");
let function_calls = response
.function_calls()
.expect("function calls should parse");
assert_eq!(function_calls.len(), 1);
assert_eq!(function_calls[0].call_id, "call_123");
assert_eq!(function_calls[0].name, "exec_js");
assert_eq!(
function_calls[0].arguments,
r#"{"code":"console.log('hi')"}"#
);
}
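// Hedged sketch of a companion test: checks the JSON shape produced by
// `tool_output_item`, the item sent back to the model after a tool call.
#[test]
fn tool_output_item_has_function_call_output_shape() {
let item = super::tool_output_item("call_123", "done".to_string());
assert_eq!(item["type"], "function_call_output");
assert_eq!(item["call_id"], "call_123");
assert_eq!(item["output"], "done");
}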
#[test]
fn exec_tool_schema_is_function_tool() {
let tool = ResponsesTool::exec_js();
assert_eq!(tool.kind, "function");
assert_eq!(tool.name, "exec_js");
}
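// Hedged sketch of a companion test: verifies the serde rename so `kind`
// appears on the wire as `type`, matching the function-tool schema the
// Responses API expects, and spot-checks the parameters schema.
#[test]
fn exec_tool_serializes_kind_as_type() {
let value = serde_json::to_value(ResponsesTool::exec_js()).expect("tool should serialize");
assert_eq!(value["type"], "function");
assert_eq!(value["name"], "exec_js");
assert_eq!(value["parameters"]["required"][0], "code");
}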
}