use std::collections::HashMap; use std::sync::Arc; use std::sync::atomic::AtomicU64; use async_channel::Receiver; use async_channel::Sender; use codex_async_utils::OrCancelExt; use codex_protocol::protocol::ApplyPatchApprovalRequestEvent; use codex_protocol::protocol::Event; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::ExecApprovalRequestEvent; use codex_protocol::protocol::Op; use codex_protocol::protocol::RequestUserInputEvent; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::SubAgentSource; use codex_protocol::protocol::Submission; use codex_protocol::request_user_input::RequestUserInputArgs; use codex_protocol::request_user_input::RequestUserInputResponse; use codex_protocol::user_input::UserInput; use std::time::Duration; use tokio::time::timeout; use tokio_util::sync::CancellationToken; use crate::AuthManager; use crate::codex::Codex; use crate::codex::CodexSpawnOk; use crate::codex::SUBMISSION_CHANNEL_CAPACITY; use crate::codex::Session; use crate::codex::TurnContext; use crate::config::Config; use crate::error::CodexErr; use crate::models_manager::manager::ModelsManager; use codex_protocol::protocol::InitialHistory; /// Start an interactive sub-Codex thread and return IO channels. /// /// The returned `events_rx` yields non-approval events emitted by the sub-agent. /// Approval requests are handled via `parent_session` and are not surfaced. /// The returned `ops_tx` allows the caller to submit additional `Op`s to the sub-agent. pub(crate) async fn run_codex_thread_interactive( config: Config, auth_manager: Arc, models_manager: Arc, parent_session: Arc, parent_ctx: Arc, cancel_token: CancellationToken, initial_history: Option, ) -> Result { let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY); let (tx_ops, rx_ops) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY); let CodexSpawnOk { codex, .. } = Codex::spawn( config, auth_manager, models_manager, Arc::clone(&parent_session.services.skills_manager), initial_history.unwrap_or(InitialHistory::New), SessionSource::SubAgent(SubAgentSource::Review), parent_session.services.agent_control.clone(), Vec::new(), ) .await?; let codex = Arc::new(codex); // Use a child token so parent cancel cascades but we can scope it to this task let cancel_token_events = cancel_token.child_token(); let cancel_token_ops = cancel_token.child_token(); // Forward events from the sub-agent to the consumer, filtering approvals and // routing them to the parent session for decisions. let parent_session_clone = Arc::clone(&parent_session); let parent_ctx_clone = Arc::clone(&parent_ctx); let codex_for_events = Arc::clone(&codex); tokio::spawn(async move { forward_events( codex_for_events, tx_sub, parent_session_clone, parent_ctx_clone, cancel_token_events, ) .await; }); // Forward ops from the caller to the sub-agent. let codex_for_ops = Arc::clone(&codex); tokio::spawn(async move { forward_ops(codex_for_ops, rx_ops, cancel_token_ops).await; }); Ok(Codex { next_id: AtomicU64::new(0), tx_sub: tx_ops, rx_event: rx_sub, agent_status: codex.agent_status.clone(), session: Arc::clone(&codex.session), }) } /// Convenience wrapper for one-time use with an initial prompt. /// /// Internally calls the interactive variant, then immediately submits the provided input. #[allow(clippy::too_many_arguments)] pub(crate) async fn run_codex_thread_one_shot( config: Config, auth_manager: Arc, models_manager: Arc, input: Vec, parent_session: Arc, parent_ctx: Arc, cancel_token: CancellationToken, initial_history: Option, ) -> Result { // Use a child token so we can stop the delegate after completion without // requiring the caller to cancel the parent token. let child_cancel = cancel_token.child_token(); let io = run_codex_thread_interactive( config, auth_manager, models_manager, parent_session, parent_ctx, child_cancel.clone(), initial_history, ) .await?; // Send the initial input to kick off the one-shot turn. io.submit(Op::UserInput { items: input, final_output_json_schema: None, }) .await?; // Bridge events so we can observe completion and shut down automatically. let (tx_bridge, rx_bridge) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY); let ops_tx = io.tx_sub.clone(); let agent_status = io.agent_status.clone(); let session = Arc::clone(&io.session); let io_for_bridge = io; tokio::spawn(async move { while let Ok(event) = io_for_bridge.next_event().await { let should_shutdown = matches!( event.msg, EventMsg::TurnComplete(_) | EventMsg::TurnAborted(_) ); let _ = tx_bridge.send(event).await; if should_shutdown { let _ = ops_tx .send(Submission { id: "shutdown".to_string(), op: Op::Shutdown {}, }) .await; child_cancel.cancel(); break; } } }); // For one-shot usage, return a closed `tx_sub` so callers cannot submit // additional ops after the initial request. Create a channel and drop the // receiver to close it immediately. let (tx_closed, rx_closed) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY); drop(rx_closed); Ok(Codex { next_id: AtomicU64::new(0), rx_event: rx_bridge, tx_sub: tx_closed, agent_status, session, }) } async fn forward_events( codex: Arc, tx_sub: Sender, parent_session: Arc, parent_ctx: Arc, cancel_token: CancellationToken, ) { let cancelled = cancel_token.cancelled(); tokio::pin!(cancelled); loop { tokio::select! { _ = &mut cancelled => { shutdown_delegate(&codex).await; break; } event = codex.next_event() => { let event = match event { Ok(event) => event, Err(_) => break, }; match event { // ignore all legacy delta events Event { id: _, msg: EventMsg::AgentMessageDelta(_) | EventMsg::AgentReasoningDelta(_), } => {} Event { id: _, msg: EventMsg::TokenCount(_), } => {} Event { id: _, msg: EventMsg::SessionConfigured(_), } => {} Event { id: _, msg: EventMsg::ThreadNameUpdated(_), } => {} Event { id, msg: EventMsg::ExecApprovalRequest(event), } => { // Initiate approval via parent session; do not surface to consumer. handle_exec_approval( &codex, id, &parent_session, &parent_ctx, event, &cancel_token, ) .await; } Event { id, msg: EventMsg::ApplyPatchApprovalRequest(event), } => { handle_patch_approval( &codex, id, &parent_session, &parent_ctx, event, &cancel_token, ) .await; } Event { id, msg: EventMsg::RequestUserInput(event), } => { handle_request_user_input( &codex, id, &parent_session, &parent_ctx, event, &cancel_token, ) .await; } other => { match tx_sub.send(other).or_cancel(&cancel_token).await { Ok(Ok(())) => {} _ => { shutdown_delegate(&codex).await; break; } } } } } } } } /// Ask the delegate to stop and drain its events so background sends do not hit a closed channel. async fn shutdown_delegate(codex: &Codex) { let _ = codex.submit(Op::Interrupt).await; let _ = codex.submit(Op::Shutdown {}).await; let _ = timeout(Duration::from_millis(500), async { while let Ok(event) = codex.next_event().await { if matches!( event.msg, EventMsg::TurnAborted(_) | EventMsg::TurnComplete(_) ) { break; } } }) .await; } /// Forward ops from a caller to a sub-agent, respecting cancellation. async fn forward_ops( codex: Arc, rx_ops: Receiver, cancel_token_ops: CancellationToken, ) { loop { let op: Op = match rx_ops.recv().or_cancel(&cancel_token_ops).await { Ok(Ok(Submission { id: _, op })) => op, Ok(Err(_)) | Err(_) => break, }; let _ = codex.submit(op).await; } } /// Handle an ExecApprovalRequest by consulting the parent session and replying. async fn handle_exec_approval( codex: &Codex, id: String, parent_session: &Session, parent_ctx: &TurnContext, event: ExecApprovalRequestEvent, cancel_token: &CancellationToken, ) { // Race approval with cancellation and timeout to avoid hangs. let approval_fut = parent_session.request_command_approval( parent_ctx, parent_ctx.sub_id.clone(), event.command, event.cwd, event.reason, event.proposed_execpolicy_amendment, ); let decision = await_approval_with_cancel( approval_fut, parent_session, &parent_ctx.sub_id, cancel_token, ) .await; let _ = codex.submit(Op::ExecApproval { id, decision }).await; } /// Handle an ApplyPatchApprovalRequest by consulting the parent session and replying. async fn handle_patch_approval( codex: &Codex, id: String, parent_session: &Session, parent_ctx: &TurnContext, event: ApplyPatchApprovalRequestEvent, cancel_token: &CancellationToken, ) { let decision_rx = parent_session .request_patch_approval( parent_ctx, parent_ctx.sub_id.clone(), event.changes, event.reason, event.grant_root, ) .await; let decision = await_approval_with_cancel( async move { decision_rx.await.unwrap_or_default() }, parent_session, &parent_ctx.sub_id, cancel_token, ) .await; let _ = codex.submit(Op::PatchApproval { id, decision }).await; } async fn handle_request_user_input( codex: &Codex, id: String, parent_session: &Session, parent_ctx: &TurnContext, event: RequestUserInputEvent, cancel_token: &CancellationToken, ) { let args = RequestUserInputArgs { questions: event.questions, }; let response_fut = parent_session.request_user_input(parent_ctx, parent_ctx.sub_id.clone(), args); let response = await_user_input_with_cancel( response_fut, parent_session, &parent_ctx.sub_id, cancel_token, ) .await; let _ = codex.submit(Op::UserInputAnswer { id, response }).await; } async fn await_user_input_with_cancel( fut: F, parent_session: &Session, sub_id: &str, cancel_token: &CancellationToken, ) -> RequestUserInputResponse where F: core::future::Future>, { tokio::select! { biased; _ = cancel_token.cancelled() => { let empty = RequestUserInputResponse { answers: HashMap::new(), }; parent_session .notify_user_input_response(sub_id, empty.clone()) .await; empty } response = fut => response.unwrap_or_else(|| RequestUserInputResponse { answers: HashMap::new(), }), } } /// Await an approval decision, aborting on cancellation. async fn await_approval_with_cancel( fut: F, parent_session: &Session, sub_id: &str, cancel_token: &CancellationToken, ) -> codex_protocol::protocol::ReviewDecision where F: core::future::Future, { tokio::select! { biased; _ = cancel_token.cancelled() => { parent_session .notify_approval(sub_id, codex_protocol::protocol::ReviewDecision::Abort) .await; codex_protocol::protocol::ReviewDecision::Abort } decision = fut => { decision } } } #[cfg(test)] mod tests { use super::*; use async_channel::bounded; use codex_protocol::models::ResponseItem; use codex_protocol::protocol::AgentStatus; use codex_protocol::protocol::RawResponseItemEvent; use codex_protocol::protocol::TurnAbortReason; use codex_protocol::protocol::TurnAbortedEvent; use pretty_assertions::assert_eq; use tokio::sync::watch; #[tokio::test] async fn forward_events_cancelled_while_send_blocked_shuts_down_delegate() { let (tx_events, rx_events) = bounded(1); let (tx_sub, rx_sub) = bounded(SUBMISSION_CHANNEL_CAPACITY); let (_agent_status_tx, agent_status) = watch::channel(AgentStatus::PendingInit); let (session, ctx, _rx_evt) = crate::codex::make_session_and_context_with_rx().await; let codex = Arc::new(Codex { next_id: AtomicU64::new(0), tx_sub, rx_event: rx_events, agent_status, session: Arc::clone(&session), }); let (tx_out, rx_out) = bounded(1); tx_out .send(Event { id: "full".to_string(), msg: EventMsg::TurnAborted(TurnAbortedEvent { reason: TurnAbortReason::Interrupted, }), }) .await .unwrap(); let cancel = CancellationToken::new(); let forward = tokio::spawn(forward_events( Arc::clone(&codex), tx_out.clone(), session, ctx, cancel.clone(), )); tx_events .send(Event { id: "evt".to_string(), msg: EventMsg::RawResponseItem(RawResponseItemEvent { item: ResponseItem::CustomToolCall { id: None, status: None, call_id: "call-1".to_string(), name: "tool".to_string(), input: "{}".to_string(), }, }), }) .await .unwrap(); drop(tx_events); cancel.cancel(); timeout(std::time::Duration::from_millis(1000), forward) .await .expect("forward_events hung") .expect("forward_events join error"); let received = rx_out.recv().await.expect("prefilled event missing"); assert_eq!("full", received.id); let mut ops = Vec::new(); while let Ok(sub) = rx_sub.try_recv() { ops.push(sub.op); } assert!( ops.iter().any(|op| matches!(op, Op::Interrupt)), "expected Interrupt op after cancellation" ); assert!( ops.iter().any(|op| matches!(op, Op::Shutdown)), "expected Shutdown op after cancellation" ); } }