Files
codex/codex-rs/core/src/agent/control.rs
2026-02-26 18:55:34 +00:00

1445 lines
52 KiB
Rust

use crate::agent::AgentStatus;
use crate::agent::guards::Guards;
use crate::agent::status::is_final;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
use crate::find_thread_path_by_id_str;
use crate::rollout::RolloutRecorder;
use crate::session_prefix::format_subagent_context_line;
use crate::session_prefix::format_subagent_notification_message;
use crate::state_db;
use crate::thread_manager::ThreadManagerState;
use codex_protocol::ThreadId;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::TokenUsage;
use codex_protocol::user_input::UserInput;
use std::sync::Arc;
use std::sync::Weak;
use tokio::sync::watch;
/// Newline-separated pool of human-friendly nicknames handed out to spawned sub-agents.
const AGENT_NAMES: &str = include_str!("agent_names.txt");
/// Synthetic tool output injected into a forked child's history so the model treats the
/// copied parent transcript as background context rather than an in-progress task.
const FORKED_SPAWN_AGENT_OUTPUT_MESSAGE: &str = "You are the newly spawned agent. The prior conversation history was forked from your parent agent. Treat the next user message as your new task, and use the forked history only as background context.";
/// Optional knobs for [`AgentControl::spawn_agent_with_options`].
#[derive(Clone, Debug, Default)]
pub(crate) struct SpawnAgentOptions {
    /// When set, fork the parent thread's rollout history into the new child and inject a
    /// synthetic `FunctionCallOutput` answering this `spawn_agent` call id.
    pub(crate) fork_parent_spawn_call_id: Option<String>,
}
/// Parse the bundled nickname file into a list of non-empty, trimmed names.
fn agent_nickname_list() -> Vec<&'static str> {
    AGENT_NAMES
        .lines()
        .filter_map(|line| {
            let name = line.trim();
            if name.is_empty() { None } else { Some(name) }
        })
        .collect()
}
/// Control-plane handle for multi-agent operations.
///
/// `AgentControl` is held by each session (via `SessionServices`). It provides capability to
/// spawn new agents and the inter-agent communication layer.
///
/// An `AgentControl` instance is shared per "user session" which means the same `AgentControl`
/// is used for every sub-agent spawned by Codex. By doing so, we make sure the guards are
/// scoped to a user session.
#[derive(Clone, Default)]
pub(crate) struct AgentControl {
    /// Weak handle back to the global thread registry/state.
    /// This is `Weak` to avoid reference cycles and shadow persistence of the form
    /// `ThreadManagerState -> CodexThread -> Session -> SessionServices -> ThreadManagerState`.
    manager: Weak<ThreadManagerState>,
    /// Session-scoped guards (spawn-slot and nickname reservations); shared by all clones.
    state: Arc<Guards>,
}
impl AgentControl {
/// Construct a new `AgentControl` that can spawn/message agents via the given manager state.
pub(crate) fn new(manager: Weak<ThreadManagerState>) -> Self {
Self {
manager,
..Default::default()
}
}
/// Spawn a new agent thread and submit the initial prompt.
pub(crate) async fn spawn_agent(
&self,
config: crate::config::Config,
items: Vec<UserInput>,
session_source: Option<SessionSource>,
) -> CodexResult<ThreadId> {
self.spawn_agent_with_options(config, items, session_source, SpawnAgentOptions::default())
.await
}
pub(crate) async fn spawn_agent_with_options(
&self,
config: crate::config::Config,
items: Vec<UserInput>,
session_source: Option<SessionSource>,
options: SpawnAgentOptions,
) -> CodexResult<ThreadId> {
let state = self.upgrade()?;
let mut reservation = self.state.reserve_spawn_slot(config.agent_max_threads)?;
let session_source = match session_source {
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth,
agent_role,
..
})) => {
let agent_nickname = reservation.reserve_agent_nickname(&agent_nickname_list())?;
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth,
agent_nickname: Some(agent_nickname),
agent_role,
}))
}
other => other,
};
let notification_source = session_source.clone();
// The same `AgentControl` is sent to spawn the thread.
let new_thread = match session_source {
Some(session_source) => {
if let Some(call_id) = options.fork_parent_spawn_call_id.as_ref() {
let SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
..
}) = session_source.clone()
else {
return Err(CodexErr::Fatal(
"spawn_agent fork requires a thread-spawn session source".to_string(),
));
};
let parent_thread = state.get_thread(parent_thread_id).await.ok();
if let Some(parent_thread) = parent_thread.as_ref() {
// `record_conversation_items` only queues rollout writes asynchronously.
// Flush/materialize the live parent before snapshotting JSONL for a fork.
parent_thread
.codex
.session
.ensure_rollout_materialized()
.await;
parent_thread.codex.session.flush_rollout().await;
}
let rollout_path = parent_thread
.as_ref()
.and_then(|parent_thread| parent_thread.rollout_path())
.or(find_thread_path_by_id_str(
config.codex_home.as_path(),
&parent_thread_id.to_string(),
)
.await?)
.ok_or_else(|| {
CodexErr::Fatal(format!(
"parent thread rollout unavailable for fork: {parent_thread_id}"
))
})?;
let mut forked_rollout_items =
RolloutRecorder::get_rollout_history(&rollout_path)
.await?
.get_rollout_items();
let mut output = FunctionCallOutputPayload::from_text(
FORKED_SPAWN_AGENT_OUTPUT_MESSAGE.to_string(),
);
output.success = Some(true);
forked_rollout_items.push(RolloutItem::ResponseItem(
ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output,
},
));
let initial_history = InitialHistory::Forked(forked_rollout_items);
state
.fork_thread_with_source(
config,
initial_history,
self.clone(),
session_source,
false,
)
.await?
} else {
state
.spawn_new_thread_with_source(
config,
self.clone(),
session_source,
false,
None,
)
.await?
}
}
None => state.spawn_new_thread(config, self.clone()).await?,
};
reservation.commit(new_thread.thread_id);
// Notify a new thread has been created. This notification will be processed by clients
// to subscribe or drain this newly created thread.
// TODO(jif) add helper for drain
state.notify_thread_created(new_thread.thread_id);
self.send_input(new_thread.thread_id, items).await?;
self.maybe_start_completion_watcher(new_thread.thread_id, notification_source);
Ok(new_thread.thread_id)
}
/// Resume an existing agent thread from a recorded rollout file.
    /// Resume an existing agent thread from a recorded rollout file.
    ///
    /// Reserves a spawn slot (so resumed agents count against `agent_max_threads`),
    /// rehydrates the nickname/role for `ThreadSpawn` sources from the state DB when
    /// available, then re-registers the thread and starts a completion watcher.
    ///
    /// # Errors
    /// Fails when the manager is gone, the slot/nickname reservation fails, or no rollout
    /// file can be found for `thread_id` under `codex_home`.
    pub(crate) async fn resume_agent_from_rollout(
        &self,
        config: crate::config::Config,
        thread_id: ThreadId,
        session_source: SessionSource,
    ) -> CodexResult<ThreadId> {
        let state = self.upgrade()?;
        let mut reservation = self.state.reserve_spawn_slot(config.agent_max_threads)?;
        let session_source = match session_source {
            SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
                parent_thread_id,
                depth,
                ..
            }) => {
                // Collab resume callers rebuild a placeholder ThreadSpawn source. Rehydrate the
                // stored nickname/role from sqlite when available; otherwise leave both unset.
                let (resumed_agent_nickname, resumed_agent_role) =
                    if let Some(state_db_ctx) = state_db::get_state_db(&config, None).await {
                        match state_db_ctx.get_thread(thread_id).await {
                            Ok(Some(metadata)) => (metadata.agent_nickname, metadata.agent_role),
                            // DB errors are deliberately swallowed: resuming without a
                            // nickname is preferable to failing the resume.
                            Ok(None) | Err(_) => (None, None),
                        }
                    } else {
                        (None, None)
                    };
                // Re-reserve the stored nickname so it is marked in-use again; the
                // reservation may substitute another name if it is already taken.
                let reserved_agent_nickname = resumed_agent_nickname
                    .as_deref()
                    .map(|agent_nickname| {
                        reservation.reserve_agent_nickname_with_preference(
                            &agent_nickname_list(),
                            Some(agent_nickname),
                        )
                    })
                    .transpose()?;
                SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
                    parent_thread_id,
                    depth,
                    agent_nickname: reserved_agent_nickname,
                    agent_role: resumed_agent_role,
                })
            }
            other => other,
        };
        let notification_source = session_source.clone();
        let rollout_path =
            find_thread_path_by_id_str(config.codex_home.as_path(), &thread_id.to_string())
                .await?
                .ok_or_else(|| CodexErr::ThreadNotFound(thread_id))?;
        let resumed_thread = state
            .resume_thread_from_rollout_with_source(
                config,
                rollout_path,
                self.clone(),
                session_source,
            )
            .await?;
        reservation.commit(resumed_thread.thread_id);
        // Resumed threads are re-registered in-memory and need the same listener
        // attachment path as freshly spawned threads.
        state.notify_thread_created(resumed_thread.thread_id);
        self.maybe_start_completion_watcher(resumed_thread.thread_id, Some(notification_source));
        Ok(resumed_thread.thread_id)
    }
