mirror of
https://github.com/openai/codex.git
synced 2026-02-23 01:03:48 +00:00
Compare commits
4 Commits
main
...
jif/agent-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c449d6b8cf | ||
|
|
1befa18255 | ||
|
|
e8de9b8d3f | ||
|
|
b6d6011edb |
@@ -4,6 +4,7 @@ use crate::agent::status::is_final;
|
||||
use crate::error::CodexErr;
|
||||
use crate::error::Result as CodexResult;
|
||||
use crate::find_thread_path_by_id_str;
|
||||
use crate::rollout::RolloutRecorder;
|
||||
use crate::session_prefix::format_subagent_notification_message;
|
||||
use crate::state_db;
|
||||
use crate::thread_manager::ThreadManagerState;
|
||||
@@ -19,6 +20,11 @@ use tokio::sync::watch;
|
||||
|
||||
const AGENT_NAMES: &str = include_str!("agent_names.txt");
|
||||
|
||||
#[derive(Clone, Copy, Debug, Default)]
|
||||
pub(crate) struct SpawnAgentOptions {
|
||||
pub(crate) fork_parent_thread: bool,
|
||||
}
|
||||
|
||||
fn agent_nickname_list() -> Vec<&'static str> {
|
||||
AGENT_NAMES
|
||||
.lines()
|
||||
@@ -57,6 +63,17 @@ impl AgentControl {
|
||||
config: crate::config::Config,
|
||||
items: Vec<UserInput>,
|
||||
session_source: Option<SessionSource>,
|
||||
) -> CodexResult<ThreadId> {
|
||||
self.spawn_agent_with_options(config, items, session_source, SpawnAgentOptions::default())
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn spawn_agent_with_options(
|
||||
&self,
|
||||
config: crate::config::Config,
|
||||
items: Vec<UserInput>,
|
||||
session_source: Option<SessionSource>,
|
||||
options: SpawnAgentOptions,
|
||||
) -> CodexResult<ThreadId> {
|
||||
let state = self.upgrade()?;
|
||||
let mut reservation = self.state.reserve_spawn_slot(config.agent_max_threads)?;
|
||||
@@ -82,9 +99,46 @@ impl AgentControl {
|
||||
// The same `AgentControl` is sent to spawn the thread.
|
||||
let new_thread = match session_source {
|
||||
Some(session_source) => {
|
||||
state
|
||||
.spawn_new_thread_with_source(config, self.clone(), session_source, false)
|
||||
.await?
|
||||
if options.fork_parent_thread {
|
||||
let SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id,
|
||||
..
|
||||
}) = session_source.clone()
|
||||
else {
|
||||
return Err(CodexErr::Fatal(
|
||||
"spawn_agent fork requires a thread-spawn session source".to_string(),
|
||||
));
|
||||
};
|
||||
let rollout_path = match state.get_thread(parent_thread_id).await {
|
||||
Ok(parent_thread) => parent_thread.rollout_path(),
|
||||
Err(_) => None,
|
||||
}
|
||||
.or(find_thread_path_by_id_str(
|
||||
config.codex_home.as_path(),
|
||||
&parent_thread_id.to_string(),
|
||||
)
|
||||
.await?)
|
||||
.ok_or_else(|| {
|
||||
CodexErr::Fatal(format!(
|
||||
"parent thread rollout unavailable for fork: {parent_thread_id}"
|
||||
))
|
||||
})?;
|
||||
let initial_history =
|
||||
RolloutRecorder::get_rollout_history(&rollout_path).await?;
|
||||
state
|
||||
.fork_thread_with_source(
|
||||
config,
|
||||
initial_history,
|
||||
self.clone(),
|
||||
session_source,
|
||||
false,
|
||||
)
|
||||
.await?
|
||||
} else {
|
||||
state
|
||||
.spawn_new_thread_with_source(config, self.clone(), session_source, false)
|
||||
.await?
|
||||
}
|
||||
}
|
||||
None => state.spawn_new_thread(config, self.clone()).await?,
|
||||
};
|
||||
@@ -421,6 +475,20 @@ mod tests {
|
||||
})
|
||||
}
|
||||
|
||||
fn history_contains_text(history_items: &[ResponseItem], needle: &str) -> bool {
|
||||
history_items.iter().any(|item| {
|
||||
let ResponseItem::Message { content, .. } = item else {
|
||||
return false;
|
||||
};
|
||||
content.iter().any(|content_item| match content_item {
|
||||
ContentItem::InputText { text } | ContentItem::OutputText { text } => {
|
||||
text.contains(needle)
|
||||
}
|
||||
ContentItem::InputImage { .. } => false,
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
async fn wait_for_subagent_notification(parent_thread: &Arc<CodexThread>) -> bool {
|
||||
let wait = async {
|
||||
loop {
|
||||
@@ -673,6 +741,77 @@ mod tests {
|
||||
assert_eq!(captured, Some(expected));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn spawn_agent_can_fork_parent_thread_history() {
|
||||
let harness = AgentControlHarness::new().await;
|
||||
let (parent_thread_id, parent_thread) = harness.start_thread().await;
|
||||
parent_thread
|
||||
.inject_user_message_without_turn("parent seed context".to_string())
|
||||
.await;
|
||||
parent_thread
|
||||
.codex
|
||||
.session
|
||||
.ensure_rollout_materialized()
|
||||
.await;
|
||||
parent_thread.codex.session.flush_rollout().await;
|
||||
|
||||
let child_thread_id = harness
|
||||
.control
|
||||
.spawn_agent_with_options(
|
||||
harness.config.clone(),
|
||||
text_input("child task"),
|
||||
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id,
|
||||
depth: 1,
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
})),
|
||||
SpawnAgentOptions {
|
||||
fork_parent_thread: true,
|
||||
},
|
||||
)
|
||||
.await
|
||||
.expect("forked spawn should succeed");
|
||||
|
||||
let child_thread = harness
|
||||
.manager
|
||||
.get_thread(child_thread_id)
|
||||
.await
|
||||
.expect("child thread should be registered");
|
||||
let history = child_thread.codex.session.clone_history().await;
|
||||
assert!(history_contains_text(
|
||||
history.raw_items(),
|
||||
"parent seed context"
|
||||
));
|
||||
|
||||
let expected = (
|
||||
child_thread_id,
|
||||
Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "child task".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
},
|
||||
);
|
||||
let captured = harness
|
||||
.manager
|
||||
.captured_ops()
|
||||
.into_iter()
|
||||
.find(|entry| *entry == expected);
|
||||
assert_eq!(captured, Some(expected));
|
||||
|
||||
let _ = harness
|
||||
.control
|
||||
.shutdown_agent(child_thread_id)
|
||||
.await
|
||||
.expect("child shutdown should submit");
|
||||
let _ = parent_thread
|
||||
.submit(Op::Shutdown {})
|
||||
.await
|
||||
.expect("parent shutdown should submit");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn spawn_agent_respects_max_threads_limit() {
|
||||
let max_threads = 1usize;
|
||||
|
||||
@@ -464,6 +464,26 @@ impl ThreadManagerState {
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn fork_thread_with_source(
|
||||
&self,
|
||||
config: Config,
|
||||
initial_history: InitialHistory,
|
||||
agent_control: AgentControl,
|
||||
session_source: SessionSource,
|
||||
persist_extended_history: bool,
|
||||
) -> CodexResult<NewThread> {
|
||||
self.spawn_thread_with_source(
|
||||
config,
|
||||
initial_history,
|
||||
Arc::clone(&self.auth_manager),
|
||||
agent_control,
|
||||
session_source,
|
||||
Vec::new(),
|
||||
persist_extended_history,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Spawn a new thread with optional history and register it with the manager.
|
||||
pub(crate) async fn spawn_thread(
|
||||
&self,
|
||||
|
||||
@@ -93,6 +93,7 @@ impl ToolHandler for MultiAgentHandler {
|
||||
|
||||
mod spawn {
|
||||
use super::*;
|
||||
use crate::agent::control::SpawnAgentOptions;
|
||||
use crate::agent::role::apply_role_to_config;
|
||||
|
||||
use crate::agent::exceeds_thread_spawn_depth_limit;
|
||||
@@ -104,6 +105,8 @@ mod spawn {
|
||||
message: Option<String>,
|
||||
items: Option<Vec<UserInput>>,
|
||||
agent_type: Option<String>,
|
||||
#[serde(default)]
|
||||
fork_current_thread: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
@@ -156,7 +159,7 @@ mod spawn {
|
||||
let result = session
|
||||
.services
|
||||
.agent_control
|
||||
.spawn_agent(
|
||||
.spawn_agent_with_options(
|
||||
config,
|
||||
input_items,
|
||||
Some(thread_spawn_source(
|
||||
@@ -164,6 +167,9 @@ mod spawn {
|
||||
child_depth,
|
||||
role_name,
|
||||
)),
|
||||
SpawnAgentOptions {
|
||||
fork_parent_thread: args.fork_current_thread,
|
||||
},
|
||||
)
|
||||
.await
|
||||
.map_err(collab_spawn_error);
|
||||
|
||||
@@ -552,6 +552,15 @@ fn create_spawn_agent_tool(config: &ToolsConfig) -> ToolSpec {
|
||||
)),
|
||||
},
|
||||
),
|
||||
(
|
||||
"fork_current_thread".to_string(),
|
||||
JsonSchema::Boolean {
|
||||
description: Some(
|
||||
"When true, fork the current thread history into the new agent before sending the initial prompt."
|
||||
.to_string(),
|
||||
),
|
||||
},
|
||||
),
|
||||
]);
|
||||
|
||||
ToolSpec::Function(ResponsesApiTool {
|
||||
|
||||
Reference in New Issue
Block a user