mirror of
https://github.com/openai/codex.git
synced 2026-03-06 14:43:21 +00:00
Compare commits
1 Commits
fix/notify
...
jif/review
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d624b79177 |
@@ -2,9 +2,11 @@ use crate::agent::AgentStatus;
|
||||
use crate::agent::guards::Guards;
|
||||
use crate::error::CodexErr;
|
||||
use crate::error::Result as CodexResult;
|
||||
use crate::protocol::Event;
|
||||
use crate::thread_manager::ThreadManagerState;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::Op;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Weak;
|
||||
@@ -34,18 +36,30 @@ impl AgentControl {
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn a new agent thread and submit the initial prompt.
|
||||
/// Spawn a new agent thread and optionally submit initial input.
|
||||
pub(crate) async fn spawn_agent(
|
||||
&self,
|
||||
config: crate::config::Config,
|
||||
prompt: String,
|
||||
session_source: Option<codex_protocol::protocol::SessionSource>,
|
||||
prompt: Option<String>,
|
||||
session_source_override: Option<SessionSource>,
|
||||
) -> CodexResult<ThreadId> {
|
||||
self.spawn_agent_with_options(config, prompt, session_source_override, true)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Spawn a new agent thread with explicit spawn options.
|
||||
pub(crate) async fn spawn_agent_with_options(
|
||||
&self,
|
||||
config: crate::config::Config,
|
||||
prompt: Option<String>,
|
||||
session_source_override: Option<SessionSource>,
|
||||
announce_thread_created: bool,
|
||||
) -> CodexResult<ThreadId> {
|
||||
let state = self.upgrade()?;
|
||||
let reservation = self.state.reserve_spawn_slot(config.agent_max_threads)?;
|
||||
|
||||
// The same `AgentControl` is sent to spawn the thread.
|
||||
let new_thread = match session_source {
|
||||
let new_thread = match session_source_override {
|
||||
Some(session_source) => {
|
||||
state
|
||||
.spawn_new_thread_with_source(config, self.clone(), session_source)
|
||||
@@ -55,12 +69,16 @@ impl AgentControl {
|
||||
};
|
||||
reservation.commit(new_thread.thread_id);
|
||||
|
||||
// Notify a new thread has been created. This notification will be processed by clients
|
||||
// to subscribe or drain this newly created thread.
|
||||
// TODO(jif) add helper for drain
|
||||
state.notify_thread_created(new_thread.thread_id);
|
||||
if announce_thread_created {
|
||||
// Notify a new thread has been created. This notification will be processed by clients
|
||||
// to subscribe or drain this newly created thread.
|
||||
// TODO(jif) add helper for drain
|
||||
state.notify_thread_created(new_thread.thread_id);
|
||||
}
|
||||
|
||||
self.send_prompt(new_thread.thread_id, prompt).await?;
|
||||
if let Some(prompt) = prompt {
|
||||
self.send_prompt(new_thread.thread_id, prompt).await?;
|
||||
}
|
||||
|
||||
Ok(new_thread.thread_id)
|
||||
}
|
||||
@@ -71,20 +89,37 @@ impl AgentControl {
|
||||
agent_id: ThreadId,
|
||||
prompt: String,
|
||||
) -> CodexResult<String> {
|
||||
self.send_input(
|
||||
agent_id,
|
||||
vec![UserInput::Text {
|
||||
text: prompt,
|
||||
// Agent control prompts are plain text with no UI text elements.
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Send full `user` input items to an existing agent thread.
|
||||
pub(crate) async fn send_input(
|
||||
&self,
|
||||
agent_id: ThreadId,
|
||||
input: Vec<UserInput>,
|
||||
) -> CodexResult<String> {
|
||||
self.send_op(
|
||||
agent_id,
|
||||
Op::UserInput {
|
||||
items: input,
|
||||
final_output_json_schema: None,
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Send an operation to an existing agent thread.
|
||||
pub(crate) async fn send_op(&self, agent_id: ThreadId, op: Op) -> CodexResult<String> {
|
||||
let state = self.upgrade()?;
|
||||
let result = state
|
||||
.send_op(
|
||||
agent_id,
|
||||
Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: prompt,
|
||||
// Agent control prompts are plain text with no UI text elements.
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
let result = state.send_op(agent_id, op).await;
|
||||
if matches!(result, Err(CodexErr::InternalAgentDied)) {
|
||||
let _ = state.remove_thread(&agent_id).await;
|
||||
self.state.release_spawned_thread(agent_id);
|
||||
@@ -92,6 +127,13 @@ impl AgentControl {
|
||||
result
|
||||
}
|
||||
|
||||
/// Read the next event from an existing agent thread.
|
||||
pub(crate) async fn next_event(&self, agent_id: ThreadId) -> CodexResult<Event> {
|
||||
let state = self.upgrade()?;
|
||||
let thread = state.get_thread(agent_id).await?;
|
||||
thread.next_event().await
|
||||
}
|
||||
|
||||
/// Interrupt the current task for an existing agent thread.
|
||||
pub(crate) async fn interrupt_agent(&self, agent_id: ThreadId) -> CodexResult<String> {
|
||||
let state = self.upgrade()?;
|
||||
@@ -278,7 +320,7 @@ mod tests {
|
||||
let control = AgentControl::default();
|
||||
let (_home, config) = test_config().await;
|
||||
let err = control
|
||||
.spawn_agent(config, "hello".to_string(), None)
|
||||
.spawn_agent(config, Some("hello".to_string()), None)
|
||||
.await
|
||||
.expect_err("spawn_agent should fail without a manager");
|
||||
assert_eq!(
|
||||
@@ -375,12 +417,49 @@ mod tests {
|
||||
assert_eq!(captured, Some(expected));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn send_input_submits_user_items() {
|
||||
let harness = AgentControlHarness::new().await;
|
||||
let (thread_id, _thread) = harness.start_thread().await;
|
||||
let input = vec![
|
||||
UserInput::Text {
|
||||
text: "hello".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
},
|
||||
UserInput::Text {
|
||||
text: "world".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
},
|
||||
];
|
||||
|
||||
let submission_id = harness
|
||||
.control
|
||||
.send_input(thread_id, input.clone())
|
||||
.await
|
||||
.expect("send_input should succeed");
|
||||
assert!(!submission_id.is_empty());
|
||||
|
||||
let expected = (
|
||||
thread_id,
|
||||
Op::UserInput {
|
||||
items: input,
|
||||
final_output_json_schema: None,
|
||||
},
|
||||
);
|
||||
let captured = harness
|
||||
.manager
|
||||
.captured_ops()
|
||||
.into_iter()
|
||||
.find(|entry| *entry == expected);
|
||||
assert_eq!(captured, Some(expected));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn spawn_agent_creates_thread_and_sends_prompt() {
|
||||
let harness = AgentControlHarness::new().await;
|
||||
let thread_id = harness
|
||||
.control
|
||||
.spawn_agent(harness.config.clone(), "spawned".to_string(), None)
|
||||
.spawn_agent(harness.config.clone(), Some("spawned".to_string()), None)
|
||||
.await
|
||||
.expect("spawn_agent should succeed");
|
||||
let _thread = harness
|
||||
@@ -406,6 +485,61 @@ mod tests {
|
||||
assert_eq!(captured, Some(expected));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn spawn_agent_without_prompt_does_not_submit_input() {
|
||||
let harness = AgentControlHarness::new().await;
|
||||
let thread_id = harness
|
||||
.control
|
||||
.spawn_agent(harness.config.clone(), None, None)
|
||||
.await
|
||||
.expect("spawn_agent should succeed");
|
||||
let _thread = harness
|
||||
.manager
|
||||
.get_thread(thread_id)
|
||||
.await
|
||||
.expect("thread should be registered");
|
||||
let captured = harness.manager.captured_ops().into_iter().find(|(id, op)| {
|
||||
*id == thread_id
|
||||
&& matches!(
|
||||
op,
|
||||
Op::UserInput {
|
||||
final_output_json_schema: None,
|
||||
..
|
||||
}
|
||||
)
|
||||
});
|
||||
assert_eq!(captured, None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn spawn_agent_with_options_can_skip_created_notification() {
|
||||
let harness = AgentControlHarness::new().await;
|
||||
let mut created_rx = harness.manager.subscribe_thread_created();
|
||||
|
||||
let thread_id = harness
|
||||
.control
|
||||
.spawn_agent_with_options(
|
||||
harness.config.clone(),
|
||||
None,
|
||||
Some(SessionSource::SubAgent(
|
||||
codex_protocol::protocol::SubAgentSource::Review,
|
||||
)),
|
||||
false,
|
||||
)
|
||||
.await
|
||||
.expect("spawn_agent_with_options should succeed");
|
||||
|
||||
let result =
|
||||
tokio::time::timeout(std::time::Duration::from_millis(100), created_rx.recv()).await;
|
||||
assert!(result.is_err(), "did not expect thread_created event");
|
||||
|
||||
let _ = harness
|
||||
.control
|
||||
.shutdown_agent(thread_id)
|
||||
.await
|
||||
.expect("shutdown should succeed");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn spawn_agent_respects_max_threads_limit() {
|
||||
let max_threads = 1usize;
|
||||
@@ -427,12 +561,12 @@ mod tests {
|
||||
.expect("start thread");
|
||||
|
||||
let first_agent_id = control
|
||||
.spawn_agent(config.clone(), "hello".to_string(), None)
|
||||
.spawn_agent(config.clone(), Some("hello".to_string()), None)
|
||||
.await
|
||||
.expect("spawn_agent should succeed");
|
||||
|
||||
let err = control
|
||||
.spawn_agent(config, "hello again".to_string(), None)
|
||||
.spawn_agent(config, Some("hello again".to_string()), None)
|
||||
.await
|
||||
.expect_err("spawn_agent should respect max threads");
|
||||
let CodexErr::AgentLimitReached {
|
||||
@@ -465,7 +599,7 @@ mod tests {
|
||||
let control = manager.agent_control();
|
||||
|
||||
let first_agent_id = control
|
||||
.spawn_agent(config.clone(), "hello".to_string(), None)
|
||||
.spawn_agent(config.clone(), Some("hello".to_string()), None)
|
||||
.await
|
||||
.expect("spawn_agent should succeed");
|
||||
let _ = control
|
||||
@@ -474,7 +608,7 @@ mod tests {
|
||||
.expect("shutdown agent");
|
||||
|
||||
let second_agent_id = control
|
||||
.spawn_agent(config.clone(), "hello again".to_string(), None)
|
||||
.spawn_agent(config.clone(), Some("hello again".to_string()), None)
|
||||
.await
|
||||
.expect("spawn_agent should succeed after shutdown");
|
||||
let _ = control
|
||||
@@ -500,12 +634,12 @@ mod tests {
|
||||
let cloned = control.clone();
|
||||
|
||||
let first_agent_id = cloned
|
||||
.spawn_agent(config.clone(), "hello".to_string(), None)
|
||||
.spawn_agent(config.clone(), Some("hello".to_string()), None)
|
||||
.await
|
||||
.expect("spawn_agent should succeed");
|
||||
|
||||
let err = control
|
||||
.spawn_agent(config, "hello again".to_string(), None)
|
||||
.spawn_agent(config, Some("hello again".to_string()), None)
|
||||
.await
|
||||
.expect_err("spawn_agent should respect shared guard");
|
||||
let CodexErr::AgentLimitReached { max_threads } = err else {
|
||||
|
||||
@@ -4496,8 +4496,6 @@ pub(super) fn get_last_assistant_message_from_turn(responses: &[ResponseItem]) -
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) use tests::make_session_and_context;
|
||||
#[cfg(test)]
|
||||
pub(crate) use tests::make_session_and_context_with_rx;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
@@ -1,520 +0,0 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
|
||||
use async_channel::Receiver;
|
||||
use async_channel::Sender;
|
||||
use codex_async_utils::OrCancelExt;
|
||||
use codex_protocol::protocol::ApplyPatchApprovalRequestEvent;
|
||||
use codex_protocol::protocol::Event;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::ExecApprovalRequestEvent;
|
||||
use codex_protocol::protocol::Op;
|
||||
use codex_protocol::protocol::RequestUserInputEvent;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
use codex_protocol::protocol::Submission;
|
||||
use codex_protocol::request_user_input::RequestUserInputArgs;
|
||||
use codex_protocol::request_user_input::RequestUserInputResponse;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use std::time::Duration;
|
||||
use tokio::time::timeout;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::AuthManager;
|
||||
use crate::codex::Codex;
|
||||
use crate::codex::CodexSpawnOk;
|
||||
use crate::codex::SUBMISSION_CHANNEL_CAPACITY;
|
||||
use crate::codex::Session;
|
||||
use crate::codex::TurnContext;
|
||||
use crate::config::Config;
|
||||
use crate::error::CodexErr;
|
||||
use crate::models_manager::manager::ModelsManager;
|
||||
use codex_protocol::protocol::InitialHistory;
|
||||
|
||||
/// Start an interactive sub-Codex thread and return IO channels.
|
||||
///
|
||||
/// The returned `events_rx` yields non-approval events emitted by the sub-agent.
|
||||
/// Approval requests are handled via `parent_session` and are not surfaced.
|
||||
/// The returned `ops_tx` allows the caller to submit additional `Op`s to the sub-agent.
|
||||
pub(crate) async fn run_codex_thread_interactive(
|
||||
config: Config,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
models_manager: Arc<ModelsManager>,
|
||||
parent_session: Arc<Session>,
|
||||
parent_ctx: Arc<TurnContext>,
|
||||
cancel_token: CancellationToken,
|
||||
initial_history: Option<InitialHistory>,
|
||||
) -> Result<Codex, CodexErr> {
|
||||
let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
|
||||
let (tx_ops, rx_ops) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
|
||||
|
||||
let CodexSpawnOk { codex, .. } = Codex::spawn(
|
||||
config,
|
||||
auth_manager,
|
||||
models_manager,
|
||||
Arc::clone(&parent_session.services.skills_manager),
|
||||
initial_history.unwrap_or(InitialHistory::New),
|
||||
SessionSource::SubAgent(SubAgentSource::Review),
|
||||
parent_session.services.agent_control.clone(),
|
||||
Vec::new(),
|
||||
)
|
||||
.await?;
|
||||
let codex = Arc::new(codex);
|
||||
|
||||
// Use a child token so parent cancel cascades but we can scope it to this task
|
||||
let cancel_token_events = cancel_token.child_token();
|
||||
let cancel_token_ops = cancel_token.child_token();
|
||||
|
||||
// Forward events from the sub-agent to the consumer, filtering approvals and
|
||||
// routing them to the parent session for decisions.
|
||||
let parent_session_clone = Arc::clone(&parent_session);
|
||||
let parent_ctx_clone = Arc::clone(&parent_ctx);
|
||||
let codex_for_events = Arc::clone(&codex);
|
||||
tokio::spawn(async move {
|
||||
forward_events(
|
||||
codex_for_events,
|
||||
tx_sub,
|
||||
parent_session_clone,
|
||||
parent_ctx_clone,
|
||||
cancel_token_events,
|
||||
)
|
||||
.await;
|
||||
});
|
||||
|
||||
// Forward ops from the caller to the sub-agent.
|
||||
let codex_for_ops = Arc::clone(&codex);
|
||||
tokio::spawn(async move {
|
||||
forward_ops(codex_for_ops, rx_ops, cancel_token_ops).await;
|
||||
});
|
||||
|
||||
Ok(Codex {
|
||||
next_id: AtomicU64::new(0),
|
||||
tx_sub: tx_ops,
|
||||
rx_event: rx_sub,
|
||||
agent_status: codex.agent_status.clone(),
|
||||
session: Arc::clone(&codex.session),
|
||||
})
|
||||
}
|
||||
|
||||
/// Convenience wrapper for one-time use with an initial prompt.
|
||||
///
|
||||
/// Internally calls the interactive variant, then immediately submits the provided input.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) async fn run_codex_thread_one_shot(
|
||||
config: Config,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
models_manager: Arc<ModelsManager>,
|
||||
input: Vec<UserInput>,
|
||||
parent_session: Arc<Session>,
|
||||
parent_ctx: Arc<TurnContext>,
|
||||
cancel_token: CancellationToken,
|
||||
initial_history: Option<InitialHistory>,
|
||||
) -> Result<Codex, CodexErr> {
|
||||
// Use a child token so we can stop the delegate after completion without
|
||||
// requiring the caller to cancel the parent token.
|
||||
let child_cancel = cancel_token.child_token();
|
||||
let io = run_codex_thread_interactive(
|
||||
config,
|
||||
auth_manager,
|
||||
models_manager,
|
||||
parent_session,
|
||||
parent_ctx,
|
||||
child_cancel.clone(),
|
||||
initial_history,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Send the initial input to kick off the one-shot turn.
|
||||
io.submit(Op::UserInput {
|
||||
items: input,
|
||||
final_output_json_schema: None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
// Bridge events so we can observe completion and shut down automatically.
|
||||
let (tx_bridge, rx_bridge) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
|
||||
let ops_tx = io.tx_sub.clone();
|
||||
let agent_status = io.agent_status.clone();
|
||||
let session = Arc::clone(&io.session);
|
||||
let io_for_bridge = io;
|
||||
tokio::spawn(async move {
|
||||
while let Ok(event) = io_for_bridge.next_event().await {
|
||||
let should_shutdown = matches!(
|
||||
event.msg,
|
||||
EventMsg::TurnComplete(_) | EventMsg::TurnAborted(_)
|
||||
);
|
||||
let _ = tx_bridge.send(event).await;
|
||||
if should_shutdown {
|
||||
let _ = ops_tx
|
||||
.send(Submission {
|
||||
id: "shutdown".to_string(),
|
||||
op: Op::Shutdown {},
|
||||
})
|
||||
.await;
|
||||
child_cancel.cancel();
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// For one-shot usage, return a closed `tx_sub` so callers cannot submit
|
||||
// additional ops after the initial request. Create a channel and drop the
|
||||
// receiver to close it immediately.
|
||||
let (tx_closed, rx_closed) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
|
||||
drop(rx_closed);
|
||||
|
||||
Ok(Codex {
|
||||
next_id: AtomicU64::new(0),
|
||||
rx_event: rx_bridge,
|
||||
tx_sub: tx_closed,
|
||||
agent_status,
|
||||
session,
|
||||
})
|
||||
}
|
||||
|
||||
async fn forward_events(
|
||||
codex: Arc<Codex>,
|
||||
tx_sub: Sender<Event>,
|
||||
parent_session: Arc<Session>,
|
||||
parent_ctx: Arc<TurnContext>,
|
||||
cancel_token: CancellationToken,
|
||||
) {
|
||||
let cancelled = cancel_token.cancelled();
|
||||
tokio::pin!(cancelled);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = &mut cancelled => {
|
||||
shutdown_delegate(&codex).await;
|
||||
break;
|
||||
}
|
||||
event = codex.next_event() => {
|
||||
let event = match event {
|
||||
Ok(event) => event,
|
||||
Err(_) => break,
|
||||
};
|
||||
match event {
|
||||
// ignore all legacy delta events
|
||||
Event {
|
||||
id: _,
|
||||
msg: EventMsg::AgentMessageDelta(_) | EventMsg::AgentReasoningDelta(_),
|
||||
} => {}
|
||||
Event {
|
||||
id: _,
|
||||
msg: EventMsg::TokenCount(_),
|
||||
} => {}
|
||||
Event {
|
||||
id: _,
|
||||
msg: EventMsg::SessionConfigured(_),
|
||||
} => {}
|
||||
Event {
|
||||
id: _,
|
||||
msg: EventMsg::ThreadNameUpdated(_),
|
||||
} => {}
|
||||
Event {
|
||||
id,
|
||||
msg: EventMsg::ExecApprovalRequest(event),
|
||||
} => {
|
||||
// Initiate approval via parent session; do not surface to consumer.
|
||||
handle_exec_approval(
|
||||
&codex,
|
||||
id,
|
||||
&parent_session,
|
||||
&parent_ctx,
|
||||
event,
|
||||
&cancel_token,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Event {
|
||||
id,
|
||||
msg: EventMsg::ApplyPatchApprovalRequest(event),
|
||||
} => {
|
||||
handle_patch_approval(
|
||||
&codex,
|
||||
id,
|
||||
&parent_session,
|
||||
&parent_ctx,
|
||||
event,
|
||||
&cancel_token,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Event {
|
||||
id,
|
||||
msg: EventMsg::RequestUserInput(event),
|
||||
} => {
|
||||
handle_request_user_input(
|
||||
&codex,
|
||||
id,
|
||||
&parent_session,
|
||||
&parent_ctx,
|
||||
event,
|
||||
&cancel_token,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
other => {
|
||||
match tx_sub.send(other).or_cancel(&cancel_token).await {
|
||||
Ok(Ok(())) => {}
|
||||
_ => {
|
||||
shutdown_delegate(&codex).await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Ask the delegate to stop and drain its events so background sends do not hit a closed channel.
|
||||
async fn shutdown_delegate(codex: &Codex) {
|
||||
let _ = codex.submit(Op::Interrupt).await;
|
||||
let _ = codex.submit(Op::Shutdown {}).await;
|
||||
|
||||
let _ = timeout(Duration::from_millis(500), async {
|
||||
while let Ok(event) = codex.next_event().await {
|
||||
if matches!(
|
||||
event.msg,
|
||||
EventMsg::TurnAborted(_) | EventMsg::TurnComplete(_)
|
||||
) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Forward ops from a caller to a sub-agent, respecting cancellation.
|
||||
async fn forward_ops(
|
||||
codex: Arc<Codex>,
|
||||
rx_ops: Receiver<Submission>,
|
||||
cancel_token_ops: CancellationToken,
|
||||
) {
|
||||
loop {
|
||||
let op: Op = match rx_ops.recv().or_cancel(&cancel_token_ops).await {
|
||||
Ok(Ok(Submission { id: _, op })) => op,
|
||||
Ok(Err(_)) | Err(_) => break,
|
||||
};
|
||||
let _ = codex.submit(op).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle an ExecApprovalRequest by consulting the parent session and replying.
|
||||
async fn handle_exec_approval(
|
||||
codex: &Codex,
|
||||
id: String,
|
||||
parent_session: &Session,
|
||||
parent_ctx: &TurnContext,
|
||||
event: ExecApprovalRequestEvent,
|
||||
cancel_token: &CancellationToken,
|
||||
) {
|
||||
// Race approval with cancellation and timeout to avoid hangs.
|
||||
let approval_fut = parent_session.request_command_approval(
|
||||
parent_ctx,
|
||||
parent_ctx.sub_id.clone(),
|
||||
event.command,
|
||||
event.cwd,
|
||||
event.reason,
|
||||
event.proposed_execpolicy_amendment,
|
||||
);
|
||||
let decision = await_approval_with_cancel(
|
||||
approval_fut,
|
||||
parent_session,
|
||||
&parent_ctx.sub_id,
|
||||
cancel_token,
|
||||
)
|
||||
.await;
|
||||
|
||||
let _ = codex.submit(Op::ExecApproval { id, decision }).await;
|
||||
}
|
||||
|
||||
/// Handle an ApplyPatchApprovalRequest by consulting the parent session and replying.
|
||||
async fn handle_patch_approval(
|
||||
codex: &Codex,
|
||||
id: String,
|
||||
parent_session: &Session,
|
||||
parent_ctx: &TurnContext,
|
||||
event: ApplyPatchApprovalRequestEvent,
|
||||
cancel_token: &CancellationToken,
|
||||
) {
|
||||
let decision_rx = parent_session
|
||||
.request_patch_approval(
|
||||
parent_ctx,
|
||||
parent_ctx.sub_id.clone(),
|
||||
event.changes,
|
||||
event.reason,
|
||||
event.grant_root,
|
||||
)
|
||||
.await;
|
||||
let decision = await_approval_with_cancel(
|
||||
async move { decision_rx.await.unwrap_or_default() },
|
||||
parent_session,
|
||||
&parent_ctx.sub_id,
|
||||
cancel_token,
|
||||
)
|
||||
.await;
|
||||
let _ = codex.submit(Op::PatchApproval { id, decision }).await;
|
||||
}
|
||||
|
||||
async fn handle_request_user_input(
|
||||
codex: &Codex,
|
||||
id: String,
|
||||
parent_session: &Session,
|
||||
parent_ctx: &TurnContext,
|
||||
event: RequestUserInputEvent,
|
||||
cancel_token: &CancellationToken,
|
||||
) {
|
||||
let args = RequestUserInputArgs {
|
||||
questions: event.questions,
|
||||
};
|
||||
let response_fut =
|
||||
parent_session.request_user_input(parent_ctx, parent_ctx.sub_id.clone(), args);
|
||||
let response = await_user_input_with_cancel(
|
||||
response_fut,
|
||||
parent_session,
|
||||
&parent_ctx.sub_id,
|
||||
cancel_token,
|
||||
)
|
||||
.await;
|
||||
let _ = codex.submit(Op::UserInputAnswer { id, response }).await;
|
||||
}
|
||||
|
||||
async fn await_user_input_with_cancel<F>(
|
||||
fut: F,
|
||||
parent_session: &Session,
|
||||
sub_id: &str,
|
||||
cancel_token: &CancellationToken,
|
||||
) -> RequestUserInputResponse
|
||||
where
|
||||
F: core::future::Future<Output = Option<RequestUserInputResponse>>,
|
||||
{
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = cancel_token.cancelled() => {
|
||||
let empty = RequestUserInputResponse {
|
||||
answers: HashMap::new(),
|
||||
};
|
||||
parent_session
|
||||
.notify_user_input_response(sub_id, empty.clone())
|
||||
.await;
|
||||
empty
|
||||
}
|
||||
response = fut => response.unwrap_or_else(|| RequestUserInputResponse {
|
||||
answers: HashMap::new(),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Await an approval decision, aborting on cancellation.
|
||||
async fn await_approval_with_cancel<F>(
|
||||
fut: F,
|
||||
parent_session: &Session,
|
||||
sub_id: &str,
|
||||
cancel_token: &CancellationToken,
|
||||
) -> codex_protocol::protocol::ReviewDecision
|
||||
where
|
||||
F: core::future::Future<Output = codex_protocol::protocol::ReviewDecision>,
|
||||
{
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = cancel_token.cancelled() => {
|
||||
parent_session
|
||||
.notify_approval(sub_id, codex_protocol::protocol::ReviewDecision::Abort)
|
||||
.await;
|
||||
codex_protocol::protocol::ReviewDecision::Abort
|
||||
}
|
||||
decision = fut => {
|
||||
decision
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use async_channel::bounded;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::AgentStatus;
|
||||
use codex_protocol::protocol::RawResponseItemEvent;
|
||||
use codex_protocol::protocol::TurnAbortReason;
|
||||
use codex_protocol::protocol::TurnAbortedEvent;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tokio::sync::watch;
|
||||
|
||||
#[tokio::test]
|
||||
async fn forward_events_cancelled_while_send_blocked_shuts_down_delegate() {
|
||||
let (tx_events, rx_events) = bounded(1);
|
||||
let (tx_sub, rx_sub) = bounded(SUBMISSION_CHANNEL_CAPACITY);
|
||||
let (_agent_status_tx, agent_status) = watch::channel(AgentStatus::PendingInit);
|
||||
let (session, ctx, _rx_evt) = crate::codex::make_session_and_context_with_rx().await;
|
||||
let codex = Arc::new(Codex {
|
||||
next_id: AtomicU64::new(0),
|
||||
tx_sub,
|
||||
rx_event: rx_events,
|
||||
agent_status,
|
||||
session: Arc::clone(&session),
|
||||
});
|
||||
|
||||
let (tx_out, rx_out) = bounded(1);
|
||||
tx_out
|
||||
.send(Event {
|
||||
id: "full".to_string(),
|
||||
msg: EventMsg::TurnAborted(TurnAbortedEvent {
|
||||
reason: TurnAbortReason::Interrupted,
|
||||
}),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
let forward = tokio::spawn(forward_events(
|
||||
Arc::clone(&codex),
|
||||
tx_out.clone(),
|
||||
session,
|
||||
ctx,
|
||||
cancel.clone(),
|
||||
));
|
||||
|
||||
tx_events
|
||||
.send(Event {
|
||||
id: "evt".to_string(),
|
||||
msg: EventMsg::RawResponseItem(RawResponseItemEvent {
|
||||
item: ResponseItem::CustomToolCall {
|
||||
id: None,
|
||||
status: None,
|
||||
call_id: "call-1".to_string(),
|
||||
name: "tool".to_string(),
|
||||
input: "{}".to_string(),
|
||||
},
|
||||
}),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
drop(tx_events);
|
||||
cancel.cancel();
|
||||
timeout(std::time::Duration::from_millis(1000), forward)
|
||||
.await
|
||||
.expect("forward_events hung")
|
||||
.expect("forward_events join error");
|
||||
|
||||
let received = rx_out.recv().await.expect("prefilled event missing");
|
||||
assert_eq!("full", received.id);
|
||||
let mut ops = Vec::new();
|
||||
while let Ok(sub) = rx_sub.try_recv() {
|
||||
ops.push(sub.op);
|
||||
}
|
||||
assert!(
|
||||
ops.iter().any(|op| matches!(op, Op::Interrupt)),
|
||||
"expected Interrupt op after cancellation"
|
||||
);
|
||||
assert!(
|
||||
ops.iter().any(|op| matches!(op, Op::Shutdown)),
|
||||
"expected Shutdown op after cancellation"
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -18,7 +18,6 @@ mod compact_remote;
|
||||
pub use codex_thread::CodexThread;
|
||||
pub use codex_thread::ThreadConfigSnapshot;
|
||||
mod agent;
|
||||
mod codex_delegate;
|
||||
mod command_safety;
|
||||
pub mod config;
|
||||
pub mod config_loader;
|
||||
|
||||
@@ -18,10 +18,8 @@ use tracing::Span;
|
||||
use tracing::trace;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::AuthManager;
|
||||
use crate::codex::Session;
|
||||
use crate::codex::TurnContext;
|
||||
use crate::models_manager::manager::ModelsManager;
|
||||
use crate::protocol::EventMsg;
|
||||
use crate::protocol::TurnAbortReason;
|
||||
use crate::protocol::TurnAbortedEvent;
|
||||
@@ -59,14 +57,6 @@ impl SessionTaskContext {
|
||||
pub(crate) fn clone_session(&self) -> Arc<Session> {
|
||||
Arc::clone(&self.session)
|
||||
}
|
||||
|
||||
pub(crate) fn auth_manager(&self) -> Arc<AuthManager> {
|
||||
Arc::clone(&self.session.services.auth_manager)
|
||||
}
|
||||
|
||||
pub(crate) fn models_manager(&self) -> Arc<ModelsManager> {
|
||||
Arc::clone(&self.session.services.models_manager)
|
||||
}
|
||||
}
|
||||
|
||||
/// Async task that drives a [`Session`] turn.
|
||||
|
||||
@@ -1,26 +1,38 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use codex_async_utils::OrCancelExt;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::config_types::WebSearchMode;
|
||||
use codex_protocol::items::TurnItem;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::AgentMessageContentDeltaEvent;
|
||||
use codex_protocol::protocol::AgentMessageDeltaEvent;
|
||||
use codex_protocol::protocol::ApplyPatchApprovalRequestEvent;
|
||||
use codex_protocol::protocol::Event;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::ExecApprovalRequestEvent;
|
||||
use codex_protocol::protocol::ExitedReviewModeEvent;
|
||||
use codex_protocol::protocol::ItemCompletedEvent;
|
||||
use codex_protocol::protocol::Op;
|
||||
use codex_protocol::protocol::RequestUserInputEvent;
|
||||
use codex_protocol::protocol::ReviewDecision;
|
||||
use codex_protocol::protocol::ReviewOutputEvent;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
use codex_protocol::request_user_input::RequestUserInputArgs;
|
||||
use codex_protocol::request_user_input::RequestUserInputResponse;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::codex::SUBMISSION_CHANNEL_CAPACITY;
|
||||
use crate::codex::Session;
|
||||
use crate::codex::TurnContext;
|
||||
use crate::codex_delegate::run_codex_thread_one_shot;
|
||||
use crate::review_format::format_review_findings_block;
|
||||
use crate::review_format::render_review_output_text;
|
||||
use crate::state::TaskKind;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use std::collections::HashMap;
|
||||
|
||||
use super::SessionTask;
|
||||
use super::SessionTaskContext;
|
||||
@@ -84,7 +96,7 @@ async fn start_review_conversation(
|
||||
) -> Option<async_channel::Receiver<Event>> {
|
||||
let config = ctx.client.config();
|
||||
let mut sub_agent_config = config.as_ref().clone();
|
||||
// Carry over review-only feature restrictions so the delegate cannot
|
||||
// Carry over review-only feature restrictions so the review agent cannot
|
||||
// re-enable blocked tools (web search, view image).
|
||||
sub_agent_config.web_search_mode = Some(WebSearchMode::Disabled);
|
||||
|
||||
@@ -96,19 +108,295 @@ async fn start_review_conversation(
|
||||
.clone()
|
||||
.unwrap_or_else(|| ctx.client.get_model());
|
||||
sub_agent_config.model = Some(model);
|
||||
(run_codex_thread_one_shot(
|
||||
sub_agent_config,
|
||||
session.auth_manager(),
|
||||
session.models_manager(),
|
||||
input,
|
||||
session.clone_session(),
|
||||
ctx.clone(),
|
||||
cancellation_token,
|
||||
None,
|
||||
let agent_control = session.session.services.agent_control.clone();
|
||||
let agent_id = match agent_control
|
||||
.spawn_agent_with_options(
|
||||
sub_agent_config,
|
||||
None,
|
||||
Some(SessionSource::SubAgent(SubAgentSource::Review)),
|
||||
false,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(agent_id) => agent_id,
|
||||
Err(_) => return None,
|
||||
};
|
||||
|
||||
if agent_control.send_input(agent_id, input).await.is_err() {
|
||||
let _ = agent_control.shutdown_agent(agent_id).await;
|
||||
return None;
|
||||
}
|
||||
|
||||
let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
|
||||
let parent_session = session.clone_session();
|
||||
let parent_ctx = ctx.clone();
|
||||
let cancel_token = cancellation_token.child_token();
|
||||
tokio::spawn(async move {
|
||||
forward_review_events(
|
||||
agent_control,
|
||||
agent_id,
|
||||
tx_sub,
|
||||
parent_session,
|
||||
parent_ctx,
|
||||
cancel_token,
|
||||
)
|
||||
.await;
|
||||
});
|
||||
|
||||
Some(rx_sub)
|
||||
}
|
||||
|
||||
async fn forward_review_events(
|
||||
agent_control: crate::agent::AgentControl,
|
||||
agent_id: ThreadId,
|
||||
tx_sub: async_channel::Sender<Event>,
|
||||
parent_session: Arc<Session>,
|
||||
parent_ctx: Arc<TurnContext>,
|
||||
cancellation_token: CancellationToken,
|
||||
) {
|
||||
let cancelled = cancellation_token.cancelled();
|
||||
tokio::pin!(cancelled);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = &mut cancelled => {
|
||||
shutdown_review_agent(&agent_control, agent_id).await;
|
||||
break;
|
||||
}
|
||||
event = agent_control.next_event(agent_id) => {
|
||||
let event = match event {
|
||||
Ok(event) => event,
|
||||
Err(_) => {
|
||||
shutdown_review_agent(&agent_control, agent_id).await;
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
match event.msg.clone() {
|
||||
EventMsg::AgentMessageDelta(_)
|
||||
| EventMsg::AgentReasoningDelta(_)
|
||||
| EventMsg::TokenCount(_)
|
||||
| EventMsg::SessionConfigured(_)
|
||||
| EventMsg::ThreadNameUpdated(_) => {}
|
||||
EventMsg::ExecApprovalRequest(request) => {
|
||||
if handle_exec_approval(
|
||||
&agent_control,
|
||||
agent_id,
|
||||
event.id,
|
||||
&parent_session,
|
||||
&parent_ctx,
|
||||
request,
|
||||
&cancellation_token,
|
||||
)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
shutdown_review_agent(&agent_control, agent_id).await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
EventMsg::ApplyPatchApprovalRequest(request) => {
|
||||
if handle_patch_approval(
|
||||
&agent_control,
|
||||
agent_id,
|
||||
event.id,
|
||||
&parent_session,
|
||||
&parent_ctx,
|
||||
request,
|
||||
&cancellation_token,
|
||||
)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
shutdown_review_agent(&agent_control, agent_id).await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
EventMsg::RequestUserInput(request) => {
|
||||
if handle_request_user_input(
|
||||
&agent_control,
|
||||
agent_id,
|
||||
event.id,
|
||||
&parent_session,
|
||||
&parent_ctx,
|
||||
request,
|
||||
&cancellation_token,
|
||||
)
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
shutdown_review_agent(&agent_control, agent_id).await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
EventMsg::TurnComplete(_) | EventMsg::TurnAborted(_) => {
|
||||
if !send_review_event(&tx_sub, event, &cancellation_token).await {
|
||||
shutdown_review_agent(&agent_control, agent_id).await;
|
||||
break;
|
||||
}
|
||||
shutdown_review_agent(&agent_control, agent_id).await;
|
||||
break;
|
||||
}
|
||||
_ => {
|
||||
if !send_review_event(&tx_sub, event, &cancellation_token).await {
|
||||
shutdown_review_agent(&agent_control, agent_id).await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_review_event(
|
||||
tx_sub: &async_channel::Sender<Event>,
|
||||
event: Event,
|
||||
cancellation_token: &CancellationToken,
|
||||
) -> bool {
|
||||
matches!(
|
||||
tx_sub.send(event).or_cancel(cancellation_token).await,
|
||||
Ok(Ok(()))
|
||||
)
|
||||
.await)
|
||||
.ok()
|
||||
.map(|io| io.rx_event)
|
||||
}
|
||||
|
||||
async fn shutdown_review_agent(agent_control: &crate::agent::AgentControl, agent_id: ThreadId) {
|
||||
let _ = agent_control.shutdown_agent(agent_id).await;
|
||||
}
|
||||
|
||||
async fn handle_exec_approval(
|
||||
agent_control: &crate::agent::AgentControl,
|
||||
agent_id: ThreadId,
|
||||
id: String,
|
||||
parent_session: &Session,
|
||||
parent_ctx: &TurnContext,
|
||||
event: ExecApprovalRequestEvent,
|
||||
cancellation_token: &CancellationToken,
|
||||
) -> crate::error::Result<()> {
|
||||
let approval_fut = parent_session.request_command_approval(
|
||||
parent_ctx,
|
||||
parent_ctx.sub_id.clone(),
|
||||
event.command,
|
||||
event.cwd,
|
||||
event.reason,
|
||||
event.proposed_execpolicy_amendment,
|
||||
);
|
||||
let decision = await_approval_with_cancel(
|
||||
approval_fut,
|
||||
parent_session,
|
||||
&parent_ctx.sub_id,
|
||||
cancellation_token,
|
||||
)
|
||||
.await;
|
||||
agent_control
|
||||
.send_op(agent_id, Op::ExecApproval { id, decision })
|
||||
.await
|
||||
.map(|_| ())
|
||||
}
|
||||
|
||||
async fn handle_patch_approval(
|
||||
agent_control: &crate::agent::AgentControl,
|
||||
agent_id: ThreadId,
|
||||
id: String,
|
||||
parent_session: &Session,
|
||||
parent_ctx: &TurnContext,
|
||||
event: ApplyPatchApprovalRequestEvent,
|
||||
cancellation_token: &CancellationToken,
|
||||
) -> crate::error::Result<()> {
|
||||
let decision_rx = parent_session
|
||||
.request_patch_approval(
|
||||
parent_ctx,
|
||||
parent_ctx.sub_id.clone(),
|
||||
event.changes,
|
||||
event.reason,
|
||||
event.grant_root,
|
||||
)
|
||||
.await;
|
||||
let decision = await_approval_with_cancel(
|
||||
async move { decision_rx.await.unwrap_or_default() },
|
||||
parent_session,
|
||||
&parent_ctx.sub_id,
|
||||
cancellation_token,
|
||||
)
|
||||
.await;
|
||||
agent_control
|
||||
.send_op(agent_id, Op::PatchApproval { id, decision })
|
||||
.await
|
||||
.map(|_| ())
|
||||
}
|
||||
|
||||
async fn handle_request_user_input(
|
||||
agent_control: &crate::agent::AgentControl,
|
||||
agent_id: ThreadId,
|
||||
id: String,
|
||||
parent_session: &Session,
|
||||
parent_ctx: &TurnContext,
|
||||
event: RequestUserInputEvent,
|
||||
cancellation_token: &CancellationToken,
|
||||
) -> crate::error::Result<()> {
|
||||
let args = RequestUserInputArgs {
|
||||
questions: event.questions,
|
||||
};
|
||||
let response_fut =
|
||||
parent_session.request_user_input(parent_ctx, parent_ctx.sub_id.clone(), args);
|
||||
let response = await_user_input_with_cancel(
|
||||
response_fut,
|
||||
parent_session,
|
||||
&parent_ctx.sub_id,
|
||||
cancellation_token,
|
||||
)
|
||||
.await;
|
||||
agent_control
|
||||
.send_op(agent_id, Op::UserInputAnswer { id, response })
|
||||
.await
|
||||
.map(|_| ())
|
||||
}
|
||||
|
||||
async fn await_user_input_with_cancel<F>(
|
||||
fut: F,
|
||||
parent_session: &Session,
|
||||
sub_id: &str,
|
||||
cancellation_token: &CancellationToken,
|
||||
) -> RequestUserInputResponse
|
||||
where
|
||||
F: core::future::Future<Output = Option<RequestUserInputResponse>>,
|
||||
{
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = cancellation_token.cancelled() => {
|
||||
let empty = RequestUserInputResponse {
|
||||
answers: HashMap::new(),
|
||||
};
|
||||
parent_session
|
||||
.notify_user_input_response(sub_id, empty.clone())
|
||||
.await;
|
||||
empty
|
||||
}
|
||||
response = fut => response.unwrap_or_else(|| RequestUserInputResponse {
|
||||
answers: HashMap::new(),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
async fn await_approval_with_cancel<F>(
|
||||
fut: F,
|
||||
parent_session: &Session,
|
||||
sub_id: &str,
|
||||
cancellation_token: &CancellationToken,
|
||||
) -> ReviewDecision
|
||||
where
|
||||
F: core::future::Future<Output = ReviewDecision>,
|
||||
{
|
||||
tokio::select! {
|
||||
biased;
|
||||
_ = cancellation_token.cancelled() => {
|
||||
parent_session.notify_approval(sub_id, ReviewDecision::Abort).await;
|
||||
ReviewDecision::Abort
|
||||
}
|
||||
decision = fut => {
|
||||
decision
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn process_review_events(
|
||||
|
||||
@@ -146,7 +146,7 @@ mod spawn {
|
||||
.agent_control
|
||||
.spawn_agent(
|
||||
config,
|
||||
prompt.clone(),
|
||||
Some(prompt.clone()),
|
||||
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id: session.conversation_id,
|
||||
depth: child_depth,
|
||||
|
||||
@@ -23,7 +23,6 @@ mod auth_refresh;
|
||||
mod cli_stream;
|
||||
mod client;
|
||||
mod client_websockets;
|
||||
mod codex_delegate;
|
||||
mod collaboration_instructions;
|
||||
mod compact;
|
||||
mod compact_remote;
|
||||
@@ -59,6 +58,7 @@ mod request_user_input;
|
||||
mod resume;
|
||||
mod resume_warning;
|
||||
mod review;
|
||||
mod review_subagent;
|
||||
mod rmcp_client;
|
||||
mod rollout_list_find;
|
||||
mod seatbelt;
|
||||
|
||||
@@ -22,10 +22,10 @@ use core_test_support::test_codex::test_codex;
|
||||
use core_test_support::wait_for_event;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
/// Delegate should surface ExecApprovalRequest from sub-agent and proceed
|
||||
/// Review sub-agent should surface ExecApprovalRequest and proceed
|
||||
/// after parent submits an approval decision.
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn codex_delegate_forwards_exec_approval_and_proceeds_on_approval() {
|
||||
async fn review_subagent_forwards_exec_approval_and_proceeds_on_approval() {
|
||||
skip_if_no_network!();
|
||||
|
||||
// Sub-agent turn 1: emit a shell_command function_call requiring approval, then complete.
|
||||
@@ -46,7 +46,7 @@ async fn codex_delegate_forwards_exec_approval_and_proceeds_on_approval() {
|
||||
let review_json = serde_json::json!({
|
||||
"findings": [],
|
||||
"overall_correctness": "ok",
|
||||
"overall_explanation": "delegate approved exec",
|
||||
"overall_explanation": "review sub-agent approved exec",
|
||||
"overall_confidence_score": 0.5
|
||||
})
|
||||
.to_string();
|
||||
@@ -59,7 +59,7 @@ async fn codex_delegate_forwards_exec_approval_and_proceeds_on_approval() {
|
||||
let server = start_mock_server().await;
|
||||
mount_sse_sequence(&server, vec![sse1, sse2]).await;
|
||||
|
||||
// Build a conversation configured to require approvals so the delegate
|
||||
// Build a conversation configured to require approvals so the review sub-agent
|
||||
// routes ExecApprovalRequest via the parent.
|
||||
let mut builder = test_codex().with_model("gpt-5.1").with_config(|config| {
|
||||
config.approval_policy = Constrained::allow_any(AskForApproval::OnRequest);
|
||||
@@ -86,7 +86,7 @@ async fn codex_delegate_forwards_exec_approval_and_proceeds_on_approval() {
|
||||
})
|
||||
.await;
|
||||
|
||||
// Expect parent-side approval request (forwarded by delegate).
|
||||
// Expect parent-side approval request forwarded by the review sub-agent.
|
||||
wait_for_event(&test.codex, |ev| {
|
||||
matches!(ev, EventMsg::ExecApprovalRequest(_))
|
||||
})
|
||||
@@ -108,10 +108,10 @@ async fn codex_delegate_forwards_exec_approval_and_proceeds_on_approval() {
|
||||
wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
}
|
||||
|
||||
/// Delegate should surface ApplyPatchApprovalRequest and honor parent decision
|
||||
/// Review sub-agent should surface ApplyPatchApprovalRequest and honor parent decision
|
||||
/// so the sub-agent can proceed to completion.
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn codex_delegate_forwards_patch_approval_and_proceeds_on_decision() {
|
||||
async fn review_subagent_forwards_patch_approval_and_proceeds_on_decision() {
|
||||
skip_if_no_network!();
|
||||
|
||||
let call_id = "call-patch-1";
|
||||
@@ -124,7 +124,7 @@ async fn codex_delegate_forwards_patch_approval_and_proceeds_on_decision() {
|
||||
let review_json = serde_json::json!({
|
||||
"findings": [],
|
||||
"overall_correctness": "ok",
|
||||
"overall_explanation": "delegate patch handled",
|
||||
"overall_explanation": "review sub-agent patch handled",
|
||||
"overall_confidence_score": 0.5
|
||||
})
|
||||
.to_string();
|
||||
@@ -166,7 +166,7 @@ async fn codex_delegate_forwards_patch_approval_and_proceeds_on_decision() {
|
||||
})
|
||||
.await;
|
||||
|
||||
// Deny via parent so delegate can continue; id "0" is the active sub_id in tests.
|
||||
// Deny via parent so review sub-agent can continue; id "0" is the active sub_id in tests.
|
||||
test.codex
|
||||
.submit(Op::PatchApproval {
|
||||
id: "0".into(),
|
||||
@@ -183,7 +183,7 @@ async fn codex_delegate_forwards_patch_approval_and_proceeds_on_decision() {
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn codex_delegate_ignores_legacy_deltas() {
|
||||
async fn review_subagent_ignores_legacy_deltas() {
|
||||
skip_if_no_network!();
|
||||
|
||||
// Single response with reasoning summary deltas.
|
||||
@@ -200,7 +200,7 @@ async fn codex_delegate_ignores_legacy_deltas() {
|
||||
let mut builder = test_codex();
|
||||
let test = builder.build(&server).await.expect("build test codex");
|
||||
|
||||
// Kick off review (delegated).
|
||||
// Kick off review in a spawned sub-agent.
|
||||
test.codex
|
||||
.submit(Op::Review {
|
||||
review_request: ReviewRequest {
|
||||
Reference in New Issue
Block a user