mirror of
https://github.com/openai/codex.git
synced 2026-04-24 06:35:50 +00:00
Merge remote-tracking branch 'origin/model-fallback-list' into repair/collab-stack-refresh-20260402
# Conflicts: # codex-rs/core/src/tools/handlers/multi_agents/spawn.rs # codex-rs/core/src/tools/handlers/multi_agents_tests.rs # codex-rs/core/src/tools/handlers/multi_agents_v2/spawn.rs # codex-rs/tools/src/agent_tool.rs # codex-rs/tools/src/agent_tool_tests.rs
This commit is contained in:
@@ -76,6 +76,11 @@ pub struct ThreadHistoryBuilder {
|
||||
next_item_index: i64,
|
||||
current_rollout_index: usize,
|
||||
next_rollout_index: usize,
|
||||
// Current streams emit per-attempt spawn ids (`call_id`, `call_id#2`, ...); legacy rollouts
|
||||
// reused the raw tool call_id for each retry, so replay still synthesizes stable per-attempt
|
||||
// item ids when needed to preserve each attempt row.
|
||||
current_spawn_attempt_ids: HashMap<String, String>,
|
||||
spawn_attempt_counts: HashMap<String, usize>,
|
||||
}
|
||||
|
||||
impl Default for ThreadHistoryBuilder {
|
||||
@@ -92,6 +97,8 @@ impl ThreadHistoryBuilder {
|
||||
next_item_index: 1,
|
||||
current_rollout_index: 0,
|
||||
next_rollout_index: 0,
|
||||
current_spawn_attempt_ids: HashMap::new(),
|
||||
spawn_attempt_counts: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -610,8 +617,9 @@ impl ThreadHistoryBuilder {
|
||||
&mut self,
|
||||
payload: &codex_protocol::protocol::CollabAgentSpawnBeginEvent,
|
||||
) {
|
||||
let item_id = self.next_collab_spawn_attempt_item_id(&payload.call_id);
|
||||
let item = ThreadItem::CollabAgentToolCall {
|
||||
id: payload.call_id.clone(),
|
||||
id: item_id,
|
||||
tool: CollabAgentTool::SpawnAgent,
|
||||
status: CollabAgentToolCallStatus::InProgress,
|
||||
sender_thread_id: payload.sender_thread_id.to_string(),
|
||||
@@ -628,6 +636,10 @@ impl ThreadHistoryBuilder {
|
||||
&mut self,
|
||||
payload: &codex_protocol::protocol::CollabAgentSpawnEndEvent,
|
||||
) {
|
||||
let item_id = self
|
||||
.current_spawn_attempt_ids
|
||||
.remove(&payload.call_id)
|
||||
.unwrap_or_else(|| payload.call_id.clone());
|
||||
let has_receiver = payload.new_thread_id.is_some();
|
||||
let status = match &payload.status {
|
||||
AgentStatus::Errored(_) | AgentStatus::NotFound => CollabAgentToolCallStatus::Failed,
|
||||
@@ -646,7 +658,7 @@ impl ThreadHistoryBuilder {
|
||||
None => (Vec::new(), HashMap::new()),
|
||||
};
|
||||
self.upsert_item_in_current_turn(ThreadItem::CollabAgentToolCall {
|
||||
id: payload.call_id.clone(),
|
||||
id: item_id,
|
||||
tool: CollabAgentTool::SpawnAgent,
|
||||
status,
|
||||
sender_thread_id: payload.sender_thread_id.to_string(),
|
||||
@@ -979,6 +991,8 @@ impl ThreadHistoryBuilder {
|
||||
}
|
||||
|
||||
fn finish_current_turn(&mut self) {
|
||||
self.current_spawn_attempt_ids.clear();
|
||||
self.spawn_attempt_counts.clear();
|
||||
if let Some(turn) = self.current_turn.take() {
|
||||
if turn.items.is_empty() && !turn.opened_explicitly && !turn.saw_compaction {
|
||||
return;
|
||||
@@ -1042,6 +1056,22 @@ impl ThreadHistoryBuilder {
|
||||
id
|
||||
}
|
||||
|
||||
fn next_collab_spawn_attempt_item_id(&mut self, call_id: &str) -> String {
|
||||
let attempt_number = self
|
||||
.spawn_attempt_counts
|
||||
.entry(call_id.to_string())
|
||||
.and_modify(|count| *count += 1)
|
||||
.or_insert(1);
|
||||
let item_id = if *attempt_number == 1 {
|
||||
call_id.to_string()
|
||||
} else {
|
||||
format!("{call_id}#{attempt_number}")
|
||||
};
|
||||
self.current_spawn_attempt_ids
|
||||
.insert(call_id.to_string(), item_id.clone());
|
||||
item_id
|
||||
}
|
||||
|
||||
fn build_user_inputs(&self, payload: &UserMessageEvent) -> Vec<UserInput> {
|
||||
let mut content = Vec::new();
|
||||
if !payload.message.trim().is_empty() {
|
||||
@@ -2564,6 +2594,157 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reconstructs_collab_spawn_end_without_receiver_as_failed_spawn_attempt() {
|
||||
let sender_thread_id = ThreadId::try_from("00000000-0000-0000-0000-000000000001")
|
||||
.expect("valid sender thread id");
|
||||
let events = vec![
|
||||
EventMsg::UserMessage(UserMessageEvent {
|
||||
message: "spawn agent".into(),
|
||||
images: None,
|
||||
text_elements: Vec::new(),
|
||||
local_images: Vec::new(),
|
||||
}),
|
||||
EventMsg::CollabAgentSpawnBegin(codex_protocol::protocol::CollabAgentSpawnBeginEvent {
|
||||
call_id: "spawn-1".into(),
|
||||
sender_thread_id,
|
||||
prompt: "inspect the repo".into(),
|
||||
model: "gpt-5.4-mini".into(),
|
||||
reasoning_effort: codex_protocol::openai_models::ReasoningEffort::Medium,
|
||||
}),
|
||||
EventMsg::CollabAgentSpawnEnd(codex_protocol::protocol::CollabAgentSpawnEndEvent {
|
||||
call_id: "spawn-1".into(),
|
||||
sender_thread_id,
|
||||
new_thread_id: None,
|
||||
new_agent_nickname: None,
|
||||
new_agent_role: None,
|
||||
prompt: "inspect the repo".into(),
|
||||
model: "gpt-5.4-mini".into(),
|
||||
reasoning_effort: codex_protocol::openai_models::ReasoningEffort::Medium,
|
||||
status: AgentStatus::PendingInit,
|
||||
}),
|
||||
];
|
||||
|
||||
let items = events
|
||||
.into_iter()
|
||||
.map(RolloutItem::EventMsg)
|
||||
.collect::<Vec<_>>();
|
||||
let turns = build_turns_from_rollout_items(&items);
|
||||
assert_eq!(turns.len(), 1);
|
||||
assert_eq!(turns[0].items.len(), 2);
|
||||
assert_eq!(
|
||||
turns[0].items[1],
|
||||
ThreadItem::CollabAgentToolCall {
|
||||
id: "spawn-1".into(),
|
||||
tool: CollabAgentTool::SpawnAgent,
|
||||
status: CollabAgentToolCallStatus::Failed,
|
||||
sender_thread_id: "00000000-0000-0000-0000-000000000001".into(),
|
||||
receiver_thread_ids: Vec::new(),
|
||||
prompt: Some("inspect the repo".into()),
|
||||
model: Some("gpt-5.4-mini".into()),
|
||||
reasoning_effort: Some(codex_protocol::openai_models::ReasoningEffort::Medium),
|
||||
agents_states: HashMap::new(),
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reconstructs_collab_spawn_retries_as_distinct_attempt_items() {
|
||||
let sender_thread_id = ThreadId::try_from("00000000-0000-0000-0000-000000000001")
|
||||
.expect("valid sender thread id");
|
||||
let spawned_thread_id = ThreadId::try_from("00000000-0000-0000-0000-000000000002")
|
||||
.expect("valid receiver thread id");
|
||||
let events = vec![
|
||||
EventMsg::UserMessage(UserMessageEvent {
|
||||
message: "spawn agent".into(),
|
||||
images: None,
|
||||
text_elements: Vec::new(),
|
||||
local_images: Vec::new(),
|
||||
}),
|
||||
EventMsg::CollabAgentSpawnBegin(codex_protocol::protocol::CollabAgentSpawnBeginEvent {
|
||||
call_id: "spawn-1".into(),
|
||||
sender_thread_id,
|
||||
prompt: "inspect the repo".into(),
|
||||
model: "gpt-5.4-mini".into(),
|
||||
reasoning_effort: codex_protocol::openai_models::ReasoningEffort::Low,
|
||||
}),
|
||||
EventMsg::CollabAgentSpawnEnd(codex_protocol::protocol::CollabAgentSpawnEndEvent {
|
||||
call_id: "spawn-1".into(),
|
||||
sender_thread_id,
|
||||
new_thread_id: None,
|
||||
new_agent_nickname: None,
|
||||
new_agent_role: None,
|
||||
prompt: "inspect the repo".into(),
|
||||
model: "gpt-5.4-mini".into(),
|
||||
reasoning_effort: codex_protocol::openai_models::ReasoningEffort::Low,
|
||||
status: AgentStatus::Errored("insufficient_quota".into()),
|
||||
}),
|
||||
EventMsg::CollabAgentSpawnBegin(codex_protocol::protocol::CollabAgentSpawnBeginEvent {
|
||||
call_id: "spawn-1".into(),
|
||||
sender_thread_id,
|
||||
prompt: "inspect the repo".into(),
|
||||
model: "gpt-5".into(),
|
||||
reasoning_effort: codex_protocol::openai_models::ReasoningEffort::Medium,
|
||||
}),
|
||||
EventMsg::CollabAgentSpawnEnd(codex_protocol::protocol::CollabAgentSpawnEndEvent {
|
||||
call_id: "spawn-1".into(),
|
||||
sender_thread_id,
|
||||
new_thread_id: Some(spawned_thread_id),
|
||||
new_agent_nickname: Some("Scout".into()),
|
||||
new_agent_role: Some("explorer".into()),
|
||||
prompt: "inspect the repo".into(),
|
||||
model: "gpt-5".into(),
|
||||
reasoning_effort: codex_protocol::openai_models::ReasoningEffort::Medium,
|
||||
status: AgentStatus::Running,
|
||||
}),
|
||||
];
|
||||
|
||||
let items = events
|
||||
.into_iter()
|
||||
.map(RolloutItem::EventMsg)
|
||||
.collect::<Vec<_>>();
|
||||
let turns = build_turns_from_rollout_items(&items);
|
||||
|
||||
assert_eq!(turns.len(), 1);
|
||||
assert_eq!(turns[0].items.len(), 3);
|
||||
assert_eq!(
|
||||
turns[0].items[1],
|
||||
ThreadItem::CollabAgentToolCall {
|
||||
id: "spawn-1".into(),
|
||||
tool: CollabAgentTool::SpawnAgent,
|
||||
status: CollabAgentToolCallStatus::Failed,
|
||||
sender_thread_id: "00000000-0000-0000-0000-000000000001".into(),
|
||||
receiver_thread_ids: Vec::new(),
|
||||
prompt: Some("inspect the repo".into()),
|
||||
model: Some("gpt-5.4-mini".into()),
|
||||
reasoning_effort: Some(codex_protocol::openai_models::ReasoningEffort::Low),
|
||||
agents_states: HashMap::new(),
|
||||
}
|
||||
);
|
||||
assert_eq!(
|
||||
turns[0].items[2],
|
||||
ThreadItem::CollabAgentToolCall {
|
||||
id: "spawn-1#2".into(),
|
||||
tool: CollabAgentTool::SpawnAgent,
|
||||
status: CollabAgentToolCallStatus::Completed,
|
||||
sender_thread_id: "00000000-0000-0000-0000-000000000001".into(),
|
||||
receiver_thread_ids: vec!["00000000-0000-0000-0000-000000000002".into()],
|
||||
prompt: Some("inspect the repo".into()),
|
||||
model: Some("gpt-5".into()),
|
||||
reasoning_effort: Some(codex_protocol::openai_models::ReasoningEffort::Medium),
|
||||
agents_states: [(
|
||||
"00000000-0000-0000-0000-000000000002".into(),
|
||||
CollabAgentState {
|
||||
status: crate::protocol::v2::CollabAgentStatus::Running,
|
||||
message: None,
|
||||
},
|
||||
)]
|
||||
.into_iter()
|
||||
.collect(),
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reconstructs_interrupted_send_input_as_completed_collab_call() {
|
||||
// `send_input(interrupt=true)` first stops the child's active turn, then redirects it with
|
||||
|
||||
@@ -24,7 +24,6 @@ use codex_protocol::openai_models::ReasoningEffort;
|
||||
use codex_protocol::protocol::CollabAgentInteractionBeginEvent;
|
||||
use codex_protocol::protocol::CollabAgentInteractionEndEvent;
|
||||
use codex_protocol::protocol::CollabAgentRef;
|
||||
use codex_protocol::protocol::CollabAgentSpawnBeginEvent;
|
||||
use codex_protocol::protocol::CollabAgentSpawnEndEvent;
|
||||
use codex_protocol::protocol::CollabCloseBeginEvent;
|
||||
use codex_protocol::protocol::CollabCloseEndEvent;
|
||||
|
||||
@@ -71,30 +71,6 @@ impl ToolHandler for Handler {
|
||||
let fork_context = args
|
||||
.fork_context
|
||||
.unwrap_or_else(|| default_fork_context_for_role(&turn.config, role_name));
|
||||
|
||||
session
|
||||
.send_event(
|
||||
&turn,
|
||||
CollabAgentSpawnBeginEvent {
|
||||
call_id: call_id.clone(),
|
||||
sender_thread_id: session.conversation_id,
|
||||
prompt: prompt.clone(),
|
||||
model: args
|
||||
.model_fallback_list
|
||||
.as_ref()
|
||||
.and_then(|list| list.first())
|
||||
.map(|candidate| candidate.model.clone())
|
||||
.unwrap_or_else(|| args.model.clone().unwrap_or_default()),
|
||||
reasoning_effort: args
|
||||
.model_fallback_list
|
||||
.as_ref()
|
||||
.and_then(|list| list.first())
|
||||
.and_then(|candidate| candidate.reasoning_effort)
|
||||
.unwrap_or_else(|| args.reasoning_effort.unwrap_or_default()),
|
||||
}
|
||||
.into(),
|
||||
)
|
||||
.await;
|
||||
let config =
|
||||
build_agent_spawn_config(&session.get_base_instructions().await, turn.as_ref())?;
|
||||
let mut candidates_to_try = collect_spawn_agent_model_candidates(
|
||||
@@ -111,6 +87,18 @@ impl ToolHandler for Handler {
|
||||
|
||||
let mut spawn_result = None;
|
||||
for (idx, candidate) in candidates_to_try.iter().enumerate() {
|
||||
let attempt_call_id = spawn_attempt_event_call_id(&call_id, idx);
|
||||
let candidate_model = candidate.model.clone().unwrap_or_default();
|
||||
let candidate_reasoning_effort = candidate.reasoning_effort.unwrap_or_default();
|
||||
send_collab_agent_spawn_begin_event(
|
||||
&session,
|
||||
&turn,
|
||||
attempt_call_id.clone(),
|
||||
prompt.clone(),
|
||||
candidate_model.clone(),
|
||||
candidate_reasoning_effort,
|
||||
)
|
||||
.await;
|
||||
let mut candidate_config = config.clone();
|
||||
if !fork_context {
|
||||
apply_requested_spawn_agent_model_overrides(
|
||||
@@ -159,20 +147,57 @@ impl ToolHandler for Handler {
|
||||
.await;
|
||||
match attempt_result {
|
||||
Ok(spawned_agent) => {
|
||||
if spawn_should_retry_on_async_quota_exhaustion(
|
||||
spawned_agent.status.clone(),
|
||||
spawned_agent.thread_id,
|
||||
&session.services.agent_control,
|
||||
)
|
||||
.await
|
||||
&& idx + 1 < candidates_to_try.len()
|
||||
{
|
||||
continue;
|
||||
}
|
||||
spawn_result = Some(spawned_agent);
|
||||
let status = if idx + 1 < candidates_to_try.len() {
|
||||
match probe_spawn_attempt_for_async_quota_exhaustion(
|
||||
spawned_agent.status.clone(),
|
||||
spawned_agent.thread_id,
|
||||
&session.services.agent_control,
|
||||
)
|
||||
.await
|
||||
{
|
||||
SpawnAttemptRetryDecision::Accept(status) => status,
|
||||
SpawnAttemptRetryDecision::Retry(retry_status) => {
|
||||
match close_quota_exhausted_spawn_attempt(
|
||||
&session.services.agent_control,
|
||||
spawned_agent.thread_id,
|
||||
retry_status,
|
||||
)
|
||||
.await
|
||||
{
|
||||
SpawnAttemptRetryDecision::Accept(status) => status,
|
||||
SpawnAttemptRetryDecision::Retry(status) => {
|
||||
send_collab_agent_spawn_retry_preempted_event(
|
||||
&session,
|
||||
&turn,
|
||||
attempt_call_id,
|
||||
prompt.clone(),
|
||||
candidate_model,
|
||||
candidate_reasoning_effort,
|
||||
status,
|
||||
)
|
||||
.await;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
spawned_agent.status.clone()
|
||||
};
|
||||
spawn_result = Some((spawned_agent, status, attempt_call_id));
|
||||
break;
|
||||
}
|
||||
Err(err) => {
|
||||
send_collab_agent_spawn_error_event(
|
||||
&session,
|
||||
&turn,
|
||||
attempt_call_id,
|
||||
prompt.clone(),
|
||||
candidate_model,
|
||||
candidate_reasoning_effort,
|
||||
&err,
|
||||
)
|
||||
.await;
|
||||
if spawn_should_retry_on_quota_exhaustion(&err)
|
||||
&& idx + 1 < candidates_to_try.len()
|
||||
{
|
||||
@@ -182,14 +207,13 @@ impl ToolHandler for Handler {
|
||||
}
|
||||
}
|
||||
}
|
||||
let Some(spawned_agent) = spawn_result else {
|
||||
let Some((spawned_agent, status, spawn_event_call_id)) = spawn_result else {
|
||||
return Err(FunctionCallError::RespondToModel(
|
||||
"No spawn attempts were executed".to_string(),
|
||||
));
|
||||
};
|
||||
let new_thread_id = Some(spawned_agent.thread_id);
|
||||
let new_agent_metadata = Some(spawned_agent.metadata.clone());
|
||||
let status = spawned_agent.status.clone();
|
||||
let agent_snapshot = match new_thread_id {
|
||||
Some(thread_id) => {
|
||||
session
|
||||
@@ -227,7 +251,7 @@ impl ToolHandler for Handler {
|
||||
.send_event(
|
||||
&turn,
|
||||
CollabAgentSpawnEndEvent {
|
||||
call_id,
|
||||
call_id: spawn_event_call_id,
|
||||
sender_thread_id: session.conversation_id,
|
||||
new_thread_id,
|
||||
new_agent_nickname,
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
use crate::agent::AgentStatus;
|
||||
use crate::agent::status::is_final;
|
||||
use crate::codex::Session;
|
||||
use crate::codex::TurnContext;
|
||||
use crate::config::Config;
|
||||
@@ -17,6 +16,8 @@ use codex_protocol::models::ResponseInputItem;
|
||||
use codex_protocol::openai_models::ReasoningEffort;
|
||||
use codex_protocol::openai_models::ReasoningEffortPreset;
|
||||
use codex_protocol::protocol::CollabAgentRef;
|
||||
use codex_protocol::protocol::CollabAgentSpawnBeginEvent;
|
||||
use codex_protocol::protocol::CollabAgentSpawnEndEvent;
|
||||
use codex_protocol::protocol::CollabAgentStatusEntry;
|
||||
use codex_protocol::protocol::Op;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
@@ -27,12 +28,27 @@ use serde::Serialize;
|
||||
use serde_json::Value as JsonValue;
|
||||
use std::collections::HashMap;
|
||||
use tokio::time::Duration;
|
||||
use tokio::time::Instant;
|
||||
use tokio::time::timeout;
|
||||
|
||||
/// Minimum wait timeout to prevent tight polling loops from burning CPU.
|
||||
pub(crate) const MIN_WAIT_TIMEOUT_MS: i64 = 10_000;
|
||||
pub(crate) const DEFAULT_WAIT_TIMEOUT_MS: i64 = 30_000;
|
||||
pub(crate) const MAX_WAIT_TIMEOUT_MS: i64 = 3600 * 1000;
|
||||
const ASYNC_QUOTA_EXHAUSTION_STATUS_TIMEOUT: Duration = Duration::from_secs(2);
|
||||
|
||||
pub(crate) enum SpawnAttemptRetryDecision {
|
||||
Accept(AgentStatus),
|
||||
Retry(AgentStatus),
|
||||
}
|
||||
|
||||
pub(crate) fn spawn_attempt_event_call_id(call_id: &str, attempt_index: usize) -> String {
|
||||
if attempt_index == 0 {
|
||||
call_id.to_string()
|
||||
} else {
|
||||
format!("{call_id}#{}", attempt_index + 1)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn function_arguments(payload: ToolPayload) -> Result<String, FunctionCallError> {
|
||||
match payload {
|
||||
@@ -98,7 +114,7 @@ pub(crate) fn collect_spawn_agent_model_candidates(
|
||||
.iter()
|
||||
.map(|candidate| SpawnAgentModelCandidate {
|
||||
model: Some(candidate.model.clone()),
|
||||
reasoning_effort: candidate.reasoning_effort,
|
||||
reasoning_effort: candidate.reasoning_effort.or(requested_reasoning_effort),
|
||||
})
|
||||
.collect();
|
||||
}
|
||||
@@ -113,6 +129,30 @@ pub(crate) fn collect_spawn_agent_model_candidates(
|
||||
candidates
|
||||
}
|
||||
|
||||
pub(crate) async fn close_quota_exhausted_spawn_attempt(
|
||||
agent_control: &crate::agent::control::AgentControl,
|
||||
thread_id: ThreadId,
|
||||
retry_status: AgentStatus,
|
||||
) -> SpawnAttemptRetryDecision {
|
||||
let retry_decision =
|
||||
recheck_spawn_attempt_retry_decision(retry_status, thread_id, agent_control).await;
|
||||
let SpawnAttemptRetryDecision::Retry(status) = retry_decision else {
|
||||
return retry_decision;
|
||||
};
|
||||
|
||||
// There is still a narrow TOCTOU window: a child can leave `PendingInit` after the final
|
||||
// status read above and before `close_agent` runs. `AgentControl` does not currently expose
|
||||
// a compare-and-close primitive, so this is the strongest local mitigation available.
|
||||
if let Err(err) = agent_control.close_agent(thread_id).await
|
||||
&& !matches!(
|
||||
err,
|
||||
CodexErr::ThreadNotFound(_) | CodexErr::InternalAgentDied
|
||||
)
|
||||
{
|
||||
tracing::warn!("failed to close quota-exhausted spawn attempt {thread_id}: {err}");
|
||||
}
|
||||
SpawnAttemptRetryDecision::Retry(status)
|
||||
}
|
||||
pub(crate) fn spawn_should_retry_on_quota_exhaustion(error: &CodexErr) -> bool {
|
||||
matches!(
|
||||
error,
|
||||
@@ -120,36 +160,93 @@ pub(crate) fn spawn_should_retry_on_quota_exhaustion(error: &CodexErr) -> bool {
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) async fn spawn_should_retry_on_async_quota_exhaustion(
|
||||
pub(crate) async fn probe_spawn_attempt_for_async_quota_exhaustion(
|
||||
thread_status: AgentStatus,
|
||||
thread_id: ThreadId,
|
||||
agent_control: &crate::agent::control::AgentControl,
|
||||
) -> bool {
|
||||
if is_final(&thread_status) && spawn_should_retry_on_quota_exhaustion_status(&thread_status) {
|
||||
return true;
|
||||
) -> SpawnAttemptRetryDecision {
|
||||
match thread_status {
|
||||
AgentStatus::Completed(_)
|
||||
| AgentStatus::Errored(_)
|
||||
| AgentStatus::Shutdown
|
||||
| AgentStatus::NotFound => {
|
||||
return retry_decision_for_final_spawn_status(thread_status);
|
||||
}
|
||||
AgentStatus::PendingInit | AgentStatus::Running | AgentStatus::Interrupted => {}
|
||||
}
|
||||
|
||||
let Ok(mut status_rx) = agent_control.subscribe_status(thread_id).await else {
|
||||
return false;
|
||||
return match thread_status {
|
||||
AgentStatus::Running | AgentStatus::Interrupted => {
|
||||
SpawnAttemptRetryDecision::Accept(thread_status)
|
||||
}
|
||||
_ => SpawnAttemptRetryDecision::Retry(AgentStatus::PendingInit),
|
||||
};
|
||||
};
|
||||
let mut status = status_rx.borrow_and_update().clone();
|
||||
if is_final(&status) && spawn_should_retry_on_quota_exhaustion_status(&status) {
|
||||
return true;
|
||||
}
|
||||
let deadline = Instant::now() + ASYNC_QUOTA_EXHAUSTION_STATUS_TIMEOUT;
|
||||
|
||||
loop {
|
||||
if timeout(Duration::from_millis(250), status_rx.changed())
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
let status = status_rx.borrow_and_update().clone();
|
||||
match status {
|
||||
AgentStatus::Completed(_)
|
||||
| AgentStatus::Errored(_)
|
||||
| AgentStatus::Shutdown
|
||||
| AgentStatus::NotFound => {
|
||||
return retry_decision_for_final_spawn_status(status);
|
||||
}
|
||||
AgentStatus::PendingInit | AgentStatus::Running | AgentStatus::Interrupted => {}
|
||||
}
|
||||
status = status_rx.borrow().clone();
|
||||
if is_final(&status) {
|
||||
return spawn_should_retry_on_quota_exhaustion_status(&status);
|
||||
|
||||
let Some(remaining) = deadline.checked_duration_since(Instant::now()) else {
|
||||
return match status {
|
||||
AgentStatus::PendingInit => {
|
||||
SpawnAttemptRetryDecision::Retry(AgentStatus::PendingInit)
|
||||
}
|
||||
AgentStatus::Running | AgentStatus::Interrupted => {
|
||||
SpawnAttemptRetryDecision::Accept(status)
|
||||
}
|
||||
AgentStatus::Completed(_)
|
||||
| AgentStatus::Errored(_)
|
||||
| AgentStatus::Shutdown
|
||||
| AgentStatus::NotFound => retry_decision_for_final_spawn_status(status),
|
||||
};
|
||||
};
|
||||
match timeout(remaining, status_rx.changed()).await {
|
||||
Ok(Ok(())) => {}
|
||||
Ok(Err(_)) => return SpawnAttemptRetryDecision::Retry(AgentStatus::PendingInit),
|
||||
Err(_) => return SpawnAttemptRetryDecision::Retry(AgentStatus::PendingInit),
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
pub(crate) async fn recheck_spawn_attempt_retry_decision(
|
||||
status: AgentStatus,
|
||||
thread_id: ThreadId,
|
||||
agent_control: &crate::agent::control::AgentControl,
|
||||
) -> SpawnAttemptRetryDecision {
|
||||
if !matches!(status, AgentStatus::PendingInit) {
|
||||
return SpawnAttemptRetryDecision::Retry(status);
|
||||
}
|
||||
|
||||
let latest_status = agent_control.get_status(thread_id).await;
|
||||
match latest_status {
|
||||
AgentStatus::Running | AgentStatus::Interrupted => {
|
||||
SpawnAttemptRetryDecision::Accept(latest_status)
|
||||
}
|
||||
AgentStatus::Completed(_)
|
||||
| AgentStatus::Errored(_)
|
||||
| AgentStatus::Shutdown
|
||||
| AgentStatus::NotFound => retry_decision_for_final_spawn_status(latest_status),
|
||||
AgentStatus::PendingInit => SpawnAttemptRetryDecision::Retry(AgentStatus::PendingInit),
|
||||
}
|
||||
}
|
||||
|
||||
fn retry_decision_for_final_spawn_status(status: AgentStatus) -> SpawnAttemptRetryDecision {
|
||||
if spawn_should_retry_on_quota_exhaustion_status(&status) {
|
||||
SpawnAttemptRetryDecision::Retry(status)
|
||||
} else {
|
||||
SpawnAttemptRetryDecision::Accept(status)
|
||||
}
|
||||
}
|
||||
|
||||
fn spawn_should_retry_on_quota_exhaustion_status(status: &AgentStatus) -> bool {
|
||||
@@ -212,6 +309,88 @@ pub(crate) fn collab_spawn_error(err: CodexErr) -> FunctionCallError {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn send_collab_agent_spawn_error_event(
|
||||
session: &Session,
|
||||
turn: &TurnContext,
|
||||
call_id: String,
|
||||
prompt: String,
|
||||
model: String,
|
||||
reasoning_effort: ReasoningEffort,
|
||||
err: &CodexErr,
|
||||
) {
|
||||
session
|
||||
.send_event(
|
||||
turn,
|
||||
CollabAgentSpawnEndEvent {
|
||||
call_id,
|
||||
sender_thread_id: session.conversation_id,
|
||||
new_thread_id: None,
|
||||
new_agent_nickname: None,
|
||||
new_agent_role: None,
|
||||
prompt,
|
||||
model,
|
||||
reasoning_effort,
|
||||
status: match err {
|
||||
CodexErr::ThreadNotFound(_) => AgentStatus::NotFound,
|
||||
err => AgentStatus::Errored(err.to_string()),
|
||||
},
|
||||
}
|
||||
.into(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
pub(crate) async fn send_collab_agent_spawn_begin_event(
|
||||
session: &Session,
|
||||
turn: &TurnContext,
|
||||
call_id: String,
|
||||
prompt: String,
|
||||
model: String,
|
||||
reasoning_effort: ReasoningEffort,
|
||||
) {
|
||||
session
|
||||
.send_event(
|
||||
turn,
|
||||
CollabAgentSpawnBeginEvent {
|
||||
call_id,
|
||||
sender_thread_id: session.conversation_id,
|
||||
prompt,
|
||||
model,
|
||||
reasoning_effort,
|
||||
}
|
||||
.into(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
pub(crate) async fn send_collab_agent_spawn_retry_preempted_event(
|
||||
session: &Session,
|
||||
turn: &TurnContext,
|
||||
call_id: String,
|
||||
prompt: String,
|
||||
model: String,
|
||||
reasoning_effort: ReasoningEffort,
|
||||
status: AgentStatus,
|
||||
) {
|
||||
session
|
||||
.send_event(
|
||||
turn,
|
||||
CollabAgentSpawnEndEvent {
|
||||
call_id,
|
||||
sender_thread_id: session.conversation_id,
|
||||
new_thread_id: None,
|
||||
new_agent_nickname: None,
|
||||
new_agent_role: None,
|
||||
prompt,
|
||||
model,
|
||||
reasoning_effort,
|
||||
status,
|
||||
}
|
||||
.into(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
pub(crate) fn collab_agent_error(agent_id: ThreadId, err: CodexErr) -> FunctionCallError {
|
||||
match err {
|
||||
CodexErr::ThreadNotFound(id) => {
|
||||
|
||||
@@ -3,6 +3,7 @@ use crate::ThreadManager;
|
||||
use crate::agent::WatchdogRegistration;
|
||||
use crate::built_in_model_providers;
|
||||
use crate::codex::make_session_and_context;
|
||||
use crate::codex::make_session_and_context_with_rx;
|
||||
use crate::config::AgentRoleConfig;
|
||||
use crate::config::DEFAULT_AGENT_MAX_DEPTH;
|
||||
use crate::function_tool::FunctionCallError;
|
||||
@@ -35,6 +36,8 @@ use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::openai_models::ReasoningEffort;
|
||||
use codex_protocol::protocol::AgentStatus;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
use codex_protocol::protocol::CollabAgentSpawnEndEvent;
|
||||
use codex_protocol::protocol::Event;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::FileSystemSandboxPolicy;
|
||||
use codex_protocol::protocol::InitialHistory;
|
||||
@@ -264,6 +267,20 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
async fn wait_for_collab_spawn_end_event(
|
||||
rx: &async_channel::Receiver<Event>,
|
||||
) -> CollabAgentSpawnEndEvent {
|
||||
loop {
|
||||
let event = timeout(Duration::from_secs(1), rx.recv())
|
||||
.await
|
||||
.expect("collab spawn-end event timed out")
|
||||
.expect("collab spawn-end event missing");
|
||||
if let EventMsg::CollabAgentSpawnEnd(event) = event.msg {
|
||||
return event;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct ListAgentsResult {
|
||||
agents: Vec<ListedAgentResult>,
|
||||
@@ -704,22 +721,175 @@ async fn multi_agent_v2_spawn_rejects_legacy_items_field() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn spawn_agent_errors_when_manager_dropped() {
|
||||
let (session, turn) = make_session_and_context().await;
|
||||
let (session, turn, rx) = make_session_and_context_with_rx().await;
|
||||
let invocation = invocation(
|
||||
Arc::new(session),
|
||||
Arc::new(turn),
|
||||
session.clone(),
|
||||
turn.clone(),
|
||||
"spawn_agent",
|
||||
function_payload(json!({"message": "hello", "fork_context": false})),
|
||||
);
|
||||
let Err(err) = SpawnAgentHandler.handle(invocation).await else {
|
||||
panic!("spawn should fail without a manager");
|
||||
};
|
||||
let spawn_end_event = wait_for_collab_spawn_end_event(&rx).await;
|
||||
assert_eq!(spawn_end_event.call_id, "call-1");
|
||||
assert_eq!(spawn_end_event.sender_thread_id, session.conversation_id);
|
||||
assert_eq!(spawn_end_event.new_thread_id, None);
|
||||
assert_eq!(spawn_end_event.new_agent_nickname, None);
|
||||
assert_eq!(spawn_end_event.new_agent_role, None);
|
||||
assert_eq!(spawn_end_event.prompt, "hello");
|
||||
assert_eq!(spawn_end_event.model, "");
|
||||
assert_eq!(spawn_end_event.reasoning_effort, ReasoningEffort::default());
|
||||
assert!(matches!(
|
||||
spawn_end_event.status,
|
||||
AgentStatus::Errored(ref message) if message.contains("thread manager dropped")
|
||||
));
|
||||
assert_eq!(
|
||||
err,
|
||||
FunctionCallError::RespondToModel("collab manager unavailable".to_string())
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn multi_agent_v2_spawn_agent_errors_when_manager_dropped() {
|
||||
let (session, mut turn, rx) = make_session_and_context_with_rx().await;
|
||||
let turn_context = Arc::get_mut(&mut turn).expect("single turn context ref");
|
||||
let mut config = (*turn_context.config).clone();
|
||||
config
|
||||
.features
|
||||
.enable(Feature::MultiAgentV2)
|
||||
.expect("test config should allow feature update");
|
||||
turn_context.config = Arc::new(config);
|
||||
|
||||
let invocation = invocation(
|
||||
session.clone(),
|
||||
turn.clone(),
|
||||
"spawn_agent",
|
||||
function_payload(json!({
|
||||
"message": "inspect this repo",
|
||||
"task_name": "worker"
|
||||
})),
|
||||
);
|
||||
let Err(err) = SpawnAgentHandlerV2.handle(invocation).await else {
|
||||
panic!("spawn should fail without a manager");
|
||||
};
|
||||
let spawn_end_event = wait_for_collab_spawn_end_event(&rx).await;
|
||||
assert_eq!(spawn_end_event.call_id, "call-1");
|
||||
assert_eq!(spawn_end_event.sender_thread_id, session.conversation_id);
|
||||
assert_eq!(spawn_end_event.new_thread_id, None);
|
||||
assert_eq!(spawn_end_event.new_agent_nickname, None);
|
||||
assert_eq!(spawn_end_event.new_agent_role, None);
|
||||
assert_eq!(spawn_end_event.prompt, "inspect this repo");
|
||||
assert_eq!(spawn_end_event.model, "");
|
||||
assert_eq!(spawn_end_event.reasoning_effort, ReasoningEffort::default());
|
||||
assert!(matches!(
|
||||
spawn_end_event.status,
|
||||
AgentStatus::Errored(ref message) if message.contains("thread manager dropped")
|
||||
));
|
||||
assert_eq!(
|
||||
err,
|
||||
FunctionCallError::RespondToModel("collab manager unavailable".to_string())
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn spawn_retry_preempted_event_omits_thread_identity() {
|
||||
let (session, turn, rx) = make_session_and_context_with_rx().await;
|
||||
|
||||
send_collab_agent_spawn_retry_preempted_event(
|
||||
session.as_ref(),
|
||||
turn.as_ref(),
|
||||
"call-1".to_string(),
|
||||
"inspect this repo".to_string(),
|
||||
"gpt-5.4-mini".to_string(),
|
||||
ReasoningEffort::Medium,
|
||||
AgentStatus::PendingInit,
|
||||
)
|
||||
.await;
|
||||
|
||||
let spawn_end_event = wait_for_collab_spawn_end_event(&rx).await;
|
||||
assert_eq!(spawn_end_event.call_id, "call-1");
|
||||
assert_eq!(spawn_end_event.sender_thread_id, session.conversation_id);
|
||||
assert_eq!(spawn_end_event.new_thread_id, None);
|
||||
assert_eq!(spawn_end_event.new_agent_nickname, None);
|
||||
assert_eq!(spawn_end_event.new_agent_role, None);
|
||||
assert_eq!(spawn_end_event.prompt, "inspect this repo");
|
||||
assert_eq!(spawn_end_event.model, "gpt-5.4-mini");
|
||||
assert_eq!(spawn_end_event.reasoning_effort, ReasoningEffort::Medium);
|
||||
assert_eq!(spawn_end_event.status, AgentStatus::PendingInit);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn spawn_async_quota_probe_accepts_running_child() {
|
||||
let decision = probe_spawn_attempt_for_async_quota_exhaustion(
|
||||
AgentStatus::Running,
|
||||
ThreadId::default(),
|
||||
&crate::agent::control::AgentControl::default(),
|
||||
)
|
||||
.await;
|
||||
|
||||
assert!(matches!(
|
||||
decision,
|
||||
SpawnAttemptRetryDecision::Accept(AgentStatus::Running)
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn close_quota_exhausted_spawn_attempt_accepts_child_that_started_running() {
|
||||
let (_session, turn) = make_session_and_context().await;
|
||||
let manager = thread_manager();
|
||||
let thread = manager
|
||||
.start_thread((*turn.config).clone())
|
||||
.await
|
||||
.expect("child thread should start");
|
||||
let active_turn = thread.thread.codex.session.new_default_turn().await;
|
||||
thread
|
||||
.thread
|
||||
.codex
|
||||
.session
|
||||
.spawn_task(
|
||||
Arc::clone(&active_turn),
|
||||
vec![UserInput::Text {
|
||||
text: "working".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
NeverEndingTask,
|
||||
)
|
||||
.await;
|
||||
timeout(Duration::from_secs(1), async {
|
||||
loop {
|
||||
if manager.agent_control().get_status(thread.thread_id).await == AgentStatus::Running {
|
||||
break;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
})
|
||||
.await
|
||||
.expect("child should reach running");
|
||||
|
||||
let decision = close_quota_exhausted_spawn_attempt(
|
||||
&manager.agent_control(),
|
||||
thread.thread_id,
|
||||
AgentStatus::PendingInit,
|
||||
)
|
||||
.await;
|
||||
|
||||
assert!(matches!(
|
||||
decision,
|
||||
SpawnAttemptRetryDecision::Accept(AgentStatus::Running)
|
||||
));
|
||||
assert_eq!(
|
||||
manager.agent_control().get_status(thread.thread_id).await,
|
||||
AgentStatus::Running
|
||||
);
|
||||
|
||||
let _ = thread
|
||||
.thread
|
||||
.submit(Op::Shutdown {})
|
||||
.await
|
||||
.expect("shutdown should submit");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn multi_agent_v2_spawn_returns_path_and_send_message_accepts_relative_path() {
|
||||
#[derive(Debug, Deserialize)]
|
||||
|
||||
@@ -17,7 +17,6 @@ use codex_protocol::models::ResponseInputItem;
|
||||
use codex_protocol::openai_models::ReasoningEffort;
|
||||
use codex_protocol::protocol::CollabAgentInteractionBeginEvent;
|
||||
use codex_protocol::protocol::CollabAgentInteractionEndEvent;
|
||||
use codex_protocol::protocol::CollabAgentSpawnBeginEvent;
|
||||
use codex_protocol::protocol::CollabAgentSpawnEndEvent;
|
||||
use codex_protocol::protocol::CollabCloseBeginEvent;
|
||||
use codex_protocol::protocol::CollabCloseEndEvent;
|
||||
|
||||
@@ -53,29 +53,6 @@ impl ToolHandler for Handler {
|
||||
"Agent depth limit reached. Solve the task yourself.".to_string(),
|
||||
));
|
||||
}
|
||||
session
|
||||
.send_event(
|
||||
&turn,
|
||||
CollabAgentSpawnBeginEvent {
|
||||
call_id: call_id.clone(),
|
||||
sender_thread_id: session.conversation_id,
|
||||
prompt: prompt.clone(),
|
||||
model: args
|
||||
.model_fallback_list
|
||||
.as_ref()
|
||||
.and_then(|list| list.first())
|
||||
.map(|candidate| candidate.model.clone())
|
||||
.unwrap_or_else(|| args.model.clone().unwrap_or_default()),
|
||||
reasoning_effort: args
|
||||
.model_fallback_list
|
||||
.as_ref()
|
||||
.and_then(|list| list.first())
|
||||
.and_then(|candidate| candidate.reasoning_effort)
|
||||
.unwrap_or_else(|| args.reasoning_effort.unwrap_or_default()),
|
||||
}
|
||||
.into(),
|
||||
)
|
||||
.await;
|
||||
let config =
|
||||
build_agent_spawn_config(&session.get_base_instructions().await, turn.as_ref())?;
|
||||
|
||||
@@ -120,6 +97,18 @@ impl ToolHandler for Handler {
|
||||
|
||||
let mut spawn_result = None;
|
||||
for (idx, candidate) in candidates_to_try.iter().enumerate() {
|
||||
let attempt_call_id = spawn_attempt_event_call_id(&call_id, idx);
|
||||
let candidate_model = candidate.model.clone().unwrap_or_default();
|
||||
let candidate_reasoning_effort = candidate.reasoning_effort.unwrap_or_default();
|
||||
send_collab_agent_spawn_begin_event(
|
||||
&session,
|
||||
&turn,
|
||||
attempt_call_id.clone(),
|
||||
prompt.clone(),
|
||||
candidate_model.clone(),
|
||||
candidate_reasoning_effort,
|
||||
)
|
||||
.await;
|
||||
let mut candidate_config = config.clone();
|
||||
if fork_mode.is_none() {
|
||||
apply_requested_spawn_agent_model_overrides(
|
||||
@@ -147,31 +136,64 @@ impl ToolHandler for Handler {
|
||||
initial_agent_op.clone(),
|
||||
Some(spawn_source.clone()),
|
||||
SpawnAgentOptions {
|
||||
fork_parent_spawn_call_id: if fork_mode.is_some() {
|
||||
Some(call_id.clone())
|
||||
} else {
|
||||
None
|
||||
},
|
||||
fork_parent_spawn_call_id: fork_mode.as_ref().map(|_| call_id.clone()),
|
||||
fork_mode: fork_mode.clone(),
|
||||
},
|
||||
)
|
||||
.await;
|
||||
match attempt_result {
|
||||
Ok(spawned_agent) => {
|
||||
if spawn_should_retry_on_async_quota_exhaustion(
|
||||
spawned_agent.status.clone(),
|
||||
spawned_agent.thread_id,
|
||||
&session.services.agent_control,
|
||||
)
|
||||
.await
|
||||
&& idx + 1 < candidates_to_try.len()
|
||||
{
|
||||
continue;
|
||||
}
|
||||
spawn_result = Some(spawned_agent);
|
||||
let status = if idx + 1 < candidates_to_try.len() {
|
||||
match probe_spawn_attempt_for_async_quota_exhaustion(
|
||||
spawned_agent.status.clone(),
|
||||
spawned_agent.thread_id,
|
||||
&session.services.agent_control,
|
||||
)
|
||||
.await
|
||||
{
|
||||
SpawnAttemptRetryDecision::Accept(status) => status,
|
||||
SpawnAttemptRetryDecision::Retry(retry_status) => {
|
||||
match close_quota_exhausted_spawn_attempt(
|
||||
&session.services.agent_control,
|
||||
spawned_agent.thread_id,
|
||||
retry_status,
|
||||
)
|
||||
.await
|
||||
{
|
||||
SpawnAttemptRetryDecision::Accept(status) => status,
|
||||
SpawnAttemptRetryDecision::Retry(status) => {
|
||||
send_collab_agent_spawn_retry_preempted_event(
|
||||
&session,
|
||||
&turn,
|
||||
attempt_call_id,
|
||||
prompt.clone(),
|
||||
candidate_model,
|
||||
candidate_reasoning_effort,
|
||||
status,
|
||||
)
|
||||
.await;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
spawned_agent.status.clone()
|
||||
};
|
||||
spawn_result = Some((spawned_agent, status, attempt_call_id));
|
||||
break;
|
||||
}
|
||||
Err(err) => {
|
||||
send_collab_agent_spawn_error_event(
|
||||
&session,
|
||||
&turn,
|
||||
attempt_call_id,
|
||||
prompt.clone(),
|
||||
candidate_model,
|
||||
candidate_reasoning_effort,
|
||||
&err,
|
||||
)
|
||||
.await;
|
||||
if spawn_should_retry_on_quota_exhaustion(&err)
|
||||
&& idx + 1 < candidates_to_try.len()
|
||||
{
|
||||
@@ -181,14 +203,13 @@ impl ToolHandler for Handler {
|
||||
}
|
||||
}
|
||||
}
|
||||
let Some(spawned_agent) = spawn_result else {
|
||||
let Some((spawned_agent, status, spawn_event_call_id)) = spawn_result else {
|
||||
return Err(FunctionCallError::RespondToModel(
|
||||
"No spawn attempts were executed".to_string(),
|
||||
));
|
||||
};
|
||||
let new_thread_id = Some(spawned_agent.thread_id);
|
||||
let new_agent_metadata = Some(spawned_agent.metadata.clone());
|
||||
let status = spawned_agent.status.clone();
|
||||
let agent_snapshot = match new_thread_id {
|
||||
Some(thread_id) => {
|
||||
session
|
||||
@@ -226,7 +247,7 @@ impl ToolHandler for Handler {
|
||||
.send_event(
|
||||
&turn,
|
||||
CollabAgentSpawnEndEvent {
|
||||
call_id,
|
||||
call_id: spawn_event_call_id,
|
||||
sender_thread_id: session.conversation_id,
|
||||
new_thread_id,
|
||||
new_agent_nickname,
|
||||
@@ -260,6 +281,7 @@ impl ToolHandler for Handler {
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
struct SpawnAgentArgs {
|
||||
message: String,
|
||||
task_name: String,
|
||||
|
||||
@@ -4,6 +4,14 @@ use codex_core::config::AgentRoleConfig;
|
||||
use codex_features::Feature;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::openai_models::ReasoningEffort;
|
||||
use codex_protocol::protocol::AgentStatus;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
use codex_protocol::protocol::CollabAgentSpawnBeginEvent;
|
||||
use codex_protocol::protocol::CollabAgentSpawnEndEvent;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::Op;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use core_test_support::responses::ResponsesRequest;
|
||||
use core_test_support::responses::ev_assistant_message;
|
||||
use core_test_support::responses::ev_completed;
|
||||
@@ -17,6 +25,8 @@ use core_test_support::responses::start_mock_server;
|
||||
use core_test_support::skip_if_no_network;
|
||||
use core_test_support::test_codex::TestCodex;
|
||||
use core_test_support::test_codex::test_codex;
|
||||
use core_test_support::wait_for_event;
|
||||
use core_test_support::wait_for_event_match;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::Value;
|
||||
use serde_json::json;
|
||||
@@ -41,7 +51,7 @@ const ROLE_MODEL: &str = "gpt-5.1-codex-max";
|
||||
const ROLE_REASONING_EFFORT: ReasoningEffort = ReasoningEffort::High;
|
||||
const FALLBACK_MODEL_A: &str = "gpt-5.1";
|
||||
const FALLBACK_REASONING_EFFORT_A: ReasoningEffort = ReasoningEffort::Low;
|
||||
const FALLBACK_MODEL_B: &str = "gpt-5.2";
|
||||
const FALLBACK_MODEL_B: &str = "gpt-5.2-codex";
|
||||
const FALLBACK_REASONING_EFFORT_B: ReasoningEffort = ReasoningEffort::Medium;
|
||||
|
||||
fn body_contains(req: &wiremock::Request, text: &str) -> bool {
|
||||
@@ -170,7 +180,7 @@ fn role_block(description: &str, role_name: &str) -> Option<String> {
|
||||
}
|
||||
|
||||
async fn wait_for_spawned_thread_id(test: &TestCodex) -> Result<String> {
|
||||
let deadline = Instant::now() + Duration::from_secs(2);
|
||||
let deadline = Instant::now() + Duration::from_secs(5);
|
||||
loop {
|
||||
let ids = test.thread_manager.list_thread_ids().await;
|
||||
if let Some(spawned_id) = ids
|
||||
@@ -202,6 +212,61 @@ async fn wait_for_requests(
|
||||
}
|
||||
}
|
||||
|
||||
async fn submit_turn_and_wait_for_spawn_attempt_events(
|
||||
test: &TestCodex,
|
||||
prompt: &str,
|
||||
expected_attempts: usize,
|
||||
) -> Result<Vec<(CollabAgentSpawnBeginEvent, CollabAgentSpawnEndEvent)>> {
|
||||
test.codex
|
||||
.submit(Op::UserTurn {
|
||||
items: vec![UserInput::Text {
|
||||
text: prompt.to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
cwd: test.cwd_path().to_path_buf(),
|
||||
approval_policy: AskForApproval::Never,
|
||||
approvals_reviewer: None,
|
||||
sandbox_policy: SandboxPolicy::DangerFullAccess,
|
||||
model: test.session_configured.model.clone(),
|
||||
effort: None,
|
||||
summary: None,
|
||||
service_tier: None,
|
||||
collaboration_mode: None,
|
||||
personality: None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
let turn_id = wait_for_event_match(&test.codex, |event| match event {
|
||||
EventMsg::TurnStarted(event) => Some(event.turn_id.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
let mut spawn_events = Vec::with_capacity(expected_attempts);
|
||||
let mut pending_begin = None;
|
||||
loop {
|
||||
let event = wait_for_event(&test.codex, |_| true).await;
|
||||
match event {
|
||||
EventMsg::CollabAgentSpawnBegin(event) => {
|
||||
pending_begin = Some(event);
|
||||
}
|
||||
EventMsg::CollabAgentSpawnEnd(event) => {
|
||||
let begin_event = pending_begin
|
||||
.take()
|
||||
.ok_or_else(|| anyhow::anyhow!("spawn end event without matching begin"))?;
|
||||
spawn_events.push((begin_event, event));
|
||||
}
|
||||
EventMsg::TurnComplete(event) if event.turn_id == turn_id => break,
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
if let Some(begin_event) = pending_begin {
|
||||
anyhow::bail!("spawn begin event without matching end: {begin_event:?}");
|
||||
}
|
||||
assert_eq!(spawn_events.len(), expected_attempts);
|
||||
Ok(spawn_events)
|
||||
}
|
||||
|
||||
async fn setup_turn_one_with_spawned_child(
|
||||
server: &MockServer,
|
||||
child_response_delay: Option<Duration>,
|
||||
@@ -643,7 +708,52 @@ async fn spawn_agent_model_fallback_list_retries_after_quota_exhaustion() -> Res
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
|
||||
test.submit_turn(TURN_1_PROMPT).await?;
|
||||
let spawn_events = submit_turn_and_wait_for_spawn_attempt_events(
|
||||
&test,
|
||||
TURN_1_PROMPT,
|
||||
/*expected_attempts*/ 2,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let (quota_begin_event, quota_end_event) = &spawn_events[0];
|
||||
assert_eq!(quota_begin_event.call_id, SPAWN_CALL_ID);
|
||||
assert_eq!(quota_begin_event.prompt, CHILD_PROMPT);
|
||||
assert_eq!(quota_begin_event.model, FALLBACK_MODEL_A);
|
||||
assert_eq!(
|
||||
quota_begin_event.reasoning_effort,
|
||||
FALLBACK_REASONING_EFFORT_A
|
||||
);
|
||||
assert_eq!(quota_end_event.call_id, SPAWN_CALL_ID);
|
||||
assert_eq!(quota_end_event.new_thread_id, None);
|
||||
assert_eq!(quota_end_event.new_agent_nickname, None);
|
||||
assert_eq!(quota_end_event.new_agent_role, None);
|
||||
assert_eq!(quota_end_event.prompt, CHILD_PROMPT);
|
||||
assert_eq!(quota_end_event.model, FALLBACK_MODEL_A);
|
||||
assert_eq!(
|
||||
quota_end_event.reasoning_effort,
|
||||
FALLBACK_REASONING_EFFORT_A
|
||||
);
|
||||
match "a_end_event.status {
|
||||
AgentStatus::PendingInit => {}
|
||||
AgentStatus::Errored(message) if message.to_lowercase().contains("quota") => {}
|
||||
status => panic!("unexpected first-attempt retry status: {status:?}"),
|
||||
}
|
||||
|
||||
let (fallback_begin_event, fallback_end_event) = &spawn_events[1];
|
||||
assert_eq!(fallback_begin_event.call_id, format!("{SPAWN_CALL_ID}#2"));
|
||||
assert_eq!(fallback_begin_event.prompt, CHILD_PROMPT);
|
||||
assert_eq!(fallback_begin_event.model, FALLBACK_MODEL_B);
|
||||
assert_eq!(
|
||||
fallback_begin_event.reasoning_effort,
|
||||
FALLBACK_REASONING_EFFORT_B
|
||||
);
|
||||
assert_eq!(fallback_end_event.call_id, format!("{SPAWN_CALL_ID}#2"));
|
||||
assert_eq!(fallback_end_event.prompt, CHILD_PROMPT);
|
||||
assert_eq!(fallback_end_event.model, FALLBACK_MODEL_B);
|
||||
assert_eq!(
|
||||
fallback_end_event.reasoning_effort,
|
||||
FALLBACK_REASONING_EFFORT_B
|
||||
);
|
||||
|
||||
let quota_requests = quota_child_attempt
|
||||
.requests()
|
||||
@@ -667,8 +777,8 @@ async fn spawn_agent_model_fallback_list_retries_after_quota_exhaustion() -> Res
|
||||
);
|
||||
}
|
||||
|
||||
let fallback_requests = fallback_child_attempt
|
||||
.requests()
|
||||
let fallback_requests = wait_for_requests(&fallback_child_attempt)
|
||||
.await?
|
||||
.into_iter()
|
||||
.filter(|request| {
|
||||
request.body_json().get("model").and_then(Value::as_str) == Some(FALLBACK_MODEL_B)
|
||||
|
||||
@@ -23,7 +23,7 @@ pub fn create_spawn_agent_tool_v1(options: SpawnAgentToolOptions<'_>) -> ToolSpe
|
||||
let available_models_description = spawn_agent_models_description(options.available_models);
|
||||
let return_value_description =
|
||||
"Returns the spawned agent id plus the user-facing nickname when available.";
|
||||
let properties = spawn_agent_common_properties(&options.agent_type_description);
|
||||
let properties = spawn_agent_common_properties_v1(&options.agent_type_description);
|
||||
|
||||
ToolSpec::Function(ResponsesApiTool {
|
||||
name: "spawn_agent".to_string(),
|
||||
@@ -647,7 +647,7 @@ fn create_collab_input_items_schema() -> JsonSchema {
|
||||
}
|
||||
}
|
||||
|
||||
fn spawn_agent_common_properties(agent_type_description: &str) -> BTreeMap<String, JsonSchema> {
|
||||
fn spawn_agent_model_fallback_list_schema() -> JsonSchema {
|
||||
let model_fallback_item_properties = BTreeMap::from([
|
||||
(
|
||||
"model".to_string(),
|
||||
@@ -669,6 +669,20 @@ fn spawn_agent_common_properties(agent_type_description: &str) -> BTreeMap<Strin
|
||||
),
|
||||
]);
|
||||
|
||||
JsonSchema::Array {
|
||||
items: Box::new(JsonSchema::Object {
|
||||
properties: model_fallback_item_properties,
|
||||
required: Some(vec!["model".to_string()]),
|
||||
additional_properties: Some(false.into()),
|
||||
}),
|
||||
description: Some(
|
||||
"Ordered model candidates for fallback retries. Each entry may include an optional reasoning effort."
|
||||
.to_string(),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
fn spawn_agent_common_properties_v1(agent_type_description: &str) -> BTreeMap<String, JsonSchema> {
|
||||
BTreeMap::from([
|
||||
(
|
||||
"message".to_string(),
|
||||
@@ -704,20 +718,6 @@ fn spawn_agent_common_properties(agent_type_description: &str) -> BTreeMap<Strin
|
||||
),
|
||||
},
|
||||
),
|
||||
(
|
||||
"model_fallback_list".to_string(),
|
||||
JsonSchema::Array {
|
||||
items: Box::new(JsonSchema::Object {
|
||||
properties: model_fallback_item_properties,
|
||||
required: Some(vec!["model".to_string()]),
|
||||
additional_properties: Some(false.into()),
|
||||
}),
|
||||
description: Some(
|
||||
"Ordered model candidates for fallback retries. Each entry may include an optional reasoning effort."
|
||||
.to_string(),
|
||||
),
|
||||
},
|
||||
),
|
||||
(
|
||||
"reasoning_effort".to_string(),
|
||||
JsonSchema::String {
|
||||
@@ -727,31 +727,14 @@ fn spawn_agent_common_properties(agent_type_description: &str) -> BTreeMap<Strin
|
||||
),
|
||||
},
|
||||
),
|
||||
(
|
||||
"model_fallback_list".to_string(),
|
||||
spawn_agent_model_fallback_list_schema(),
|
||||
),
|
||||
])
|
||||
}
|
||||
|
||||
fn spawn_agent_common_properties_v2(agent_type_description: &str) -> BTreeMap<String, JsonSchema> {
|
||||
let model_fallback_item_properties = BTreeMap::from([
|
||||
(
|
||||
"model".to_string(),
|
||||
JsonSchema::String {
|
||||
description: Some(
|
||||
"Model to try. Must be a model slug from the current model picker list."
|
||||
.to_string(),
|
||||
),
|
||||
},
|
||||
),
|
||||
(
|
||||
"reasoning_effort".to_string(),
|
||||
JsonSchema::String {
|
||||
description: Some(
|
||||
"Optional reasoning effort override for this candidate. Replaces the inherited reasoning effort."
|
||||
.to_string(),
|
||||
),
|
||||
},
|
||||
),
|
||||
]);
|
||||
|
||||
BTreeMap::from([
|
||||
(
|
||||
"message".to_string(),
|
||||
@@ -792,20 +775,6 @@ fn spawn_agent_common_properties_v2(agent_type_description: &str) -> BTreeMap<St
|
||||
),
|
||||
},
|
||||
),
|
||||
(
|
||||
"model_fallback_list".to_string(),
|
||||
JsonSchema::Array {
|
||||
items: Box::new(JsonSchema::Object {
|
||||
properties: model_fallback_item_properties,
|
||||
required: Some(vec!["model".to_string()]),
|
||||
additional_properties: Some(false.into()),
|
||||
}),
|
||||
description: Some(
|
||||
"Ordered model candidates for fallback retries. Each entry may include an optional reasoning effort."
|
||||
.to_string(),
|
||||
),
|
||||
},
|
||||
),
|
||||
(
|
||||
"reasoning_effort".to_string(),
|
||||
JsonSchema::String {
|
||||
@@ -815,6 +784,10 @@ fn spawn_agent_common_properties_v2(agent_type_description: &str) -> BTreeMap<St
|
||||
),
|
||||
},
|
||||
),
|
||||
(
|
||||
"model_fallback_list".to_string(),
|
||||
spawn_agent_model_fallback_list_schema(),
|
||||
),
|
||||
])
|
||||
}
|
||||
|
||||
|
||||
@@ -72,7 +72,7 @@ fn spawn_agent_tool_v2_requires_task_name_and_lists_visible_models() {
|
||||
Some(vec!["task_name".to_string(), "message".to_string()])
|
||||
);
|
||||
let Some(JsonSchema::Array { items, .. }) = properties.get("model_fallback_list") else {
|
||||
panic!("spawn_agent v2 should define model_fallback_list as an array of objects");
|
||||
panic!("spawn_agent v2 should define model_fallback_list as an array");
|
||||
};
|
||||
let JsonSchema::Object {
|
||||
properties: model_fallback_item_properties,
|
||||
@@ -91,6 +91,7 @@ fn spawn_agent_tool_v2_requires_task_name_and_lists_visible_models() {
|
||||
),
|
||||
})
|
||||
);
|
||||
assert!(model_fallback_item_properties.contains_key("reasoning_effort"));
|
||||
assert_eq!(model_fallback_item_required, &vec!["model".to_string()]);
|
||||
assert_eq!(
|
||||
output_schema.expect("spawn_agent output schema")["required"],
|
||||
@@ -100,19 +101,19 @@ fn spawn_agent_tool_v2_requires_task_name_and_lists_visible_models() {
|
||||
|
||||
#[test]
|
||||
fn spawn_agent_tool_v1_includes_model_fallback_list() {
|
||||
let ToolSpec::Function(ResponsesApiTool { parameters, .. }) =
|
||||
create_spawn_agent_tool_v1(SpawnAgentToolOptions {
|
||||
available_models: &[model_preset("visible", /*show_in_picker*/ true)],
|
||||
agent_type_description: "role help".to_string(),
|
||||
})
|
||||
else {
|
||||
let tool = create_spawn_agent_tool_v1(SpawnAgentToolOptions {
|
||||
available_models: &[],
|
||||
agent_type_description: "role help".to_string(),
|
||||
});
|
||||
|
||||
let ToolSpec::Function(ResponsesApiTool { parameters, .. }) = tool else {
|
||||
panic!("spawn_agent should be a function tool");
|
||||
};
|
||||
let JsonSchema::Object { properties, .. } = parameters else {
|
||||
panic!("spawn_agent should use object params");
|
||||
};
|
||||
let Some(JsonSchema::Array { .. }) = properties.get("model_fallback_list") else {
|
||||
panic!("model_fallback_list should be an array");
|
||||
panic!("spawn_agent v1 should define model_fallback_list as an array");
|
||||
};
|
||||
assert!(properties.contains_key("model_fallback_list"));
|
||||
assert!(properties.contains_key("fork_context"));
|
||||
|
||||
@@ -769,6 +769,35 @@ mod tests {
|
||||
assert_eq!(title.spans[6].style.fg, Some(Color::Magenta));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn spawn_end_without_receiver_renders_failed_spawn_attempt() {
|
||||
let sender_thread_id = ThreadId::from_string("00000000-0000-0000-0000-000000000001")
|
||||
.expect("valid sender thread id");
|
||||
|
||||
let cell = spawn_end(
|
||||
CollabAgentSpawnEndEvent {
|
||||
call_id: "call-spawn".to_string(),
|
||||
sender_thread_id,
|
||||
new_thread_id: None,
|
||||
new_agent_nickname: None,
|
||||
new_agent_role: None,
|
||||
prompt: "inspect the repo".to_string(),
|
||||
model: "gpt-5".to_string(),
|
||||
reasoning_effort: ReasoningEffortConfig::High,
|
||||
status: AgentStatus::PendingInit,
|
||||
},
|
||||
Some(&SpawnRequestSummary {
|
||||
model: "gpt-5".to_string(),
|
||||
reasoning_effort: ReasoningEffortConfig::High,
|
||||
}),
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
cell_to_text(&cell),
|
||||
"• Agent spawn failed\n └ inspect the repo"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn collab_resume_interrupted_snapshot() {
|
||||
let sender_thread_id = ThreadId::from_string("00000000-0000-0000-0000-000000000001")
|
||||
|
||||
Reference in New Issue
Block a user