Add exec-server exec RPC implementation (#15090)

Stacked PR 2/3, based on the stub PR.

Adds the exec RPC implementation and process/event flow in exec-server
only.

---------

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
starr-openai
2026-03-19 12:00:36 -07:00
committed by GitHub
parent b87ba0a3cc
commit 1d210f639e
16 changed files with 1891 additions and 141 deletions

View File

@@ -1,53 +1,109 @@
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCRequest;
use tracing::debug;
use std::sync::Arc;
use crate::connection::JsonRpcConnection;
use crate::connection::JsonRpcConnectionEvent;
use crate::protocol::INITIALIZE_METHOD;
use crate::protocol::INITIALIZED_METHOD;
use crate::protocol::InitializeParams;
use crate::server::ExecServerHandler;
use crate::server::jsonrpc::invalid_params;
use crate::server::jsonrpc::invalid_request_message;
use crate::server::jsonrpc::method_not_found;
use crate::server::jsonrpc::response_message;
use tokio::sync::mpsc;
use tracing::debug;
use tracing::warn;
pub(crate) async fn run_connection(connection: JsonRpcConnection) {
let (json_outgoing_tx, mut incoming_rx, _connection_tasks) = connection.into_parts();
let handler = ExecServerHandler::new();
use crate::connection::CHANNEL_CAPACITY;
use crate::connection::JsonRpcConnection;
use crate::connection::JsonRpcConnectionEvent;
use crate::rpc::RpcNotificationSender;
use crate::rpc::RpcServerOutboundMessage;
use crate::rpc::encode_server_message;
use crate::rpc::invalid_request;
use crate::rpc::method_not_found;
use crate::server::ExecServerHandler;
use crate::server::registry::build_router;
while let Some(event) = incoming_rx.recv().await {
match event {
JsonRpcConnectionEvent::Message(message) => {
let response = match handle_connection_message(&handler, message).await {
Ok(response) => response,
Err(err) => {
tracing::warn!(
"closing exec-server connection after protocol error: {err}"
);
break;
}
};
let Some(response) = response else {
continue;
};
if json_outgoing_tx.send(response).await.is_err() {
pub(crate) async fn run_connection(connection: JsonRpcConnection) {
let router = Arc::new(build_router());
let (json_outgoing_tx, mut incoming_rx, connection_tasks) = connection.into_parts();
let (outgoing_tx, mut outgoing_rx) =
mpsc::channel::<RpcServerOutboundMessage>(CHANNEL_CAPACITY);
let notifications = RpcNotificationSender::new(outgoing_tx.clone());
let handler = Arc::new(ExecServerHandler::new(notifications));
let outbound_task = tokio::spawn(async move {
while let Some(message) = outgoing_rx.recv().await {
let json_message = match encode_server_message(message) {
Ok(json_message) => json_message,
Err(err) => {
warn!("failed to serialize exec-server outbound message: {err}");
break;
}
};
if json_outgoing_tx.send(json_message).await.is_err() {
break;
}
}
});
// Process inbound events sequentially to preserve initialize/initialized ordering.
while let Some(event) = incoming_rx.recv().await {
match event {
JsonRpcConnectionEvent::MalformedMessage { reason } => {
warn!("ignoring malformed exec-server message: {reason}");
if json_outgoing_tx
.send(invalid_request_message(reason))
if outgoing_tx
.send(RpcServerOutboundMessage::Error {
request_id: codex_app_server_protocol::RequestId::Integer(-1),
error: invalid_request(reason),
})
.await
.is_err()
{
break;
}
}
JsonRpcConnectionEvent::Message(message) => match message {
codex_app_server_protocol::JSONRPCMessage::Request(request) => {
if let Some(route) = router.request_route(request.method.as_str()) {
let message = route(handler.clone(), request).await;
if outgoing_tx.send(message).await.is_err() {
break;
}
} else if outgoing_tx
.send(RpcServerOutboundMessage::Error {
request_id: request.id,
error: method_not_found(format!(
"exec-server stub does not implement `{}` yet",
request.method
)),
})
.await
.is_err()
{
break;
}
}
codex_app_server_protocol::JSONRPCMessage::Notification(notification) => {
let Some(route) = router.notification_route(notification.method.as_str())
else {
warn!(
"closing exec-server connection after unexpected notification: {}",
notification.method
);
break;
};
if let Err(err) = route(handler.clone(), notification).await {
warn!("closing exec-server connection after protocol error: {err}");
break;
}
}
codex_app_server_protocol::JSONRPCMessage::Response(response) => {
warn!(
"closing exec-server connection after unexpected client response: {:?}",
response.id
);
break;
}
codex_app_server_protocol::JSONRPCMessage::Error(error) => {
warn!(
"closing exec-server connection after unexpected client error: {:?}",
error.id
);
break;
}
},
JsonRpcConnectionEvent::Disconnected { reason } => {
if let Some(reason) = reason {
debug!("exec-server connection disconnected: {reason}");
@@ -58,64 +114,10 @@ pub(crate) async fn run_connection(connection: JsonRpcConnection) {
}
handler.shutdown().await;
}
pub(crate) async fn handle_connection_message(
handler: &ExecServerHandler,
message: JSONRPCMessage,
) -> Result<Option<JSONRPCMessage>, String> {
match message {
JSONRPCMessage::Request(request) => Ok(Some(dispatch_request(handler, request))),
JSONRPCMessage::Notification(notification) => {
handle_notification(handler, notification)?;
Ok(None)
}
JSONRPCMessage::Response(response) => Err(format!(
"unexpected client response for request id {:?}",
response.id
)),
JSONRPCMessage::Error(error) => Err(format!(
"unexpected client error for request id {:?}",
error.id
)),
}
}
fn dispatch_request(handler: &ExecServerHandler, request: JSONRPCRequest) -> JSONRPCMessage {
let JSONRPCRequest {
id,
method,
params,
trace: _,
} = request;
match method.as_str() {
INITIALIZE_METHOD => {
let result = serde_json::from_value::<InitializeParams>(
params.unwrap_or(serde_json::Value::Null),
)
.map_err(|err| invalid_params(err.to_string()))
.and_then(|_params| handler.initialize())
.and_then(|response| {
serde_json::to_value(response).map_err(|err| invalid_params(err.to_string()))
});
response_message(id, result)
}
other => response_message(
id,
Err(method_not_found(format!(
"exec-server stub does not implement `{other}` yet"
))),
),
}
}
fn handle_notification(
handler: &ExecServerHandler,
notification: JSONRPCNotification,
) -> Result<(), String> {
match notification.method.as_str() {
INITIALIZED_METHOD => handler.initialized(),
other => Err(format!("unexpected notification method: {other}")),
drop(outgoing_tx);
for task in connection_tasks {
task.abort();
let _ = task.await;
}
let _ = outbound_task.await;
}