Reorder MCP stdio launcher code

Keep the shared launcher API before the local implementation and move local launch helpers onto LocalStdioServerLauncher.\n\nCo-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Ahmed Ibrahim
2026-04-15 14:55:29 -07:00
parent 6e1452d0db
commit d94930b3fa

View File

@@ -13,8 +13,11 @@ use std::ffi::OsString;
use std::io;
use std::path::PathBuf;
use std::process::Stdio;
#[cfg(unix)]
use std::thread::sleep;
#[cfg(unix)]
use std::thread::spawn;
#[cfg(unix)]
use std::time::Duration;
#[cfg(unix)]
@@ -33,6 +36,8 @@ use tracing::warn;
use crate::program_resolver;
use crate::utils::create_env_for_mcp_server;
// General purpose public code.
/// Launches an MCP stdio server and returns the byte transport for rmcp.
///
/// This trait is the boundary between MCP lifecycle code and process placement.
@@ -47,14 +52,6 @@ pub trait StdioServerLauncher: private::Sealed + Send + Sync {
) -> BoxFuture<'static, io::Result<LaunchedStdioServer>>;
}
/// Starts MCP stdio servers as local child processes.
///
/// This is the existing behavior for local MCP servers: the orchestrator
/// process spawns the configured command and rmcp talks to the child's local
/// stdin/stdout pipes directly.
#[derive(Clone)]
pub struct LocalStdioServerLauncher;
/// Command-line process shape shared by stdio server launchers.
#[derive(Clone)]
pub struct StdioServerCommand {
@@ -81,23 +78,6 @@ pub(super) enum LaunchedStdioServerTransport {
},
}
#[cfg(unix)]
const PROCESS_GROUP_TERM_GRACE_PERIOD: Duration = Duration::from_secs(2);
#[cfg(unix)]
pub(super) struct ProcessGroupGuard {
process_group_id: u32,
}
#[cfg(not(unix))]
pub(super) struct ProcessGroupGuard;
mod private {
pub trait Sealed {}
}
impl private::Sealed for LocalStdioServerLauncher {}
impl StdioServerCommand {
/// Build the stdio process parameters before choosing where the process
/// runs.
@@ -118,70 +98,102 @@ impl StdioServerCommand {
}
}
// Local public implementation.
/// Starts MCP stdio servers as local child processes.
///
/// This is the existing behavior for local MCP servers: the orchestrator
/// process spawns the configured command and rmcp talks to the child's local
/// stdin/stdout pipes directly.
#[derive(Clone)]
pub struct LocalStdioServerLauncher;
impl StdioServerLauncher for LocalStdioServerLauncher {
fn launch(
&self,
command: StdioServerCommand,
) -> BoxFuture<'static, io::Result<LaunchedStdioServer>> {
async move { launch_stdio_server_locally(command) }.boxed()
async move { Self::launch_server(command) }.boxed()
}
}
fn launch_stdio_server_locally(command: StdioServerCommand) -> io::Result<LaunchedStdioServer> {
let StdioServerCommand {
program,
args,
env,
env_vars,
cwd,
} = command;
let program_name = program.to_string_lossy().into_owned();
let envs = create_env_for_mcp_server(env, &env_vars);
let resolved_program = program_resolver::resolve(program, &envs).map_err(io::Error::other)?;
// Local private implementation.
let mut command = Command::new(resolved_program);
command
.kill_on_drop(true)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.env_clear()
.envs(envs)
.args(args);
#[cfg(unix)]
command.process_group(0);
if let Some(cwd) = cwd {
command.current_dir(cwd);
}
#[cfg(unix)]
const PROCESS_GROUP_TERM_GRACE_PERIOD: Duration = Duration::from_secs(2);
let (transport, stderr) = TokioChildProcess::builder(command)
.stderr(Stdio::piped())
.spawn()?;
let process_group_guard = transport.id().map(ProcessGroupGuard::new);
#[cfg(unix)]
pub(super) struct ProcessGroupGuard {
process_group_id: u32,
}
if let Some(stderr) = stderr {
tokio::spawn(async move {
let mut reader = BufReader::new(stderr).lines();
loop {
match reader.next_line().await {
Ok(Some(line)) => {
info!("MCP server stderr ({program_name}): {line}");
}
Ok(None) => break,
Err(error) => {
warn!("Failed to read MCP server stderr ({program_name}): {error}");
break;
#[cfg(not(unix))]
pub(super) struct ProcessGroupGuard;
mod private {
pub trait Sealed {}
}
impl private::Sealed for LocalStdioServerLauncher {}
impl LocalStdioServerLauncher {
fn launch_server(command: StdioServerCommand) -> io::Result<LaunchedStdioServer> {
let StdioServerCommand {
program,
args,
env,
env_vars,
cwd,
} = command;
let program_name = program.to_string_lossy().into_owned();
let envs = create_env_for_mcp_server(env, &env_vars);
let resolved_program =
program_resolver::resolve(program, &envs).map_err(io::Error::other)?;
let mut command = Command::new(resolved_program);
command
.kill_on_drop(true)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.env_clear()
.envs(envs)
.args(args);
#[cfg(unix)]
command.process_group(0);
if let Some(cwd) = cwd {
command.current_dir(cwd);
}
let (transport, stderr) = TokioChildProcess::builder(command)
.stderr(Stdio::piped())
.spawn()?;
let process_group_guard = transport.id().map(ProcessGroupGuard::new);
if let Some(stderr) = stderr {
tokio::spawn(async move {
let mut reader = BufReader::new(stderr).lines();
loop {
match reader.next_line().await {
Ok(Some(line)) => {
info!("MCP server stderr ({program_name}): {line}");
}
Ok(None) => break,
Err(error) => {
warn!("Failed to read MCP server stderr ({program_name}): {error}");
break;
}
}
}
}
});
}
});
}
Ok(LaunchedStdioServer {
transport: LaunchedStdioServerTransport::Local {
transport,
process_group_guard,
},
})
Ok(LaunchedStdioServer {
transport: LaunchedStdioServerTransport::Local {
transport,
process_group_guard,
},
})
}
}
impl ProcessGroupGuard {