mirror of
https://github.com/openai/codex.git
synced 2026-05-17 01:32:32 +00:00
code-mode: Add pending-aware code mode execution (#22280)
Introduce execute_to_pending and wait_to_pending APIs that freeze pending-mode runtimes until an explicit resume, while preserving the existing continuously-running execute path. Add runtime and service coverage for pending, resume, completion, and freeze behavior.
This commit is contained in:
@@ -23,9 +23,12 @@ pub use runtime::DEFAULT_EXEC_YIELD_TIME_MS;
|
||||
pub use runtime::DEFAULT_MAX_OUTPUT_TOKENS_PER_EXEC_CALL;
|
||||
pub use runtime::DEFAULT_WAIT_YIELD_TIME_MS;
|
||||
pub use runtime::ExecuteRequest;
|
||||
pub use runtime::ExecuteToPendingOutcome;
|
||||
pub use runtime::RuntimeResponse;
|
||||
pub use runtime::WaitOutcome;
|
||||
pub use runtime::WaitRequest;
|
||||
pub use runtime::WaitToPendingOutcome;
|
||||
pub use runtime::WaitToPendingRequest;
|
||||
pub use service::CodeModeService;
|
||||
pub use service::CodeModeTurnHost;
|
||||
pub use service::CodeModeTurnWorker;
|
||||
|
||||
@@ -47,6 +47,11 @@ pub struct WaitRequest {
|
||||
pub terminate: bool,
|
||||
}
|
||||
|
||||
/// Request to resume a live code-mode cell and wait until it either completes
/// or becomes pending again.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WaitToPendingRequest {
    /// Identifier of the cell to resume.
    pub cell_id: String,
}
|
||||
|
||||
/// Result of waiting on a code-mode cell.
|
||||
///
|
||||
/// The wrapped `RuntimeResponse` is the model-facing wait result. The enum
|
||||
@@ -62,6 +67,34 @@ pub enum WaitOutcome {
|
||||
MissingCell(RuntimeResponse),
|
||||
}
|
||||
|
||||
/// Result of executing a code-mode cell until it either completes or reaches a
|
||||
/// quiescent pending state.
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum ExecuteToPendingOutcome {
|
||||
/// The cell is waiting for more runtime input after draining the runtime
|
||||
/// input queue that was ready at the pending boundary.
|
||||
Pending {
|
||||
cell_id: String,
|
||||
content_items: Vec<FunctionCallOutputContentItem>,
|
||||
/// Runtime tool-call ids emitted before this paused execution frontier
|
||||
/// sealed. Hosts can use these ids to drain their tool-call transport
|
||||
/// before surfacing the pending boundary to callers.
|
||||
pending_tool_call_ids: Vec<String>,
|
||||
},
|
||||
/// The cell reached a terminal runtime response before going pending.
|
||||
Completed(RuntimeResponse),
|
||||
}
|
||||
|
||||
/// Result of resuming a live code-mode cell until it completes or becomes
|
||||
/// quiescent again.
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum WaitToPendingOutcome {
|
||||
/// The requested code cell was live when the wait command was accepted.
|
||||
LiveCell(ExecuteToPendingOutcome),
|
||||
/// The requested code cell was not live.
|
||||
MissingCell(RuntimeResponse),
|
||||
}
|
||||
|
||||
impl From<WaitOutcome> for RuntimeResponse {
|
||||
fn from(outcome: WaitOutcome) -> Self {
|
||||
match outcome {
|
||||
@@ -120,9 +153,22 @@ pub(crate) enum RuntimeCommand {
|
||||
Terminate,
|
||||
}
|
||||
|
||||
/// Selects what the runtime thread does once its command queue is empty.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum PendingRuntimeMode {
    /// Keep running: block on the command queue until the next command arrives.
    Continue,
    /// Freeze: ignore queued commands until a control command resumes or
    /// terminates the runtime.
    PauseUntilResumed,
}
|
||||
|
||||
/// Out-of-band command delivered to a runtime that is paused in
/// `PendingRuntimeMode::PauseUntilResumed`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum RuntimeControlCommand {
    /// Unfreeze the runtime so it re-checks its command queue.
    Resume,
    /// Stop the runtime; surfaced to the runtime loop as a terminate command.
    Terminate,
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum RuntimeEvent {
|
||||
Started,
|
||||
Pending,
|
||||
ContentItem(FunctionCallOutputContentItem),
|
||||
YieldRequested,
|
||||
ToolCall {
|
||||
@@ -144,10 +190,19 @@ pub(crate) enum RuntimeEvent {
|
||||
pub(crate) fn spawn_runtime(
|
||||
request: ExecuteRequest,
|
||||
event_tx: mpsc::UnboundedSender<RuntimeEvent>,
|
||||
) -> Result<(std_mpsc::Sender<RuntimeCommand>, v8::IsolateHandle), String> {
|
||||
pending_mode: PendingRuntimeMode,
|
||||
) -> Result<
|
||||
(
|
||||
std_mpsc::Sender<RuntimeCommand>,
|
||||
std_mpsc::Sender<RuntimeControlCommand>,
|
||||
v8::IsolateHandle,
|
||||
),
|
||||
String,
|
||||
> {
|
||||
initialize_v8()?;
|
||||
|
||||
let (command_tx, command_rx) = std_mpsc::channel();
|
||||
let (control_tx, control_rx) = std_mpsc::channel();
|
||||
let runtime_command_tx = command_tx.clone();
|
||||
let (isolate_handle_tx, isolate_handle_rx) = std_mpsc::sync_channel(1);
|
||||
let enabled_tools = request
|
||||
@@ -167,6 +222,8 @@ pub(crate) fn spawn_runtime(
|
||||
config,
|
||||
event_tx,
|
||||
command_rx,
|
||||
control_rx,
|
||||
pending_mode,
|
||||
isolate_handle_tx,
|
||||
runtime_command_tx,
|
||||
);
|
||||
@@ -175,7 +232,7 @@ pub(crate) fn spawn_runtime(
|
||||
let isolate_handle = isolate_handle_rx
|
||||
.recv()
|
||||
.map_err(|_| "failed to initialize code mode runtime".to_string())?;
|
||||
Ok((command_tx, isolate_handle))
|
||||
Ok((command_tx, control_tx, isolate_handle))
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -227,6 +284,8 @@ fn run_runtime(
|
||||
config: RuntimeConfig,
|
||||
event_tx: mpsc::UnboundedSender<RuntimeEvent>,
|
||||
command_rx: std_mpsc::Receiver<RuntimeCommand>,
|
||||
control_rx: std_mpsc::Receiver<RuntimeControlCommand>,
|
||||
pending_mode: PendingRuntimeMode,
|
||||
isolate_handle_tx: std_mpsc::SyncSender<v8::IsolateHandle>,
|
||||
runtime_command_tx: std_mpsc::Sender<RuntimeCommand>,
|
||||
) {
|
||||
@@ -282,7 +341,8 @@ fn run_runtime(
|
||||
|
||||
let mut pending_promise = pending_promise;
|
||||
loop {
|
||||
let Ok(command) = command_rx.recv() else {
|
||||
let Some(command) = next_runtime_command(&event_tx, &command_rx, &control_rx, pending_mode)
|
||||
else {
|
||||
break;
|
||||
};
|
||||
|
||||
@@ -333,6 +393,30 @@ fn run_runtime(
|
||||
}
|
||||
}
|
||||
|
||||
fn next_runtime_command(
|
||||
event_tx: &mpsc::UnboundedSender<RuntimeEvent>,
|
||||
command_rx: &std_mpsc::Receiver<RuntimeCommand>,
|
||||
control_rx: &std_mpsc::Receiver<RuntimeControlCommand>,
|
||||
pending_mode: PendingRuntimeMode,
|
||||
) -> Option<RuntimeCommand> {
|
||||
loop {
|
||||
match command_rx.try_recv() {
|
||||
Ok(command) => return Some(command),
|
||||
Err(std_mpsc::TryRecvError::Disconnected) => return None,
|
||||
Err(std_mpsc::TryRecvError::Empty) => {}
|
||||
}
|
||||
|
||||
let _ = event_tx.send(RuntimeEvent::Pending);
|
||||
match pending_mode {
|
||||
PendingRuntimeMode::Continue => return command_rx.recv().ok(),
|
||||
PendingRuntimeMode::PauseUntilResumed => match control_rx.recv().ok()? {
|
||||
RuntimeControlCommand::Resume => continue,
|
||||
RuntimeControlCommand::Terminate => return Some(RuntimeCommand::Terminate),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn capture_scope_send_error(
|
||||
scope: &mut v8::PinScope<'_, '_>,
|
||||
event_tx: &mpsc::UnboundedSender<RuntimeEvent>,
|
||||
@@ -366,8 +450,12 @@ mod tests {
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use super::ExecuteRequest;
|
||||
use super::PendingRuntimeMode;
|
||||
use super::RuntimeCommand;
|
||||
use super::RuntimeControlCommand;
|
||||
use super::RuntimeEvent;
|
||||
use super::spawn_runtime;
|
||||
use crate::FunctionCallOutputContentItem;
|
||||
|
||||
fn execute_request(source: &str) -> ExecuteRequest {
|
||||
ExecuteRequest {
|
||||
@@ -384,8 +472,12 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn terminate_execution_stops_cpu_bound_module() {
|
||||
let (event_tx, mut event_rx) = mpsc::unbounded_channel();
|
||||
let (_runtime_tx, runtime_terminate_handle) =
|
||||
spawn_runtime(execute_request("while (true) {}"), event_tx).unwrap();
|
||||
let (_runtime_tx, _runtime_control_tx, runtime_terminate_handle) = spawn_runtime(
|
||||
execute_request("while (true) {}"),
|
||||
event_tx,
|
||||
PendingRuntimeMode::Continue,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let started_event = tokio::time::timeout(Duration::from_secs(1), event_rx.recv())
|
||||
.await
|
||||
@@ -416,4 +508,71 @@ mod tests {
|
||||
.is_none()
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn pending_mode_freezes_runtime_commands_until_resume() {
|
||||
let (event_tx, mut event_rx) = mpsc::unbounded_channel();
|
||||
let (runtime_tx, runtime_control_tx, _runtime_terminate_handle) = spawn_runtime(
|
||||
execute_request(
|
||||
r#"
|
||||
await new Promise((resolve) => setTimeout(resolve, 60_000));
|
||||
text("after");
|
||||
await new Promise(() => {});
|
||||
"#,
|
||||
),
|
||||
event_tx,
|
||||
PendingRuntimeMode::PauseUntilResumed,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
assert!(matches!(
|
||||
tokio::time::timeout(Duration::from_secs(1), event_rx.recv())
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap(),
|
||||
RuntimeEvent::Started
|
||||
));
|
||||
assert!(matches!(
|
||||
tokio::time::timeout(Duration::from_secs(1), event_rx.recv())
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap(),
|
||||
RuntimeEvent::Pending
|
||||
));
|
||||
|
||||
runtime_tx
|
||||
.send(RuntimeCommand::TimeoutFired { id: 1 })
|
||||
.unwrap();
|
||||
assert!(
|
||||
tokio::time::timeout(Duration::from_millis(100), event_rx.recv())
|
||||
.await
|
||||
.is_err()
|
||||
);
|
||||
|
||||
runtime_control_tx
|
||||
.send(RuntimeControlCommand::Resume)
|
||||
.unwrap();
|
||||
|
||||
let content_event = tokio::time::timeout(Duration::from_secs(1), event_rx.recv())
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
let RuntimeEvent::ContentItem(FunctionCallOutputContentItem::InputText { text }) =
|
||||
content_event
|
||||
else {
|
||||
panic!("expected resumed runtime output");
|
||||
};
|
||||
assert_eq!(text, "after");
|
||||
assert!(matches!(
|
||||
tokio::time::timeout(Duration::from_secs(1), event_rx.recv())
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap(),
|
||||
RuntimeEvent::Pending
|
||||
));
|
||||
|
||||
runtime_control_tx
|
||||
.send(RuntimeControlCommand::Terminate)
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,12 +16,17 @@ use crate::FunctionCallOutputContentItem;
|
||||
use crate::runtime::CodeModeNestedToolCall;
|
||||
use crate::runtime::DEFAULT_EXEC_YIELD_TIME_MS;
|
||||
use crate::runtime::ExecuteRequest;
|
||||
use crate::runtime::ExecuteToPendingOutcome;
|
||||
use crate::runtime::PendingRuntimeMode;
|
||||
use crate::runtime::RuntimeCommand;
|
||||
use crate::runtime::RuntimeControlCommand;
|
||||
use crate::runtime::RuntimeEvent;
|
||||
use crate::runtime::RuntimeResponse;
|
||||
use crate::runtime::TurnMessage;
|
||||
use crate::runtime::WaitOutcome;
|
||||
use crate::runtime::WaitRequest;
|
||||
use crate::runtime::WaitToPendingOutcome;
|
||||
use crate::runtime::WaitToPendingRequest;
|
||||
use crate::runtime::spawn_runtime;
|
||||
|
||||
#[async_trait]
|
||||
@@ -90,18 +95,57 @@ impl CodeModeService {
|
||||
}
|
||||
|
||||
pub async fn execute(&self, request: ExecuteRequest) -> Result<RuntimeResponse, String> {
|
||||
let cell_id = request.cell_id.clone();
|
||||
let initial_yield_time_ms = request.yield_time_ms.unwrap_or(DEFAULT_EXEC_YIELD_TIME_MS);
|
||||
let (response_tx, response_rx) = oneshot::channel();
|
||||
self.start_session(
|
||||
request,
|
||||
SessionResponseSender::Runtime(response_tx),
|
||||
Some(initial_yield_time_ms),
|
||||
PendingRuntimeMode::Continue,
|
||||
)
|
||||
.await?;
|
||||
|
||||
response_rx
|
||||
.await
|
||||
.map_err(|_| "exec runtime ended unexpectedly".to_string())
|
||||
}
|
||||
|
||||
pub async fn execute_to_pending(
|
||||
&self,
|
||||
request: ExecuteRequest,
|
||||
) -> Result<ExecuteToPendingOutcome, String> {
|
||||
let (response_tx, response_rx) = oneshot::channel();
|
||||
self.start_session(
|
||||
request,
|
||||
SessionResponseSender::ExecuteToPending(response_tx),
|
||||
/*initial_yield_time_ms*/ None,
|
||||
PendingRuntimeMode::PauseUntilResumed,
|
||||
)
|
||||
.await?;
|
||||
|
||||
response_rx
|
||||
.await
|
||||
.map_err(|_| "exec runtime ended unexpectedly".to_string())
|
||||
}
|
||||
|
||||
async fn start_session(
|
||||
&self,
|
||||
request: ExecuteRequest,
|
||||
initial_response_tx: SessionResponseSender,
|
||||
initial_yield_time_ms: Option<u64>,
|
||||
pending_mode: PendingRuntimeMode,
|
||||
) -> Result<(), String> {
|
||||
let cell_id = request.cell_id.clone();
|
||||
let (event_tx, event_rx) = mpsc::unbounded_channel();
|
||||
let (control_tx, control_rx) = mpsc::unbounded_channel();
|
||||
let (response_tx, response_rx) = oneshot::channel();
|
||||
let (runtime_tx, runtime_terminate_handle) = {
|
||||
let (runtime_tx, runtime_control_tx, runtime_terminate_handle) = {
|
||||
let mut sessions = self.inner.sessions.lock().await;
|
||||
if sessions.contains_key(&cell_id) {
|
||||
return Err(format!("exec cell {cell_id} already exists"));
|
||||
}
|
||||
|
||||
let (runtime_tx, runtime_terminate_handle) = spawn_runtime(request, event_tx)?;
|
||||
let (runtime_tx, runtime_control_tx, runtime_terminate_handle) =
|
||||
spawn_runtime(request, event_tx, pending_mode)?;
|
||||
|
||||
// Keep the session registry locked through insertion so a
|
||||
// caller-owned cell id cannot race with another execute and replace
|
||||
@@ -113,7 +157,7 @@ impl CodeModeService {
|
||||
runtime_tx: runtime_tx.clone(),
|
||||
},
|
||||
);
|
||||
(runtime_tx, runtime_terminate_handle)
|
||||
(runtime_tx, runtime_control_tx, runtime_terminate_handle)
|
||||
};
|
||||
|
||||
tokio::spawn(run_session_control(
|
||||
@@ -121,17 +165,17 @@ impl CodeModeService {
|
||||
SessionControlContext {
|
||||
cell_id: cell_id.clone(),
|
||||
runtime_tx,
|
||||
runtime_control_tx,
|
||||
pending_mode,
|
||||
runtime_terminate_handle,
|
||||
},
|
||||
event_rx,
|
||||
control_rx,
|
||||
response_tx,
|
||||
initial_response_tx,
|
||||
initial_yield_time_ms,
|
||||
));
|
||||
|
||||
response_rx
|
||||
.await
|
||||
.map_err(|_| "exec runtime ended unexpectedly".to_string())
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn wait(&self, request: WaitRequest) -> Result<WaitOutcome, String> {
|
||||
@@ -166,6 +210,41 @@ impl CodeModeService {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn wait_to_pending(
|
||||
&self,
|
||||
request: WaitToPendingRequest,
|
||||
) -> Result<WaitToPendingOutcome, String> {
|
||||
let cell_id = request.cell_id.clone();
|
||||
let handle = self
|
||||
.inner
|
||||
.sessions
|
||||
.lock()
|
||||
.await
|
||||
.get(&request.cell_id)
|
||||
.cloned();
|
||||
let Some(handle) = handle else {
|
||||
return Ok(WaitToPendingOutcome::MissingCell(missing_cell_response(
|
||||
cell_id,
|
||||
)));
|
||||
};
|
||||
let (response_tx, response_rx) = oneshot::channel();
|
||||
if handle
|
||||
.control_tx
|
||||
.send(SessionControlCommand::PollToPending { response_tx })
|
||||
.is_err()
|
||||
{
|
||||
return Ok(WaitToPendingOutcome::MissingCell(missing_cell_response(
|
||||
cell_id,
|
||||
)));
|
||||
}
|
||||
match response_rx.await {
|
||||
Ok(response) => Ok(WaitToPendingOutcome::LiveCell(response)),
|
||||
Err(_) => Ok(WaitToPendingOutcome::MissingCell(missing_cell_response(
|
||||
request.cell_id,
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn start_turn_worker(&self, host: Arc<dyn CodeModeTurnHost>) -> CodeModeTurnWorker {
|
||||
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
|
||||
let inner = Arc::clone(&self.inner);
|
||||
@@ -255,11 +334,19 @@ enum SessionControlCommand {
|
||||
yield_time_ms: u64,
|
||||
response_tx: oneshot::Sender<RuntimeResponse>,
|
||||
},
|
||||
PollToPending {
|
||||
response_tx: oneshot::Sender<ExecuteToPendingOutcome>,
|
||||
},
|
||||
Terminate {
|
||||
response_tx: oneshot::Sender<RuntimeResponse>,
|
||||
},
|
||||
}
|
||||
|
||||
enum SessionResponseSender {
|
||||
Runtime(oneshot::Sender<RuntimeResponse>),
|
||||
ExecuteToPending(oneshot::Sender<ExecuteToPendingOutcome>),
|
||||
}
|
||||
|
||||
struct PendingResult {
|
||||
content_items: Vec<FunctionCallOutputContentItem>,
|
||||
stored_values: HashMap<String, JsonValue>,
|
||||
@@ -269,6 +356,8 @@ struct PendingResult {
|
||||
struct SessionControlContext {
|
||||
cell_id: String,
|
||||
runtime_tx: std::sync::mpsc::Sender<RuntimeCommand>,
|
||||
runtime_control_tx: std::sync::mpsc::Sender<RuntimeControlCommand>,
|
||||
pending_mode: PendingRuntimeMode,
|
||||
runtime_terminate_handle: v8::IsolateHandle,
|
||||
}
|
||||
|
||||
@@ -290,14 +379,26 @@ fn pending_result_response(cell_id: &str, result: PendingResult) -> RuntimeRespo
|
||||
}
|
||||
}
|
||||
|
||||
fn send_terminal_response(response_tx: SessionResponseSender, response: RuntimeResponse) {
|
||||
match response_tx {
|
||||
SessionResponseSender::Runtime(response_tx) => {
|
||||
let _ = response_tx.send(response);
|
||||
}
|
||||
SessionResponseSender::ExecuteToPending(response_tx) => {
|
||||
let _ = response_tx.send(ExecuteToPendingOutcome::Completed(response));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn send_or_buffer_result(
|
||||
cell_id: &str,
|
||||
result: PendingResult,
|
||||
response_tx: &mut Option<oneshot::Sender<RuntimeResponse>>,
|
||||
response_tx: &mut Option<SessionResponseSender>,
|
||||
pending_result: &mut Option<PendingResult>,
|
||||
) -> bool {
|
||||
if let Some(response_tx) = response_tx.take() {
|
||||
let _ = response_tx.send(pending_result_response(cell_id, result));
|
||||
let response = pending_result_response(cell_id, result);
|
||||
send_terminal_response(response_tx, response);
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -305,20 +406,46 @@ fn send_or_buffer_result(
|
||||
false
|
||||
}
|
||||
|
||||
fn send_yield_response(
|
||||
cell_id: &str,
|
||||
content_items: &mut Vec<FunctionCallOutputContentItem>,
|
||||
response_tx: &mut Option<SessionResponseSender>,
|
||||
) {
|
||||
let Some(current_response_tx) = response_tx.take() else {
|
||||
return;
|
||||
};
|
||||
match current_response_tx {
|
||||
SessionResponseSender::Runtime(response_tx) => {
|
||||
let _ = response_tx.send(RuntimeResponse::Yielded {
|
||||
cell_id: cell_id.to_string(),
|
||||
content_items: std::mem::take(content_items),
|
||||
});
|
||||
}
|
||||
SessionResponseSender::ExecuteToPending(execute_to_pending_tx) => {
|
||||
*response_tx = Some(SessionResponseSender::ExecuteToPending(
|
||||
execute_to_pending_tx,
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_session_control(
|
||||
inner: Arc<Inner>,
|
||||
context: SessionControlContext,
|
||||
mut event_rx: mpsc::UnboundedReceiver<RuntimeEvent>,
|
||||
mut control_rx: mpsc::UnboundedReceiver<SessionControlCommand>,
|
||||
initial_response_tx: oneshot::Sender<RuntimeResponse>,
|
||||
initial_yield_time_ms: u64,
|
||||
initial_response_tx: SessionResponseSender,
|
||||
initial_yield_time_ms: Option<u64>,
|
||||
) {
|
||||
let SessionControlContext {
|
||||
cell_id,
|
||||
runtime_tx,
|
||||
runtime_control_tx,
|
||||
pending_mode,
|
||||
runtime_terminate_handle,
|
||||
} = context;
|
||||
let mut content_items = Vec::new();
|
||||
let mut pending_tool_call_ids = Vec::new();
|
||||
let mut pending_result: Option<PendingResult> = None;
|
||||
let mut response_tx = Some(initial_response_tx);
|
||||
let mut termination_requested = false;
|
||||
@@ -338,10 +465,11 @@ async fn run_session_control(
|
||||
runtime_closed = true;
|
||||
if termination_requested {
|
||||
if let Some(response_tx) = response_tx.take() {
|
||||
let _ = response_tx.send(RuntimeResponse::Terminated {
|
||||
let response = RuntimeResponse::Terminated {
|
||||
cell_id: cell_id.clone(),
|
||||
content_items: std::mem::take(&mut content_items),
|
||||
});
|
||||
};
|
||||
send_terminal_response(response_tx, response);
|
||||
}
|
||||
break;
|
||||
}
|
||||
@@ -364,19 +492,35 @@ async fn run_session_control(
|
||||
};
|
||||
match event {
|
||||
RuntimeEvent::Started => {
|
||||
yield_timer = Some(Box::pin(tokio::time::sleep(Duration::from_millis(initial_yield_time_ms))));
|
||||
yield_timer = initial_yield_time_ms.map(|initial_yield_time_ms| {
|
||||
Box::pin(tokio::time::sleep(Duration::from_millis(initial_yield_time_ms)))
|
||||
});
|
||||
}
|
||||
RuntimeEvent::Pending => {
|
||||
if let Some(current_response_tx) = response_tx.take() {
|
||||
match current_response_tx {
|
||||
SessionResponseSender::Runtime(runtime_response_tx) => {
|
||||
response_tx =
|
||||
Some(SessionResponseSender::Runtime(runtime_response_tx));
|
||||
}
|
||||
SessionResponseSender::ExecuteToPending(response_tx) => {
|
||||
let _ = response_tx.send(ExecuteToPendingOutcome::Pending {
|
||||
cell_id: cell_id.clone(),
|
||||
content_items: std::mem::take(&mut content_items),
|
||||
pending_tool_call_ids: std::mem::take(
|
||||
&mut pending_tool_call_ids,
|
||||
),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
RuntimeEvent::ContentItem(item) => {
|
||||
content_items.push(item);
|
||||
}
|
||||
RuntimeEvent::YieldRequested => {
|
||||
yield_timer = None;
|
||||
if let Some(response_tx) = response_tx.take() {
|
||||
let _ = response_tx.send(RuntimeResponse::Yielded {
|
||||
cell_id: cell_id.clone(),
|
||||
content_items: std::mem::take(&mut content_items),
|
||||
});
|
||||
}
|
||||
send_yield_response(&cell_id, &mut content_items, &mut response_tx);
|
||||
}
|
||||
RuntimeEvent::Notify { call_id, text } => {
|
||||
let _ = inner.turn_message_tx.send(TurnMessage::Notify {
|
||||
@@ -391,6 +535,9 @@ async fn run_session_control(
|
||||
kind,
|
||||
input,
|
||||
} => {
|
||||
if pending_mode == PendingRuntimeMode::PauseUntilResumed {
|
||||
pending_tool_call_ids.push(id.clone());
|
||||
}
|
||||
let tool_call = CodeModeNestedToolCall {
|
||||
cell_id: cell_id.clone(),
|
||||
runtime_tool_call_id: id,
|
||||
@@ -410,10 +557,11 @@ async fn run_session_control(
|
||||
yield_timer = None;
|
||||
if termination_requested {
|
||||
if let Some(response_tx) = response_tx.take() {
|
||||
let _ = response_tx.send(RuntimeResponse::Terminated {
|
||||
let response = RuntimeResponse::Terminated {
|
||||
cell_id: cell_id.clone(),
|
||||
content_items: std::mem::take(&mut content_items),
|
||||
});
|
||||
};
|
||||
send_terminal_response(response_tx, response);
|
||||
}
|
||||
break;
|
||||
}
|
||||
@@ -446,8 +594,23 @@ async fn run_session_control(
|
||||
let _ = next_response_tx.send(pending_result_response(&cell_id, result));
|
||||
break;
|
||||
}
|
||||
response_tx = Some(next_response_tx);
|
||||
response_tx = Some(SessionResponseSender::Runtime(next_response_tx));
|
||||
yield_timer = Some(Box::pin(tokio::time::sleep(Duration::from_millis(yield_time_ms))));
|
||||
resume_paused_runtime(&runtime_control_tx, pending_mode);
|
||||
}
|
||||
SessionControlCommand::PollToPending {
|
||||
response_tx: next_response_tx,
|
||||
} => {
|
||||
if let Some(result) = pending_result.take() {
|
||||
let response = pending_result_response(&cell_id, result);
|
||||
let _ = next_response_tx
|
||||
.send(ExecuteToPendingOutcome::Completed(response));
|
||||
break;
|
||||
}
|
||||
response_tx =
|
||||
Some(SessionResponseSender::ExecuteToPending(next_response_tx));
|
||||
yield_timer = None;
|
||||
resume_paused_runtime(&runtime_control_tx, pending_mode);
|
||||
}
|
||||
SessionControlCommand::Terminate { response_tx: next_response_tx } => {
|
||||
if let Some(result) = pending_result.take() {
|
||||
@@ -455,17 +618,19 @@ async fn run_session_control(
|
||||
break;
|
||||
}
|
||||
|
||||
response_tx = Some(next_response_tx);
|
||||
response_tx = Some(SessionResponseSender::Runtime(next_response_tx));
|
||||
termination_requested = true;
|
||||
yield_timer = None;
|
||||
let _ = runtime_tx.send(RuntimeCommand::Terminate);
|
||||
terminate_paused_runtime(&runtime_control_tx, pending_mode);
|
||||
let _ = runtime_terminate_handle.terminate_execution();
|
||||
if runtime_closed {
|
||||
if let Some(response_tx) = response_tx.take() {
|
||||
let _ = response_tx.send(RuntimeResponse::Terminated {
|
||||
let response = RuntimeResponse::Terminated {
|
||||
cell_id: cell_id.clone(),
|
||||
content_items: std::mem::take(&mut content_items),
|
||||
});
|
||||
};
|
||||
send_terminal_response(response_tx, response);
|
||||
}
|
||||
break;
|
||||
} else {
|
||||
@@ -482,20 +647,34 @@ async fn run_session_control(
|
||||
}
|
||||
} => {
|
||||
yield_timer = None;
|
||||
if let Some(response_tx) = response_tx.take() {
|
||||
let _ = response_tx.send(RuntimeResponse::Yielded {
|
||||
cell_id: cell_id.clone(),
|
||||
content_items: std::mem::take(&mut content_items),
|
||||
});
|
||||
}
|
||||
send_yield_response(&cell_id, &mut content_items, &mut response_tx);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let _ = runtime_tx.send(RuntimeCommand::Terminate);
|
||||
terminate_paused_runtime(&runtime_control_tx, pending_mode);
|
||||
inner.sessions.lock().await.remove(&cell_id);
|
||||
}
|
||||
|
||||
fn resume_paused_runtime(
|
||||
runtime_control_tx: &std::sync::mpsc::Sender<RuntimeControlCommand>,
|
||||
pending_mode: PendingRuntimeMode,
|
||||
) {
|
||||
if pending_mode == PendingRuntimeMode::PauseUntilResumed {
|
||||
let _ = runtime_control_tx.send(RuntimeControlCommand::Resume);
|
||||
}
|
||||
}
|
||||
|
||||
fn terminate_paused_runtime(
|
||||
runtime_control_tx: &std::sync::mpsc::Sender<RuntimeControlCommand>,
|
||||
pending_mode: PendingRuntimeMode,
|
||||
) {
|
||||
if pending_mode == PendingRuntimeMode::PauseUntilResumed {
|
||||
let _ = runtime_control_tx.send(RuntimeControlCommand::Terminate);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::HashMap;
|
||||
@@ -503,6 +682,7 @@ mod tests {
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::time::Duration;
|
||||
|
||||
use codex_protocol::ToolName;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::mpsc;
|
||||
@@ -510,15 +690,22 @@ mod tests {
|
||||
|
||||
use super::CodeModeService;
|
||||
use super::Inner;
|
||||
use super::PendingRuntimeMode;
|
||||
use super::RuntimeCommand;
|
||||
use super::RuntimeResponse;
|
||||
use super::SessionControlCommand;
|
||||
use super::SessionControlContext;
|
||||
use super::SessionResponseSender;
|
||||
use super::WaitOutcome;
|
||||
use super::WaitRequest;
|
||||
use super::WaitToPendingOutcome;
|
||||
use super::WaitToPendingRequest;
|
||||
use super::run_session_control;
|
||||
use crate::CodeModeToolKind;
|
||||
use crate::FunctionCallOutputContentItem;
|
||||
use crate::ToolDefinition;
|
||||
use crate::runtime::ExecuteRequest;
|
||||
use crate::runtime::ExecuteToPendingOutcome;
|
||||
use crate::runtime::RuntimeEvent;
|
||||
use crate::runtime::spawn_runtime;
|
||||
|
||||
@@ -571,6 +758,363 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn execute_to_pending_returns_completed_for_synchronous_results() {
|
||||
let service = CodeModeService::new();
|
||||
|
||||
let response = service
|
||||
.execute_to_pending(ExecuteRequest {
|
||||
source: r#"text("done");"#.to_string(),
|
||||
yield_time_ms: Some(60_000),
|
||||
..execute_request("")
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
response,
|
||||
ExecuteToPendingOutcome::Completed(RuntimeResponse::Result {
|
||||
cell_id: "1".to_string(),
|
||||
content_items: vec![FunctionCallOutputContentItem::InputText {
|
||||
text: "done".to_string(),
|
||||
}],
|
||||
stored_values: HashMap::new(),
|
||||
error_text: None,
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn execute_to_pending_returns_once_the_runtime_is_quiescent() {
|
||||
let service = CodeModeService::new();
|
||||
|
||||
let response = tokio::time::timeout(
|
||||
Duration::from_secs(1),
|
||||
service.execute_to_pending(ExecuteRequest {
|
||||
source: r#"text("before"); await new Promise(() => {});"#.to_string(),
|
||||
yield_time_ms: Some(60_000),
|
||||
..execute_request("")
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
response,
|
||||
ExecuteToPendingOutcome::Pending {
|
||||
cell_id: "1".to_string(),
|
||||
content_items: vec![FunctionCallOutputContentItem::InputText {
|
||||
text: "before".to_string(),
|
||||
}],
|
||||
pending_tool_call_ids: Vec::new(),
|
||||
}
|
||||
);
|
||||
|
||||
let termination = service
|
||||
.wait(WaitRequest {
|
||||
cell_id: "1".to_string(),
|
||||
yield_time_ms: 1,
|
||||
terminate: true,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
termination,
|
||||
WaitOutcome::LiveCell(RuntimeResponse::Terminated {
|
||||
cell_id: "1".to_string(),
|
||||
content_items: Vec::new(),
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn execute_to_pending_identifies_tool_calls_in_paused_frontier() {
|
||||
let service = CodeModeService::new();
|
||||
|
||||
let response = service
|
||||
.execute_to_pending(ExecuteRequest {
|
||||
enabled_tools: vec![ToolDefinition {
|
||||
name: "echo".to_string(),
|
||||
tool_name: ToolName::plain("echo"),
|
||||
description: String::new(),
|
||||
kind: CodeModeToolKind::Function,
|
||||
input_schema: None,
|
||||
output_schema: None,
|
||||
}],
|
||||
source: r#"
|
||||
await Promise.all([
|
||||
tools.echo({ value: "first" }),
|
||||
tools.echo({ value: "second" }),
|
||||
]);
|
||||
"#
|
||||
.to_string(),
|
||||
yield_time_ms: Some(60_000),
|
||||
..execute_request("")
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
response,
|
||||
ExecuteToPendingOutcome::Pending {
|
||||
cell_id: "1".to_string(),
|
||||
content_items: Vec::new(),
|
||||
pending_tool_call_ids: vec!["tool-1".to_string(), "tool-2".to_string()],
|
||||
}
|
||||
);
|
||||
|
||||
let termination = service
|
||||
.wait(WaitRequest {
|
||||
cell_id: "1".to_string(),
|
||||
yield_time_ms: 1,
|
||||
terminate: true,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
termination,
|
||||
WaitOutcome::LiveCell(RuntimeResponse::Terminated {
|
||||
cell_id: "1".to_string(),
|
||||
content_items: Vec::new(),
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn execute_to_pending_excludes_delayed_timeout_tool_calls_until_wait() {
|
||||
let service = CodeModeService::new();
|
||||
|
||||
let initial_response = service
|
||||
.execute_to_pending(ExecuteRequest {
|
||||
enabled_tools: vec![ToolDefinition {
|
||||
name: "echo".to_string(),
|
||||
tool_name: ToolName::plain("echo"),
|
||||
description: String::new(),
|
||||
kind: CodeModeToolKind::Function,
|
||||
input_schema: None,
|
||||
output_schema: None,
|
||||
}],
|
||||
source: r#"
|
||||
setTimeout(() => {
|
||||
tools.echo({ value: "delayed" });
|
||||
}, 1000);
|
||||
await Promise.all([
|
||||
tools.echo({ value: "second" }),
|
||||
tools.echo({ value: "third" }),
|
||||
]);
|
||||
"#
|
||||
.to_string(),
|
||||
yield_time_ms: Some(60_000),
|
||||
..execute_request("")
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
initial_response,
|
||||
ExecuteToPendingOutcome::Pending {
|
||||
cell_id: "1".to_string(),
|
||||
content_items: Vec::new(),
|
||||
pending_tool_call_ids: vec!["tool-1".to_string(), "tool-2".to_string()],
|
||||
}
|
||||
);
|
||||
|
||||
let runtime_tx = service
|
||||
.inner
|
||||
.sessions
|
||||
.lock()
|
||||
.await
|
||||
.get("1")
|
||||
.unwrap()
|
||||
.runtime_tx
|
||||
.clone();
|
||||
runtime_tx
|
||||
.send(RuntimeCommand::TimeoutFired { id: 1 })
|
||||
.unwrap();
|
||||
|
||||
let resumed_response = tokio::time::timeout(
|
||||
Duration::from_secs(1),
|
||||
service.wait_to_pending(WaitToPendingRequest {
|
||||
cell_id: "1".to_string(),
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
resumed_response,
|
||||
WaitToPendingOutcome::LiveCell(ExecuteToPendingOutcome::Pending {
|
||||
cell_id: "1".to_string(),
|
||||
content_items: Vec::new(),
|
||||
pending_tool_call_ids: vec!["tool-3".to_string()],
|
||||
})
|
||||
);
|
||||
|
||||
let termination = service
|
||||
.wait(WaitRequest {
|
||||
cell_id: "1".to_string(),
|
||||
yield_time_ms: 1,
|
||||
terminate: true,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
termination,
|
||||
WaitOutcome::LiveCell(RuntimeResponse::Terminated {
|
||||
cell_id: "1".to_string(),
|
||||
content_items: Vec::new(),
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn wait_to_pending_returns_after_resumed_runtime_becomes_quiescent_again() {
|
||||
let service = CodeModeService::new();
|
||||
|
||||
let initial_response = service
|
||||
.execute_to_pending(ExecuteRequest {
|
||||
source: r#"
|
||||
await new Promise((resolve) => setTimeout(resolve, 60_000));
|
||||
text("after");
|
||||
await new Promise(() => {});
|
||||
"#
|
||||
.to_string(),
|
||||
yield_time_ms: Some(60_000),
|
||||
..execute_request("")
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
initial_response,
|
||||
ExecuteToPendingOutcome::Pending {
|
||||
cell_id: "1".to_string(),
|
||||
content_items: Vec::new(),
|
||||
pending_tool_call_ids: Vec::new(),
|
||||
}
|
||||
);
|
||||
|
||||
let runtime_tx = service
|
||||
.inner
|
||||
.sessions
|
||||
.lock()
|
||||
.await
|
||||
.get("1")
|
||||
.unwrap()
|
||||
.runtime_tx
|
||||
.clone();
|
||||
runtime_tx
|
||||
.send(RuntimeCommand::TimeoutFired { id: 1 })
|
||||
.unwrap();
|
||||
|
||||
let resumed_response = tokio::time::timeout(
|
||||
Duration::from_secs(1),
|
||||
service.wait_to_pending(WaitToPendingRequest {
|
||||
cell_id: "1".to_string(),
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
resumed_response,
|
||||
WaitToPendingOutcome::LiveCell(ExecuteToPendingOutcome::Pending {
|
||||
cell_id: "1".to_string(),
|
||||
content_items: vec![FunctionCallOutputContentItem::InputText {
|
||||
text: "after".to_string(),
|
||||
}],
|
||||
pending_tool_call_ids: Vec::new(),
|
||||
})
|
||||
);
|
||||
|
||||
let termination = service
|
||||
.wait(WaitRequest {
|
||||
cell_id: "1".to_string(),
|
||||
yield_time_ms: 1,
|
||||
terminate: true,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
termination,
|
||||
WaitOutcome::LiveCell(RuntimeResponse::Terminated {
|
||||
cell_id: "1".to_string(),
|
||||
content_items: Vec::new(),
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn wait_to_pending_returns_completed_after_resumed_runtime_finishes() {
|
||||
let service = CodeModeService::new();
|
||||
|
||||
let initial_response = service
|
||||
.execute_to_pending(ExecuteRequest {
|
||||
source: r#"
|
||||
await new Promise((resolve) => setTimeout(resolve, 60_000));
|
||||
text("done");
|
||||
"#
|
||||
.to_string(),
|
||||
yield_time_ms: Some(60_000),
|
||||
..execute_request("")
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
initial_response,
|
||||
ExecuteToPendingOutcome::Pending {
|
||||
cell_id: "1".to_string(),
|
||||
content_items: Vec::new(),
|
||||
pending_tool_call_ids: Vec::new(),
|
||||
}
|
||||
);
|
||||
|
||||
let runtime_tx = service
|
||||
.inner
|
||||
.sessions
|
||||
.lock()
|
||||
.await
|
||||
.get("1")
|
||||
.unwrap()
|
||||
.runtime_tx
|
||||
.clone();
|
||||
runtime_tx
|
||||
.send(RuntimeCommand::TimeoutFired { id: 1 })
|
||||
.unwrap();
|
||||
|
||||
let resumed_response = tokio::time::timeout(
|
||||
Duration::from_secs(1),
|
||||
service.wait_to_pending(WaitToPendingRequest {
|
||||
cell_id: "1".to_string(),
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
resumed_response,
|
||||
WaitToPendingOutcome::LiveCell(ExecuteToPendingOutcome::Completed(
|
||||
RuntimeResponse::Result {
|
||||
cell_id: "1".to_string(),
|
||||
content_items: vec![FunctionCallOutputContentItem::InputText {
|
||||
text: "done".to_string(),
|
||||
}],
|
||||
stored_values: HashMap::new(),
|
||||
error_text: None,
|
||||
}
|
||||
))
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn v8_console_is_not_exposed_on_global_this() {
|
||||
let service = CodeModeService::new();
|
||||
@@ -898,13 +1442,14 @@ image({
|
||||
let (control_tx, control_rx) = mpsc::unbounded_channel();
|
||||
let (initial_response_tx, initial_response_rx) = oneshot::channel();
|
||||
let (runtime_event_tx, _runtime_event_rx) = mpsc::unbounded_channel();
|
||||
let (runtime_tx, runtime_terminate_handle) = spawn_runtime(
|
||||
let (runtime_tx, runtime_control_tx, runtime_terminate_handle) = spawn_runtime(
|
||||
ExecuteRequest {
|
||||
source: "await new Promise(() => {})".to_string(),
|
||||
yield_time_ms: None,
|
||||
..execute_request("")
|
||||
},
|
||||
runtime_event_tx,
|
||||
PendingRuntimeMode::Continue,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@@ -913,12 +1458,14 @@ image({
|
||||
SessionControlContext {
|
||||
cell_id: "cell-1".to_string(),
|
||||
runtime_tx: runtime_tx.clone(),
|
||||
runtime_control_tx,
|
||||
pending_mode: PendingRuntimeMode::Continue,
|
||||
runtime_terminate_handle,
|
||||
},
|
||||
event_rx,
|
||||
control_rx,
|
||||
initial_response_tx,
|
||||
/*initial_yield_time_ms*/ 60_000,
|
||||
SessionResponseSender::Runtime(initial_response_tx),
|
||||
Some(/*initial_yield_time_ms*/ 60_000),
|
||||
));
|
||||
|
||||
event_tx.send(RuntimeEvent::Started).unwrap();
|
||||
|
||||
Reference in New Issue
Block a user