Compare commits

...

2 Commits

Author SHA1 Message Date
starr-openai
b2ed5b0ab8 exec-server: drop protocol version from json-rpc base sketch
Co-authored-by: Codex <noreply@openai.com>
2026-03-18 23:18:12 +00:00
starr-openai
1bb5404998 exec-server: sketch app-server-shaped json-rpc base
Co-authored-by: Codex <noreply@openai.com>
2026-03-18 23:17:17 +00:00
8 changed files with 338 additions and 0 deletions

View File

@@ -0,0 +1,14 @@
# Exec-Server JSON-RPC Base Sketch
This is a review-only sketch of the simplified exec-server JSON-RPC base.
It is intentionally scoped to the transport and dispatch shape:
- websocket-only transport
- direct `JSONRPCMessage -> handler -> JSONRPCMessage` processing
- no `"jsonrpc": "2.0"` envelope requirement
- a local bypass path represented by direct handler calls rather than an in-memory transport
It does not include the later `process/*` or filesystem RPC surface.
The goal of this sketch is to show the app-server-like structure without stacking the rest of the exec-server implementation on top of it.

View File

@@ -0,0 +1,2 @@
pub mod protocol;
pub mod server;

View File

@@ -0,0 +1,16 @@
use serde::Deserialize;
use serde::Serialize;
pub const INITIALIZE_METHOD: &str = "initialize";
pub const INITIALIZED_METHOD: &str = "initialized";
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct InitializeParams {
pub client_name: String,
pub auth_token: Option<String>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct InitializeResponse {}

View File

@@ -0,0 +1,15 @@
mod handler;
mod jsonrpc;
mod processor;
mod transport;
pub(crate) use handler::ExecServerHandler;
pub(crate) use jsonrpc::invalid_request;
pub(crate) use jsonrpc::unauthorized;
pub use transport::ExecServerTransport;
pub use transport::ExecServerTransportParseError;
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ExecServerConfig {
pub auth_token: Option<String>,
}

View File

@@ -0,0 +1,51 @@
use crate::protocol::InitializeParams;
use crate::protocol::InitializeResponse;
use crate::server::invalid_request;
use crate::server::unauthorized;
pub(crate) struct ExecServerHandler {
required_auth_token: Option<String>,
initialize_requested: bool,
initialized: bool,
}
impl ExecServerHandler {
pub(crate) fn new(required_auth_token: Option<String>) -> Self {
Self {
required_auth_token,
initialize_requested: false,
initialized: false,
}
}
pub(crate) fn initialize(
&mut self,
params: InitializeParams,
) -> Result<InitializeResponse, codex_app_server_protocol::JSONRPCErrorError> {
if self.initialize_requested {
return Err(invalid_request(
"initialize may only be sent once per connection".to_string(),
));
}
if let Some(required_auth_token) = &self.required_auth_token
&& params.auth_token.as_deref() != Some(required_auth_token.as_str())
{
return Err(unauthorized("invalid exec-server auth token".to_string()));
}
self.initialize_requested = true;
Ok(InitializeResponse {})
}
pub(crate) fn initialized(&mut self) -> Result<(), String> {
if !self.initialize_requested {
return Err("received `initialized` notification before `initialize`".to_string());
}
self.initialized = true;
Ok(())
}
#[allow(dead_code)]
pub(crate) fn is_ready(&self) -> bool {
self.initialize_requested && self.initialized
}
}

View File

@@ -0,0 +1,17 @@
use codex_app_server_protocol::JSONRPCErrorError;
pub(crate) fn invalid_request(message: String) -> JSONRPCErrorError {
JSONRPCErrorError {
code: -32600,
data: None,
message,
}
}
pub(crate) fn unauthorized(message: String) -> JSONRPCErrorError {
JSONRPCErrorError {
code: -32001,
data: None,
message,
}
}

View File

@@ -0,0 +1,93 @@
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use crate::protocol::INITIALIZE_METHOD;
use crate::protocol::INITIALIZED_METHOD;
use crate::protocol::InitializeParams;
use crate::server::ExecServerHandler;
use crate::server::invalid_request;
pub(crate) async fn handle_connection_message(
handler: &mut ExecServerHandler,
message: JSONRPCMessage,
) -> Result<Option<JSONRPCMessage>, String> {
match message {
JSONRPCMessage::Request(request) => Ok(Some(dispatch_request(handler, request))),
JSONRPCMessage::Notification(notification) => {
handle_notification(handler, notification)?;
Ok(None)
}
JSONRPCMessage::Response(response) => Err(format!(
"unexpected client response for request id {:?}",
response.id
)),
JSONRPCMessage::Error(error) => Err(format!(
"unexpected client error for request id {:?}",
error.id
)),
}
}
fn dispatch_request(handler: &mut ExecServerHandler, request: JSONRPCRequest) -> JSONRPCMessage {
let JSONRPCRequest {
id,
method,
params,
trace: _,
} = request;
match method.as_str() {
INITIALIZE_METHOD => {
let result = serde_json::from_value::<InitializeParams>(
params.unwrap_or(serde_json::Value::Null),
)
.map_err(|err| codex_app_server_protocol::JSONRPCErrorError {
code: -32602,
data: None,
message: err.to_string(),
})
.and_then(|params| handler.initialize(params))
.and_then(|response| {
serde_json::to_value(response).map_err(|err| {
codex_app_server_protocol::JSONRPCErrorError {
code: -32603,
data: None,
message: err.to_string(),
}
})
});
response_message(id, result)
}
other => response_message(id, Err(invalid_request(format!("unknown method: {other}")))),
}
}
fn handle_notification(
handler: &mut ExecServerHandler,
notification: JSONRPCNotification,
) -> Result<(), String> {
match notification.method.as_str() {
INITIALIZED_METHOD => handler.initialized(),
other => Err(format!("unexpected notification method: {other}")),
}
}
fn response_message(
request_id: RequestId,
result: Result<serde_json::Value, codex_app_server_protocol::JSONRPCErrorError>,
) -> JSONRPCMessage {
match result {
Ok(result) => JSONRPCMessage::Response(JSONRPCResponse {
id: request_id,
result,
}),
Err(error) => JSONRPCMessage::Error(JSONRPCError {
id: request_id,
error,
}),
}
}

View File

@@ -0,0 +1,130 @@
use std::net::SocketAddr;
use std::str::FromStr;
use tokio::net::TcpListener;
use tokio_tungstenite::accept_async;
use tracing::warn;
use crate::server::ExecServerConfig;
use crate::server::ExecServerHandler;
use crate::server::processor::handle_connection_message;
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecServerTransport {
WebSocket { bind_address: SocketAddr },
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum ExecServerTransportParseError {
UnsupportedListenUrl(String),
InvalidWebSocketListenUrl(String),
}
impl std::fmt::Display for ExecServerTransportParseError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ExecServerTransportParseError::UnsupportedListenUrl(listen_url) => write!(
f,
"unsupported --listen URL `{listen_url}`; expected `ws://IP:PORT`"
),
ExecServerTransportParseError::InvalidWebSocketListenUrl(listen_url) => write!(
f,
"invalid websocket --listen URL `{listen_url}`; expected `ws://IP:PORT`"
),
}
}
}
impl std::error::Error for ExecServerTransportParseError {}
impl ExecServerTransport {
pub const DEFAULT_LISTEN_URL: &str = "ws://127.0.0.1:0";
pub fn from_listen_url(listen_url: &str) -> Result<Self, ExecServerTransportParseError> {
if let Some(socket_addr) = listen_url.strip_prefix("ws://") {
let bind_address = socket_addr.parse::<SocketAddr>().map_err(|_| {
ExecServerTransportParseError::InvalidWebSocketListenUrl(listen_url.to_string())
})?;
return Ok(Self::WebSocket { bind_address });
}
Err(ExecServerTransportParseError::UnsupportedListenUrl(
listen_url.to_string(),
))
}
}
impl FromStr for ExecServerTransport {
type Err = ExecServerTransportParseError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Self::from_listen_url(s)
}
}
pub(crate) async fn run_transport(
transport: ExecServerTransport,
config: ExecServerConfig,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
match transport {
ExecServerTransport::WebSocket { bind_address } => {
run_websocket_listener(bind_address, config).await
}
}
}
async fn run_websocket_listener(
bind_address: SocketAddr,
config: ExecServerConfig,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let listener = TcpListener::bind(bind_address).await?;
loop {
let (stream, peer_addr) = listener.accept().await?;
let config = config.clone();
tokio::spawn(async move {
match accept_async(stream).await {
Ok(mut websocket) => {
let mut handler = ExecServerHandler::new(config.auth_token);
while let Some(message) = futures::StreamExt::next(&mut websocket).await {
let Ok(message) = message else {
break;
};
let tokio_tungstenite::tungstenite::Message::Text(text) = message else {
continue;
};
let Ok(message) =
serde_json::from_str::<codex_app_server_protocol::JSONRPCMessage>(
text.as_ref(),
)
else {
continue;
};
let Ok(response) = handle_connection_message(&mut handler, message).await
else {
break;
};
let Some(response) = response else {
continue;
};
let Ok(text) = serde_json::to_string(&response) else {
break;
};
if futures::SinkExt::send(
&mut websocket,
tokio_tungstenite::tungstenite::Message::Text(text.into()),
)
.await
.is_err()
{
break;
}
}
}
Err(err) => {
warn!("failed to accept exec-server websocket connection from {peer_addr}: {err}");
}
}
});
}
}