[4/6] Abstract MCP stdio server launching (#18087)

## Summary
- Move local MCP stdio process startup behind a launcher trait.
- Preserve existing local stdio behavior while making transport creation
explicit.

## Stack
```text
o  #18027 [6/6] Fail exec client operations after disconnect
│
o  #18212 [5/6] Wire executor-backed MCP stdio
│
@  #18087 [4/6] Abstract MCP stdio server launching
│
o  #18020 [3/6] Add pushed exec process events
│
o  #18086 [2/6] Support piped stdin in exec process API
│
o  #18085 [1/6] Add MCP server environment config
│
o  main
```

---------

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Ahmed Ibrahim
2026-04-17 12:34:48 -07:00
committed by GitHub
parent d8b91f5fa1
commit 92cf90277d
6 changed files with 299 additions and 150 deletions

View File

@@ -50,8 +50,10 @@ use codex_protocol::protocol::McpStartupStatus;
use codex_protocol::protocol::McpStartupUpdateEvent;
use codex_protocol::protocol::SandboxPolicy;
use codex_rmcp_client::ElicitationResponse;
use codex_rmcp_client::LocalStdioServerLauncher;
use codex_rmcp_client::RmcpClient;
use codex_rmcp_client::SendElicitation;
use codex_rmcp_client::StdioServerLauncher;
use futures::future::BoxFuture;
use futures::future::FutureExt;
use futures::future::Shared;
@@ -1500,7 +1502,8 @@ async fn make_rmcp_client(
.map(|(key, value)| (key.into(), value.into()))
.collect::<HashMap<_, _>>()
});
RmcpClient::new_stdio_client(command_os, args_os, env_os, &env_vars, cwd)
let launcher = Arc::new(LocalStdioServerLauncher) as Arc<dyn StdioServerLauncher>;
RmcpClient::new_stdio_client(command_os, args_os, env_os, &env_vars, cwd, launcher)
.await
.map_err(|err| StartupOutcomeError::from(anyhow!(err)))
}

View File

@@ -5,6 +5,7 @@ mod oauth;
mod perform_oauth_login;
mod program_resolver;
mod rmcp_client;
mod stdio_server_launcher;
mod utils;
pub use auth_status::StreamableHttpOAuthDiscovery;
@@ -29,3 +30,5 @@ pub use rmcp_client::ListToolsWithConnectorIdResult;
pub use rmcp_client::RmcpClient;
pub use rmcp_client::SendElicitation;
pub use rmcp_client::ToolWithConnectorId;
pub use stdio_server_launcher::LocalStdioServerLauncher;
pub use stdio_server_launcher::StdioServerLauncher;

View File

