mirror of
https://github.com/openai/codex.git
synced 2026-04-29 17:06:51 +00:00
Make MCP resource read threadless (#18292)
## Summary Making thread id optional so that we can better cache resources for MCPs for connectors since their resource templates is universal and not particular to projects. - Make `mcpServer/resource/read` accept an optional `threadId` - Read resources from the current MCP config when no thread is supplied - Keep the existing thread-scoped path when `threadId` is present - Update the generated schemas, README, and integration coverage ## Testing - `just write-app-server-schema` - `just fmt` - `cargo test -p codex-app-server-protocol` - `cargo test -p codex-mcp` - `cargo test -p codex-app-server --test all mcp_resource` - `just fix -p codex-mcp` - `just fix -p codex-app-server-protocol` - `just fix -p codex-app-server`
This commit is contained in:
@@ -43,6 +43,7 @@ use rmcp::transport::StreamableHttpService;
|
||||
use rmcp::transport::streamable_http_server::session::local::LocalSessionManager;
|
||||
use tempfile::TempDir;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::timeout;
|
||||
|
||||
const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
@@ -54,19 +55,7 @@ const TEST_RESOURCE_TEXT: &str = "Resource body from the MCP server.";
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn mcp_resource_read_returns_resource_contents() -> Result<()> {
|
||||
let responses_server = responses::start_mock_server().await;
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await?;
|
||||
let addr = listener.local_addr()?;
|
||||
let apps_server_url = format!("http://{addr}");
|
||||
|
||||
let mcp_service = StreamableHttpService::new(
|
||||
move || Ok(ResourceAppsMcpServer),
|
||||
Arc::new(LocalSessionManager::default()),
|
||||
StreamableHttpServerConfig::default(),
|
||||
);
|
||||
let router = Router::new().nest_service("/api/codex/apps", mcp_service);
|
||||
let apps_server_handle = tokio::spawn(async move {
|
||||
let _ = axum::serve(listener, router).await;
|
||||
});
|
||||
let (apps_server_url, apps_server_handle) = start_resource_apps_mcp_server().await?;
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
let responses_server_uri = responses_server.uri();
|
||||
@@ -121,7 +110,7 @@ stream_max_retries = 0
|
||||
|
||||
let read_request_id = mcp
|
||||
.send_mcp_resource_read_request(McpResourceReadParams {
|
||||
thread_id: thread.id,
|
||||
thread_id: Some(thread.id),
|
||||
server: "codex_apps".to_string(),
|
||||
uri: TEST_RESOURCE_URI.to_string(),
|
||||
})
|
||||
@@ -134,22 +123,59 @@ stream_max_retries = 0
|
||||
|
||||
assert_eq!(
|
||||
to_response::<McpResourceReadResponse>(read_response)?,
|
||||
McpResourceReadResponse {
|
||||
contents: vec![
|
||||
McpResourceContent::Text {
|
||||
uri: TEST_RESOURCE_URI.to_string(),
|
||||
mime_type: Some("text/markdown".to_string()),
|
||||
text: TEST_RESOURCE_TEXT.to_string(),
|
||||
meta: None,
|
||||
},
|
||||
McpResourceContent::Blob {
|
||||
uri: TEST_BLOB_RESOURCE_URI.to_string(),
|
||||
mime_type: Some("application/octet-stream".to_string()),
|
||||
blob: TEST_RESOURCE_BLOB.to_string(),
|
||||
meta: None,
|
||||
},
|
||||
],
|
||||
}
|
||||
expected_resource_read_response()
|
||||
);
|
||||
|
||||
apps_server_handle.abort();
|
||||
let _ = apps_server_handle.await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn mcp_resource_read_returns_resource_contents_without_thread() -> Result<()> {
|
||||
let (apps_server_url, apps_server_handle) = start_resource_apps_mcp_server().await?;
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
std::fs::write(
|
||||
codex_home.path().join("config.toml"),
|
||||
format!(
|
||||
r#"
|
||||
chatgpt_base_url = "{apps_server_url}"
|
||||
mcp_oauth_credentials_store = "file"
|
||||
|
||||
[features]
|
||||
apps = true
|
||||
"#
|
||||
),
|
||||
)?;
|
||||
write_chatgpt_auth(
|
||||
codex_home.path(),
|
||||
ChatGptAuthFixture::new("chatgpt-token")
|
||||
.account_id("account-123")
|
||||
.chatgpt_user_id("user-123")
|
||||
.chatgpt_account_id("account-123"),
|
||||
AuthCredentialsStoreMode::File,
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let read_request_id = mcp
|
||||
.send_mcp_resource_read_request(McpResourceReadParams {
|
||||
thread_id: None,
|
||||
server: "codex_apps".to_string(),
|
||||
uri: TEST_RESOURCE_URI.to_string(),
|
||||
})
|
||||
.await?;
|
||||
let read_response: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(read_request_id)),
|
||||
)
|
||||
.await??;
|
||||
|
||||
assert_eq!(
|
||||
to_response::<McpResourceReadResponse>(read_response)?,
|
||||
expected_resource_read_response()
|
||||
);
|
||||
|
||||
apps_server_handle.abort();
|
||||
@@ -198,7 +224,7 @@ async fn mcp_resource_read_returns_error_for_unknown_thread() -> Result<()> {
|
||||
.request(ClientRequest::McpResourceRead {
|
||||
request_id: RequestId::Integer(1),
|
||||
params: McpResourceReadParams {
|
||||
thread_id: "00000000-0000-4000-8000-000000000000".to_string(),
|
||||
thread_id: Some("00000000-0000-4000-8000-000000000000".to_string()),
|
||||
server: "codex_apps".to_string(),
|
||||
uri: TEST_RESOURCE_URI.to_string(),
|
||||
},
|
||||
@@ -218,6 +244,43 @@ async fn mcp_resource_read_returns_error_for_unknown_thread() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn start_resource_apps_mcp_server() -> Result<(String, JoinHandle<()>)> {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await?;
|
||||
let addr = listener.local_addr()?;
|
||||
let apps_server_url = format!("http://{addr}");
|
||||
|
||||
let mcp_service = StreamableHttpService::new(
|
||||
move || Ok(ResourceAppsMcpServer),
|
||||
Arc::new(LocalSessionManager::default()),
|
||||
StreamableHttpServerConfig::default(),
|
||||
);
|
||||
let router = Router::new().nest_service("/api/codex/apps", mcp_service);
|
||||
let apps_server_handle = tokio::spawn(async move {
|
||||
let _ = axum::serve(listener, router).await;
|
||||
});
|
||||
|
||||
Ok((apps_server_url, apps_server_handle))
|
||||
}
|
||||
|
||||
fn expected_resource_read_response() -> McpResourceReadResponse {
|
||||
McpResourceReadResponse {
|
||||
contents: vec![
|
||||
McpResourceContent::Text {
|
||||
uri: TEST_RESOURCE_URI.to_string(),
|
||||
mime_type: Some("text/markdown".to_string()),
|
||||
text: TEST_RESOURCE_TEXT.to_string(),
|
||||
meta: None,
|
||||
},
|
||||
McpResourceContent::Blob {
|
||||
uri: TEST_BLOB_RESOURCE_URI.to_string(),
|
||||
mime_type: Some("application/octet-stream".to_string()),
|
||||
blob: TEST_RESOURCE_BLOB.to_string(),
|
||||
meta: None,
|
||||
},
|
||||
],
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
struct ResourceAppsMcpServer;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user