/// Send rich user input items to an existing agent thread.
pub(crate) async fn send_input(
&self,
agent_id: ThreadId,
items: Vec<UserInput>,
) -> CodexResult<String> {
let state = self.upgrade()?;
let result = state
.send_op(
agent_id,
Op::UserInput {
items,
final_output_json_schema: None,
},
)
.await;
if matches!(result, Err(CodexErr::InternalAgentDied)) {
let _ = state.remove_thread(&agent_id).await;
self.state.release_spawned_thread(agent_id);
}
result
}
/// Interrupt the current task for an existing agent thread.
pub(crate) async fn interrupt_agent(&self, agent_id: ThreadId) -> CodexResult<String> {
let state = self.upgrade()?;
state.send_op(agent_id, Op::Interrupt).await
}
/// Submit a shutdown request to an existing agent thread.
pub(crate) async fn shutdown_agent(&self, agent_id: ThreadId) -> CodexResult<String> {
let state = self.upgrade()?;
let result = state.send_op(agent_id, Op::Shutdown {}).await;
let _ = state.remove_thread(&agent_id).await;
self.state.release_spawned_thread(agent_id);
result
}
/// Fetch the last known status for `agent_id`, returning `NotFound` when unavailable.
pub(crate) async fn get_status(&self, agent_id: ThreadId) -> AgentStatus {
let Ok(state) = self.upgrade() else {
// No agent available if upgrade fails.
return AgentStatus::NotFound;
};
let Ok(thread) = state.get_thread(agent_id).await else {
return AgentStatus::NotFound;
};
thread.agent_status().await
}
pub(crate) async fn get_agent_nickname_and_role(
&self,
agent_id: ThreadId,
) -> Option<(Option<String>, Option<String>)> {
let Ok(state) = self.upgrade() else {
return None;
};
let Ok(thread) = state.get_thread(agent_id).await else {
return None;
};
let session_source = thread.config_snapshot().await.session_source;
Some((
session_source.get_nickname(),
session_source.get_agent_role(),
))
}
/// Subscribe to status updates for `agent_id`, yielding the latest value and changes.
pub(crate) async fn subscribe_status(
&self,
agent_id: ThreadId,
) -> CodexResult<watch::Receiver<AgentStatus>> {
let state = self.upgrade()?;
let thread = state.get_thread(agent_id).await?;
Ok(thread.subscribe_status())
}
pub(crate) async fn get_total_token_usage(&self, agent_id: ThreadId) -> Option<TokenUsage> {
let Ok(state) = self.upgrade() else {
return None;
};
let Ok(thread) = state.get_thread(agent_id).await else {
return None;
};
thread.total_token_usage().await
}
pub(crate) async fn format_environment_context_subagents(
&self,
parent_thread_id: ThreadId,
) -> String {
let Ok(state) = self.upgrade() else {
return String::new();
};
let mut agents = Vec::new();
for thread_id in state.list_thread_ids().await {
let Ok(thread) = state.get_thread(thread_id).await else {
continue;
};
let snapshot = thread.config_snapshot().await;
let SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: agent_parent_thread_id,
agent_nickname,
..
}) = snapshot.session_source
else {
continue;
};
if agent_parent_thread_id != parent_thread_id {
continue;
}
agents.push(format_subagent_context_line(
&thread_id.to_string(),
agent_nickname.as_deref(),
));
}
agents.sort();
agents.join("\n")
}
/// Starts a detached watcher for sub-agents spawned from another thread.
///
/// This is only enabled for `SubAgentSource::ThreadSpawn`, where a parent thread exists and
/// can receive completion notifications.
    /// Starts a detached watcher for sub-agents spawned from another thread.
    ///
    /// This is only enabled for `SubAgentSource::ThreadSpawn`, where a parent thread exists and
    /// can receive completion notifications. When the child reaches a final status, a
    /// notification message is injected into the parent's history without starting a turn.
    fn maybe_start_completion_watcher(
        &self,
        child_thread_id: ThreadId,
        session_source: Option<SessionSource>,
    ) {
        // Non-thread-spawn sources (or None) have no parent to notify.
        let Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
            parent_thread_id, ..
        })) = session_source
        else {
            return;
        };
        let control = self.clone();
        tokio::spawn(async move {
            let mut status_rx = match control.subscribe_status(child_thread_id).await {
                Ok(rx) => rx,
                Err(_) => return,
            };
            // Track the watch channel until the child reaches a terminal status.
            let mut status = status_rx.borrow().clone();
            while !is_final(&status) {
                if status_rx.changed().await.is_err() {
                    // Sender dropped: fall back to a one-shot status query before bailing
                    // out of the loop.
                    status = control.get_status(child_thread_id).await;
                    break;
                }
                status = status_rx.borrow().clone();
            }
            // If the fallback query above also returned a non-final status, there is
            // nothing to report. NOTE(review): assumes statuses like `NotFound` are
            // treated as non-final by `is_final` — confirm in `agent::status`.
            if !is_final(&status) {
                return;
            }
            let Ok(state) = control.upgrade() else {
                return;
            };
            let Ok(parent_thread) = state.get_thread(parent_thread_id).await else {
                return;
            };
            // Deliver the completion notice straight into the parent's history; this
            // does not start a new turn on the parent.
            parent_thread
                .inject_user_message_without_turn(format_subagent_notification_message(
                    &child_thread_id.to_string(),
                    &status,
                ))
                .await;
        });
    }
