core: inject code-mode session providers

This commit is contained in:
Channing Conger
2026-05-22 18:55:42 +00:00
parent 53ac02356e
commit 8034aa2790
11 changed files with 208 additions and 50 deletions

View File

@@ -308,6 +308,9 @@ impl ExternalAgentConfigRequestProcessor {
initial_history: InitialHistory::Forked(rollout_items),
session_source: None,
thread_source: None,
code_mode_session_provider: Some(
ThreadManager::in_process_code_mode_session_provider(),
),
dynamic_tools: Vec::new(),
persist_extended_history: false,
metrics_service_name: None,

View File

@@ -1105,6 +1105,9 @@ impl ThreadRequestProcessor {
},
session_source: None,
thread_source,
code_mode_session_provider: Some(
ThreadManager::in_process_code_mode_session_provider(),
),
dynamic_tools: core_dynamic_tools,
persist_extended_history: false,
metrics_service_name: service_name,

View File

@@ -161,13 +161,18 @@ pub(crate) struct AgentControl {
/// `ThreadManagerState -> CodexThread -> Session -> SessionServices -> ThreadManagerState`.
manager: Weak<ThreadManagerState>,
state: Arc<AgentRegistry>,
code_mode_session_provider: Option<Arc<dyn codex_code_mode::CodeModeSessionProvider>>,
}
impl AgentControl {
/// Construct a new `AgentControl` that can spawn/message agents via the given manager state.
pub(crate) fn new(manager: Weak<ThreadManagerState>) -> Self {
pub(crate) fn new(
manager: Weak<ThreadManagerState>,
code_mode_session_provider: Option<Arc<dyn codex_code_mode::CodeModeSessionProvider>>,
) -> Self {
Self {
manager,
code_mode_session_provider,
..Default::default()
}
}
@@ -181,6 +186,12 @@ impl AgentControl {
self.session_id
}
pub(crate) fn code_mode_session_provider(
&self,
) -> Option<Arc<dyn codex_code_mode::CodeModeSessionProvider>> {
self.code_mode_session_provider.clone()
}
/// Spawn a new agent thread and submit the initial prompt.
#[cfg(test)]
pub(crate) async fn spawn_agent(

View File

@@ -608,6 +608,7 @@ impl Codex {
// Generate a unique ID for the lifetime of this Codex session.
let session_source_clone = session_configuration.session_source.clone();
let (agent_status_tx, agent_status_rx) = watch::channel(AgentStatus::PendingInit);
let code_mode_session_provider = agent_control.code_mode_session_provider();
let session = Session::new(
session_configuration,
@@ -623,6 +624,7 @@ impl Codex {
skills_manager,
plugins_manager,
mcp_manager.clone(),
code_mode_session_provider,
extensions,
agent_control,
environment_manager,

View File

@@ -497,6 +497,7 @@ impl Session {
skills_manager: Arc<SkillsManager>,
plugins_manager: Arc<PluginsManager>,
mcp_manager: Arc<McpManager>,
code_mode_session_provider: Option<Arc<dyn codex_code_mode::CodeModeSessionProvider>>,
extensions: Arc<codex_extension_api::ExtensionRegistry<crate::config::Config>>,
agent_control: AgentControl,
environment_manager: Arc<EnvironmentManager>,
@@ -1052,7 +1053,11 @@ impl Session {
session_configuration.parent_thread_id,
),
),
code_mode_service: crate::tools::code_mode::CodeModeService::new(),
code_mode_service: crate::tools::code_mode::CodeModeService::from_provider(
code_mode_session_provider,
)
.await
.map_err(anyhow::Error::msg)?,
environment_manager,
};
services

View File

@@ -4508,6 +4508,7 @@ async fn session_new_fails_when_zsh_fork_enabled_without_packaged_zsh() {
skills_manager,
plugins_manager,
mcp_manager,
/*code_mode_session_provider*/ None,
Arc::new(codex_extension_api::ExtensionRegistryBuilder::new().build()),
AgentControl::default(),
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
@@ -4857,6 +4858,7 @@ async fn make_session_with_config_and_rx(
skills_manager,
plugins_manager,
mcp_manager,
/*code_mode_session_provider*/ None,
Arc::new(codex_extension_api::ExtensionRegistryBuilder::new().build()),
AgentControl::default(),
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
@@ -4962,6 +4964,7 @@ async fn make_session_with_history_source_and_agent_control_and_rx(
skills_manager,
plugins_manager,
mcp_manager,
/*code_mode_session_provider*/ None,
Arc::new(codex_extension_api::ExtensionRegistryBuilder::new().build()),
agent_control,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),

View File

@@ -68,6 +68,7 @@ use std::collections::HashMap;
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::LazyLock;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
@@ -82,6 +83,9 @@ const THREAD_CREATED_CHANNEL_CAPACITY: usize = 1024;
/// In production builds this value should remain at its default (`false`) and
/// must not be toggled.
static FORCE_TEST_THREAD_MANAGER_BEHAVIOR: AtomicBool = AtomicBool::new(false);
static IN_PROCESS_CODE_MODE_SESSION_PROVIDER: LazyLock<
Arc<dyn codex_code_mode::CodeModeSessionProvider>,
> = LazyLock::new(|| Arc::new(codex_code_mode::InProcessCodeModeSessionProvider));
type CapturedOps = Vec<(ThreadId, Op)>;
type SharedCapturedOps = Arc<std::sync::Mutex<CapturedOps>>;
@@ -176,6 +180,7 @@ pub struct StartThreadOptions {
pub initial_history: InitialHistory,
pub session_source: Option<SessionSource>,
pub thread_source: Option<ThreadSource>,
pub code_mode_session_provider: Option<Arc<dyn codex_code_mode::CodeModeSessionProvider>>,
pub dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
pub persist_extended_history: bool,
pub metrics_service_name: Option<String>,
@@ -404,6 +409,11 @@ impl ThreadManager {
self.state.session_source.clone()
}
pub fn in_process_code_mode_session_provider()
-> Arc<dyn codex_code_mode::CodeModeSessionProvider> {
Arc::clone(&IN_PROCESS_CODE_MODE_SESSION_PROVIDER)
}
pub fn auth_manager(&self) -> Arc<AuthManager> {
self.state.auth_manager.clone()
}
@@ -577,6 +587,7 @@ impl ThreadManager {
initial_history: InitialHistory::New,
session_source: None,
thread_source: None,
code_mode_session_provider: Some(Self::in_process_code_mode_session_provider()),
dynamic_tools,
persist_extended_history,
metrics_service_name: None,
@@ -605,11 +616,13 @@ impl ThreadManager {
.unwrap_or_else(|| (self.state.session_source.clone(), None));
let session_source = options.session_source.unwrap_or(resumed_session_source);
let thread_source = options.thread_source.or(resumed_thread_source);
let agent_control =
self.agent_control_with_code_mode_session_provider(options.code_mode_session_provider);
Box::pin(self.state.spawn_thread_with_source(
options.config,
options.initial_history,
Arc::clone(&self.state.auth_manager),
self.agent_control(),
agent_control,
session_source,
/*parent_thread_id*/ None,
forked_from_thread_id,
@@ -944,7 +957,16 @@ impl ThreadManager {
}
pub(crate) fn agent_control(&self) -> AgentControl {
AgentControl::new(Arc::downgrade(&self.state))
self.agent_control_with_code_mode_session_provider(Some(
Self::in_process_code_mode_session_provider(),
))
}
fn agent_control_with_code_mode_session_provider(
&self,
code_mode_session_provider: Option<Arc<dyn codex_code_mode::CodeModeSessionProvider>>,
) -> AgentControl {
AgentControl::new(Arc::downgrade(&self.state), code_mode_session_provider)
}
#[cfg(test)]

View File

@@ -27,12 +27,33 @@ use core_test_support::PathBufExt;
use core_test_support::PathExt;
use core_test_support::responses::mount_models_once;
use pretty_assertions::assert_eq;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::time::Duration;
use tempfile::tempdir;
use wiremock::MockServer;
const TEST_INSTALLATION_ID: &str = "11111111-1111-4111-8111-111111111111";
struct CountingCodeModeSessionProvider {
created_sessions: Arc<AtomicUsize>,
}
impl codex_code_mode::CodeModeSessionProvider for CountingCodeModeSessionProvider {
fn create_session<'a>(
&'a self,
delegate: Arc<dyn codex_code_mode::CodeModeSessionDelegate>,
) -> codex_code_mode::CodeModeSessionProviderFuture<'a> {
let created_sessions = Arc::clone(&self.created_sessions);
Box::pin(async move {
created_sessions.fetch_add(1, Ordering::Relaxed);
let session: Arc<dyn codex_code_mode::CodeModeSession> =
Arc::new(codex_code_mode::CodeModeService::with_delegate(delegate));
Ok(session)
})
}
}
fn user_msg(text: &str) -> ResponseItem {
ResponseItem::Message {
id: None,
@@ -321,22 +342,26 @@ async fn start_thread_rejects_explicit_local_environment_when_default_provider_i
environment_manager,
);
let result = manager
.start_thread_with_options(StartThreadOptions {
config: config.clone(),
initial_history: InitialHistory::New,
session_source: None,
thread_source: None,
dynamic_tools: Vec::new(),
persist_extended_history: false,
metrics_service_name: None,
parent_trace: None,
environments: vec![TurnEnvironmentSelection {
environment_id: "local".to_string(),
cwd: config.cwd.clone(),
}],
})
.await;
let result =
manager
.start_thread_with_options(StartThreadOptions {
config: config.clone(),
initial_history: InitialHistory::New,
session_source: None,
thread_source: None,
code_mode_session_provider: Some(
ThreadManager::in_process_code_mode_session_provider(),
),
dynamic_tools: Vec::new(),
persist_extended_history: false,
metrics_service_name: None,
parent_trace: None,
environments: vec![TurnEnvironmentSelection {
environment_id: "local".to_string(),
cwd: config.cwd.clone(),
}],
})
.await;
let err = match result {
Ok(_) => panic!("explicit local environment should not resolve when provider is disabled"),
Err(err) => err,
@@ -439,6 +464,61 @@ args = ["dev", "cd /tmp && true"]
assert!(!environment_context.contains("\n <shell>"));
}
#[test]
fn in_process_code_mode_session_provider_is_a_singleton() {
assert!(Arc::ptr_eq(
&ThreadManager::in_process_code_mode_session_provider(),
&ThreadManager::in_process_code_mode_session_provider(),
));
}
#[tokio::test]
async fn start_thread_options_code_mode_provider_is_propagated_to_spawned_agents() {
let temp_dir = tempdir().expect("tempdir");
let mut config = test_config().await;
config.codex_home = temp_dir.path().join("codex-home").abs();
config.cwd = config.codex_home.abs();
std::fs::create_dir_all(&config.codex_home).expect("create codex home");
let manager = ThreadManager::with_models_provider_and_home_for_tests(
CodexAuth::from_api_key("dummy"),
config.model_provider.clone(),
config.codex_home.to_path_buf(),
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
);
let created_sessions = Arc::new(AtomicUsize::new(0));
let provider: Arc<dyn codex_code_mode::CodeModeSessionProvider> =
Arc::new(CountingCodeModeSessionProvider {
created_sessions: Arc::clone(&created_sessions),
});
let root = manager
.start_thread_with_options(StartThreadOptions {
config: config.clone(),
initial_history: InitialHistory::New,
session_source: None,
thread_source: None,
code_mode_session_provider: Some(provider),
dynamic_tools: Vec::new(),
persist_extended_history: false,
metrics_service_name: None,
parent_trace: None,
environments: Vec::new(),
})
.await
.expect("root thread should start");
assert_eq!(created_sessions.load(Ordering::Relaxed), 1);
root.thread
.codex
.session
.services
.agent_control
.spawn_agent(config, Op::Shutdown, /*session_source*/ None)
.await
.expect("child thread should start");
assert_eq!(created_sessions.load(Ordering::Relaxed), 2);
}
#[tokio::test]
async fn start_thread_keeps_internal_threads_hidden_from_normal_lookups() {
let temp_dir = tempdir().expect("tempdir");
@@ -453,22 +533,26 @@ async fn start_thread_keeps_internal_threads_hidden_from_normal_lookups() {
config.codex_home.to_path_buf(),
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
);
let thread = manager
.start_thread_with_options(StartThreadOptions {
config,
initial_history: InitialHistory::New,
session_source: Some(SessionSource::Internal(
InternalSessionSource::MemoryConsolidation,
)),
thread_source: None,
dynamic_tools: Vec::new(),
persist_extended_history: false,
metrics_service_name: None,
parent_trace: None,
environments: Vec::new(),
})
.await
.expect("internal thread should start");
let thread =
manager
.start_thread_with_options(StartThreadOptions {
config,
initial_history: InitialHistory::New,
session_source: Some(SessionSource::Internal(
InternalSessionSource::MemoryConsolidation,
)),
thread_source: None,
code_mode_session_provider: Some(
ThreadManager::in_process_code_mode_session_provider(),
),
dynamic_tools: Vec::new(),
persist_extended_history: false,
metrics_service_name: None,
parent_trace: None,
environments: Vec::new(),
})
.await
.expect("internal thread should start");
assert_eq!(manager.list_thread_ids().await, Vec::new());
assert!(manager.get_thread(thread.thread_id).await.is_err());
@@ -517,6 +601,7 @@ async fn resume_and_fork_do_not_restore_thread_environments_from_rollout() {
initial_history: InitialHistory::New,
session_source: None,
thread_source: None,
code_mode_session_provider: None,
dynamic_tools: Vec::new(),
persist_extended_history: false,
metrics_service_name: None,
@@ -789,6 +874,7 @@ async fn resume_stopped_thread_from_rollout_preserves_thread_source() {
initial_history: InitialHistory::New,
session_source: None,
thread_source: Some(ThreadSource::User),
code_mode_session_provider: None,
dynamic_tools: Vec::new(),
persist_extended_history: false,
metrics_service_name: None,

View File

@@ -11,6 +11,7 @@ use std::time::Duration;
use codex_code_mode::CellId;
use codex_code_mode::CodeModeNestedToolCall;
use codex_code_mode::CodeModeSession;
use codex_code_mode::CodeModeSessionProvider;
use codex_code_mode::CodeModeToolKind;
use codex_code_mode::RuntimeResponse;
use codex_protocol::models::FunctionCallOutputContentItem;
@@ -63,6 +64,7 @@ pub(crate) struct CodeModeService {
}
impl CodeModeService {
#[cfg(test)]
pub(crate) fn new() -> Self {
let dispatch_broker = Arc::new(CodeModeDispatchBroker::new());
Self {
@@ -73,6 +75,20 @@ impl CodeModeService {
}
}
pub(crate) async fn from_provider(
provider: Option<Arc<dyn CodeModeSessionProvider>>,
) -> Result<Self, String> {
let dispatch_broker = Arc::new(CodeModeDispatchBroker::new());
let session = match provider {
Some(provider) => Some(provider.create_session(dispatch_broker.clone()).await?),
None => None,
};
Ok(Self {
session,
dispatch_broker,
})
}
pub(crate) async fn execute(
&self,
request: codex_code_mode::ExecuteRequest,

View File

@@ -1,6 +1,7 @@
use anyhow::Result;
use codex_core::StartThreadOptions;
use codex_core::ThreadConfigSnapshot;
use codex_core::ThreadManager;
use codex_core::config::AgentRoleConfig;
use codex_features::Feature;
use codex_protocol::ThreadId;
@@ -744,20 +745,23 @@ async fn subagent_stop_replaces_stop_and_skips_internal_subagents() -> Result<()
// This matcher would catch the old synthetic "review" SubagentStop target
// because the SubagentStop hook above intentionally matches all agent types.
let internal_thread = test
.thread_manager
.start_thread_with_options(StartThreadOptions {
config: test.config.clone(),
initial_history: InitialHistory::New,
session_source: Some(SessionSource::SubAgent(SubAgentSource::Review)),
thread_source: None,
dynamic_tools: Vec::new(),
persist_extended_history: false,
metrics_service_name: None,
parent_trace: None,
environments: Vec::new(),
})
.await?;
let internal_thread =
test.thread_manager
.start_thread_with_options(StartThreadOptions {
config: test.config.clone(),
initial_history: InitialHistory::New,
session_source: Some(SessionSource::SubAgent(SubAgentSource::Review)),
thread_source: None,
code_mode_session_provider: Some(
ThreadManager::in_process_code_mode_session_provider(),
),
dynamic_tools: Vec::new(),
persist_extended_history: false,
metrics_service_name: None,
parent_trace: None,
environments: Vec::new(),
})
.await?;
let (sandbox_policy, permission_profile) =
turn_permission_fields(PermissionProfile::Disabled, test.cwd_path());

View File

@@ -247,6 +247,9 @@ impl MemoryStartupContext {
InternalSessionSource::MemoryConsolidation,
)),
thread_source: Some(ThreadSource::MemoryConsolidation),
code_mode_session_provider: Some(
ThreadManager::in_process_code_mode_session_provider(),
),
dynamic_tools: Vec::new(),
persist_extended_history: false,
metrics_service_name: None,