@@ -4,7 +4,6 @@ use std::ffi::OsString;
use std::future::Future;
use std::io;
use std::path::PathBuf;
use std::process::Stdio;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
@@ -52,7 +51,6 @@ use rmcp::transport::StreamableHttpClientTransport;
use rmcp::transport::auth::AuthClient;
use rmcp::transport::auth::AuthError;
use rmcp::transport::auth::OAuthState;
use rmcp::transport::child_process::TokioChildProcess;
use rmcp::transport::streamable_http_client::AuthRequiredError;
use rmcp::transport::streamable_http_client::StreamableHttpClient;
use rmcp::transport::streamable_http_client::StreamableHttpClientTransportConfig;
@@ -63,23 +61,20 @@ use serde::Serialize;
use serde_json::Value;
use sse_stream::Sse;
use sse_stream::SseStream;
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
use tokio::process::Command;
use tokio::sync::Mutex;
use tokio::sync::watch;
use tokio::time;
use tracing::info;
use tracing::warn;
use crate::elicitation_client_service::ElicitationClientService;
use crate::load_oauth_tokens;
use crate::oauth::OAuthPersistor;
use crate::oauth::StoredOAuthTokens;
use crate::program_resolver;
use crate::stdio_server_launcher::StdioServerCommand;
use crate::stdio_server_launcher::StdioServerLauncher;
use crate::stdio_server_launcher::StdioServerTransport;
use crate::utils::apply_default_headers;
use crate::utils::build_default_headers;
use crate::utils::create_env_for_mcp_server;
use codex_config::types::OAuthCredentialsStoreMode;
const EVENT_STREAM_MIME_TYPE: &str = "text/event-stream";
@@ -307,9 +302,8 @@ impl StreamableHttpClient for StreamableHttpResponseClient {
}
enum PendingTransport {
ChildProcess {
transport: TokioChildProcess,
process_group_guard: Option<ProcessGroupGuard>,
Stdio {
transport: StdioServerTransport,
},
StreamableHttp {
transport: StreamableHttpClientTransport<StreamableHttpResponseClient>,
@@ -325,79 +319,16 @@ enum ClientState {
transport: Option<PendingTransport>,
},
Ready {
_process_group_guard: Option<ProcessGroupGuard>,
service: Arc<RunningService<RoleClient, ElicitationClientService>>,
oauth: Option<OAuthPersistor>,
},
}
#[cfg(unix)]
const PROCESS_GROUP_TERM_GRACE_PERIOD: Duration = Duration::from_secs(2);
#[cfg(unix)]
struct ProcessGroupGuard {
process_group_id: u32,
}
#[cfg(not(unix))]
struct ProcessGroupGuard;
impl ProcessGroupGuard {
fn new(process_group_id: u32) -> Self {
#[cfg(unix)]
{
Self { process_group_id }
}
#[cfg(not(unix))]
{
let _ = process_group_id;
Self
}
}
#[cfg(unix)]
fn maybe_terminate_process_group(&self) {
let process_group_id = self.process_group_id;
let should_escalate =
match codex_utils_pty::process_group::terminate_process_group(process_group_id) {
Ok(exists) => exists,
Err(error) => {
warn!("Failed to terminate MCP process group {process_group_id}: {error}");
false
}
};
if should_escalate {
std::thread::spawn(move || {
std::thread::sleep(PROCESS_GROUP_TERM_GRACE_PERIOD);
if let Err(error) =
codex_utils_pty::process_group::kill_process_group(process_group_id)
{
warn!("Failed to kill MCP process group {process_group_id}: {error}");
}
});
}
}
#[cfg(not(unix))]
fn maybe_terminate_process_group(&self) {}
}
impl Drop for ProcessGroupGuard {
fn drop(&mut self) {
if cfg!(unix) {
self.maybe_terminate_process_group();
}
}
}
#[derive(Clone)]
enum TransportRecipe {
Stdio {
program: OsString,
args: Vec<OsString>,
env: Option<HashMap<OsString, OsString>>,
env_vars: Vec<String>,
cwd: Option<PathBuf>,
command: StdioServerCommand,
launcher: Arc<dyn StdioServerLauncher>,
},
StreamableHttp {
server_name: String,
@@ -574,13 +505,11 @@ impl RmcpClient {
env: Option<HashMap<OsString, OsString>>,
env_vars: &[String],
cwd: Option<PathBuf>,
launcher: Arc<dyn StdioServerLauncher>,
) -> io::Result<Self> {
let transport_recipe = TransportRecipe::Stdio {
program,
args,
env,
env_vars: env_vars.to_vec(),
cwd,
command: StdioServerCommand::new(program, args, env, env_vars.to_vec(), cwd),
launcher,
};
let transport = Self::create_pending_transport(&transport_recipe)
.await
@@ -650,7 +579,7 @@ impl RmcpClient {
}
};
let (service, oauth_persistor, process_group_guard) =
let (service, oauth_persistor) =
Self::connect_pending_transport(pending_transport, client_service.clone(), timeout)
.await?;
@@ -671,7 +600,6 @@ impl RmcpClient {
{
let mut guard = self.state.lock().await;
*guard = ClientState::Ready {
_process_group_guard: process_group_guard,
service,
oauth: oauth_persistor.clone(),
};
@@ -954,60 +882,9 @@ impl RmcpClient {
transport_recipe: &TransportRecipe,
) -> Result<PendingTransport> {
match transport_recipe {
TransportRecipe::Stdio {
program,
args,
env,
env_vars,
cwd,
} => {
let program_name = program.to_string_lossy().into_owned();
let envs = create_env_for_mcp_server(env.clone(), env_vars);
let resolved_program = program_resolver::resolve(program.clone(), &envs)?;
let mut command = Command::new(resolved_program);
command
.kill_on_drop(true)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.env_clear()
.envs(envs)
.args(args);
#[cfg(unix)]
command.process_group(0);
if let Some(cwd) = cwd {
command.current_dir(cwd);
}
let (transport, stderr) = TokioChildProcess::builder(command)
.stderr(Stdio::piped())
.spawn()?;
let process_group_guard = transport.id().map(ProcessGroupGuard::new);
if let Some(stderr) = stderr {
tokio::spawn(async move {
let mut reader = BufReader::new(stderr).lines();
loop {
match reader.next_line().await {
Ok(Some(line)) => {
info!("MCP server stderr ({program_name}): {line}");
}
Ok(None) => break,
Err(error) => {
warn!(
"Failed to read MCP server stderr ({program_name}): {error}"
);
break;
}
}
}
});
}
Ok(PendingTransport::ChildProcess {
transport,
process_group_guard,
})
TransportRecipe::Stdio { command, launcher } => {
let transport = launcher.launch(command.clone()).await?;
Ok(PendingTransport::Stdio { transport })
}
TransportRecipe::StreamableHttp {
server_name,
@@ -1101,21 +978,15 @@ impl RmcpClient {
) -> Result<(
Arc<RunningService<RoleClient, ElicitationClientService>>,
Option<OAuthPersistor>,
Option<ProcessGroupGuard>,
)> {
let (transport, oauth_persistor, process_group_guard) = match pending_transport {
PendingTransport::ChildProcess {
transport,
process_group_guard,
} => (
let (transport, oauth_persistor) = match pending_transport {
PendingTransport::Stdio { transport } => (
service::serve_client(client_service, transport).boxed(),
None,
process_group_guard,
),
PendingTransport::StreamableHttp { transport } => (
service::serve_client(client_service, transport).boxed(),
None,
None,
),
PendingTransport::StreamableHttpWithOAuth {
transport,
@@ -1123,7 +994,6 @@ impl RmcpClient {
} => (
service::serve_client(client_service, transport).boxed(),
Some(oauth_persistor),
None,
),
};
@@ -1137,7 +1007,7 @@ impl RmcpClient {
.map_err(|err| anyhow!("handshaking with MCP server failed: {err}"))?,
};
Ok((Arc::new(service), oauth_persistor, process_group_guard))
Ok((Arc::new(service), oauth_persistor))
}
async fn run_service_operation<T, F, Fut>(
@@ -1249,7 +1119,7 @@ impl RmcpClient {
.clone()
.ok_or_else(|| anyhow!("MCP client cannot recover before initialize succeeds"))?;
let pending_transport = Self::create_pending_transport(&self.transport_recipe).await?;
let (service, oauth_persistor, process_group_guard) = Self::connect_pending_transport(
let (service, oauth_persistor) = Self::connect_pending_transport(
pending_transport,
initialize_context.client_service,
initialize_context.timeout,
@@ -1259,7 +1129,6 @@ impl RmcpClient {
{
let mut guard = self.state.lock().await;
*guard = ClientState::Ready {
_process_group_guard: process_group_guard,
service,
oauth: oauth_persistor.clone(),
};

View File

@@ -0,0 +1,268 @@
//! Launch MCP stdio servers and return the transport rmcp should use.
//!
//! This module owns the "where does the server process run?" boundary for
//! stdio MCP servers. Currently there is only the local launcher, which keeps
//! the existing behavior: the orchestrator starts the configured command and
//! rmcp talks to the child process through local stdin/stdout pipes.
//!
//! Other launchers (for example, an executor-backed one) can be added here
//! without changing `RmcpClient`'s MCP lifecycle code.
use std::collections::HashMap;
use std::ffi::OsString;
use std::future::Future;
use std::io;
use std::path::PathBuf;
use std::process::Stdio;
#[cfg(unix)]
use std::thread::sleep;
#[cfg(unix)]
use std::thread::spawn;
#[cfg(unix)]
use std::time::Duration;
#[cfg(unix)]
use codex_utils_pty::process_group::kill_process_group;
#[cfg(unix)]
use codex_utils_pty::process_group::terminate_process_group;
use futures::FutureExt;
use futures::future::BoxFuture;
use rmcp::service::RoleClient;
use rmcp::service::RxJsonRpcMessage;
use rmcp::service::TxJsonRpcMessage;
use rmcp::transport::Transport;
use rmcp::transport::child_process::TokioChildProcess;
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
use tokio::process::Command;
use tracing::info;
use tracing::warn;
use crate::program_resolver;
use crate::utils::create_env_for_mcp_server;
// General purpose public code.
/// Launches an MCP stdio server and returns the transport for rmcp.
///
/// This trait is the boundary between MCP lifecycle code and process placement.
/// `RmcpClient` owns MCP operations such as `initialize` and `tools/list`; the
/// launcher owns starting the configured command and producing an rmcp
/// [`Transport`] over the server's stdin/stdout bytes.
///
/// The trait is sealed: its `private::Sealed` supertrait lives in a private
/// module, so implementations can only be written inside this module.
pub trait StdioServerLauncher: private::Sealed + Send + Sync {
/// Start the configured stdio server and return its rmcp-facing transport.
///
/// `command` is taken by value and the returned future is `'static`, so the
/// future borrows from neither the launcher nor the caller.
///
/// # Errors
///
/// The future resolves to an [`io::Error`] when the server cannot be started.
fn launch(
&self,
command: StdioServerCommand,
) -> BoxFuture<'static, io::Result<StdioServerTransport>>;
}
/// Command-line process shape shared by stdio server launchers.
///
/// All fields are private; values are built with `StdioServerCommand::new` and
/// taken apart by the launcher that runs the command.
#[derive(Clone)]
pub struct StdioServerCommand {
/// Program to run. The local launcher passes it through
/// `program_resolver::resolve` before spawning.
program: OsString,
/// Arguments handed to the spawned program.
args: Vec<OsString>,
/// Explicit environment entries, forwarded to `create_env_for_mcp_server`.
env: Option<HashMap<OsString, OsString>>,
/// Second input to `create_env_for_mcp_server`; presumably the names of
/// variables to pass through from the parent environment (TODO confirm).
env_vars: Vec<String>,
/// Working directory for the server; the local launcher leaves the child's
/// working directory unset when this is `None`.
cwd: Option<PathBuf>,
}
/// Client-side rmcp transport for a launched MCP stdio server.
///
/// The concrete process placement stays private to this module. `RmcpClient`
/// only sees the standard rmcp transport abstraction and can pass this value
/// directly to `rmcp::service::serve_client`.
pub struct StdioServerTransport {
/// The transport every `Transport` method delegates to.
inner: StdioServerTransportInner,
/// Never read; held only so its `Drop` runs together with the transport.
/// Declared after `inner`, so `inner` is dropped first.
_process_group_guard: Option<ProcessGroupGuard>,
}
/// Concrete transport behind [`StdioServerTransport`].
///
/// Only locally spawned child processes are represented at the moment.
enum StdioServerTransportInner {
/// rmcp's child-process transport over the local child's stdin/stdout.
Local(TokioChildProcess),
}
/// Forwards every rmcp [`Transport`] operation to the wrapped transport.
impl Transport<RoleClient> for StdioServerTransport {
    type Error = io::Error;

    /// Send `item` to the server through the wrapped transport.
    fn send(
        &mut self,
        item: TxJsonRpcMessage<RoleClient>,
    ) -> impl Future<Output = std::result::Result<(), Self::Error>> + Send + 'static {
        // The inner enum has a single variant, so this pattern is irrefutable.
        // Adding a variant turns this line into a compile error to revisit.
        let StdioServerTransportInner::Local(local) = &mut self.inner;
        local.send(item).boxed()
    }

    /// Wait for the next message from the server, or `None` once the wrapped
    /// transport reports no further messages.
    fn receive(&mut self) -> impl Future<Output = Option<RxJsonRpcMessage<RoleClient>>> + Send {
        let StdioServerTransportInner::Local(local) = &mut self.inner;
        local.receive().boxed()
    }

    /// Close the wrapped transport.
    async fn close(&mut self) -> std::result::Result<(), Self::Error> {
        let StdioServerTransportInner::Local(local) = &mut self.inner;
        local.close().await
    }
}
impl StdioServerCommand {
/// Build the stdio process parameters before choosing where the process
/// runs.
///
/// This only stores its arguments; nothing is resolved or spawned until a
/// launcher receives the command.
pub(super) fn new(
program: OsString,
args: Vec<OsString>,
env: Option<HashMap<OsString, OsString>>,
env_vars: Vec<String>,
cwd: Option<PathBuf>,
) -> Self {
Self {
program,
args,
env,
env_vars,
cwd,
}
}
}
// Local public implementation.
/// Starts MCP stdio servers as local child processes.
///
/// This is the existing behavior for local MCP servers: the orchestrator
/// process spawns the configured command and rmcp talks to the child's local
/// stdin/stdout pipes directly.
///
/// The launcher is a stateless unit struct, so it also implements `Debug` and
/// `Default`; this lets it appear in derived `Debug` output of containing
/// types and be built generically.
#[derive(Clone, Debug, Default)]
pub struct LocalStdioServerLauncher;
impl StdioServerLauncher for LocalStdioServerLauncher {
    /// Spawn `command` as a local child process.
    ///
    /// The spawn is deferred until the returned future is first polled:
    /// `launch_server` calls `tokio::spawn`, so it presumably has to run inside
    /// the caller's Tokio runtime rather than at the call site.
    fn launch(
        &self,
        command: StdioServerCommand,
    ) -> BoxFuture<'static, io::Result<StdioServerTransport>> {
        Box::pin(async move { Self::launch_server(command) })
    }
}
// Local private implementation.
/// How long the guard waits after `terminate_process_group` before calling
/// `kill_process_group` on the same group.
#[cfg(unix)]
const PROCESS_GROUP_TERM_GRACE_PERIOD: Duration = Duration::from_secs(2);
/// Terminates the recorded process group when dropped (unix).
#[cfg(unix)]
struct ProcessGroupGuard {
// Taken from the spawned transport's `id()`; the child is started with
// `process_group(0)`, so this presumably equals its process group id.
process_group_id: u32,
}
/// Placeholder on non-unix targets, where dropping the guard does nothing.
#[cfg(not(unix))]
struct ProcessGroupGuard;
// Sealing support for `StdioServerLauncher`: the module is private, so the
// `Sealed` supertrait cannot be named, and therefore not implemented, outside
// this file's module.
mod private {
pub trait Sealed {}
}
// Opts the local launcher into the sealed trait.
impl private::Sealed for LocalStdioServerLauncher {}
impl LocalStdioServerLauncher {
fn launch_server(command: StdioServerCommand) -> io::Result<StdioServerTransport> {
let StdioServerCommand {
program,
args,
env,
env_vars,
cwd,
} = command;
let program_name = program.to_string_lossy().into_owned();
let envs = create_env_for_mcp_server(env, &env_vars);
let resolved_program =
program_resolver::resolve(program, &envs).map_err(io::Error::other)?;
let mut command = Command::new(resolved_program);
command
.kill_on_drop(true)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.env_clear()
.envs(envs)
.args(args);
#[cfg(unix)]
command.process_group(0);
if let Some(cwd) = cwd {
command.current_dir(cwd);
}
let (transport, stderr) = TokioChildProcess::builder(command)
.stderr(Stdio::piped())
.spawn()?;
let process_group_guard = transport.id().map(ProcessGroupGuard::new);
if let Some(stderr) = stderr {
tokio::spawn(async move {
let mut reader = BufReader::new(stderr).lines();
loop {
match reader.next_line().await {
Ok(Some(line)) => {
info!("MCP server stderr ({program_name}): {line}");
}
Ok(None) => break,
Err(error) => {
warn!("Failed to read MCP server stderr ({program_name}): {error}");
break;
}
}
}
});
}
Ok(StdioServerTransport {
inner: StdioServerTransportInner::Local(transport),
_process_group_guard: process_group_guard,
})
}
}
impl ProcessGroupGuard {
fn new(process_group_id: u32) -> Self {
#[cfg(unix)]
{
Self { process_group_id }
}
#[cfg(not(unix))]
{
let _ = process_group_id;
Self
}
}
#[cfg(unix)]
fn maybe_terminate_process_group(&self) {
let process_group_id = self.process_group_id;
let should_escalate = match terminate_process_group(process_group_id) {
Ok(exists) => exists,
Err(error) => {
warn!("Failed to terminate MCP process group {process_group_id}: {error}");
false
}
};
if should_escalate {
spawn(move || {
sleep(PROCESS_GROUP_TERM_GRACE_PERIOD);
if let Err(error) = kill_process_group(process_group_id) {
warn!("Failed to kill MCP process group {process_group_id}: {error}");
}
});
}
}
#[cfg(not(unix))]
fn maybe_terminate_process_group(&self) {}
}
impl Drop for ProcessGroupGuard {
    /// Clean up the guarded process group when the guard goes away.
    fn drop(&mut self) {
        // No platform check is needed: the non-unix variant of this method is
        // an empty function, so the call only does work on unix.
        self.maybe_terminate_process_group();
    }
}

View File

@@ -4,10 +4,12 @@ use std::collections::HashMap;
use std::ffi::OsString;
use std::fs;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Context;
use anyhow::Result;
use codex_rmcp_client::LocalStdioServerLauncher;
use codex_rmcp_client::RmcpClient;
fn process_exists(pid: u32) -> bool {
@@ -78,6 +80,7 @@ async fn drop_kills_wrapper_process_group() -> Result<()> {
)])),
&[],
/*cwd*/ None,
Arc::new(LocalStdioServerLauncher),
)
.await?;

View File

@@ -1,9 +1,11 @@
use std::ffi::OsString;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use codex_rmcp_client::ElicitationAction;
use codex_rmcp_client::ElicitationResponse;
use codex_rmcp_client::LocalStdioServerLauncher;
use codex_rmcp_client::RmcpClient;
use codex_utils_cargo_bin::CargoBinError;
use futures::FutureExt as _;
@@ -61,6 +63,7 @@ async fn rmcp_client_can_list_and_read_resources() -> anyhow::Result<()> {
/*env*/ None,
&[],
/*cwd*/ None,
Arc::new(LocalStdioServerLauncher),
)
.await?;