diff --git a/codex-rs/code-mode/src/lib.rs b/codex-rs/code-mode/src/lib.rs index bf0ce699cc..3da8c77325 100644 --- a/codex-rs/code-mode/src/lib.rs +++ b/codex-rs/code-mode/src/lib.rs @@ -23,9 +23,12 @@ pub use runtime::DEFAULT_EXEC_YIELD_TIME_MS; pub use runtime::DEFAULT_MAX_OUTPUT_TOKENS_PER_EXEC_CALL; pub use runtime::DEFAULT_WAIT_YIELD_TIME_MS; pub use runtime::ExecuteRequest; +pub use runtime::ExecuteToPendingOutcome; pub use runtime::RuntimeResponse; pub use runtime::WaitOutcome; pub use runtime::WaitRequest; +pub use runtime::WaitToPendingOutcome; +pub use runtime::WaitToPendingRequest; pub use service::CodeModeService; pub use service::CodeModeTurnHost; pub use service::CodeModeTurnWorker; diff --git a/codex-rs/code-mode/src/runtime/mod.rs b/codex-rs/code-mode/src/runtime/mod.rs index f989312da1..2b605fa573 100644 --- a/codex-rs/code-mode/src/runtime/mod.rs +++ b/codex-rs/code-mode/src/runtime/mod.rs @@ -47,6 +47,11 @@ pub struct WaitRequest { pub terminate: bool, } +#[derive(Clone, Debug)] +pub struct WaitToPendingRequest { + pub cell_id: String, +} + /// Result of waiting on a code-mode cell. /// /// The wrapped `RuntimeResponse` is the model-facing wait result. The enum @@ -62,6 +67,34 @@ pub enum WaitOutcome { MissingCell(RuntimeResponse), } +/// Result of executing a code-mode cell until it either completes or reaches a +/// quiescent pending state. +#[derive(Debug, PartialEq)] +pub enum ExecuteToPendingOutcome { + /// The cell is waiting for more runtime input after draining the runtime + /// input queue that was ready at the pending boundary. + Pending { + cell_id: String, + content_items: Vec, + /// Runtime tool-call ids emitted before this paused execution frontier + /// sealed. Hosts can use these ids to drain their tool-call transport + /// before surfacing the pending boundary to callers. + pending_tool_call_ids: Vec, + }, + /// The cell reached a terminal runtime response before going pending. + Completed(RuntimeResponse), +} + +/// Result of resuming a live code-mode cell until it completes or becomes +/// quiescent again. +#[derive(Debug, PartialEq)] +pub enum WaitToPendingOutcome { + /// The requested code cell was live when the wait command was accepted. + LiveCell(ExecuteToPendingOutcome), + /// The requested code cell was not live. + MissingCell(RuntimeResponse), +} + impl From for RuntimeResponse { fn from(outcome: WaitOutcome) -> Self { match outcome { @@ -120,9 +153,22 @@ pub(crate) enum RuntimeCommand { Terminate, } +#[derive(Clone, Copy, Debug, PartialEq)] +pub(crate) enum PendingRuntimeMode { + Continue, + PauseUntilResumed, +} + +#[derive(Debug)] +pub(crate) enum RuntimeControlCommand { + Resume, + Terminate, +} + #[derive(Debug)] pub(crate) enum RuntimeEvent { Started, + Pending, ContentItem(FunctionCallOutputContentItem), YieldRequested, ToolCall { @@ -144,10 +190,19 @@ pub(crate) enum RuntimeEvent { pub(crate) fn spawn_runtime( request: ExecuteRequest, event_tx: mpsc::UnboundedSender, -) -> Result<(std_mpsc::Sender, v8::IsolateHandle), String> { + pending_mode: PendingRuntimeMode, +) -> Result< + ( + std_mpsc::Sender, + std_mpsc::Sender, + v8::IsolateHandle, + ), + String, +> { initialize_v8()?; let (command_tx, command_rx) = std_mpsc::channel(); + let (control_tx, control_rx) = std_mpsc::channel(); let runtime_command_tx = command_tx.clone(); let (isolate_handle_tx, isolate_handle_rx) = std_mpsc::sync_channel(1); let enabled_tools = request @@ -167,6 +222,8 @@ pub(crate) fn spawn_runtime( config, event_tx, command_rx, + control_rx, + pending_mode, isolate_handle_tx, runtime_command_tx, ); @@ -175,7 +232,7 @@ pub(crate) fn spawn_runtime( let isolate_handle = isolate_handle_rx .recv() .map_err(|_| "failed to initialize code mode runtime".to_string())?; - Ok((command_tx, isolate_handle)) + Ok((command_tx, control_tx, isolate_handle)) } #[derive(Clone)] @@ -227,6 +284,8 @@ fn run_runtime( config: RuntimeConfig, event_tx: mpsc::UnboundedSender, command_rx: std_mpsc::Receiver, + control_rx: std_mpsc::Receiver, + pending_mode: PendingRuntimeMode, isolate_handle_tx: std_mpsc::SyncSender, runtime_command_tx: std_mpsc::Sender, ) { @@ -282,7 +341,8 @@ fn run_runtime( let mut pending_promise = pending_promise; loop { - let Ok(command) = command_rx.recv() else { + let Some(command) = next_runtime_command(&event_tx, &command_rx, &control_rx, pending_mode) + else { break; }; @@ -333,6 +393,30 @@ fn run_runtime( } } +fn next_runtime_command( + event_tx: &mpsc::UnboundedSender, + command_rx: &std_mpsc::Receiver, + control_rx: &std_mpsc::Receiver, + pending_mode: PendingRuntimeMode, +) -> Option { + loop { + match command_rx.try_recv() { + Ok(command) => return Some(command), + Err(std_mpsc::TryRecvError::Disconnected) => return None, + Err(std_mpsc::TryRecvError::Empty) => {} + } + + let _ = event_tx.send(RuntimeEvent::Pending); + match pending_mode { + PendingRuntimeMode::Continue => return command_rx.recv().ok(), + PendingRuntimeMode::PauseUntilResumed => match control_rx.recv().ok()? { + RuntimeControlCommand::Resume => continue, + RuntimeControlCommand::Terminate => return Some(RuntimeCommand::Terminate), + }, + } + } +} + fn capture_scope_send_error( scope: &mut v8::PinScope<'_, '_>, event_tx: &mpsc::UnboundedSender, @@ -366,8 +450,12 @@ mod tests { use tokio::sync::mpsc; use super::ExecuteRequest; + use super::PendingRuntimeMode; + use super::RuntimeCommand; + use super::RuntimeControlCommand; use super::RuntimeEvent; use super::spawn_runtime; + use crate::FunctionCallOutputContentItem; fn execute_request(source: &str) -> ExecuteRequest { ExecuteRequest { @@ -384,8 +472,12 @@ mod tests { #[tokio::test] async fn terminate_execution_stops_cpu_bound_module() { let (event_tx, mut event_rx) = mpsc::unbounded_channel(); - let (_runtime_tx, runtime_terminate_handle) = - spawn_runtime(execute_request("while (true) {}"), event_tx).unwrap(); + let (_runtime_tx, _runtime_control_tx, runtime_terminate_handle) = spawn_runtime( + execute_request("while (true) {}"), + event_tx, + PendingRuntimeMode::Continue, + ) + .unwrap(); let started_event = tokio::time::timeout(Duration::from_secs(1), event_rx.recv()) .await @@ -416,4 +508,71 @@ mod tests { .is_none() ); } + + #[tokio::test] + async fn pending_mode_freezes_runtime_commands_until_resume() { + let (event_tx, mut event_rx) = mpsc::unbounded_channel(); + let (runtime_tx, runtime_control_tx, _runtime_terminate_handle) = spawn_runtime( + execute_request( + r#" +await new Promise((resolve) => setTimeout(resolve, 60_000)); +text("after"); +await new Promise(() => {}); +"#, + ), + event_tx, + PendingRuntimeMode::PauseUntilResumed, + ) + .unwrap(); + + assert!(matches!( + tokio::time::timeout(Duration::from_secs(1), event_rx.recv()) + .await + .unwrap() + .unwrap(), + RuntimeEvent::Started + )); + assert!(matches!( + tokio::time::timeout(Duration::from_secs(1), event_rx.recv()) + .await + .unwrap() + .unwrap(), + RuntimeEvent::Pending + )); + + runtime_tx + .send(RuntimeCommand::TimeoutFired { id: 1 }) + .unwrap(); + assert!( + tokio::time::timeout(Duration::from_millis(100), event_rx.recv()) + .await + .is_err() + ); + + runtime_control_tx + .send(RuntimeControlCommand::Resume) + .unwrap(); + + let content_event = tokio::time::timeout(Duration::from_secs(1), event_rx.recv()) + .await + .unwrap() + .unwrap(); + let RuntimeEvent::ContentItem(FunctionCallOutputContentItem::InputText { text }) = + content_event + else { + panic!("expected resumed runtime output"); + }; + assert_eq!(text, "after"); + assert!(matches!( + tokio::time::timeout(Duration::from_secs(1), event_rx.recv()) + .await + .unwrap() + .unwrap(), + RuntimeEvent::Pending + )); + + runtime_control_tx + .send(RuntimeControlCommand::Terminate) + .unwrap(); + } } diff --git a/codex-rs/code-mode/src/service.rs b/codex-rs/code-mode/src/service.rs index 3ab8913f19..44e4be4939 100644 --- a/codex-rs/code-mode/src/service.rs +++ b/codex-rs/code-mode/src/service.rs @@ -16,12 +16,17 @@ use crate::FunctionCallOutputContentItem; use crate::runtime::CodeModeNestedToolCall; use crate::runtime::DEFAULT_EXEC_YIELD_TIME_MS; use crate::runtime::ExecuteRequest; +use crate::runtime::ExecuteToPendingOutcome; +use crate::runtime::PendingRuntimeMode; use crate::runtime::RuntimeCommand; +use crate::runtime::RuntimeControlCommand; use crate::runtime::RuntimeEvent; use crate::runtime::RuntimeResponse; use crate::runtime::TurnMessage; use crate::runtime::WaitOutcome; use crate::runtime::WaitRequest; +use crate::runtime::WaitToPendingOutcome; +use crate::runtime::WaitToPendingRequest; use crate::runtime::spawn_runtime; #[async_trait] @@ -90,18 +95,57 @@ impl CodeModeService { } pub async fn execute(&self, request: ExecuteRequest) -> Result { - let cell_id = request.cell_id.clone(); let initial_yield_time_ms = request.yield_time_ms.unwrap_or(DEFAULT_EXEC_YIELD_TIME_MS); + let (response_tx, response_rx) = oneshot::channel(); + self.start_session( + request, + SessionResponseSender::Runtime(response_tx), + Some(initial_yield_time_ms), + PendingRuntimeMode::Continue, + ) + .await?; + + response_rx + .await + .map_err(|_| "exec runtime ended unexpectedly".to_string()) + } + + pub async fn execute_to_pending( + &self, + request: ExecuteRequest, + ) -> Result { + let (response_tx, response_rx) = oneshot::channel(); + self.start_session( + request, + SessionResponseSender::ExecuteToPending(response_tx), + /*initial_yield_time_ms*/ None, + PendingRuntimeMode::PauseUntilResumed, + ) + .await?; + + response_rx + .await + .map_err(|_| "exec runtime ended unexpectedly".to_string()) + } + + async fn start_session( + &self, + request: ExecuteRequest, + initial_response_tx: SessionResponseSender, + initial_yield_time_ms: Option, + pending_mode: PendingRuntimeMode, + ) -> Result<(), String> { + let cell_id = request.cell_id.clone(); let (event_tx, event_rx) = mpsc::unbounded_channel(); let (control_tx, control_rx) = mpsc::unbounded_channel(); - let (response_tx, response_rx) = oneshot::channel(); - let (runtime_tx, runtime_terminate_handle) = { + let (runtime_tx, runtime_control_tx, runtime_terminate_handle) = { let mut sessions = self.inner.sessions.lock().await; if sessions.contains_key(&cell_id) { return Err(format!("exec cell {cell_id} already exists")); } - let (runtime_tx, runtime_terminate_handle) = spawn_runtime(request, event_tx)?; + let (runtime_tx, runtime_control_tx, runtime_terminate_handle) = + spawn_runtime(request, event_tx, pending_mode)?; // Keep the session registry locked through insertion so a // caller-owned cell id cannot race with another execute and replace @@ -113,7 +157,7 @@ impl CodeModeService { runtime_tx: runtime_tx.clone(), }, ); - (runtime_tx, runtime_terminate_handle) + (runtime_tx, runtime_control_tx, runtime_terminate_handle) }; tokio::spawn(run_session_control( @@ -121,17 +165,17 @@ impl CodeModeService { SessionControlContext { cell_id: cell_id.clone(), runtime_tx, + runtime_control_tx, + pending_mode, runtime_terminate_handle, }, event_rx, control_rx, - response_tx, + initial_response_tx, initial_yield_time_ms, )); - response_rx - .await - .map_err(|_| "exec runtime ended unexpectedly".to_string()) + Ok(()) } pub async fn wait(&self, request: WaitRequest) -> Result { @@ -166,6 +210,41 @@ impl CodeModeService { } } + pub async fn wait_to_pending( + &self, + request: WaitToPendingRequest, + ) -> Result { + let cell_id = request.cell_id.clone(); + let handle = self + .inner + .sessions + .lock() + .await + .get(&request.cell_id) + .cloned(); + let Some(handle) = handle else { + return Ok(WaitToPendingOutcome::MissingCell(missing_cell_response( + cell_id, + ))); + }; + let (response_tx, response_rx) = oneshot::channel(); + if handle + .control_tx + .send(SessionControlCommand::PollToPending { response_tx }) + .is_err() + { + return Ok(WaitToPendingOutcome::MissingCell(missing_cell_response( + cell_id, + ))); + } + match response_rx.await { + Ok(response) => Ok(WaitToPendingOutcome::LiveCell(response)), + Err(_) => Ok(WaitToPendingOutcome::MissingCell(missing_cell_response( + request.cell_id, + ))), + } + } + pub fn start_turn_worker(&self, host: Arc) -> CodeModeTurnWorker { let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); let inner = Arc::clone(&self.inner); @@ -255,11 +334,19 @@ enum SessionControlCommand { yield_time_ms: u64, response_tx: oneshot::Sender, }, + PollToPending { + response_tx: oneshot::Sender, + }, Terminate { response_tx: oneshot::Sender, }, } +enum SessionResponseSender { + Runtime(oneshot::Sender), + ExecuteToPending(oneshot::Sender), +} + struct PendingResult { content_items: Vec, stored_values: HashMap, @@ -269,6 +356,8 @@ struct PendingResult { struct SessionControlContext { cell_id: String, runtime_tx: std::sync::mpsc::Sender, + runtime_control_tx: std::sync::mpsc::Sender, + pending_mode: PendingRuntimeMode, runtime_terminate_handle: v8::IsolateHandle, } @@ -290,14 +379,26 @@ fn pending_result_response(cell_id: &str, result: PendingResult) -> RuntimeRespo } } +fn send_terminal_response(response_tx: SessionResponseSender, response: RuntimeResponse) { + match response_tx { + SessionResponseSender::Runtime(response_tx) => { + let _ = response_tx.send(response); + } + SessionResponseSender::ExecuteToPending(response_tx) => { + let _ = response_tx.send(ExecuteToPendingOutcome::Completed(response)); + } + } +} + fn send_or_buffer_result( cell_id: &str, result: PendingResult, - response_tx: &mut Option>, + response_tx: &mut Option, pending_result: &mut Option, ) -> bool { if let Some(response_tx) = response_tx.take() { - let _ = response_tx.send(pending_result_response(cell_id, result)); + let response = pending_result_response(cell_id, result); + send_terminal_response(response_tx, response); return true; } @@ -305,20 +406,46 @@ fn send_or_buffer_result( false } +fn send_yield_response( + cell_id: &str, + content_items: &mut Vec, + response_tx: &mut Option, +) { + let Some(current_response_tx) = response_tx.take() else { + return; + }; + match current_response_tx { + SessionResponseSender::Runtime(response_tx) => { + let _ = response_tx.send(RuntimeResponse::Yielded { + cell_id: cell_id.to_string(), + content_items: std::mem::take(content_items), + }); + } + SessionResponseSender::ExecuteToPending(execute_to_pending_tx) => { + *response_tx = Some(SessionResponseSender::ExecuteToPending( + execute_to_pending_tx, + )); + } + } +} + async fn run_session_control( inner: Arc, context: SessionControlContext, mut event_rx: mpsc::UnboundedReceiver, mut control_rx: mpsc::UnboundedReceiver, - initial_response_tx: oneshot::Sender, - initial_yield_time_ms: u64, + initial_response_tx: SessionResponseSender, + initial_yield_time_ms: Option, ) { let SessionControlContext { cell_id, runtime_tx, + runtime_control_tx, + pending_mode, runtime_terminate_handle, } = context; let mut content_items = Vec::new(); + let mut pending_tool_call_ids = Vec::new(); let mut pending_result: Option = None; let mut response_tx = Some(initial_response_tx); let mut termination_requested = false; @@ -338,10 +465,11 @@ async fn run_session_control( runtime_closed = true; if termination_requested { if let Some(response_tx) = response_tx.take() { - let _ = response_tx.send(RuntimeResponse::Terminated { + let response = RuntimeResponse::Terminated { cell_id: cell_id.clone(), content_items: std::mem::take(&mut content_items), - }); + }; + send_terminal_response(response_tx, response); } break; } @@ -364,19 +492,35 @@ async fn run_session_control( }; match event { RuntimeEvent::Started => { - yield_timer = Some(Box::pin(tokio::time::sleep(Duration::from_millis(initial_yield_time_ms)))); + yield_timer = initial_yield_time_ms.map(|initial_yield_time_ms| { + Box::pin(tokio::time::sleep(Duration::from_millis(initial_yield_time_ms))) + }); + } + RuntimeEvent::Pending => { + if let Some(current_response_tx) = response_tx.take() { + match current_response_tx { + SessionResponseSender::Runtime(runtime_response_tx) => { + response_tx = + Some(SessionResponseSender::Runtime(runtime_response_tx)); + } + SessionResponseSender::ExecuteToPending(response_tx) => { + let _ = response_tx.send(ExecuteToPendingOutcome::Pending { + cell_id: cell_id.clone(), + content_items: std::mem::take(&mut content_items), + pending_tool_call_ids: std::mem::take( + &mut pending_tool_call_ids, + ), + }); + } + } + } } RuntimeEvent::ContentItem(item) => { content_items.push(item); } RuntimeEvent::YieldRequested => { yield_timer = None; - if let Some(response_tx) = response_tx.take() { - let _ = response_tx.send(RuntimeResponse::Yielded { - cell_id: cell_id.clone(), - content_items: std::mem::take(&mut content_items), - }); - } + send_yield_response(&cell_id, &mut content_items, &mut response_tx); } RuntimeEvent::Notify { call_id, text } => { let _ = inner.turn_message_tx.send(TurnMessage::Notify { @@ -391,6 +535,9 @@ async fn run_session_control( kind, input, } => { + if pending_mode == PendingRuntimeMode::PauseUntilResumed { + pending_tool_call_ids.push(id.clone()); + } let tool_call = CodeModeNestedToolCall { cell_id: cell_id.clone(), runtime_tool_call_id: id, @@ -410,10 +557,11 @@ async fn run_session_control( yield_timer = None; if termination_requested { if let Some(response_tx) = response_tx.take() { - let _ = response_tx.send(RuntimeResponse::Terminated { + let response = RuntimeResponse::Terminated { cell_id: cell_id.clone(), content_items: std::mem::take(&mut content_items), - }); + }; + send_terminal_response(response_tx, response); } break; } @@ -446,8 +594,23 @@ async fn run_session_control( let _ = next_response_tx.send(pending_result_response(&cell_id, result)); break; } - response_tx = Some(next_response_tx); + response_tx = Some(SessionResponseSender::Runtime(next_response_tx)); yield_timer = Some(Box::pin(tokio::time::sleep(Duration::from_millis(yield_time_ms)))); + resume_paused_runtime(&runtime_control_tx, pending_mode); + } + SessionControlCommand::PollToPending { + response_tx: next_response_tx, + } => { + if let Some(result) = pending_result.take() { + let response = pending_result_response(&cell_id, result); + let _ = next_response_tx + .send(ExecuteToPendingOutcome::Completed(response)); + break; + } + response_tx = + Some(SessionResponseSender::ExecuteToPending(next_response_tx)); + yield_timer = None; + resume_paused_runtime(&runtime_control_tx, pending_mode); } SessionControlCommand::Terminate { response_tx: next_response_tx } => { if let Some(result) = pending_result.take() { @@ -455,17 +618,19 @@ async fn run_session_control( break; } - response_tx = Some(next_response_tx); + response_tx = Some(SessionResponseSender::Runtime(next_response_tx)); termination_requested = true; yield_timer = None; let _ = runtime_tx.send(RuntimeCommand::Terminate); + terminate_paused_runtime(&runtime_control_tx, pending_mode); let _ = runtime_terminate_handle.terminate_execution(); if runtime_closed { if let Some(response_tx) = response_tx.take() { - let _ = response_tx.send(RuntimeResponse::Terminated { + let response = RuntimeResponse::Terminated { cell_id: cell_id.clone(), content_items: std::mem::take(&mut content_items), - }); + }; + send_terminal_response(response_tx, response); } break; } else { @@ -482,20 +647,34 @@ async fn run_session_control( } } => { yield_timer = None; - if let Some(response_tx) = response_tx.take() { - let _ = response_tx.send(RuntimeResponse::Yielded { - cell_id: cell_id.clone(), - content_items: std::mem::take(&mut content_items), - }); - } + send_yield_response(&cell_id, &mut content_items, &mut response_tx); } } } let _ = runtime_tx.send(RuntimeCommand::Terminate); + terminate_paused_runtime(&runtime_control_tx, pending_mode); inner.sessions.lock().await.remove(&cell_id); } +fn resume_paused_runtime( + runtime_control_tx: &std::sync::mpsc::Sender, + pending_mode: PendingRuntimeMode, +) { + if pending_mode == PendingRuntimeMode::PauseUntilResumed { + let _ = runtime_control_tx.send(RuntimeControlCommand::Resume); + } +} + +fn terminate_paused_runtime( + runtime_control_tx: &std::sync::mpsc::Sender, + pending_mode: PendingRuntimeMode, +) { + if pending_mode == PendingRuntimeMode::PauseUntilResumed { + let _ = runtime_control_tx.send(RuntimeControlCommand::Terminate); + } +} + #[cfg(test)] mod tests { use std::collections::HashMap; @@ -503,6 +682,7 @@ mod tests { use std::sync::atomic::AtomicU64; use std::time::Duration; + use codex_protocol::ToolName; use pretty_assertions::assert_eq; use tokio::sync::Mutex; use tokio::sync::mpsc; @@ -510,15 +690,22 @@ mod tests { use super::CodeModeService; use super::Inner; + use super::PendingRuntimeMode; use super::RuntimeCommand; use super::RuntimeResponse; use super::SessionControlCommand; use super::SessionControlContext; + use super::SessionResponseSender; use super::WaitOutcome; use super::WaitRequest; + use super::WaitToPendingOutcome; + use super::WaitToPendingRequest; use super::run_session_control; + use crate::CodeModeToolKind; use crate::FunctionCallOutputContentItem; + use crate::ToolDefinition; use crate::runtime::ExecuteRequest; + use crate::runtime::ExecuteToPendingOutcome; use crate::runtime::RuntimeEvent; use crate::runtime::spawn_runtime; @@ -571,6 +758,363 @@ mod tests { ); } + #[tokio::test] + async fn execute_to_pending_returns_completed_for_synchronous_results() { + let service = CodeModeService::new(); + + let response = service + .execute_to_pending(ExecuteRequest { + source: r#"text("done");"#.to_string(), + yield_time_ms: Some(60_000), + ..execute_request("") + }) + .await + .unwrap(); + + assert_eq!( + response, + ExecuteToPendingOutcome::Completed(RuntimeResponse::Result { + cell_id: "1".to_string(), + content_items: vec![FunctionCallOutputContentItem::InputText { + text: "done".to_string(), + }], + stored_values: HashMap::new(), + error_text: None, + }) + ); + } + + #[tokio::test] + async fn execute_to_pending_returns_once_the_runtime_is_quiescent() { + let service = CodeModeService::new(); + + let response = tokio::time::timeout( + Duration::from_secs(1), + service.execute_to_pending(ExecuteRequest { + source: r#"text("before"); await new Promise(() => {});"#.to_string(), + yield_time_ms: Some(60_000), + ..execute_request("") + }), + ) + .await + .unwrap() + .unwrap(); + + assert_eq!( + response, + ExecuteToPendingOutcome::Pending { + cell_id: "1".to_string(), + content_items: vec![FunctionCallOutputContentItem::InputText { + text: "before".to_string(), + }], + pending_tool_call_ids: Vec::new(), + } + ); + + let termination = service + .wait(WaitRequest { + cell_id: "1".to_string(), + yield_time_ms: 1, + terminate: true, + }) + .await + .unwrap(); + + assert_eq!( + termination, + WaitOutcome::LiveCell(RuntimeResponse::Terminated { + cell_id: "1".to_string(), + content_items: Vec::new(), + }) + ); + } + + #[tokio::test] + async fn execute_to_pending_identifies_tool_calls_in_paused_frontier() { + let service = CodeModeService::new(); + + let response = service + .execute_to_pending(ExecuteRequest { + enabled_tools: vec![ToolDefinition { + name: "echo".to_string(), + tool_name: ToolName::plain("echo"), + description: String::new(), + kind: CodeModeToolKind::Function, + input_schema: None, + output_schema: None, + }], + source: r#" +await Promise.all([ + tools.echo({ value: "first" }), + tools.echo({ value: "second" }), +]); +"# + .to_string(), + yield_time_ms: Some(60_000), + ..execute_request("") + }) + .await + .unwrap(); + + assert_eq!( + response, + ExecuteToPendingOutcome::Pending { + cell_id: "1".to_string(), + content_items: Vec::new(), + pending_tool_call_ids: vec!["tool-1".to_string(), "tool-2".to_string()], + } + ); + + let termination = service + .wait(WaitRequest { + cell_id: "1".to_string(), + yield_time_ms: 1, + terminate: true, + }) + .await + .unwrap(); + + assert_eq!( + termination, + WaitOutcome::LiveCell(RuntimeResponse::Terminated { + cell_id: "1".to_string(), + content_items: Vec::new(), + }) + ); + } + + #[tokio::test] + async fn execute_to_pending_excludes_delayed_timeout_tool_calls_until_wait() { + let service = CodeModeService::new(); + + let initial_response = service + .execute_to_pending(ExecuteRequest { + enabled_tools: vec![ToolDefinition { + name: "echo".to_string(), + tool_name: ToolName::plain("echo"), + description: String::new(), + kind: CodeModeToolKind::Function, + input_schema: None, + output_schema: None, + }], + source: r#" +setTimeout(() => { + tools.echo({ value: "delayed" }); +}, 1000); +await Promise.all([ + tools.echo({ value: "second" }), + tools.echo({ value: "third" }), +]); +"# + .to_string(), + yield_time_ms: Some(60_000), + ..execute_request("") + }) + .await + .unwrap(); + + assert_eq!( + initial_response, + ExecuteToPendingOutcome::Pending { + cell_id: "1".to_string(), + content_items: Vec::new(), + pending_tool_call_ids: vec!["tool-1".to_string(), "tool-2".to_string()], + } + ); + + let runtime_tx = service + .inner + .sessions + .lock() + .await + .get("1") + .unwrap() + .runtime_tx + .clone(); + runtime_tx + .send(RuntimeCommand::TimeoutFired { id: 1 }) + .unwrap(); + + let resumed_response = tokio::time::timeout( + Duration::from_secs(1), + service.wait_to_pending(WaitToPendingRequest { + cell_id: "1".to_string(), + }), + ) + .await + .unwrap() + .unwrap(); + + assert_eq!( + resumed_response, + WaitToPendingOutcome::LiveCell(ExecuteToPendingOutcome::Pending { + cell_id: "1".to_string(), + content_items: Vec::new(), + pending_tool_call_ids: vec!["tool-3".to_string()], + }) + ); + + let termination = service + .wait(WaitRequest { + cell_id: "1".to_string(), + yield_time_ms: 1, + terminate: true, + }) + .await + .unwrap(); + + assert_eq!( + termination, + WaitOutcome::LiveCell(RuntimeResponse::Terminated { + cell_id: "1".to_string(), + content_items: Vec::new(), + }) + ); + } + + #[tokio::test] + async fn wait_to_pending_returns_after_resumed_runtime_becomes_quiescent_again() { + let service = CodeModeService::new(); + + let initial_response = service + .execute_to_pending(ExecuteRequest { + source: r#" +await new Promise((resolve) => setTimeout(resolve, 60_000)); +text("after"); +await new Promise(() => {}); +"# + .to_string(), + yield_time_ms: Some(60_000), + ..execute_request("") + }) + .await + .unwrap(); + + assert_eq!( + initial_response, + ExecuteToPendingOutcome::Pending { + cell_id: "1".to_string(), + content_items: Vec::new(), + pending_tool_call_ids: Vec::new(), + } + ); + + let runtime_tx = service + .inner + .sessions + .lock() + .await + .get("1") + .unwrap() + .runtime_tx + .clone(); + runtime_tx + .send(RuntimeCommand::TimeoutFired { id: 1 }) + .unwrap(); + + let resumed_response = tokio::time::timeout( + Duration::from_secs(1), + service.wait_to_pending(WaitToPendingRequest { + cell_id: "1".to_string(), + }), + ) + .await + .unwrap() + .unwrap(); + + assert_eq!( + resumed_response, + WaitToPendingOutcome::LiveCell(ExecuteToPendingOutcome::Pending { + cell_id: "1".to_string(), + content_items: vec![FunctionCallOutputContentItem::InputText { + text: "after".to_string(), + }], + pending_tool_call_ids: Vec::new(), + }) + ); + + let termination = service + .wait(WaitRequest { + cell_id: "1".to_string(), + yield_time_ms: 1, + terminate: true, + }) + .await + .unwrap(); + + assert_eq!( + termination, + WaitOutcome::LiveCell(RuntimeResponse::Terminated { + cell_id: "1".to_string(), + content_items: Vec::new(), + }) + ); + } + + #[tokio::test] + async fn wait_to_pending_returns_completed_after_resumed_runtime_finishes() { + let service = CodeModeService::new(); + + let initial_response = service + .execute_to_pending(ExecuteRequest { + source: r#" +await new Promise((resolve) => setTimeout(resolve, 60_000)); +text("done"); +"# + .to_string(), + yield_time_ms: Some(60_000), + ..execute_request("") + }) + .await + .unwrap(); + + assert_eq!( + initial_response, + ExecuteToPendingOutcome::Pending { + cell_id: "1".to_string(), + content_items: Vec::new(), + pending_tool_call_ids: Vec::new(), + } + ); + + let runtime_tx = service + .inner + .sessions + .lock() + .await + .get("1") + .unwrap() + .runtime_tx + .clone(); + runtime_tx + .send(RuntimeCommand::TimeoutFired { id: 1 }) + .unwrap(); + + let resumed_response = tokio::time::timeout( + Duration::from_secs(1), + service.wait_to_pending(WaitToPendingRequest { + cell_id: "1".to_string(), + }), + ) + .await + .unwrap() + .unwrap(); + + assert_eq!( + resumed_response, + WaitToPendingOutcome::LiveCell(ExecuteToPendingOutcome::Completed( + RuntimeResponse::Result { + cell_id: "1".to_string(), + content_items: vec![FunctionCallOutputContentItem::InputText { + text: "done".to_string(), + }], + stored_values: HashMap::new(), + error_text: None, + } + )) + ); + } + #[tokio::test] async fn v8_console_is_not_exposed_on_global_this() { let service = CodeModeService::new(); @@ -898,13 +1442,14 @@ image({ let (control_tx, control_rx) = mpsc::unbounded_channel(); let (initial_response_tx, initial_response_rx) = oneshot::channel(); let (runtime_event_tx, _runtime_event_rx) = mpsc::unbounded_channel(); - let (runtime_tx, runtime_terminate_handle) = spawn_runtime( + let (runtime_tx, runtime_control_tx, runtime_terminate_handle) = spawn_runtime( ExecuteRequest { source: "await new Promise(() => {})".to_string(), yield_time_ms: None, ..execute_request("") }, runtime_event_tx, + PendingRuntimeMode::Continue, ) .unwrap(); @@ -913,12 +1458,14 @@ image({ SessionControlContext { cell_id: "cell-1".to_string(), runtime_tx: runtime_tx.clone(), + runtime_control_tx, + pending_mode: PendingRuntimeMode::Continue, runtime_terminate_handle, }, event_rx, control_rx, - initial_response_tx, - /*initial_yield_time_ms*/ 60_000, + SessionResponseSender::Runtime(initial_response_tx), + Some(/*initial_yield_time_ms*/ 60_000), )); event_tx.send(RuntimeEvent::Started).unwrap();