mirror of
https://github.com/openai/codex.git
synced 2026-05-18 02:02:30 +00:00
Split codex-core into smaller crates
Move several leaf and runtime surfaces out of codex-core so small core edits compile less code. Extract JS/V8 runtime, JS REPL assets, file watching, config editing/loading, tool approval templates, message history, review helpers, memory prompts, agent runtime/control helpers, turn state, and tool spec planning into dedicated crates while keeping codex-core as the orchestration layer. Validation on dev: - bazel build //codex-rs/config-loader:config-loader //codex-rs/session-runtime:session-runtime //codex-rs/tool-spec:tool-spec //codex-rs/agent-runtime:agent-runtime //codex-rs/core:core //codex-rs/core:core-unit-tests-bin - wider combined Bazel build including app-server/code-mode/js-repl/review split crates - just bazel-lock-check Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
6
codex-rs/code-mode-runtime/BUILD.bazel
Normal file
6
codex-rs/code-mode-runtime/BUILD.bazel
Normal file
@@ -0,0 +1,6 @@
|
||||
load("//:defs.bzl", "codex_rust_crate")
|
||||
|
||||
codex_rust_crate(
|
||||
name = "code-mode-runtime",
|
||||
crate_name = "codex_code_mode_runtime",
|
||||
)
|
||||
28
codex-rs/code-mode-runtime/Cargo.toml
Normal file
28
codex-rs/code-mode-runtime/Cargo.toml
Normal file
@@ -0,0 +1,28 @@
|
||||
[package]
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
name = "codex-code-mode-runtime"
|
||||
version.workspace = true
|
||||
|
||||
[lib]
|
||||
doctest = false
|
||||
name = "codex_code_mode_runtime"
|
||||
path = "src/lib.rs"
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
async-channel = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
codex-code-mode = { workspace = true }
|
||||
codex-protocol = { workspace = true }
|
||||
deno_core_icudata = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
tokio = { workspace = true, features = ["macros", "rt", "sync", "time"] }
|
||||
tokio-util = { workspace = true, features = ["rt"] }
|
||||
tracing = { workspace = true }
|
||||
v8 = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
pretty_assertions = { workspace = true }
|
||||
14
codex-rs/code-mode-runtime/src/lib.rs
Normal file
14
codex-rs/code-mode-runtime/src/lib.rs
Normal file
@@ -0,0 +1,14 @@
|
||||
mod runtime;
|
||||
mod service;
|
||||
|
||||
pub use codex_code_mode::CodeModeTurnHost;
|
||||
pub use codex_code_mode::*;
|
||||
pub use runtime::DEFAULT_EXEC_YIELD_TIME_MS;
|
||||
pub use runtime::DEFAULT_MAX_OUTPUT_TOKENS_PER_EXEC_CALL;
|
||||
pub use runtime::DEFAULT_WAIT_YIELD_TIME_MS;
|
||||
pub use runtime::ExecuteRequest;
|
||||
pub use runtime::RuntimeResponse;
|
||||
pub use runtime::WaitRequest;
|
||||
pub use service::CodeModeService;
|
||||
pub use service::CodeModeTurnWorker;
|
||||
pub use service::runtime_factory;
|
||||
274
codex-rs/code-mode-runtime/src/runtime/callbacks.rs
Normal file
274
codex-rs/code-mode-runtime/src/runtime/callbacks.rs
Normal file
@@ -0,0 +1,274 @@
|
||||
use codex_code_mode::FunctionCallOutputContentItem;
|
||||
|
||||
use super::EXIT_SENTINEL;
|
||||
use super::RuntimeEvent;
|
||||
use super::RuntimeState;
|
||||
use super::timers;
|
||||
use super::value::json_to_v8;
|
||||
use super::value::normalize_output_image;
|
||||
use super::value::serialize_output_text;
|
||||
use super::value::throw_type_error;
|
||||
use super::value::v8_value_to_json;
|
||||
|
||||
pub(super) fn tool_callback(
|
||||
scope: &mut v8::PinScope<'_, '_>,
|
||||
args: v8::FunctionCallbackArguments,
|
||||
mut retval: v8::ReturnValue<v8::Value>,
|
||||
) {
|
||||
let tool_index = match args.data().to_rust_string_lossy(scope).parse::<usize>() {
|
||||
Ok(tool_index) => tool_index,
|
||||
Err(_) => {
|
||||
throw_type_error(scope, "invalid tool callback data");
|
||||
return;
|
||||
}
|
||||
};
|
||||
let input = if args.length() == 0 {
|
||||
Ok(None)
|
||||
} else {
|
||||
v8_value_to_json(scope, args.get(0))
|
||||
};
|
||||
let input = match input {
|
||||
Ok(input) => input,
|
||||
Err(error_text) => {
|
||||
throw_type_error(scope, &error_text);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let Some(resolver) = v8::PromiseResolver::new(scope) else {
|
||||
throw_type_error(scope, "failed to create tool promise");
|
||||
return;
|
||||
};
|
||||
let promise = resolver.get_promise(scope);
|
||||
|
||||
let resolver = v8::Global::new(scope, resolver);
|
||||
let tool_name = {
|
||||
let Some(state) = scope.get_slot::<RuntimeState>() else {
|
||||
throw_type_error(scope, "runtime state unavailable");
|
||||
return;
|
||||
};
|
||||
let Some(tool_name) = state
|
||||
.enabled_tools
|
||||
.get(tool_index)
|
||||
.map(|tool| tool.tool_name.clone())
|
||||
else {
|
||||
throw_type_error(scope, "tool callback data is out of range");
|
||||
return;
|
||||
};
|
||||
tool_name
|
||||
};
|
||||
|
||||
let Some(state) = scope.get_slot_mut::<RuntimeState>() else {
|
||||
throw_type_error(scope, "runtime state unavailable");
|
||||
return;
|
||||
};
|
||||
let id = format!("tool-{}", state.next_tool_call_id);
|
||||
state.next_tool_call_id = state.next_tool_call_id.saturating_add(1);
|
||||
let event_tx = state.event_tx.clone();
|
||||
state.pending_tool_calls.insert(id.clone(), resolver);
|
||||
let _ = event_tx.send(RuntimeEvent::ToolCall {
|
||||
id,
|
||||
name: tool_name,
|
||||
input,
|
||||
});
|
||||
retval.set(promise.into());
|
||||
}
|
||||
|
||||
pub(super) fn text_callback(
|
||||
scope: &mut v8::PinScope<'_, '_>,
|
||||
args: v8::FunctionCallbackArguments,
|
||||
mut retval: v8::ReturnValue<v8::Value>,
|
||||
) {
|
||||
let value = if args.length() == 0 {
|
||||
v8::undefined(scope).into()
|
||||
} else {
|
||||
args.get(0)
|
||||
};
|
||||
let text = match serialize_output_text(scope, value) {
|
||||
Ok(text) => text,
|
||||
Err(error_text) => {
|
||||
throw_type_error(scope, &error_text);
|
||||
return;
|
||||
}
|
||||
};
|
||||
if let Some(state) = scope.get_slot::<RuntimeState>() {
|
||||
let _ = state.event_tx.send(RuntimeEvent::ContentItem(
|
||||
FunctionCallOutputContentItem::InputText { text },
|
||||
));
|
||||
}
|
||||
retval.set(v8::undefined(scope).into());
|
||||
}
|
||||
|
||||
pub(super) fn image_callback(
|
||||
scope: &mut v8::PinScope<'_, '_>,
|
||||
args: v8::FunctionCallbackArguments,
|
||||
mut retval: v8::ReturnValue<v8::Value>,
|
||||
) {
|
||||
let value = if args.length() == 0 {
|
||||
v8::undefined(scope).into()
|
||||
} else {
|
||||
args.get(0)
|
||||
};
|
||||
let detail_override = if args.length() < 2 {
|
||||
None
|
||||
} else {
|
||||
let detail = args.get(1);
|
||||
if detail.is_string() {
|
||||
Some(detail.to_rust_string_lossy(scope))
|
||||
} else if detail.is_null() || detail.is_undefined() {
|
||||
None
|
||||
} else {
|
||||
throw_type_error(scope, "image detail must be a string when provided");
|
||||
return;
|
||||
}
|
||||
};
|
||||
let image_item = match normalize_output_image(scope, value, detail_override) {
|
||||
Ok(image_item) => image_item,
|
||||
Err(()) => return,
|
||||
};
|
||||
if let Some(state) = scope.get_slot::<RuntimeState>() {
|
||||
let _ = state.event_tx.send(RuntimeEvent::ContentItem(image_item));
|
||||
}
|
||||
retval.set(v8::undefined(scope).into());
|
||||
}
|
||||
|
||||
pub(super) fn store_callback(
|
||||
scope: &mut v8::PinScope<'_, '_>,
|
||||
args: v8::FunctionCallbackArguments,
|
||||
_retval: v8::ReturnValue<v8::Value>,
|
||||
) {
|
||||
let key = match args.get(0).to_string(scope) {
|
||||
Some(key) => key.to_rust_string_lossy(scope),
|
||||
None => {
|
||||
throw_type_error(scope, "store key must be a string");
|
||||
return;
|
||||
}
|
||||
};
|
||||
let value = args.get(1);
|
||||
let serialized = match v8_value_to_json(scope, value) {
|
||||
Ok(Some(value)) => value,
|
||||
Ok(None) => {
|
||||
throw_type_error(
|
||||
scope,
|
||||
&format!("Unable to store {key:?}. Only plain serializable objects can be stored."),
|
||||
);
|
||||
return;
|
||||
}
|
||||
Err(error_text) => {
|
||||
throw_type_error(scope, &error_text);
|
||||
return;
|
||||
}
|
||||
};
|
||||
if let Some(state) = scope.get_slot_mut::<RuntimeState>() {
|
||||
state.stored_values.insert(key, serialized);
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn load_callback(
|
||||
scope: &mut v8::PinScope<'_, '_>,
|
||||
args: v8::FunctionCallbackArguments,
|
||||
mut retval: v8::ReturnValue<v8::Value>,
|
||||
) {
|
||||
let key = match args.get(0).to_string(scope) {
|
||||
Some(key) => key.to_rust_string_lossy(scope),
|
||||
None => {
|
||||
throw_type_error(scope, "load key must be a string");
|
||||
return;
|
||||
}
|
||||
};
|
||||
let value = scope
|
||||
.get_slot::<RuntimeState>()
|
||||
.and_then(|state| state.stored_values.get(&key))
|
||||
.cloned();
|
||||
let Some(value) = value else {
|
||||
retval.set(v8::undefined(scope).into());
|
||||
return;
|
||||
};
|
||||
let Some(value) = json_to_v8(scope, &value) else {
|
||||
throw_type_error(scope, "failed to load stored value");
|
||||
return;
|
||||
};
|
||||
retval.set(value);
|
||||
}
|
||||
|
||||
pub(super) fn notify_callback(
|
||||
scope: &mut v8::PinScope<'_, '_>,
|
||||
args: v8::FunctionCallbackArguments,
|
||||
mut retval: v8::ReturnValue<v8::Value>,
|
||||
) {
|
||||
let value = if args.length() == 0 {
|
||||
v8::undefined(scope).into()
|
||||
} else {
|
||||
args.get(0)
|
||||
};
|
||||
let text = match serialize_output_text(scope, value) {
|
||||
Ok(text) => text,
|
||||
Err(error_text) => {
|
||||
throw_type_error(scope, &error_text);
|
||||
return;
|
||||
}
|
||||
};
|
||||
if text.trim().is_empty() {
|
||||
throw_type_error(scope, "notify expects non-empty text");
|
||||
return;
|
||||
}
|
||||
if let Some(state) = scope.get_slot::<RuntimeState>() {
|
||||
let _ = state.event_tx.send(RuntimeEvent::Notify {
|
||||
call_id: state.tool_call_id.clone(),
|
||||
text,
|
||||
});
|
||||
}
|
||||
retval.set(v8::undefined(scope).into());
|
||||
}
|
||||
|
||||
pub(super) fn set_timeout_callback(
|
||||
scope: &mut v8::PinScope<'_, '_>,
|
||||
args: v8::FunctionCallbackArguments,
|
||||
mut retval: v8::ReturnValue<v8::Value>,
|
||||
) {
|
||||
let timeout_id = match timers::schedule_timeout(scope, args) {
|
||||
Ok(timeout_id) => timeout_id,
|
||||
Err(error_text) => {
|
||||
throw_type_error(scope, &error_text);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
retval.set(v8::Number::new(scope, timeout_id as f64).into());
|
||||
}
|
||||
|
||||
pub(super) fn clear_timeout_callback(
|
||||
scope: &mut v8::PinScope<'_, '_>,
|
||||
args: v8::FunctionCallbackArguments,
|
||||
mut retval: v8::ReturnValue<v8::Value>,
|
||||
) {
|
||||
if let Err(error_text) = timers::clear_timeout(scope, args) {
|
||||
throw_type_error(scope, &error_text);
|
||||
return;
|
||||
}
|
||||
|
||||
retval.set(v8::undefined(scope).into());
|
||||
}
|
||||
|
||||
pub(super) fn yield_control_callback(
|
||||
scope: &mut v8::PinScope<'_, '_>,
|
||||
_args: v8::FunctionCallbackArguments,
|
||||
_retval: v8::ReturnValue<v8::Value>,
|
||||
) {
|
||||
if let Some(state) = scope.get_slot::<RuntimeState>() {
|
||||
let _ = state.event_tx.send(RuntimeEvent::YieldRequested);
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn exit_callback(
|
||||
scope: &mut v8::PinScope<'_, '_>,
|
||||
_args: v8::FunctionCallbackArguments,
|
||||
_retval: v8::ReturnValue<v8::Value>,
|
||||
) {
|
||||
if let Some(state) = scope.get_slot_mut::<RuntimeState>() {
|
||||
state.exit_requested = true;
|
||||
}
|
||||
if let Some(error) = v8::String::new(scope, EXIT_SENTINEL) {
|
||||
scope.throw_exception(error.into());
|
||||
}
|
||||
}
|
||||
144
codex-rs/code-mode-runtime/src/runtime/globals.rs
Normal file
144
codex-rs/code-mode-runtime/src/runtime/globals.rs
Normal file
@@ -0,0 +1,144 @@
|
||||
use super::RuntimeState;
|
||||
use super::callbacks::clear_timeout_callback;
|
||||
use super::callbacks::exit_callback;
|
||||
use super::callbacks::image_callback;
|
||||
use super::callbacks::load_callback;
|
||||
use super::callbacks::notify_callback;
|
||||
use super::callbacks::set_timeout_callback;
|
||||
use super::callbacks::store_callback;
|
||||
use super::callbacks::text_callback;
|
||||
use super::callbacks::tool_callback;
|
||||
use super::callbacks::yield_control_callback;
|
||||
|
||||
pub(super) fn install_globals(scope: &mut v8::PinScope<'_, '_>) -> Result<(), String> {
|
||||
let global = scope.get_current_context().global(scope);
|
||||
let console = v8::String::new(scope, "console")
|
||||
.ok_or_else(|| "failed to allocate global `console`".to_string())?;
|
||||
if global.delete(scope, console.into()) != Some(true) {
|
||||
return Err("failed to remove global `console`".to_string());
|
||||
}
|
||||
|
||||
let tools = build_tools_object(scope)?;
|
||||
let all_tools = build_all_tools_value(scope)?;
|
||||
let clear_timeout = helper_function(scope, "clearTimeout", clear_timeout_callback)?;
|
||||
let set_timeout = helper_function(scope, "setTimeout", set_timeout_callback)?;
|
||||
let text = helper_function(scope, "text", text_callback)?;
|
||||
let image = helper_function(scope, "image", image_callback)?;
|
||||
let store = helper_function(scope, "store", store_callback)?;
|
||||
let load = helper_function(scope, "load", load_callback)?;
|
||||
let notify = helper_function(scope, "notify", notify_callback)?;
|
||||
let yield_control = helper_function(scope, "yield_control", yield_control_callback)?;
|
||||
let exit = helper_function(scope, "exit", exit_callback)?;
|
||||
|
||||
set_global(scope, global, "tools", tools.into())?;
|
||||
set_global(scope, global, "ALL_TOOLS", all_tools)?;
|
||||
set_global(scope, global, "clearTimeout", clear_timeout.into())?;
|
||||
set_global(scope, global, "setTimeout", set_timeout.into())?;
|
||||
set_global(scope, global, "text", text.into())?;
|
||||
set_global(scope, global, "image", image.into())?;
|
||||
set_global(scope, global, "store", store.into())?;
|
||||
set_global(scope, global, "load", load.into())?;
|
||||
set_global(scope, global, "notify", notify.into())?;
|
||||
set_global(scope, global, "yield_control", yield_control.into())?;
|
||||
set_global(scope, global, "exit", exit.into())?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn build_tools_object<'s>(
|
||||
scope: &mut v8::PinScope<'s, '_>,
|
||||
) -> Result<v8::Local<'s, v8::Object>, String> {
|
||||
let tools = v8::Object::new(scope);
|
||||
let enabled_tools = scope
|
||||
.get_slot::<RuntimeState>()
|
||||
.map(|state| state.enabled_tools.clone())
|
||||
.unwrap_or_default();
|
||||
|
||||
for (tool_index, tool) in enabled_tools.iter().enumerate() {
|
||||
let name = v8::String::new(scope, &tool.global_name)
|
||||
.ok_or_else(|| "failed to allocate tool name".to_string())?;
|
||||
let function = tool_function(scope, tool_index)?;
|
||||
tools.set(scope, name.into(), function.into());
|
||||
}
|
||||
Ok(tools)
|
||||
}
|
||||
|
||||
fn build_all_tools_value<'s>(
|
||||
scope: &mut v8::PinScope<'s, '_>,
|
||||
) -> Result<v8::Local<'s, v8::Value>, String> {
|
||||
let enabled_tools = scope
|
||||
.get_slot::<RuntimeState>()
|
||||
.map(|state| state.enabled_tools.clone())
|
||||
.unwrap_or_default();
|
||||
let array = v8::Array::new(scope, enabled_tools.len() as i32);
|
||||
let name_key = v8::String::new(scope, "name")
|
||||
.ok_or_else(|| "failed to allocate ALL_TOOLS name key".to_string())?;
|
||||
let description_key = v8::String::new(scope, "description")
|
||||
.ok_or_else(|| "failed to allocate ALL_TOOLS description key".to_string())?;
|
||||
|
||||
for (index, tool) in enabled_tools.iter().enumerate() {
|
||||
let item = v8::Object::new(scope);
|
||||
let name = v8::String::new(scope, &tool.global_name)
|
||||
.ok_or_else(|| "failed to allocate ALL_TOOLS name".to_string())?;
|
||||
let description = v8::String::new(scope, &tool.description)
|
||||
.ok_or_else(|| "failed to allocate ALL_TOOLS description".to_string())?;
|
||||
|
||||
if item.set(scope, name_key.into(), name.into()) != Some(true) {
|
||||
return Err("failed to set ALL_TOOLS name".to_string());
|
||||
}
|
||||
if item.set(scope, description_key.into(), description.into()) != Some(true) {
|
||||
return Err("failed to set ALL_TOOLS description".to_string());
|
||||
}
|
||||
if array.set_index(scope, index as u32, item.into()) != Some(true) {
|
||||
return Err("failed to append ALL_TOOLS metadata".to_string());
|
||||
}
|
||||
}
|
||||
|
||||
Ok(array.into())
|
||||
}
|
||||
|
||||
fn helper_function<'s, F>(
|
||||
scope: &mut v8::PinScope<'s, '_>,
|
||||
name: &str,
|
||||
callback: F,
|
||||
) -> Result<v8::Local<'s, v8::Function>, String>
|
||||
where
|
||||
F: v8::MapFnTo<v8::FunctionCallback>,
|
||||
{
|
||||
let name =
|
||||
v8::String::new(scope, name).ok_or_else(|| "failed to allocate helper name".to_string())?;
|
||||
let template = v8::FunctionTemplate::builder(callback)
|
||||
.data(name.into())
|
||||
.build(scope);
|
||||
template
|
||||
.get_function(scope)
|
||||
.ok_or_else(|| "failed to create helper function".to_string())
|
||||
}
|
||||
|
||||
fn tool_function<'s>(
|
||||
scope: &mut v8::PinScope<'s, '_>,
|
||||
tool_index: usize,
|
||||
) -> Result<v8::Local<'s, v8::Function>, String> {
|
||||
let data = v8::String::new(scope, &tool_index.to_string())
|
||||
.ok_or_else(|| "failed to allocate tool callback data".to_string())?;
|
||||
let template = v8::FunctionTemplate::builder(tool_callback)
|
||||
.data(data.into())
|
||||
.build(scope);
|
||||
template
|
||||
.get_function(scope)
|
||||
.ok_or_else(|| "failed to create tool function".to_string())
|
||||
}
|
||||
|
||||
fn set_global<'s>(
|
||||
scope: &mut v8::PinScope<'s, '_>,
|
||||
global: v8::Local<'s, v8::Object>,
|
||||
name: &str,
|
||||
value: v8::Local<'s, v8::Value>,
|
||||
) -> Result<(), String> {
|
||||
let key = v8::String::new(scope, name)
|
||||
.ok_or_else(|| format!("failed to allocate global `{name}`"))?;
|
||||
if global.set(scope, key.into(), value) == Some(true) {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(format!("failed to set global `{name}`"))
|
||||
}
|
||||
}
|
||||
346
codex-rs/code-mode-runtime/src/runtime/mod.rs
Normal file
346
codex-rs/code-mode-runtime/src/runtime/mod.rs
Normal file
@@ -0,0 +1,346 @@
|
||||
mod callbacks;
|
||||
mod globals;
|
||||
mod module_loader;
|
||||
mod timers;
|
||||
mod value;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::OnceLock;
|
||||
use std::sync::mpsc as std_mpsc;
|
||||
use std::thread;
|
||||
|
||||
use codex_protocol::ToolName;
|
||||
use serde_json::Value as JsonValue;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
pub use codex_code_mode::DEFAULT_EXEC_YIELD_TIME_MS;
|
||||
pub use codex_code_mode::DEFAULT_MAX_OUTPUT_TOKENS_PER_EXEC_CALL;
|
||||
pub use codex_code_mode::DEFAULT_WAIT_YIELD_TIME_MS;
|
||||
pub use codex_code_mode::ExecuteRequest;
|
||||
pub use codex_code_mode::RuntimeResponse;
|
||||
pub use codex_code_mode::WaitRequest;
|
||||
|
||||
use codex_code_mode::EnabledToolMetadata;
|
||||
use codex_code_mode::FunctionCallOutputContentItem;
|
||||
use codex_code_mode::enabled_tool_metadata;
|
||||
|
||||
const EXIT_SENTINEL: &str = "__codex_code_mode_exit__";
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum TurnMessage {
|
||||
ToolCall {
|
||||
cell_id: String,
|
||||
id: String,
|
||||
name: ToolName,
|
||||
input: Option<JsonValue>,
|
||||
},
|
||||
Notify {
|
||||
cell_id: String,
|
||||
call_id: String,
|
||||
text: String,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum RuntimeCommand {
|
||||
ToolResponse { id: String, result: JsonValue },
|
||||
ToolError { id: String, error_text: String },
|
||||
TimeoutFired { id: u64 },
|
||||
Terminate,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum RuntimeEvent {
|
||||
Started,
|
||||
ContentItem(FunctionCallOutputContentItem),
|
||||
YieldRequested,
|
||||
ToolCall {
|
||||
id: String,
|
||||
name: ToolName,
|
||||
input: Option<JsonValue>,
|
||||
},
|
||||
Notify {
|
||||
call_id: String,
|
||||
text: String,
|
||||
},
|
||||
Result {
|
||||
stored_values: HashMap<String, JsonValue>,
|
||||
error_text: Option<String>,
|
||||
},
|
||||
}
|
||||
|
||||
pub(crate) fn spawn_runtime(
|
||||
request: ExecuteRequest,
|
||||
event_tx: mpsc::UnboundedSender<RuntimeEvent>,
|
||||
) -> Result<(std_mpsc::Sender<RuntimeCommand>, v8::IsolateHandle), String> {
|
||||
initialize_v8()?;
|
||||
|
||||
let (command_tx, command_rx) = std_mpsc::channel();
|
||||
let runtime_command_tx = command_tx.clone();
|
||||
let (isolate_handle_tx, isolate_handle_rx) = std_mpsc::sync_channel(1);
|
||||
let enabled_tools = request
|
||||
.enabled_tools
|
||||
.iter()
|
||||
.map(enabled_tool_metadata)
|
||||
.collect::<Vec<_>>();
|
||||
let config = RuntimeConfig {
|
||||
tool_call_id: request.tool_call_id,
|
||||
enabled_tools,
|
||||
source: request.source,
|
||||
stored_values: request.stored_values,
|
||||
};
|
||||
|
||||
thread::spawn(move || {
|
||||
run_runtime(
|
||||
config,
|
||||
event_tx,
|
||||
command_rx,
|
||||
isolate_handle_tx,
|
||||
runtime_command_tx,
|
||||
);
|
||||
});
|
||||
|
||||
let isolate_handle = isolate_handle_rx
|
||||
.recv()
|
||||
.map_err(|_| "failed to initialize code mode runtime".to_string())?;
|
||||
Ok((command_tx, isolate_handle))
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct RuntimeConfig {
|
||||
tool_call_id: String,
|
||||
enabled_tools: Vec<EnabledToolMetadata>,
|
||||
source: String,
|
||||
stored_values: HashMap<String, JsonValue>,
|
||||
}
|
||||
|
||||
pub(super) struct RuntimeState {
|
||||
event_tx: mpsc::UnboundedSender<RuntimeEvent>,
|
||||
pending_tool_calls: HashMap<String, v8::Global<v8::PromiseResolver>>,
|
||||
pending_timeouts: HashMap<u64, timers::ScheduledTimeout>,
|
||||
stored_values: HashMap<String, JsonValue>,
|
||||
enabled_tools: Vec<EnabledToolMetadata>,
|
||||
next_tool_call_id: u64,
|
||||
next_timeout_id: u64,
|
||||
tool_call_id: String,
|
||||
runtime_command_tx: std_mpsc::Sender<RuntimeCommand>,
|
||||
exit_requested: bool,
|
||||
}
|
||||
|
||||
pub(super) enum CompletionState {
|
||||
Pending,
|
||||
Completed {
|
||||
stored_values: HashMap<String, JsonValue>,
|
||||
error_text: Option<String>,
|
||||
},
|
||||
}
|
||||
|
||||
fn initialize_v8() -> Result<(), String> {
|
||||
static PLATFORM: OnceLock<Result<v8::SharedRef<v8::Platform>, String>> = OnceLock::new();
|
||||
|
||||
match PLATFORM.get_or_init(|| {
|
||||
v8::icu::set_common_data_77(deno_core_icudata::ICU_DATA)
|
||||
.map_err(|error_code| format!("failed to initialize ICU data: {error_code}"))?;
|
||||
let platform = v8::new_default_platform(0, false).make_shared();
|
||||
v8::V8::initialize_platform(platform.clone());
|
||||
v8::V8::initialize();
|
||||
Ok(platform)
|
||||
}) {
|
||||
Ok(_) => Ok(()),
|
||||
Err(error_text) => Err(error_text.clone()),
|
||||
}
|
||||
}
|
||||
|
||||
fn run_runtime(
|
||||
config: RuntimeConfig,
|
||||
event_tx: mpsc::UnboundedSender<RuntimeEvent>,
|
||||
command_rx: std_mpsc::Receiver<RuntimeCommand>,
|
||||
isolate_handle_tx: std_mpsc::SyncSender<v8::IsolateHandle>,
|
||||
runtime_command_tx: std_mpsc::Sender<RuntimeCommand>,
|
||||
) {
|
||||
let isolate = &mut v8::Isolate::new(v8::CreateParams::default());
|
||||
let isolate_handle = isolate.thread_safe_handle();
|
||||
if isolate_handle_tx.send(isolate_handle).is_err() {
|
||||
return;
|
||||
}
|
||||
isolate.set_host_import_module_dynamically_callback(module_loader::dynamic_import_callback);
|
||||
|
||||
v8::scope!(let scope, isolate);
|
||||
let context = v8::Context::new(scope, Default::default());
|
||||
let scope = &mut v8::ContextScope::new(scope, context);
|
||||
|
||||
scope.set_slot(RuntimeState {
|
||||
event_tx: event_tx.clone(),
|
||||
pending_tool_calls: HashMap::new(),
|
||||
pending_timeouts: HashMap::new(),
|
||||
stored_values: config.stored_values,
|
||||
enabled_tools: config.enabled_tools,
|
||||
next_tool_call_id: 1,
|
||||
next_timeout_id: 1,
|
||||
tool_call_id: config.tool_call_id,
|
||||
runtime_command_tx,
|
||||
exit_requested: false,
|
||||
});
|
||||
|
||||
if let Err(error_text) = globals::install_globals(scope) {
|
||||
send_result(&event_tx, HashMap::new(), Some(error_text));
|
||||
return;
|
||||
}
|
||||
|
||||
let _ = event_tx.send(RuntimeEvent::Started);
|
||||
|
||||
let pending_promise = match module_loader::evaluate_main_module(scope, &config.source) {
|
||||
Ok(pending_promise) => pending_promise,
|
||||
Err(error_text) => {
|
||||
capture_scope_send_error(scope, &event_tx, Some(error_text));
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
match module_loader::completion_state(scope, pending_promise.as_ref()) {
|
||||
CompletionState::Completed {
|
||||
stored_values,
|
||||
error_text,
|
||||
} => {
|
||||
send_result(&event_tx, stored_values, error_text);
|
||||
return;
|
||||
}
|
||||
CompletionState::Pending => {}
|
||||
}
|
||||
|
||||
let mut pending_promise = pending_promise;
|
||||
loop {
|
||||
let Ok(command) = command_rx.recv() else {
|
||||
break;
|
||||
};
|
||||
|
||||
match command {
|
||||
RuntimeCommand::Terminate => break,
|
||||
RuntimeCommand::ToolResponse { id, result } => {
|
||||
if let Err(error_text) =
|
||||
module_loader::resolve_tool_response(scope, &id, Ok(result))
|
||||
{
|
||||
capture_scope_send_error(scope, &event_tx, Some(error_text));
|
||||
return;
|
||||
}
|
||||
}
|
||||
RuntimeCommand::ToolError { id, error_text } => {
|
||||
if let Err(runtime_error) =
|
||||
module_loader::resolve_tool_response(scope, &id, Err(error_text))
|
||||
{
|
||||
capture_scope_send_error(scope, &event_tx, Some(runtime_error));
|
||||
return;
|
||||
}
|
||||
}
|
||||
RuntimeCommand::TimeoutFired { id } => {
|
||||
if let Err(runtime_error) = timers::invoke_timeout_callback(scope, id) {
|
||||
capture_scope_send_error(scope, &event_tx, Some(runtime_error));
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
scope.perform_microtask_checkpoint();
|
||||
match module_loader::completion_state(scope, pending_promise.as_ref()) {
|
||||
CompletionState::Completed {
|
||||
stored_values,
|
||||
error_text,
|
||||
} => {
|
||||
send_result(&event_tx, stored_values, error_text);
|
||||
return;
|
||||
}
|
||||
CompletionState::Pending => {}
|
||||
}
|
||||
|
||||
if let Some(promise) = pending_promise.as_ref() {
|
||||
let promise = v8::Local::new(scope, promise);
|
||||
if promise.state() != v8::PromiseState::Pending {
|
||||
pending_promise = None;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn capture_scope_send_error(
|
||||
scope: &mut v8::PinScope<'_, '_>,
|
||||
event_tx: &mpsc::UnboundedSender<RuntimeEvent>,
|
||||
error_text: Option<String>,
|
||||
) {
|
||||
let stored_values = scope
|
||||
.get_slot::<RuntimeState>()
|
||||
.map(|state| state.stored_values.clone())
|
||||
.unwrap_or_default();
|
||||
|
||||
send_result(event_tx, stored_values, error_text);
|
||||
}
|
||||
|
||||
fn send_result(
|
||||
event_tx: &mpsc::UnboundedSender<RuntimeEvent>,
|
||||
stored_values: HashMap<String, JsonValue>,
|
||||
error_text: Option<String>,
|
||||
) {
|
||||
let _ = event_tx.send(RuntimeEvent::Result {
|
||||
stored_values,
|
||||
error_text,
|
||||
});
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::HashMap;
|
||||
use std::time::Duration;
|
||||
|
||||
use pretty_assertions::assert_eq;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use super::ExecuteRequest;
|
||||
use super::RuntimeEvent;
|
||||
use super::spawn_runtime;
|
||||
|
||||
fn execute_request(source: &str) -> ExecuteRequest {
|
||||
ExecuteRequest {
|
||||
tool_call_id: "call_1".to_string(),
|
||||
enabled_tools: Vec::new(),
|
||||
source: source.to_string(),
|
||||
stored_values: HashMap::new(),
|
||||
yield_time_ms: Some(1),
|
||||
max_output_tokens: None,
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn terminate_execution_stops_cpu_bound_module() {
|
||||
let (event_tx, mut event_rx) = mpsc::unbounded_channel();
|
||||
let (_runtime_tx, runtime_terminate_handle) =
|
||||
spawn_runtime(execute_request("while (true) {}"), event_tx).unwrap();
|
||||
|
||||
let started_event = tokio::time::timeout(Duration::from_secs(1), event_rx.recv())
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert!(matches!(started_event, RuntimeEvent::Started));
|
||||
|
||||
assert!(runtime_terminate_handle.terminate_execution());
|
||||
|
||||
let result_event = tokio::time::timeout(Duration::from_secs(1), event_rx.recv())
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let RuntimeEvent::Result {
|
||||
stored_values,
|
||||
error_text,
|
||||
} = result_event
|
||||
else {
|
||||
panic!("expected runtime result after termination");
|
||||
};
|
||||
assert_eq!(stored_values, HashMap::new());
|
||||
assert!(error_text.is_some());
|
||||
|
||||
assert!(
|
||||
tokio::time::timeout(Duration::from_secs(1), event_rx.recv())
|
||||
.await
|
||||
.unwrap()
|
||||
.is_none()
|
||||
);
|
||||
}
|
||||
}
|
||||
235
codex-rs/code-mode-runtime/src/runtime/module_loader.rs
Normal file
235
codex-rs/code-mode-runtime/src/runtime/module_loader.rs
Normal file
@@ -0,0 +1,235 @@
|
||||
use serde_json::Value as JsonValue;
|
||||
|
||||
use super::CompletionState;
|
||||
use super::EXIT_SENTINEL;
|
||||
use super::RuntimeState;
|
||||
use super::value::json_to_v8;
|
||||
use super::value::value_to_error_text;
|
||||
|
||||
pub(super) fn evaluate_main_module(
|
||||
scope: &mut v8::PinScope<'_, '_>,
|
||||
source_text: &str,
|
||||
) -> Result<Option<v8::Global<v8::Promise>>, String> {
|
||||
let tc = std::pin::pin!(v8::TryCatch::new(scope));
|
||||
let mut tc = tc.init();
|
||||
let source = v8::String::new(&tc, source_text)
|
||||
.ok_or_else(|| "failed to allocate exec source".to_string())?;
|
||||
let origin = script_origin(&mut tc, "exec_main.mjs")?;
|
||||
let mut source = v8::script_compiler::Source::new(source, Some(&origin));
|
||||
let module = v8::script_compiler::compile_module(&tc, &mut source).ok_or_else(|| {
|
||||
tc.exception()
|
||||
.map(|exception| value_to_error_text(&mut tc, exception))
|
||||
.unwrap_or_else(|| "unknown code mode exception".to_string())
|
||||
})?;
|
||||
module
|
||||
.instantiate_module(&tc, resolve_module_callback)
|
||||
.ok_or_else(|| {
|
||||
tc.exception()
|
||||
.map(|exception| value_to_error_text(&mut tc, exception))
|
||||
.unwrap_or_else(|| "unknown code mode exception".to_string())
|
||||
})?;
|
||||
let result = match module.evaluate(&tc) {
|
||||
Some(result) => result,
|
||||
None => {
|
||||
if let Some(exception) = tc.exception() {
|
||||
if is_exit_exception(&mut tc, exception) {
|
||||
return Ok(None);
|
||||
}
|
||||
return Err(value_to_error_text(&mut tc, exception));
|
||||
}
|
||||
return Err("unknown code mode exception".to_string());
|
||||
}
|
||||
};
|
||||
tc.perform_microtask_checkpoint();
|
||||
|
||||
if result.is_promise() {
|
||||
let promise = v8::Local::<v8::Promise>::try_from(result)
|
||||
.map_err(|_| "failed to read exec promise".to_string())?;
|
||||
return Ok(Some(v8::Global::new(&tc, promise)));
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn is_exit_exception(
|
||||
scope: &mut v8::PinScope<'_, '_>,
|
||||
exception: v8::Local<'_, v8::Value>,
|
||||
) -> bool {
|
||||
scope
|
||||
.get_slot::<RuntimeState>()
|
||||
.map(|state| state.exit_requested)
|
||||
.unwrap_or(false)
|
||||
&& exception.is_string()
|
||||
&& exception.to_rust_string_lossy(scope) == EXIT_SENTINEL
|
||||
}
|
||||
|
||||
pub(super) fn resolve_tool_response(
|
||||
scope: &mut v8::PinScope<'_, '_>,
|
||||
id: &str,
|
||||
response: Result<JsonValue, String>,
|
||||
) -> Result<(), String> {
|
||||
let resolver = {
|
||||
let state = scope
|
||||
.get_slot_mut::<RuntimeState>()
|
||||
.ok_or_else(|| "runtime state unavailable".to_string())?;
|
||||
state.pending_tool_calls.remove(id)
|
||||
}
|
||||
.ok_or_else(|| format!("unknown tool call `{id}`"))?;
|
||||
|
||||
let tc = std::pin::pin!(v8::TryCatch::new(scope));
|
||||
let mut tc = tc.init();
|
||||
let resolver = v8::Local::new(&tc, &resolver);
|
||||
match response {
|
||||
Ok(result) => {
|
||||
let value = json_to_v8(&mut tc, &result)
|
||||
.ok_or_else(|| "failed to serialize tool response".to_string())?;
|
||||
resolver.resolve(&tc, value);
|
||||
}
|
||||
Err(error_text) => {
|
||||
let value = v8::String::new(&tc, &error_text)
|
||||
.ok_or_else(|| "failed to allocate tool error".to_string())?;
|
||||
resolver.reject(&tc, value.into());
|
||||
}
|
||||
}
|
||||
if tc.has_caught() {
|
||||
return Err(tc
|
||||
.exception()
|
||||
.map(|exception| value_to_error_text(&mut tc, exception))
|
||||
.unwrap_or_else(|| "unknown code mode exception".to_string()));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(super) fn completion_state(
|
||||
scope: &mut v8::PinScope<'_, '_>,
|
||||
pending_promise: Option<&v8::Global<v8::Promise>>,
|
||||
) -> CompletionState {
|
||||
let stored_values = scope
|
||||
.get_slot::<RuntimeState>()
|
||||
.map(|state| state.stored_values.clone())
|
||||
.unwrap_or_default();
|
||||
|
||||
let Some(pending_promise) = pending_promise else {
|
||||
return CompletionState::Completed {
|
||||
stored_values,
|
||||
error_text: None,
|
||||
};
|
||||
};
|
||||
|
||||
let promise = v8::Local::new(scope, pending_promise);
|
||||
match promise.state() {
|
||||
v8::PromiseState::Pending => CompletionState::Pending,
|
||||
v8::PromiseState::Fulfilled => CompletionState::Completed {
|
||||
stored_values,
|
||||
error_text: None,
|
||||
},
|
||||
v8::PromiseState::Rejected => {
|
||||
let result = promise.result(scope);
|
||||
let error_text = if is_exit_exception(scope, result) {
|
||||
None
|
||||
} else {
|
||||
Some(value_to_error_text(scope, result))
|
||||
};
|
||||
CompletionState::Completed {
|
||||
stored_values,
|
||||
error_text,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn script_origin<'s>(
|
||||
scope: &mut v8::PinScope<'s, '_>,
|
||||
resource_name_: &str,
|
||||
) -> Result<v8::ScriptOrigin<'s>, String> {
|
||||
let resource_name = v8::String::new(scope, resource_name_)
|
||||
.ok_or_else(|| "failed to allocate script origin".to_string())?;
|
||||
let source_map_url = v8::String::new(scope, resource_name_)
|
||||
.ok_or_else(|| "failed to allocate source map url".to_string())?;
|
||||
Ok(v8::ScriptOrigin::new(
|
||||
scope,
|
||||
resource_name.into(),
|
||||
0,
|
||||
0,
|
||||
true,
|
||||
0,
|
||||
Some(source_map_url.into()),
|
||||
true,
|
||||
false,
|
||||
true,
|
||||
None,
|
||||
))
|
||||
}
|
||||
|
||||
fn resolve_module_callback<'s>(
|
||||
context: v8::Local<'s, v8::Context>,
|
||||
specifier: v8::Local<'s, v8::String>,
|
||||
_import_attributes: v8::Local<'s, v8::FixedArray>,
|
||||
_referrer: v8::Local<'s, v8::Module>,
|
||||
) -> Option<v8::Local<'s, v8::Module>> {
|
||||
v8::callback_scope!(unsafe scope, context);
|
||||
let specifier = specifier.to_rust_string_lossy(scope);
|
||||
resolve_module(scope, &specifier)
|
||||
}
|
||||
|
||||
pub(super) fn dynamic_import_callback<'s>(
|
||||
scope: &mut v8::PinScope<'s, '_>,
|
||||
_host_defined_options: v8::Local<'s, v8::Data>,
|
||||
_resource_name: v8::Local<'s, v8::Value>,
|
||||
specifier: v8::Local<'s, v8::String>,
|
||||
_import_attributes: v8::Local<'s, v8::FixedArray>,
|
||||
) -> Option<v8::Local<'s, v8::Promise>> {
|
||||
let specifier = specifier.to_rust_string_lossy(scope);
|
||||
let resolver = v8::PromiseResolver::new(scope)?;
|
||||
|
||||
match resolve_module(scope, &specifier) {
|
||||
Some(module) => {
|
||||
if module.get_status() == v8::ModuleStatus::Uninstantiated
|
||||
&& module
|
||||
.instantiate_module(scope, resolve_module_callback)
|
||||
.is_none()
|
||||
{
|
||||
let error = v8::String::new(scope, "failed to instantiate module")
|
||||
.map(Into::into)
|
||||
.unwrap_or_else(|| v8::undefined(scope).into());
|
||||
resolver.reject(scope, error);
|
||||
return Some(resolver.get_promise(scope));
|
||||
}
|
||||
if matches!(
|
||||
module.get_status(),
|
||||
v8::ModuleStatus::Instantiated | v8::ModuleStatus::Evaluated
|
||||
) && module.evaluate(scope).is_none()
|
||||
{
|
||||
let error = v8::String::new(scope, "failed to evaluate module")
|
||||
.map(Into::into)
|
||||
.unwrap_or_else(|| v8::undefined(scope).into());
|
||||
resolver.reject(scope, error);
|
||||
return Some(resolver.get_promise(scope));
|
||||
}
|
||||
let namespace = module.get_module_namespace();
|
||||
resolver.resolve(scope, namespace);
|
||||
Some(resolver.get_promise(scope))
|
||||
}
|
||||
None => {
|
||||
let error = v8::String::new(scope, "unsupported import in exec")
|
||||
.map(Into::into)
|
||||
.unwrap_or_else(|| v8::undefined(scope).into());
|
||||
resolver.reject(scope, error);
|
||||
Some(resolver.get_promise(scope))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn resolve_module<'s>(
|
||||
scope: &mut v8::PinScope<'s, '_>,
|
||||
specifier: &str,
|
||||
) -> Option<v8::Local<'s, v8::Module>> {
|
||||
if let Some(message) =
|
||||
v8::String::new(scope, &format!("Unsupported import in exec: {specifier}"))
|
||||
{
|
||||
scope.throw_exception(message.into());
|
||||
} else {
|
||||
scope.throw_exception(v8::undefined(scope).into());
|
||||
}
|
||||
None
|
||||
}
|
||||
114
codex-rs/code-mode-runtime/src/runtime/timers.rs
Normal file
114
codex-rs/code-mode-runtime/src/runtime/timers.rs
Normal file
@@ -0,0 +1,114 @@
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
use super::RuntimeCommand;
|
||||
use super::RuntimeState;
|
||||
use super::value::value_to_error_text;
|
||||
|
||||
pub(super) struct ScheduledTimeout {
|
||||
callback: v8::Global<v8::Function>,
|
||||
}
|
||||
|
||||
pub(super) fn schedule_timeout(
|
||||
scope: &mut v8::PinScope<'_, '_>,
|
||||
args: v8::FunctionCallbackArguments,
|
||||
) -> Result<u64, String> {
|
||||
let callback = args.get(0);
|
||||
if !callback.is_function() {
|
||||
return Err("setTimeout expects a function callback".to_string());
|
||||
}
|
||||
let callback = v8::Local::<v8::Function>::try_from(callback)
|
||||
.map_err(|_| "setTimeout expects a function callback".to_string())?;
|
||||
|
||||
let delay_ms = args
|
||||
.get(1)
|
||||
.number_value(scope)
|
||||
.map(normalize_delay_ms)
|
||||
.unwrap_or(0);
|
||||
|
||||
let callback = v8::Global::new(scope, callback);
|
||||
let state = scope
|
||||
.get_slot_mut::<RuntimeState>()
|
||||
.ok_or_else(|| "runtime state unavailable".to_string())?;
|
||||
let timeout_id = state.next_timeout_id;
|
||||
state.next_timeout_id = state.next_timeout_id.saturating_add(1);
|
||||
let runtime_command_tx = state.runtime_command_tx.clone();
|
||||
state
|
||||
.pending_timeouts
|
||||
.insert(timeout_id, ScheduledTimeout { callback });
|
||||
thread::spawn(move || {
|
||||
thread::sleep(Duration::from_millis(delay_ms));
|
||||
let _ = runtime_command_tx.send(RuntimeCommand::TimeoutFired { id: timeout_id });
|
||||
});
|
||||
|
||||
Ok(timeout_id)
|
||||
}
|
||||
|
||||
pub(super) fn clear_timeout(
|
||||
scope: &mut v8::PinScope<'_, '_>,
|
||||
args: v8::FunctionCallbackArguments,
|
||||
) -> Result<(), String> {
|
||||
let Some(timeout_id) = timeout_id_from_args(scope, args)? else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let Some(state) = scope.get_slot_mut::<RuntimeState>() else {
|
||||
return Err("runtime state unavailable".to_string());
|
||||
};
|
||||
state.pending_timeouts.remove(&timeout_id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(super) fn invoke_timeout_callback(
|
||||
scope: &mut v8::PinScope<'_, '_>,
|
||||
timeout_id: u64,
|
||||
) -> Result<(), String> {
|
||||
let callback = {
|
||||
let state = scope
|
||||
.get_slot_mut::<RuntimeState>()
|
||||
.ok_or_else(|| "runtime state unavailable".to_string())?;
|
||||
state.pending_timeouts.remove(&timeout_id)
|
||||
};
|
||||
let Some(callback) = callback else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let tc = std::pin::pin!(v8::TryCatch::new(scope));
|
||||
let mut tc = tc.init();
|
||||
let callback = v8::Local::new(&tc, &callback.callback);
|
||||
let receiver = v8::undefined(&tc).into();
|
||||
let _ = callback.call(&tc, receiver, &[]);
|
||||
if tc.has_caught() {
|
||||
return Err(tc
|
||||
.exception()
|
||||
.map(|exception| value_to_error_text(&mut tc, exception))
|
||||
.unwrap_or_else(|| "unknown code mode exception".to_string()));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
fn timeout_id_from_args(
|
||||
scope: &mut v8::PinScope<'_, '_>,
|
||||
args: v8::FunctionCallbackArguments,
|
||||
) -> Result<Option<u64>, String> {
|
||||
if args.length() == 0 || args.get(0).is_null_or_undefined() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let Some(timeout_id) = args.get(0).number_value(scope) else {
|
||||
return Err("clearTimeout expects a numeric timeout id".to_string());
|
||||
};
|
||||
if !timeout_id.is_finite() || timeout_id <= 0.0 {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
Ok(Some(timeout_id.trunc().min(u64::MAX as f64) as u64))
|
||||
}
|
||||
|
||||
fn normalize_delay_ms(delay_ms: f64) -> u64 {
|
||||
if !delay_ms.is_finite() || delay_ms <= 0.0 {
|
||||
0
|
||||
} else {
|
||||
delay_ms.trunc().min(u64::MAX as f64) as u64
|
||||
}
|
||||
}
|
||||
227
codex-rs/code-mode-runtime/src/runtime/value.rs
Normal file
227
codex-rs/code-mode-runtime/src/runtime/value.rs
Normal file
@@ -0,0 +1,227 @@
|
||||
use serde_json::Value as JsonValue;
|
||||
|
||||
use codex_code_mode::FunctionCallOutputContentItem;
|
||||
use codex_code_mode::ImageDetail;
|
||||
|
||||
const IMAGE_HELPER_EXPECTS_MESSAGE: &str = "image expects a non-empty image URL string, an object with image_url and optional detail, or a raw MCP image block";
|
||||
const CODEX_IMAGE_DETAIL_META_KEY: &str = "codex/imageDetail";
|
||||
|
||||
pub(super) fn serialize_output_text(
|
||||
scope: &mut v8::PinScope<'_, '_>,
|
||||
value: v8::Local<'_, v8::Value>,
|
||||
) -> Result<String, String> {
|
||||
if value.is_undefined()
|
||||
|| value.is_null()
|
||||
|| value.is_boolean()
|
||||
|| value.is_number()
|
||||
|| value.is_big_int()
|
||||
|| value.is_string()
|
||||
{
|
||||
return Ok(value.to_rust_string_lossy(scope));
|
||||
}
|
||||
|
||||
let tc = std::pin::pin!(v8::TryCatch::new(scope));
|
||||
let mut tc = tc.init();
|
||||
if let Some(stringified) = v8::json::stringify(&tc, value) {
|
||||
return Ok(stringified.to_rust_string_lossy(&tc));
|
||||
}
|
||||
if tc.has_caught() {
|
||||
return Err(tc
|
||||
.exception()
|
||||
.map(|exception| value_to_error_text(&mut tc, exception))
|
||||
.unwrap_or_else(|| "unknown code mode exception".to_string()));
|
||||
}
|
||||
Ok(value.to_rust_string_lossy(&tc))
|
||||
}
|
||||
|
||||
pub(super) fn normalize_output_image(
|
||||
scope: &mut v8::PinScope<'_, '_>,
|
||||
value: v8::Local<'_, v8::Value>,
|
||||
detail_override: Option<String>,
|
||||
) -> Result<FunctionCallOutputContentItem, ()> {
|
||||
let result = (|| -> Result<FunctionCallOutputContentItem, String> {
|
||||
let (image_url, detail) = if value.is_string() {
|
||||
(value.to_rust_string_lossy(scope), None)
|
||||
} else if value.is_object() && !value.is_array() {
|
||||
let object = v8::Local::<v8::Object>::try_from(value)
|
||||
.map_err(|_| IMAGE_HELPER_EXPECTS_MESSAGE.to_string())?;
|
||||
if let Some(image) = parse_non_mcp_output_image(scope, object)? {
|
||||
image
|
||||
} else {
|
||||
parse_mcp_output_image(scope, value)?
|
||||
}
|
||||
} else {
|
||||
return Err(IMAGE_HELPER_EXPECTS_MESSAGE.to_string());
|
||||
};
|
||||
|
||||
if image_url.is_empty() {
|
||||
return Err(IMAGE_HELPER_EXPECTS_MESSAGE.to_string());
|
||||
}
|
||||
let lower = image_url.to_ascii_lowercase();
|
||||
if !(lower.starts_with("http://")
|
||||
|| lower.starts_with("https://")
|
||||
|| lower.starts_with("data:"))
|
||||
{
|
||||
return Err("image expects an http(s) or data URL".to_string());
|
||||
}
|
||||
|
||||
let detail = detail_override.or(detail);
|
||||
let detail = match detail {
|
||||
Some(detail) => {
|
||||
let normalized = detail.to_ascii_lowercase();
|
||||
Some(match normalized.as_str() {
|
||||
"auto" => ImageDetail::Auto,
|
||||
"low" => ImageDetail::Low,
|
||||
"high" => ImageDetail::High,
|
||||
"original" => ImageDetail::Original,
|
||||
_ => {
|
||||
return Err(
|
||||
"image detail must be one of: auto, low, high, original".to_string()
|
||||
);
|
||||
}
|
||||
})
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
Ok(FunctionCallOutputContentItem::InputImage { image_url, detail })
|
||||
})();
|
||||
|
||||
match result {
|
||||
Ok(item) => Ok(item),
|
||||
Err(error_text) => {
|
||||
throw_type_error(scope, &error_text);
|
||||
Err(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_non_mcp_output_image(
|
||||
scope: &mut v8::PinScope<'_, '_>,
|
||||
object: v8::Local<'_, v8::Object>,
|
||||
) -> Result<Option<(String, Option<String>)>, String> {
|
||||
let image_url_key = v8::String::new(scope, "image_url")
|
||||
.ok_or_else(|| "failed to allocate image helper keys".to_string())?;
|
||||
let Some(image_url) = object.get(scope, image_url_key.into()) else {
|
||||
return Ok(None);
|
||||
};
|
||||
if image_url.is_undefined() {
|
||||
return Ok(None);
|
||||
}
|
||||
if !image_url.is_string() {
|
||||
return Err(IMAGE_HELPER_EXPECTS_MESSAGE.to_string());
|
||||
}
|
||||
let detail_key = v8::String::new(scope, "detail")
|
||||
.ok_or_else(|| "failed to allocate image helper keys".to_string())?;
|
||||
let detail = parse_image_detail_value(scope, object.get(scope, detail_key.into()))?;
|
||||
Ok(Some((image_url.to_rust_string_lossy(scope), detail)))
|
||||
}
|
||||
|
||||
fn parse_mcp_output_image(
|
||||
scope: &mut v8::PinScope<'_, '_>,
|
||||
value: v8::Local<'_, v8::Value>,
|
||||
) -> Result<(String, Option<String>), String> {
|
||||
let Some(result) = v8_value_to_json(scope, value)? else {
|
||||
return Err(IMAGE_HELPER_EXPECTS_MESSAGE.to_string());
|
||||
};
|
||||
let JsonValue::Object(result) = result else {
|
||||
return Err(IMAGE_HELPER_EXPECTS_MESSAGE.to_string());
|
||||
};
|
||||
let Some(item_type) = result.get("type").and_then(JsonValue::as_str) else {
|
||||
return Err(IMAGE_HELPER_EXPECTS_MESSAGE.to_string());
|
||||
};
|
||||
if item_type != "image" {
|
||||
return Err(format!(
|
||||
"image only accepts MCP image blocks, got \"{item_type}\""
|
||||
));
|
||||
}
|
||||
let data = result
|
||||
.get("data")
|
||||
.and_then(JsonValue::as_str)
|
||||
.ok_or_else(|| "image expected MCP image data".to_string())?;
|
||||
if data.is_empty() {
|
||||
return Err("image expected MCP image data".to_string());
|
||||
}
|
||||
|
||||
let image_url = if data.to_ascii_lowercase().starts_with("data:") {
|
||||
data.to_string()
|
||||
} else {
|
||||
let mime_type = result
|
||||
.get("mimeType")
|
||||
.or_else(|| result.get("mime_type"))
|
||||
.and_then(JsonValue::as_str)
|
||||
.filter(|mime_type| !mime_type.is_empty())
|
||||
.unwrap_or("application/octet-stream");
|
||||
format!("data:{mime_type};base64,{data}")
|
||||
};
|
||||
let detail = result
|
||||
.get("_meta")
|
||||
.and_then(JsonValue::as_object)
|
||||
.and_then(|meta| meta.get(CODEX_IMAGE_DETAIL_META_KEY))
|
||||
.and_then(JsonValue::as_str)
|
||||
.filter(|detail| *detail == "original")
|
||||
.map(str::to_string);
|
||||
Ok((image_url, detail))
|
||||
}
|
||||
|
||||
fn parse_image_detail_value<'s>(
|
||||
scope: &mut v8::PinScope<'s, '_>,
|
||||
value: Option<v8::Local<'s, v8::Value>>,
|
||||
) -> Result<Option<String>, String> {
|
||||
match value {
|
||||
Some(value) if value.is_string() => Ok(Some(value.to_rust_string_lossy(scope))),
|
||||
Some(value) if value.is_null() || value.is_undefined() => Ok(None),
|
||||
Some(_) => Err("image detail must be a string when provided".to_string()),
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn v8_value_to_json(
|
||||
scope: &mut v8::PinScope<'_, '_>,
|
||||
value: v8::Local<'_, v8::Value>,
|
||||
) -> Result<Option<JsonValue>, String> {
|
||||
let tc = std::pin::pin!(v8::TryCatch::new(scope));
|
||||
let mut tc = tc.init();
|
||||
let Some(stringified) = v8::json::stringify(&tc, value) else {
|
||||
if tc.has_caught() {
|
||||
return Err(tc
|
||||
.exception()
|
||||
.map(|exception| value_to_error_text(&mut tc, exception))
|
||||
.unwrap_or_else(|| "unknown code mode exception".to_string()));
|
||||
}
|
||||
return Ok(None);
|
||||
};
|
||||
serde_json::from_str(&stringified.to_rust_string_lossy(&tc))
|
||||
.map(Some)
|
||||
.map_err(|err| format!("failed to serialize JavaScript value: {err}"))
|
||||
}
|
||||
|
||||
pub(super) fn json_to_v8<'s>(
|
||||
scope: &mut v8::PinScope<'s, '_>,
|
||||
value: &JsonValue,
|
||||
) -> Option<v8::Local<'s, v8::Value>> {
|
||||
let json = serde_json::to_string(value).ok()?;
|
||||
let json = v8::String::new(scope, &json)?;
|
||||
v8::json::parse(scope, json)
|
||||
}
|
||||
|
||||
pub(super) fn value_to_error_text(
|
||||
scope: &mut v8::PinScope<'_, '_>,
|
||||
value: v8::Local<'_, v8::Value>,
|
||||
) -> String {
|
||||
if value.is_object()
|
||||
&& let Ok(object) = v8::Local::<v8::Object>::try_from(value)
|
||||
&& let Some(key) = v8::String::new(scope, "stack")
|
||||
&& let Some(stack) = object.get(scope, key.into())
|
||||
&& stack.is_string()
|
||||
{
|
||||
return stack.to_rust_string_lossy(scope);
|
||||
}
|
||||
value.to_rust_string_lossy(scope)
|
||||
}
|
||||
|
||||
pub(super) fn throw_type_error(scope: &mut v8::PinScope<'_, '_>, message: &str) {
|
||||
if let Some(message) = v8::String::new(scope, message) {
|
||||
scope.throw_exception(message.into());
|
||||
}
|
||||
}
|
||||
915
codex-rs/code-mode-runtime/src/service.rs
Normal file
915
codex-rs/code-mode-runtime/src/service.rs
Normal file
@@ -0,0 +1,915 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use serde_json::Value as JsonValue;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::runtime::DEFAULT_EXEC_YIELD_TIME_MS;
|
||||
use crate::runtime::ExecuteRequest;
|
||||
use crate::runtime::RuntimeCommand;
|
||||
use crate::runtime::RuntimeEvent;
|
||||
use crate::runtime::RuntimeResponse;
|
||||
use crate::runtime::TurnMessage;
|
||||
use crate::runtime::WaitRequest;
|
||||
use crate::runtime::spawn_runtime;
|
||||
use codex_code_mode::CodeModeRuntimeFactory;
|
||||
use codex_code_mode::CodeModeRuntimeService;
|
||||
use codex_code_mode::CodeModeTurnHost;
|
||||
use codex_code_mode::FunctionCallOutputContentItem;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct SessionHandle {
|
||||
control_tx: mpsc::UnboundedSender<SessionControlCommand>,
|
||||
runtime_tx: std::sync::mpsc::Sender<RuntimeCommand>,
|
||||
}
|
||||
|
||||
struct Inner {
|
||||
stored_values: Mutex<HashMap<String, JsonValue>>,
|
||||
sessions: Mutex<HashMap<String, SessionHandle>>,
|
||||
turn_message_tx: async_channel::Sender<TurnMessage>,
|
||||
turn_message_rx: async_channel::Receiver<TurnMessage>,
|
||||
next_cell_id: AtomicU64,
|
||||
}
|
||||
|
||||
pub struct CodeModeService {
|
||||
inner: Arc<Inner>,
|
||||
}
|
||||
|
||||
impl CodeModeService {
|
||||
pub fn new() -> Self {
|
||||
let (turn_message_tx, turn_message_rx) = async_channel::unbounded();
|
||||
|
||||
Self {
|
||||
inner: Arc::new(Inner {
|
||||
stored_values: Mutex::new(HashMap::new()),
|
||||
sessions: Mutex::new(HashMap::new()),
|
||||
turn_message_tx,
|
||||
turn_message_rx,
|
||||
next_cell_id: AtomicU64::new(1),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn stored_values(&self) -> HashMap<String, JsonValue> {
|
||||
self.inner.stored_values.lock().await.clone()
|
||||
}
|
||||
|
||||
pub async fn replace_stored_values(&self, values: HashMap<String, JsonValue>) {
|
||||
*self.inner.stored_values.lock().await = values;
|
||||
}
|
||||
|
||||
pub async fn execute(&self, request: ExecuteRequest) -> Result<RuntimeResponse, String> {
|
||||
let cell_id = self
|
||||
.inner
|
||||
.next_cell_id
|
||||
.fetch_add(1, Ordering::Relaxed)
|
||||
.to_string();
|
||||
let (event_tx, event_rx) = mpsc::unbounded_channel();
|
||||
let (runtime_tx, runtime_terminate_handle) = spawn_runtime(request.clone(), event_tx)?;
|
||||
let (control_tx, control_rx) = mpsc::unbounded_channel();
|
||||
let (response_tx, response_rx) = oneshot::channel();
|
||||
|
||||
self.inner.sessions.lock().await.insert(
|
||||
cell_id.clone(),
|
||||
SessionHandle {
|
||||
control_tx: control_tx.clone(),
|
||||
runtime_tx: runtime_tx.clone(),
|
||||
},
|
||||
);
|
||||
|
||||
tokio::spawn(run_session_control(
|
||||
Arc::clone(&self.inner),
|
||||
SessionControlContext {
|
||||
cell_id: cell_id.clone(),
|
||||
runtime_tx,
|
||||
runtime_terminate_handle,
|
||||
},
|
||||
event_rx,
|
||||
control_rx,
|
||||
response_tx,
|
||||
request.yield_time_ms.unwrap_or(DEFAULT_EXEC_YIELD_TIME_MS),
|
||||
));
|
||||
|
||||
response_rx
|
||||
.await
|
||||
.map_err(|_| "exec runtime ended unexpectedly".to_string())
|
||||
}
|
||||
|
||||
pub async fn wait(&self, request: WaitRequest) -> Result<RuntimeResponse, String> {
|
||||
let cell_id = request.cell_id.clone();
|
||||
let handle = self
|
||||
.inner
|
||||
.sessions
|
||||
.lock()
|
||||
.await
|
||||
.get(&request.cell_id)
|
||||
.cloned();
|
||||
let Some(handle) = handle else {
|
||||
return Ok(missing_cell_response(cell_id));
|
||||
};
|
||||
let (response_tx, response_rx) = oneshot::channel();
|
||||
let control_message = if request.terminate {
|
||||
SessionControlCommand::Terminate { response_tx }
|
||||
} else {
|
||||
SessionControlCommand::Poll {
|
||||
yield_time_ms: request.yield_time_ms,
|
||||
response_tx,
|
||||
}
|
||||
};
|
||||
if handle.control_tx.send(control_message).is_err() {
|
||||
return Ok(missing_cell_response(cell_id));
|
||||
}
|
||||
match response_rx.await {
|
||||
Ok(response) => Ok(response),
|
||||
Err(_) => Ok(missing_cell_response(request.cell_id)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn start_turn_worker(&self, host: Arc<dyn CodeModeTurnHost>) -> CodeModeTurnWorker {
|
||||
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
|
||||
let inner = Arc::clone(&self.inner);
|
||||
let turn_message_rx = self.inner.turn_message_rx.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
let next_message = tokio::select! {
|
||||
_ = &mut shutdown_rx => break,
|
||||
message = turn_message_rx.recv() => message.ok(),
|
||||
};
|
||||
let Some(next_message) = next_message else {
|
||||
break;
|
||||
};
|
||||
match next_message {
|
||||
TurnMessage::Notify {
|
||||
cell_id,
|
||||
call_id,
|
||||
text,
|
||||
} => {
|
||||
if let Err(err) = host.notify(call_id, cell_id.clone(), text).await {
|
||||
warn!(
|
||||
"failed to deliver code mode notification for cell {cell_id}: {err}"
|
||||
);
|
||||
}
|
||||
}
|
||||
TurnMessage::ToolCall {
|
||||
cell_id,
|
||||
id,
|
||||
name,
|
||||
input,
|
||||
} => {
|
||||
let host = Arc::clone(&host);
|
||||
let inner = Arc::clone(&inner);
|
||||
tokio::spawn(async move {
|
||||
let response = host
|
||||
.invoke_tool(name, input, CancellationToken::new())
|
||||
.await;
|
||||
let runtime_tx = inner
|
||||
.sessions
|
||||
.lock()
|
||||
.await
|
||||
.get(&cell_id)
|
||||
.map(|handle| handle.runtime_tx.clone());
|
||||
let Some(runtime_tx) = runtime_tx else {
|
||||
return;
|
||||
};
|
||||
let command = match response {
|
||||
Ok(result) => RuntimeCommand::ToolResponse { id, result },
|
||||
Err(error_text) => RuntimeCommand::ToolError { id, error_text },
|
||||
};
|
||||
let _ = runtime_tx.send(command);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
CodeModeTurnWorker {
|
||||
shutdown_tx: Some(shutdown_tx),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for CodeModeService {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl CodeModeRuntimeService for CodeModeService {
|
||||
async fn stored_values(&self) -> HashMap<String, JsonValue> {
|
||||
self.stored_values().await
|
||||
}
|
||||
|
||||
async fn replace_stored_values(&self, values: HashMap<String, JsonValue>) {
|
||||
self.replace_stored_values(values).await;
|
||||
}
|
||||
|
||||
async fn execute(&self, request: ExecuteRequest) -> Result<RuntimeResponse, String> {
|
||||
self.execute(request).await
|
||||
}
|
||||
|
||||
async fn wait(&self, request: WaitRequest) -> Result<RuntimeResponse, String> {
|
||||
self.wait(request).await
|
||||
}
|
||||
|
||||
fn start_turn_worker(&self, host: Arc<dyn CodeModeTurnHost>) -> Box<dyn Send> {
|
||||
Box::new(self.start_turn_worker(host))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct CodeModeTurnWorker {
|
||||
shutdown_tx: Option<oneshot::Sender<()>>,
|
||||
}
|
||||
|
||||
impl Drop for CodeModeTurnWorker {
|
||||
fn drop(&mut self) {
|
||||
if let Some(shutdown_tx) = self.shutdown_tx.take() {
|
||||
let _ = shutdown_tx.send(());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn runtime_factory() -> CodeModeRuntimeFactory {
|
||||
Arc::new(|| Arc::new(CodeModeService::new()))
|
||||
}
|
||||
|
||||
enum SessionControlCommand {
|
||||
Poll {
|
||||
yield_time_ms: u64,
|
||||
response_tx: oneshot::Sender<RuntimeResponse>,
|
||||
},
|
||||
Terminate {
|
||||
response_tx: oneshot::Sender<RuntimeResponse>,
|
||||
},
|
||||
}
|
||||
|
||||
struct PendingResult {
|
||||
content_items: Vec<FunctionCallOutputContentItem>,
|
||||
stored_values: HashMap<String, JsonValue>,
|
||||
error_text: Option<String>,
|
||||
}
|
||||
|
||||
struct SessionControlContext {
|
||||
cell_id: String,
|
||||
runtime_tx: std::sync::mpsc::Sender<RuntimeCommand>,
|
||||
runtime_terminate_handle: v8::IsolateHandle,
|
||||
}
|
||||
|
||||
fn missing_cell_response(cell_id: String) -> RuntimeResponse {
|
||||
RuntimeResponse::Result {
|
||||
error_text: Some(format!("exec cell {cell_id} not found")),
|
||||
cell_id,
|
||||
content_items: Vec::new(),
|
||||
stored_values: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn pending_result_response(cell_id: &str, result: PendingResult) -> RuntimeResponse {
|
||||
RuntimeResponse::Result {
|
||||
cell_id: cell_id.to_string(),
|
||||
content_items: result.content_items,
|
||||
stored_values: result.stored_values,
|
||||
error_text: result.error_text,
|
||||
}
|
||||
}
|
||||
|
||||
fn send_or_buffer_result(
|
||||
cell_id: &str,
|
||||
result: PendingResult,
|
||||
response_tx: &mut Option<oneshot::Sender<RuntimeResponse>>,
|
||||
pending_result: &mut Option<PendingResult>,
|
||||
) -> bool {
|
||||
if let Some(response_tx) = response_tx.take() {
|
||||
let _ = response_tx.send(pending_result_response(cell_id, result));
|
||||
return true;
|
||||
}
|
||||
|
||||
*pending_result = Some(result);
|
||||
false
|
||||
}
|
||||
|
||||
async fn run_session_control(
|
||||
inner: Arc<Inner>,
|
||||
context: SessionControlContext,
|
||||
mut event_rx: mpsc::UnboundedReceiver<RuntimeEvent>,
|
||||
mut control_rx: mpsc::UnboundedReceiver<SessionControlCommand>,
|
||||
initial_response_tx: oneshot::Sender<RuntimeResponse>,
|
||||
initial_yield_time_ms: u64,
|
||||
) {
|
||||
let SessionControlContext {
|
||||
cell_id,
|
||||
runtime_tx,
|
||||
runtime_terminate_handle,
|
||||
} = context;
|
||||
let mut content_items = Vec::new();
|
||||
let mut pending_result: Option<PendingResult> = None;
|
||||
let mut response_tx = Some(initial_response_tx);
|
||||
let mut termination_requested = false;
|
||||
let mut runtime_closed = false;
|
||||
let mut yield_timer: Option<std::pin::Pin<Box<tokio::time::Sleep>>> = None;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
maybe_event = async {
|
||||
if runtime_closed {
|
||||
std::future::pending::<Option<RuntimeEvent>>().await
|
||||
} else {
|
||||
event_rx.recv().await
|
||||
}
|
||||
} => {
|
||||
let Some(event) = maybe_event else {
|
||||
runtime_closed = true;
|
||||
if termination_requested {
|
||||
if let Some(response_tx) = response_tx.take() {
|
||||
let _ = response_tx.send(RuntimeResponse::Terminated {
|
||||
cell_id: cell_id.clone(),
|
||||
content_items: std::mem::take(&mut content_items),
|
||||
});
|
||||
}
|
||||
break;
|
||||
}
|
||||
if pending_result.is_none() {
|
||||
let result = PendingResult {
|
||||
content_items: std::mem::take(&mut content_items),
|
||||
stored_values: HashMap::new(),
|
||||
error_text: Some("exec runtime ended unexpectedly".to_string()),
|
||||
};
|
||||
if send_or_buffer_result(
|
||||
&cell_id,
|
||||
result,
|
||||
&mut response_tx,
|
||||
&mut pending_result,
|
||||
) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
continue;
|
||||
};
|
||||
match event {
|
||||
RuntimeEvent::Started => {
|
||||
yield_timer = Some(Box::pin(tokio::time::sleep(Duration::from_millis(initial_yield_time_ms))));
|
||||
}
|
||||
RuntimeEvent::ContentItem(item) => {
|
||||
content_items.push(item);
|
||||
}
|
||||
RuntimeEvent::YieldRequested => {
|
||||
yield_timer = None;
|
||||
if let Some(response_tx) = response_tx.take() {
|
||||
let _ = response_tx.send(RuntimeResponse::Yielded {
|
||||
cell_id: cell_id.clone(),
|
||||
content_items: std::mem::take(&mut content_items),
|
||||
});
|
||||
}
|
||||
}
|
||||
RuntimeEvent::Notify { call_id, text } => {
|
||||
let _ = inner.turn_message_tx.send(TurnMessage::Notify {
|
||||
cell_id: cell_id.clone(),
|
||||
call_id,
|
||||
text,
|
||||
}).await;
|
||||
}
|
||||
RuntimeEvent::ToolCall { id, name, input } => {
|
||||
let _ = inner.turn_message_tx.send(TurnMessage::ToolCall {
|
||||
cell_id: cell_id.clone(),
|
||||
id,
|
||||
name,
|
||||
input,
|
||||
}).await;
|
||||
}
|
||||
RuntimeEvent::Result {
|
||||
stored_values,
|
||||
error_text,
|
||||
} => {
|
||||
yield_timer = None;
|
||||
if termination_requested {
|
||||
if let Some(response_tx) = response_tx.take() {
|
||||
let _ = response_tx.send(RuntimeResponse::Terminated {
|
||||
cell_id: cell_id.clone(),
|
||||
content_items: std::mem::take(&mut content_items),
|
||||
});
|
||||
}
|
||||
break;
|
||||
}
|
||||
let result = PendingResult {
|
||||
content_items: std::mem::take(&mut content_items),
|
||||
stored_values,
|
||||
error_text,
|
||||
};
|
||||
if send_or_buffer_result(
|
||||
&cell_id,
|
||||
result,
|
||||
&mut response_tx,
|
||||
&mut pending_result,
|
||||
) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
maybe_command = control_rx.recv() => {
|
||||
let Some(command) = maybe_command else {
|
||||
break;
|
||||
};
|
||||
match command {
|
||||
SessionControlCommand::Poll {
|
||||
yield_time_ms,
|
||||
response_tx: next_response_tx,
|
||||
} => {
|
||||
if let Some(result) = pending_result.take() {
|
||||
let _ = next_response_tx.send(pending_result_response(&cell_id, result));
|
||||
break;
|
||||
}
|
||||
response_tx = Some(next_response_tx);
|
||||
yield_timer = Some(Box::pin(tokio::time::sleep(Duration::from_millis(yield_time_ms))));
|
||||
}
|
||||
SessionControlCommand::Terminate { response_tx: next_response_tx } => {
|
||||
if let Some(result) = pending_result.take() {
|
||||
let _ = next_response_tx.send(pending_result_response(&cell_id, result));
|
||||
break;
|
||||
}
|
||||
|
||||
response_tx = Some(next_response_tx);
|
||||
termination_requested = true;
|
||||
yield_timer = None;
|
||||
let _ = runtime_tx.send(RuntimeCommand::Terminate);
|
||||
let _ = runtime_terminate_handle.terminate_execution();
|
||||
if runtime_closed {
|
||||
if let Some(response_tx) = response_tx.take() {
|
||||
let _ = response_tx.send(RuntimeResponse::Terminated {
|
||||
cell_id: cell_id.clone(),
|
||||
content_items: std::mem::take(&mut content_items),
|
||||
});
|
||||
}
|
||||
break;
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
_ = async {
|
||||
if let Some(yield_timer) = yield_timer.as_mut() {
|
||||
yield_timer.await;
|
||||
} else {
|
||||
std::future::pending::<()>().await;
|
||||
}
|
||||
} => {
|
||||
yield_timer = None;
|
||||
if let Some(response_tx) = response_tx.take() {
|
||||
let _ = response_tx.send(RuntimeResponse::Yielded {
|
||||
cell_id: cell_id.clone(),
|
||||
content_items: std::mem::take(&mut content_items),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let _ = runtime_tx.send(RuntimeCommand::Terminate);
|
||||
inner.sessions.lock().await.remove(&cell_id);
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::time::Duration;
|
||||
|
||||
use pretty_assertions::assert_eq;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
use super::CodeModeService;
|
||||
use super::Inner;
|
||||
use super::RuntimeCommand;
|
||||
use super::RuntimeResponse;
|
||||
use super::SessionControlCommand;
|
||||
use super::SessionControlContext;
|
||||
use super::run_session_control;
|
||||
use crate::runtime::ExecuteRequest;
|
||||
use crate::runtime::RuntimeEvent;
|
||||
use crate::runtime::spawn_runtime;
|
||||
use codex_code_mode::FunctionCallOutputContentItem;
|
||||
|
||||
fn execute_request(source: &str) -> ExecuteRequest {
|
||||
ExecuteRequest {
|
||||
tool_call_id: "call_1".to_string(),
|
||||
enabled_tools: Vec::new(),
|
||||
source: source.to_string(),
|
||||
stored_values: HashMap::new(),
|
||||
yield_time_ms: Some(1),
|
||||
max_output_tokens: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn test_inner() -> Arc<Inner> {
|
||||
let (turn_message_tx, turn_message_rx) = async_channel::unbounded();
|
||||
Arc::new(Inner {
|
||||
stored_values: Mutex::new(HashMap::new()),
|
||||
sessions: Mutex::new(HashMap::new()),
|
||||
turn_message_tx,
|
||||
turn_message_rx,
|
||||
next_cell_id: AtomicU64::new(1),
|
||||
})
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn synchronous_exit_returns_successfully() {
|
||||
let service = CodeModeService::new();
|
||||
|
||||
let response = service
|
||||
.execute(ExecuteRequest {
|
||||
source: r#"text("before"); exit(); text("after");"#.to_string(),
|
||||
yield_time_ms: None,
|
||||
..execute_request("")
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
response,
|
||||
RuntimeResponse::Result {
|
||||
cell_id: "1".to_string(),
|
||||
content_items: vec![FunctionCallOutputContentItem::InputText {
|
||||
text: "before".to_string(),
|
||||
}],
|
||||
stored_values: HashMap::new(),
|
||||
error_text: None,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn v8_console_is_not_exposed_on_global_this() {
|
||||
let service = CodeModeService::new();
|
||||
|
||||
let response = service
|
||||
.execute(ExecuteRequest {
|
||||
source: r#"text(String(Object.hasOwn(globalThis, "console")));"#.to_string(),
|
||||
yield_time_ms: None,
|
||||
..execute_request("")
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
response,
|
||||
RuntimeResponse::Result {
|
||||
cell_id: "1".to_string(),
|
||||
content_items: vec![FunctionCallOutputContentItem::InputText {
|
||||
text: "false".to_string(),
|
||||
}],
|
||||
stored_values: HashMap::new(),
|
||||
error_text: None,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn date_locale_string_formats_with_icu_data() {
|
||||
let service = CodeModeService::new();
|
||||
|
||||
let response = service
|
||||
.execute(ExecuteRequest {
|
||||
source: r#"
|
||||
const value = new Date("2025-01-02T03:04:05Z")
|
||||
.toLocaleString("fr-FR", {
|
||||
weekday: "long",
|
||||
month: "long",
|
||||
day: "numeric",
|
||||
hour: "2-digit",
|
||||
minute: "2-digit",
|
||||
second: "2-digit",
|
||||
hour12: false,
|
||||
timeZone: "UTC",
|
||||
});
|
||||
text(value);
|
||||
"#
|
||||
.to_string(),
|
||||
yield_time_ms: None,
|
||||
..execute_request("")
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
response,
|
||||
RuntimeResponse::Result {
|
||||
cell_id: "1".to_string(),
|
||||
content_items: vec![FunctionCallOutputContentItem::InputText {
|
||||
text: "jeudi 2 janvier \u{e0} 03:04:05".to_string(),
|
||||
}],
|
||||
stored_values: HashMap::new(),
|
||||
error_text: None,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn intl_date_time_format_formats_with_icu_data() {
|
||||
let service = CodeModeService::new();
|
||||
|
||||
let response = service
|
||||
.execute(ExecuteRequest {
|
||||
source: r#"
|
||||
const formatter = new Intl.DateTimeFormat("fr-FR", {
|
||||
weekday: "long",
|
||||
month: "long",
|
||||
day: "numeric",
|
||||
hour: "2-digit",
|
||||
minute: "2-digit",
|
||||
second: "2-digit",
|
||||
hour12: false,
|
||||
timeZone: "UTC",
|
||||
});
|
||||
text(formatter.format(new Date("2025-01-02T03:04:05Z")));
|
||||
"#
|
||||
.to_string(),
|
||||
yield_time_ms: None,
|
||||
..execute_request("")
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
response,
|
||||
RuntimeResponse::Result {
|
||||
cell_id: "1".to_string(),
|
||||
content_items: vec![FunctionCallOutputContentItem::InputText {
|
||||
text: "jeudi 2 janvier \u{e0} 03:04:05".to_string(),
|
||||
}],
|
||||
stored_values: HashMap::new(),
|
||||
error_text: None,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn output_helpers_return_undefined() {
|
||||
let service = CodeModeService::new();
|
||||
|
||||
let response = service
|
||||
.execute(ExecuteRequest {
|
||||
source: r#"
|
||||
const returnsUndefined = [
|
||||
text("first"),
|
||||
image("https://example.com/image.jpg"),
|
||||
notify("ping"),
|
||||
].map((value) => value === undefined);
|
||||
text(JSON.stringify(returnsUndefined));
|
||||
"#
|
||||
.to_string(),
|
||||
yield_time_ms: None,
|
||||
..execute_request("")
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
response,
|
||||
RuntimeResponse::Result {
|
||||
cell_id: "1".to_string(),
|
||||
content_items: vec![
|
||||
FunctionCallOutputContentItem::InputText {
|
||||
text: "first".to_string(),
|
||||
},
|
||||
FunctionCallOutputContentItem::InputImage {
|
||||
image_url: "https://example.com/image.jpg".to_string(),
|
||||
detail: None,
|
||||
},
|
||||
FunctionCallOutputContentItem::InputText {
|
||||
text: "[true,true,true]".to_string(),
|
||||
},
|
||||
],
|
||||
stored_values: HashMap::new(),
|
||||
error_text: None,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn image_helper_accepts_raw_mcp_image_block_with_original_detail() {
|
||||
let service = CodeModeService::new();
|
||||
|
||||
let response = service
|
||||
.execute(ExecuteRequest {
|
||||
source: r#"
|
||||
image({
|
||||
type: "image",
|
||||
data: "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR4nGP4z8DwHwAFAAH/iZk9HQAAAABJRU5ErkJggg==",
|
||||
mimeType: "image/png",
|
||||
_meta: { "codex/imageDetail": "original" },
|
||||
});
|
||||
"#
|
||||
.to_string(),
|
||||
yield_time_ms: None,
|
||||
..execute_request("")
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
response,
|
||||
RuntimeResponse::Result {
|
||||
cell_id: "1".to_string(),
|
||||
content_items: vec![FunctionCallOutputContentItem::InputImage {
|
||||
image_url: "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR4nGP4z8DwHwAFAAH/iZk9HQAAAABJRU5ErkJggg==".to_string(),
|
||||
detail: Some(crate::ImageDetail::Original),
|
||||
}],
|
||||
stored_values: HashMap::new(),
|
||||
error_text: None,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn image_helper_second_arg_overrides_explicit_object_detail() {
|
||||
let service = CodeModeService::new();
|
||||
|
||||
let response = service
|
||||
.execute(ExecuteRequest {
|
||||
source: r#"
|
||||
image(
|
||||
{
|
||||
image_url: "https://example.com/image.jpg",
|
||||
detail: "low",
|
||||
},
|
||||
"original",
|
||||
);
|
||||
"#
|
||||
.to_string(),
|
||||
yield_time_ms: None,
|
||||
..execute_request("")
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
response,
|
||||
RuntimeResponse::Result {
|
||||
cell_id: "1".to_string(),
|
||||
content_items: vec![FunctionCallOutputContentItem::InputImage {
|
||||
image_url: "https://example.com/image.jpg".to_string(),
|
||||
detail: Some(crate::ImageDetail::Original),
|
||||
}],
|
||||
stored_values: HashMap::new(),
|
||||
error_text: None,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn image_helper_second_arg_overrides_raw_mcp_image_detail() {
|
||||
let service = CodeModeService::new();
|
||||
|
||||
let response = service
|
||||
.execute(ExecuteRequest {
|
||||
source: r#"
|
||||
image(
|
||||
{
|
||||
type: "image",
|
||||
data: "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR4nGP4z8DwHwAFAAH/iZk9HQAAAABJRU5ErkJggg==",
|
||||
mimeType: "image/png",
|
||||
_meta: { "codex/imageDetail": "original" },
|
||||
},
|
||||
"low",
|
||||
);
|
||||
"#
|
||||
.to_string(),
|
||||
yield_time_ms: None,
|
||||
..execute_request("")
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
response,
|
||||
RuntimeResponse::Result {
|
||||
cell_id: "1".to_string(),
|
||||
content_items: vec![FunctionCallOutputContentItem::InputImage {
|
||||
image_url: "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR4nGP4z8DwHwAFAAH/iZk9HQAAAABJRU5ErkJggg==".to_string(),
|
||||
detail: Some(crate::ImageDetail::Low),
|
||||
}],
|
||||
stored_values: HashMap::new(),
|
||||
error_text: None,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn image_helper_rejects_raw_mcp_result_container() {
|
||||
let service = CodeModeService::new();
|
||||
|
||||
let response = service
|
||||
.execute(ExecuteRequest {
|
||||
source: r#"
|
||||
image({
|
||||
content: [
|
||||
{
|
||||
type: "image",
|
||||
data: "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR4nGP4z8DwHwAFAAH/iZk9HQAAAABJRU5ErkJggg==",
|
||||
mimeType: "image/png",
|
||||
_meta: { "codex/imageDetail": "original" },
|
||||
},
|
||||
],
|
||||
isError: false,
|
||||
});
|
||||
"#
|
||||
.to_string(),
|
||||
yield_time_ms: None,
|
||||
..execute_request("")
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
response,
|
||||
RuntimeResponse::Result {
|
||||
cell_id: "1".to_string(),
|
||||
content_items: Vec::new(),
|
||||
stored_values: HashMap::new(),
|
||||
error_text: Some(
|
||||
"image expects a non-empty image URL string, an object with image_url and optional detail, or a raw MCP image block".to_string(),
|
||||
),
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn terminate_waits_for_runtime_shutdown_before_responding() {
|
||||
let inner = test_inner();
|
||||
let (event_tx, event_rx) = mpsc::unbounded_channel();
|
||||
let (control_tx, control_rx) = mpsc::unbounded_channel();
|
||||
let (initial_response_tx, initial_response_rx) = oneshot::channel();
|
||||
let (runtime_event_tx, _runtime_event_rx) = mpsc::unbounded_channel();
|
||||
let (runtime_tx, runtime_terminate_handle) = spawn_runtime(
|
||||
ExecuteRequest {
|
||||
source: "await new Promise(() => {})".to_string(),
|
||||
yield_time_ms: None,
|
||||
..execute_request("")
|
||||
},
|
||||
runtime_event_tx,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
tokio::spawn(run_session_control(
|
||||
inner,
|
||||
SessionControlContext {
|
||||
cell_id: "cell-1".to_string(),
|
||||
runtime_tx: runtime_tx.clone(),
|
||||
runtime_terminate_handle,
|
||||
},
|
||||
event_rx,
|
||||
control_rx,
|
||||
initial_response_tx,
|
||||
/*initial_yield_time_ms*/ 60_000,
|
||||
));
|
||||
|
||||
event_tx.send(RuntimeEvent::Started).unwrap();
|
||||
event_tx.send(RuntimeEvent::YieldRequested).unwrap();
|
||||
assert_eq!(
|
||||
initial_response_rx.await.unwrap(),
|
||||
RuntimeResponse::Yielded {
|
||||
cell_id: "cell-1".to_string(),
|
||||
content_items: Vec::new(),
|
||||
}
|
||||
);
|
||||
|
||||
let (terminate_response_tx, terminate_response_rx) = oneshot::channel();
|
||||
control_tx
|
||||
.send(SessionControlCommand::Terminate {
|
||||
response_tx: terminate_response_tx,
|
||||
})
|
||||
.unwrap();
|
||||
let terminate_response = async { terminate_response_rx.await.unwrap() };
|
||||
tokio::pin!(terminate_response);
|
||||
assert!(
|
||||
tokio::time::timeout(Duration::from_millis(100), terminate_response.as_mut())
|
||||
.await
|
||||
.is_err()
|
||||
);
|
||||
|
||||
drop(event_tx);
|
||||
|
||||
assert_eq!(
|
||||
terminate_response.await,
|
||||
RuntimeResponse::Terminated {
|
||||
cell_id: "cell-1".to_string(),
|
||||
content_items: Vec::new(),
|
||||
}
|
||||
);
|
||||
|
||||
let _ = runtime_tx.send(RuntimeCommand::Terminate);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user