fn upgrade(&self) -> CodexResult<Arc<ThreadManagerState>> {
self.manager
.upgrade()
.ok_or_else(|| CodexErr::UnsupportedOperation("thread manager dropped".to_string()))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::CodexAuth;
use crate::CodexThread;
use crate::ThreadManager;
use crate::agent::agent_status_from_event;
use crate::config::Config;
use crate::config::ConfigBuilder;
use crate::config_loader::LoaderOverrides;
use crate::contextual_user_message::SUBAGENT_NOTIFICATION_OPEN_TAG;
use crate::features::Feature;
use assert_matches::assert_matches;
use codex_protocol::config_types::ModeKind;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::ErrorEvent;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnAbortedEvent;
use codex_protocol::protocol::TurnCompleteEvent;
use codex_protocol::protocol::TurnStartedEvent;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
use tokio::time::Duration;
use tokio::time::sleep;
use tokio::time::timeout;
use toml::Value as TomlValue;
    /// Build a test `Config` rooted in a fresh temp `codex_home`, applying the given CLI
    /// overrides. Returns the `TempDir` guard alongside the config so the directory lives
    /// for the duration of the test.
    async fn test_config_with_cli_overrides(
        cli_overrides: Vec<(String, TomlValue)>,
    ) -> (TempDir, Config) {
        let home = TempDir::new().expect("create temp dir");
        let config = ConfigBuilder::default()
            .codex_home(home.path().to_path_buf())
            .cli_overrides(cli_overrides)
            // Blank out managed-preferences inputs so host machine policy cannot leak
            // into test configs.
            .loader_overrides(LoaderOverrides {
                #[cfg(target_os = "macos")]
                managed_preferences_base64: Some(String::new()),
                macos_managed_config_requirements_base64: Some(String::new()),
                ..LoaderOverrides::default()
            })
            .build()
            .await
            .expect("load default test config");
        (home, config)
    }
async fn test_config() -> (TempDir, Config) {
test_config_with_cli_overrides(Vec::new()).await
}
fn text_input(text: &str) -> Vec<UserInput> {
vec![UserInput::Text {
text: text.to_string(),
text_elements: Vec::new(),
}]
}
    /// Shared fixture bundling a temp home, config, thread manager, and its agent control.
    struct AgentControlHarness {
        // Held only to keep the temp codex_home alive for the test's duration.
        _home: TempDir,
        config: Config,
        manager: ThreadManager,
        control: AgentControl,
    }
    impl AgentControlHarness {
        /// Build a harness backed by a test thread manager with a dummy API key.
        async fn new() -> Self {
            let (home, config) = test_config().await;
            let manager = ThreadManager::with_models_provider_and_home_for_tests(
                CodexAuth::from_api_key("dummy"),
                config.model_provider.clone(),
                config.codex_home.clone(),
            );
            let control = manager.agent_control();
            Self {
                _home: home,
                config,
                manager,
                control,
            }
        }
        /// Start a plain (non-sub-agent) thread and return its id and handle.
        async fn start_thread(&self) -> (ThreadId, Arc<CodexThread>) {
            let new_thread = self
                .manager
                .start_thread(self.config.clone())
                .await
                .expect("start thread");
            (new_thread.thread_id, new_thread.thread)
        }
    }
fn has_subagent_notification(history_items: &[ResponseItem]) -> bool {
history_items.iter().any(|item| {
let ResponseItem::Message { role, content, .. } = item else {
return false;
};
if role != "user" {
return false;
}
content.iter().any(|content_item| match content_item {
ContentItem::InputText { text } | ContentItem::OutputText { text } => {
text.contains(SUBAGENT_NOTIFICATION_OPEN_TAG)
}
ContentItem::InputImage { .. } => false,
})
})
}
/// Returns true when any message item contains `needle` in a text span.
fn history_contains_text(history_items: &[ResponseItem], needle: &str) -> bool {
history_items.iter().any(|item| {
let ResponseItem::Message { content, .. } = item else {
return false;
};
content.iter().any(|content_item| match content_item {
ContentItem::InputText { text } | ContentItem::OutputText { text } => {
text.contains(needle)
}
ContentItem::InputImage { .. } => false,
})
})
}
    /// Poll the parent's history (every 25ms, up to 2s) until a sub-agent notification
    /// message appears; returns `false` on timeout.
    async fn wait_for_subagent_notification(parent_thread: &Arc<CodexThread>) -> bool {
        let wait = async {
            loop {
                let history_items = parent_thread
                    .codex
                    .session
                    .clone_history()
                    .await
                    .raw_items()
                    .to_vec();
                if has_subagent_notification(&history_items) {
                    return true;
                }
                sleep(Duration::from_millis(25)).await;
            }
        };
        timeout(Duration::from_secs(2), wait).await.is_ok()
    }
    // Without a manager, operations fail with `UnsupportedOperation`.
    #[tokio::test]
    async fn send_input_errors_when_manager_dropped() {
        let control = AgentControl::default();
        let err = control
            .send_input(
                ThreadId::new(),
                vec![UserInput::Text {
                    text: "hello".to_string(),
                    text_elements: Vec::new(),
                }],
            )
            .await
            .expect_err("send_input should fail without a manager");
        assert_eq!(
            err.to_string(),
            "unsupported operation: thread manager dropped"
        );
    }
    // `get_status` degrades to `NotFound` rather than erroring when the manager is gone.
    #[tokio::test]
    async fn get_status_returns_not_found_without_manager() {
        let control = AgentControl::default();
        let got = control.get_status(ThreadId::new()).await;
        assert_eq!(got, AgentStatus::NotFound);
    }
    // Event -> status mapping: a turn start marks the agent Running.
    #[tokio::test]
    async fn on_event_updates_status_from_task_started() {
        let status = agent_status_from_event(&EventMsg::TurnStarted(TurnStartedEvent {
            turn_id: "turn-1".to_string(),
            model_context_window: None,
            collaboration_mode_kind: ModeKind::Default,
        }));
        assert_eq!(status, Some(AgentStatus::Running));
    }
    // Event -> status mapping: a completed turn carries the last agent message.
    #[tokio::test]
    async fn on_event_updates_status_from_task_complete() {
        let status = agent_status_from_event(&EventMsg::TurnComplete(TurnCompleteEvent {
            turn_id: "turn-1".to_string(),
            last_agent_message: Some("done".to_string()),
        }));
        let expected = AgentStatus::Completed(Some("done".to_string()));
        assert_eq!(status, Some(expected));
    }
    // Event -> status mapping: errors become `Errored` with the message.
    #[tokio::test]
    async fn on_event_updates_status_from_error() {
        let status = agent_status_from_event(&EventMsg::Error(ErrorEvent {
            message: "boom".to_string(),
            codex_error_info: None,
        }));
        let expected = AgentStatus::Errored("boom".to_string());
        assert_eq!(status, Some(expected));
    }
    // Event -> status mapping: an aborted turn is reported as `Errored` with the reason.
    #[tokio::test]
    async fn on_event_updates_status_from_turn_aborted() {
        let status = agent_status_from_event(&EventMsg::TurnAborted(TurnAbortedEvent {
            turn_id: Some("turn-1".to_string()),
            reason: TurnAbortReason::Interrupted,
        }));
        let expected = AgentStatus::Errored("Interrupted".to_string());
        assert_eq!(status, Some(expected));
    }
    // Event -> status mapping: shutdown completion maps to `Shutdown`.
    #[tokio::test]
    async fn on_event_updates_status_from_shutdown_complete() {
        let status = agent_status_from_event(&EventMsg::ShutdownComplete);
        assert_eq!(status, Some(AgentStatus::Shutdown));
    }
    // Spawning without a manager surfaces the same `UnsupportedOperation` error.
    #[tokio::test]
    async fn spawn_agent_errors_when_manager_dropped() {
        let control = AgentControl::default();
        let (_home, config) = test_config().await;
        let err = control
            .spawn_agent(config, text_input("hello"), None)
            .await
            .expect_err("spawn_agent should fail without a manager");
        assert_eq!(
            err.to_string(),
            "unsupported operation: thread manager dropped"
        );
    }
    // Resuming without a manager surfaces the same `UnsupportedOperation` error.
    #[tokio::test]
    async fn resume_agent_errors_when_manager_dropped() {
        let control = AgentControl::default();
        let (_home, config) = test_config().await;
        let err = control
            .resume_agent_from_rollout(config, ThreadId::new(), SessionSource::Exec)
            .await
            .expect_err("resume_agent should fail without a manager");
        assert_eq!(
            err.to_string(),
            "unsupported operation: thread manager dropped"
        );
    }
    // With a manager but an unknown thread id, `send_input` reports `ThreadNotFound`.
    #[tokio::test]
    async fn send_input_errors_when_thread_missing() {
        let harness = AgentControlHarness::new().await;
        let thread_id = ThreadId::new();
        let err = harness
            .control
            .send_input(
                thread_id,
                vec![UserInput::Text {
                    text: "hello".to_string(),
                    text_elements: Vec::new(),
                }],
            )
            .await
            .expect_err("send_input should fail for missing thread");
        assert_matches!(err, CodexErr::ThreadNotFound(id) if id == thread_id);
    }
    // Unknown thread ids map to `NotFound` status.
    #[tokio::test]
    async fn get_status_returns_not_found_for_missing_thread() {
        let harness = AgentControlHarness::new().await;
        let status = harness.control.get_status(ThreadId::new()).await;
        assert_eq!(status, AgentStatus::NotFound);
    }
    // Freshly started threads report `PendingInit` until their first event.
    #[tokio::test]
    async fn get_status_returns_pending_init_for_new_thread() {
        let harness = AgentControlHarness::new().await;
        let (thread_id, _) = harness.start_thread().await;
        let status = harness.control.get_status(thread_id).await;
        assert_eq!(status, AgentStatus::PendingInit);
    }
    // Subscribing to a missing thread errors with `ThreadNotFound`.
    #[tokio::test]
    async fn subscribe_status_errors_for_missing_thread() {
        let harness = AgentControlHarness::new().await;
        let thread_id = ThreadId::new();
        let err = harness
            .control
            .subscribe_status(thread_id)
            .await
            .expect_err("subscribe_status should fail for missing thread");
        assert_matches!(err, CodexErr::ThreadNotFound(id) if id == thread_id);
    }
    // The watch channel observes the transition from `PendingInit` to `Shutdown`.
    #[tokio::test]
    async fn subscribe_status_updates_on_shutdown() {
        let harness = AgentControlHarness::new().await;
        let (thread_id, thread) = harness.start_thread().await;
        let mut status_rx = harness
            .control
            .subscribe_status(thread_id)
            .await
            .expect("subscribe_status should succeed");
        assert_eq!(status_rx.borrow().clone(), AgentStatus::PendingInit);
        let _ = thread
            .submit(Op::Shutdown {})
            .await
            .expect("shutdown should submit");
        let _ = status_rx.changed().await;
        assert_eq!(status_rx.borrow().clone(), AgentStatus::Shutdown);
    }
    // `send_input` forwards a `UserInput` op for the target thread (checked via captured ops).
    #[tokio::test]
    async fn send_input_submits_user_message() {
        let harness = AgentControlHarness::new().await;
        let (thread_id, _thread) = harness.start_thread().await;
        let submission_id = harness
            .control
            .send_input(
                thread_id,
                vec![UserInput::Text {
                    text: "hello from tests".to_string(),
                    text_elements: Vec::new(),
                }],
            )
            .await
            .expect("send_input should succeed");
        assert!(!submission_id.is_empty());
        let expected = (
            thread_id,
            Op::UserInput {
                items: vec![UserInput::Text {
                    text: "hello from tests".to_string(),
                    text_elements: Vec::new(),
                }],
                final_output_json_schema: None,
            },
        );
        let captured = harness
            .manager
            .captured_ops()
            .into_iter()
            .find(|entry| *entry == expected);
        assert_eq!(captured, Some(expected));
    }
    // `spawn_agent` registers the new thread and submits the initial prompt to it.
    #[tokio::test]
    async fn spawn_agent_creates_thread_and_sends_prompt() {
        let harness = AgentControlHarness::new().await;
        let thread_id = harness
            .control
            .spawn_agent(harness.config.clone(), text_input("spawned"), None)
            .await
            .expect("spawn_agent should succeed");
        let _thread = harness
            .manager
            .get_thread(thread_id)
            .await
            .expect("thread should be registered");
        let expected = (
            thread_id,
            Op::UserInput {
                items: vec![UserInput::Text {
                    text: "spawned".to_string(),
                    text_elements: Vec::new(),
                }],
                final_output_json_schema: None,
            },
        );
        let captured = harness
            .manager
            .captured_ops()
            .into_iter()
            .find(|entry| *entry == expected);
        assert_eq!(captured, Some(expected));
    }
    // Forked spawn: the child's history contains text recorded in the parent beforehand.
    #[tokio::test]
    async fn spawn_agent_can_fork_parent_thread_history() {
        let harness = AgentControlHarness::new().await;
        let (parent_thread_id, parent_thread) = harness.start_thread().await;
        parent_thread
            .inject_user_message_without_turn("parent seed context".to_string())
            .await;
        let turn_context = parent_thread.codex.session.new_default_turn().await;
        let parent_spawn_call_id = "spawn-call-history".to_string();
        let parent_spawn_call = ResponseItem::FunctionCall {
            id: None,
            name: "spawn_agent".to_string(),
            arguments: "{}".to_string(),
            call_id: parent_spawn_call_id.clone(),
        };
        parent_thread
            .codex
            .session
            .record_conversation_items(turn_context.as_ref(), &[parent_spawn_call])
            .await;
        // Flush explicitly here: this test covers forking from an already-materialized parent.
        parent_thread
            .codex
            .session
            .ensure_rollout_materialized()
            .await;
        parent_thread.codex.session.flush_rollout().await;
        let child_thread_id = harness
            .control
            .spawn_agent_with_options(
                harness.config.clone(),
                text_input("child task"),
                Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
                    parent_thread_id,
                    depth: 1,
                    agent_nickname: None,
                    agent_role: None,
                })),
                SpawnAgentOptions {
                    fork_parent_spawn_call_id: Some(parent_spawn_call_id),
                },
            )
            .await
            .expect("forked spawn should succeed");
        let child_thread = harness
            .manager
            .get_thread(child_thread_id)
            .await
            .expect("child thread should be registered");
        assert_ne!(child_thread_id, parent_thread_id);
        let history = child_thread.codex.session.clone_history().await;
        assert!(history_contains_text(
            history.raw_items(),
            "parent seed context"
        ));
        let expected = (
            child_thread_id,
            Op::UserInput {
                items: vec![UserInput::Text {
                    text: "child task".to_string(),
                    text_elements: Vec::new(),
                }],
                final_output_json_schema: None,
            },
        );
        let captured = harness
            .manager
            .captured_ops()
            .into_iter()
            .find(|entry| *entry == expected);
        assert_eq!(captured, Some(expected));
        let _ = harness
            .control
            .shutdown_agent(child_thread_id)
            .await
            .expect("child shutdown should submit");
        let _ = parent_thread
            .submit(Op::Shutdown {})
            .await
            .expect("parent shutdown should submit");
    }
    // Forked spawn: the synthetic `FunctionCallOutput` for the parent's spawn call is
    // present, successful, and carries the canned forked-agent message.
    #[tokio::test]
    async fn spawn_agent_fork_injects_output_for_parent_spawn_call() {
        let harness = AgentControlHarness::new().await;
        let (parent_thread_id, parent_thread) = harness.start_thread().await;
        let turn_context = parent_thread.codex.session.new_default_turn().await;
        let parent_spawn_call_id = "spawn-call-1".to_string();
        let parent_spawn_call = ResponseItem::FunctionCall {
            id: None,
            name: "spawn_agent".to_string(),
            arguments: "{}".to_string(),
            call_id: parent_spawn_call_id.clone(),
        };
        parent_thread
            .codex
            .session
            .record_conversation_items(turn_context.as_ref(), &[parent_spawn_call])
            .await;
        parent_thread
            .codex
            .session
            .ensure_rollout_materialized()
            .await;
        parent_thread.codex.session.flush_rollout().await;
        let child_thread_id = harness
            .control
            .spawn_agent_with_options(
                harness.config.clone(),
                text_input("child task"),
                Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
                    parent_thread_id,
                    depth: 1,
                    agent_nickname: None,
                    agent_role: None,
                })),
                SpawnAgentOptions {
                    fork_parent_spawn_call_id: Some(parent_spawn_call_id.clone()),
                },
            )
            .await
            .expect("forked spawn should succeed");
        let child_thread = harness
            .manager
            .get_thread(child_thread_id)
            .await
            .expect("child thread should be registered");
        let history = child_thread.codex.session.clone_history().await;
        let injected_output = history.raw_items().iter().find_map(|item| match item {
            ResponseItem::FunctionCallOutput { call_id, output }
                if call_id == &parent_spawn_call_id =>
            {
                Some(output)
            }
            _ => None,
        });
        let injected_output =
            injected_output.expect("forked child should contain synthetic tool output");
        assert_eq!(
            injected_output.text_content(),
            Some(FORKED_SPAWN_AGENT_OUTPUT_MESSAGE)
        );
        assert_eq!(injected_output.success, Some(true));
        let _ = harness
            .control
            .shutdown_agent(child_thread_id)
            .await
            .expect("child shutdown should submit");
        let _ = parent_thread
            .submit(Op::Shutdown {})
            .await
            .expect("parent shutdown should submit");
    }
    // Forked spawn with NO explicit flush in the test: spawn itself must flush the parent's
    // rollout first, so the child still sees call before synthetic output, in order.
    #[tokio::test]
    async fn spawn_agent_fork_flushes_parent_rollout_before_loading_history() {
        let harness = AgentControlHarness::new().await;
        let (parent_thread_id, parent_thread) = harness.start_thread().await;
        let turn_context = parent_thread.codex.session.new_default_turn().await;
        let parent_spawn_call_id = "spawn-call-unflushed".to_string();
        let parent_spawn_call = ResponseItem::FunctionCall {
            id: None,
            name: "spawn_agent".to_string(),
            arguments: "{}".to_string(),
            call_id: parent_spawn_call_id.clone(),
        };
        parent_thread
            .codex
            .session
            .record_conversation_items(turn_context.as_ref(), &[parent_spawn_call])
            .await;
        let child_thread_id = harness
            .control
            .spawn_agent_with_options(
                harness.config.clone(),
                text_input("child task"),
                Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
                    parent_thread_id,
                    depth: 1,
                    agent_nickname: None,
                    agent_role: None,
                })),
                SpawnAgentOptions {
                    fork_parent_spawn_call_id: Some(parent_spawn_call_id.clone()),
                },
            )
            .await
            .expect("forked spawn should flush parent rollout before loading history");
        let child_thread = harness
            .manager
            .get_thread(child_thread_id)
            .await
            .expect("child thread should be registered");
        let history = child_thread.codex.session.clone_history().await;
        let mut parent_call_index = None;
        let mut injected_output_index = None;
        for (idx, item) in history.raw_items().iter().enumerate() {
            match item {
                ResponseItem::FunctionCall { call_id, .. } if call_id == &parent_spawn_call_id => {
                    parent_call_index = Some(idx);
                }
                ResponseItem::FunctionCallOutput { call_id, .. }
                    if call_id == &parent_spawn_call_id =>
                {
                    injected_output_index = Some(idx);
                }
                _ => {}
            }
        }
        let parent_call_index =
            parent_call_index.expect("forked child should include the parent spawn_agent call");
        let injected_output_index = injected_output_index
            .expect("forked child should include synthetic output for the parent spawn_agent call");
        assert!(parent_call_index < injected_output_index);
        let _ = harness
            .control
            .shutdown_agent(child_thread_id)
            .await
            .expect("child shutdown should submit");
        let _ = parent_thread
            .submit(Op::Shutdown {})
            .await
            .expect("parent shutdown should submit");
    }
