mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
feat: review as sub agent
This commit is contained in:
@@ -1,9 +1,12 @@
|
||||
use crate::agent::AgentStatus;
|
||||
use crate::error::CodexErr;
|
||||
use crate::error::Result as CodexResult;
|
||||
use crate::protocol::Event;
|
||||
use crate::thread_manager::ThreadManagerState;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::InitialHistory;
|
||||
use codex_protocol::protocol::Op;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Weak;
|
||||
@@ -26,21 +29,30 @@ impl AgentControl {
|
||||
Self { manager }
|
||||
}
|
||||
|
||||
/// Spawn a new agent thread and submit the initial prompt.
|
||||
/// Spawn a new agent thread and submit the initial prompt if defined.
|
||||
pub(crate) async fn spawn_agent(
|
||||
&self,
|
||||
config: crate::config::Config,
|
||||
prompt: String,
|
||||
prompt: Option<String>,
|
||||
session_source_override: Option<SessionSource>,
|
||||
) -> CodexResult<ThreadId> {
|
||||
let state = self.upgrade()?;
|
||||
let new_thread = state.spawn_new_thread(config, self.clone()).await?;
|
||||
let new_thread = state
|
||||
.start_thread(
|
||||
config,
|
||||
InitialHistory::New,
|
||||
self.clone(),
|
||||
session_source_override,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Notify a new thread has been created. This notification will be processed by clients
|
||||
// to subscribe or drain this newly created thread.
|
||||
// TODO(jif) add helper for drain
|
||||
state.notify_thread_created(new_thread.thread_id);
|
||||
|
||||
self.send_prompt(new_thread.thread_id, prompt).await?;
|
||||
if let Some(prompt) = prompt {
|
||||
self.send_prompt(new_thread.thread_id, prompt).await?;
|
||||
}
|
||||
|
||||
Ok(new_thread.thread_id)
|
||||
}
|
||||
@@ -50,17 +62,30 @@ impl AgentControl {
|
||||
&self,
|
||||
agent_id: ThreadId,
|
||||
prompt: String,
|
||||
) -> CodexResult<String> {
|
||||
self.send_input(
|
||||
agent_id,
|
||||
vec![UserInput::Text {
|
||||
text: prompt,
|
||||
// Plain text conversion has no UI element ranges.
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Send a `UserInput` to an existing agent thread.
|
||||
pub(crate) async fn send_input(
|
||||
&self,
|
||||
agent_id: ThreadId,
|
||||
input: Vec<UserInput>,
|
||||
) -> CodexResult<String> {
|
||||
let state = self.upgrade()?;
|
||||
let result = state
|
||||
.send_op(
|
||||
agent_id,
|
||||
Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: prompt,
|
||||
// Plain text conversion has no UI element ranges.
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
items: input,
|
||||
final_output_json_schema: None,
|
||||
},
|
||||
)
|
||||
@@ -246,7 +271,7 @@ mod tests {
|
||||
let control = AgentControl::default();
|
||||
let (_home, config) = test_config().await;
|
||||
let err = control
|
||||
.spawn_agent(config, "hello".to_string())
|
||||
.spawn_agent(config, Some("hello".to_string()), None)
|
||||
.await
|
||||
.expect_err("spawn_agent should fail without a manager");
|
||||
assert_eq!(
|
||||
@@ -348,7 +373,7 @@ mod tests {
|
||||
let harness = AgentControlHarness::new().await;
|
||||
let thread_id = harness
|
||||
.control
|
||||
.spawn_agent(harness.config.clone(), "spawned".to_string())
|
||||
.spawn_agent(harness.config.clone(), Some("spawned".to_string()), None)
|
||||
.await
|
||||
.expect("spawn_agent should succeed");
|
||||
let _thread = harness
|
||||
|
||||
@@ -1,445 +0,0 @@
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
|
||||
use async_channel::Receiver;
|
||||
use async_channel::Sender;
|
||||
use codex_async_utils::OrCancelExt;
|
||||
use codex_protocol::protocol::ApplyPatchApprovalRequestEvent;
|
||||
use codex_protocol::protocol::Event;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::ExecApprovalRequestEvent;
|
||||
use codex_protocol::protocol::Op;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
use codex_protocol::protocol::Submission;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use std::time::Duration;
|
||||
use tokio::time::timeout;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::AuthManager;
|
||||
use crate::codex::Codex;
|
||||
use crate::codex::CodexSpawnOk;
|
||||
use crate::codex::SUBMISSION_CHANNEL_CAPACITY;
|
||||
use crate::codex::Session;
|
||||
use crate::codex::TurnContext;
|
||||
use crate::config::Config;
|
||||
use crate::error::CodexErr;
|
||||
use crate::models_manager::manager::ModelsManager;
|
||||
use codex_protocol::protocol::InitialHistory;
|
||||
|
||||
/// Start an interactive sub-Codex thread and return IO channels.
|
||||
///
|
||||
/// The returned `events_rx` yields non-approval events emitted by the sub-agent.
|
||||
/// Approval requests are handled via `parent_session` and are not surfaced.
|
||||
/// The returned `ops_tx` allows the caller to submit additional `Op`s to the sub-agent.
|
||||
pub(crate) async fn run_codex_thread_interactive(
|
||||
config: Config,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
models_manager: Arc<ModelsManager>,
|
||||
parent_session: Arc<Session>,
|
||||
parent_ctx: Arc<TurnContext>,
|
||||
cancel_token: CancellationToken,
|
||||
initial_history: Option<InitialHistory>,
|
||||
) -> Result<Codex, CodexErr> {
|
||||
let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
|
||||
let (tx_ops, rx_ops) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
|
||||
|
||||
let CodexSpawnOk { codex, .. } = Codex::spawn(
|
||||
config,
|
||||
auth_manager,
|
||||
models_manager,
|
||||
Arc::clone(&parent_session.services.skills_manager),
|
||||
initial_history.unwrap_or(InitialHistory::New),
|
||||
SessionSource::SubAgent(SubAgentSource::Review),
|
||||
parent_session.services.agent_control.clone(),
|
||||
)
|
||||
.await?;
|
||||
let codex = Arc::new(codex);
|
||||
|
||||
// Use a child token so parent cancel cascades but we can scope it to this task
|
||||
let cancel_token_events = cancel_token.child_token();
|
||||
let cancel_token_ops = cancel_token.child_token();
|
||||
|
||||
// Forward events from the sub-agent to the consumer, filtering approvals and
|
||||
// routing them to the parent session for decisions.
|
||||
let parent_session_clone = Arc::clone(&parent_session);
|
||||
let parent_ctx_clone = Arc::clone(&parent_ctx);
|
||||
let codex_for_events = Arc::clone(&codex);
|
||||
tokio::spawn(async move {
|
||||
forward_events(
|
||||
codex_for_events,
|
||||
tx_sub,
|
||||
parent_session_clone,
|
||||
parent_ctx_clone,
|
||||
cancel_token_events,
|
||||
)
|
||||
.await;
|
||||
});
|
||||
|
||||
// Forward ops from the caller to the sub-agent.
|
||||
let codex_for_ops = Arc::clone(&codex);
|
||||
tokio::spawn(async move {
|
||||
forward_ops(codex_for_ops, rx_ops, cancel_token_ops).await;
|
||||
});
|
||||
|
||||
Ok(Codex {
|
||||
next_id: AtomicU64::new(0),
|
||||
tx_sub: tx_ops,
|
||||
rx_event: rx_sub,
|
||||
agent_status: codex.agent_status.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Convenience wrapper for one-time use with an initial prompt.
|
||||
///
|
||||
/// Internally calls the interactive variant, then immediately submits the provided input.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) async fn run_codex_thread_one_shot(
|
||||
config: Config,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
models_manager: Arc<ModelsManager>,
|
||||
input: Vec<UserInput>,
|
||||
parent_session: Arc<Session>,
|
||||
parent_ctx: Arc<TurnContext>,
|
||||
cancel_token: CancellationToken,
|
||||
initial_history: Option<InitialHistory>,
|
||||
) -> Result<Codex, CodexErr> {
|
||||
// Use a child token so we can stop the delegate after completion without
|
||||
// requiring the caller to cancel the parent token.
|
||||
let child_cancel = cancel_token.child_token();
|
||||
let io = run_codex_thread_interactive(
|
||||
config,
|
||||
auth_manager,
|
||||
models_manager,
|
||||
parent_session,
|
||||
parent_ctx,
|
||||
child_cancel.clone(),
|
||||
initial_history,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Send the initial input to kick off the one-shot turn.
|
||||
io.submit(Op::UserInput {
|
||||
items: input,
|
||||
final_output_json_schema: None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
// Bridge events so we can observe completion and shut down automatically.
|
||||
let (tx_bridge, rx_bridge) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
|
||||
let ops_tx = io.tx_sub.clone();
|
||||
let agent_status = io.agent_status.clone();
|
||||
let io_for_bridge = io;
|
||||
tokio::spawn(async move {
|
||||
while let Ok(event) = io_for_bridge.next_event().await {
|
||||
let should_shutdown = matches!(
|
||||
event.msg,
|
||||
EventMsg::TurnComplete(_) | EventMsg::TurnAborted(_)
|
||||
);
|
||||
let _ = tx_bridge.send(event).await;
|
||||
if should_shutdown {
|
||||
let _ = ops_tx
|
||||
.send(Submission {
|
||||
id: "shutdown".to_string(),
|
||||
op: Op::Shutdown {},
|
||||
})
|
||||
.await;
|
||||
child_cancel.cancel();
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// For one-shot usage, return a closed `tx_sub` so callers cannot submit
|
||||
// additional ops after the initial request. Create a channel and drop the
|
||||
// receiver to close it immediately.
|
||||
let (tx_closed, rx_closed) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
|
||||
drop(rx_closed);
|
||||
|
||||
Ok(Codex {
|
||||
next_id: AtomicU64::new(0),
|
||||
rx_event: rx_bridge,
|
||||
tx_sub: tx_closed,
|
||||
agent_status,
|
||||
})
|
||||
}
|
||||
|
||||
async fn forward_events(
|
||||
codex: Arc<Codex>,
|
||||
tx_sub: Sender<Event>,
|
||||
parent_session: Arc<Session>,
|
||||
parent_ctx: Arc<TurnContext>,
|
||||
cancel_token: CancellationToken,
|
||||
) {
|
||||
let cancelled = cancel_token.cancelled();
|
||||
tokio::pin!(cancelled);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = &mut cancelled => {
|
||||
shutdown_delegate(&codex).await;
|
||||
break;
|
||||
}
|
||||
event = codex.next_event() => {
|
||||
let event = match event {
|
||||
Ok(event) => event,
|
||||
Err(_) => break,
|
||||
};
|
||||
match event {
|
||||
// ignore all legacy delta events
|
||||
Event {
|
||||
id: _,
|
||||
msg: EventMsg::AgentMessageDelta(_) | EventMsg::AgentReasoningDelta(_),
|
||||
} => {}
|
||||
Event {
|
||||
id: _,
|
||||
msg: EventMsg::TokenCount(_),
|
||||
} => {}
|
||||
Event {
|
||||
id: _,
|
||||
msg: EventMsg::SessionConfigured(_),
|
||||
} => {}
|
||||
Event {
|
||||
id,
|
||||
msg: EventMsg::ExecApprovalRequest(event),
|
||||
} => {
|
||||
// Initiate approval via parent session; do not surface to consumer.
|
||||
handle_exec_approval(
|
||||
&codex,
|
||||
id,
|
||||
&parent_session,
|
||||
&parent_ctx,
|
||||
event,
|
||||
&cancel_token,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Event {
|
||||
id,
|
||||
msg: EventMsg::ApplyPatchApprovalRequest(event),
|
||||
} => {
|
||||
handle_patch_approval(
|
||||
&codex,
|
||||
id,
|
||||
&parent_session,
|
||||
&parent_ctx,
|
||||
event,
|
||||
&cancel_token,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
other => {
|
||||
match tx_sub.send(other).or_cancel(&cancel_token).await {
|
||||
Ok(Ok(())) => {}
|
||||
_ => {
|
||||
shutdown_delegate(&codex).await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Ask the delegate to stop and drain its events so background sends do not hit a closed channel.
|
||||
async fn shutdown_delegate(codex: &Codex) {
|
||||
let _ = codex.submit(Op::Interrupt).await;
|
||||
let _ = codex.submit(Op::Shutdown {}).await;
|
||||
|
||||
let _ = timeout(Duration::from_millis(500), async {
|
||||
while let Ok(event) = codex.next_event().await {
|
||||
if matches!(
|
||||
event.msg,
|
||||
EventMsg::TurnAborted(_) | EventMsg::TurnComplete(_)
|
||||
) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Forward ops from a caller to a sub-agent, respecting cancellation.
|
||||
async fn forward_ops(
|
||||
codex: Arc<Codex>,
|
||||
rx_ops: Receiver<Submission>,
|
||||
cancel_token_ops: CancellationToken,
|
||||
) {
|
||||
loop {
|
||||
let op: Op = match rx_ops.recv().or_cancel(&cancel_token_ops).await {
|
||||
Ok(Ok(Submission { id: _, op })) => op,
|
||||
Ok(Err(_)) | Err(_) => break,
|
||||
};
|
||||
let _ = codex.submit(op).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle an ExecApprovalRequest by consulting the parent session and replying.
|
||||
async fn handle_exec_approval(
|
||||
codex: &Codex,
|
||||
id: String,
|
||||
parent_session: &Session,
|
||||
parent_ctx: &TurnContext,
|
||||
event: ExecApprovalRequestEvent,
|
||||
cancel_token: &CancellationToken,
|
||||
) {
|
||||
// Race approval with cancellation and timeout to avoid hangs.
|
||||
let approval_fut = parent_session.request_command_approval(
|
||||
parent_ctx,
|
||||
parent_ctx.sub_id.clone(),
|
||||
event.command,
|
||||
event.cwd,
|
||||
event.reason,
|
||||
event.proposed_execpolicy_amendment,
|
||||
);
|
||||
let decision = await_approval_with_cancel(
|
||||
approval_fut,
|
||||
parent_session,
|
||||
&parent_ctx.sub_id,
|
||||
cancel_token,
|
||||
)
|
||||
.await;
|
||||
|
||||
let _ = codex.submit(Op::ExecApproval { id, decision }).await;
|
||||
}
|
||||
|
||||
/// Handle an ApplyPatchApprovalRequest by consulting the parent session and replying.
|
||||
async fn handle_patch_approval(
|
||||
codex: &Codex,
|
||||
id: String,
|
||||
parent_session: &Session,
|
||||
parent_ctx: &TurnContext,
|
||||
event: ApplyPatchApprovalRequestEvent,
|
||||
cancel_token: &CancellationToken,
|
||||
) {
|
||||
let decision_rx = parent_session
|
||||
.request_patch_approval(
|
||||
parent_ctx,
|
||||
parent_ctx.sub_id.clone(),
|
||||
event.changes,
|
||||
event.reason,
|
||||
event.grant_root,
|
||||
)
|
||||
.await;
|
||||
let decision = await_approval_with_cancel(
|
||||
async move { decision_rx.await.unwrap_or_default() },
|
||||
parent_session,
|
||||
&parent_ctx.sub_id,
|
||||
cancel_token,
|
||||
)
|
||||
.await;
|
||||
let _ = codex.submit(Op::PatchApproval { id, decision }).await;
|
||||
}
|
||||
|
||||
/// Await an approval decision, aborting on cancellation.
|
||||
async fn await_approval_with_cancel<F>(
|
||||
fut: F,
|
||||
parent_session: &Session,
|
||||
sub_id: &str,
|
||||
cancel_token: &CancellationToken,
|
||||
) -> codex_protocol::protocol::ReviewDecision
|
||||
where
|
||||
F: core::future::Future<Output = codex_protocol::protocol::ReviewDecision>,
|
||||
{
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = cancel_token.cancelled() => {
|
||||
parent_session
|
||||
.notify_approval(sub_id, codex_protocol::protocol::ReviewDecision::Abort)
|
||||
.await;
|
||||
codex_protocol::protocol::ReviewDecision::Abort
|
||||
}
|
||||
decision = fut => {
|
||||
decision
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use async_channel::bounded;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::AgentStatus;
|
||||
use codex_protocol::protocol::RawResponseItemEvent;
|
||||
use codex_protocol::protocol::TurnAbortReason;
|
||||
use codex_protocol::protocol::TurnAbortedEvent;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tokio::sync::watch;
|
||||
|
||||
#[tokio::test]
|
||||
async fn forward_events_cancelled_while_send_blocked_shuts_down_delegate() {
|
||||
let (tx_events, rx_events) = bounded(1);
|
||||
let (tx_sub, rx_sub) = bounded(SUBMISSION_CHANNEL_CAPACITY);
|
||||
let (_agent_status_tx, agent_status) = watch::channel(AgentStatus::PendingInit);
|
||||
let codex = Arc::new(Codex {
|
||||
next_id: AtomicU64::new(0),
|
||||
tx_sub,
|
||||
rx_event: rx_events,
|
||||
agent_status,
|
||||
});
|
||||
|
||||
let (session, ctx, _rx_evt) = crate::codex::make_session_and_context_with_rx().await;
|
||||
|
||||
let (tx_out, rx_out) = bounded(1);
|
||||
tx_out
|
||||
.send(Event {
|
||||
id: "full".to_string(),
|
||||
msg: EventMsg::TurnAborted(TurnAbortedEvent {
|
||||
reason: TurnAbortReason::Interrupted,
|
||||
}),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
let forward = tokio::spawn(forward_events(
|
||||
Arc::clone(&codex),
|
||||
tx_out.clone(),
|
||||
session,
|
||||
ctx,
|
||||
cancel.clone(),
|
||||
));
|
||||
|
||||
tx_events
|
||||
.send(Event {
|
||||
id: "evt".to_string(),
|
||||
msg: EventMsg::RawResponseItem(RawResponseItemEvent {
|
||||
item: ResponseItem::CustomToolCall {
|
||||
id: None,
|
||||
status: None,
|
||||
call_id: "call-1".to_string(),
|
||||
name: "tool".to_string(),
|
||||
input: "{}".to_string(),
|
||||
},
|
||||
}),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
drop(tx_events);
|
||||
cancel.cancel();
|
||||
timeout(std::time::Duration::from_millis(1000), forward)
|
||||
.await
|
||||
.expect("forward_events hung")
|
||||
.expect("forward_events join error");
|
||||
|
||||
let received = rx_out.recv().await.expect("prefilled event missing");
|
||||
assert_eq!("full", received.id);
|
||||
let mut ops = Vec::new();
|
||||
while let Ok(sub) = rx_sub.try_recv() {
|
||||
ops.push(sub.op);
|
||||
}
|
||||
assert!(
|
||||
ops.iter().any(|op| matches!(op, Op::Interrupt)),
|
||||
"expected Interrupt op after cancellation"
|
||||
);
|
||||
assert!(
|
||||
ops.iter().any(|op| matches!(op, Op::Shutdown)),
|
||||
"expected Shutdown op after cancellation"
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -16,7 +16,6 @@ mod codex_thread;
|
||||
mod compact_remote;
|
||||
pub use codex_thread::CodexThread;
|
||||
mod agent;
|
||||
mod codex_delegate;
|
||||
mod command_safety;
|
||||
pub mod config;
|
||||
pub mod config_loader;
|
||||
|
||||
@@ -16,10 +16,9 @@ use tokio_util::task::AbortOnDropHandle;
|
||||
use tracing::trace;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::AuthManager;
|
||||
use crate::agent::AgentControl;
|
||||
use crate::codex::Session;
|
||||
use crate::codex::TurnContext;
|
||||
use crate::models_manager::manager::ModelsManager;
|
||||
use crate::protocol::EventMsg;
|
||||
use crate::protocol::TurnAbortReason;
|
||||
use crate::protocol::TurnAbortedEvent;
|
||||
@@ -53,12 +52,8 @@ impl SessionTaskContext {
|
||||
Arc::clone(&self.session)
|
||||
}
|
||||
|
||||
pub(crate) fn auth_manager(&self) -> Arc<AuthManager> {
|
||||
Arc::clone(&self.session.services.auth_manager)
|
||||
}
|
||||
|
||||
pub(crate) fn models_manager(&self) -> Arc<ModelsManager> {
|
||||
Arc::clone(&self.session.services.models_manager)
|
||||
pub(crate) fn agent_control(&self) -> AgentControl {
|
||||
self.session.services.agent_control.clone()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use codex_protocol::config_types::WebSearchMode;
|
||||
use codex_protocol::items::TurnItem;
|
||||
@@ -12,11 +10,12 @@ use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::ExitedReviewModeEvent;
|
||||
use codex_protocol::protocol::ItemCompletedEvent;
|
||||
use codex_protocol::protocol::ReviewOutputEvent;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
use std::sync::Arc;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::codex::Session;
|
||||
use crate::codex::TurnContext;
|
||||
use crate::codex_delegate::run_codex_thread_one_shot;
|
||||
use crate::review_format::format_review_findings_block;
|
||||
use crate::review_format::render_review_output_text;
|
||||
use crate::state::TaskKind;
|
||||
@@ -84,7 +83,7 @@ async fn start_review_conversation(
|
||||
) -> Option<async_channel::Receiver<Event>> {
|
||||
let config = ctx.client.config();
|
||||
let mut sub_agent_config = config.as_ref().clone();
|
||||
// Carry over review-only feature restrictions so the delegate cannot
|
||||
// Carry over review-only feature restrictions so the sub-agent cannot
|
||||
// re-enable blocked tools (web search, view image).
|
||||
sub_agent_config.web_search_mode = Some(WebSearchMode::Disabled);
|
||||
|
||||
@@ -96,19 +95,21 @@ async fn start_review_conversation(
|
||||
.clone()
|
||||
.unwrap_or_else(|| ctx.client.get_model());
|
||||
sub_agent_config.model = Some(model);
|
||||
(run_codex_thread_one_shot(
|
||||
sub_agent_config,
|
||||
session.auth_manager(),
|
||||
session.models_manager(),
|
||||
input,
|
||||
session.clone_session(),
|
||||
ctx.clone(),
|
||||
cancellation_token,
|
||||
None,
|
||||
)
|
||||
.await)
|
||||
.ok()
|
||||
.map(|io| io.rx_event)
|
||||
|
||||
let agent_control = session.agent_control();
|
||||
let thread_id = agent_control
|
||||
.spawn_agent(
|
||||
sub_agent_config,
|
||||
None,
|
||||
Some(SessionSource::SubAgent(SubAgentSource::Review)),
|
||||
)
|
||||
.await
|
||||
.ok()?;
|
||||
|
||||
agent_control.send_input(thread_id, input).await.ok()?;
|
||||
|
||||
// todo this part is not done yet and won't be easy
|
||||
None
|
||||
}
|
||||
|
||||
async fn process_review_events(
|
||||
|
||||
@@ -187,6 +187,7 @@ impl ThreadManager {
|
||||
}
|
||||
}
|
||||
|
||||
/// Subscribe on the channel that sends the ID of newly created threads.
|
||||
pub fn subscribe_thread_created(&self) -> broadcast::Receiver<ThreadId> {
|
||||
self.state.thread_created_tx.subscribe()
|
||||
}
|
||||
@@ -197,12 +198,7 @@ impl ThreadManager {
|
||||
|
||||
pub async fn start_thread(&self, config: Config) -> CodexResult<NewThread> {
|
||||
self.state
|
||||
.spawn_thread(
|
||||
config,
|
||||
InitialHistory::New,
|
||||
Arc::clone(&self.state.auth_manager),
|
||||
self.agent_control(),
|
||||
)
|
||||
.start_thread(config, InitialHistory::New, self.agent_control(), None)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -210,10 +206,9 @@ impl ThreadManager {
|
||||
&self,
|
||||
config: Config,
|
||||
rollout_path: PathBuf,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
) -> CodexResult<NewThread> {
|
||||
let initial_history = RolloutRecorder::get_rollout_history(&rollout_path).await?;
|
||||
self.resume_thread_with_history(config, initial_history, auth_manager)
|
||||
self.resume_thread_with_history(config, initial_history)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -221,10 +216,9 @@ impl ThreadManager {
|
||||
&self,
|
||||
config: Config,
|
||||
initial_history: InitialHistory,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
) -> CodexResult<NewThread> {
|
||||
self.state
|
||||
.spawn_thread(config, initial_history, auth_manager, self.agent_control())
|
||||
.start_thread(config, initial_history, self.agent_control(), None)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -257,12 +251,7 @@ impl ThreadManager {
|
||||
let history = RolloutRecorder::get_rollout_history(&path).await?;
|
||||
let history = truncate_before_nth_user_message(history, nth_user_message);
|
||||
self.state
|
||||
.spawn_thread(
|
||||
config,
|
||||
history,
|
||||
Arc::clone(&self.state.auth_manager),
|
||||
self.agent_control(),
|
||||
)
|
||||
.start_thread(config, history, self.agent_control(), None)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -308,38 +297,24 @@ impl ThreadManagerState {
|
||||
self.threads.write().await.remove(thread_id)
|
||||
}
|
||||
|
||||
/// Spawn a new thread with no history using a provided config.
|
||||
pub(crate) async fn spawn_new_thread(
|
||||
&self,
|
||||
config: Config,
|
||||
agent_control: AgentControl,
|
||||
) -> CodexResult<NewThread> {
|
||||
self.spawn_thread(
|
||||
config,
|
||||
InitialHistory::New,
|
||||
Arc::clone(&self.auth_manager),
|
||||
agent_control,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Spawn a new thread with optional history and register it with the manager.
|
||||
pub(crate) async fn spawn_thread(
|
||||
/// Spawn a new thread with optional history and register it with the manager. If
|
||||
/// `session_source_override` is not defined, fallback on the current session_source.
|
||||
pub(crate) async fn start_thread(
|
||||
&self,
|
||||
config: Config,
|
||||
initial_history: InitialHistory,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
agent_control: AgentControl,
|
||||
session_source_override: Option<SessionSource>,
|
||||
) -> CodexResult<NewThread> {
|
||||
let CodexSpawnOk {
|
||||
codex, thread_id, ..
|
||||
} = Codex::spawn(
|
||||
config,
|
||||
auth_manager,
|
||||
self.auth_manager.clone(),
|
||||
Arc::clone(&self.models_manager),
|
||||
Arc::clone(&self.skills_manager),
|
||||
initial_history,
|
||||
self.session_source.clone(),
|
||||
session_source_override.unwrap_or_else(|| self.session_source.clone()),
|
||||
agent_control,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -77,6 +77,8 @@ impl ToolHandler for CollabHandler {
|
||||
mod spawn {
|
||||
use super::*;
|
||||
use crate::agent::AgentRole;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
use std::sync::Arc;
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
@@ -122,7 +124,11 @@ mod spawn {
|
||||
let result = session
|
||||
.services
|
||||
.agent_control
|
||||
.spawn_agent(config, prompt.clone())
|
||||
.spawn_agent(
|
||||
config,
|
||||
Some(prompt.clone()),
|
||||
Some(SessionSource::SubAgent(SubAgentSource::Collab)),
|
||||
)
|
||||
.await
|
||||
.map_err(collab_spawn_error);
|
||||
let (new_thread_id, status) = match &result {
|
||||
|
||||
@@ -165,9 +165,8 @@ impl TestCodexBuilder {
|
||||
|
||||
let new_conversation = match resume_from {
|
||||
Some(path) => {
|
||||
let auth_manager = codex_core::AuthManager::from_auth_for_testing(auth);
|
||||
thread_manager
|
||||
.resume_thread_from_rollout(config.clone(), path, auth_manager)
|
||||
.resume_thread_from_rollout(config.clone(), path)
|
||||
.await?
|
||||
}
|
||||
None => thread_manager.start_thread(config.clone()).await?,
|
||||
|
||||
@@ -268,14 +268,12 @@ async fn resume_includes_initial_messages_and_sends_prior_items() {
|
||||
config.model_provider.clone(),
|
||||
config.codex_home.clone(),
|
||||
);
|
||||
let auth_manager =
|
||||
codex_core::AuthManager::from_auth_for_testing(CodexAuth::from_api_key("Test API Key"));
|
||||
let NewThread {
|
||||
thread: codex,
|
||||
session_configured,
|
||||
..
|
||||
} = thread_manager
|
||||
.resume_thread_from_rollout(config, session_path.clone(), auth_manager)
|
||||
.resume_thread_from_rollout(config, session_path.clone())
|
||||
.await
|
||||
.expect("resume conversation");
|
||||
|
||||
|
||||
@@ -1,16 +1,8 @@
|
||||
use codex_core::config::Constrained;
|
||||
use codex_core::protocol::AskForApproval;
|
||||
use codex_core::protocol::EventMsg;
|
||||
use codex_core::protocol::Op;
|
||||
use codex_core::protocol::ReviewDecision;
|
||||
use codex_core::protocol::ReviewRequest;
|
||||
use codex_core::protocol::ReviewTarget;
|
||||
use codex_core::protocol::SandboxPolicy;
|
||||
use codex_core::sandboxing::SandboxPermissions;
|
||||
use core_test_support::responses::ev_apply_patch_function_call;
|
||||
use core_test_support::responses::ev_assistant_message;
|
||||
use core_test_support::responses::ev_completed;
|
||||
use core_test_support::responses::ev_function_call;
|
||||
use core_test_support::responses::ev_reasoning_item_added;
|
||||
use core_test_support::responses::ev_reasoning_summary_text_delta;
|
||||
use core_test_support::responses::ev_response_created;
|
||||
@@ -22,168 +14,8 @@ use core_test_support::test_codex::test_codex;
|
||||
use core_test_support::wait_for_event;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
/// Delegate should surface ExecApprovalRequest from sub-agent and proceed
|
||||
/// after parent submits an approval decision.
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn codex_delegate_forwards_exec_approval_and_proceeds_on_approval() {
|
||||
skip_if_no_network!();
|
||||
|
||||
// Sub-agent turn 1: emit a shell_command function_call requiring approval, then complete.
|
||||
let call_id = "call-exec-1";
|
||||
let args = serde_json::json!({
|
||||
"command": "rm -rf delegated",
|
||||
"timeout_ms": 1000,
|
||||
"sandbox_permissions": SandboxPermissions::RequireEscalated,
|
||||
})
|
||||
.to_string();
|
||||
let sse1 = sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_function_call(call_id, "shell_command", &args),
|
||||
ev_completed("resp-1"),
|
||||
]);
|
||||
|
||||
// Sub-agent turn 2: return structured review output and complete.
|
||||
let review_json = serde_json::json!({
|
||||
"findings": [],
|
||||
"overall_correctness": "ok",
|
||||
"overall_explanation": "delegate approved exec",
|
||||
"overall_confidence_score": 0.5
|
||||
})
|
||||
.to_string();
|
||||
let sse2 = sse(vec![
|
||||
ev_response_created("resp-2"),
|
||||
ev_assistant_message("msg-1", &review_json),
|
||||
ev_completed("resp-2"),
|
||||
]);
|
||||
|
||||
let server = start_mock_server().await;
|
||||
mount_sse_sequence(&server, vec![sse1, sse2]).await;
|
||||
|
||||
// Build a conversation configured to require approvals so the delegate
|
||||
// routes ExecApprovalRequest via the parent.
|
||||
let mut builder = test_codex().with_model("gpt-5.1").with_config(|config| {
|
||||
config.approval_policy = Constrained::allow_any(AskForApproval::OnRequest);
|
||||
config.sandbox_policy = Constrained::allow_any(SandboxPolicy::ReadOnly);
|
||||
});
|
||||
let test = builder.build(&server).await.expect("build test codex");
|
||||
|
||||
// Kick off review (sub-agent starts internally).
|
||||
test.codex
|
||||
.submit(Op::Review {
|
||||
review_request: ReviewRequest {
|
||||
target: ReviewTarget::Custom {
|
||||
instructions: "Please review".to_string(),
|
||||
},
|
||||
user_facing_hint: None,
|
||||
},
|
||||
})
|
||||
.await
|
||||
.expect("submit review");
|
||||
|
||||
// Lifecycle: Entered -> ExecApprovalRequest -> Exited(Some) -> TurnComplete.
|
||||
wait_for_event(&test.codex, |ev| {
|
||||
matches!(ev, EventMsg::EnteredReviewMode(_))
|
||||
})
|
||||
.await;
|
||||
|
||||
// Expect parent-side approval request (forwarded by delegate).
|
||||
wait_for_event(&test.codex, |ev| {
|
||||
matches!(ev, EventMsg::ExecApprovalRequest(_))
|
||||
})
|
||||
.await;
|
||||
|
||||
// Approve via parent; id "0" is the active sub_id in tests.
|
||||
test.codex
|
||||
.submit(Op::ExecApproval {
|
||||
id: "0".into(),
|
||||
decision: ReviewDecision::Approved,
|
||||
})
|
||||
.await
|
||||
.expect("submit exec approval");
|
||||
|
||||
wait_for_event(&test.codex, |ev| {
|
||||
matches!(ev, EventMsg::ExitedReviewMode(_))
|
||||
})
|
||||
.await;
|
||||
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
}
|
||||
|
||||
/// Delegate should surface ApplyPatchApprovalRequest and honor parent decision
|
||||
/// so the sub-agent can proceed to completion.
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn codex_delegate_forwards_patch_approval_and_proceeds_on_decision() {
|
||||
skip_if_no_network!();
|
||||
|
||||
let call_id = "call-patch-1";
|
||||
let patch = "*** Begin Patch\n*** Add File: delegated.txt\n+hello\n*** End Patch\n";
|
||||
let sse1 = sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_apply_patch_function_call(call_id, patch),
|
||||
ev_completed("resp-1"),
|
||||
]);
|
||||
let review_json = serde_json::json!({
|
||||
"findings": [],
|
||||
"overall_correctness": "ok",
|
||||
"overall_explanation": "delegate patch handled",
|
||||
"overall_confidence_score": 0.5
|
||||
})
|
||||
.to_string();
|
||||
let sse2 = sse(vec![
|
||||
ev_response_created("resp-2"),
|
||||
ev_assistant_message("msg-1", &review_json),
|
||||
ev_completed("resp-2"),
|
||||
]);
|
||||
|
||||
let server = start_mock_server().await;
|
||||
mount_sse_sequence(&server, vec![sse1, sse2]).await;
|
||||
|
||||
let mut builder = test_codex().with_model("gpt-5.1").with_config(|config| {
|
||||
config.approval_policy = Constrained::allow_any(AskForApproval::OnRequest);
|
||||
// Use a restricted sandbox so patch approval is required
|
||||
config.sandbox_policy = Constrained::allow_any(SandboxPolicy::ReadOnly);
|
||||
config.include_apply_patch_tool = true;
|
||||
});
|
||||
let test = builder.build(&server).await.expect("build test codex");
|
||||
|
||||
test.codex
|
||||
.submit(Op::Review {
|
||||
review_request: ReviewRequest {
|
||||
target: ReviewTarget::Custom {
|
||||
instructions: "Please review".to_string(),
|
||||
},
|
||||
user_facing_hint: None,
|
||||
},
|
||||
})
|
||||
.await
|
||||
.expect("submit review");
|
||||
|
||||
wait_for_event(&test.codex, |ev| {
|
||||
matches!(ev, EventMsg::EnteredReviewMode(_))
|
||||
})
|
||||
.await;
|
||||
wait_for_event(&test.codex, |ev| {
|
||||
matches!(ev, EventMsg::ApplyPatchApprovalRequest(_))
|
||||
})
|
||||
.await;
|
||||
|
||||
// Deny via parent so delegate can continue; id "0" is the active sub_id in tests.
|
||||
test.codex
|
||||
.submit(Op::PatchApproval {
|
||||
id: "0".into(),
|
||||
decision: ReviewDecision::Denied,
|
||||
})
|
||||
.await
|
||||
.expect("submit patch approval");
|
||||
|
||||
wait_for_event(&test.codex, |ev| {
|
||||
matches!(ev, EventMsg::ExitedReviewMode(_))
|
||||
})
|
||||
.await;
|
||||
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn codex_delegate_ignores_legacy_deltas() {
|
||||
async fn review_subagent_ignores_legacy_deltas() {
|
||||
skip_if_no_network!();
|
||||
|
||||
// Single response with reasoning summary deltas.
|
||||
|
||||
@@ -1016,10 +1016,8 @@ async fn resume_conversation(
|
||||
config: &Config,
|
||||
path: std::path::PathBuf,
|
||||
) -> Arc<CodexThread> {
|
||||
let auth_manager =
|
||||
codex_core::AuthManager::from_auth_for_testing(CodexAuth::from_api_key("dummy"));
|
||||
let NewThread { thread, .. } = manager
|
||||
.resume_thread_from_rollout(config.clone(), path, auth_manager)
|
||||
.resume_thread_from_rollout(config.clone(), path)
|
||||
.await
|
||||
.expect("resume conversation");
|
||||
thread
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
#![allow(clippy::unwrap_used, clippy::expect_used)]
|
||||
|
||||
use codex_core::AuthManager;
|
||||
use codex_core::CodexAuth;
|
||||
use codex_core::NewThread;
|
||||
use codex_core::ThreadManager;
|
||||
@@ -60,14 +59,13 @@ async fn emits_warning_when_resumed_model_differs() {
|
||||
CodexAuth::from_api_key("test"),
|
||||
config.model_provider.clone(),
|
||||
);
|
||||
let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("test"));
|
||||
|
||||
// Act: resume the conversation.
|
||||
let NewThread {
|
||||
thread: conversation,
|
||||
..
|
||||
} = thread_manager
|
||||
.resume_thread_with_history(config, initial_history, auth_manager)
|
||||
.resume_thread_with_history(config, initial_history)
|
||||
.await
|
||||
.expect("resume conversation");
|
||||
|
||||
|
||||
@@ -921,10 +921,8 @@ where
|
||||
CodexAuth::from_api_key("Test API Key"),
|
||||
config.model_provider.clone(),
|
||||
);
|
||||
let auth_manager =
|
||||
codex_core::AuthManager::from_auth_for_testing(CodexAuth::from_api_key("Test API Key"));
|
||||
thread_manager
|
||||
.resume_thread_from_rollout(config, resume_path, auth_manager)
|
||||
.resume_thread_from_rollout(config, resume_path)
|
||||
.await
|
||||
.expect("resume conversation")
|
||||
.thread
|
||||
|
||||
@@ -1459,6 +1459,7 @@ pub enum SessionSource {
|
||||
pub enum SubAgentSource {
|
||||
Review,
|
||||
Compact,
|
||||
Collab,
|
||||
Other(String),
|
||||
}
|
||||
|
||||
@@ -1480,6 +1481,7 @@ impl fmt::Display for SubAgentSource {
|
||||
match self {
|
||||
SubAgentSource::Review => f.write_str("review"),
|
||||
SubAgentSource::Compact => f.write_str("compact"),
|
||||
SubAgentSource::Collab => f.write_str("collab"),
|
||||
SubAgentSource::Other(other) => f.write_str(other),
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user