exec-server: add remote transport diagnostics

This commit is contained in:
Michael Zeng
2026-05-19 00:33:02 -07:00
parent 3009e23644
commit bbd53bbd04
2 changed files with 75 additions and 12 deletions

View File

@@ -17,7 +17,7 @@ use tokio::sync::mpsc;
use tokio::sync::watch;
use tokio::time::timeout;
use tracing::debug;
use tracing::{debug, warn};
use crate::ProcessId;
use crate::client_api::ExecServerClientConnectOptions;
@@ -307,6 +307,10 @@ impl ExecServerClient {
*session_id = Some(response.session_id.clone());
}
self.notify_initialized().await?;
debug!(
session_id = %response.session_id,
"exec-server initialize handshake completed"
);
Ok(response)
})
.await
@@ -439,6 +443,10 @@ impl ExecServerClient {
&& let Err(err) =
handle_server_notification(&inner, notification).await
{
warn!(
error = %err,
"exec-server notification handling failed"
);
let message = record_disconnected(
&inner,
format!("exec-server notification handling failed: {err}"),
@@ -449,6 +457,10 @@ impl ExecServerClient {
}
RpcClientEvent::Disconnected { reason } => {
if let Some(inner) = weak.upgrade() {
warn!(
reason = reason.as_deref().unwrap_or("unknown"),
"exec-server transport disconnected"
);
let message = record_disconnected(
&inner,
disconnected_message(reason.as_deref()),
@@ -508,6 +520,7 @@ impl ExecServerClient {
// A call can race with disconnect after the preflight
// check. Only the reader task drains sessions so queued
// process notifications stay ordered before disconnect.
warn!(method, "exec-server request observed a closed transport");
let message = disconnected_message(/*reason*/ None);
let message = record_disconnected(&self.inner, message);
Err(ExecServerError::Disconnected(message))

View File

@@ -5,6 +5,7 @@ use tokio::process::Command;
use tokio::time::timeout;
use tokio_tungstenite::connect_async;
use tracing::debug;
use tracing::info;
use tracing::warn;
use codex_utils_rustls_provider::ensure_rustls_crypto_provider;
@@ -58,17 +59,45 @@ impl ExecServerClient {
) -> Result<Self, ExecServerError> {
ensure_rustls_crypto_provider();
let websocket_url = args.websocket_url.clone();
let redacted_websocket_url = url_without_query(&websocket_url);
let connect_timeout = args.connect_timeout;
let (stream, _) = timeout(connect_timeout, connect_async(websocket_url.as_str()))
.await
.map_err(|_| ExecServerError::WebSocketConnectTimeout {
url: websocket_url.clone(),
timeout: connect_timeout,
})?
.map_err(|source| ExecServerError::WebSocketConnect {
url: websocket_url.clone(),
source,
})?;
info!(
websocket_url = %redacted_websocket_url,
connect_timeout_ms = connect_timeout.as_millis(),
"connecting exec-server websocket"
);
let (stream, _) =
match timeout(connect_timeout, connect_async(websocket_url.as_str())).await {
Ok(Ok(websocket)) => {
info!(
websocket_url = %redacted_websocket_url,
"exec-server websocket transport connected"
);
websocket
}
Ok(Err(source)) => {
warn!(
websocket_url = %redacted_websocket_url,
error = %source,
"failed to connect exec-server websocket transport"
);
return Err(ExecServerError::WebSocketConnect {
url: websocket_url.clone(),
source,
});
}
Err(_) => {
warn!(
websocket_url = %redacted_websocket_url,
connect_timeout_ms = connect_timeout.as_millis(),
"timed out connecting exec-server websocket transport"
);
return Err(ExecServerError::WebSocketConnectTimeout {
url: websocket_url.clone(),
timeout: connect_timeout,
});
}
};
let connection_label = format!("exec-server websocket {websocket_url}");
let connection = if is_rendezvous_harness_url(&websocket_url) {
@@ -76,7 +105,24 @@ impl ExecServerClient {
} else {
JsonRpcConnection::from_websocket(stream, connection_label)
};
Self::connect(connection, args.into()).await
match Self::connect(connection, args.into()).await {
Ok(client) => {
info!(
websocket_url = %redacted_websocket_url,
session_id = ?client.session_id(),
"exec-server websocket initialized"
);
Ok(client)
}
Err(err) => {
warn!(
websocket_url = %redacted_websocket_url,
error = %err,
"failed to initialize exec-server websocket"
);
Err(err)
}
}
}
pub(crate) async fn connect_stdio_command(
@@ -130,6 +176,10 @@ fn is_rendezvous_harness_url(websocket_url: &str) -> bool {
.any(|(key, value)| key == "role" && value == "harness")
}
fn url_without_query(url: &str) -> &str {
url.split_once('?').map_or(url, |(prefix, _)| prefix)
}
fn stdio_command_process(stdio_command: &StdioExecServerCommand) -> Command {
let mut command = Command::new(&stdio_command.program);
command.args(&stdio_command.args);