#[tokio::test]
async fn spawn_agent_respects_max_threads_limit() {
let max_threads = 1usize;
let (_home, config) = test_config_with_cli_overrides(vec![(
"agents.max_threads".to_string(),
TomlValue::Integer(max_threads as i64),
)])
.await;
let manager = ThreadManager::with_models_provider_and_home_for_tests(
CodexAuth::from_api_key("dummy"),
config.model_provider.clone(),
config.codex_home.clone(),
);
let control = manager.agent_control();
let _ = manager
.start_thread(config.clone())
.await
.expect("start thread");
let first_agent_id = control
.spawn_agent(config.clone(), text_input("hello"), None)
.await
.expect("spawn_agent should succeed");
let err = control
.spawn_agent(config, text_input("hello again"), None)
.await
.expect_err("spawn_agent should respect max threads");
let CodexErr::AgentLimitReached {
max_threads: seen_max_threads,
} = err
else {
panic!("expected CodexErr::AgentLimitReached");
};
assert_eq!(seen_max_threads, max_threads);
let _ = control
.shutdown_agent(first_agent_id)
.await
.expect("shutdown agent");
}
#[tokio::test]
async fn spawn_agent_releases_slot_after_shutdown() {
let max_threads = 1usize;
let (_home, config) = test_config_with_cli_overrides(vec![(
"agents.max_threads".to_string(),
TomlValue::Integer(max_threads as i64),
)])
.await;
let manager = ThreadManager::with_models_provider_and_home_for_tests(
CodexAuth::from_api_key("dummy"),
config.model_provider.clone(),
config.codex_home.clone(),
);
let control = manager.agent_control();
let first_agent_id = control
.spawn_agent(config.clone(), text_input("hello"), None)
.await
.expect("spawn_agent should succeed");
let _ = control
.shutdown_agent(first_agent_id)
.await
.expect("shutdown agent");
let second_agent_id = control
.spawn_agent(config.clone(), text_input("hello again"), None)
.await
.expect("spawn_agent should succeed after shutdown");
let _ = control
.shutdown_agent(second_agent_id)
.await
.expect("shutdown agent");
}
#[tokio::test]
async fn spawn_agent_limit_shared_across_clones() {
let max_threads = 1usize;
let (_home, config) = test_config_with_cli_overrides(vec![(
"agents.max_threads".to_string(),
TomlValue::Integer(max_threads as i64),
)])
.await;
let manager = ThreadManager::with_models_provider_and_home_for_tests(
CodexAuth::from_api_key("dummy"),
config.model_provider.clone(),
config.codex_home.clone(),
);
let control = manager.agent_control();
let cloned = control.clone();
let first_agent_id = cloned
.spawn_agent(config.clone(), text_input("hello"), None)
.await
.expect("spawn_agent should succeed");
let err = control
.spawn_agent(config, text_input("hello again"), None)
.await
.expect_err("spawn_agent should respect shared guard");
let CodexErr::AgentLimitReached { max_threads } = err else {
panic!("expected CodexErr::AgentLimitReached");
};
assert_eq!(max_threads, 1);
let _ = control
.shutdown_agent(first_agent_id)
.await
.expect("shutdown agent");
}
#[tokio::test]
async fn resume_agent_respects_max_threads_limit() {
    // Resuming a previously shut-down thread counts against the same
    // concurrency budget as a fresh spawn, so a full slot table must reject it.
    let max_threads = 1usize;
    let (_home, config) = test_config_with_cli_overrides(vec![(
        "agents.max_threads".to_string(),
        TomlValue::Integer(max_threads as i64),
    )])
    .await;
    let manager = ThreadManager::with_models_provider_and_home_for_tests(
        CodexAuth::from_api_key("dummy"),
        config.model_provider.clone(),
        config.codex_home.clone(),
    );
    let control = manager.agent_control();
    // Create a thread and shut it down so it becomes resumable on disk.
    let resumable_id = control
        .spawn_agent(config.clone(), text_input("hello"), None)
        .await
        .expect("spawn_agent should succeed");
    let _ = control
        .shutdown_agent(resumable_id)
        .await
        .expect("shutdown resumable thread");
    // Occupy the single available slot with a live agent.
    let active_id = control
        .spawn_agent(config.clone(), text_input("occupy"), None)
        .await
        .expect("spawn_agent should succeed for active slot");
    let resume_err = control
        .resume_agent_from_rollout(config, resumable_id, SessionSource::Exec)
        .await
        .expect_err("resume should respect max threads");
    let CodexErr::AgentLimitReached {
        max_threads: seen_max_threads,
    } = resume_err
    else {
        panic!("expected CodexErr::AgentLimitReached");
    };
    assert_eq!(seen_max_threads, max_threads);
    let _ = control
        .shutdown_agent(active_id)
        .await
        .expect("shutdown active thread");
}
#[tokio::test]
async fn resume_agent_releases_slot_after_resume_failure() {
    // A failed resume must hand back the slot it reserved; otherwise the
    // follow-up spawn below would trip the limit of one thread.
    let max_threads = 1usize;
    let (_home, config) = test_config_with_cli_overrides(vec![(
        "agents.max_threads".to_string(),
        TomlValue::Integer(max_threads as i64),
    )])
    .await;
    let manager = ThreadManager::with_models_provider_and_home_for_tests(
        CodexAuth::from_api_key("dummy"),
        config.model_provider.clone(),
        config.codex_home.clone(),
    );
    let control = manager.agent_control();
    // A freshly minted ThreadId has no rollout on disk, so resume must fail.
    let _ = control
        .resume_agent_from_rollout(config.clone(), ThreadId::new(), SessionSource::Exec)
        .await
        .expect_err("resume should fail for missing rollout path");
    // The slot released by the failed resume is usable again.
    let spawned_id = control
        .spawn_agent(config, text_input("hello"), None)
        .await
        .expect("spawn should succeed after failed resume");
    let _ = control
        .shutdown_agent(spawned_id)
        .await
        .expect("shutdown resumed thread");
}
#[tokio::test]
async fn spawn_child_completion_notifies_parent_history() {
    // When a spawned child reaches a terminal state (here: via shutdown), the
    // parent thread's history should receive a sub-agent notification.
    let harness = AgentControlHarness::new().await;
    let (parent_thread_id, parent_thread) = harness.start_thread().await;
    let child_thread_id = harness
        .control
        .spawn_agent(
            harness.config.clone(),
            text_input("hello child"),
            Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
                parent_thread_id,
                depth: 1,
                agent_nickname: None,
                agent_role: Some("explorer".to_string()),
            })),
        )
        .await
        .expect("child spawn should succeed");
    let child_thread = harness
        .manager
        .get_thread(child_thread_id)
        .await
        .expect("child thread should exist");
    let _ = child_thread
        .submit(Op::Shutdown {})
        .await
        .expect("child shutdown should submit");
    // `assert!` rather than `assert_eq!(_, true)` (clippy: bool_assert_comparison).
    assert!(wait_for_subagent_notification(&parent_thread).await);
}
#[tokio::test]
async fn spawn_thread_subagent_gets_random_nickname_in_session_source() {
    // A child spawned with `agent_nickname: None` should come back with a
    // randomly assigned nickname recorded in its session source.
    let harness = AgentControlHarness::new().await;
    let (parent_thread_id, _parent_thread) = harness.start_thread().await;
    let spawn_source = SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
        parent_thread_id,
        depth: 1,
        agent_nickname: None,
        agent_role: Some("explorer".to_string()),
    });
    let child_thread_id = harness
        .control
        .spawn_agent(
            harness.config.clone(),
            text_input("hello child"),
            Some(spawn_source),
        )
        .await
        .expect("child spawn should succeed");
    // Read the session source back from the registered child's config snapshot.
    let snapshot = harness
        .manager
        .get_thread(child_thread_id)
        .await
        .expect("child thread should be registered")
        .config_snapshot()
        .await;
    let SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
        parent_thread_id: seen_parent_thread_id,
        depth,
        agent_nickname,
        agent_role,
    }) = snapshot.session_source
    else {
        panic!("expected thread-spawn sub-agent source");
    };
    assert_eq!(seen_parent_thread_id, parent_thread_id);
    assert_eq!(depth, 1);
    assert!(agent_nickname.is_some());
    assert_eq!(agent_role, Some("explorer".to_string()));
}
#[tokio::test]
async fn resume_thread_subagent_restores_stored_nickname_and_role() {
    // End-to-end check that a sub-agent's auto-assigned nickname and its role
    // survive a shutdown + resume cycle via the sqlite state db.
    let (home, mut config) = test_config().await;
    // Nickname/role persistence goes through sqlite, so the feature must be on.
    config.features.enable(Feature::Sqlite);
    let manager = ThreadManager::with_models_provider_and_home_for_tests(
        CodexAuth::from_api_key("dummy"),
        config.model_provider.clone(),
        config.codex_home.clone(),
    );
    let control = manager.agent_control();
    // Assemble the harness by hand (instead of AgentControlHarness::new) so
    // the sqlite-enabled config above is the one the harness carries.
    let harness = AgentControlHarness {
        _home: home,
        config,
        manager,
        control,
    };
    let (parent_thread_id, _parent_thread) = harness.start_thread().await;
    // Spawn a child with no nickname; spawn_agent is expected to assign one.
    let child_thread_id = harness
        .control
        .spawn_agent(
            harness.config.clone(),
            text_input("hello child"),
            Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
                parent_thread_id,
                depth: 1,
                agent_nickname: None,
                agent_role: Some("explorer".to_string()),
            })),
        )
        .await
        .expect("child spawn should succeed");
    let child_thread = harness
        .manager
        .get_thread(child_thread_id)
        .await
        .expect("child thread should exist");
    let mut status_rx = harness
        .control
        .subscribe_status(child_thread_id)
        .await
        .expect("status subscription should succeed");
    // Wait (bounded to 5s) until the child leaves PendingInit; shutting it
    // down before initialization would race the metadata write checked below.
    if matches!(status_rx.borrow().clone(), AgentStatus::PendingInit) {
        timeout(Duration::from_secs(5), async {
            loop {
                status_rx
                    .changed()
                    .await
                    .expect("child status should advance past pending init");
                if !matches!(status_rx.borrow().clone(), AgentStatus::PendingInit) {
                    break;
                }
            }
        })
        .await
        .expect("child should initialize before shutdown");
    }
    // Capture the nickname the spawn assigned so we can compare after resume.
    let original_snapshot = child_thread.config_snapshot().await;
    let original_nickname = original_snapshot
        .session_source
        .get_nickname()
        .expect("spawned sub-agent should have a nickname");
    let state_db = child_thread
        .state_db()
        .expect("sqlite state db should be available for nickname resume test");
    // Poll (bounded to 5s) until nickname + role appear in the sqlite thread
    // metadata; that write happens asynchronously relative to the spawn.
    timeout(Duration::from_secs(5), async {
        loop {
            if let Ok(Some(metadata)) = state_db.get_thread(child_thread_id).await
                && metadata.agent_nickname.is_some()
                && metadata.agent_role.as_deref() == Some("explorer")
            {
                break;
            }
            sleep(Duration::from_millis(10)).await;
        }
    })
    .await
    .expect("child thread metadata should be persisted to sqlite before shutdown");
    let _ = harness
        .control
        .shutdown_agent(child_thread_id)
        .await
        .expect("child shutdown should submit");
    // Resume with nickname/role deliberately set to None: the stored values
    // should be restored from the state db, not taken from the Nones here.
    let resumed_thread_id = harness
        .control
        .resume_agent_from_rollout(
            harness.config.clone(),
            child_thread_id,
            SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
                parent_thread_id,
                depth: 1,
                agent_nickname: None,
                agent_role: None,
            }),
        )
        .await
        .expect("resume should succeed");
    // Resume reuses the original thread id.
    assert_eq!(resumed_thread_id, child_thread_id);
    let resumed_snapshot = harness
        .manager
        .get_thread(resumed_thread_id)
        .await
        .expect("resumed child thread should exist")
        .config_snapshot()
        .await;
    let SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
        parent_thread_id: resumed_parent_thread_id,
        depth: resumed_depth,
        agent_nickname: resumed_nickname,
        agent_role: resumed_role,
    }) = resumed_snapshot.session_source
    else {
        panic!("expected thread-spawn sub-agent source");
    };
    assert_eq!(resumed_parent_thread_id, parent_thread_id);
    assert_eq!(resumed_depth, 1);
    assert_eq!(resumed_nickname, Some(original_nickname));
    assert_eq!(resumed_role, Some("explorer".to_string()));
    let _ = harness
        .control
        .shutdown_agent(resumed_thread_id)
        .await
        .expect("resumed child shutdown should submit");
}
}