Add generic RPC server glue to exec-server stub

Introduce API-agnostic server envelope parsing/encoding and a tiny method-registration router in the initialize-only exec-server slice.

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
starr-openai
2026-03-18 13:59:41 -07:00
parent 76071974bb
commit 16ff474725
3 changed files with 185 additions and 67 deletions

View File

@@ -1,5 +1,6 @@
mod client;
mod protocol;
mod rpc;
mod server;
mod server_process;

View File

@@ -0,0 +1,128 @@
use std::collections::HashMap;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use serde_json::Value;
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum RpcServerInboundMessage {
Request(JSONRPCRequest),
Notification(JSONRPCNotification),
}
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum RpcServerOutboundMessage {
Response {
request_id: RequestId,
result: Value,
},
Error {
request_id: RequestId,
error: JSONRPCErrorError,
},
}
type RequestRoute<I> = Box<dyn Fn(JSONRPCRequest) -> I + Send + Sync>;
type NotificationRoute<I> = Box<dyn Fn(JSONRPCNotification) -> Result<I, String> + Send + Sync>;
pub(crate) struct RpcRouter<I> {
request_routes: HashMap<&'static str, RequestRoute<I>>,
notification_routes: HashMap<&'static str, NotificationRoute<I>>,
}
impl<I> Default for RpcRouter<I> {
fn default() -> Self {
Self {
request_routes: HashMap::new(),
notification_routes: HashMap::new(),
}
}
}
impl<I> RpcRouter<I> {
pub(crate) fn new() -> Self {
Self::default()
}
pub(crate) fn raw_request<F>(&mut self, method: &'static str, route: F)
where
F: Fn(JSONRPCRequest) -> I + Send + Sync + 'static,
{
self.request_routes.insert(method, Box::new(route));
}
pub(crate) fn notification<F>(&mut self, method: &'static str, route: F)
where
F: Fn(JSONRPCNotification) -> Result<I, String> + Send + Sync + 'static,
{
self.notification_routes.insert(method, Box::new(route));
}
pub(crate) fn route_message(
&self,
message: JSONRPCMessage,
unknown_request: impl FnOnce(JSONRPCRequest) -> I,
) -> Result<I, String> {
match route_server_message(message)? {
RpcServerInboundMessage::Request(request) => {
if let Some(route) = self.request_routes.get(request.method.as_str()) {
Ok(route(request))
} else {
Ok(unknown_request(request))
}
}
RpcServerInboundMessage::Notification(notification) => {
let Some(route) = self.notification_routes.get(notification.method.as_str()) else {
return Err(format!(
"unexpected notification method: {}",
notification.method
));
};
route(notification)
}
}
}
}
pub(crate) fn route_server_message(
message: JSONRPCMessage,
) -> Result<RpcServerInboundMessage, String> {
match message {
JSONRPCMessage::Request(request) => Ok(RpcServerInboundMessage::Request(request)),
JSONRPCMessage::Notification(notification) => {
Ok(RpcServerInboundMessage::Notification(notification))
}
JSONRPCMessage::Response(response) => Err(format!(
"unexpected client response for request id {:?}",
response.id
)),
JSONRPCMessage::Error(error) => Err(format!(
"unexpected client error for request id {:?}",
error.id
)),
}
}
pub(crate) fn encode_server_message(
message: RpcServerOutboundMessage,
) -> Result<JSONRPCMessage, serde_json::Error> {
match message {
RpcServerOutboundMessage::Response { request_id, result } => {
Ok(JSONRPCMessage::Response(JSONRPCResponse {
id: request_id,
result,
}))
}
RpcServerOutboundMessage::Error { request_id, error } => {
Ok(JSONRPCMessage::Error(JSONRPCError {
id: request_id,
error,
}))
}
}
}

View File

@@ -1,8 +1,6 @@
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncWriteExt;
@@ -12,6 +10,9 @@ use crate::protocol::INITIALIZE_METHOD;
use crate::protocol::INITIALIZED_METHOD;
use crate::protocol::InitializeResponse;
use crate::protocol::PROTOCOL_VERSION;
use crate::rpc::RpcRouter;
use crate::rpc::RpcServerOutboundMessage;
use crate::rpc::encode_server_message;
pub async fn run_main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut stdin = BufReader::new(tokio::io::stdin()).lines();
@@ -23,36 +24,20 @@ pub async fn run_main() -> Result<(), Box<dyn std::error::Error + Send + Sync>>
}
let message = serde_json::from_str::<JSONRPCMessage>(&line)?;
match message {
JSONRPCMessage::Request(request) => {
handle_request(request, &mut stdout).await?;
let mut router = RpcRouter::new();
router.raw_request(INITIALIZE_METHOD, handle_request);
router.notification(INITIALIZED_METHOD, |_| Ok(None));
match router.route_message(message, unknown_request) {
Ok(Some(outbound)) => {
send_message(&mut stdout, outbound).await?;
}
JSONRPCMessage::Notification(notification) => {
if notification.method != INITIALIZED_METHOD {
send_error(
&mut stdout,
RequestId::Integer(-1),
invalid_request(format!(
"unexpected notification method: {}",
notification.method
)),
)
.await?;
}
}
JSONRPCMessage::Response(response) => {
Ok(None) => {}
Err(message) => {
send_error(
&mut stdout,
response.id,
invalid_request("unexpected response from client".to_string()),
)
.await?;
}
JSONRPCMessage::Error(error) => {
send_error(
&mut stdout,
error.id,
invalid_request("unexpected error from client".to_string()),
RequestId::Integer(-1),
invalid_request(message),
)
.await?;
}
@@ -62,44 +47,32 @@ pub async fn run_main() -> Result<(), Box<dyn std::error::Error + Send + Sync>>
Ok(())
}
async fn handle_request(
request: JSONRPCRequest,
stdout: &mut tokio::io::Stdout,
) -> Result<(), std::io::Error> {
match request.method.as_str() {
INITIALIZE_METHOD => {
let result = serde_json::to_value(InitializeResponse {
protocol_version: PROTOCOL_VERSION.to_string(),
})
.map_err(std::io::Error::other)?;
send_response(
stdout,
JSONRPCResponse {
id: request.id,
result,
},
)
.await
fn handle_request(request: JSONRPCRequest) -> Option<RpcServerOutboundMessage> {
let result = match serde_json::to_value(InitializeResponse {
protocol_version: PROTOCOL_VERSION.to_string(),
}) {
Ok(result) => result,
Err(err) => {
return Some(RpcServerOutboundMessage::Error {
request_id: request.id,
error: internal_error(err.to_string()),
});
}
method => {
send_error(
stdout,
request.id,
method_not_implemented(format!(
"exec-server stub does not implement `{method}` yet"
)),
)
.await
}
}
};
Some(RpcServerOutboundMessage::Response {
request_id: request.id,
result,
})
}
async fn send_response(
stdout: &mut tokio::io::Stdout,
response: JSONRPCResponse,
) -> Result<(), std::io::Error> {
send_message(stdout, &JSONRPCMessage::Response(response)).await
fn unknown_request(request: JSONRPCRequest) -> Option<RpcServerOutboundMessage> {
Some(RpcServerOutboundMessage::Error {
request_id: request.id,
error: method_not_implemented(format!(
"exec-server stub does not implement `{}` yet",
request.method
)),
})
}
async fn send_error(
@@ -107,14 +80,22 @@ async fn send_error(
id: RequestId,
error: JSONRPCErrorError,
) -> Result<(), std::io::Error> {
send_message(stdout, &JSONRPCMessage::Error(JSONRPCError { id, error })).await
send_message(
stdout,
RpcServerOutboundMessage::Error {
request_id: id,
error,
},
)
.await
}
async fn send_message(
stdout: &mut tokio::io::Stdout,
message: &JSONRPCMessage,
message: RpcServerOutboundMessage,
) -> Result<(), std::io::Error> {
let encoded = serde_json::to_vec(message).map_err(std::io::Error::other)?;
let message = encode_server_message(message).map_err(std::io::Error::other)?;
let encoded = serde_json::to_vec(&message).map_err(std::io::Error::other)?;
stdout.write_all(&encoded).await?;
stdout.write_all(b"\n").await?;
stdout.flush().await
@@ -135,3 +116,11 @@ fn method_not_implemented(message: String) -> JSONRPCErrorError {
data: None,
}
}
fn internal_error(message: String) -> JSONRPCErrorError {
JSONRPCErrorError {
code: -32603,
message,
data: None,
}
}