Clarify exec-server transport lifetime ownership

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
starr-openai
2026-05-04 11:10:40 -07:00
parent 3d7522777c
commit 7834efe652
4 changed files with 28 additions and 15 deletions

View File

@@ -114,7 +114,7 @@ impl ExecServerClient {
stdin,
format!("exec-server stdio command `{shell_command}`"),
)
.with_lifetime_guard(Box::new(StdioChildGuard { child: Some(child) })),
.with_transport_lifetime(Box::new(StdioChildGuard { child: Some(child) })),
args.into(),
)
.await

View File

@@ -15,13 +15,13 @@ use tokio::io::BufWriter;
pub(crate) const CHANNEL_CAPACITY: usize = 128;
pub(crate) type JsonRpcConnectionLifetimeGuard = Box<dyn Send>;
pub(crate) type JsonRpcTransportLifetime = Box<dyn Send>;
pub(crate) type JsonRpcConnectionParts = (
mpsc::Sender<JSONRPCMessage>,
mpsc::Receiver<JsonRpcConnectionEvent>,
watch::Receiver<bool>,
Vec<tokio::task::JoinHandle<()>>,
Option<JsonRpcConnectionLifetimeGuard>,
Option<JsonRpcTransportLifetime>,
);
#[derive(Debug)]
@@ -36,7 +36,7 @@ pub(crate) struct JsonRpcConnection {
incoming_rx: mpsc::Receiver<JsonRpcConnectionEvent>,
disconnected_rx: watch::Receiver<bool>,
task_handles: Vec<tokio::task::JoinHandle<()>>,
lifetime_guard: Option<JsonRpcConnectionLifetimeGuard>,
transport_lifetime: Option<JsonRpcTransportLifetime>,
}
impl JsonRpcConnection {
@@ -127,7 +127,7 @@ impl JsonRpcConnection {
incoming_rx,
disconnected_rx,
task_handles: vec![reader_task, writer_task],
lifetime_guard: None,
transport_lifetime: None,
}
}
@@ -262,12 +262,12 @@ impl JsonRpcConnection {
incoming_rx,
disconnected_rx,
task_handles: vec![reader_task, writer_task],
lifetime_guard: None,
transport_lifetime: None,
}
}
pub(crate) fn with_lifetime_guard(mut self, guard: JsonRpcConnectionLifetimeGuard) -> Self {
self.lifetime_guard = Some(guard);
pub(crate) fn with_transport_lifetime(mut self, lifetime: JsonRpcTransportLifetime) -> Self {
self.transport_lifetime = Some(lifetime);
self
}
@@ -277,7 +277,7 @@ impl JsonRpcConnection {
self.incoming_rx,
self.disconnected_rx,
self.task_handles,
self.lifetime_guard,
self.transport_lifetime,
)
}
}

View File

@@ -24,7 +24,7 @@ use tokio::task::JoinHandle;
use crate::connection::JsonRpcConnection;
use crate::connection::JsonRpcConnectionEvent;
use crate::connection::JsonRpcConnectionLifetimeGuard;
use crate::connection::JsonRpcTransportLifetime;
#[derive(Debug)]
pub(crate) enum RpcCallError {
@@ -231,13 +231,19 @@ pub(crate) struct RpcClient {
disconnected_rx: watch::Receiver<bool>,
next_request_id: AtomicI64,
transport_tasks: Vec<JoinHandle<()>>,
_transport_lifetime_guard: Option<StdMutex<JsonRpcConnectionLifetimeGuard>>,
_transport_lifetime: Option<TransportLifetime>,
reader_task: JoinHandle<()>,
}
// Holds transport-owned resources, such as a stdio child process, for as long
// as the RPC client owns the underlying connection.
struct TransportLifetime {
_guard: StdMutex<JsonRpcTransportLifetime>,
}
impl RpcClient {
pub(crate) fn new(connection: JsonRpcConnection) -> (Self, mpsc::Receiver<RpcClientEvent>) {
let (write_tx, mut incoming_rx, disconnected_rx, transport_tasks, lifetime_guard) =
let (write_tx, mut incoming_rx, disconnected_rx, transport_tasks, transport_lifetime) =
connection.into_parts();
let pending = Arc::new(Mutex::new(HashMap::<RequestId, PendingRequest>::new()));
let (event_tx, event_rx) = mpsc::channel(128);
@@ -279,7 +285,9 @@ impl RpcClient {
disconnected_rx,
next_request_id: AtomicI64::new(1),
transport_tasks,
_transport_lifetime_guard: lifetime_guard.map(StdMutex::new),
_transport_lifetime: transport_lifetime.map(|lifetime| TransportLifetime {
_guard: StdMutex::new(lifetime),
}),
reader_task,
},
event_rx,

View File

@@ -47,8 +47,13 @@ async fn run_connection(
runtime_paths: ExecServerRuntimePaths,
) {
let router = Arc::new(build_router());
let (json_outgoing_tx, mut incoming_rx, mut disconnected_rx, connection_tasks, _lifetime_guard) =
connection.into_parts();
let (
json_outgoing_tx,
mut incoming_rx,
mut disconnected_rx,
connection_tasks,
_transport_lifetime,
) = connection.into_parts();
let (outgoing_tx, mut outgoing_rx) =
mpsc::channel::<RpcServerOutboundMessage>(CHANNEL_CAPACITY);
let notifications = RpcNotificationSender::new(outgoing_tx.clone());