feat: exec-server prep for unified exec

This commit is contained in:
jif-oai
2026-03-24 22:14:43 +00:00
parent 621862a7d1
commit 3ce6a55209
11 changed files with 528 additions and 155 deletions

View File

@@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
@@ -15,9 +16,12 @@ use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tracing::warn;
use crate::ExecBackend;
use crate::ExecProcess;
use crate::ExecServerError;
use crate::ExecServerEvent;
use crate::ExecSessionEvent;
use crate::protocol::EXEC_CLOSED_METHOD;
use crate::protocol::ExecClosedNotification;
use crate::protocol::ExecExitedNotification;
use crate::protocol::ExecOutputDeltaNotification;
use crate::protocol::ExecOutputStream;
@@ -60,6 +64,9 @@ struct RunningProcess {
next_seq: u64,
exit_code: Option<i32>,
output_notify: Arc<Notify>,
session_events_tx: broadcast::Sender<ExecSessionEvent>,
open_streams: usize,
closed: bool,
}
enum ProcessEntry {
@@ -69,7 +76,6 @@ enum ProcessEntry {
struct Inner {
notifications: RpcNotificationSender,
events_tx: broadcast::Sender<ExecServerEvent>,
processes: Mutex<HashMap<String, ProcessEntry>>,
initialize_requested: AtomicBool,
initialized: AtomicBool,
@@ -80,6 +86,13 @@ pub(crate) struct LocalProcess {
inner: Arc<Inner>,
}
/// Handle to a single process started through [`LocalProcess`].
///
/// Deliberately not `Clone`: `std::sync::Mutex` does not implement `Clone`, so
/// `#[derive(Clone)]` cannot compile for a struct holding one. Handles are
/// shared behind `Arc<dyn ExecProcess>` instead.
struct LocalExecProcess {
    /// Identifier of the process this handle refers to.
    process_id: String,
    /// Receiver retained so `subscribe` can mint new receivers via `resubscribe`.
    events: StdMutex<broadcast::Receiver<ExecSessionEvent>>,
    /// Backend that stdin writes and terminate requests are forwarded to.
    backend: LocalProcess,
}
impl Default for LocalProcess {
fn default() -> Self {
let (outgoing_tx, mut outgoing_rx) =
@@ -94,7 +107,6 @@ impl LocalProcess {
Self {
inner: Arc::new(Inner {
notifications,
events_tx: broadcast::channel(EVENT_CHANNEL_CAPACITY).0,
processes: Mutex::new(HashMap::new()),
initialize_requested: AtomicBool::new(false),
initialized: AtomicBool::new(false),
@@ -113,6 +125,10 @@ impl LocalProcess {
})
.collect::<Vec<_>>()
};
warn!(
remaining_processes = remaining.len(),
"exec-server shutting down local process handler"
);
for process in remaining {
process.session.terminate();
}
@@ -124,6 +140,7 @@ impl LocalProcess {
"initialize may only be sent once per connection".to_string(),
));
}
warn!("exec-server received initialize request");
Ok(InitializeResponse {})
}
@@ -132,6 +149,7 @@ impl LocalProcess {
return Err("received `initialized` notification before `initialize`".into());
}
self.inner.initialized.store(true, Ordering::SeqCst);
warn!("exec-server received initialized notification");
Ok(())
}
@@ -152,9 +170,19 @@ impl LocalProcess {
Ok(())
}
pub(crate) async fn exec(&self, params: ExecParams) -> Result<ExecResponse, JSONRPCErrorError> {
async fn start_process(
&self,
params: ExecParams,
) -> Result<(ExecResponse, broadcast::Receiver<ExecSessionEvent>), JSONRPCErrorError> {
self.require_initialized_for("exec")?;
let process_id = params.process_id.clone();
warn!(
process_id = %process_id,
tty = params.tty,
cwd = %params.cwd.display(),
argv = ?params.argv,
"exec-server starting process"
);
let (program, args) = params
.argv
@@ -198,11 +226,17 @@ impl LocalProcess {
if matches!(process_map.get(&process_id), Some(ProcessEntry::Starting)) {
process_map.remove(&process_id);
}
warn!(
process_id = %process_id,
error = %err,
"exec-server failed to spawn process"
);
return Err(internal_error(err.to_string()));
}
};
let output_notify = Arc::new(Notify::new());
let (session_events_tx, session_events_rx) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
{
let mut process_map = self.inner.processes.lock().await;
process_map.insert(
@@ -215,6 +249,9 @@ impl LocalProcess {
next_seq: 1,
exit_code: None,
output_notify: Arc::clone(&output_notify),
session_events_tx,
open_streams: 2,
closed: false,
})),
);
}
@@ -248,7 +285,18 @@ impl LocalProcess {
output_notify,
));
Ok(ExecResponse { process_id })
warn!(
process_id = %process_id,
tty = params.tty,
"exec-server started process"
);
Ok((ExecResponse { process_id }, session_events_rx))
}
/// Starts a process through `start_process` and returns only the RPC response.
///
/// The session-event receiver produced alongside the response is dropped here;
/// any error from `start_process` is propagated unchanged.
pub(crate) async fn exec(&self, params: ExecParams) -> Result<ExecResponse, JSONRPCErrorError> {
    let (response, _session_events) = self.start_process(params).await?;
    Ok(response)
}
pub(crate) async fn exec_read(
@@ -256,6 +304,7 @@ impl LocalProcess {
params: ReadParams,
) -> Result<ReadResponse, JSONRPCErrorError> {
self.require_initialized_for("exec")?;
let process_id = params.process_id.clone();
let after_seq = params.after_seq.unwrap_or(0);
let max_bytes = params.max_bytes.unwrap_or(usize::MAX);
let wait = Duration::from_millis(params.wait_ms.unwrap_or(0));
@@ -309,6 +358,23 @@ impl LocalProcess {
|| response.exited
|| tokio::time::Instant::now() >= deadline
{
let total_bytes: usize = response
.chunks
.iter()
.map(|chunk| chunk.chunk.0.len())
.sum();
warn!(
process_id = %process_id,
after_seq,
next_seq = response.next_seq,
chunk_count = response.chunks.len(),
total_bytes,
exited = response.exited,
exit_code = ?response.exit_code,
wait_ms = params.wait_ms.unwrap_or(0),
max_bytes,
"exec-server returning process/read response"
);
return Ok(response);
}
@@ -325,6 +391,8 @@ impl LocalProcess {
params: WriteParams,
) -> Result<WriteResponse, JSONRPCErrorError> {
self.require_initialized_for("exec")?;
let process_id = params.process_id.clone();
let input_bytes = params.chunk.0.len();
let writer_tx = {
let process_map = self.inner.processes.lock().await;
let process = process_map.get(&params.process_id).ok_or_else(|| {
@@ -350,6 +418,11 @@ impl LocalProcess {
.await
.map_err(|_| internal_error("failed to write to process stdin".to_string()))?;
warn!(
process_id = %process_id,
input_bytes,
"exec-server wrote stdin to process"
);
Ok(WriteResponse { accepted: true })
}
@@ -358,6 +431,7 @@ impl LocalProcess {
params: TerminateParams,
) -> Result<TerminateResponse, JSONRPCErrorError> {
self.require_initialized_for("exec")?;
let process_id = params.process_id.clone();
let running = {
let process_map = self.inner.processes.lock().await;
match process_map.get(&params.process_id) {
@@ -372,43 +446,78 @@ impl LocalProcess {
}
};
warn!(
process_id = %process_id,
running,
"exec-server processed terminate request"
);
Ok(TerminateResponse { running })
}
}
#[async_trait]
impl ExecProcess for LocalProcess {
async fn start(&self, params: ExecParams) -> Result<ExecResponse, ExecServerError> {
self.exec(params).await.map_err(map_handler_error)
impl ExecBackend for LocalProcess {
async fn start(&self, params: ExecParams) -> Result<Arc<dyn ExecProcess>, ExecServerError> {
let (response, events) = self
.start_process(params)
.await
.map_err(map_handler_error)?;
Ok(Arc::new(LocalExecProcess {
process_id: response.process_id,
events: StdMutex::new(events),
backend: self.clone(),
}))
}
}
#[async_trait]
impl ExecProcess for LocalExecProcess {
/// Returns the identifier of the process this handle refers to.
fn process_id(&self) -> &str {
    self.process_id.as_str()
}
async fn read(&self, params: ReadParams) -> Result<ReadResponse, ExecServerError> {
self.exec_read(params).await.map_err(map_handler_error)
/// Returns a new receiver for this process's session events.
///
/// The receiver is created with `resubscribe` on the stored receiver. Per
/// Tokio's documentation, a resubscribed receiver starts at the channel's
/// current tail, so it only observes events sent after this call.
///
/// # Panics
///
/// Panics if the mutex guarding the stored receiver has been poisoned.
fn subscribe(&self) -> broadcast::Receiver<ExecSessionEvent> {
    let stored = self
        .events
        .lock()
        .expect("local exec process events mutex should not be poisoned");
    stored.resubscribe()
}
async fn write(
&self,
process_id: &str,
chunk: Vec<u8>,
) -> Result<WriteResponse, ExecServerError> {
self.exec_write(WriteParams {
process_id: process_id.to_string(),
chunk: chunk.into(),
})
.await
.map_err(map_handler_error)
/// Forwards `chunk` to the backend as stdin for this handle's process.
async fn write_stdin(&self, chunk: Vec<u8>) -> Result<(), ExecServerError> {
    let process_id = self.process_id.as_str();
    self.backend.write_stdin(process_id, chunk).await
}
async fn terminate(&self, process_id: &str) -> Result<TerminateResponse, ExecServerError> {
/// Asks the backend to terminate this handle's process.
async fn terminate(&self) -> Result<(), ExecServerError> {
    let process_id = self.process_id.as_str();
    self.backend.terminate(process_id).await
}
}
impl LocalProcess {
/// Writes `chunk` to the stdin of the process identified by `process_id`.
///
/// # Errors
///
/// Returns the mapped handler error when `exec_write` fails, or
/// `ExecServerError::Protocol` when the write response reports that the
/// chunk was not accepted.
async fn write_stdin(&self, process_id: &str, chunk: Vec<u8>) -> Result<(), ExecServerError> {
    let params = WriteParams {
        process_id: process_id.to_string(),
        chunk: chunk.into(),
    };
    let response = self.exec_write(params).await.map_err(map_handler_error)?;
    if !response.accepted {
        return Err(ExecServerError::Protocol(format!(
            "exec-server did not accept stdin for process {process_id}"
        )));
    }
    Ok(())
}
async fn terminate(&self, process_id: &str) -> Result<(), ExecServerError> {
self.terminate_process(TerminateParams {
process_id: process_id.to_string(),
})
.await
.map_err(map_handler_error)
}
fn subscribe_events(&self) -> broadcast::Receiver<ExecServerEvent> {
self.inner.events_tx.subscribe()
.map_err(map_handler_error)?;
Ok(())
}
}
@@ -427,6 +536,7 @@ async fn stream_output(
output_notify: Arc<Notify>,
) {
while let Some(chunk) = receiver.recv().await {
let chunk_len = chunk.len();
let notification = {
let mut processes = inner.processes.lock().await;
let Some(entry) = processes.get_mut(&process_id) else {
@@ -452,17 +562,26 @@ async fn stream_output(
"retained output cap exceeded for process {process_id}; dropping oldest output"
);
}
let event = ExecSessionEvent::Output {
seq,
stream,
chunk: chunk.clone(),
};
let _ = process.session_events_tx.send(event);
ExecOutputDeltaNotification {
process_id: process_id.clone(),
seq,
stream,
chunk: chunk.into(),
}
};
warn!(
process_id = %process_id,
?stream,
chunk_bytes = chunk_len,
"exec-server emitted output chunk"
);
output_notify.notify_waiters();
let _ = inner
.events_tx
.send(ExecServerEvent::OutputDelta(notification.clone()));
if inner
.notifications
.notify(crate::protocol::EXEC_OUTPUT_DELTA_METHOD, &notification)
@@ -472,6 +591,8 @@ async fn stream_output(
break;
}
}
finish_output_stream(process_id, inner).await;
}
async fn watch_exit(
@@ -481,29 +602,42 @@ async fn watch_exit(
output_notify: Arc<Notify>,
) {
let exit_code = exit_rx.await.unwrap_or(-1);
{
warn!(
process_id = %process_id,
exit_code,
"exec-server observed process exit"
);
let notification = {
let mut processes = inner.processes.lock().await;
if let Some(ProcessEntry::Running(process)) = processes.get_mut(&process_id) {
let seq = process.next_seq;
process.next_seq += 1;
process.exit_code = Some(exit_code);
let _ = process
.session_events_tx
.send(ExecSessionEvent::Exited { seq, exit_code });
Some(ExecExitedNotification {
process_id: process_id.clone(),
seq,
exit_code,
})
} else {
None
}
}
output_notify.notify_waiters();
let notification = ExecExitedNotification {
process_id: process_id.clone(),
exit_code,
};
let _ = inner
.events_tx
.send(ExecServerEvent::Exited(notification.clone()));
if inner
.notifications
.notify(crate::protocol::EXEC_EXITED_METHOD, &notification)
.await
.is_err()
output_notify.notify_waiters();
if let Some(notification) = notification
&& inner
.notifications
.notify(crate::protocol::EXEC_EXITED_METHOD, &notification)
.await
.is_err()
{
return;
}
maybe_emit_closed(process_id.clone(), Arc::clone(&inner)).await;
tokio::time::sleep(EXITED_PROCESS_RETENTION).await;
let mut processes = inner.processes.lock().await;
if matches!(
@@ -511,5 +645,62 @@ async fn watch_exit(
Some(ProcessEntry::Running(process)) if process.exit_code == Some(exit_code)
) {
processes.remove(&process_id);
warn!(
process_id = %process_id,
exit_code,
"exec-server evicted exited process from retention cache"
);
}
}
/// Records that one output stream of `process_id` has finished, then checks
/// whether the closed event can be emitted.
///
/// Does nothing when the process has no `Running` entry. The open-stream
/// counter never goes below zero.
async fn finish_output_stream(process_id: String, inner: Arc<Inner>) {
    {
        let mut table = inner.processes.lock().await;
        match table.get_mut(&process_id) {
            Some(ProcessEntry::Running(running)) => {
                running.open_streams = running.open_streams.saturating_sub(1);
            }
            _ => return,
        }
        // The lock is released at the end of this scope, before
        // `maybe_emit_closed` locks the table again.
    }
    maybe_emit_closed(process_id, inner).await;
}
/// Emits the closed event for `process_id` at most once.
///
/// The event is emitted only when the process has a `Running` entry that is
/// not yet marked closed, has no open output streams, and has an exit code.
/// It is sent on the session event channel and as an `EXEC_CLOSED_METHOD`
/// notification; failures of either send are ignored.
async fn maybe_emit_closed(process_id: String, inner: Arc<Inner>) {
    let notification = {
        let mut table = inner.processes.lock().await;
        let Some(ProcessEntry::Running(running)) = table.get_mut(&process_id) else {
            return;
        };
        let ready = !running.closed && running.open_streams == 0 && running.exit_code.is_some();
        if !ready {
            return;
        }
        running.closed = true;
        let seq = running.next_seq;
        running.next_seq += 1;
        // A send error only means there are no live session subscribers.
        let _ = running
            .session_events_tx
            .send(ExecSessionEvent::Closed { seq });
        ExecClosedNotification { process_id, seq }
        // The table lock is released here, before the notification is awaited.
    };
    let _ = inner
        .notifications
        .notify(EXEC_CLOSED_METHOD, &notification)
        .await;
}