mod callbacks; mod globals; mod module_loader; mod value; use std::collections::HashMap; use std::sync::OnceLock; use std::sync::mpsc as std_mpsc; use std::thread; use serde_json::Value as JsonValue; use tokio::sync::mpsc; use crate::description::EnabledToolMetadata; use crate::description::ToolDefinition; use crate::description::enabled_tool_metadata; use crate::response::FunctionCallOutputContentItem; pub const DEFAULT_EXEC_YIELD_TIME_MS: u64 = 10_000; pub const DEFAULT_WAIT_YIELD_TIME_MS: u64 = 10_000; pub const DEFAULT_MAX_OUTPUT_TOKENS_PER_EXEC_CALL: usize = 10_000; const EXIT_SENTINEL: &str = "__codex_code_mode_exit__"; #[derive(Clone, Debug)] pub struct ExecuteRequest { pub tool_call_id: String, pub enabled_tools: Vec, pub source: String, pub stored_values: HashMap, pub yield_time_ms: Option, pub max_output_tokens: Option, } #[derive(Clone, Debug)] pub struct WaitRequest { pub cell_id: String, pub yield_time_ms: u64, pub terminate: bool, } #[derive(Debug, PartialEq)] pub enum RuntimeResponse { Yielded { cell_id: String, content_items: Vec, }, Terminated { cell_id: String, content_items: Vec, }, Result { cell_id: String, content_items: Vec, stored_values: HashMap, error_text: Option, }, } #[derive(Debug)] pub(crate) enum TurnMessage { ToolCall { cell_id: String, id: String, name: String, input: Option, }, Notify { cell_id: String, call_id: String, text: String, }, } #[derive(Debug)] pub(crate) enum RuntimeCommand { ToolResponse { id: String, result: JsonValue }, ToolError { id: String, error_text: String }, Terminate, } #[derive(Debug)] pub(crate) enum RuntimeEvent { Started, ContentItem(FunctionCallOutputContentItem), YieldRequested, ToolCall { id: String, name: String, input: Option, }, Notify { call_id: String, text: String, }, Result { stored_values: HashMap, error_text: Option, }, } pub(crate) fn spawn_runtime( request: ExecuteRequest, event_tx: mpsc::UnboundedSender, ) -> Result<(std_mpsc::Sender, v8::IsolateHandle), String> { let (command_tx, command_rx) = std_mpsc::channel(); let (isolate_handle_tx, isolate_handle_rx) = std_mpsc::sync_channel(1); let enabled_tools = request .enabled_tools .iter() .map(enabled_tool_metadata) .collect::>(); let config = RuntimeConfig { tool_call_id: request.tool_call_id, enabled_tools, source: request.source, stored_values: request.stored_values, }; thread::spawn(move || { run_runtime(config, event_tx, command_rx, isolate_handle_tx); }); let isolate_handle = isolate_handle_rx .recv() .map_err(|_| "failed to initialize code mode runtime".to_string())?; Ok((command_tx, isolate_handle)) } #[derive(Clone)] struct RuntimeConfig { tool_call_id: String, enabled_tools: Vec, source: String, stored_values: HashMap, } pub(super) struct RuntimeState { event_tx: mpsc::UnboundedSender, pending_tool_calls: HashMap>, stored_values: HashMap, enabled_tools: Vec, next_tool_call_id: u64, tool_call_id: String, exit_requested: bool, } pub(super) enum CompletionState { Pending, Completed { stored_values: HashMap, error_text: Option, }, } fn initialize_v8() { static PLATFORM: OnceLock> = OnceLock::new(); let _ = PLATFORM.get_or_init(|| { let platform = v8::new_default_platform(0, false).make_shared(); v8::V8::initialize_platform(platform.clone()); v8::V8::initialize(); platform }); } fn run_runtime( config: RuntimeConfig, event_tx: mpsc::UnboundedSender, command_rx: std_mpsc::Receiver, isolate_handle_tx: std_mpsc::SyncSender, ) { initialize_v8(); let isolate = &mut v8::Isolate::new(v8::CreateParams::default()); let isolate_handle = isolate.thread_safe_handle(); if isolate_handle_tx.send(isolate_handle).is_err() { return; } isolate.set_host_import_module_dynamically_callback(module_loader::dynamic_import_callback); v8::scope!(let scope, isolate); let context = v8::Context::new(scope, Default::default()); let scope = &mut v8::ContextScope::new(scope, context); scope.set_slot(RuntimeState { event_tx: event_tx.clone(), pending_tool_calls: HashMap::new(), stored_values: config.stored_values, enabled_tools: config.enabled_tools, next_tool_call_id: 1, tool_call_id: config.tool_call_id, exit_requested: false, }); if let Err(error_text) = globals::install_globals(scope) { send_result(&event_tx, HashMap::new(), Some(error_text)); return; } let _ = event_tx.send(RuntimeEvent::Started); let pending_promise = match module_loader::evaluate_main_module(scope, &config.source) { Ok(pending_promise) => pending_promise, Err(error_text) => { capture_scope_send_error(scope, &event_tx, Some(error_text)); return; } }; match module_loader::completion_state(scope, pending_promise.as_ref()) { CompletionState::Completed { stored_values, error_text, } => { send_result(&event_tx, stored_values, error_text); return; } CompletionState::Pending => {} } let mut pending_promise = pending_promise; loop { let Ok(command) = command_rx.recv() else { break; }; match command { RuntimeCommand::Terminate => break, RuntimeCommand::ToolResponse { id, result } => { if let Err(error_text) = module_loader::resolve_tool_response(scope, &id, Ok(result)) { capture_scope_send_error(scope, &event_tx, Some(error_text)); return; } } RuntimeCommand::ToolError { id, error_text } => { if let Err(runtime_error) = module_loader::resolve_tool_response(scope, &id, Err(error_text)) { capture_scope_send_error(scope, &event_tx, Some(runtime_error)); return; } } } scope.perform_microtask_checkpoint(); match module_loader::completion_state(scope, pending_promise.as_ref()) { CompletionState::Completed { stored_values, error_text, } => { send_result(&event_tx, stored_values, error_text); return; } CompletionState::Pending => {} } if let Some(promise) = pending_promise.as_ref() { let promise = v8::Local::new(scope, promise); if promise.state() != v8::PromiseState::Pending { pending_promise = None; } } } } fn capture_scope_send_error( scope: &mut v8::PinScope<'_, '_>, event_tx: &mpsc::UnboundedSender, error_text: Option, ) { let stored_values = scope .get_slot::() .map(|state| state.stored_values.clone()) .unwrap_or_default(); send_result(event_tx, stored_values, error_text); } fn send_result( event_tx: &mpsc::UnboundedSender, stored_values: HashMap, error_text: Option, ) { let _ = event_tx.send(RuntimeEvent::Result { stored_values, error_text, }); } #[cfg(test)] mod tests { use std::collections::HashMap; use std::time::Duration; use pretty_assertions::assert_eq; use tokio::sync::mpsc; use super::ExecuteRequest; use super::RuntimeEvent; use super::spawn_runtime; fn execute_request(source: &str) -> ExecuteRequest { ExecuteRequest { tool_call_id: "call_1".to_string(), enabled_tools: Vec::new(), source: source.to_string(), stored_values: HashMap::new(), yield_time_ms: Some(1), max_output_tokens: None, } } #[tokio::test] async fn terminate_execution_stops_cpu_bound_module() { let (event_tx, mut event_rx) = mpsc::unbounded_channel(); let (_runtime_tx, runtime_terminate_handle) = spawn_runtime(execute_request("while (true) {}"), event_tx).unwrap(); let started_event = tokio::time::timeout(Duration::from_secs(1), event_rx.recv()) .await .unwrap() .unwrap(); assert!(matches!(started_event, RuntimeEvent::Started)); assert!(runtime_terminate_handle.terminate_execution()); let result_event = tokio::time::timeout(Duration::from_secs(1), event_rx.recv()) .await .unwrap() .unwrap(); let RuntimeEvent::Result { stored_values, error_text, } = result_event else { panic!("expected runtime result after termination"); }; assert_eq!(stored_values, HashMap::new()); assert!(error_text.is_some()); assert!( tokio::time::timeout(Duration::from_secs(1), event_rx.recv()) .await .unwrap() .is_none() ); } }