Compare commits

...

1 Commits

Author SHA1 Message Date
jif-oai
d624b79177 feat: review as sub agent 2026-02-03 10:35:16 +00:00
9 changed files with 479 additions and 590 deletions

View File

@@ -2,9 +2,11 @@ use crate::agent::AgentStatus;
use crate::agent::guards::Guards;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
use crate::protocol::Event;
use crate::thread_manager::ThreadManagerState;
use codex_protocol::ThreadId;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::SessionSource;
use codex_protocol::user_input::UserInput;
use std::sync::Arc;
use std::sync::Weak;
@@ -34,18 +36,30 @@ impl AgentControl {
}
}
/// Spawn a new agent thread and submit the initial prompt.
/// Spawn a new agent thread and optionally submit initial input.
pub(crate) async fn spawn_agent(
&self,
config: crate::config::Config,
prompt: String,
session_source: Option<codex_protocol::protocol::SessionSource>,
prompt: Option<String>,
session_source_override: Option<SessionSource>,
) -> CodexResult<ThreadId> {
self.spawn_agent_with_options(config, prompt, session_source_override, true)
.await
}
/// Spawn a new agent thread with explicit spawn options.
pub(crate) async fn spawn_agent_with_options(
&self,
config: crate::config::Config,
prompt: Option<String>,
session_source_override: Option<SessionSource>,
announce_thread_created: bool,
) -> CodexResult<ThreadId> {
let state = self.upgrade()?;
let reservation = self.state.reserve_spawn_slot(config.agent_max_threads)?;
// The same `AgentControl` is sent to spawn the thread.
let new_thread = match session_source {
let new_thread = match session_source_override {
Some(session_source) => {
state
.spawn_new_thread_with_source(config, self.clone(), session_source)
@@ -55,12 +69,16 @@ impl AgentControl {
};
reservation.commit(new_thread.thread_id);
// Notify a new thread has been created. This notification will be processed by clients
// to subscribe or drain this newly created thread.
// TODO(jif) add helper for drain
state.notify_thread_created(new_thread.thread_id);
if announce_thread_created {
// Notify a new thread has been created. This notification will be processed by clients
// to subscribe or drain this newly created thread.
// TODO(jif) add helper for drain
state.notify_thread_created(new_thread.thread_id);
}
self.send_prompt(new_thread.thread_id, prompt).await?;
if let Some(prompt) = prompt {
self.send_prompt(new_thread.thread_id, prompt).await?;
}
Ok(new_thread.thread_id)
}
@@ -71,20 +89,37 @@ impl AgentControl {
agent_id: ThreadId,
prompt: String,
) -> CodexResult<String> {
self.send_input(
agent_id,
vec![UserInput::Text {
text: prompt,
// Agent control prompts are plain text with no UI text elements.
text_elements: Vec::new(),
}],
)
.await
}
/// Submit a batch of `user` input items to an existing agent thread.
///
/// The items are wrapped in an `Op::UserInput` with no final-output JSON schema and handed
/// to `send_op`, whose result is returned unchanged.
pub(crate) async fn send_input(
    &self,
    agent_id: ThreadId,
    input: Vec<UserInput>,
) -> CodexResult<String> {
    let user_input_op = Op::UserInput {
        items: input,
        final_output_json_schema: None,
    };
    self.send_op(agent_id, user_input_op).await
}
/// Send an operation to an existing agent thread.
pub(crate) async fn send_op(&self, agent_id: ThreadId, op: Op) -> CodexResult<String> {
let state = self.upgrade()?;
let result = state
.send_op(
agent_id,
Op::UserInput {
items: vec![UserInput::Text {
text: prompt,
// Agent control prompts are plain text with no UI text elements.
text_elements: Vec::new(),
}],
final_output_json_schema: None,
},
)
.await;
let result = state.send_op(agent_id, op).await;
if matches!(result, Err(CodexErr::InternalAgentDied)) {
let _ = state.remove_thread(&agent_id).await;
self.state.release_spawned_thread(agent_id);
@@ -92,6 +127,13 @@ impl AgentControl {
result
}
/// Read the next event from an existing agent thread.
///
/// Returns an error if `self.upgrade()` fails, if `get_thread` cannot resolve `agent_id`,
/// or if the thread's own `next_event` call fails.
pub(crate) async fn next_event(&self, agent_id: ThreadId) -> CodexResult<Event> {
let state = self.upgrade()?;
let thread = state.get_thread(agent_id).await?;
thread.next_event().await
}
/// Interrupt the current task for an existing agent thread.
pub(crate) async fn interrupt_agent(&self, agent_id: ThreadId) -> CodexResult<String> {
let state = self.upgrade()?;
@@ -278,7 +320,7 @@ mod tests {
let control = AgentControl::default();
let (_home, config) = test_config().await;
let err = control
.spawn_agent(config, "hello".to_string(), None)
.spawn_agent(config, Some("hello".to_string()), None)
.await
.expect_err("spawn_agent should fail without a manager");
assert_eq!(
@@ -375,12 +417,49 @@ mod tests {
assert_eq!(captured, Some(expected));
}
#[tokio::test]
async fn send_input_submits_user_items() {
let harness = AgentControlHarness::new().await;
let (thread_id, _thread) = harness.start_thread().await;
let input = vec![
UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
},
UserInput::Text {
text: "world".to_string(),
text_elements: Vec::new(),
},
];
let submission_id = harness
.control
.send_input(thread_id, input.clone())
.await
.expect("send_input should succeed");
assert!(!submission_id.is_empty());
let expected = (
thread_id,
Op::UserInput {
items: input,
final_output_json_schema: None,
},
);
let captured = harness
.manager
.captured_ops()
.into_iter()
.find(|entry| *entry == expected);
assert_eq!(captured, Some(expected));
}
#[tokio::test]
async fn spawn_agent_creates_thread_and_sends_prompt() {
let harness = AgentControlHarness::new().await;
let thread_id = harness
.control
.spawn_agent(harness.config.clone(), "spawned".to_string(), None)
.spawn_agent(harness.config.clone(), Some("spawned".to_string()), None)
.await
.expect("spawn_agent should succeed");
let _thread = harness
@@ -406,6 +485,61 @@ mod tests {
assert_eq!(captured, Some(expected));
}
/// Spawning an agent with `prompt = None` must register the thread without submitting
/// any user input to it.
#[tokio::test]
async fn spawn_agent_without_prompt_does_not_submit_input() {
    let harness = AgentControlHarness::new().await;
    let thread_id = harness
        .control
        .spawn_agent(harness.config.clone(), None, None)
        .await
        .expect("spawn_agent should succeed");
    let _thread = harness
        .manager
        .get_thread(thread_id)
        .await
        .expect("thread should be registered");

    // Any `Op::UserInput` addressed to the new thread is a failure, whatever its fields
    // hold. Matching only `final_output_json_schema: None` would let an input submitted
    // with a schema slip through unnoticed.
    let captured = harness
        .manager
        .captured_ops()
        .into_iter()
        .find(|(id, op)| *id == thread_id && matches!(op, Op::UserInput { .. }));
    assert_eq!(captured, None);
}
/// Spawning with `announce_thread_created = false` must not emit a thread-created
/// notification to subscribers.
#[tokio::test]
async fn spawn_agent_with_options_can_skip_created_notification() {
let harness = AgentControlHarness::new().await;
// Subscribe before spawning so a notification, if one were sent, would be observed.
let mut created_rx = harness.manager.subscribe_thread_created();
let thread_id = harness
.control
.spawn_agent_with_options(
harness.config.clone(),
None,
Some(SessionSource::SubAgent(
codex_protocol::protocol::SubAgentSource::Review,
)),
// announce_thread_created
false,
)
.await
.expect("spawn_agent_with_options should succeed");
// Negative check: the receive has to time out, i.e. nothing arrives within 100 ms.
let result =
tokio::time::timeout(std::time::Duration::from_millis(100), created_rx.recv()).await;
assert!(result.is_err(), "did not expect thread_created event");
// Shut the spawned agent down again before the test ends.
let _ = harness
.control
.shutdown_agent(thread_id)
.await
.expect("shutdown should succeed");
}
#[tokio::test]
async fn spawn_agent_respects_max_threads_limit() {
let max_threads = 1usize;
@@ -427,12 +561,12 @@ mod tests {
.expect("start thread");
let first_agent_id = control
.spawn_agent(config.clone(), "hello".to_string(), None)
.spawn_agent(config.clone(), Some("hello".to_string()), None)
.await
.expect("spawn_agent should succeed");
let err = control
.spawn_agent(config, "hello again".to_string(), None)
.spawn_agent(config, Some("hello again".to_string()), None)
.await
.expect_err("spawn_agent should respect max threads");
let CodexErr::AgentLimitReached {
@@ -465,7 +599,7 @@ mod tests {
let control = manager.agent_control();
let first_agent_id = control
.spawn_agent(config.clone(), "hello".to_string(), None)
.spawn_agent(config.clone(), Some("hello".to_string()), None)
.await
.expect("spawn_agent should succeed");
let _ = control
@@ -474,7 +608,7 @@ mod tests {
.expect("shutdown agent");
let second_agent_id = control
.spawn_agent(config.clone(), "hello again".to_string(), None)
.spawn_agent(config.clone(), Some("hello again".to_string()), None)
.await
.expect("spawn_agent should succeed after shutdown");
let _ = control
@@ -500,12 +634,12 @@ mod tests {
let cloned = control.clone();
let first_agent_id = cloned
.spawn_agent(config.clone(), "hello".to_string(), None)
.spawn_agent(config.clone(), Some("hello".to_string()), None)
.await
.expect("spawn_agent should succeed");
let err = control
.spawn_agent(config, "hello again".to_string(), None)
.spawn_agent(config, Some("hello again".to_string()), None)
.await
.expect_err("spawn_agent should respect shared guard");
let CodexErr::AgentLimitReached { max_threads } = err else {

View File

@@ -4496,8 +4496,6 @@ pub(super) fn get_last_assistant_message_from_turn(responses: &[ResponseItem]) -
#[cfg(test)]
pub(crate) use tests::make_session_and_context;
#[cfg(test)]
pub(crate) use tests::make_session_and_context_with_rx;
#[cfg(test)]
mod tests {

View File

@@ -1,520 +0,0 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use async_channel::Receiver;
use async_channel::Sender;
use codex_async_utils::OrCancelExt;
use codex_protocol::protocol::ApplyPatchApprovalRequestEvent;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::ExecApprovalRequestEvent;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::RequestUserInputEvent;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::Submission;
use codex_protocol::request_user_input::RequestUserInputArgs;
use codex_protocol::request_user_input::RequestUserInputResponse;
use codex_protocol::user_input::UserInput;
use std::time::Duration;
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;
use crate::AuthManager;
use crate::codex::Codex;
use crate::codex::CodexSpawnOk;
use crate::codex::SUBMISSION_CHANNEL_CAPACITY;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::config::Config;
use crate::error::CodexErr;
use crate::models_manager::manager::ModelsManager;
use codex_protocol::protocol::InitialHistory;
/// Start an interactive sub-Codex thread and return a [`Codex`] handle wired to it.
///
/// The returned handle's `rx_event` receives what `forward_events` lets through: approval
/// and user-input requests are answered via `parent_session` and are not surfaced.
/// The handle's `tx_sub` feeds `forward_ops`, which submits each received op to the
/// sub-agent. Both forwarding tasks run under child tokens of `cancel_token`.
pub(crate) async fn run_codex_thread_interactive(
    config: Config,
    auth_manager: Arc<AuthManager>,
    models_manager: Arc<ModelsManager>,
    parent_session: Arc<Session>,
    parent_ctx: Arc<TurnContext>,
    cancel_token: CancellationToken,
    initial_history: Option<InitialHistory>,
) -> Result<Codex, CodexErr> {
    let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
    let (tx_ops, rx_ops) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);

    let skills_manager = Arc::clone(&parent_session.services.skills_manager);
    let history = initial_history.unwrap_or(InitialHistory::New);
    let agent_control = parent_session.services.agent_control.clone();
    let spawned = Codex::spawn(
        config,
        auth_manager,
        models_manager,
        skills_manager,
        history,
        SessionSource::SubAgent(SubAgentSource::Review),
        agent_control,
        Vec::new(),
    )
    .await?;
    let CodexSpawnOk { codex, .. } = spawned;
    let codex = Arc::new(codex);

    // Child tokens: a parent cancel cascades, while each task keeps its own scope.
    let events_cancel = cancel_token.child_token();
    let ops_cancel = cancel_token.child_token();

    // Sub-agent -> consumer: filter events and route approvals to the parent session.
    tokio::spawn(forward_events(
        Arc::clone(&codex),
        tx_sub,
        parent_session,
        parent_ctx,
        events_cancel,
    ));

    // Caller -> sub-agent: relay submitted ops.
    tokio::spawn(forward_ops(Arc::clone(&codex), rx_ops, ops_cancel));

    let agent_status = codex.agent_status.clone();
    let session = Arc::clone(&codex.session);
    Ok(Codex {
        next_id: AtomicU64::new(0),
        tx_sub: tx_ops,
        rx_event: rx_sub,
        agent_status,
        session,
    })
}
/// Convenience wrapper for one-time use with an initial prompt.
///
/// Internally calls the interactive variant, then immediately submits the provided input.
///
/// A bridge task relays the delegate's events to the returned handle's `rx_event`. Once it
/// has relayed a `TurnComplete` or `TurnAborted` event it sends an `Op::Shutdown`
/// submission to the delegate, cancels the child token, and stops. The returned handle's
/// `tx_sub` is already closed, so no further ops can be submitted through it.
///
/// Returns an error if starting the interactive delegate or submitting `input` fails.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn run_codex_thread_one_shot(
config: Config,
auth_manager: Arc<AuthManager>,
models_manager: Arc<ModelsManager>,
input: Vec<UserInput>,
parent_session: Arc<Session>,
parent_ctx: Arc<TurnContext>,
cancel_token: CancellationToken,
initial_history: Option<InitialHistory>,
) -> Result<Codex, CodexErr> {
// Use a child token so we can stop the delegate after completion without
// requiring the caller to cancel the parent token.
let child_cancel = cancel_token.child_token();
let io = run_codex_thread_interactive(
config,
auth_manager,
models_manager,
parent_session,
parent_ctx,
child_cancel.clone(),
initial_history,
)
.await?;
// Send the initial input to kick off the one-shot turn.
io.submit(Op::UserInput {
items: input,
final_output_json_schema: None,
})
.await?;
// Bridge events so we can observe completion and shut down automatically.
let (tx_bridge, rx_bridge) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
// Keep what the returned handle and the bridge need before `io` moves into the task.
let ops_tx = io.tx_sub.clone();
let agent_status = io.agent_status.clone();
let session = Arc::clone(&io.session);
let io_for_bridge = io;
tokio::spawn(async move {
while let Ok(event) = io_for_bridge.next_event().await {
// Decide before the event is moved into the bridge channel.
let should_shutdown = matches!(
event.msg,
EventMsg::TurnComplete(_) | EventMsg::TurnAborted(_)
);
// A failed send (consumer gone) is ignored; the terminal event still triggers shutdown.
let _ = tx_bridge.send(event).await;
if should_shutdown {
let _ = ops_tx
.send(Submission {
id: "shutdown".to_string(),
op: Op::Shutdown {},
})
.await;
child_cancel.cancel();
break;
}
}
});
// For one-shot usage, return a closed `tx_sub` so callers cannot submit
// additional ops after the initial request. Create a channel and drop the
// receiver to close it immediately.
let (tx_closed, rx_closed) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
drop(rx_closed);
Ok(Codex {
next_id: AtomicU64::new(0),
rx_event: rx_bridge,
tx_sub: tx_closed,
agent_status,
session,
})
}
/// Pump events from the sub-agent `codex` into `tx_sub` until cancellation or failure.
///
/// Legacy delta, token-count, session-configured and thread-name events are dropped.
/// Exec/patch approval requests and user-input requests are answered through
/// `parent_session` instead of being forwarded. Everything else is sent to `tx_sub`.
///
/// The delegate is shut down via `shutdown_delegate` when `cancel_token` fires or when
/// forwarding an event fails or is cancelled. If `next_event` errors the loop just ends.
async fn forward_events(
    codex: Arc<Codex>,
    tx_sub: Sender<Event>,
    parent_session: Arc<Session>,
    parent_ctx: Arc<TurnContext>,
    cancel_token: CancellationToken,
) {
    let cancelled = cancel_token.cancelled();
    tokio::pin!(cancelled);
    loop {
        tokio::select! {
            _ = &mut cancelled => {
                shutdown_delegate(&codex).await;
                break;
            }
            event = codex.next_event() => {
                let Ok(Event { id, msg }) = event else {
                    break;
                };
                match msg {
                    // Ignored: legacy deltas plus bookkeeping events the consumer does not need.
                    EventMsg::AgentMessageDelta(_)
                    | EventMsg::AgentReasoningDelta(_)
                    | EventMsg::TokenCount(_)
                    | EventMsg::SessionConfigured(_)
                    | EventMsg::ThreadNameUpdated(_) => {}
                    // Approvals are decided by the parent session; never surfaced to the consumer.
                    EventMsg::ExecApprovalRequest(request) => {
                        handle_exec_approval(
                            &codex,
                            id,
                            &parent_session,
                            &parent_ctx,
                            request,
                            &cancel_token,
                        )
                        .await;
                    }
                    EventMsg::ApplyPatchApprovalRequest(request) => {
                        handle_patch_approval(
                            &codex,
                            id,
                            &parent_session,
                            &parent_ctx,
                            request,
                            &cancel_token,
                        )
                        .await;
                    }
                    EventMsg::RequestUserInput(request) => {
                        handle_request_user_input(
                            &codex,
                            id,
                            &parent_session,
                            &parent_ctx,
                            request,
                            &cancel_token,
                        )
                        .await;
                    }
                    msg => {
                        let forwarded = tx_sub
                            .send(Event { id, msg })
                            .or_cancel(&cancel_token)
                            .await;
                        if !matches!(forwarded, Ok(Ok(()))) {
                            shutdown_delegate(&codex).await;
                            break;
                        }
                    }
                }
            }
        }
    }
}
/// Ask the delegate to stop, then drain its events so background sends do not hit a
/// closed channel.
///
/// Submits `Op::Interrupt` followed by `Op::Shutdown`, ignoring submission errors, and
/// then reads events for at most 500 ms, stopping early on `TurnAborted`, `TurnComplete`
/// or a receive error.
async fn shutdown_delegate(codex: &Codex) {
    let _ = codex.submit(Op::Interrupt).await;
    let _ = codex.submit(Op::Shutdown {}).await;

    let drain = async {
        loop {
            match codex.next_event().await {
                Ok(event) => {
                    let turn_ended = matches!(
                        event.msg,
                        EventMsg::TurnAborted(_) | EventMsg::TurnComplete(_)
                    );
                    if turn_ended {
                        break;
                    }
                }
                Err(_) => break,
            }
        }
    };
    let _ = timeout(Duration::from_millis(500), drain).await;
}
/// Relay ops from the caller's channel to the sub-agent until the channel closes or
/// `cancel_token_ops` is cancelled.
///
/// Only the `op` of each received `Submission` is used, and errors from `submit` are ignored.
async fn forward_ops(
    codex: Arc<Codex>,
    rx_ops: Receiver<Submission>,
    cancel_token_ops: CancellationToken,
) {
    // Both a closed channel (inner `Err`) and cancellation (outer `Err`) end the loop.
    while let Ok(Ok(Submission { op, .. })) = rx_ops.recv().or_cancel(&cancel_token_ops).await {
        let _ = codex.submit(op).await;
    }
}
/// Handle an ExecApprovalRequest by consulting the parent session and replying.
///
/// The request is raised on the parent under `parent_ctx.sub_id`; the resulting decision
/// is sent back to the delegate as `Op::ExecApproval` carrying the sub-agent event `id`.
/// A failure to submit that reply is ignored.
async fn handle_exec_approval(
codex: &Codex,
id: String,
parent_session: &Session,
parent_ctx: &TurnContext,
event: ExecApprovalRequestEvent,
cancel_token: &CancellationToken,
) {
// Race the approval against cancellation so a cancelled delegate does not hang here.
// No timeout is applied: `await_approval_with_cancel` only watches `cancel_token`.
let approval_fut = parent_session.request_command_approval(
parent_ctx,
parent_ctx.sub_id.clone(),
event.command,
event.cwd,
event.reason,
event.proposed_execpolicy_amendment,
);
let decision = await_approval_with_cancel(
approval_fut,
parent_session,
&parent_ctx.sub_id,
cancel_token,
)
.await;
let _ = codex.submit(Op::ExecApproval { id, decision }).await;
}
/// Answer an ApplyPatchApprovalRequest from the delegate with the parent session's decision.
///
/// The request is raised on the parent under `parent_ctx.sub_id`. If the decision channel
/// yields an error, the default decision is used. The reply goes back to the delegate as
/// `Op::PatchApproval` carrying the sub-agent event `id`; a failed submit is ignored.
async fn handle_patch_approval(
    codex: &Codex,
    id: String,
    parent_session: &Session,
    parent_ctx: &TurnContext,
    event: ApplyPatchApprovalRequestEvent,
    cancel_token: &CancellationToken,
) {
    let sub_id = &parent_ctx.sub_id;
    let pending = parent_session
        .request_patch_approval(
            parent_ctx,
            sub_id.clone(),
            event.changes,
            event.reason,
            event.grant_root,
        )
        .await;
    let resolved = async move { pending.await.unwrap_or_default() };
    let decision = await_approval_with_cancel(resolved, parent_session, sub_id, cancel_token).await;
    let reply = Op::PatchApproval { id, decision };
    let _ = codex.submit(reply).await;
}
/// Answer a RequestUserInput event from the delegate by asking the parent session.
///
/// The questions are re-issued on the parent under `parent_ctx.sub_id`, and the response is
/// sent back to the delegate as `Op::UserInputAnswer` carrying the sub-agent event `id`.
/// A failed submit is ignored.
async fn handle_request_user_input(
    codex: &Codex,
    id: String,
    parent_session: &Session,
    parent_ctx: &TurnContext,
    event: RequestUserInputEvent,
    cancel_token: &CancellationToken,
) {
    let sub_id = &parent_ctx.sub_id;
    let questions = event.questions;
    let pending = parent_session.request_user_input(
        parent_ctx,
        sub_id.clone(),
        RequestUserInputArgs { questions },
    );
    let response =
        await_user_input_with_cancel(pending, parent_session, sub_id, cancel_token).await;
    let reply = Op::UserInputAnswer { id, response };
    let _ = codex.submit(reply).await;
}
async fn await_user_input_with_cancel<F>(
fut: F,
parent_session: &Session,
sub_id: &str,
cancel_token: &CancellationToken,
) -> RequestUserInputResponse
where
F: core::future::Future<Output = Option<RequestUserInputResponse>>,
{
tokio::select! {
biased;
_ = cancel_token.cancelled() => {
let empty = RequestUserInputResponse {
answers: HashMap::new(),
};
parent_session
.notify_user_input_response(sub_id, empty.clone())
.await;
empty
}
response = fut => response.unwrap_or_else(|| RequestUserInputResponse {
answers: HashMap::new(),
}),
}
}
/// Await an approval decision, resolving to `Abort` if cancellation happens first.
///
/// Cancellation is checked first (`biased`). When it wins, the parent session is notified
/// of an `Abort` for `sub_id` before `Abort` is returned. Otherwise the decision produced
/// by `fut` is returned as is.
async fn await_approval_with_cancel<F>(
    fut: F,
    parent_session: &Session,
    sub_id: &str,
    cancel_token: &CancellationToken,
) -> codex_protocol::protocol::ReviewDecision
where
    F: core::future::Future<Output = codex_protocol::protocol::ReviewDecision>,
{
    use codex_protocol::protocol::ReviewDecision;

    tokio::select! {
        biased;
        _ = cancel_token.cancelled() => {
            parent_session
                .notify_approval(sub_id, ReviewDecision::Abort)
                .await;
            ReviewDecision::Abort
        }
        decision = fut => decision,
    }
}
#[cfg(test)]
mod tests {
use super::*;
use async_channel::bounded;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::AgentStatus;
use codex_protocol::protocol::RawResponseItemEvent;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnAbortedEvent;
use pretty_assertions::assert_eq;
use tokio::sync::watch;
/// `forward_events` must not hang when it is cancelled while its outgoing channel is
/// full, and must submit both `Interrupt` and `Shutdown` to the delegate on the way out.
#[tokio::test]
async fn forward_events_cancelled_while_send_blocked_shuts_down_delegate() {
// Hand-built `Codex`: the test feeds events in via `tx_events` and reads the
// submitted ops back from `rx_sub`.
let (tx_events, rx_events) = bounded(1);
let (tx_sub, rx_sub) = bounded(SUBMISSION_CHANNEL_CAPACITY);
let (_agent_status_tx, agent_status) = watch::channel(AgentStatus::PendingInit);
let (session, ctx, _rx_evt) = crate::codex::make_session_and_context_with_rx().await;
let codex = Arc::new(Codex {
next_id: AtomicU64::new(0),
tx_sub,
rx_event: rx_events,
agent_status,
session: Arc::clone(&session),
});
// Pre-fill the capacity-1 output channel so the forwarder's next send has to block.
let (tx_out, rx_out) = bounded(1);
tx_out
.send(Event {
id: "full".to_string(),
msg: EventMsg::TurnAborted(TurnAbortedEvent {
reason: TurnAbortReason::Interrupted,
}),
})
.await
.unwrap();
let cancel = CancellationToken::new();
let forward = tokio::spawn(forward_events(
Arc::clone(&codex),
tx_out.clone(),
session,
ctx,
cancel.clone(),
));
// An event that `forward_events` forwards rather than filters.
tx_events
.send(Event {
id: "evt".to_string(),
msg: EventMsg::RawResponseItem(RawResponseItemEvent {
item: ResponseItem::CustomToolCall {
id: None,
status: None,
call_id: "call-1".to_string(),
name: "tool".to_string(),
input: "{}".to_string(),
},
}),
})
.await
.unwrap();
drop(tx_events);
cancel.cancel();
// The forwarder has to finish within one second of the cancellation.
timeout(std::time::Duration::from_millis(1000), forward)
.await
.expect("forward_events hung")
.expect("forward_events join error");
// The pre-filled event is still the first item in the output channel.
let received = rx_out.recv().await.expect("prefilled event missing");
assert_eq!("full", received.id);
// Collect every op the forwarder submitted to the delegate.
let mut ops = Vec::new();
while let Ok(sub) = rx_sub.try_recv() {
ops.push(sub.op);
}
assert!(
ops.iter().any(|op| matches!(op, Op::Interrupt)),
"expected Interrupt op after cancellation"
);
assert!(
ops.iter().any(|op| matches!(op, Op::Shutdown)),
"expected Shutdown op after cancellation"
);
}
}

View File

@@ -18,7 +18,6 @@ mod compact_remote;
pub use codex_thread::CodexThread;
pub use codex_thread::ThreadConfigSnapshot;
mod agent;
mod codex_delegate;
mod command_safety;
pub mod config;
pub mod config_loader;

View File

@@ -18,10 +18,8 @@ use tracing::Span;
use tracing::trace;
use tracing::warn;
use crate::AuthManager;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::models_manager::manager::ModelsManager;
use crate::protocol::EventMsg;
use crate::protocol::TurnAbortReason;
use crate::protocol::TurnAbortedEvent;
@@ -59,14 +57,6 @@ impl SessionTaskContext {
pub(crate) fn clone_session(&self) -> Arc<Session> {
Arc::clone(&self.session)
}
pub(crate) fn auth_manager(&self) -> Arc<AuthManager> {
Arc::clone(&self.session.services.auth_manager)
}
pub(crate) fn models_manager(&self) -> Arc<ModelsManager> {
Arc::clone(&self.session.services.models_manager)
}
}
/// Async task that drives a [`Session`] turn.

View File

@@ -1,26 +1,38 @@
use std::sync::Arc;
use async_trait::async_trait;
use codex_async_utils::OrCancelExt;
use codex_protocol::ThreadId;
use codex_protocol::config_types::WebSearchMode;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::AgentMessageContentDeltaEvent;
use codex_protocol::protocol::AgentMessageDeltaEvent;
use codex_protocol::protocol::ApplyPatchApprovalRequestEvent;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::ExecApprovalRequestEvent;
use codex_protocol::protocol::ExitedReviewModeEvent;
use codex_protocol::protocol::ItemCompletedEvent;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::RequestUserInputEvent;
use codex_protocol::protocol::ReviewDecision;
use codex_protocol::protocol::ReviewOutputEvent;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::request_user_input::RequestUserInputArgs;
use codex_protocol::request_user_input::RequestUserInputResponse;
use tokio_util::sync::CancellationToken;
use crate::codex::SUBMISSION_CHANNEL_CAPACITY;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::codex_delegate::run_codex_thread_one_shot;
use crate::review_format::format_review_findings_block;
use crate::review_format::render_review_output_text;
use crate::state::TaskKind;
use codex_protocol::user_input::UserInput;
use std::collections::HashMap;
use super::SessionTask;
use super::SessionTaskContext;
@@ -84,7 +96,7 @@ async fn start_review_conversation(
) -> Option<async_channel::Receiver<Event>> {
let config = ctx.client.config();
let mut sub_agent_config = config.as_ref().clone();
// Carry over review-only feature restrictions so the delegate cannot
// Carry over review-only feature restrictions so the review agent cannot
// re-enable blocked tools (web search, view image).
sub_agent_config.web_search_mode = Some(WebSearchMode::Disabled);
@@ -96,19 +108,295 @@ async fn start_review_conversation(
.clone()
.unwrap_or_else(|| ctx.client.get_model());
sub_agent_config.model = Some(model);
(run_codex_thread_one_shot(
sub_agent_config,
session.auth_manager(),
session.models_manager(),
input,
session.clone_session(),
ctx.clone(),
cancellation_token,
None,
let agent_control = session.session.services.agent_control.clone();
let agent_id = match agent_control
.spawn_agent_with_options(
sub_agent_config,
None,
Some(SessionSource::SubAgent(SubAgentSource::Review)),
false,
)
.await
{
Ok(agent_id) => agent_id,
Err(_) => return None,
};
if agent_control.send_input(agent_id, input).await.is_err() {
let _ = agent_control.shutdown_agent(agent_id).await;
return None;
}
let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
let parent_session = session.clone_session();
let parent_ctx = ctx.clone();
let cancel_token = cancellation_token.child_token();
tokio::spawn(async move {
forward_review_events(
agent_control,
agent_id,
tx_sub,
parent_session,
parent_ctx,
cancel_token,
)
.await;
});
Some(rx_sub)
}
async fn forward_review_events(
agent_control: crate::agent::AgentControl,
agent_id: ThreadId,
tx_sub: async_channel::Sender<Event>,
parent_session: Arc<Session>,
parent_ctx: Arc<TurnContext>,
cancellation_token: CancellationToken,
) {
let cancelled = cancellation_token.cancelled();
tokio::pin!(cancelled);
loop {
tokio::select! {
_ = &mut cancelled => {
shutdown_review_agent(&agent_control, agent_id).await;
break;
}
event = agent_control.next_event(agent_id) => {
let event = match event {
Ok(event) => event,
Err(_) => {
shutdown_review_agent(&agent_control, agent_id).await;
break;
}
};
match event.msg.clone() {
EventMsg::AgentMessageDelta(_)
| EventMsg::AgentReasoningDelta(_)
| EventMsg::TokenCount(_)
| EventMsg::SessionConfigured(_)
| EventMsg::ThreadNameUpdated(_) => {}
EventMsg::ExecApprovalRequest(request) => {
if handle_exec_approval(
&agent_control,
agent_id,
event.id,
&parent_session,
&parent_ctx,
request,
&cancellation_token,
)
.await
.is_err()
{
shutdown_review_agent(&agent_control, agent_id).await;
break;
}
}
EventMsg::ApplyPatchApprovalRequest(request) => {
if handle_patch_approval(
&agent_control,
agent_id,
event.id,
&parent_session,
&parent_ctx,
request,
&cancellation_token,
)
.await
.is_err()
{
shutdown_review_agent(&agent_control, agent_id).await;
break;
}
}
EventMsg::RequestUserInput(request) => {
if handle_request_user_input(
&agent_control,
agent_id,
event.id,
&parent_session,
&parent_ctx,
request,
&cancellation_token,
)
.await
.is_err()
{
shutdown_review_agent(&agent_control, agent_id).await;
break;
}
}
EventMsg::TurnComplete(_) | EventMsg::TurnAborted(_) => {
if !send_review_event(&tx_sub, event, &cancellation_token).await {
shutdown_review_agent(&agent_control, agent_id).await;
break;
}
shutdown_review_agent(&agent_control, agent_id).await;
break;
}
_ => {
if !send_review_event(&tx_sub, event, &cancellation_token).await {
shutdown_review_agent(&agent_control, agent_id).await;
break;
}
}
}
}
}
}
}
async fn send_review_event(
tx_sub: &async_channel::Sender<Event>,
event: Event,
cancellation_token: &CancellationToken,
) -> bool {
matches!(
tx_sub.send(event).or_cancel(cancellation_token).await,
Ok(Ok(()))
)
.await)
.ok()
.map(|io| io.rx_event)
}
/// Best-effort shutdown of the review agent: the result of `shutdown_agent` is discarded.
async fn shutdown_review_agent(agent_control: &crate::agent::AgentControl, agent_id: ThreadId) {
let _ = agent_control.shutdown_agent(agent_id).await;
}
/// Answer an exec approval request from the review agent with the parent session's decision.
///
/// The request is raised on the parent under `parent_ctx.sub_id`, and the decision is sent
/// back to `agent_id` as `Op::ExecApproval` carrying the sub-agent event `id`.
///
/// # Errors
/// Returns the error from `send_op` if the reply cannot be delivered.
async fn handle_exec_approval(
    agent_control: &crate::agent::AgentControl,
    agent_id: ThreadId,
    id: String,
    parent_session: &Session,
    parent_ctx: &TurnContext,
    event: ExecApprovalRequestEvent,
    cancellation_token: &CancellationToken,
) -> crate::error::Result<()> {
    let sub_id = &parent_ctx.sub_id;
    let pending = parent_session.request_command_approval(
        parent_ctx,
        sub_id.clone(),
        event.command,
        event.cwd,
        event.reason,
        event.proposed_execpolicy_amendment,
    );
    let decision =
        await_approval_with_cancel(pending, parent_session, sub_id, cancellation_token).await;
    let reply = Op::ExecApproval { id, decision };
    agent_control.send_op(agent_id, reply).await?;
    Ok(())
}
/// Answer a patch approval request from the review agent with the parent session's decision.
///
/// The request is raised on the parent under `parent_ctx.sub_id`. If the decision channel
/// yields an error, the default decision is used. The reply is sent to `agent_id` as
/// `Op::PatchApproval` carrying the sub-agent event `id`.
///
/// # Errors
/// Returns the error from `send_op` if the reply cannot be delivered.
async fn handle_patch_approval(
    agent_control: &crate::agent::AgentControl,
    agent_id: ThreadId,
    id: String,
    parent_session: &Session,
    parent_ctx: &TurnContext,
    event: ApplyPatchApprovalRequestEvent,
    cancellation_token: &CancellationToken,
) -> crate::error::Result<()> {
    let sub_id = &parent_ctx.sub_id;
    let pending = parent_session
        .request_patch_approval(
            parent_ctx,
            sub_id.clone(),
            event.changes,
            event.reason,
            event.grant_root,
        )
        .await;
    let resolved = async move { pending.await.unwrap_or_default() };
    let decision =
        await_approval_with_cancel(resolved, parent_session, sub_id, cancellation_token).await;
    let reply = Op::PatchApproval { id, decision };
    agent_control.send_op(agent_id, reply).await?;
    Ok(())
}
/// Answer a user-input request from the review agent by asking the parent session.
///
/// The questions are re-issued on the parent under `parent_ctx.sub_id`, and the response is
/// sent to `agent_id` as `Op::UserInputAnswer` carrying the sub-agent event `id`.
///
/// # Errors
/// Returns the error from `send_op` if the reply cannot be delivered.
async fn handle_request_user_input(
    agent_control: &crate::agent::AgentControl,
    agent_id: ThreadId,
    id: String,
    parent_session: &Session,
    parent_ctx: &TurnContext,
    event: RequestUserInputEvent,
    cancellation_token: &CancellationToken,
) -> crate::error::Result<()> {
    let sub_id = &parent_ctx.sub_id;
    let questions = event.questions;
    let pending = parent_session.request_user_input(
        parent_ctx,
        sub_id.clone(),
        RequestUserInputArgs { questions },
    );
    let response =
        await_user_input_with_cancel(pending, parent_session, sub_id, cancellation_token).await;
    let reply = Op::UserInputAnswer { id, response };
    agent_control.send_op(agent_id, reply).await?;
    Ok(())
}
async fn await_user_input_with_cancel<F>(
fut: F,
parent_session: &Session,
sub_id: &str,
cancellation_token: &CancellationToken,
) -> RequestUserInputResponse
where
F: core::future::Future<Output = Option<RequestUserInputResponse>>,
{
tokio::select! {
biased;
_ = cancellation_token.cancelled() => {
let empty = RequestUserInputResponse {
answers: HashMap::new(),
};
parent_session
.notify_user_input_response(sub_id, empty.clone())
.await;
empty
}
response = fut => response.unwrap_or_else(|| RequestUserInputResponse {
answers: HashMap::new(),
}),
}
}
/// Await an approval decision, resolving to `Abort` if cancellation happens first.
///
/// Cancellation is checked first (`biased`). When it wins, the parent session is notified
/// of an `Abort` for `sub_id` before `Abort` is returned. Otherwise the decision produced
/// by `fut` is returned as is.
async fn await_approval_with_cancel<F>(
    fut: F,
    parent_session: &Session,
    sub_id: &str,
    cancellation_token: &CancellationToken,
) -> ReviewDecision
where
    F: core::future::Future<Output = ReviewDecision>,
{
    tokio::select! {
        biased;
        _ = cancellation_token.cancelled() => {
            let aborted = ReviewDecision::Abort;
            parent_session.notify_approval(sub_id, aborted).await;
            ReviewDecision::Abort
        }
        decision = fut => decision,
    }
}
async fn process_review_events(

View File

@@ -146,7 +146,7 @@ mod spawn {
.agent_control
.spawn_agent(
config,
prompt.clone(),
Some(prompt.clone()),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: session.conversation_id,
depth: child_depth,

View File

@@ -23,7 +23,6 @@ mod auth_refresh;
mod cli_stream;
mod client;
mod client_websockets;
mod codex_delegate;
mod collaboration_instructions;
mod compact;
mod compact_remote;
@@ -59,6 +58,7 @@ mod request_user_input;
mod resume;
mod resume_warning;
mod review;
mod review_subagent;
mod rmcp_client;
mod rollout_list_find;
mod seatbelt;

View File

@@ -22,10 +22,10 @@ use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use pretty_assertions::assert_eq;
/// Delegate should surface ExecApprovalRequest from sub-agent and proceed
/// Review sub-agent should surface ExecApprovalRequest and proceed
/// after parent submits an approval decision.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn codex_delegate_forwards_exec_approval_and_proceeds_on_approval() {
async fn review_subagent_forwards_exec_approval_and_proceeds_on_approval() {
skip_if_no_network!();
// Sub-agent turn 1: emit a shell_command function_call requiring approval, then complete.
@@ -46,7 +46,7 @@ async fn codex_delegate_forwards_exec_approval_and_proceeds_on_approval() {
let review_json = serde_json::json!({
"findings": [],
"overall_correctness": "ok",
"overall_explanation": "delegate approved exec",
"overall_explanation": "review sub-agent approved exec",
"overall_confidence_score": 0.5
})
.to_string();
@@ -59,7 +59,7 @@ async fn codex_delegate_forwards_exec_approval_and_proceeds_on_approval() {
let server = start_mock_server().await;
mount_sse_sequence(&server, vec![sse1, sse2]).await;
// Build a conversation configured to require approvals so the delegate
// Build a conversation configured to require approvals so the review sub-agent
// routes ExecApprovalRequest via the parent.
let mut builder = test_codex().with_model("gpt-5.1").with_config(|config| {
config.approval_policy = Constrained::allow_any(AskForApproval::OnRequest);
@@ -86,7 +86,7 @@ async fn codex_delegate_forwards_exec_approval_and_proceeds_on_approval() {
})
.await;
// Expect parent-side approval request (forwarded by delegate).
// Expect parent-side approval request forwarded by the review sub-agent.
wait_for_event(&test.codex, |ev| {
matches!(ev, EventMsg::ExecApprovalRequest(_))
})
@@ -108,10 +108,10 @@ async fn codex_delegate_forwards_exec_approval_and_proceeds_on_approval() {
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
}
/// Delegate should surface ApplyPatchApprovalRequest and honor parent decision
/// Review sub-agent should surface ApplyPatchApprovalRequest and honor parent decision
/// so the sub-agent can proceed to completion.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn codex_delegate_forwards_patch_approval_and_proceeds_on_decision() {
async fn review_subagent_forwards_patch_approval_and_proceeds_on_decision() {
skip_if_no_network!();
let call_id = "call-patch-1";
@@ -124,7 +124,7 @@ async fn codex_delegate_forwards_patch_approval_and_proceeds_on_decision() {
let review_json = serde_json::json!({
"findings": [],
"overall_correctness": "ok",
"overall_explanation": "delegate patch handled",
"overall_explanation": "review sub-agent patch handled",
"overall_confidence_score": 0.5
})
.to_string();
@@ -166,7 +166,7 @@ async fn codex_delegate_forwards_patch_approval_and_proceeds_on_decision() {
})
.await;
// Deny via parent so delegate can continue; id "0" is the active sub_id in tests.
// Deny via parent so review sub-agent can continue; id "0" is the active sub_id in tests.
test.codex
.submit(Op::PatchApproval {
id: "0".into(),
@@ -183,7 +183,7 @@ async fn codex_delegate_forwards_patch_approval_and_proceeds_on_decision() {
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn codex_delegate_ignores_legacy_deltas() {
async fn review_subagent_ignores_legacy_deltas() {
skip_if_no_network!();
// Single response with reasoning summary deltas.
@@ -200,7 +200,7 @@ async fn codex_delegate_ignores_legacy_deltas() {
let mut builder = test_codex();
let test = builder.build(&server).await.expect("build test codex");
// Kick off review (delegated).
// Kick off review in a spawned sub-agent.
test.codex
.submit(Op::Review {
review_request: ReviewRequest {