mirror of
https://github.com/openai/codex.git
synced 2026-05-25 05:24:37 +00:00
## Summary Extends rollout tracing across tool dispatch and code-mode runtime boundaries. This records canonical tool-call lifecycle events and links code-mode execution/wait operations back to the model-visible calls that caused them. ## Stack This is PR 3/5 in the rollout trace stack. - [#18876](https://github.com/openai/codex/pull/18876): Add rollout trace crate - [#18877](https://github.com/openai/codex/pull/18877): Record core session rollout traces - [#18878](https://github.com/openai/codex/pull/18878): Trace tool and code-mode boundaries - [#18879](https://github.com/openai/codex/pull/18879): Trace sessions and multi-agent edges - [#18880](https://github.com/openai/codex/pull/18880): Add debug trace reduction command ## Review Notes This PR is about attribution. Reviewers should focus on whether direct tool calls, code-mode-originated tool calls, waits, outputs, and cancellation boundaries are recorded with enough source information for deterministic reduction without coupling the reducer to live runtime internals. The stack remains valid after this layer: tool and code-mode traces reduce through the existing crate model, while the broader session and multi-agent relationships are added in the next PR.
417 lines
12 KiB
Rust
417 lines
12 KiB
Rust
mod callbacks;
|
|
mod globals;
|
|
mod module_loader;
|
|
mod timers;
|
|
mod value;
|
|
|
|
use std::collections::HashMap;
|
|
use std::sync::OnceLock;
|
|
use std::sync::mpsc as std_mpsc;
|
|
use std::thread;
|
|
|
|
use codex_protocol::ToolName;
|
|
use serde::Serialize;
|
|
use serde_json::Value as JsonValue;
|
|
use tokio::sync::mpsc;
|
|
|
|
use crate::description::EnabledToolMetadata;
|
|
use crate::description::ToolDefinition;
|
|
use crate::description::enabled_tool_metadata;
|
|
use crate::response::FunctionCallOutputContentItem;
|
|
|
|
pub const DEFAULT_EXEC_YIELD_TIME_MS: u64 = 10_000;
|
|
pub const DEFAULT_WAIT_YIELD_TIME_MS: u64 = 10_000;
|
|
pub const DEFAULT_MAX_OUTPUT_TOKENS_PER_EXEC_CALL: usize = 10_000;
|
|
const EXIT_SENTINEL: &str = "__codex_code_mode_exit__";
|
|
|
|
#[derive(Clone, Debug)]
|
|
pub struct ExecuteRequest {
|
|
/// Runtime cell id for this execution.
|
|
///
|
|
/// Callers allocate this before execution so tracing, waits, and nested tool
|
|
/// calls can refer to the cell as soon as JavaScript starts.
|
|
pub cell_id: String,
|
|
pub tool_call_id: String,
|
|
pub enabled_tools: Vec<ToolDefinition>,
|
|
pub source: String,
|
|
pub stored_values: HashMap<String, JsonValue>,
|
|
pub yield_time_ms: Option<u64>,
|
|
pub max_output_tokens: Option<usize>,
|
|
}
|
|
|
|
#[derive(Clone, Debug)]
|
|
pub struct WaitRequest {
|
|
pub cell_id: String,
|
|
pub yield_time_ms: u64,
|
|
pub terminate: bool,
|
|
}
|
|
|
|
/// Result of waiting on a code-mode cell.
|
|
///
|
|
/// The wrapped `RuntimeResponse` is the model-facing wait result. The enum
|
|
/// variant carries the extra lifecycle provenance that `RuntimeResponse` cannot:
|
|
/// a failed real cell and a missing-cell wait both use
|
|
/// `RuntimeResponse::Result { error_text: Some(..), .. }`, but only the former
|
|
/// should be treated as a code-cell lifecycle event.
|
|
#[derive(Debug, PartialEq)]
|
|
pub enum WaitOutcome {
|
|
/// The requested code cell was live when the wait command was accepted.
|
|
LiveCell(RuntimeResponse),
|
|
/// The requested code cell was not live.
|
|
MissingCell(RuntimeResponse),
|
|
}
|
|
|
|
impl From<WaitOutcome> for RuntimeResponse {
|
|
fn from(outcome: WaitOutcome) -> Self {
|
|
match outcome {
|
|
WaitOutcome::LiveCell(response) | WaitOutcome::MissingCell(response) => response,
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, PartialEq, Serialize)]
|
|
pub enum RuntimeResponse {
|
|
Yielded {
|
|
cell_id: String,
|
|
content_items: Vec<FunctionCallOutputContentItem>,
|
|
},
|
|
Terminated {
|
|
cell_id: String,
|
|
content_items: Vec<FunctionCallOutputContentItem>,
|
|
},
|
|
Result {
|
|
cell_id: String,
|
|
content_items: Vec<FunctionCallOutputContentItem>,
|
|
stored_values: HashMap<String, JsonValue>,
|
|
error_text: Option<String>,
|
|
},
|
|
}
|
|
|
|
/// Nested tool request emitted by one code-mode cell.
|
|
///
|
|
/// Code mode owns the per-cell runtime id. Hosts should preserve it for
|
|
/// provenance/debugging, but should still assign their own runtime tool call id
|
|
/// if their tool-call graph requires globally unique ids.
|
|
#[derive(Debug)]
|
|
pub struct CodeModeNestedToolCall {
|
|
pub cell_id: String,
|
|
pub runtime_tool_call_id: String,
|
|
pub tool_name: ToolName,
|
|
pub input: Option<JsonValue>,
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub(crate) enum TurnMessage {
|
|
ToolCall(CodeModeNestedToolCall),
|
|
Notify {
|
|
cell_id: String,
|
|
call_id: String,
|
|
text: String,
|
|
},
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub(crate) enum RuntimeCommand {
|
|
ToolResponse { id: String, result: JsonValue },
|
|
ToolError { id: String, error_text: String },
|
|
TimeoutFired { id: u64 },
|
|
Terminate,
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub(crate) enum RuntimeEvent {
|
|
Started,
|
|
ContentItem(FunctionCallOutputContentItem),
|
|
YieldRequested,
|
|
ToolCall {
|
|
id: String,
|
|
name: ToolName,
|
|
input: Option<JsonValue>,
|
|
},
|
|
Notify {
|
|
call_id: String,
|
|
text: String,
|
|
},
|
|
Result {
|
|
stored_values: HashMap<String, JsonValue>,
|
|
error_text: Option<String>,
|
|
},
|
|
}
|
|
|
|
pub(crate) fn spawn_runtime(
|
|
request: ExecuteRequest,
|
|
event_tx: mpsc::UnboundedSender<RuntimeEvent>,
|
|
) -> Result<(std_mpsc::Sender<RuntimeCommand>, v8::IsolateHandle), String> {
|
|
initialize_v8()?;
|
|
|
|
let (command_tx, command_rx) = std_mpsc::channel();
|
|
let runtime_command_tx = command_tx.clone();
|
|
let (isolate_handle_tx, isolate_handle_rx) = std_mpsc::sync_channel(1);
|
|
let enabled_tools = request
|
|
.enabled_tools
|
|
.iter()
|
|
.map(enabled_tool_metadata)
|
|
.collect::<Vec<_>>();
|
|
let config = RuntimeConfig {
|
|
tool_call_id: request.tool_call_id,
|
|
enabled_tools,
|
|
source: request.source,
|
|
stored_values: request.stored_values,
|
|
};
|
|
|
|
thread::spawn(move || {
|
|
run_runtime(
|
|
config,
|
|
event_tx,
|
|
command_rx,
|
|
isolate_handle_tx,
|
|
runtime_command_tx,
|
|
);
|
|
});
|
|
|
|
let isolate_handle = isolate_handle_rx
|
|
.recv()
|
|
.map_err(|_| "failed to initialize code mode runtime".to_string())?;
|
|
Ok((command_tx, isolate_handle))
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
struct RuntimeConfig {
|
|
tool_call_id: String,
|
|
enabled_tools: Vec<EnabledToolMetadata>,
|
|
source: String,
|
|
stored_values: HashMap<String, JsonValue>,
|
|
}
|
|
|
|
pub(super) struct RuntimeState {
|
|
event_tx: mpsc::UnboundedSender<RuntimeEvent>,
|
|
pending_tool_calls: HashMap<String, v8::Global<v8::PromiseResolver>>,
|
|
pending_timeouts: HashMap<u64, timers::ScheduledTimeout>,
|
|
stored_values: HashMap<String, JsonValue>,
|
|
enabled_tools: Vec<EnabledToolMetadata>,
|
|
next_tool_call_id: u64,
|
|
next_timeout_id: u64,
|
|
tool_call_id: String,
|
|
runtime_command_tx: std_mpsc::Sender<RuntimeCommand>,
|
|
exit_requested: bool,
|
|
}
|
|
|
|
pub(super) enum CompletionState {
|
|
Pending,
|
|
Completed {
|
|
stored_values: HashMap<String, JsonValue>,
|
|
error_text: Option<String>,
|
|
},
|
|
}
|
|
|
|
fn initialize_v8() -> Result<(), String> {
|
|
static PLATFORM: OnceLock<Result<v8::SharedRef<v8::Platform>, String>> = OnceLock::new();
|
|
|
|
match PLATFORM.get_or_init(|| {
|
|
v8::icu::set_common_data_77(deno_core_icudata::ICU_DATA)
|
|
.map_err(|error_code| format!("failed to initialize ICU data: {error_code}"))?;
|
|
let platform = v8::new_default_platform(0, false).make_shared();
|
|
v8::V8::initialize_platform(platform.clone());
|
|
v8::V8::initialize();
|
|
Ok(platform)
|
|
}) {
|
|
Ok(_) => Ok(()),
|
|
Err(error_text) => Err(error_text.clone()),
|
|
}
|
|
}
|
|
|
|
fn run_runtime(
|
|
config: RuntimeConfig,
|
|
event_tx: mpsc::UnboundedSender<RuntimeEvent>,
|
|
command_rx: std_mpsc::Receiver<RuntimeCommand>,
|
|
isolate_handle_tx: std_mpsc::SyncSender<v8::IsolateHandle>,
|
|
runtime_command_tx: std_mpsc::Sender<RuntimeCommand>,
|
|
) {
|
|
let isolate = &mut v8::Isolate::new(v8::CreateParams::default());
|
|
let isolate_handle = isolate.thread_safe_handle();
|
|
if isolate_handle_tx.send(isolate_handle).is_err() {
|
|
return;
|
|
}
|
|
isolate.set_host_import_module_dynamically_callback(module_loader::dynamic_import_callback);
|
|
|
|
v8::scope!(let scope, isolate);
|
|
let context = v8::Context::new(scope, Default::default());
|
|
let scope = &mut v8::ContextScope::new(scope, context);
|
|
|
|
scope.set_slot(RuntimeState {
|
|
event_tx: event_tx.clone(),
|
|
pending_tool_calls: HashMap::new(),
|
|
pending_timeouts: HashMap::new(),
|
|
stored_values: config.stored_values,
|
|
enabled_tools: config.enabled_tools,
|
|
next_tool_call_id: 1,
|
|
next_timeout_id: 1,
|
|
tool_call_id: config.tool_call_id,
|
|
runtime_command_tx,
|
|
exit_requested: false,
|
|
});
|
|
|
|
if let Err(error_text) = globals::install_globals(scope) {
|
|
send_result(&event_tx, HashMap::new(), Some(error_text));
|
|
return;
|
|
}
|
|
|
|
let _ = event_tx.send(RuntimeEvent::Started);
|
|
|
|
let pending_promise = match module_loader::evaluate_main_module(scope, &config.source) {
|
|
Ok(pending_promise) => pending_promise,
|
|
Err(error_text) => {
|
|
capture_scope_send_error(scope, &event_tx, Some(error_text));
|
|
return;
|
|
}
|
|
};
|
|
|
|
match module_loader::completion_state(scope, pending_promise.as_ref()) {
|
|
CompletionState::Completed {
|
|
stored_values,
|
|
error_text,
|
|
} => {
|
|
send_result(&event_tx, stored_values, error_text);
|
|
return;
|
|
}
|
|
CompletionState::Pending => {}
|
|
}
|
|
|
|
let mut pending_promise = pending_promise;
|
|
loop {
|
|
let Ok(command) = command_rx.recv() else {
|
|
break;
|
|
};
|
|
|
|
match command {
|
|
RuntimeCommand::Terminate => break,
|
|
RuntimeCommand::ToolResponse { id, result } => {
|
|
if let Err(error_text) =
|
|
module_loader::resolve_tool_response(scope, &id, Ok(result))
|
|
{
|
|
capture_scope_send_error(scope, &event_tx, Some(error_text));
|
|
return;
|
|
}
|
|
}
|
|
RuntimeCommand::ToolError { id, error_text } => {
|
|
if let Err(runtime_error) =
|
|
module_loader::resolve_tool_response(scope, &id, Err(error_text))
|
|
{
|
|
capture_scope_send_error(scope, &event_tx, Some(runtime_error));
|
|
return;
|
|
}
|
|
}
|
|
RuntimeCommand::TimeoutFired { id } => {
|
|
if let Err(runtime_error) = timers::invoke_timeout_callback(scope, id) {
|
|
capture_scope_send_error(scope, &event_tx, Some(runtime_error));
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
|
|
scope.perform_microtask_checkpoint();
|
|
match module_loader::completion_state(scope, pending_promise.as_ref()) {
|
|
CompletionState::Completed {
|
|
stored_values,
|
|
error_text,
|
|
} => {
|
|
send_result(&event_tx, stored_values, error_text);
|
|
return;
|
|
}
|
|
CompletionState::Pending => {}
|
|
}
|
|
|
|
if let Some(promise) = pending_promise.as_ref() {
|
|
let promise = v8::Local::new(scope, promise);
|
|
if promise.state() != v8::PromiseState::Pending {
|
|
pending_promise = None;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
fn capture_scope_send_error(
|
|
scope: &mut v8::PinScope<'_, '_>,
|
|
event_tx: &mpsc::UnboundedSender<RuntimeEvent>,
|
|
error_text: Option<String>,
|
|
) {
|
|
let stored_values = scope
|
|
.get_slot::<RuntimeState>()
|
|
.map(|state| state.stored_values.clone())
|
|
.unwrap_or_default();
|
|
|
|
send_result(event_tx, stored_values, error_text);
|
|
}
|
|
|
|
fn send_result(
|
|
event_tx: &mpsc::UnboundedSender<RuntimeEvent>,
|
|
stored_values: HashMap<String, JsonValue>,
|
|
error_text: Option<String>,
|
|
) {
|
|
let _ = event_tx.send(RuntimeEvent::Result {
|
|
stored_values,
|
|
error_text,
|
|
});
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use std::collections::HashMap;
|
|
use std::time::Duration;
|
|
|
|
use pretty_assertions::assert_eq;
|
|
use tokio::sync::mpsc;
|
|
|
|
use super::ExecuteRequest;
|
|
use super::RuntimeEvent;
|
|
use super::spawn_runtime;
|
|
|
|
fn execute_request(source: &str) -> ExecuteRequest {
|
|
ExecuteRequest {
|
|
cell_id: "1".to_string(),
|
|
tool_call_id: "call_1".to_string(),
|
|
enabled_tools: Vec::new(),
|
|
source: source.to_string(),
|
|
stored_values: HashMap::new(),
|
|
yield_time_ms: Some(1),
|
|
max_output_tokens: None,
|
|
}
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn terminate_execution_stops_cpu_bound_module() {
|
|
let (event_tx, mut event_rx) = mpsc::unbounded_channel();
|
|
let (_runtime_tx, runtime_terminate_handle) =
|
|
spawn_runtime(execute_request("while (true) {}"), event_tx).unwrap();
|
|
|
|
let started_event = tokio::time::timeout(Duration::from_secs(1), event_rx.recv())
|
|
.await
|
|
.unwrap()
|
|
.unwrap();
|
|
assert!(matches!(started_event, RuntimeEvent::Started));
|
|
|
|
assert!(runtime_terminate_handle.terminate_execution());
|
|
|
|
let result_event = tokio::time::timeout(Duration::from_secs(1), event_rx.recv())
|
|
.await
|
|
.unwrap()
|
|
.unwrap();
|
|
let RuntimeEvent::Result {
|
|
stored_values,
|
|
error_text,
|
|
} = result_event
|
|
else {
|
|
panic!("expected runtime result after termination");
|
|
};
|
|
assert_eq!(stored_values, HashMap::new());
|
|
assert!(error_text.is_some());
|
|
|
|
assert!(
|
|
tokio::time::timeout(Duration::from_secs(1), event_rx.recv())
|
|
.await
|
|
.unwrap()
|
|
.is_none()
|
|
);
|
|
}
|
|
}
|