Simplify exec-server transport internals

Keep environment transport connection policy on ExecServerClient instead of the transport enum, and replace the JSON-RPC connection tuple alias with named connection parts.

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
starr-openai
2026-05-04 12:52:40 -07:00
parent caea51d3b7
commit 74e96987b8
5 changed files with 37 additions and 34 deletions

View File

@@ -207,7 +207,7 @@ impl LazyRemoteExecServerClient {
self.client
.get_or_try_init(|| {
let transport = self.transport.clone();
async move { transport.connect_for_environment().await }
async move { ExecServerClient::connect_for_environment(transport).await }
})
.await
.cloned()

View File

@@ -21,7 +21,6 @@ use tracing::warn;
use crate::ExecServerClient;
use crate::ExecServerError;
use crate::client_api::ExecServerTransport;
use crate::client_api::RemoteExecServerConnectArgs;
use crate::client_api::StdioExecServerConnectArgs;
use crate::connection::JsonRpcConnection;
@@ -32,11 +31,13 @@ const ENVIRONMENT_INITIALIZE_TIMEOUT: Duration = Duration::from_secs(5);
#[cfg(unix)]
const STDIO_CHILD_TERM_GRACE_PERIOD: Duration = Duration::from_millis(500);
impl ExecServerTransport {
pub(crate) async fn connect_for_environment(self) -> Result<ExecServerClient, ExecServerError> {
match self {
ExecServerTransport::WebSocketUrl(websocket_url) => {
ExecServerClient::connect_websocket(RemoteExecServerConnectArgs {
impl ExecServerClient {
pub(crate) async fn connect_for_environment(
transport: crate::client_api::ExecServerTransport,
) -> Result<Self, ExecServerError> {
match transport {
crate::client_api::ExecServerTransport::WebSocketUrl(websocket_url) => {
Self::connect_websocket(RemoteExecServerConnectArgs {
websocket_url,
client_name: ENVIRONMENT_CLIENT_NAME.to_string(),
connect_timeout: ENVIRONMENT_CONNECT_TIMEOUT,
@@ -45,8 +46,8 @@ impl ExecServerTransport {
})
.await
}
ExecServerTransport::StdioShellCommand(shell_command) => {
ExecServerClient::connect_stdio_command(StdioExecServerConnectArgs {
crate::client_api::ExecServerTransport::StdioShellCommand(shell_command) => {
Self::connect_stdio_command(StdioExecServerConnectArgs {
shell_command,
client_name: ENVIRONMENT_CLIENT_NAME.to_string(),
initialize_timeout: ENVIRONMENT_INITIALIZE_TIMEOUT,
@@ -56,9 +57,7 @@ impl ExecServerTransport {
}
}
}
}
impl ExecServerClient {
pub async fn connect_websocket(
args: RemoteExecServerConnectArgs,
) -> Result<Self, ExecServerError> {

View File

@@ -16,13 +16,14 @@ use tokio::io::BufWriter;
pub(crate) const CHANNEL_CAPACITY: usize = 128;
pub(crate) type JsonRpcTransportLifetime = Box<dyn Send>;
pub(crate) type JsonRpcConnectionParts = (
mpsc::Sender<JSONRPCMessage>,
mpsc::Receiver<JsonRpcConnectionEvent>,
watch::Receiver<bool>,
Vec<tokio::task::JoinHandle<()>>,
Option<JsonRpcTransportLifetime>,
);
pub(crate) struct JsonRpcConnectionParts {
pub(crate) outgoing_tx: mpsc::Sender<JSONRPCMessage>,
pub(crate) incoming_rx: mpsc::Receiver<JsonRpcConnectionEvent>,
pub(crate) disconnected_rx: watch::Receiver<bool>,
pub(crate) task_handles: Vec<tokio::task::JoinHandle<()>>,
pub(crate) transport_lifetime: Option<JsonRpcTransportLifetime>,
}
#[derive(Debug)]
pub(crate) enum JsonRpcConnectionEvent {
@@ -272,13 +273,13 @@ impl JsonRpcConnection {
}
pub(crate) fn into_parts(self) -> JsonRpcConnectionParts {
(
self.outgoing_tx,
self.incoming_rx,
self.disconnected_rx,
self.task_handles,
self.transport_lifetime,
)
JsonRpcConnectionParts {
outgoing_tx: self.outgoing_tx,
incoming_rx: self.incoming_rx,
disconnected_rx: self.disconnected_rx,
task_handles: self.task_handles,
transport_lifetime: self.transport_lifetime,
}
}
}

View File

@@ -243,8 +243,12 @@ struct TransportLifetime {
impl RpcClient {
pub(crate) fn new(connection: JsonRpcConnection) -> (Self, mpsc::Receiver<RpcClientEvent>) {
let (write_tx, mut incoming_rx, disconnected_rx, transport_tasks, transport_lifetime) =
connection.into_parts();
let connection_parts = connection.into_parts();
let write_tx = connection_parts.outgoing_tx;
let mut incoming_rx = connection_parts.incoming_rx;
let disconnected_rx = connection_parts.disconnected_rx;
let transport_tasks = connection_parts.task_handles;
let transport_lifetime = connection_parts.transport_lifetime;
let pending = Arc::new(Mutex::new(HashMap::<RequestId, PendingRequest>::new()));
let (event_tx, event_rx) = mpsc::channel(128);

View File

@@ -47,13 +47,12 @@ async fn run_connection(
runtime_paths: ExecServerRuntimePaths,
) {
let router = Arc::new(build_router());
let (
json_outgoing_tx,
mut incoming_rx,
mut disconnected_rx,
connection_tasks,
_transport_lifetime,
) = connection.into_parts();
let connection_parts = connection.into_parts();
let json_outgoing_tx = connection_parts.outgoing_tx;
let mut incoming_rx = connection_parts.incoming_rx;
let mut disconnected_rx = connection_parts.disconnected_rx;
let connection_tasks = connection_parts.task_handles;
let _transport_lifetime = connection_parts.transport_lifetime;
let (outgoing_tx, mut outgoing_rx) =
mpsc::channel::<RpcServerOutboundMessage>(CHANNEL_CAPACITY);
let notifications = RpcNotificationSender::new(outgoing_tx.clone());