Compare commits

...

1 Commits

Author SHA1 Message Date
pakrym-oai
5e5165c7c0 Build MCP test server as part of build 2026-01-13 09:09:12 -08:00
19 changed files with 616 additions and 753 deletions

14
codex-rs/Cargo.lock generated
View File

@@ -1286,6 +1286,7 @@ dependencies = [
"codex-file-search",
"codex-git",
"codex-keyring-store",
"codex-mcp-test-server",
"codex-otel",
"codex-protocol",
"codex-rmcp-client",
@@ -1590,6 +1591,18 @@ dependencies = [
"wiremock",
]
[[package]]
name = "codex-mcp-test-server"
version = "0.0.0"
dependencies = [
"anyhow",
"axum",
"rmcp",
"serde",
"serde_json",
"tokio",
]
[[package]]
name = "codex-ollama"
version = "0.0.0"
@@ -1692,6 +1705,7 @@ dependencies = [
"anyhow",
"axum",
"codex-keyring-store",
"codex-mcp-test-server",
"codex-protocol",
"codex-utils-cargo-bin",
"dirs",

View File

@@ -26,6 +26,7 @@ members = [
"lmstudio",
"login",
"mcp-server",
"mcp-test-server",
"mcp-types",
"ollama",
"process-hardening",
@@ -84,6 +85,7 @@ codex-linux-sandbox = { path = "linux-sandbox" }
codex-lmstudio = { path = "lmstudio" }
codex-login = { path = "login" }
codex-mcp-server = { path = "mcp-server" }
codex-mcp-test-server = { path = "mcp-test-server" }
codex-ollama = { path = "ollama" }
codex-otel = { path = "otel" }
codex-process-hardening = { path = "process-hardening" }

View File

@@ -33,8 +33,6 @@ codex_rust_crate(
test_tags = ["no-sandbox"],
extra_binaries = [
"//codex-rs/linux-sandbox:codex-linux-sandbox",
"//codex-rs/rmcp-client:test_stdio_server",
"//codex-rs/rmcp-client:test_streamable_http_server",
"//codex-rs/cli:codex",
],
)

View File

@@ -32,6 +32,7 @@ codex-keyring-store = { workspace = true }
codex-otel = { workspace = true }
codex-protocol = { workspace = true }
codex-rmcp-client = { workspace = true }
codex-mcp-test-server = { workspace = true }
codex-utils-absolute-path = { workspace = true }
codex-utils-pty = { workspace = true }
codex-utils-readiness = { workspace = true }

View File

@@ -0,0 +1,4 @@
#[tokio::main]
async fn main() -> anyhow::Result<()> {
codex_mcp_test_server::run_stdio_server().await
}

View File

@@ -0,0 +1,4 @@
#[tokio::main]
async fn main() -> anyhow::Result<()> {
codex_mcp_test_server::run_streamable_http_server().await
}

View File

@@ -247,7 +247,8 @@ pub fn format_with_current_shell_display_non_login(command: &str) -> String {
}
pub fn stdio_server_bin() -> Result<String, CargoBinError> {
codex_utils_cargo_bin::cargo_bin("test_stdio_server").map(|p| p.to_string_lossy().to_string())
codex_utils_cargo_bin::cargo_bin("core_test_stdio_server")
.map(|p| p.to_string_lossy().to_string())
}
pub mod fs_wait {

View File

@@ -657,7 +657,7 @@ async fn streamable_http_tool_call_round_trip() -> anyhow::Result<()> {
.await;
let expected_env_value = "propagated-env-http";
let rmcp_http_server_bin = cargo_bin("test_streamable_http_server")?;
let rmcp_http_server_bin = cargo_bin("core_test_streamable_http_server")?;
let listener = TcpListener::bind("127.0.0.1:0")?;
let port = listener.local_addr()?.port();
@@ -819,7 +819,7 @@ async fn streamable_http_with_oauth_round_trip() -> anyhow::Result<()> {
let expected_token = "initial-access-token";
let client_id = "test-client-id";
let refresh_token = "initial-refresh-token";
let rmcp_http_server_bin = cargo_bin("test_streamable_http_server")?;
let rmcp_http_server_bin = cargo_bin("core_test_streamable_http_server")?;
let listener = TcpListener::bind("127.0.0.1:0")?;
let port = listener.local_addr()?.port();

View File

@@ -0,0 +1,6 @@
load("//:defs.bzl", "codex_rust_crate")
codex_rust_crate(
name = "mcp-test-server",
crate_name = "codex_mcp_test_server",
)

View File

@@ -0,0 +1,30 @@
[package]
name = "codex-mcp-test-server"
version.workspace = true
edition.workspace = true
license.workspace = true
[lints]
workspace = true
[dependencies]
anyhow = { workspace = true }
axum = { workspace = true, default-features = false, features = [
"http1",
"tokio",
] }
rmcp = { workspace = true, default-features = false, features = [
"base64",
"macros",
"schemars",
"server",
"transport-streamable-http-server",
] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
tokio = { workspace = true, features = [
"io-std",
"macros",
"net",
"rt-multi-thread",
] }

View File

@@ -0,0 +1,138 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;
use anyhow::Result;
use rmcp::ErrorData as McpError;
use rmcp::ServiceExt;
use rmcp::handler::server::ServerHandler;
use rmcp::model::CallToolRequestParam;
use rmcp::model::CallToolResult;
use rmcp::model::JsonObject;
use rmcp::model::ListToolsResult;
use rmcp::model::PaginatedRequestParam;
use rmcp::model::ServerCapabilities;
use rmcp::model::ServerInfo;
use rmcp::model::Tool;
use serde::Deserialize;
use serde_json::json;
use tokio::task;
#[derive(Clone)]
struct EchoTestToolServer {
tools: Arc<Vec<Tool>>,
}
impl EchoTestToolServer {
fn new() -> Self {
let tools = vec![Self::echo_tool()];
Self {
tools: Arc::new(tools),
}
}
fn echo_tool() -> Tool {
#[expect(clippy::expect_used)]
let schema: JsonObject = serde_json::from_value(json!({
"type": "object",
"properties": {
"message": { "type": "string" },
"env_var": { "type": "string" }
},
"required": ["message"],
"additionalProperties": false
}))
.expect("echo tool schema should deserialize");
Tool::new(
Cow::Borrowed("echo"),
Cow::Borrowed("Echo back the provided message and include environment data."),
Arc::new(schema),
)
}
}
#[derive(Deserialize)]
struct EchoArgs {
message: String,
#[allow(dead_code)]
env_var: Option<String>,
}
impl ServerHandler for EchoTestToolServer {
fn get_info(&self) -> ServerInfo {
ServerInfo {
capabilities: ServerCapabilities::builder()
.enable_tools()
.enable_tool_list_changed()
.build(),
..ServerInfo::default()
}
}
fn list_tools(
&self,
_request: Option<PaginatedRequestParam>,
_context: rmcp::service::RequestContext<rmcp::service::RoleServer>,
) -> impl std::future::Future<Output = Result<ListToolsResult, McpError>> + Send + '_ {
let tools = self.tools.clone();
async move {
Ok(ListToolsResult {
tools: (*tools).clone(),
next_cursor: None,
meta: None,
})
}
}
async fn call_tool(
&self,
request: CallToolRequestParam,
_context: rmcp::service::RequestContext<rmcp::service::RoleServer>,
) -> Result<CallToolResult, McpError> {
match request.name.as_ref() {
"echo" => {
let args: EchoArgs = match request.arguments {
Some(arguments) => serde_json::from_value(serde_json::Value::Object(
arguments.into_iter().collect(),
))
.map_err(|err| McpError::invalid_params(err.to_string(), None))?,
None => {
return Err(McpError::invalid_params(
"missing arguments for echo tool",
None,
));
}
};
let message = args.message;
let env_snapshot: HashMap<String, String> = std::env::vars().collect();
let structured_content = json!({
"echo": message,
"env": env_snapshot.get("MCP_TEST_VALUE"),
});
Ok(CallToolResult {
content: Vec::new(),
structured_content: Some(structured_content),
is_error: Some(false),
meta: None,
})
}
other => Err(McpError::invalid_params(
format!("unknown tool: {other}"),
None,
)),
}
}
}
pub async fn run_echo_stdio_server() -> Result<()> {
eprintln!("starting rmcp test server");
let service = EchoTestToolServer::new();
let running = service.serve(crate::stdio()).await?;
running.waiting().await?;
task::yield_now().await;
Ok(())
}

View File

@@ -0,0 +1,12 @@
mod echo_server;
mod resource_server;
mod stdio_server;
mod streamable_http_server;
pub use echo_server::run_echo_stdio_server;
pub use stdio_server::run_stdio_server;
pub use streamable_http_server::run_streamable_http_server;
fn stdio() -> (tokio::io::Stdin, tokio::io::Stdout) {
(tokio::io::stdin(), tokio::io::stdout())
}

View File

@@ -0,0 +1,272 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;
use rmcp::ErrorData as McpError;
use rmcp::handler::server::ServerHandler;
use rmcp::model::CallToolRequestParam;
use rmcp::model::CallToolResult;
use rmcp::model::JsonObject;
use rmcp::model::ListResourceTemplatesResult;
use rmcp::model::ListResourcesResult;
use rmcp::model::ListToolsResult;
use rmcp::model::PaginatedRequestParam;
use rmcp::model::RawResource;
use rmcp::model::RawResourceTemplate;
use rmcp::model::ReadResourceRequestParam;
use rmcp::model::ReadResourceResult;
use rmcp::model::Resource;
use rmcp::model::ResourceContents;
use rmcp::model::ResourceTemplate;
use rmcp::model::ServerCapabilities;
use rmcp::model::ServerInfo;
use rmcp::model::Tool;
use serde::Deserialize;
use serde_json::json;
const MEMO_URI: &str = "memo://codex/example-note";
const MEMO_CONTENT: &str = "This is a sample MCP resource served by the rmcp test server.";
#[derive(Clone)]
pub(crate) struct ResourceTestToolServer {
allow_image: bool,
tools: Arc<Vec<Tool>>,
resources: Arc<Vec<Resource>>,
resource_templates: Arc<Vec<ResourceTemplate>>,
}
impl ResourceTestToolServer {
pub(crate) fn new(allow_image: bool) -> Self {
let mut tools = vec![Self::echo_tool()];
if allow_image {
tools.push(Self::image_tool());
}
let resources = vec![Self::memo_resource()];
let resource_templates = vec![Self::memo_template()];
Self {
allow_image,
tools: Arc::new(tools),
resources: Arc::new(resources),
resource_templates: Arc::new(resource_templates),
}
}
fn echo_tool() -> Tool {
#[expect(clippy::expect_used)]
let schema: JsonObject = serde_json::from_value(json!({
"type": "object",
"properties": {
"message": { "type": "string" },
"env_var": { "type": "string" }
},
"required": ["message"],
"additionalProperties": false
}))
.expect("echo tool schema should deserialize");
Tool::new(
Cow::Borrowed("echo"),
Cow::Borrowed("Echo back the provided message and include environment data."),
Arc::new(schema),
)
}
fn image_tool() -> Tool {
#[expect(clippy::expect_used)]
let schema: JsonObject = serde_json::from_value(serde_json::json!({
"type": "object",
"properties": {},
"additionalProperties": false
}))
.expect("image tool schema should deserialize");
Tool::new(
Cow::Borrowed("image"),
Cow::Borrowed("Return a single image content block."),
Arc::new(schema),
)
}
fn memo_resource() -> Resource {
let raw = RawResource {
uri: MEMO_URI.to_string(),
name: "example-note".to_string(),
title: Some("Example Note".to_string()),
description: Some("A sample MCP resource exposed for integration tests.".to_string()),
mime_type: Some("text/plain".to_string()),
size: None,
icons: None,
meta: None,
};
Resource::new(raw, None)
}
fn memo_template() -> ResourceTemplate {
let raw = RawResourceTemplate {
uri_template: "memo://codex/{slug}".to_string(),
name: "codex-memo".to_string(),
title: Some("Codex Memo".to_string()),
description: Some(
"Template for memo://codex/{slug} resources used in tests.".to_string(),
),
mime_type: Some("text/plain".to_string()),
};
ResourceTemplate::new(raw, None)
}
fn memo_text() -> &'static str {
MEMO_CONTENT
}
}
#[derive(Deserialize)]
struct EchoArgs {
message: String,
#[allow(dead_code)]
env_var: Option<String>,
}
impl ServerHandler for ResourceTestToolServer {
fn get_info(&self) -> ServerInfo {
ServerInfo {
capabilities: ServerCapabilities::builder()
.enable_tools()
.enable_tool_list_changed()
.enable_resources()
.build(),
..ServerInfo::default()
}
}
fn list_tools(
&self,
_request: Option<PaginatedRequestParam>,
_context: rmcp::service::RequestContext<rmcp::service::RoleServer>,
) -> impl std::future::Future<Output = Result<ListToolsResult, McpError>> + Send + '_ {
let tools = self.tools.clone();
async move {
Ok(ListToolsResult {
tools: (*tools).clone(),
next_cursor: None,
meta: None,
})
}
}
fn list_resources(
&self,
_request: Option<PaginatedRequestParam>,
_context: rmcp::service::RequestContext<rmcp::service::RoleServer>,
) -> impl std::future::Future<Output = Result<ListResourcesResult, McpError>> + Send + '_ {
let resources = self.resources.clone();
async move {
Ok(ListResourcesResult {
resources: (*resources).clone(),
next_cursor: None,
meta: None,
})
}
}
async fn list_resource_templates(
&self,
_request: Option<PaginatedRequestParam>,
_context: rmcp::service::RequestContext<rmcp::service::RoleServer>,
) -> Result<ListResourceTemplatesResult, McpError> {
Ok(ListResourceTemplatesResult {
resource_templates: (*self.resource_templates).clone(),
next_cursor: None,
meta: None,
})
}
async fn read_resource(
&self,
ReadResourceRequestParam { uri }: ReadResourceRequestParam,
_context: rmcp::service::RequestContext<rmcp::service::RoleServer>,
) -> Result<ReadResourceResult, McpError> {
if uri == MEMO_URI {
Ok(ReadResourceResult {
contents: vec![ResourceContents::TextResourceContents {
uri,
mime_type: Some("text/plain".to_string()),
text: Self::memo_text().to_string(),
meta: None,
}],
})
} else {
Err(McpError::resource_not_found(
"resource_not_found",
Some(json!({ "uri": uri })),
))
}
}
async fn call_tool(
&self,
request: CallToolRequestParam,
_context: rmcp::service::RequestContext<rmcp::service::RoleServer>,
) -> Result<CallToolResult, McpError> {
match request.name.as_ref() {
"echo" => {
let args: EchoArgs = match request.arguments {
Some(arguments) => serde_json::from_value(serde_json::Value::Object(
arguments.into_iter().collect(),
))
.map_err(|err| McpError::invalid_params(err.to_string(), None))?,
None => {
return Err(McpError::invalid_params(
"missing arguments for echo tool",
None,
));
}
};
let message = args.message;
let env_snapshot: HashMap<String, String> = std::env::vars().collect();
let structured_content = json!({
"echo": format!("ECHOING: {message}"),
"env": env_snapshot.get("MCP_TEST_VALUE"),
});
Ok(CallToolResult {
content: Vec::new(),
structured_content: Some(structured_content),
is_error: Some(false),
meta: None,
})
}
"image" if self.allow_image => {
let data_url = std::env::var("MCP_TEST_IMAGE_DATA_URL").map_err(|_| {
McpError::invalid_params(
"missing MCP_TEST_IMAGE_DATA_URL env var for image tool",
None,
)
})?;
fn parse_data_url(url: &str) -> Option<(String, String)> {
let rest = url.strip_prefix("data:")?;
let (mime_and_opts, data) = rest.split_once(',')?;
let (mime, _opts) =
mime_and_opts.split_once(';').unwrap_or((mime_and_opts, ""));
Some((mime.to_string(), data.to_string()))
}
let (mime_type, data_b64) = parse_data_url(&data_url).ok_or_else(|| {
McpError::invalid_params(
format!("invalid data URL for image tool: {data_url}"),
None,
)
})?;
Ok(CallToolResult::success(vec![rmcp::model::Content::image(
data_b64, mime_type,
)]))
}
other => Err(McpError::invalid_params(
format!("unknown tool: {other}"),
None,
)),
}
}
}

View File

@@ -0,0 +1,15 @@
use anyhow::Result;
use rmcp::ServiceExt;
use tokio::task;
use crate::resource_server::ResourceTestToolServer;
pub async fn run_stdio_server() -> Result<()> {
eprintln!("starting rmcp test server");
let service = ResourceTestToolServer::new(true);
let running = service.serve(crate::stdio()).await?;
running.waiting().await?;
task::yield_now().await;
Ok(())
}

View File

@@ -0,0 +1,107 @@
use std::io::ErrorKind;
use std::net::SocketAddr;
use std::sync::Arc;
use anyhow::Result;
use axum::Router;
use axum::body::Body;
use axum::extract::State;
use axum::http::Request;
use axum::http::StatusCode;
use axum::http::header::AUTHORIZATION;
use axum::http::header::CONTENT_TYPE;
use axum::middleware;
use axum::middleware::Next;
use axum::response::Response;
use axum::routing::get;
use rmcp::transport::StreamableHttpServerConfig;
use rmcp::transport::StreamableHttpService;
use rmcp::transport::streamable_http_server::session::local::LocalSessionManager;
use serde_json::json;
use tokio::task;
use crate::resource_server::ResourceTestToolServer;
pub async fn run_streamable_http_server() -> Result<()> {
let bind_addr = parse_bind_addr()?;
let listener = match tokio::net::TcpListener::bind(&bind_addr).await {
Ok(listener) => listener,
Err(err) if err.kind() == ErrorKind::PermissionDenied => {
eprintln!(
"failed to bind to {bind_addr}: {err}. make sure the process has network access"
);
return Ok(());
}
Err(err) => return Err(err.into()),
};
eprintln!("starting rmcp streamable http test server on http://{bind_addr}/mcp");
let router = Router::new()
.route(
"/.well-known/oauth-authorization-server/mcp",
get({
move || async move {
let metadata_base = format!("http://{bind_addr}");
#[expect(clippy::expect_used)]
Response::builder()
.status(StatusCode::OK)
.header(CONTENT_TYPE, "application/json")
.body(Body::from(
serde_json::to_vec(&json!({
"authorization_endpoint": format!("{metadata_base}/oauth/authorize"),
"token_endpoint": format!("{metadata_base}/oauth/token"),
"scopes_supported": [""],
}))
.expect("failed to serialize metadata"),
))
.expect("valid metadata response")
}
}),
)
.nest_service(
"/mcp",
StreamableHttpService::new(
|| Ok(ResourceTestToolServer::new(false)),
Arc::new(LocalSessionManager::default()),
StreamableHttpServerConfig::default(),
),
);
let router = if let Ok(token) = std::env::var("MCP_EXPECT_BEARER") {
let expected = Arc::new(format!("Bearer {token}"));
router.layer(middleware::from_fn_with_state(expected, require_bearer))
} else {
router
};
axum::serve(listener, router).await?;
task::yield_now().await;
Ok(())
}
fn parse_bind_addr() -> Result<SocketAddr> {
let default_addr = "127.0.0.1:3920";
let bind_addr = std::env::var("MCP_STREAMABLE_HTTP_BIND_ADDR")
.or_else(|_| std::env::var("BIND_ADDR"))
.unwrap_or_else(|_| default_addr.to_string());
Ok(bind_addr.parse()?)
}
async fn require_bearer(
State(expected): State<Arc<String>>,
request: Request<Body>,
next: Next,
) -> Result<Response, StatusCode> {
if request.uri().path().contains("/.well-known/") {
return Ok(next.run(request).await);
}
if request
.headers()
.get(AUTHORIZATION)
.is_some_and(|value| value.as_bytes() == expected.as_bytes())
{
Ok(next.run(request).await)
} else {
Err(StatusCode::UNAUTHORIZED)
}
}

View File

@@ -14,6 +14,7 @@ axum = { workspace = true, default-features = false, features = [
"tokio",
] }
codex-keyring-store = { workspace = true }
codex-mcp-test-server = { workspace = true }
codex-protocol = { workspace = true }
dirs = { workspace = true }
futures = { workspace = true, default-features = false, features = ["std"] }

View File

@@ -1,143 +1,4 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;
use rmcp::ErrorData as McpError;
use rmcp::ServiceExt;
use rmcp::handler::server::ServerHandler;
use rmcp::model::CallToolRequestParam;
use rmcp::model::CallToolResult;
use rmcp::model::JsonObject;
use rmcp::model::ListToolsResult;
use rmcp::model::PaginatedRequestParam;
use rmcp::model::ServerCapabilities;
use rmcp::model::ServerInfo;
use rmcp::model::Tool;
use serde::Deserialize;
use serde_json::json;
use tokio::task;
#[derive(Clone)]
struct TestToolServer {
tools: Arc<Vec<Tool>>,
}
pub fn stdio() -> (tokio::io::Stdin, tokio::io::Stdout) {
(tokio::io::stdin(), tokio::io::stdout())
}
impl TestToolServer {
fn new() -> Self {
let tools = vec![Self::echo_tool()];
Self {
tools: Arc::new(tools),
}
}
fn echo_tool() -> Tool {
#[expect(clippy::expect_used)]
let schema: JsonObject = serde_json::from_value(json!({
"type": "object",
"properties": {
"message": { "type": "string" },
"env_var": { "type": "string" }
},
"required": ["message"],
"additionalProperties": false
}))
.expect("echo tool schema should deserialize");
Tool::new(
Cow::Borrowed("echo"),
Cow::Borrowed("Echo back the provided message and include environment data."),
Arc::new(schema),
)
}
}
#[derive(Deserialize)]
struct EchoArgs {
message: String,
#[allow(dead_code)]
env_var: Option<String>,
}
impl ServerHandler for TestToolServer {
fn get_info(&self) -> ServerInfo {
ServerInfo {
capabilities: ServerCapabilities::builder()
.enable_tools()
.enable_tool_list_changed()
.build(),
..ServerInfo::default()
}
}
fn list_tools(
&self,
_request: Option<PaginatedRequestParam>,
_context: rmcp::service::RequestContext<rmcp::service::RoleServer>,
) -> impl std::future::Future<Output = Result<ListToolsResult, McpError>> + Send + '_ {
let tools = self.tools.clone();
async move {
Ok(ListToolsResult {
tools: (*tools).clone(),
next_cursor: None,
meta: None,
})
}
}
async fn call_tool(
&self,
request: CallToolRequestParam,
_context: rmcp::service::RequestContext<rmcp::service::RoleServer>,
) -> Result<CallToolResult, McpError> {
match request.name.as_ref() {
"echo" => {
let args: EchoArgs = match request.arguments {
Some(arguments) => serde_json::from_value(serde_json::Value::Object(
arguments.into_iter().collect(),
))
.map_err(|err| McpError::invalid_params(err.to_string(), None))?,
None => {
return Err(McpError::invalid_params(
"missing arguments for echo tool",
None,
));
}
};
let env_snapshot: HashMap<String, String> = std::env::vars().collect();
let structured_content = json!({
"echo": args.message,
"env": env_snapshot.get("MCP_TEST_VALUE"),
});
Ok(CallToolResult {
content: Vec::new(),
structured_content: Some(structured_content),
is_error: Some(false),
meta: None,
})
}
other => Err(McpError::invalid_params(
format!("unknown tool: {other}"),
None,
)),
}
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
eprintln!("starting rmcp test server");
// Run the server with STDIO transport. If the client disconnects we simply
// bubble up the error so the process exits.
let service = TestToolServer::new();
let running = service.serve(stdio()).await?;
// Wait for the client to finish interacting with the server.
running.waiting().await?;
// Drain background tasks to ensure clean shutdown.
task::yield_now().await;
Ok(())
async fn main() -> anyhow::Result<()> {
codex_mcp_test_server::run_echo_stdio_server().await
}

View File

@@ -1,287 +1,4 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;
use rmcp::ErrorData as McpError;
use rmcp::ServiceExt;
use rmcp::handler::server::ServerHandler;
use rmcp::model::CallToolRequestParam;
use rmcp::model::CallToolResult;
use rmcp::model::JsonObject;
use rmcp::model::ListResourceTemplatesResult;
use rmcp::model::ListResourcesResult;
use rmcp::model::ListToolsResult;
use rmcp::model::PaginatedRequestParam;
use rmcp::model::RawResource;
use rmcp::model::RawResourceTemplate;
use rmcp::model::ReadResourceRequestParam;
use rmcp::model::ReadResourceResult;
use rmcp::model::Resource;
use rmcp::model::ResourceContents;
use rmcp::model::ResourceTemplate;
use rmcp::model::ServerCapabilities;
use rmcp::model::ServerInfo;
use rmcp::model::Tool;
use serde::Deserialize;
use serde_json::json;
use tokio::task;
#[derive(Clone)]
struct TestToolServer {
tools: Arc<Vec<Tool>>,
resources: Arc<Vec<Resource>>,
resource_templates: Arc<Vec<ResourceTemplate>>,
}
const MEMO_URI: &str = "memo://codex/example-note";
const MEMO_CONTENT: &str = "This is a sample MCP resource served by the rmcp test server.";
pub fn stdio() -> (tokio::io::Stdin, tokio::io::Stdout) {
(tokio::io::stdin(), tokio::io::stdout())
}
impl TestToolServer {
fn new() -> Self {
let tools = vec![Self::echo_tool(), Self::image_tool()];
let resources = vec![Self::memo_resource()];
let resource_templates = vec![Self::memo_template()];
Self {
tools: Arc::new(tools),
resources: Arc::new(resources),
resource_templates: Arc::new(resource_templates),
}
}
fn echo_tool() -> Tool {
#[expect(clippy::expect_used)]
let schema: JsonObject = serde_json::from_value(json!({
"type": "object",
"properties": {
"message": { "type": "string" },
"env_var": { "type": "string" }
},
"required": ["message"],
"additionalProperties": false
}))
.expect("echo tool schema should deserialize");
Tool::new(
Cow::Borrowed("echo"),
Cow::Borrowed("Echo back the provided message and include environment data."),
Arc::new(schema),
)
}
fn image_tool() -> Tool {
#[expect(clippy::expect_used)]
let schema: JsonObject = serde_json::from_value(serde_json::json!({
"type": "object",
"properties": {},
"additionalProperties": false
}))
.expect("image tool schema should deserialize");
Tool::new(
Cow::Borrowed("image"),
Cow::Borrowed("Return a single image content block."),
Arc::new(schema),
)
}
fn memo_resource() -> Resource {
let raw = RawResource {
uri: MEMO_URI.to_string(),
name: "example-note".to_string(),
title: Some("Example Note".to_string()),
description: Some("A sample MCP resource exposed for integration tests.".to_string()),
mime_type: Some("text/plain".to_string()),
size: None,
icons: None,
meta: None,
};
Resource::new(raw, None)
}
fn memo_template() -> ResourceTemplate {
let raw = RawResourceTemplate {
uri_template: "memo://codex/{slug}".to_string(),
name: "codex-memo".to_string(),
title: Some("Codex Memo".to_string()),
description: Some(
"Template for memo://codex/{slug} resources used in tests.".to_string(),
),
mime_type: Some("text/plain".to_string()),
};
ResourceTemplate::new(raw, None)
}
fn memo_text() -> &'static str {
MEMO_CONTENT
}
}
#[derive(Deserialize)]
struct EchoArgs {
message: String,
#[allow(dead_code)]
env_var: Option<String>,
}
impl ServerHandler for TestToolServer {
fn get_info(&self) -> ServerInfo {
ServerInfo {
capabilities: ServerCapabilities::builder()
.enable_tools()
.enable_tool_list_changed()
.enable_resources()
.build(),
..ServerInfo::default()
}
}
fn list_tools(
&self,
_request: Option<PaginatedRequestParam>,
_context: rmcp::service::RequestContext<rmcp::service::RoleServer>,
) -> impl std::future::Future<Output = Result<ListToolsResult, McpError>> + Send + '_ {
let tools = self.tools.clone();
async move {
Ok(ListToolsResult {
tools: (*tools).clone(),
next_cursor: None,
meta: None,
})
}
}
fn list_resources(
&self,
_request: Option<PaginatedRequestParam>,
_context: rmcp::service::RequestContext<rmcp::service::RoleServer>,
) -> impl std::future::Future<Output = Result<ListResourcesResult, McpError>> + Send + '_ {
let resources = self.resources.clone();
async move {
Ok(ListResourcesResult {
resources: (*resources).clone(),
next_cursor: None,
meta: None,
})
}
}
async fn list_resource_templates(
&self,
_request: Option<PaginatedRequestParam>,
_context: rmcp::service::RequestContext<rmcp::service::RoleServer>,
) -> Result<ListResourceTemplatesResult, McpError> {
Ok(ListResourceTemplatesResult {
resource_templates: (*self.resource_templates).clone(),
next_cursor: None,
meta: None,
})
}
async fn read_resource(
&self,
ReadResourceRequestParam { uri }: ReadResourceRequestParam,
_context: rmcp::service::RequestContext<rmcp::service::RoleServer>,
) -> Result<ReadResourceResult, McpError> {
if uri == MEMO_URI {
Ok(ReadResourceResult {
contents: vec![ResourceContents::TextResourceContents {
uri,
mime_type: Some("text/plain".to_string()),
text: Self::memo_text().to_string(),
meta: None,
}],
})
} else {
Err(McpError::resource_not_found(
"resource_not_found",
Some(json!({ "uri": uri })),
))
}
}
async fn call_tool(
&self,
request: CallToolRequestParam,
_context: rmcp::service::RequestContext<rmcp::service::RoleServer>,
) -> Result<CallToolResult, McpError> {
match request.name.as_ref() {
"echo" => {
let args: EchoArgs = match request.arguments {
Some(arguments) => serde_json::from_value(serde_json::Value::Object(
arguments.into_iter().collect(),
))
.map_err(|err| McpError::invalid_params(err.to_string(), None))?,
None => {
return Err(McpError::invalid_params(
"missing arguments for echo tool",
None,
));
}
};
let env_snapshot: HashMap<String, String> = std::env::vars().collect();
let structured_content = json!({
"echo": format!("ECHOING: {}", args.message),
"env": env_snapshot.get("MCP_TEST_VALUE"),
});
Ok(CallToolResult {
content: Vec::new(),
structured_content: Some(structured_content),
is_error: Some(false),
meta: None,
})
}
"image" => {
// Read a data URL (e.g. data:image/png;base64,AAA...) from env and convert to
// an MCP image content block. Tests set MCP_TEST_IMAGE_DATA_URL.
let data_url = std::env::var("MCP_TEST_IMAGE_DATA_URL").map_err(|_| {
McpError::invalid_params(
"missing MCP_TEST_IMAGE_DATA_URL env var for image tool",
None,
)
})?;
fn parse_data_url(url: &str) -> Option<(String, String)> {
let rest = url.strip_prefix("data:")?;
let (mime_and_opts, data) = rest.split_once(',')?;
let (mime, _opts) =
mime_and_opts.split_once(';').unwrap_or((mime_and_opts, ""));
Some((mime.to_string(), data.to_string()))
}
let (mime_type, data_b64) = parse_data_url(&data_url).ok_or_else(|| {
McpError::invalid_params(
format!("invalid data URL for image tool: {data_url}"),
None,
)
})?;
Ok(CallToolResult::success(vec![rmcp::model::Content::image(
data_b64, mime_type,
)]))
}
other => Err(McpError::invalid_params(
format!("unknown tool: {other}"),
None,
)),
}
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
eprintln!("starting rmcp test server");
// Run the server with STDIO transport. If the client disconnects we simply
// bubble up the error so the process exits.
let service = TestToolServer::new();
let running = service.serve(stdio()).await?;
// Wait for the client to finish interacting with the server.
running.waiting().await?;
// Drain background tasks to ensure clean shutdown.
task::yield_now().await;
Ok(())
async fn main() -> anyhow::Result<()> {
codex_mcp_test_server::run_stdio_server().await
}

View File

@@ -1,324 +1,4 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::io::ErrorKind;
use std::net::SocketAddr;
use std::sync::Arc;
use axum::Router;
use axum::body::Body;
use axum::extract::State;
use axum::http::Request;
use axum::http::StatusCode;
use axum::http::header::AUTHORIZATION;
use axum::http::header::CONTENT_TYPE;
use axum::middleware;
use axum::middleware::Next;
use axum::response::Response;
use axum::routing::get;
use rmcp::ErrorData as McpError;
use rmcp::handler::server::ServerHandler;
use rmcp::model::CallToolRequestParam;
use rmcp::model::CallToolResult;
use rmcp::model::JsonObject;
use rmcp::model::ListResourceTemplatesResult;
use rmcp::model::ListResourcesResult;
use rmcp::model::ListToolsResult;
use rmcp::model::PaginatedRequestParam;
use rmcp::model::RawResource;
use rmcp::model::RawResourceTemplate;
use rmcp::model::ReadResourceRequestParam;
use rmcp::model::ReadResourceResult;
use rmcp::model::Resource;
use rmcp::model::ResourceContents;
use rmcp::model::ResourceTemplate;
use rmcp::model::ServerCapabilities;
use rmcp::model::ServerInfo;
use rmcp::model::Tool;
use rmcp::transport::StreamableHttpServerConfig;
use rmcp::transport::StreamableHttpService;
use rmcp::transport::streamable_http_server::session::local::LocalSessionManager;
use serde::Deserialize;
use serde_json::json;
use tokio::task;
#[derive(Clone)]
struct TestToolServer {
tools: Arc<Vec<Tool>>,
resources: Arc<Vec<Resource>>,
resource_templates: Arc<Vec<ResourceTemplate>>,
}
const MEMO_URI: &str = "memo://codex/example-note";
const MEMO_CONTENT: &str = "This is a sample MCP resource served by the rmcp test server.";
impl TestToolServer {
fn new() -> Self {
let tools = vec![Self::echo_tool()];
let resources = vec![Self::memo_resource()];
let resource_templates = vec![Self::memo_template()];
Self {
tools: Arc::new(tools),
resources: Arc::new(resources),
resource_templates: Arc::new(resource_templates),
}
}
fn echo_tool() -> Tool {
#[expect(clippy::expect_used)]
let schema: JsonObject = serde_json::from_value(json!({
"type": "object",
"properties": {
"message": { "type": "string" },
"env_var": { "type": "string" }
},
"required": ["message"],
"additionalProperties": false
}))
.expect("echo tool schema should deserialize");
Tool::new(
Cow::Borrowed("echo"),
Cow::Borrowed("Echo back the provided message and include environment data."),
Arc::new(schema),
)
}
fn memo_resource() -> Resource {
let raw = RawResource {
uri: MEMO_URI.to_string(),
name: "example-note".to_string(),
title: Some("Example Note".to_string()),
description: Some("A sample MCP resource exposed for integration tests.".to_string()),
mime_type: Some("text/plain".to_string()),
size: None,
icons: None,
meta: None,
};
Resource::new(raw, None)
}
fn memo_template() -> ResourceTemplate {
let raw = RawResourceTemplate {
uri_template: "memo://codex/{slug}".to_string(),
name: "codex-memo".to_string(),
title: Some("Codex Memo".to_string()),
description: Some(
"Template for memo://codex/{slug} resources used in tests.".to_string(),
),
mime_type: Some("text/plain".to_string()),
};
ResourceTemplate::new(raw, None)
}
fn memo_text() -> &'static str {
MEMO_CONTENT
}
}
#[derive(Deserialize)]
struct EchoArgs {
message: String,
#[allow(dead_code)]
env_var: Option<String>,
}
impl ServerHandler for TestToolServer {
fn get_info(&self) -> ServerInfo {
ServerInfo {
capabilities: ServerCapabilities::builder()
.enable_tools()
.enable_tool_list_changed()
.enable_resources()
.build(),
..ServerInfo::default()
}
}
fn list_tools(
&self,
_request: Option<PaginatedRequestParam>,
_context: rmcp::service::RequestContext<rmcp::service::RoleServer>,
) -> impl std::future::Future<Output = Result<ListToolsResult, McpError>> + Send + '_ {
let tools = self.tools.clone();
async move {
Ok(ListToolsResult {
tools: (*tools).clone(),
next_cursor: None,
meta: None,
})
}
}
fn list_resources(
&self,
_request: Option<PaginatedRequestParam>,
_context: rmcp::service::RequestContext<rmcp::service::RoleServer>,
) -> impl std::future::Future<Output = Result<ListResourcesResult, McpError>> + Send + '_ {
let resources = self.resources.clone();
async move {
Ok(ListResourcesResult {
resources: (*resources).clone(),
next_cursor: None,
meta: None,
})
}
}
async fn list_resource_templates(
&self,
_request: Option<PaginatedRequestParam>,
_context: rmcp::service::RequestContext<rmcp::service::RoleServer>,
) -> Result<ListResourceTemplatesResult, McpError> {
Ok(ListResourceTemplatesResult {
resource_templates: (*self.resource_templates).clone(),
next_cursor: None,
meta: None,
})
}
async fn read_resource(
&self,
ReadResourceRequestParam { uri }: ReadResourceRequestParam,
_context: rmcp::service::RequestContext<rmcp::service::RoleServer>,
) -> Result<ReadResourceResult, McpError> {
if uri == MEMO_URI {
Ok(ReadResourceResult {
contents: vec![ResourceContents::TextResourceContents {
uri,
mime_type: Some("text/plain".to_string()),
text: Self::memo_text().to_string(),
meta: None,
}],
})
} else {
Err(McpError::resource_not_found(
"resource_not_found",
Some(json!({ "uri": uri })),
))
}
}
async fn call_tool(
&self,
request: CallToolRequestParam,
_context: rmcp::service::RequestContext<rmcp::service::RoleServer>,
) -> Result<CallToolResult, McpError> {
match request.name.as_ref() {
"echo" => {
let args: EchoArgs = match request.arguments {
Some(arguments) => serde_json::from_value(serde_json::Value::Object(
arguments.into_iter().collect(),
))
.map_err(|err| McpError::invalid_params(err.to_string(), None))?,
None => {
return Err(McpError::invalid_params(
"missing arguments for echo tool",
None,
));
}
};
let env_snapshot: HashMap<String, String> = std::env::vars().collect();
let structured_content = json!({
"echo": format!("ECHOING: {}", args.message),
"env": env_snapshot.get("MCP_TEST_VALUE"),
});
Ok(CallToolResult {
content: Vec::new(),
structured_content: Some(structured_content),
is_error: Some(false),
meta: None,
})
}
other => Err(McpError::invalid_params(
format!("unknown tool: {other}"),
None,
)),
}
}
}
fn parse_bind_addr() -> Result<SocketAddr, Box<dyn std::error::Error>> {
let default_addr = "127.0.0.1:3920";
let bind_addr = std::env::var("MCP_STREAMABLE_HTTP_BIND_ADDR")
.or_else(|_| std::env::var("BIND_ADDR"))
.unwrap_or_else(|_| default_addr.to_string());
Ok(bind_addr.parse()?)
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let bind_addr = parse_bind_addr()?;
let listener = match tokio::net::TcpListener::bind(&bind_addr).await {
Ok(listener) => listener,
Err(err) if err.kind() == ErrorKind::PermissionDenied => {
eprintln!(
"failed to bind to {bind_addr}: {err}. make sure the process has network access"
);
return Ok(());
}
Err(err) => return Err(err.into()),
};
eprintln!("starting rmcp streamable http test server on http://{bind_addr}/mcp");
let router = Router::new()
.route(
"/.well-known/oauth-authorization-server/mcp",
get({
move || async move {
let metadata_base = format!("http://{bind_addr}");
#[expect(clippy::expect_used)]
Response::builder()
.status(StatusCode::OK)
.header(CONTENT_TYPE, "application/json")
.body(Body::from(
serde_json::to_vec(&json!({
"authorization_endpoint": format!("{metadata_base}/oauth/authorize"),
"token_endpoint": format!("{metadata_base}/oauth/token"),
"scopes_supported": [""],
})).expect("failed to serialize metadata"),
))
.expect("valid metadata response")
}
}),
)
.nest_service(
"/mcp",
StreamableHttpService::new(
|| Ok(TestToolServer::new()),
Arc::new(LocalSessionManager::default()),
StreamableHttpServerConfig::default(),
),
);
let router = if let Ok(token) = std::env::var("MCP_EXPECT_BEARER") {
let expected = Arc::new(format!("Bearer {token}"));
router.layer(middleware::from_fn_with_state(expected, require_bearer))
} else {
router
};
axum::serve(listener, router).await?;
task::yield_now().await;
Ok(())
}
async fn require_bearer(
State(expected): State<Arc<String>>,
request: Request<Body>,
next: Next,
) -> Result<Response, StatusCode> {
if request.uri().path().contains("/.well-known/") {
return Ok(next.run(request).await);
}
if request
.headers()
.get(AUTHORIZATION)
.is_some_and(|value| value.as_bytes() == expected.as_bytes())
{
Ok(next.run(request).await)
} else {
Err(StatusCode::UNAUTHORIZED)
}
async fn main() -> anyhow::Result<()> {
codex_mcp_test_server::run_streamable_http_server().await
}