Remove outer handler mutex from exec-server RPC base

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
starr-openai
2026-03-18 14:34:21 -07:00
parent c5dbe421bb
commit 66f49ea604
4 changed files with 20 additions and 31 deletions

View File

@@ -1,3 +1,6 @@
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use codex_app_server_protocol::JSONRPCErrorError;
use crate::protocol::InitializeResponse;
@@ -6,38 +9,37 @@ use crate::rpc::RpcNotificationSender;
pub(crate) struct ExecServerHandler {
_notifications: RpcNotificationSender,
initialize_requested: bool,
initialized: bool,
initialize_requested: AtomicBool,
initialized: AtomicBool,
}
impl ExecServerHandler {
pub(crate) fn new(notifications: RpcNotificationSender) -> Self {
Self {
_notifications: notifications,
initialize_requested: false,
initialized: false,
initialize_requested: AtomicBool::new(false),
initialized: AtomicBool::new(false),
}
}
pub(crate) async fn shutdown(&self) {}
pub(crate) fn initialize(&mut self) -> Result<InitializeResponse, JSONRPCErrorError> {
if self.initialize_requested {
pub(crate) fn initialize(&self) -> Result<InitializeResponse, JSONRPCErrorError> {
if self.initialize_requested.swap(true, Ordering::SeqCst) {
return Err(crate::rpc::invalid_request(
"initialize may only be sent once per connection".to_string(),
));
}
self.initialize_requested = true;
Ok(InitializeResponse {
protocol_version: PROTOCOL_VERSION.to_string(),
})
}
pub(crate) fn initialized(&mut self) -> Result<(), String> {
if !self.initialize_requested {
pub(crate) fn initialized(&self) -> Result<(), String> {
if !self.initialize_requested.load(Ordering::SeqCst) {
return Err("received `initialized` notification before `initialize`".into());
}
self.initialized = true;
self.initialized.store(true, Ordering::SeqCst);
Ok(())
}
}

View File

@@ -1,6 +1,5 @@
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
use tracing::debug;
use tracing::warn;
@@ -21,7 +20,7 @@ pub(crate) async fn run_connection(connection: JsonRpcConnection) {
let (outgoing_tx, mut outgoing_rx) =
mpsc::channel::<RpcServerOutboundMessage>(CHANNEL_CAPACITY);
let notifications = RpcNotificationSender::new(outgoing_tx.clone());
let handler = Arc::new(Mutex::new(ExecServerHandler::new(notifications)));
let handler = Arc::new(ExecServerHandler::new(notifications));
let outbound_task = tokio::spawn(async move {
while let Some(message) = outgoing_rx.recv().await {
@@ -101,7 +100,7 @@ pub(crate) async fn run_connection(connection: JsonRpcConnection) {
}
}
handler.lock().await.shutdown().await;
handler.shutdown().await;
drop(outgoing_tx);
let _ = outbound_task.await;
}

View File

@@ -1,26 +1,20 @@
use std::sync::Arc;
use tokio::sync::Mutex;
use crate::protocol::INITIALIZE_METHOD;
use crate::protocol::INITIALIZED_METHOD;
use crate::protocol::InitializeParams;
use crate::rpc::RpcRouter;
use crate::server::ExecServerHandler;
pub(crate) fn build_router() -> RpcRouter<Mutex<ExecServerHandler>> {
pub(crate) fn build_router() -> RpcRouter<ExecServerHandler> {
let mut router = RpcRouter::new();
router.request(
INITIALIZE_METHOD,
|handler: Arc<Mutex<ExecServerHandler>>, _params: InitializeParams| async move {
handler.lock().await.initialize()
},
|handler: Arc<ExecServerHandler>, _params: InitializeParams| async move { handler.initialize() },
);
router.notification(
INITIALIZED_METHOD,
|handler: Arc<Mutex<ExecServerHandler>>, (): ()| async move {
handler.lock().await.initialized()
},
|handler: Arc<ExecServerHandler>, (): ()| async move { handler.initialized() },
);
router
}