Compare commits

...

1 Commits

Author SHA1 Message Date
jif-oai
194e8e10f1 Bound MCP stderr log buffers 2026-05-18 10:59:59 +02:00
4 changed files with 137 additions and 36 deletions

View File

@@ -20,7 +20,6 @@
use std::future::Future;
use std::io;
use std::mem::take;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
@@ -41,9 +40,10 @@ use serde_json::to_vec;
use tokio::runtime::Handle;
use tokio::sync::broadcast;
use tracing::debug;
use tracing::info;
use tracing::warn;
use crate::stderr_log::StderrLogBuffer;
static PROCESS_COUNTER: AtomicUsize = AtomicUsize::new(1);
// Remote public implementation.
@@ -76,7 +76,7 @@ pub(super) struct ExecutorProcessTransport {
stdout: Vec<u8>,
/// Buffered stderr bytes for diagnostic logging.
stderr: Vec<u8>,
stderr: StderrLogBuffer,
/// Whether the executor has reported process closure or a terminal
/// subscription failure. Once closed, any remaining partial stdout line is
@@ -101,12 +101,13 @@ impl ExecutorProcessTransport {
// process event log will replay anything that landed before this
// subscriber was attached.
let events = process.subscribe_events();
let stderr = StderrLogBuffer::new(program_name.clone());
Self {
process,
events,
program_name,
stdout: Vec::new(),
stderr: Vec::new(),
stderr,
closed: false,
terminated: false,
last_seq: 0,
@@ -312,33 +313,11 @@ impl ExecutorProcessTransport {
}
fn push_stderr(&mut self, bytes: &[u8]) {
// Keep stderr line-oriented in logs so a chatty MCP server does not
// produce one log record per byte chunk.
self.stderr.extend_from_slice(bytes);
while let Some(index) = self.stderr.iter().position(|byte| *byte == b'\n') {
let mut line = self.stderr.drain(..=index).collect::<Vec<_>>();
line.pop();
if line.last() == Some(&b'\r') {
line.pop();
}
info!(
"MCP server stderr ({}): {}",
self.program_name,
String::from_utf8_lossy(&line)
);
}
self.stderr.push(bytes);
}
fn flush_stderr(&mut self) {
if self.stderr.is_empty() {
return;
}
let line = take(&mut self.stderr);
info!(
"MCP server stderr ({}): {}",
self.program_name,
String::from_utf8_lossy(&line)
);
self.stderr.flush();
}
fn trim_trailing_carriage_return(mut line: Vec<u8>) -> Vec<u8> {

View File

@@ -8,6 +8,7 @@ mod oauth;
mod perform_oauth_login;
mod program_resolver;
mod rmcp_client;
mod stderr_log;
mod stdio_server_launcher;
mod utils;

View File

@@ -0,0 +1,119 @@
use tracing::info;
const MAX_STDERR_LOG_LINE_BYTES: usize = 16 * 1024;
pub(crate) struct StderrLogBuffer {
program_name: String,
buffer: Vec<u8>,
}
impl StderrLogBuffer {
pub(crate) fn new(program_name: String) -> Self {
Self {
program_name,
buffer: Vec::new(),
}
}
pub(crate) fn push(&mut self, mut bytes: &[u8]) {
while !bytes.is_empty() {
if let Some(newline_index) = bytes.iter().position(|byte| *byte == b'\n') {
self.push_without_newline(&bytes[..newline_index]);
self.log_complete_line();
bytes = &bytes[newline_index + 1..];
} else {
self.push_without_newline(bytes);
return;
}
}
}
pub(crate) fn flush(&mut self) {
if self.buffer.is_empty() {
return;
}
self.log_line("MCP server stderr");
self.buffer.clear();
}
fn push_without_newline(&mut self, mut bytes: &[u8]) {
while !bytes.is_empty() {
let remaining_capacity = MAX_STDERR_LOG_LINE_BYTES.saturating_sub(self.buffer.len());
if remaining_capacity == 0 {
self.log_line("MCP server stderr line exceeded limit");
self.buffer.clear();
continue;
}
let chunk_len = remaining_capacity.min(bytes.len());
self.buffer.extend_from_slice(&bytes[..chunk_len]);
bytes = &bytes[chunk_len..];
if self.buffer.len() >= MAX_STDERR_LOG_LINE_BYTES {
self.log_line("MCP server stderr line exceeded limit");
self.buffer.clear();
}
}
}
fn log_complete_line(&mut self) {
if self.buffer.last() == Some(&b'\r') {
self.buffer.pop();
}
if self.buffer.is_empty() {
return;
}
self.log_line("MCP server stderr");
self.buffer.clear();
}
fn log_line(&self, label: &str) {
info!(
"{} ({}): {}",
label,
self.program_name,
String::from_utf8_lossy(&self.buffer)
);
}
#[cfg(test)]
fn buffered_len(&self) -> usize {
self.buffer.len()
}
}
#[cfg(test)]
mod tests {
use pretty_assertions::assert_eq;
use super::*;
#[test]
fn no_newline_stderr_does_not_grow_past_log_limit() {
let mut buffer = StderrLogBuffer::new("server".to_string());
let bytes = vec![b'a'; MAX_STDERR_LOG_LINE_BYTES * 2 + 17];
buffer.push(&bytes);
assert_eq!(17, buffer.buffered_len());
}
#[test]
fn newline_flushes_buffered_line() {
let mut buffer = StderrLogBuffer::new("server".to_string());
buffer.push(b"hello\n");
assert_eq!(0, buffer.buffered_len());
}
#[test]
fn flush_clears_partial_line() {
let mut buffer = StderrLogBuffer::new("server".to_string());
buffer.push(b"hello");
buffer.flush();
assert_eq!(0, buffer.buffered_len());
}
}

View File

@@ -46,14 +46,13 @@ use rmcp::service::RxJsonRpcMessage;
use rmcp::service::TxJsonRpcMessage;
use rmcp::transport::Transport;
use rmcp::transport::child_process::TokioChildProcess;
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
use tokio::io::AsyncReadExt;
use tokio::process::Command;
use tracing::info;
use tracing::warn;
use crate::executor_process_transport::ExecutorProcessTransport;
use crate::program_resolver;
use crate::stderr_log::StderrLogBuffer;
use crate::utils::create_env_for_mcp_server;
use crate::utils::create_env_overlay_for_remote_mcp_server;
use crate::utils::remote_mcp_env_var_names;
@@ -272,13 +271,16 @@ impl LocalStdioServerLauncher {
if let Some(stderr) = stderr {
tokio::spawn(async move {
let mut reader = BufReader::new(stderr).lines();
let mut stderr = stderr;
let mut stderr_log = StderrLogBuffer::new(program_name.clone());
let mut buffer = [0_u8; 8192];
loop {
match reader.next_line().await {
Ok(Some(line)) => {
info!("MCP server stderr ({program_name}): {line}");
match stderr.read(&mut buffer).await {
Ok(0) => {
stderr_log.flush();
break;
}
Ok(None) => break,
Ok(bytes_read) => stderr_log.push(&buffer[..bytes_read]),
Err(error) => {
warn!("Failed to read MCP server stderr ({program_name}): {error}");
break;