mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
## Summary Add dynamic tool injection to thread startup in API v2, wire dynamic tool calls through the app server to clients, and plumb responses back into the model tool pipeline. ### Flow (high level) - Thread start injects `dynamic_tools` into the model tool list for that thread (validation is done here). - When the model emits a tool call for one of those names, core raises a `DynamicToolCallRequest` event. - The app server forwards it to the client as `item/tool/call`, waits for the client’s response, then submits a `DynamicToolResponse` back to core. - Core turns that into a `function_call_output` in the next model request so the model can continue. ### What changed - Added dynamic tool specs to v2 thread start params and protocol types; introduced `item/tool/call` (request/response) for dynamic tool execution. - Core now registers dynamic tool specs at request time and routes those calls via a new dynamic tool handler. - App server validates tool names/schemas, forwards dynamic tool call requests to clients, and publishes tool outputs back into the session. - Integration tests
517 lines
16 KiB
Rust
517 lines
16 KiB
Rust
use std::collections::HashMap;
|
|
use std::sync::Arc;
|
|
use std::sync::atomic::AtomicU64;
|
|
|
|
use async_channel::Receiver;
|
|
use async_channel::Sender;
|
|
use codex_async_utils::OrCancelExt;
|
|
use codex_protocol::protocol::ApplyPatchApprovalRequestEvent;
|
|
use codex_protocol::protocol::Event;
|
|
use codex_protocol::protocol::EventMsg;
|
|
use codex_protocol::protocol::ExecApprovalRequestEvent;
|
|
use codex_protocol::protocol::Op;
|
|
use codex_protocol::protocol::RequestUserInputEvent;
|
|
use codex_protocol::protocol::SessionSource;
|
|
use codex_protocol::protocol::SubAgentSource;
|
|
use codex_protocol::protocol::Submission;
|
|
use codex_protocol::request_user_input::RequestUserInputArgs;
|
|
use codex_protocol::request_user_input::RequestUserInputResponse;
|
|
use codex_protocol::user_input::UserInput;
|
|
use std::time::Duration;
|
|
use tokio::time::timeout;
|
|
use tokio_util::sync::CancellationToken;
|
|
|
|
use crate::AuthManager;
|
|
use crate::codex::Codex;
|
|
use crate::codex::CodexSpawnOk;
|
|
use crate::codex::SUBMISSION_CHANNEL_CAPACITY;
|
|
use crate::codex::Session;
|
|
use crate::codex::TurnContext;
|
|
use crate::config::Config;
|
|
use crate::error::CodexErr;
|
|
use crate::models_manager::manager::ModelsManager;
|
|
use codex_protocol::protocol::InitialHistory;
|
|
|
|
/// Start an interactive sub-Codex thread and return IO channels.
|
|
///
|
|
/// The returned `events_rx` yields non-approval events emitted by the sub-agent.
|
|
/// Approval requests are handled via `parent_session` and are not surfaced.
|
|
/// The returned `ops_tx` allows the caller to submit additional `Op`s to the sub-agent.
|
|
pub(crate) async fn run_codex_thread_interactive(
|
|
config: Config,
|
|
auth_manager: Arc<AuthManager>,
|
|
models_manager: Arc<ModelsManager>,
|
|
parent_session: Arc<Session>,
|
|
parent_ctx: Arc<TurnContext>,
|
|
cancel_token: CancellationToken,
|
|
initial_history: Option<InitialHistory>,
|
|
) -> Result<Codex, CodexErr> {
|
|
let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
|
|
let (tx_ops, rx_ops) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
|
|
|
|
let CodexSpawnOk { codex, .. } = Codex::spawn(
|
|
config,
|
|
auth_manager,
|
|
models_manager,
|
|
Arc::clone(&parent_session.services.skills_manager),
|
|
initial_history.unwrap_or(InitialHistory::New),
|
|
SessionSource::SubAgent(SubAgentSource::Review),
|
|
parent_session.services.agent_control.clone(),
|
|
Vec::new(),
|
|
)
|
|
.await?;
|
|
let codex = Arc::new(codex);
|
|
|
|
// Use a child token so parent cancel cascades but we can scope it to this task
|
|
let cancel_token_events = cancel_token.child_token();
|
|
let cancel_token_ops = cancel_token.child_token();
|
|
|
|
// Forward events from the sub-agent to the consumer, filtering approvals and
|
|
// routing them to the parent session for decisions.
|
|
let parent_session_clone = Arc::clone(&parent_session);
|
|
let parent_ctx_clone = Arc::clone(&parent_ctx);
|
|
let codex_for_events = Arc::clone(&codex);
|
|
tokio::spawn(async move {
|
|
forward_events(
|
|
codex_for_events,
|
|
tx_sub,
|
|
parent_session_clone,
|
|
parent_ctx_clone,
|
|
cancel_token_events,
|
|
)
|
|
.await;
|
|
});
|
|
|
|
// Forward ops from the caller to the sub-agent.
|
|
let codex_for_ops = Arc::clone(&codex);
|
|
tokio::spawn(async move {
|
|
forward_ops(codex_for_ops, rx_ops, cancel_token_ops).await;
|
|
});
|
|
|
|
Ok(Codex {
|
|
next_id: AtomicU64::new(0),
|
|
tx_sub: tx_ops,
|
|
rx_event: rx_sub,
|
|
agent_status: codex.agent_status.clone(),
|
|
session: Arc::clone(&codex.session),
|
|
})
|
|
}
|
|
|
|
/// Convenience wrapper for one-time use with an initial prompt.
|
|
///
|
|
/// Internally calls the interactive variant, then immediately submits the provided input.
|
|
#[allow(clippy::too_many_arguments)]
|
|
pub(crate) async fn run_codex_thread_one_shot(
|
|
config: Config,
|
|
auth_manager: Arc<AuthManager>,
|
|
models_manager: Arc<ModelsManager>,
|
|
input: Vec<UserInput>,
|
|
parent_session: Arc<Session>,
|
|
parent_ctx: Arc<TurnContext>,
|
|
cancel_token: CancellationToken,
|
|
initial_history: Option<InitialHistory>,
|
|
) -> Result<Codex, CodexErr> {
|
|
// Use a child token so we can stop the delegate after completion without
|
|
// requiring the caller to cancel the parent token.
|
|
let child_cancel = cancel_token.child_token();
|
|
let io = run_codex_thread_interactive(
|
|
config,
|
|
auth_manager,
|
|
models_manager,
|
|
parent_session,
|
|
parent_ctx,
|
|
child_cancel.clone(),
|
|
initial_history,
|
|
)
|
|
.await?;
|
|
|
|
// Send the initial input to kick off the one-shot turn.
|
|
io.submit(Op::UserInput {
|
|
items: input,
|
|
final_output_json_schema: None,
|
|
})
|
|
.await?;
|
|
|
|
// Bridge events so we can observe completion and shut down automatically.
|
|
let (tx_bridge, rx_bridge) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
|
|
let ops_tx = io.tx_sub.clone();
|
|
let agent_status = io.agent_status.clone();
|
|
let session = Arc::clone(&io.session);
|
|
let io_for_bridge = io;
|
|
tokio::spawn(async move {
|
|
while let Ok(event) = io_for_bridge.next_event().await {
|
|
let should_shutdown = matches!(
|
|
event.msg,
|
|
EventMsg::TurnComplete(_) | EventMsg::TurnAborted(_)
|
|
);
|
|
let _ = tx_bridge.send(event).await;
|
|
if should_shutdown {
|
|
let _ = ops_tx
|
|
.send(Submission {
|
|
id: "shutdown".to_string(),
|
|
op: Op::Shutdown {},
|
|
})
|
|
.await;
|
|
child_cancel.cancel();
|
|
break;
|
|
}
|
|
}
|
|
});
|
|
|
|
// For one-shot usage, return a closed `tx_sub` so callers cannot submit
|
|
// additional ops after the initial request. Create a channel and drop the
|
|
// receiver to close it immediately.
|
|
let (tx_closed, rx_closed) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
|
|
drop(rx_closed);
|
|
|
|
Ok(Codex {
|
|
next_id: AtomicU64::new(0),
|
|
rx_event: rx_bridge,
|
|
tx_sub: tx_closed,
|
|
agent_status,
|
|
session,
|
|
})
|
|
}
|
|
|
|
async fn forward_events(
|
|
codex: Arc<Codex>,
|
|
tx_sub: Sender<Event>,
|
|
parent_session: Arc<Session>,
|
|
parent_ctx: Arc<TurnContext>,
|
|
cancel_token: CancellationToken,
|
|
) {
|
|
let cancelled = cancel_token.cancelled();
|
|
tokio::pin!(cancelled);
|
|
|
|
loop {
|
|
tokio::select! {
|
|
_ = &mut cancelled => {
|
|
shutdown_delegate(&codex).await;
|
|
break;
|
|
}
|
|
event = codex.next_event() => {
|
|
let event = match event {
|
|
Ok(event) => event,
|
|
Err(_) => break,
|
|
};
|
|
match event {
|
|
// ignore all legacy delta events
|
|
Event {
|
|
id: _,
|
|
msg: EventMsg::AgentMessageDelta(_) | EventMsg::AgentReasoningDelta(_),
|
|
} => {}
|
|
Event {
|
|
id: _,
|
|
msg: EventMsg::TokenCount(_),
|
|
} => {}
|
|
Event {
|
|
id: _,
|
|
msg: EventMsg::SessionConfigured(_),
|
|
} => {}
|
|
Event {
|
|
id,
|
|
msg: EventMsg::ExecApprovalRequest(event),
|
|
} => {
|
|
// Initiate approval via parent session; do not surface to consumer.
|
|
handle_exec_approval(
|
|
&codex,
|
|
id,
|
|
&parent_session,
|
|
&parent_ctx,
|
|
event,
|
|
&cancel_token,
|
|
)
|
|
.await;
|
|
}
|
|
Event {
|
|
id,
|
|
msg: EventMsg::ApplyPatchApprovalRequest(event),
|
|
} => {
|
|
handle_patch_approval(
|
|
&codex,
|
|
id,
|
|
&parent_session,
|
|
&parent_ctx,
|
|
event,
|
|
&cancel_token,
|
|
)
|
|
.await;
|
|
}
|
|
Event {
|
|
id,
|
|
msg: EventMsg::RequestUserInput(event),
|
|
} => {
|
|
handle_request_user_input(
|
|
&codex,
|
|
id,
|
|
&parent_session,
|
|
&parent_ctx,
|
|
event,
|
|
&cancel_token,
|
|
)
|
|
.await;
|
|
}
|
|
other => {
|
|
match tx_sub.send(other).or_cancel(&cancel_token).await {
|
|
Ok(Ok(())) => {}
|
|
_ => {
|
|
shutdown_delegate(&codex).await;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Ask the delegate to stop and drain its events so background sends do not hit a closed channel.
|
|
async fn shutdown_delegate(codex: &Codex) {
|
|
let _ = codex.submit(Op::Interrupt).await;
|
|
let _ = codex.submit(Op::Shutdown {}).await;
|
|
|
|
let _ = timeout(Duration::from_millis(500), async {
|
|
while let Ok(event) = codex.next_event().await {
|
|
if matches!(
|
|
event.msg,
|
|
EventMsg::TurnAborted(_) | EventMsg::TurnComplete(_)
|
|
) {
|
|
break;
|
|
}
|
|
}
|
|
})
|
|
.await;
|
|
}
|
|
|
|
/// Forward ops from a caller to a sub-agent, respecting cancellation.
|
|
async fn forward_ops(
|
|
codex: Arc<Codex>,
|
|
rx_ops: Receiver<Submission>,
|
|
cancel_token_ops: CancellationToken,
|
|
) {
|
|
loop {
|
|
let op: Op = match rx_ops.recv().or_cancel(&cancel_token_ops).await {
|
|
Ok(Ok(Submission { id: _, op })) => op,
|
|
Ok(Err(_)) | Err(_) => break,
|
|
};
|
|
let _ = codex.submit(op).await;
|
|
}
|
|
}
|
|
|
|
/// Handle an ExecApprovalRequest by consulting the parent session and replying.
|
|
async fn handle_exec_approval(
|
|
codex: &Codex,
|
|
id: String,
|
|
parent_session: &Session,
|
|
parent_ctx: &TurnContext,
|
|
event: ExecApprovalRequestEvent,
|
|
cancel_token: &CancellationToken,
|
|
) {
|
|
// Race approval with cancellation and timeout to avoid hangs.
|
|
let approval_fut = parent_session.request_command_approval(
|
|
parent_ctx,
|
|
parent_ctx.sub_id.clone(),
|
|
event.command,
|
|
event.cwd,
|
|
event.reason,
|
|
event.proposed_execpolicy_amendment,
|
|
);
|
|
let decision = await_approval_with_cancel(
|
|
approval_fut,
|
|
parent_session,
|
|
&parent_ctx.sub_id,
|
|
cancel_token,
|
|
)
|
|
.await;
|
|
|
|
let _ = codex.submit(Op::ExecApproval { id, decision }).await;
|
|
}
|
|
|
|
/// Handle an ApplyPatchApprovalRequest by consulting the parent session and replying.
|
|
async fn handle_patch_approval(
|
|
codex: &Codex,
|
|
id: String,
|
|
parent_session: &Session,
|
|
parent_ctx: &TurnContext,
|
|
event: ApplyPatchApprovalRequestEvent,
|
|
cancel_token: &CancellationToken,
|
|
) {
|
|
let decision_rx = parent_session
|
|
.request_patch_approval(
|
|
parent_ctx,
|
|
parent_ctx.sub_id.clone(),
|
|
event.changes,
|
|
event.reason,
|
|
event.grant_root,
|
|
)
|
|
.await;
|
|
let decision = await_approval_with_cancel(
|
|
async move { decision_rx.await.unwrap_or_default() },
|
|
parent_session,
|
|
&parent_ctx.sub_id,
|
|
cancel_token,
|
|
)
|
|
.await;
|
|
let _ = codex.submit(Op::PatchApproval { id, decision }).await;
|
|
}
|
|
|
|
async fn handle_request_user_input(
|
|
codex: &Codex,
|
|
id: String,
|
|
parent_session: &Session,
|
|
parent_ctx: &TurnContext,
|
|
event: RequestUserInputEvent,
|
|
cancel_token: &CancellationToken,
|
|
) {
|
|
let args = RequestUserInputArgs {
|
|
questions: event.questions,
|
|
};
|
|
let response_fut =
|
|
parent_session.request_user_input(parent_ctx, parent_ctx.sub_id.clone(), args);
|
|
let response = await_user_input_with_cancel(
|
|
response_fut,
|
|
parent_session,
|
|
&parent_ctx.sub_id,
|
|
cancel_token,
|
|
)
|
|
.await;
|
|
let _ = codex.submit(Op::UserInputAnswer { id, response }).await;
|
|
}
|
|
|
|
async fn await_user_input_with_cancel<F>(
|
|
fut: F,
|
|
parent_session: &Session,
|
|
sub_id: &str,
|
|
cancel_token: &CancellationToken,
|
|
) -> RequestUserInputResponse
|
|
where
|
|
F: core::future::Future<Output = Option<RequestUserInputResponse>>,
|
|
{
|
|
tokio::select! {
|
|
biased;
|
|
_ = cancel_token.cancelled() => {
|
|
let empty = RequestUserInputResponse {
|
|
answers: HashMap::new(),
|
|
};
|
|
parent_session
|
|
.notify_user_input_response(sub_id, empty.clone())
|
|
.await;
|
|
empty
|
|
}
|
|
response = fut => response.unwrap_or_else(|| RequestUserInputResponse {
|
|
answers: HashMap::new(),
|
|
}),
|
|
}
|
|
}
|
|
|
|
/// Await an approval decision, aborting on cancellation.
|
|
async fn await_approval_with_cancel<F>(
|
|
fut: F,
|
|
parent_session: &Session,
|
|
sub_id: &str,
|
|
cancel_token: &CancellationToken,
|
|
) -> codex_protocol::protocol::ReviewDecision
|
|
where
|
|
F: core::future::Future<Output = codex_protocol::protocol::ReviewDecision>,
|
|
{
|
|
tokio::select! {
|
|
biased;
|
|
_ = cancel_token.cancelled() => {
|
|
parent_session
|
|
.notify_approval(sub_id, codex_protocol::protocol::ReviewDecision::Abort)
|
|
.await;
|
|
codex_protocol::protocol::ReviewDecision::Abort
|
|
}
|
|
decision = fut => {
|
|
decision
|
|
}
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;
    use async_channel::bounded;
    use codex_protocol::models::ResponseItem;
    use codex_protocol::protocol::AgentStatus;
    use codex_protocol::protocol::RawResponseItemEvent;
    use codex_protocol::protocol::TurnAbortReason;
    use codex_protocol::protocol::TurnAbortedEvent;
    use pretty_assertions::assert_eq;
    use tokio::sync::watch;

    /// Cancelling `forward_events` while it is blocked sending to a full consumer channel must
    /// make it return promptly and submit `Interrupt` and `Shutdown` to the delegate.
    #[tokio::test]
    async fn forward_events_cancelled_while_send_blocked_shuts_down_delegate() {
        // Fake delegate: the test feeds its events and inspects the ops sent to it.
        let (delegate_events_tx, delegate_events_rx) = bounded(1);
        let (delegate_ops_tx, delegate_ops_rx) = bounded(SUBMISSION_CHANNEL_CAPACITY);
        let (_status_tx, status_rx) = watch::channel(AgentStatus::PendingInit);
        let (session, ctx, _session_events) =
            crate::codex::make_session_and_context_with_rx().await;
        let delegate = Arc::new(Codex {
            next_id: AtomicU64::new(0),
            tx_sub: delegate_ops_tx,
            rx_event: delegate_events_rx,
            agent_status: status_rx,
            session: Arc::clone(&session),
        });

        // Consumer channel of capacity 1, pre-filled so the next forwarded event blocks.
        let (consumer_tx, consumer_rx) = bounded(1);
        let prefilled = Event {
            id: "full".to_string(),
            msg: EventMsg::TurnAborted(TurnAbortedEvent {
                reason: TurnAbortReason::Interrupted,
            }),
        };
        consumer_tx.send(prefilled).await.unwrap();

        let cancel = CancellationToken::new();
        let forwarder = tokio::spawn(forward_events(
            Arc::clone(&delegate),
            consumer_tx.clone(),
            session,
            ctx,
            cancel.clone(),
        ));

        // Not one of the filtered variants, so `forward_events` gets stuck sending it.
        let unfiltered = Event {
            id: "evt".to_string(),
            msg: EventMsg::RawResponseItem(RawResponseItemEvent {
                item: ResponseItem::CustomToolCall {
                    id: None,
                    status: None,
                    call_id: "call-1".to_string(),
                    name: "tool".to_string(),
                    input: "{}".to_string(),
                },
            }),
        };
        delegate_events_tx.send(unfiltered).await.unwrap();

        drop(delegate_events_tx);
        cancel.cancel();
        timeout(Duration::from_millis(1000), forwarder)
            .await
            .expect("forward_events hung")
            .expect("forward_events join error");

        let received = consumer_rx.recv().await.expect("prefilled event missing");
        assert_eq!("full", received.id);

        let ops: Vec<Op> = std::iter::from_fn(|| delegate_ops_rx.try_recv().ok())
            .map(|submission| submission.op)
            .collect();
        assert!(
            ops.iter().any(|op| matches!(op, Op::Interrupt)),
            "expected Interrupt op after cancellation"
        );
        assert!(
            ops.iter().any(|op| matches!(op, Op::Shutdown)),
            "expected Shutdown op after cancellation"
        );
    }
}
|