Compare commits

...

4 Commits

Author SHA1 Message Date
jif-oai
c449d6b8cf nits 3 2026-02-22 14:28:31 +00:00
jif-oai
1befa18255 nits 2 2026-02-22 14:26:39 +00:00
jif-oai
e8de9b8d3f nits 2026-02-22 14:21:09 +00:00
jif-oai
b6d6011edb feat: fork thread multi agent 2026-02-22 14:15:52 +00:00
4 changed files with 178 additions and 4 deletions

View File

@@ -4,6 +4,7 @@ use crate::agent::status::is_final;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
use crate::find_thread_path_by_id_str;
use crate::rollout::RolloutRecorder;
use crate::session_prefix::format_subagent_notification_message;
use crate::state_db;
use crate::thread_manager::ThreadManagerState;
@@ -19,6 +20,11 @@ use tokio::sync::watch;
const AGENT_NAMES: &str = include_str!("agent_names.txt");
#[derive(Clone, Copy, Debug, Default)]
pub(crate) struct SpawnAgentOptions {
pub(crate) fork_parent_thread: bool,
}
fn agent_nickname_list() -> Vec<&'static str> {
AGENT_NAMES
.lines()
@@ -57,6 +63,17 @@ impl AgentControl {
config: crate::config::Config,
items: Vec<UserInput>,
session_source: Option<SessionSource>,
) -> CodexResult<ThreadId> {
self.spawn_agent_with_options(config, items, session_source, SpawnAgentOptions::default())
.await
}
pub(crate) async fn spawn_agent_with_options(
&self,
config: crate::config::Config,
items: Vec<UserInput>,
session_source: Option<SessionSource>,
options: SpawnAgentOptions,
) -> CodexResult<ThreadId> {
let state = self.upgrade()?;
let mut reservation = self.state.reserve_spawn_slot(config.agent_max_threads)?;
@@ -82,9 +99,46 @@ impl AgentControl {
// The same `AgentControl` is sent to spawn the thread.
let new_thread = match session_source {
Some(session_source) => {
state
.spawn_new_thread_with_source(config, self.clone(), session_source, false)
.await?
if options.fork_parent_thread {
let SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
..
}) = session_source.clone()
else {
return Err(CodexErr::Fatal(
"spawn_agent fork requires a thread-spawn session source".to_string(),
));
};
let rollout_path = match state.get_thread(parent_thread_id).await {
Ok(parent_thread) => parent_thread.rollout_path(),
Err(_) => None,
}
.or(find_thread_path_by_id_str(
config.codex_home.as_path(),
&parent_thread_id.to_string(),
)
.await?)
.ok_or_else(|| {
CodexErr::Fatal(format!(
"parent thread rollout unavailable for fork: {parent_thread_id}"
))
})?;
let initial_history =
RolloutRecorder::get_rollout_history(&rollout_path).await?;
state
.fork_thread_with_source(
config,
initial_history,
self.clone(),
session_source,
false,
)
.await?
} else {
state
.spawn_new_thread_with_source(config, self.clone(), session_source, false)
.await?
}
}
None => state.spawn_new_thread(config, self.clone()).await?,
};
@@ -421,6 +475,20 @@ mod tests {
})
}
fn history_contains_text(history_items: &[ResponseItem], needle: &str) -> bool {
history_items.iter().any(|item| {
let ResponseItem::Message { content, .. } = item else {
return false;
};
content.iter().any(|content_item| match content_item {
ContentItem::InputText { text } | ContentItem::OutputText { text } => {
text.contains(needle)
}
ContentItem::InputImage { .. } => false,
})
})
}
async fn wait_for_subagent_notification(parent_thread: &Arc<CodexThread>) -> bool {
let wait = async {
loop {
@@ -673,6 +741,77 @@ mod tests {
assert_eq!(captured, Some(expected));
}
#[tokio::test]
async fn spawn_agent_can_fork_parent_thread_history() {
let harness = AgentControlHarness::new().await;
let (parent_thread_id, parent_thread) = harness.start_thread().await;
parent_thread
.inject_user_message_without_turn("parent seed context".to_string())
.await;
parent_thread
.codex
.session
.ensure_rollout_materialized()
.await;
parent_thread.codex.session.flush_rollout().await;
let child_thread_id = harness
.control
.spawn_agent_with_options(
harness.config.clone(),
text_input("child task"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_nickname: None,
agent_role: None,
})),
SpawnAgentOptions {
fork_parent_thread: true,
},
)
.await
.expect("forked spawn should succeed");
let child_thread = harness
.manager
.get_thread(child_thread_id)
.await
.expect("child thread should be registered");
let history = child_thread.codex.session.clone_history().await;
assert!(history_contains_text(
history.raw_items(),
"parent seed context"
));
let expected = (
child_thread_id,
Op::UserInput {
items: vec![UserInput::Text {
text: "child task".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
},
);
let captured = harness
.manager
.captured_ops()
.into_iter()
.find(|entry| *entry == expected);
assert_eq!(captured, Some(expected));
let _ = harness
.control
.shutdown_agent(child_thread_id)
.await
.expect("child shutdown should submit");
let _ = parent_thread
.submit(Op::Shutdown {})
.await
.expect("parent shutdown should submit");
}
#[tokio::test]
async fn spawn_agent_respects_max_threads_limit() {
let max_threads = 1usize;

View File

@@ -464,6 +464,26 @@ impl ThreadManagerState {
.await
}
pub(crate) async fn fork_thread_with_source(
&self,
config: Config,
initial_history: InitialHistory,
agent_control: AgentControl,
session_source: SessionSource,
persist_extended_history: bool,
) -> CodexResult<NewThread> {
self.spawn_thread_with_source(
config,
initial_history,
Arc::clone(&self.auth_manager),
agent_control,
session_source,
Vec::new(),
persist_extended_history,
)
.await
}
/// Spawn a new thread with optional history and register it with the manager.
pub(crate) async fn spawn_thread(
&self,

View File

@@ -93,6 +93,7 @@ impl ToolHandler for MultiAgentHandler {
mod spawn {
use super::*;
use crate::agent::control::SpawnAgentOptions;
use crate::agent::role::apply_role_to_config;
use crate::agent::exceeds_thread_spawn_depth_limit;
@@ -104,6 +105,8 @@ mod spawn {
message: Option<String>,
items: Option<Vec<UserInput>>,
agent_type: Option<String>,
#[serde(default)]
fork_current_thread: bool,
}
#[derive(Debug, Serialize)]
@@ -156,7 +159,7 @@ mod spawn {
let result = session
.services
.agent_control
.spawn_agent(
.spawn_agent_with_options(
config,
input_items,
Some(thread_spawn_source(
@@ -164,6 +167,9 @@ mod spawn {
child_depth,
role_name,
)),
SpawnAgentOptions {
fork_parent_thread: args.fork_current_thread,
},
)
.await
.map_err(collab_spawn_error);

View File

@@ -552,6 +552,15 @@ fn create_spawn_agent_tool(config: &ToolsConfig) -> ToolSpec {
)),
},
),
(
"fork_current_thread".to_string(),
JsonSchema::Boolean {
description: Some(
"When true, fork the current thread history into the new agent before sending the initial prompt."
.to_string(),
),
},
),
]);
ToolSpec::Function(ResponsesApiTool {