Simplify optional local environment follow-up

This commit is contained in:
starr-openai
2026-05-18 18:56:11 -07:00
parent 4af256e946
commit 4fdb044248
13 changed files with 231 additions and 95 deletions

View File

@@ -166,6 +166,7 @@ pub(crate) struct MessageProcessor {
command_exec_processor: CommandExecRequestProcessor,
process_exec_processor: ProcessExecRequestProcessor,
config_processor: ConfigRequestProcessor,
environment_manager: Arc<EnvironmentManager>,
environment_processor: EnvironmentRequestProcessor,
external_agent_config_processor: ExternalAgentConfigRequestProcessor,
feedback_processor: FeedbackRequestProcessor,
@@ -279,6 +280,13 @@ impl MessageProcessor {
.ok_or_else(|| internal_error("local filesystem is not configured"))
}
fn require_local_environment(&self) -> Result<(), JSONRPCErrorError> {
self.environment_manager
.has_local_environment()
.then_some(())
.ok_or_else(|| internal_error("local environment is not configured"))
}
/// Create a new `MessageProcessor`, retaining a handle to the outgoing
/// `Sender` so handlers can enqueue messages to be written to stdout.
pub(crate) fn new(args: MessageProcessorArgs) -> Self {
@@ -308,6 +316,7 @@ impl MessageProcessor {
// affect per-thread behavior, but they must not move newly started,
// resumed, or forked threads to a different persistence backend/root.
let thread_store = codex_core::thread_store_from_config(config.as_ref(), state_db.clone());
let environment_manager_for_requests = Arc::clone(&environment_manager);
let thread_manager = Arc::new_cyclic(|thread_manager| {
ThreadManager::new(
config.as_ref(),
@@ -362,12 +371,8 @@ impl MessageProcessor {
Arc::clone(&config),
outgoing.clone(),
config_manager.clone(),
thread_manager.environment_manager(),
);
let process_exec_processor = ProcessExecRequestProcessor::new(
outgoing.clone(),
thread_manager.environment_manager(),
);
let process_exec_processor = ProcessExecRequestProcessor::new(outgoing.clone());
let feedback_processor = FeedbackRequestProcessor::new(
auth_manager.clone(),
Arc::clone(&thread_manager),
@@ -494,6 +499,7 @@ impl MessageProcessor {
command_exec_processor,
process_exec_processor,
config_processor,
environment_manager: environment_manager_for_requests,
environment_processor,
external_agent_config_processor,
feedback_processor,
@@ -1294,6 +1300,7 @@ impl MessageProcessor {
.await
.map(|response| Some(response.into())),
ClientRequest::OneOffCommandExec { params, .. } => {
self.require_local_environment()?;
self.command_exec_processor
.one_off_command_exec(&request_id, params)
.await
@@ -1313,11 +1320,13 @@ impl MessageProcessor {
.command_exec_terminate(request_id.clone(), params)
.await
}
ClientRequest::ProcessSpawn { params, .. } => self
.process_exec_processor
.process_spawn(request_id.clone(), params)
.await
.map(|()| None),
ClientRequest::ProcessSpawn { params, .. } => {
self.require_local_environment()?;
self.process_exec_processor
.process_spawn(request_id.clone(), params)
.await
.map(|()| None)
}
ClientRequest::ProcessWriteStdin { params, .. } => {
self.process_exec_processor
.process_write_stdin(request_id.clone(), params)

View File

@@ -6,7 +6,6 @@ pub(crate) struct CommandExecRequestProcessor {
config: Arc<Config>,
outgoing: Arc<OutgoingMessageSender>,
config_manager: ConfigManager,
environment_manager: Arc<EnvironmentManager>,
command_exec_manager: CommandExecManager,
}
@@ -16,14 +15,12 @@ impl CommandExecRequestProcessor {
config: Arc<Config>,
outgoing: Arc<OutgoingMessageSender>,
config_manager: ConfigManager,
environment_manager: Arc<EnvironmentManager>,
) -> Self {
Self {
arg0_paths,
config,
outgoing,
config_manager,
environment_manager,
command_exec_manager: CommandExecManager::default(),
}
}
@@ -93,10 +90,6 @@ impl CommandExecRequestProcessor {
) -> Result<(), JSONRPCErrorError> {
tracing::debug!("ExecOneOffCommand params: {params:?}");
if self.environment_manager.try_local_environment().is_none() {
return Err(internal_error("local environment is not configured"));
}
let request = request_id.clone();
if params.command.is_empty() {

View File

@@ -209,7 +209,11 @@ impl McpRequestProcessor {
// executor-backed stdio MCPs whose config omits `cwd`.
let runtime_environment = McpRuntimeEnvironment::new(
environment_manager.default_or_local_environment(),
environment_manager.try_local_environment(),
if environment_manager.has_local_environment() {
codex_mcp::LocalStdioAvailability::Enabled
} else {
codex_mcp::LocalStdioAvailability::Disabled
},
config.cwd.to_path_buf(),
);
@@ -370,7 +374,11 @@ impl McpRequestProcessor {
// used only by executor-backed stdio MCPs whose config omits `cwd`.
let runtime_environment = McpRuntimeEnvironment::new(
environment_manager.default_or_local_environment(),
environment_manager.try_local_environment(),
if environment_manager.has_local_environment() {
codex_mcp::LocalStdioAvailability::Enabled
} else {
codex_mcp::LocalStdioAvailability::Disabled
},
config.cwd.to_path_buf(),
);
let request_id = request_id.clone();

View File

@@ -23,7 +23,6 @@ use codex_app_server_protocol::ServerNotification;
use codex_core::exec::ExecExpiration;
use codex_core::exec::ExecExpirationOutcome;
use codex_core::exec::IO_DRAIN_TIMEOUT_MS;
use codex_exec_server::EnvironmentManager;
use codex_protocol::exec_output::bytes_to_string_smart;
use codex_utils_absolute_path::AbsolutePathBuf;
use codex_utils_pty::DEFAULT_OUTPUT_BYTES_CAP;
@@ -49,18 +48,13 @@ const OUTPUT_CHUNK_SIZE_HINT: usize = 64 * 1024;
#[derive(Clone)]
pub(crate) struct ProcessExecRequestProcessor {
outgoing: Arc<OutgoingMessageSender>,
environment_manager: Arc<EnvironmentManager>,
process_exec_manager: ProcessExecManager,
}
impl ProcessExecRequestProcessor {
pub(crate) fn new(
outgoing: Arc<OutgoingMessageSender>,
environment_manager: Arc<EnvironmentManager>,
) -> Self {
pub(crate) fn new(outgoing: Arc<OutgoingMessageSender>) -> Self {
Self {
outgoing,
environment_manager,
process_exec_manager: ProcessExecManager::default(),
}
}
@@ -70,10 +64,6 @@ impl ProcessExecRequestProcessor {
request_id: ConnectionRequestId,
params: ProcessSpawnParams,
) -> Result<(), JSONRPCErrorError> {
if self.environment_manager.try_local_environment().is_none() {
return Err(internal_error("local environment is not configured"));
}
let ProcessSpawnParams {
command,
process_handle,

View File

@@ -72,7 +72,6 @@ pub struct McpConnectionManager {
clients: HashMap<String, AsyncManagedClient>,
server_metadata: HashMap<String, McpServerMetadata>,
tool_plugin_provenance: Arc<ToolPluginProvenance>,
startup_failures: HashMap<String, String>,
host_owned_codex_apps_enabled: bool,
elicitation_requests: ElicitationRequestManager,
startup_cancellation_token: CancellationToken,
@@ -94,7 +93,6 @@ impl McpConnectionManager {
clients: HashMap::new(),
server_metadata: HashMap::new(),
tool_plugin_provenance: Arc::new(ToolPluginProvenance::default()),
startup_failures: HashMap::new(),
host_owned_codex_apps_enabled: false,
elicitation_requests: ElicitationRequestManager::new(
approval_policy.value(),
@@ -115,7 +113,6 @@ impl McpConnectionManager {
self.startup_cancellation_token.cancel();
let clients = std::mem::take(&mut self.clients);
self.server_metadata.clear();
self.startup_failures.clear();
async move {
for client in clients.into_values() {
client.shutdown().await;
@@ -191,7 +188,7 @@ impl McpConnectionManager {
let cancel_token = CancellationToken::new();
let mut clients = HashMap::new();
let mut server_metadata = HashMap::new();
let mut startup_failures = HashMap::new();
let mut preflight_failures = Vec::new();
let mut join_set = JoinSet::new();
let elicitation_requests = ElicitationRequestManager::new(
approval_policy.value(),
@@ -227,28 +224,26 @@ impl McpConnectionManager {
)
.await;
if let Some(reason) = startup_unavailable_reason {
startup_failures.insert(server_name.clone(), reason.clone());
let tx_event = tx_event.clone();
let submit_id = startup_submit_id.clone();
let auth_entry = auth_entries.get(&server_name).cloned();
join_set.spawn(async move {
let outcome = Err(StartupOutcomeError::Failed { error: reason });
let error = match &outcome {
Err(error) => {
mcp_init_error_display(server_name.as_str(), auth_entry.as_ref(), error)
}
Ok(_) => unreachable!("preflight failure cannot start an MCP client"),
};
let _ = emit_update(
submit_id.as_str(),
&tx_event,
McpStartupUpdateEvent {
server: server_name.clone(),
status: McpStartupStatus::Failed { error },
},
)
.await;
(server_name, outcome)
let error = mcp_init_error_display(
server_name.as_str(),
auth_entry.as_ref(),
&StartupOutcomeError::Failed {
error: reason.clone(),
},
);
let _ = emit_update(
startup_submit_id.as_str(),
&tx_event,
McpStartupUpdateEvent {
server: server_name.clone(),
status: McpStartupStatus::Failed { error },
},
)
.await;
preflight_failures.push(McpStartupFailure {
server: server_name,
error: reason,
});
continue;
}
@@ -329,14 +324,16 @@ impl McpConnectionManager {
clients,
server_metadata,
tool_plugin_provenance,
startup_failures,
host_owned_codex_apps_enabled,
elicitation_requests: elicitation_requests.clone(),
startup_cancellation_token: cancel_token.clone(),
};
tokio::spawn(async move {
let outcomes = join_set.join_all().await;
let mut summary = McpStartupCompleteEvent::default();
let mut summary = McpStartupCompleteEvent {
failed: preflight_failures,
..Default::default()
};
for (server_name, outcome) in outcomes {
match outcome {
Ok(_) => summary.ready.push(server_name),
@@ -388,13 +385,6 @@ impl McpConnectionManager {
let mut failures = Vec::new();
for server_name in required_servers {
let Some(async_managed_client) = self.clients.get(server_name).cloned() else {
if let Some(error) = self.startup_failures.get(server_name) {
failures.push(McpStartupFailure {
server: server_name.clone(),
error: error.clone(),
});
continue;
}
failures.push(McpStartupFailure {
server: server_name.clone(),
error: format!("required MCP server `{server_name}` was not initialized"),

View File

@@ -10,6 +10,7 @@ use crate::elicitation::elicitation_is_rejected_by_policy;
use crate::rmcp_client::AsyncManagedClient;
use crate::rmcp_client::ManagedClient;
use crate::rmcp_client::StartupOutcomeError;
use crate::runtime::LocalStdioAvailability;
use crate::server::McpServerOrigin;
use crate::tools::ToolFilter;
use crate::tools::ToolInfo;
@@ -917,7 +918,7 @@ async fn no_local_runtime_skips_local_stdio_but_keeps_local_http_server() {
PermissionProfile::default(),
McpRuntimeEnvironment::new(
/*environment*/ None,
/*local_environment*/ None,
LocalStdioAvailability::Disabled,
PathBuf::from("/tmp"),
),
codex_home.path().to_path_buf(),
@@ -946,10 +947,6 @@ async fn no_local_runtime_skips_local_stdio_but_keeps_local_http_server() {
.await;
assert_eq!(failures.len(), 1);
assert_eq!(failures[0].server, "stdio");
assert_eq!(
failures[0].error,
"local stdio MCP server `stdio` requires a local environment"
);
cancel_token.cancel();
}

View File

@@ -3,6 +3,7 @@ pub use elicitation::ElicitationReviewRequest;
pub use elicitation::ElicitationReviewer;
pub use elicitation::ElicitationReviewerHandle;
pub use rmcp_client::MCP_SANDBOX_STATE_META_CAPABILITY;
pub use runtime::LocalStdioAvailability;
pub use runtime::McpRuntimeEnvironment;
pub use runtime::SandboxState;
pub use tools::ToolInfo;

View File

@@ -30,28 +30,35 @@ pub struct SandboxState {
/// Runtime placement information used when starting MCP server transports.
///
/// `McpConfig` describes what servers exist. This value describes where those
/// servers should run for the current caller. Keep it explicit at manager
/// construction time so status/snapshot paths and real sessions make the same
/// local-vs-remote decision. `fallback_cwd` is not a per-server override; it is
/// used when a stdio server omits `cwd` and the launcher needs a concrete
/// process working directory.
/// `McpConfig` describes what servers exist. This value describes which
/// selected/default environment remote MCP servers should use plus whether
/// local stdio MCP startup is allowed for the current caller. Keep it explicit
/// at manager construction time so status/snapshot paths and real sessions
/// make the same placement decision. `fallback_cwd` is not a per-server
/// override; it is used when a stdio server omits `cwd` and the launcher needs
/// a concrete process working directory.
#[derive(Clone)]
pub struct McpRuntimeEnvironment {
environment: Option<Arc<Environment>>,
local_environment: Option<Arc<Environment>>,
local_stdio_availability: LocalStdioAvailability,
fallback_cwd: PathBuf,
}
#[derive(Clone, Copy)]
pub enum LocalStdioAvailability {
Enabled,
Disabled,
}
impl McpRuntimeEnvironment {
pub fn new(
environment: Option<Arc<Environment>>,
local_environment: Option<Arc<Environment>>,
local_stdio_availability: LocalStdioAvailability,
fallback_cwd: PathBuf,
) -> Self {
Self {
environment,
local_environment,
local_stdio_availability,
fallback_cwd,
}
}
@@ -71,12 +78,13 @@ impl McpRuntimeEnvironment {
) -> Option<String> {
match config.experimental_environment.as_deref() {
None | Some("local") => {
if self.local_environment.is_none()
&& matches!(
config.transport,
codex_config::McpServerTransportConfig::Stdio { .. }
)
{
if matches!(
self.local_stdio_availability,
LocalStdioAvailability::Disabled
) && matches!(
config.transport,
codex_config::McpServerTransportConfig::Stdio { .. }
) {
Some(format!(
"local stdio MCP server `{server_name}` requires a local environment"
))
@@ -102,3 +110,117 @@ pub(crate) fn emit_duration(metric: &str, duration: Duration, tags: &[(&str, &st
let _ = metrics.record_duration(metric, duration, tags);
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use codex_config::McpServerConfig;
use codex_config::McpServerTransportConfig;
use pretty_assertions::assert_eq;
use super::*;
fn stdio_server(experimental_environment: Option<&str>) -> McpServerConfig {
McpServerConfig {
transport: McpServerTransportConfig::Stdio {
command: "echo".to_string(),
args: Vec::new(),
env: None,
env_vars: Vec::new(),
cwd: None,
},
experimental_environment: experimental_environment.map(str::to_string),
enabled: true,
required: false,
supports_parallel_tool_calls: false,
disabled_reason: None,
startup_timeout_sec: None,
tool_timeout_sec: None,
default_tools_approval_mode: None,
enabled_tools: None,
disabled_tools: None,
scopes: None,
oauth: None,
oauth_resource: None,
tools: HashMap::new(),
}
}
fn http_server(experimental_environment: Option<&str>) -> McpServerConfig {
McpServerConfig {
transport: McpServerTransportConfig::StreamableHttp {
url: "http://127.0.0.1:1".to_string(),
bearer_token_env_var: None,
http_headers: None,
env_http_headers: None,
},
experimental_environment: experimental_environment.map(str::to_string),
..stdio_server(None)
}
}
#[test]
fn local_stdio_requires_local_stdio_availability() {
let runtime_environment = McpRuntimeEnvironment::new(
/*environment*/ None,
LocalStdioAvailability::Disabled,
PathBuf::from("/tmp"),
);
assert_eq!(
runtime_environment.startup_unavailable_reason("stdio", &stdio_server(None)),
Some("local stdio MCP server `stdio` requires a local environment".to_string())
);
}
#[test]
fn local_http_does_not_require_local_stdio_availability() {
let runtime_environment = McpRuntimeEnvironment::new(
/*environment*/ None,
LocalStdioAvailability::Disabled,
PathBuf::from("/tmp"),
);
assert_eq!(
runtime_environment.startup_unavailable_reason("http", &http_server(None)),
None
);
}
#[test]
fn remote_stdio_requires_remote_environment() {
let runtime_environment = McpRuntimeEnvironment::new(
/*environment*/ None,
LocalStdioAvailability::Enabled,
PathBuf::from("/tmp"),
);
assert_eq!(
runtime_environment.startup_unavailable_reason("stdio", &stdio_server(Some("remote"))),
Some("remote MCP server `stdio` requires a remote environment".to_string())
);
}
#[test]
fn remote_stdio_and_http_accept_remote_environment() {
let environment = Arc::new(
Environment::create_for_tests(Some("ws://127.0.0.1:8765".to_string()))
.expect("remote environment"),
);
let runtime_environment = McpRuntimeEnvironment::new(
Some(environment),
LocalStdioAvailability::Disabled,
PathBuf::from("/tmp"),
);
assert_eq!(
runtime_environment.startup_unavailable_reason("stdio", &stdio_server(Some("remote"))),
None
);
assert_eq!(
runtime_environment.startup_unavailable_reason("http", &http_server(Some("remote"))),
None
);
}
}

View File

@@ -272,7 +272,11 @@ pub async fn list_accessible_connectors_from_mcp_tools_with_environment_manager(
PermissionProfile::default(),
McpRuntimeEnvironment::new(
environment_manager.default_or_local_environment(),
environment_manager.try_local_environment(),
if environment_manager.has_local_environment() {
codex_mcp::LocalStdioAvailability::Enabled
} else {
codex_mcp::LocalStdioAvailability::Disabled
},
config.cwd.to_path_buf(),
),
config.codex_home.to_path_buf(),

View File

@@ -1245,10 +1245,14 @@ async fn install_host_owned_codex_apps_manager(session: &Session, turn_context:
turn_context.sub_id.clone(),
session.get_tx_event(),
turn_context.permission_profile(),
codex_mcp::McpRuntimeEnvironment::new(Some(environment.clone()), Some(environment), {
#[allow(deprecated)]
turn_context.cwd.to_path_buf()
}),
codex_mcp::McpRuntimeEnvironment::new(
Some(environment),
codex_mcp::LocalStdioAvailability::Enabled,
{
#[allow(deprecated)]
turn_context.cwd.to_path_buf()
},
),
turn_context.config.codex_home.to_path_buf(),
codex_mcp::codex_apps_tools_cache_key(auth.as_ref()),
/*host_owned_codex_apps_enabled*/ true,

View File

@@ -289,18 +289,23 @@ impl Session {
host_owned_codex_apps_enabled(&mcp_config, auth.as_ref());
let auth_statuses =
compute_auth_statuses(mcp_servers.iter(), store_mode, auth.as_ref()).await;
let local_environment = self.services.environment_manager.try_local_environment();
let local_stdio_availability = if self.services.environment_manager.has_local_environment()
{
codex_mcp::LocalStdioAvailability::Enabled
} else {
codex_mcp::LocalStdioAvailability::Disabled
};
let mcp_runtime_environment = match turn_context.environments.primary() {
Some(turn_environment) => McpRuntimeEnvironment::new(
Some(Arc::clone(&turn_environment.environment)),
local_environment,
local_stdio_availability,
turn_environment.cwd.to_path_buf(),
),
None => McpRuntimeEnvironment::new(
self.services
.environment_manager
.default_or_local_environment(),
local_environment,
local_stdio_availability,
#[allow(deprecated)]
turn_context.cwd.to_path_buf(),
),

View File

@@ -1046,16 +1046,24 @@ impl Session {
})?
.primary()
.cloned();
let local_environment = sess.services.environment_manager.try_local_environment();
let local_stdio_availability = if sess
.services
.environment_manager
.has_local_environment()
{
codex_mcp::LocalStdioAvailability::Enabled
} else {
codex_mcp::LocalStdioAvailability::Disabled
};
let mcp_runtime_environment = match turn_environment {
Some(turn_environment) => McpRuntimeEnvironment::new(
Some(Arc::clone(&turn_environment.environment)),
local_environment,
local_stdio_availability,
turn_environment.cwd.to_path_buf(),
),
None => McpRuntimeEnvironment::new(
sess.services.environment_manager.default_or_local_environment(),
local_environment,
local_stdio_availability,
session_configuration.cwd.to_path_buf(),
),
};

View File

@@ -238,6 +238,11 @@ impl EnvironmentManager {
self.local_environment.as_ref().map(Arc::clone)
}
/// Returns whether a local environment is configured.
pub fn has_local_environment(&self) -> bool {
self.local_environment.is_some()
}
/// Returns the default environment or local environment when either exists.
pub fn default_or_local_environment(&self) -> Option<Arc<Environment>> {
self.default_environment()