mirror of
https://github.com/openai/codex.git
synced 2026-02-01 22:47:52 +00:00
feat: add thread spawn source for collab tools (#9769)
This commit is contained in:
@@ -30,6 +30,7 @@ use codex_protocol::protocol::SkillErrorInfo as CoreSkillErrorInfo;
|
||||
use codex_protocol::protocol::SkillInterface as CoreSkillInterface;
|
||||
use codex_protocol::protocol::SkillMetadata as CoreSkillMetadata;
|
||||
use codex_protocol::protocol::SkillScope as CoreSkillScope;
|
||||
use codex_protocol::protocol::SubAgentSource as CoreSubAgentSource;
|
||||
use codex_protocol::protocol::TokenUsage as CoreTokenUsage;
|
||||
use codex_protocol::protocol::TokenUsageInfo as CoreTokenUsageInfo;
|
||||
use codex_protocol::user_input::ByteRange as CoreByteRange;
|
||||
@@ -700,6 +701,7 @@ pub enum SessionSource {
|
||||
VsCode,
|
||||
Exec,
|
||||
AppServer,
|
||||
SubAgent(CoreSubAgentSource),
|
||||
#[serde(other)]
|
||||
Unknown,
|
||||
}
|
||||
@@ -711,7 +713,7 @@ impl From<CoreSessionSource> for SessionSource {
|
||||
CoreSessionSource::VSCode => SessionSource::VsCode,
|
||||
CoreSessionSource::Exec => SessionSource::Exec,
|
||||
CoreSessionSource::Mcp => SessionSource::AppServer,
|
||||
CoreSessionSource::SubAgent(_) => SessionSource::Unknown,
|
||||
CoreSessionSource::SubAgent(sub) => SessionSource::SubAgent(sub),
|
||||
CoreSessionSource::Unknown => SessionSource::Unknown,
|
||||
}
|
||||
}
|
||||
@@ -724,6 +726,7 @@ impl From<SessionSource> for CoreSessionSource {
|
||||
SessionSource::VsCode => CoreSessionSource::VSCode,
|
||||
SessionSource::Exec => CoreSessionSource::Exec,
|
||||
SessionSource::AppServer => CoreSessionSource::Mcp,
|
||||
SessionSource::SubAgent(sub) => CoreSessionSource::SubAgent(sub),
|
||||
SessionSource::Unknown => CoreSessionSource::Unknown,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,13 +15,12 @@ pub(crate) fn subagent_header(source: &Option<SessionSource>) -> Option<String>
|
||||
return None;
|
||||
};
|
||||
match sub {
|
||||
codex_protocol::protocol::SubAgentSource::Review => Some("review".to_string()),
|
||||
codex_protocol::protocol::SubAgentSource::Compact => Some("compact".to_string()),
|
||||
codex_protocol::protocol::SubAgentSource::ThreadSpawn { .. } => {
|
||||
Some("collab_spawn".to_string())
|
||||
}
|
||||
codex_protocol::protocol::SubAgentSource::Other(label) => Some(label.clone()),
|
||||
other => Some(
|
||||
serde_json::to_value(other)
|
||||
.ok()
|
||||
.and_then(|v| v.as_str().map(std::string::ToString::to_string))
|
||||
.unwrap_or_else(|| "other".to_string()),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -39,12 +39,20 @@ impl AgentControl {
|
||||
&self,
|
||||
config: crate::config::Config,
|
||||
prompt: String,
|
||||
session_source: Option<codex_protocol::protocol::SessionSource>,
|
||||
) -> CodexResult<ThreadId> {
|
||||
let state = self.upgrade()?;
|
||||
let reservation = self.state.reserve_spawn_slot(config.agent_max_threads)?;
|
||||
|
||||
// The same `AgentControl` is sent to spawn the thread.
|
||||
let new_thread = state.spawn_new_thread(config, self.clone()).await?;
|
||||
let new_thread = match session_source {
|
||||
Some(session_source) => {
|
||||
state
|
||||
.spawn_new_thread_with_source(config, self.clone(), session_source)
|
||||
.await?
|
||||
}
|
||||
None => state.spawn_new_thread(config, self.clone()).await?,
|
||||
};
|
||||
reservation.commit(new_thread.thread_id);
|
||||
|
||||
// Notify a new thread has been created. This notification will be processed by clients
|
||||
@@ -268,7 +276,7 @@ mod tests {
|
||||
let control = AgentControl::default();
|
||||
let (_home, config) = test_config().await;
|
||||
let err = control
|
||||
.spawn_agent(config, "hello".to_string())
|
||||
.spawn_agent(config, "hello".to_string(), None)
|
||||
.await
|
||||
.expect_err("spawn_agent should fail without a manager");
|
||||
assert_eq!(
|
||||
@@ -370,7 +378,7 @@ mod tests {
|
||||
let harness = AgentControlHarness::new().await;
|
||||
let thread_id = harness
|
||||
.control
|
||||
.spawn_agent(harness.config.clone(), "spawned".to_string())
|
||||
.spawn_agent(harness.config.clone(), "spawned".to_string(), None)
|
||||
.await
|
||||
.expect("spawn_agent should succeed");
|
||||
let _thread = harness
|
||||
@@ -417,12 +425,12 @@ mod tests {
|
||||
.expect("start thread");
|
||||
|
||||
let first_agent_id = control
|
||||
.spawn_agent(config.clone(), "hello".to_string())
|
||||
.spawn_agent(config.clone(), "hello".to_string(), None)
|
||||
.await
|
||||
.expect("spawn_agent should succeed");
|
||||
|
||||
let err = control
|
||||
.spawn_agent(config, "hello again".to_string())
|
||||
.spawn_agent(config, "hello again".to_string(), None)
|
||||
.await
|
||||
.expect_err("spawn_agent should respect max threads");
|
||||
let CodexErr::AgentLimitReached {
|
||||
@@ -455,7 +463,7 @@ mod tests {
|
||||
let control = manager.agent_control();
|
||||
|
||||
let first_agent_id = control
|
||||
.spawn_agent(config.clone(), "hello".to_string())
|
||||
.spawn_agent(config.clone(), "hello".to_string(), None)
|
||||
.await
|
||||
.expect("spawn_agent should succeed");
|
||||
let _ = control
|
||||
@@ -464,7 +472,7 @@ mod tests {
|
||||
.expect("shutdown agent");
|
||||
|
||||
let second_agent_id = control
|
||||
.spawn_agent(config.clone(), "hello again".to_string())
|
||||
.spawn_agent(config.clone(), "hello again".to_string(), None)
|
||||
.await
|
||||
.expect("spawn_agent should succeed after shutdown");
|
||||
let _ = control
|
||||
@@ -490,12 +498,12 @@ mod tests {
|
||||
let cloned = control.clone();
|
||||
|
||||
let first_agent_id = cloned
|
||||
.spawn_agent(config.clone(), "hello".to_string())
|
||||
.spawn_agent(config.clone(), "hello".to_string(), None)
|
||||
.await
|
||||
.expect("spawn_agent should succeed");
|
||||
|
||||
let err = control
|
||||
.spawn_agent(config, "hello again".to_string())
|
||||
.spawn_agent(config, "hello again".to_string(), None)
|
||||
.await
|
||||
.expect_err("spawn_agent should respect shared guard");
|
||||
let CodexErr::AgentLimitReached { max_threads } = err else {
|
||||
|
||||
@@ -226,13 +226,11 @@ impl ModelClient {
|
||||
|
||||
let mut extra_headers = ApiHeaderMap::new();
|
||||
if let SessionSource::SubAgent(sub) = &self.state.session_source {
|
||||
let subagent = if let crate::protocol::SubAgentSource::Other(label) = sub {
|
||||
label.clone()
|
||||
} else {
|
||||
serde_json::to_value(sub)
|
||||
.ok()
|
||||
.and_then(|v| v.as_str().map(std::string::ToString::to_string))
|
||||
.unwrap_or_else(|| "other".to_string())
|
||||
let subagent = match sub {
|
||||
crate::protocol::SubAgentSource::Review => "review".to_string(),
|
||||
crate::protocol::SubAgentSource::Compact => "compact".to_string(),
|
||||
crate::protocol::SubAgentSource::ThreadSpawn { .. } => "collab_spawn".to_string(),
|
||||
crate::protocol::SubAgentSource::Other(label) => label.clone(),
|
||||
};
|
||||
if let Ok(val) = HeaderValue::from_str(&subagent) {
|
||||
extra_headers.insert("x-openai-subagent", val);
|
||||
|
||||
@@ -314,11 +314,22 @@ impl ThreadManagerState {
|
||||
config: Config,
|
||||
agent_control: AgentControl,
|
||||
) -> CodexResult<NewThread> {
|
||||
self.spawn_thread(
|
||||
self.spawn_new_thread_with_source(config, agent_control, self.session_source.clone())
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn spawn_new_thread_with_source(
|
||||
&self,
|
||||
config: Config,
|
||||
agent_control: AgentControl,
|
||||
session_source: SessionSource,
|
||||
) -> CodexResult<NewThread> {
|
||||
self.spawn_thread_with_source(
|
||||
config,
|
||||
InitialHistory::New,
|
||||
Arc::clone(&self.auth_manager),
|
||||
agent_control,
|
||||
session_source,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -330,6 +341,24 @@ impl ThreadManagerState {
|
||||
initial_history: InitialHistory,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
agent_control: AgentControl,
|
||||
) -> CodexResult<NewThread> {
|
||||
self.spawn_thread_with_source(
|
||||
config,
|
||||
initial_history,
|
||||
auth_manager,
|
||||
agent_control,
|
||||
self.session_source.clone(),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn spawn_thread_with_source(
|
||||
&self,
|
||||
config: Config,
|
||||
initial_history: InitialHistory,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
agent_control: AgentControl,
|
||||
session_source: SessionSource,
|
||||
) -> CodexResult<NewThread> {
|
||||
let CodexSpawnOk {
|
||||
codex, thread_id, ..
|
||||
@@ -339,7 +368,7 @@ impl ThreadManagerState {
|
||||
Arc::clone(&self.models_manager),
|
||||
Arc::clone(&self.skills_manager),
|
||||
initial_history,
|
||||
self.session_source.clone(),
|
||||
session_source,
|
||||
agent_control,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -78,6 +78,8 @@ impl ToolHandler for CollabHandler {
|
||||
mod spawn {
|
||||
use super::*;
|
||||
use crate::agent::AgentRole;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
use std::sync::Arc;
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
@@ -125,7 +127,13 @@ mod spawn {
|
||||
let result = session
|
||||
.services
|
||||
.agent_control
|
||||
.spawn_agent(config, prompt.clone())
|
||||
.spawn_agent(
|
||||
config,
|
||||
prompt.clone(),
|
||||
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
|
||||
parent_thread_id: session.conversation_id,
|
||||
})),
|
||||
)
|
||||
.await
|
||||
.map_err(collab_spawn_error);
|
||||
let (new_thread_id, status) = match &result {
|
||||
|
||||
@@ -1518,6 +1518,7 @@ pub enum SessionSource {
|
||||
pub enum SubAgentSource {
|
||||
Review,
|
||||
Compact,
|
||||
ThreadSpawn { parent_thread_id: ThreadId },
|
||||
Other(String),
|
||||
}
|
||||
|
||||
@@ -1539,6 +1540,9 @@ impl fmt::Display for SubAgentSource {
|
||||
match self {
|
||||
SubAgentSource::Review => f.write_str("review"),
|
||||
SubAgentSource::Compact => f.write_str("compact"),
|
||||
SubAgentSource::ThreadSpawn { parent_thread_id } => {
|
||||
write!(f, "thread_spawn_{parent_thread_id}")
|
||||
}
|
||||
SubAgentSource::Other(other) => f.write_str(other),
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user