Files
codex/codex-rs/core/src/agent/control_tests.rs
starr-openai 32deb67fc6 Harden Windows realtime and agent resume tests
Avoid PowerShell command forms that depend on method invocation for the delegated realtime shell-tool test, and wait for a shutdown status before resuming the same subagent thread in the nickname/role restore test.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 14:48:08 -07:00

2560 lines
83 KiB
Rust

use super::*;
use crate::CodexThread;
use crate::StateDbHandle;
use crate::ThreadManager;
use crate::agent::agent_status_from_event;
use crate::config::AgentRoleConfig;
use crate::config::Config;
use crate::config::ConfigBuilder;
use crate::context::ContextualUserFragment;
use crate::context::SubagentNotification;
use crate::init_state_db;
use assert_matches::assert_matches;
use codex_features::Feature;
use codex_login::CodexAuth;
use codex_protocol::AgentPath;
use codex_protocol::config_types::ModeKind;
use codex_protocol::models::ContentItem;
use codex_protocol::models::MessagePhase;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::ErrorEvent;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::InterAgentCommunication;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnAbortedEvent;
use codex_protocol::protocol::TurnCompleteEvent;
use codex_protocol::protocol::TurnStartedEvent;
use codex_thread_store::ArchiveThreadParams;
use codex_thread_store::LocalThreadStore;
use codex_thread_store::LocalThreadStoreConfig;
use codex_thread_store::ThreadStore;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
use tokio::time::Duration;
use tokio::time::sleep;
use tokio::time::timeout;
use toml::Value as TomlValue;
async fn test_config_with_cli_overrides(
cli_overrides: Vec<(String, TomlValue)>,
) -> (TempDir, Config) {
let home = TempDir::new().expect("create temp dir");
let config = ConfigBuilder::without_managed_config_for_tests()
.codex_home(home.path().to_path_buf())
.cli_overrides(cli_overrides)
.build()
.await
.expect("load default test config");
(home, config)
}
async fn test_config() -> (TempDir, Config) {
test_config_with_cli_overrides(Vec::new()).await
}
fn text_input(text: &str) -> Op {
vec![UserInput::Text {
text: text.to_string(),
text_elements: Vec::new(),
}]
.into()
}
fn assistant_message(text: &str, phase: Option<MessagePhase>) -> ResponseItem {
ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: text.to_string(),
}],
phase,
}
}
fn spawn_agent_call(call_id: &str) -> ResponseItem {
ResponseItem::FunctionCall {
id: None,
name: "spawn_agent".to_string(),
namespace: None,
arguments: "{}".to_string(),
call_id: call_id.to_string(),
}
}
struct AgentControlHarness {
_home: TempDir,
config: Config,
state_db: Option<StateDbHandle>,
manager: ThreadManager,
control: AgentControl,
}
impl AgentControlHarness {
async fn new() -> Self {
let (home, config) = test_config().await;
let state_db = init_state_db(&config).await;
let manager = ThreadManager::with_models_provider_home_and_state_for_tests(
CodexAuth::from_api_key("dummy"),
config.model_provider.clone(),
config.codex_home.to_path_buf(),
std::sync::Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
state_db.clone(),
);
let control = manager.agent_control();
Self {
_home: home,
config,
state_db,
manager,
control,
}
}
async fn start_thread(&self) -> (ThreadId, Arc<CodexThread>) {
let new_thread = self
.manager
.start_thread(self.config.clone())
.await
.expect("start thread");
(new_thread.thread_id, new_thread.thread)
}
}
fn has_subagent_notification(history_items: &[ResponseItem]) -> bool {
history_items.iter().any(|item| {
let ResponseItem::Message { role, content, .. } = item else {
return false;
};
if role != "user" {
return false;
}
content.iter().any(|content_item| match content_item {
ContentItem::InputText { text } | ContentItem::OutputText { text } => {
SubagentNotification::matches_text(text)
}
ContentItem::InputImage { .. } => false,
})
})
}
/// Returns true when any message item contains `needle` in a text span.
fn history_contains_text(history_items: &[ResponseItem], needle: &str) -> bool {
history_items.iter().any(|item| {
let ResponseItem::Message { content, .. } = item else {
return false;
};
content.iter().any(|content_item| match content_item {
ContentItem::InputText { text } | ContentItem::OutputText { text } => {
text.contains(needle)
}
ContentItem::InputImage { .. } => false,
})
})
}
fn history_contains_assistant_inter_agent_communication(
history_items: &[ResponseItem],
expected: &InterAgentCommunication,
) -> bool {
history_items.iter().any(|item| {
let ResponseItem::Message { role, content, .. } = item else {
return false;
};
if role != "assistant" {
return false;
}
content.iter().any(|content_item| match content_item {
ContentItem::OutputText { text } => {
serde_json::from_str::<InterAgentCommunication>(text)
.ok()
.as_ref()
== Some(expected)
}
ContentItem::InputText { .. } | ContentItem::InputImage { .. } => false,
})
})
}
async fn wait_for_subagent_notification(parent_thread: &Arc<CodexThread>) -> bool {
let wait = async {
loop {
let history_items = parent_thread
.codex
.session
.clone_history()
.await
.raw_items()
.to_vec();
if has_subagent_notification(&history_items) {
return true;
}
sleep(Duration::from_millis(25)).await;
}
};
// CI can take several seconds to schedule the detached completion watcher,
// especially on slower Windows runners.
timeout(Duration::from_secs(10), wait).await.is_ok()
}
async fn persist_thread_for_tree_resume(thread: &Arc<CodexThread>, message: &str) {
thread
.inject_user_message_without_turn(message.to_string())
.await;
thread.codex.session.ensure_rollout_materialized().await;
thread
.codex
.session
.flush_rollout()
.await
.expect("test thread rollout should flush");
}
async fn wait_for_live_thread_spawn_children(
control: &AgentControl,
parent_thread_id: ThreadId,
expected_children: &[ThreadId],
) {
let mut expected_children = expected_children.to_vec();
expected_children.sort_by_key(std::string::ToString::to_string);
timeout(Duration::from_secs(5), async {
loop {
let mut child_ids = control
.open_thread_spawn_children(parent_thread_id)
.await
.expect("live child list should load")
.into_iter()
.map(|(thread_id, _)| thread_id)
.collect::<Vec<_>>();
child_ids.sort_by_key(std::string::ToString::to_string);
if child_ids == expected_children {
break;
}
sleep(Duration::from_millis(25)).await;
}
})
.await
.expect("expected persisted child tree");
}
#[tokio::test]
async fn send_input_errors_when_manager_dropped() {
let control = AgentControl::default();
let err = control
.send_input(
ThreadId::new(),
vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}]
.into(),
)
.await
.expect_err("send_input should fail without a manager");
assert_eq!(
err.to_string(),
"unsupported operation: thread manager dropped"
);
}
#[tokio::test]
async fn get_status_returns_not_found_without_manager() {
let control = AgentControl::default();
let got = control.get_status(ThreadId::new()).await;
assert_eq!(got, AgentStatus::NotFound);
}
#[tokio::test]
async fn on_event_updates_status_from_task_started() {
let status = agent_status_from_event(&EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-1".to_string(),
started_at: None,
model_context_window: None,
collaboration_mode_kind: ModeKind::Default,
}));
assert_eq!(status, Some(AgentStatus::Running));
}
#[tokio::test]
async fn on_event_updates_status_from_task_complete() {
let status = agent_status_from_event(&EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-1".to_string(),
last_agent_message: Some("done".to_string()),
completed_at: None,
duration_ms: None,
time_to_first_token_ms: None,
}));
let expected = AgentStatus::Completed(Some("done".to_string()));
assert_eq!(status, Some(expected));
}
#[tokio::test]
async fn on_event_updates_status_from_error() {
let status = agent_status_from_event(&EventMsg::Error(ErrorEvent {
message: "boom".to_string(),
codex_error_info: None,
}));
let expected = AgentStatus::Errored("boom".to_string());
assert_eq!(status, Some(expected));
}
#[tokio::test]
async fn on_event_updates_status_from_turn_aborted() {
let status = agent_status_from_event(&EventMsg::TurnAborted(TurnAbortedEvent {
turn_id: Some("turn-1".to_string()),
reason: TurnAbortReason::Interrupted,
completed_at: None,
duration_ms: None,
}));
let expected = AgentStatus::Interrupted;
assert_eq!(status, Some(expected));
}
#[tokio::test]
async fn on_event_updates_status_from_shutdown_complete() {
let status = agent_status_from_event(&EventMsg::ShutdownComplete);
assert_eq!(status, Some(AgentStatus::Shutdown));
}
#[tokio::test]
async fn spawn_agent_errors_when_manager_dropped() {
let control = AgentControl::default();
let (_home, config) = test_config().await;
let err = control
.spawn_agent(config, text_input("hello"), /*session_source*/ None)
.await
.expect_err("spawn_agent should fail without a manager");
assert_eq!(
err.to_string(),
"unsupported operation: thread manager dropped"
);
}
#[tokio::test]
async fn resume_agent_errors_when_manager_dropped() {
let control = AgentControl::default();
let (_home, config) = test_config().await;
let err = control
.resume_agent_from_rollout(config, ThreadId::new(), SessionSource::Exec)
.await
.expect_err("resume_agent should fail without a manager");
assert_eq!(
err.to_string(),
"unsupported operation: thread manager dropped"
);
}
#[tokio::test]
async fn send_input_errors_when_thread_missing() {
let harness = AgentControlHarness::new().await;
let thread_id = ThreadId::new();
let err = harness
.control
.send_input(
thread_id,
vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}]
.into(),
)
.await
.expect_err("send_input should fail for missing thread");
assert_matches!(err, CodexErr::ThreadNotFound(id) if id == thread_id);
}
#[tokio::test]
async fn get_status_returns_not_found_for_missing_thread() {
let harness = AgentControlHarness::new().await;
let status = harness.control.get_status(ThreadId::new()).await;
assert_eq!(status, AgentStatus::NotFound);
}
#[tokio::test]
async fn get_status_returns_pending_init_for_new_thread() {
let harness = AgentControlHarness::new().await;
let (thread_id, _) = harness.start_thread().await;
let status = harness.control.get_status(thread_id).await;
assert_eq!(status, AgentStatus::PendingInit);
}
#[tokio::test]
async fn subscribe_status_errors_for_missing_thread() {
let harness = AgentControlHarness::new().await;
let thread_id = ThreadId::new();
let err = harness
.control
.subscribe_status(thread_id)
.await
.expect_err("subscribe_status should fail for missing thread");
assert_matches!(err, CodexErr::ThreadNotFound(id) if id == thread_id);
}
#[tokio::test]
async fn subscribe_status_updates_on_shutdown() {
let harness = AgentControlHarness::new().await;
let (thread_id, thread) = harness.start_thread().await;
let mut status_rx = harness
.control
.subscribe_status(thread_id)
.await
.expect("subscribe_status should succeed");
assert_eq!(status_rx.borrow().clone(), AgentStatus::PendingInit);
let _ = thread
.submit(Op::Shutdown {})
.await
.expect("shutdown should submit");
let _ = status_rx.changed().await;
assert_eq!(status_rx.borrow().clone(), AgentStatus::Shutdown);
}
#[tokio::test]
async fn send_input_submits_user_message() {
let harness = AgentControlHarness::new().await;
let (thread_id, _thread) = harness.start_thread().await;
let submission_id = harness
.control
.send_input(
thread_id,
vec![UserInput::Text {
text: "hello from tests".to_string(),
text_elements: Vec::new(),
}]
.into(),
)
.await
.expect("send_input should succeed");
assert!(!submission_id.is_empty());
let expected = (
thread_id,
Op::UserInput {
environments: None,
items: vec![UserInput::Text {
text: "hello from tests".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
},
);
let captured = harness
.manager
.captured_ops()
.into_iter()
.find(|entry| *entry == expected);
assert_eq!(captured, Some(expected));
}
#[tokio::test]
async fn send_inter_agent_communication_without_turn_queues_message_without_triggering_turn() {
let harness = AgentControlHarness::new().await;
let (thread_id, thread) = harness.start_thread().await;
let communication = InterAgentCommunication::new(
AgentPath::root(),
AgentPath::try_from("/root/worker").expect("agent path"),
Vec::new(),
"hello from tests".to_string(),
/*trigger_turn*/ false,
);
let submission_id = harness
.control
.send_inter_agent_communication(thread_id, communication.clone())
.await
.expect("send_inter_agent_communication should succeed");
assert!(!submission_id.is_empty());
let expected = (
thread_id,
Op::InterAgentCommunication {
communication: communication.clone(),
},
);
let captured = harness
.manager
.captured_ops()
.into_iter()
.find(|entry| *entry == expected);
assert_eq!(captured, Some(expected));
timeout(Duration::from_secs(5), async {
loop {
if thread.codex.session.has_pending_input().await {
break;
}
sleep(Duration::from_millis(10)).await;
}
})
.await
.expect("inter-agent communication should stay pending");
let history_items = thread
.codex
.session
.clone_history()
.await
.raw_items()
.to_vec();
assert!(!history_contains_assistant_inter_agent_communication(
&history_items,
&communication
));
}
#[tokio::test]
async fn append_message_records_assistant_message() {
let harness = AgentControlHarness::new().await;
let (thread_id, thread) = harness.start_thread().await;
let message =
"author: /root\nrecipient: /root/worker\nother_recipients: []\nContent: hello from tests";
let submission_id = harness
.control
.append_message(
thread_id,
ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![ContentItem::InputText {
text: message.to_string(),
}],
phase: None,
},
)
.await
.expect("append_message should succeed");
assert!(!submission_id.is_empty());
timeout(Duration::from_secs(5), async {
loop {
let history_items = thread
.codex
.session
.clone_history()
.await
.raw_items()
.to_vec();
let recorded = history_items.iter().any(|item| {
matches!(
item,
ResponseItem::Message { role, content, .. }
if role == "assistant"
&& content.iter().any(|content_item| matches!(
content_item,
ContentItem::InputText { text } if text == message
))
)
});
if recorded {
break;
}
sleep(Duration::from_millis(10)).await;
}
})
.await
.expect("assistant message should be recorded");
}
#[tokio::test]
async fn spawn_agent_creates_thread_and_sends_prompt() {
let harness = AgentControlHarness::new().await;
let thread_id = harness
.control
.spawn_agent(
harness.config.clone(),
text_input("spawned"),
/*session_source*/ None,
)
.await
.expect("spawn_agent should succeed");
let _thread = harness
.manager
.get_thread(thread_id)
.await
.expect("thread should be registered");
let expected = (
thread_id,
Op::UserInput {
environments: None,
items: vec![UserInput::Text {
text: "spawned".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
},
);
let captured = harness
.manager
.captured_ops()
.into_iter()
.find(|entry| *entry == expected);
assert_eq!(captured, Some(expected));
}
#[tokio::test]
async fn spawn_agent_can_fork_parent_thread_history_with_sanitized_items() {
let harness = AgentControlHarness::new().await;
let mut parent_config = harness.config.clone();
let _ = parent_config.features.enable(Feature::MultiAgentV2);
parent_config.multi_agent_v2.root_agent_usage_hint_text =
Some("Parent root guidance.".to_string());
parent_config.multi_agent_v2.subagent_usage_hint_text =
Some("Parent subagent guidance.".to_string());
let mut child_config = harness.config.clone();
let _ = child_config.features.enable(Feature::MultiAgentV2);
child_config.multi_agent_v2.root_agent_usage_hint_text =
Some("Child root guidance.".to_string());
child_config.multi_agent_v2.subagent_usage_hint_text =
Some("Child subagent guidance.".to_string());
let new_thread = harness
.manager
.start_thread(parent_config.clone())
.await
.expect("start parent thread");
let parent_thread_id = new_thread.thread_id;
let parent_thread = new_thread.thread;
parent_thread
.inject_user_message_without_turn("parent seed context".to_string())
.await;
let turn_context = parent_thread.codex.session.new_default_turn().await;
let parent_spawn_call_id = "spawn-call-history".to_string();
let trigger_message = InterAgentCommunication::new(
AgentPath::root(),
AgentPath::try_from("/root/worker").expect("agent path"),
Vec::new(),
"parent trigger message".to_string(),
/*trigger_turn*/ true,
);
parent_thread
.codex
.session
.record_conversation_items(
turn_context.as_ref(),
&[
ResponseItem::Message {
id: None,
role: "developer".to_string(),
content: vec![ContentItem::InputText {
text: "Parent root guidance.".to_string(),
}],
phase: None,
},
ResponseItem::Message {
id: None,
role: "developer".to_string(),
content: vec![ContentItem::InputText {
text: "Parent subagent guidance.".to_string(),
}],
phase: None,
},
assistant_message("parent commentary", Some(MessagePhase::Commentary)),
assistant_message("parent final answer", Some(MessagePhase::FinalAnswer)),
assistant_message("parent unknown phase", /*phase*/ None),
ResponseItem::Reasoning {
id: "parent-reasoning".to_string(),
summary: Vec::new(),
content: None,
encrypted_content: None,
},
trigger_message.to_response_input_item().into(),
spawn_agent_call(&parent_spawn_call_id),
],
)
.await;
parent_thread
.codex
.session
.ensure_rollout_materialized()
.await;
parent_thread
.codex
.session
.flush_rollout()
.await
.expect("parent rollout should flush");
let child_thread_id = harness
.control
.spawn_agent_with_metadata(
child_config,
text_input("child task"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: None,
agent_nickname: None,
agent_role: None,
})),
SpawnAgentOptions {
fork_parent_spawn_call_id: Some(parent_spawn_call_id.clone()),
fork_mode: Some(SpawnAgentForkMode::FullHistory),
..Default::default()
},
)
.await
.expect("forked spawn should succeed")
.thread_id;
let child_thread = harness
.manager
.get_thread(child_thread_id)
.await
.expect("child thread should be registered");
assert_ne!(child_thread_id, parent_thread_id);
let history = child_thread.codex.session.clone_history().await;
let expected_history = [
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "parent seed context".to_string(),
}],
phase: None,
},
assistant_message("parent final answer", Some(MessagePhase::FinalAnswer)),
];
assert_eq!(
history.raw_items(),
&expected_history,
"forked child history should keep only parent user messages and assistant final answers"
);
let expected = (
child_thread_id,
Op::UserInput {
environments: None,
items: vec![UserInput::Text {
text: "child task".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
},
);
let captured = harness
.manager
.captured_ops()
.into_iter()
.find(|entry| *entry == expected);
assert_eq!(captured, Some(expected));
let _ = harness
.control
.shutdown_live_agent(child_thread_id)
.await
.expect("child shutdown should submit");
let _ = parent_thread
.submit(Op::Shutdown {})
.await
.expect("parent shutdown should submit");
}
#[tokio::test]
async fn spawn_agent_fork_flushes_parent_rollout_before_loading_history() {
let harness = AgentControlHarness::new().await;
let (parent_thread_id, parent_thread) = harness.start_thread().await;
let turn_context = parent_thread.codex.session.new_default_turn().await;
let parent_spawn_call_id = "spawn-call-unflushed".to_string();
parent_thread
.codex
.session
.record_conversation_items(
turn_context.as_ref(),
&[
assistant_message("unflushed final answer", Some(MessagePhase::FinalAnswer)),
spawn_agent_call(&parent_spawn_call_id),
],
)
.await;
let child_thread_id = harness
.control
.spawn_agent_with_metadata(
harness.config.clone(),
text_input("child task"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: None,
agent_nickname: None,
agent_role: None,
})),
SpawnAgentOptions {
fork_parent_spawn_call_id: Some(parent_spawn_call_id.clone()),
fork_mode: Some(SpawnAgentForkMode::FullHistory),
..Default::default()
},
)
.await
.expect("forked spawn should flush parent rollout before loading history")
.thread_id;
let child_thread = harness
.manager
.get_thread(child_thread_id)
.await
.expect("child thread should be registered");
let history = child_thread.codex.session.clone_history().await;
assert!(
history_contains_text(history.raw_items(), "unflushed final answer"),
"forked child history should include unflushed assistant final answers after flushing the parent rollout"
);
let _ = harness
.control
.shutdown_live_agent(child_thread_id)
.await
.expect("child shutdown should submit");
let _ = parent_thread
.submit(Op::Shutdown {})
.await
.expect("parent shutdown should submit");
}
#[tokio::test]
async fn spawn_agent_fork_last_n_turns_keeps_only_recent_turns() {
let harness = AgentControlHarness::new().await;
let (parent_thread_id, parent_thread) = harness.start_thread().await;
parent_thread
.inject_user_message_without_turn("old parent context".to_string())
.await;
let queued_communication = InterAgentCommunication::new(
AgentPath::root(),
AgentPath::try_from("/root/worker").expect("agent path"),
Vec::new(),
"queued message".to_string(),
/*trigger_turn*/ false,
);
let queued_turn_context = parent_thread.codex.session.new_default_turn().await;
parent_thread
.codex
.session
.record_conversation_items(
queued_turn_context.as_ref(),
&[queued_communication.to_response_input_item().into()],
)
.await;
let triggered_communication = InterAgentCommunication::new(
AgentPath::root(),
AgentPath::try_from("/root/worker").expect("agent path"),
Vec::new(),
"triggered context".to_string(),
/*trigger_turn*/ true,
);
let triggered_turn_context = parent_thread.codex.session.new_default_turn().await;
parent_thread
.codex
.session
.record_conversation_items(
triggered_turn_context.as_ref(),
&[triggered_communication.to_response_input_item().into()],
)
.await;
parent_thread
.inject_user_message_without_turn("current parent task".to_string())
.await;
let spawn_turn_context = parent_thread.codex.session.new_default_turn().await;
let parent_spawn_call_id = "spawn-call-last-n".to_string();
parent_thread
.codex
.session
.record_conversation_items(
spawn_turn_context.as_ref(),
&[spawn_agent_call(&parent_spawn_call_id)],
)
.await;
parent_thread
.codex
.session
.ensure_rollout_materialized()
.await;
parent_thread
.codex
.session
.flush_rollout()
.await
.expect("parent rollout should flush");
let child_thread_id = harness
.control
.spawn_agent_with_metadata(
harness.config.clone(),
text_input("child task"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: None,
agent_nickname: None,
agent_role: None,
})),
SpawnAgentOptions {
fork_parent_spawn_call_id: Some(parent_spawn_call_id.clone()),
fork_mode: Some(SpawnAgentForkMode::LastNTurns(2)),
..Default::default()
},
)
.await
.expect("forked spawn should keep only the last two turns")
.thread_id;
let child_thread = harness
.manager
.get_thread(child_thread_id)
.await
.expect("child thread should be registered");
let history = child_thread.codex.session.clone_history().await;
assert!(
!history_contains_text(history.raw_items(), "old parent context"),
"forked child history should drop parent context outside the requested last-N turn window"
);
assert!(
!history_contains_text(history.raw_items(), "queued message"),
"forked child history should drop queued inter-agent messages outside the requested last-N turn window"
);
assert!(
!history_contains_text(history.raw_items(), "triggered context"),
"forked child history should filter assistant inter-agent messages even when they fall inside the requested last-N turn window"
);
assert!(
history_contains_text(history.raw_items(), "current parent task"),
"forked child history should keep the parent user message from the requested last-N turn window"
);
let _ = harness
.control
.shutdown_live_agent(child_thread_id)
.await
.expect("child shutdown should submit");
let _ = parent_thread
.submit(Op::Shutdown {})
.await
.expect("parent shutdown should submit");
}
#[tokio::test]
async fn spawn_agent_respects_max_threads_limit() {
let max_threads = 1usize;
let (_home, config) = test_config_with_cli_overrides(vec![(
"agents.max_threads".to_string(),
TomlValue::Integer(max_threads as i64),
)])
.await;
let manager = ThreadManager::with_models_provider_and_home_for_tests(
CodexAuth::from_api_key("dummy"),
config.model_provider.clone(),
config.codex_home.to_path_buf(),
std::sync::Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
);
let control = manager.agent_control();
let _ = manager
.start_thread(config.clone())
.await
.expect("start thread");
let first_agent_id = control
.spawn_agent(
config.clone(),
text_input("hello"),
/*session_source*/ None,
)
.await
.expect("spawn_agent should succeed");
let err = control
.spawn_agent(
config,
text_input("hello again"),
/*session_source*/ None,
)
.await
.expect_err("spawn_agent should respect max threads");
let CodexErr::AgentLimitReached {
max_threads: seen_max_threads,
} = err
else {
panic!("expected CodexErr::AgentLimitReached");
};
assert_eq!(seen_max_threads, max_threads);
let _ = control
.shutdown_live_agent(first_agent_id)
.await
.expect("shutdown agent");
}
#[tokio::test]
async fn spawn_agent_releases_slot_after_shutdown() {
let max_threads = 1usize;
let (_home, config) = test_config_with_cli_overrides(vec![(
"agents.max_threads".to_string(),
TomlValue::Integer(max_threads as i64),
)])
.await;
let manager = ThreadManager::with_models_provider_and_home_for_tests(
CodexAuth::from_api_key("dummy"),
config.model_provider.clone(),
config.codex_home.to_path_buf(),
std::sync::Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
);
let control = manager.agent_control();
let first_agent_id = control
.spawn_agent(
config.clone(),
text_input("hello"),
/*session_source*/ None,
)
.await
.expect("spawn_agent should succeed");
let _ = control
.shutdown_live_agent(first_agent_id)
.await
.expect("shutdown agent");
let second_agent_id = control
.spawn_agent(
config.clone(),
text_input("hello again"),
/*session_source*/ None,
)
.await
.expect("spawn_agent should succeed after shutdown");
let _ = control
.shutdown_live_agent(second_agent_id)
.await
.expect("shutdown agent");
}
#[tokio::test]
async fn spawn_agent_limit_shared_across_clones() {
let max_threads = 1usize;
let (_home, config) = test_config_with_cli_overrides(vec![(
"agents.max_threads".to_string(),
TomlValue::Integer(max_threads as i64),
)])
.await;
let manager = ThreadManager::with_models_provider_and_home_for_tests(
CodexAuth::from_api_key("dummy"),
config.model_provider.clone(),
config.codex_home.to_path_buf(),
std::sync::Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
);
let control = manager.agent_control();
let cloned = control.clone();
let first_agent_id = cloned
.spawn_agent(
config.clone(),
text_input("hello"),
/*session_source*/ None,
)
.await
.expect("spawn_agent should succeed");
let err = control
.spawn_agent(
config,
text_input("hello again"),
/*session_source*/ None,
)
.await
.expect_err("spawn_agent should respect shared guard");
let CodexErr::AgentLimitReached { max_threads } = err else {
panic!("expected CodexErr::AgentLimitReached");
};
assert_eq!(max_threads, 1);
let _ = control
.shutdown_live_agent(first_agent_id)
.await
.expect("shutdown agent");
}
#[tokio::test]
async fn resume_agent_respects_max_threads_limit() {
let max_threads = 1usize;
let (_home, config) = test_config_with_cli_overrides(vec![(
"agents.max_threads".to_string(),
TomlValue::Integer(max_threads as i64),
)])
.await;
let manager = ThreadManager::with_models_provider_and_home_for_tests(
CodexAuth::from_api_key("dummy"),
config.model_provider.clone(),
config.codex_home.to_path_buf(),
std::sync::Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
);
let control = manager.agent_control();
let resumable_id = control
.spawn_agent(
config.clone(),
text_input("hello"),
/*session_source*/ None,
)
.await
.expect("spawn_agent should succeed");
let _ = control
.shutdown_live_agent(resumable_id)
.await
.expect("shutdown resumable thread");
let active_id = control
.spawn_agent(
config.clone(),
text_input("occupy"),
/*session_source*/ None,
)
.await
.expect("spawn_agent should succeed for active slot");
let err = control
.resume_agent_from_rollout(config, resumable_id, SessionSource::Exec)
.await
.expect_err("resume should respect max threads");
let CodexErr::AgentLimitReached {
max_threads: seen_max_threads,
} = err
else {
panic!("expected CodexErr::AgentLimitReached");
};
assert_eq!(seen_max_threads, max_threads);
let _ = control
.shutdown_live_agent(active_id)
.await
.expect("shutdown active thread");
}
#[tokio::test]
async fn resume_agent_releases_slot_after_resume_failure() {
let max_threads = 1usize;
let (_home, config) = test_config_with_cli_overrides(vec![(
"agents.max_threads".to_string(),
TomlValue::Integer(max_threads as i64),
)])
.await;
let manager = ThreadManager::with_models_provider_and_home_for_tests(
CodexAuth::from_api_key("dummy"),
config.model_provider.clone(),
config.codex_home.to_path_buf(),
std::sync::Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
);
let control = manager.agent_control();
let _ = control
.resume_agent_from_rollout(config.clone(), ThreadId::new(), SessionSource::Exec)
.await
.expect_err("resume should fail for missing rollout path");
let resumed_id = control
.spawn_agent(config, text_input("hello"), /*session_source*/ None)
.await
.expect("spawn should succeed after failed resume");
let _ = control
.shutdown_live_agent(resumed_id)
.await
.expect("shutdown resumed thread");
}
#[tokio::test]
async fn spawn_child_completion_notifies_parent_history() {
let harness = AgentControlHarness::new().await;
let (parent_thread_id, parent_thread) = harness.start_thread().await;
let child_thread_id = harness
.control
.spawn_agent(
harness.config.clone(),
text_input("hello child"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: None,
agent_nickname: None,
agent_role: Some("explorer".to_string()),
})),
)
.await
.expect("child spawn should succeed");
let child_thread = harness
.manager
.get_thread(child_thread_id)
.await
.expect("child thread should exist");
let _ = child_thread
.submit(Op::Shutdown {})
.await
.expect("child shutdown should submit");
assert_eq!(wait_for_subagent_notification(&parent_thread).await, true);
}
#[tokio::test]
async fn multi_agent_v2_completion_ignores_dead_direct_parent() {
let harness = AgentControlHarness::new().await;
let (root_thread_id, root_thread) = harness.start_thread().await;
let mut config = harness.config.clone();
let _ = config.features.enable(Feature::MultiAgentV2);
let worker_path = AgentPath::root().join("worker_a").expect("worker path");
let worker_thread_id = harness
.control
.spawn_agent(
config.clone(),
text_input("hello worker"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: root_thread_id,
depth: 1,
agent_path: Some(worker_path.clone()),
agent_nickname: None,
agent_role: Some("explorer".to_string()),
})),
)
.await
.expect("worker spawn should succeed");
let tester_path = worker_path.join("tester").expect("tester path");
let tester_thread_id = harness
.control
.spawn_agent(
config,
text_input("hello tester"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: worker_thread_id,
depth: 2,
agent_path: Some(tester_path.clone()),
agent_nickname: None,
agent_role: Some("explorer".to_string()),
})),
)
.await
.expect("tester spawn should succeed");
harness
.control
.shutdown_live_agent(worker_thread_id)
.await
.expect("worker shutdown should succeed");
let tester_thread = harness
.manager
.get_thread(tester_thread_id)
.await
.expect("tester thread should exist");
let tester_turn = tester_thread.codex.session.new_default_turn().await;
tester_thread
.codex
.session
.send_event(
tester_turn.as_ref(),
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: tester_turn.sub_id.clone(),
last_agent_message: Some("done".to_string()),
completed_at: None,
duration_ms: None,
time_to_first_token_ms: None,
}),
)
.await;
sleep(Duration::from_millis(100)).await;
assert!(
!harness
.manager
.captured_ops()
.into_iter()
.any(|(thread_id, op)| {
thread_id == worker_thread_id
&& matches!(
op,
Op::InterAgentCommunication { communication }
if communication.author == tester_path
&& communication.recipient == worker_path
&& communication.content == "done"
)
})
);
let root_history_items = root_thread
.codex
.session
.clone_history()
.await
.raw_items()
.to_vec();
assert!(!history_contains_assistant_inter_agent_communication(
&root_history_items,
&InterAgentCommunication::new(
tester_path,
AgentPath::root(),
Vec::new(),
"done".to_string(),
/*trigger_turn*/ true,
)
));
assert!(!has_subagent_notification(&root_history_items));
}
#[tokio::test]
async fn multi_agent_v2_completion_queues_message_for_direct_parent() {
let harness = AgentControlHarness::new().await;
let (_root_thread_id, root_thread) = harness.start_thread().await;
let (worker_thread_id, _worker_thread) = harness.start_thread().await;
let mut tester_config = harness.config.clone();
let _ = tester_config.features.enable(Feature::MultiAgentV2);
let tester_thread_id = harness
.manager
.start_thread(tester_config.clone())
.await
.expect("tester thread should start")
.thread_id;
let tester_thread = harness
.manager
.get_thread(tester_thread_id)
.await
.expect("tester thread should exist");
let worker_path = AgentPath::root().join("worker_a").expect("worker path");
let tester_path = worker_path.join("tester").expect("tester path");
harness.control.maybe_start_completion_watcher(
tester_thread_id,
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: worker_thread_id,
depth: 2,
agent_path: Some(tester_path.clone()),
agent_nickname: None,
agent_role: Some("explorer".to_string()),
})),
tester_path.to_string(),
Some(tester_path.clone()),
);
let tester_turn = tester_thread.codex.session.new_default_turn().await;
tester_thread
.codex
.session
.send_event(
tester_turn.as_ref(),
EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: tester_turn.sub_id.clone(),
last_agent_message: Some("done".to_string()),
completed_at: None,
duration_ms: None,
time_to_first_token_ms: None,
}),
)
.await;
let expected_message = crate::session_prefix::format_subagent_notification_message(
tester_path.as_str(),
&AgentStatus::Completed(Some("done".to_string())),
);
let expected = (
worker_thread_id,
Op::InterAgentCommunication {
communication: InterAgentCommunication::new(
tester_path.clone(),
worker_path.clone(),
Vec::new(),
expected_message.clone(),
/*trigger_turn*/ false,
),
},
);
timeout(Duration::from_secs(5), async {
loop {
let captured = harness
.manager
.captured_ops()
.into_iter()
.find(|entry| *entry == expected);
if captured == Some(expected.clone()) {
break;
}
sleep(Duration::from_millis(10)).await;
}
})
.await
.expect("completion watcher should queue a direct-parent message");
let root_history_items = root_thread
.codex
.session
.clone_history()
.await
.raw_items()
.to_vec();
assert!(!history_contains_assistant_inter_agent_communication(
&root_history_items,
&InterAgentCommunication::new(
tester_path,
AgentPath::root(),
Vec::new(),
expected_message,
/*trigger_turn*/ false,
)
));
}
#[tokio::test]
async fn completion_watcher_notifies_parent_when_child_is_missing() {
let harness = AgentControlHarness::new().await;
let (parent_thread_id, parent_thread) = harness.start_thread().await;
let child_thread_id = ThreadId::new();
harness.control.maybe_start_completion_watcher(
child_thread_id,
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: None,
agent_nickname: None,
agent_role: Some("explorer".to_string()),
})),
child_thread_id.to_string(),
/*child_agent_path*/ None,
);
assert_eq!(wait_for_subagent_notification(&parent_thread).await, true);
let history_items = parent_thread
.codex
.session
.clone_history()
.await
.raw_items()
.to_vec();
assert_eq!(
history_contains_text(
&history_items,
&format!("\"agent_path\":\"{child_thread_id}\"")
),
true
);
assert_eq!(
history_contains_text(&history_items, "\"status\":\"not_found\""),
true
);
}
#[tokio::test]
async fn spawn_thread_subagent_gets_random_nickname_in_session_source() {
let harness = AgentControlHarness::new().await;
let (parent_thread_id, _parent_thread) = harness.start_thread().await;
let child_thread_id = harness
.control
.spawn_agent(
harness.config.clone(),
text_input("hello child"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: None,
agent_nickname: None,
agent_role: Some("explorer".to_string()),
})),
)
.await
.expect("child spawn should succeed");
let child_thread = harness
.manager
.get_thread(child_thread_id)
.await
.expect("child thread should be registered");
let snapshot = child_thread.config_snapshot().await;
let SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: seen_parent_thread_id,
depth,
agent_nickname,
agent_role,
..
}) = snapshot.session_source
else {
panic!("expected thread-spawn sub-agent source");
};
assert_eq!(seen_parent_thread_id, parent_thread_id);
assert_eq!(depth, 1);
assert!(agent_nickname.is_some());
assert_eq!(agent_role, Some("explorer".to_string()));
}
#[tokio::test]
async fn spawn_thread_subagent_uses_role_specific_nickname_candidates() {
let mut harness = AgentControlHarness::new().await;
harness.config.agent_roles.insert(
"researcher".to_string(),
AgentRoleConfig {
description: Some("Research role".to_string()),
config_file: None,
nickname_candidates: Some(vec!["Atlas".to_string()]),
},
);
let (parent_thread_id, _parent_thread) = harness.start_thread().await;
let child_thread_id = harness
.control
.spawn_agent(
harness.config.clone(),
text_input("hello child"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: None,
agent_nickname: None,
agent_role: Some("researcher".to_string()),
})),
)
.await
.expect("child spawn should succeed");
let child_thread = harness
.manager
.get_thread(child_thread_id)
.await
.expect("child thread should be registered");
let snapshot = child_thread.config_snapshot().await;
let SessionSource::SubAgent(SubAgentSource::ThreadSpawn { agent_nickname, .. }) =
snapshot.session_source
else {
panic!("expected thread-spawn sub-agent source");
};
assert_eq!(agent_nickname, Some("Atlas".to_string()));
}
#[tokio::test]
async fn resume_thread_subagent_restores_stored_nickname_and_role() {
let (home, mut config) = test_config().await;
config
.features
.enable(Feature::Sqlite)
.expect("test config should allow sqlite");
let state_db = init_state_db(&config).await;
let manager = ThreadManager::with_models_provider_home_and_state_for_tests(
CodexAuth::from_api_key("dummy"),
config.model_provider.clone(),
config.codex_home.to_path_buf(),
std::sync::Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
state_db.clone(),
);
let control = manager.agent_control();
let harness = AgentControlHarness {
_home: home,
config,
state_db,
manager,
control,
};
let (parent_thread_id, _parent_thread) = harness.start_thread().await;
let agent_path = AgentPath::from_string("/root/explorer".to_string())
.expect("test agent path should be valid");
let child_thread_id = harness
.control
.spawn_agent(
harness.config.clone(),
text_input("hello child"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: Some(agent_path.clone()),
agent_nickname: None,
agent_role: Some("explorer".to_string()),
})),
)
.await
.expect("child spawn should succeed");
let child_thread = harness
.manager
.get_thread(child_thread_id)
.await
.expect("child thread should exist");
let mut status_rx = harness
.control
.subscribe_status(child_thread_id)
.await
.expect("status subscription should succeed");
if matches!(status_rx.borrow().clone(), AgentStatus::PendingInit) {
timeout(Duration::from_secs(5), async {
loop {
status_rx
.changed()
.await
.expect("child status should advance past pending init");
if !matches!(status_rx.borrow().clone(), AgentStatus::PendingInit) {
break;
}
}
})
.await
.expect("child should initialize before shutdown");
}
let original_snapshot = child_thread.config_snapshot().await;
let original_nickname = original_snapshot
.session_source
.get_nickname()
.expect("spawned sub-agent should have a nickname");
let state_db = child_thread
.state_db()
.expect("sqlite state db should be available for nickname resume test");
timeout(Duration::from_secs(5), async {
loop {
if let Ok(Some(metadata)) = state_db.get_thread(child_thread_id).await
&& metadata.agent_nickname.is_some()
&& metadata.agent_role.as_deref() == Some("explorer")
{
break;
}
sleep(Duration::from_millis(10)).await;
}
})
.await
.expect("child thread metadata should be persisted to sqlite before shutdown");
let _ = harness
.control
.shutdown_live_agent(child_thread_id)
.await
.expect("child shutdown should submit");
if !matches!(status_rx.borrow().clone(), AgentStatus::Shutdown) {
timeout(Duration::from_secs(5), async {
loop {
status_rx
.changed()
.await
.expect("child status should reach shutdown");
if matches!(status_rx.borrow().clone(), AgentStatus::Shutdown) {
break;
}
}
})
.await
.expect("child should shut down before resume");
}
drop(child_thread);
let resumed_thread_id = harness
.control
.resume_agent_from_rollout(
harness.config.clone(),
child_thread_id,
SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: Some(agent_path.clone()),
agent_nickname: None,
agent_role: None,
}),
)
.await
.expect("resume should succeed");
assert_eq!(resumed_thread_id, child_thread_id);
let resumed_snapshot = harness
.manager
.get_thread(resumed_thread_id)
.await
.expect("resumed child thread should exist")
.config_snapshot()
.await;
let SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: resumed_parent_thread_id,
depth: resumed_depth,
agent_path: resumed_agent_path,
agent_nickname: resumed_nickname,
agent_role: resumed_role,
..
}) = resumed_snapshot.session_source
else {
panic!("expected thread-spawn sub-agent source");
};
assert_eq!(resumed_parent_thread_id, parent_thread_id);
assert_eq!(resumed_depth, 1);
assert_eq!(resumed_agent_path, Some(agent_path));
assert_eq!(resumed_nickname, Some(original_nickname));
assert_eq!(resumed_role, Some("explorer".to_string()));
let _ = harness
.control
.shutdown_live_agent(resumed_thread_id)
.await
.expect("resumed child shutdown should submit");
}
#[tokio::test]
async fn resume_agent_from_rollout_reads_archived_rollout_path() {
let harness = AgentControlHarness::new().await;
let child_thread_id = harness
.control
.spawn_agent(
harness.config.clone(),
text_input("hello"),
/*session_source*/ None,
)
.await
.expect("child spawn should succeed");
let child_thread = harness
.manager
.get_thread(child_thread_id)
.await
.expect("child thread should exist");
persist_thread_for_tree_resume(&child_thread, "persist before archiving").await;
let _ = harness
.control
.shutdown_live_agent(child_thread_id)
.await
.expect("child shutdown should succeed");
let store = LocalThreadStore::new(
LocalThreadStoreConfig::from_config(&harness.config),
harness.state_db.clone(),
);
store
.archive_thread(ArchiveThreadParams {
thread_id: child_thread_id,
})
.await
.expect("child thread should archive");
let resumed_thread_id = harness
.control
.resume_agent_from_rollout(harness.config.clone(), child_thread_id, SessionSource::Exec)
.await
.expect("resume should find archived rollout");
assert_eq!(resumed_thread_id, child_thread_id);
let _ = harness
.control
.shutdown_live_agent(child_thread_id)
.await
.expect("resumed child shutdown should succeed");
}
#[tokio::test]
async fn list_agent_subtree_thread_ids_includes_anonymous_and_closed_descendants() {
let harness = AgentControlHarness::new().await;
let (parent_thread_id, _parent_thread) = harness.start_thread().await;
let worker_path = AgentPath::root().join("worker").expect("worker path");
let reviewer_path = AgentPath::root().join("reviewer").expect("reviewer path");
let worker_thread_id = harness
.control
.spawn_agent(
harness.config.clone(),
text_input("hello worker"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: Some(worker_path.clone()),
agent_nickname: None,
agent_role: Some("worker".to_string()),
})),
)
.await
.expect("worker spawn should succeed");
let worker_child_thread_id = harness
.control
.spawn_agent(
harness.config.clone(),
text_input("hello worker child"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: worker_thread_id,
depth: 2,
agent_path: Some(
worker_path
.join("child")
.expect("worker child path should be valid"),
),
agent_nickname: None,
agent_role: Some("worker".to_string()),
})),
)
.await
.expect("worker child spawn should succeed");
let no_path_child_thread_id = harness
.control
.spawn_agent(
harness.config.clone(),
text_input("hello anonymous child"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: worker_thread_id,
depth: 2,
agent_path: None,
agent_nickname: None,
agent_role: Some("worker".to_string()),
})),
)
.await
.expect("no-path child spawn should succeed");
let no_path_grandchild_thread_id = harness
.control
.spawn_agent(
harness.config.clone(),
text_input("hello anonymous grandchild"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: no_path_child_thread_id,
depth: 3,
agent_path: None,
agent_nickname: None,
agent_role: Some("worker".to_string()),
})),
)
.await
.expect("no-path grandchild spawn should succeed");
let _reviewer_thread_id = harness
.control
.spawn_agent(
harness.config.clone(),
text_input("hello reviewer"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: Some(reviewer_path),
agent_nickname: None,
agent_role: Some("reviewer".to_string()),
})),
)
.await
.expect("reviewer spawn should succeed");
let _ = harness
.control
.shutdown_live_agent(no_path_grandchild_thread_id)
.await
.expect("no-path grandchild shutdown should succeed");
let mut worker_subtree_thread_ids = harness
.manager
.list_agent_subtree_thread_ids(worker_thread_id)
.await
.expect("worker subtree thread ids should load");
worker_subtree_thread_ids.sort_by_key(ToString::to_string);
let mut expected_worker_subtree_thread_ids = vec![
worker_thread_id,
worker_child_thread_id,
no_path_child_thread_id,
no_path_grandchild_thread_id,
];
expected_worker_subtree_thread_ids.sort_by_key(ToString::to_string);
assert_eq!(
worker_subtree_thread_ids,
expected_worker_subtree_thread_ids
);
let mut no_path_child_subtree_thread_ids = harness
.manager
.list_agent_subtree_thread_ids(no_path_child_thread_id)
.await
.expect("no-path subtree thread ids should load");
no_path_child_subtree_thread_ids.sort_by_key(ToString::to_string);
let mut expected_no_path_child_subtree_thread_ids =
vec![no_path_child_thread_id, no_path_grandchild_thread_id];
expected_no_path_child_subtree_thread_ids.sort_by_key(ToString::to_string);
assert_eq!(
no_path_child_subtree_thread_ids,
expected_no_path_child_subtree_thread_ids
);
}
#[tokio::test]
async fn shutdown_agent_tree_closes_live_descendants() {
let harness = AgentControlHarness::new().await;
let (parent_thread_id, _parent_thread) = harness.start_thread().await;
let child_thread_id = harness
.control
.spawn_agent(
harness.config.clone(),
text_input("hello child"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: None,
agent_nickname: None,
agent_role: Some("explorer".to_string()),
})),
)
.await
.expect("child spawn should succeed");
let grandchild_thread_id = harness
.control
.spawn_agent(
harness.config.clone(),
text_input("hello grandchild"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: child_thread_id,
depth: 2,
agent_path: None,
agent_nickname: None,
agent_role: Some("worker".to_string()),
})),
)
.await
.expect("grandchild spawn should succeed");
let child_thread = harness
.manager
.get_thread(child_thread_id)
.await
.expect("child thread should exist");
let grandchild_thread = harness
.manager
.get_thread(grandchild_thread_id)
.await
.expect("grandchild thread should exist");
persist_thread_for_tree_resume(&child_thread, "child persisted").await;
persist_thread_for_tree_resume(&grandchild_thread, "grandchild persisted").await;
wait_for_live_thread_spawn_children(&harness.control, parent_thread_id, &[child_thread_id])
.await;
wait_for_live_thread_spawn_children(&harness.control, child_thread_id, &[grandchild_thread_id])
.await;
let _ = harness
.control
.shutdown_agent_tree(parent_thread_id)
.await
.expect("tree shutdown should succeed");
assert_eq!(
harness.control.get_status(parent_thread_id).await,
AgentStatus::NotFound
);
assert_eq!(
harness.control.get_status(child_thread_id).await,
AgentStatus::NotFound
);
assert_eq!(
harness.control.get_status(grandchild_thread_id).await,
AgentStatus::NotFound
);
let shutdown_ids = harness
.manager
.captured_ops()
.into_iter()
.filter_map(|(thread_id, op)| matches!(op, Op::Shutdown).then_some(thread_id))
.collect::<Vec<_>>();
let mut expected_shutdown_ids = vec![parent_thread_id, child_thread_id, grandchild_thread_id];
expected_shutdown_ids.sort_by_key(std::string::ToString::to_string);
let mut shutdown_ids = shutdown_ids;
shutdown_ids.sort_by_key(std::string::ToString::to_string);
assert_eq!(shutdown_ids, expected_shutdown_ids);
}
#[tokio::test]
async fn shutdown_agent_tree_closes_descendants_when_started_at_child() {
let harness = AgentControlHarness::new().await;
let (parent_thread_id, _parent_thread) = harness.start_thread().await;
let child_thread_id = harness
.control
.spawn_agent(
harness.config.clone(),
text_input("hello child"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: None,
agent_nickname: None,
agent_role: Some("explorer".to_string()),
})),
)
.await
.expect("child spawn should succeed");
let grandchild_thread_id = harness
.control
.spawn_agent(
harness.config.clone(),
text_input("hello grandchild"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: child_thread_id,
depth: 2,
agent_path: None,
agent_nickname: None,
agent_role: Some("worker".to_string()),
})),
)
.await
.expect("grandchild spawn should succeed");
let child_thread = harness
.manager
.get_thread(child_thread_id)
.await
.expect("child thread should exist");
let grandchild_thread = harness
.manager
.get_thread(grandchild_thread_id)
.await
.expect("grandchild thread should exist");
persist_thread_for_tree_resume(&child_thread, "child persisted").await;
persist_thread_for_tree_resume(&grandchild_thread, "grandchild persisted").await;
wait_for_live_thread_spawn_children(&harness.control, parent_thread_id, &[child_thread_id])
.await;
wait_for_live_thread_spawn_children(&harness.control, child_thread_id, &[grandchild_thread_id])
.await;
let _ = harness
.control
.close_agent(child_thread_id)
.await
.expect("child close should succeed");
let _ = harness
.control
.shutdown_agent_tree(parent_thread_id)
.await
.expect("tree shutdown should succeed");
assert_eq!(
harness.control.get_status(child_thread_id).await,
AgentStatus::NotFound
);
assert_eq!(
harness.control.get_status(grandchild_thread_id).await,
AgentStatus::NotFound
);
assert_eq!(
harness.control.get_status(parent_thread_id).await,
AgentStatus::NotFound
);
let shutdown_ids = harness
.manager
.captured_ops()
.into_iter()
.filter_map(|(thread_id, op)| matches!(op, Op::Shutdown).then_some(thread_id))
.collect::<Vec<_>>();
let mut expected_shutdown_ids = vec![parent_thread_id, child_thread_id, grandchild_thread_id];
expected_shutdown_ids.sort_by_key(std::string::ToString::to_string);
let mut shutdown_ids = shutdown_ids;
shutdown_ids.sort_by_key(std::string::ToString::to_string);
assert_eq!(shutdown_ids, expected_shutdown_ids);
}
#[tokio::test]
async fn resume_agent_from_rollout_does_not_reopen_closed_descendants() {
let harness = AgentControlHarness::new().await;
let (parent_thread_id, parent_thread) = harness.start_thread().await;
let child_thread_id = harness
.control
.spawn_agent(
harness.config.clone(),
text_input("hello child"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: None,
agent_nickname: None,
agent_role: Some("explorer".to_string()),
})),
)
.await
.expect("child spawn should succeed");
let grandchild_thread_id = harness
.control
.spawn_agent(
harness.config.clone(),
text_input("hello grandchild"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: child_thread_id,
depth: 2,
agent_path: None,
agent_nickname: None,
agent_role: Some("worker".to_string()),
})),
)
.await
.expect("grandchild spawn should succeed");
let child_thread = harness
.manager
.get_thread(child_thread_id)
.await
.expect("child thread should exist");
let grandchild_thread = harness
.manager
.get_thread(grandchild_thread_id)
.await
.expect("grandchild thread should exist");
persist_thread_for_tree_resume(&parent_thread, "parent persisted").await;
persist_thread_for_tree_resume(&child_thread, "child persisted").await;
persist_thread_for_tree_resume(&grandchild_thread, "grandchild persisted").await;
wait_for_live_thread_spawn_children(&harness.control, parent_thread_id, &[child_thread_id])
.await;
wait_for_live_thread_spawn_children(&harness.control, child_thread_id, &[grandchild_thread_id])
.await;
let _ = harness
.control
.close_agent(child_thread_id)
.await
.expect("child close should succeed");
let _ = harness
.control
.shutdown_live_agent(parent_thread_id)
.await
.expect("parent shutdown should succeed");
let resumed_parent_thread_id = harness
.control
.resume_agent_from_rollout(
harness.config.clone(),
parent_thread_id,
SessionSource::Exec,
)
.await
.expect("single-thread resume should succeed");
assert_eq!(resumed_parent_thread_id, parent_thread_id);
assert_ne!(
harness.control.get_status(parent_thread_id).await,
AgentStatus::NotFound
);
assert_eq!(
harness.control.get_status(child_thread_id).await,
AgentStatus::NotFound
);
assert_eq!(
harness.control.get_status(grandchild_thread_id).await,
AgentStatus::NotFound
);
let _ = harness
.control
.shutdown_agent_tree(parent_thread_id)
.await
.expect("tree shutdown after resume should succeed");
}
#[tokio::test]
async fn resume_closed_child_reopens_open_descendants() {
let harness = AgentControlHarness::new().await;
let (parent_thread_id, parent_thread) = harness.start_thread().await;
let child_thread_id = harness
.control
.spawn_agent(
harness.config.clone(),
text_input("hello child"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: None,
agent_nickname: None,
agent_role: Some("explorer".to_string()),
})),
)
.await
.expect("child spawn should succeed");
let grandchild_thread_id = harness
.control
.spawn_agent(
harness.config.clone(),
text_input("hello grandchild"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: child_thread_id,
depth: 2,
agent_path: None,
agent_nickname: None,
agent_role: Some("worker".to_string()),
})),
)
.await
.expect("grandchild spawn should succeed");
let child_thread = harness
.manager
.get_thread(child_thread_id)
.await
.expect("child thread should exist");
let grandchild_thread = harness
.manager
.get_thread(grandchild_thread_id)
.await
.expect("grandchild thread should exist");
persist_thread_for_tree_resume(&parent_thread, "parent persisted").await;
persist_thread_for_tree_resume(&child_thread, "child persisted").await;
persist_thread_for_tree_resume(&grandchild_thread, "grandchild persisted").await;
wait_for_live_thread_spawn_children(&harness.control, parent_thread_id, &[child_thread_id])
.await;
wait_for_live_thread_spawn_children(&harness.control, child_thread_id, &[grandchild_thread_id])
.await;
let _ = harness
.control
.close_agent(child_thread_id)
.await
.expect("child close should succeed");
let resumed_child_thread_id = harness
.control
.resume_agent_from_rollout(
harness.config.clone(),
child_thread_id,
SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: None,
agent_nickname: None,
agent_role: None,
}),
)
.await
.expect("child resume should succeed");
assert_eq!(resumed_child_thread_id, child_thread_id);
assert_ne!(
harness.control.get_status(child_thread_id).await,
AgentStatus::NotFound
);
assert_ne!(
harness.control.get_status(grandchild_thread_id).await,
AgentStatus::NotFound
);
let _ = harness
.control
.close_agent(child_thread_id)
.await
.expect("child close after resume should succeed");
let _ = harness
.control
.shutdown_live_agent(parent_thread_id)
.await
.expect("parent shutdown should succeed");
}
#[tokio::test]
async fn resume_agent_from_rollout_reopens_open_descendants_after_manager_shutdown() {
let harness = AgentControlHarness::new().await;
let (parent_thread_id, parent_thread) = harness.start_thread().await;
let child_thread_id = harness
.control
.spawn_agent(
harness.config.clone(),
text_input("hello child"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: None,
agent_nickname: None,
agent_role: Some("explorer".to_string()),
})),
)
.await
.expect("child spawn should succeed");
let grandchild_thread_id = harness
.control
.spawn_agent(
harness.config.clone(),
text_input("hello grandchild"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: child_thread_id,
depth: 2,
agent_path: None,
agent_nickname: None,
agent_role: Some("worker".to_string()),
})),
)
.await
.expect("grandchild spawn should succeed");
let child_thread = harness
.manager
.get_thread(child_thread_id)
.await
.expect("child thread should exist");
let grandchild_thread = harness
.manager
.get_thread(grandchild_thread_id)
.await
.expect("grandchild thread should exist");
persist_thread_for_tree_resume(&parent_thread, "parent persisted").await;
persist_thread_for_tree_resume(&child_thread, "child persisted").await;
persist_thread_for_tree_resume(&grandchild_thread, "grandchild persisted").await;
wait_for_live_thread_spawn_children(&harness.control, parent_thread_id, &[child_thread_id])
.await;
wait_for_live_thread_spawn_children(&harness.control, child_thread_id, &[grandchild_thread_id])
.await;
let report = harness
.manager
.shutdown_all_threads_bounded(Duration::from_secs(5))
.await;
assert_eq!(report.submit_failed, Vec::<ThreadId>::new());
assert_eq!(report.timed_out, Vec::<ThreadId>::new());
let resumed_parent_thread_id = harness
.control
.resume_agent_from_rollout(
harness.config.clone(),
parent_thread_id,
SessionSource::Exec,
)
.await
.expect("tree resume should succeed");
assert_eq!(resumed_parent_thread_id, parent_thread_id);
assert_ne!(
harness.control.get_status(parent_thread_id).await,
AgentStatus::NotFound
);
assert_ne!(
harness.control.get_status(child_thread_id).await,
AgentStatus::NotFound
);
assert_ne!(
harness.control.get_status(grandchild_thread_id).await,
AgentStatus::NotFound
);
let _ = harness
.control
.shutdown_agent_tree(parent_thread_id)
.await
.expect("tree shutdown after subtree resume should succeed");
}
#[tokio::test]
async fn resume_agent_from_rollout_uses_edge_data_when_descendant_metadata_source_is_stale() {
let harness = AgentControlHarness::new().await;
let (parent_thread_id, parent_thread) = harness.start_thread().await;
let child_thread_id = harness
.control
.spawn_agent(
harness.config.clone(),
text_input("hello child"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: None,
agent_nickname: None,
agent_role: Some("explorer".to_string()),
})),
)
.await
.expect("child spawn should succeed");
let grandchild_thread_id = harness
.control
.spawn_agent(
harness.config.clone(),
text_input("hello grandchild"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: child_thread_id,
depth: 2,
agent_path: None,
agent_nickname: None,
agent_role: Some("worker".to_string()),
})),
)
.await
.expect("grandchild spawn should succeed");
let child_thread = harness
.manager
.get_thread(child_thread_id)
.await
.expect("child thread should exist");
let grandchild_thread = harness
.manager
.get_thread(grandchild_thread_id)
.await
.expect("grandchild thread should exist");
persist_thread_for_tree_resume(&parent_thread, "parent persisted").await;
persist_thread_for_tree_resume(&child_thread, "child persisted").await;
persist_thread_for_tree_resume(&grandchild_thread, "grandchild persisted").await;
wait_for_live_thread_spawn_children(&harness.control, parent_thread_id, &[child_thread_id])
.await;
wait_for_live_thread_spawn_children(&harness.control, child_thread_id, &[grandchild_thread_id])
.await;
let state_db = grandchild_thread
.state_db()
.expect("sqlite state db should be available");
let mut stale_metadata = state_db
.get_thread(grandchild_thread_id)
.await
.expect("grandchild metadata query should succeed")
.expect("grandchild metadata should exist");
stale_metadata.source =
serde_json::to_string(&SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: ThreadId::new(),
depth: 99,
agent_path: None,
agent_nickname: None,
agent_role: Some("worker".to_string()),
}))
.expect("stale session source should serialize");
state_db
.upsert_thread(&stale_metadata)
.await
.expect("stale grandchild metadata should persist");
let report = harness
.manager
.shutdown_all_threads_bounded(Duration::from_secs(5))
.await;
assert_eq!(report.submit_failed, Vec::<ThreadId>::new());
assert_eq!(report.timed_out, Vec::<ThreadId>::new());
let resumed_parent_thread_id = harness
.control
.resume_agent_from_rollout(
harness.config.clone(),
parent_thread_id,
SessionSource::Exec,
)
.await
.expect("tree resume should succeed");
assert_eq!(resumed_parent_thread_id, parent_thread_id);
assert_ne!(
harness.control.get_status(parent_thread_id).await,
AgentStatus::NotFound
);
assert_ne!(
harness.control.get_status(child_thread_id).await,
AgentStatus::NotFound
);
assert_ne!(
harness.control.get_status(grandchild_thread_id).await,
AgentStatus::NotFound
);
let resumed_grandchild_snapshot = harness
.manager
.get_thread(grandchild_thread_id)
.await
.expect("resumed grandchild thread should exist")
.config_snapshot()
.await;
let SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: resumed_parent_thread_id,
depth: resumed_depth,
..
}) = resumed_grandchild_snapshot.session_source
else {
panic!("expected thread-spawn sub-agent source");
};
assert_eq!(resumed_parent_thread_id, child_thread_id);
assert_eq!(resumed_depth, 2);
let _ = harness
.control
.shutdown_agent_tree(parent_thread_id)
.await
.expect("tree shutdown after subtree resume should succeed");
}
#[tokio::test]
async fn resume_agent_from_rollout_skips_descendants_when_parent_resume_fails() {
let harness = AgentControlHarness::new().await;
let (parent_thread_id, parent_thread) = harness.start_thread().await;
let child_thread_id = harness
.control
.spawn_agent(
harness.config.clone(),
text_input("hello child"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: None,
agent_nickname: None,
agent_role: Some("explorer".to_string()),
})),
)
.await
.expect("child spawn should succeed");
let grandchild_thread_id = harness
.control
.spawn_agent(
harness.config.clone(),
text_input("hello grandchild"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: child_thread_id,
depth: 2,
agent_path: None,
agent_nickname: None,
agent_role: Some("worker".to_string()),
})),
)
.await
.expect("grandchild spawn should succeed");
let child_thread = harness
.manager
.get_thread(child_thread_id)
.await
.expect("child thread should exist");
let grandchild_thread = harness
.manager
.get_thread(grandchild_thread_id)
.await
.expect("grandchild thread should exist");
persist_thread_for_tree_resume(&parent_thread, "parent persisted").await;
persist_thread_for_tree_resume(&child_thread, "child persisted").await;
persist_thread_for_tree_resume(&grandchild_thread, "grandchild persisted").await;
wait_for_live_thread_spawn_children(&harness.control, parent_thread_id, &[child_thread_id])
.await;
wait_for_live_thread_spawn_children(&harness.control, child_thread_id, &[grandchild_thread_id])
.await;
let child_rollout_path = child_thread
.rollout_path()
.expect("child thread should have rollout path");
let report = harness
.manager
.shutdown_all_threads_bounded(Duration::from_secs(5))
.await;
assert_eq!(report.submit_failed, Vec::<ThreadId>::new());
assert_eq!(report.timed_out, Vec::<ThreadId>::new());
tokio::fs::remove_file(&child_rollout_path)
.await
.expect("child rollout path should be removable");
let resumed_parent_thread_id = harness
.control
.resume_agent_from_rollout(
harness.config.clone(),
parent_thread_id,
SessionSource::Exec,
)
.await
.expect("root resume should succeed");
assert_eq!(resumed_parent_thread_id, parent_thread_id);
assert_ne!(
harness.control.get_status(parent_thread_id).await,
AgentStatus::NotFound
);
assert_eq!(
harness.control.get_status(child_thread_id).await,
AgentStatus::NotFound
);
assert_eq!(
harness.control.get_status(grandchild_thread_id).await,
AgentStatus::NotFound
);
let _ = harness
.control
.shutdown_agent_tree(parent_thread_id)
.await
.expect("tree shutdown after partial subtree resume should succeed");
}