mirror of
https://github.com/openai/codex.git
synced 2026-06-02 19:31:59 +00:00
Address exec-server OTEL review findings
This commit is contained in:
@@ -1508,10 +1508,10 @@ async fn run_exec_server_command(
|
||||
codex_self_exe,
|
||||
arg0_paths.codex_linux_sandbox_exe.clone(),
|
||||
)?;
|
||||
let config = load_exec_server_config(root_config_overrides, strict_config).await?;
|
||||
let (_otel, telemetry) = init_exec_server_tracing(&config);
|
||||
let exec_server_span = exec_server_root_span();
|
||||
if let Some(base_url) = cmd.remote {
|
||||
let config = load_exec_server_config(root_config_overrides, strict_config).await?;
|
||||
let (_otel, telemetry) = init_exec_server_tracing(Some(&config));
|
||||
let exec_server_span = exec_server_root_span();
|
||||
let environment_id = cmd
|
||||
.environment_id
|
||||
.ok_or_else(|| anyhow::anyhow!("--environment-id is required when --remote is set"))?;
|
||||
@@ -1533,6 +1533,15 @@ async fn run_exec_server_command(
|
||||
.await?;
|
||||
Ok(())
|
||||
} else {
|
||||
let config = if strict_config {
|
||||
Some(load_exec_server_config(root_config_overrides, strict_config).await?)
|
||||
} else {
|
||||
load_exec_server_config(root_config_overrides, /*strict_config*/ false)
|
||||
.await
|
||||
.ok()
|
||||
};
|
||||
let (_otel, telemetry) = init_exec_server_tracing(config.as_ref());
|
||||
let exec_server_span = exec_server_root_span();
|
||||
exec_server_span.record("mode", "local");
|
||||
let listen_url = cmd
|
||||
.listen
|
||||
@@ -1546,29 +1555,31 @@ async fn run_exec_server_command(
|
||||
}
|
||||
|
||||
fn init_exec_server_tracing(
|
||||
config: &codex_core::config::Config,
|
||||
config: Option<&codex_core::config::Config>,
|
||||
) -> (impl Send + Sync, codex_exec_server::ExecServerTelemetry) {
|
||||
let fmt_layer = tracing_subscriber::fmt::layer()
|
||||
.with_writer(std::io::stderr)
|
||||
.with_filter(exec_server_stderr_env_filter());
|
||||
let otel = match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
|
||||
codex_core::otel_init::build_provider(
|
||||
config,
|
||||
env!("CARGO_PKG_VERSION"),
|
||||
Some(EXEC_SERVER_OTEL_SERVICE_NAME),
|
||||
EXEC_SERVER_DEFAULT_ANALYTICS_ENABLED,
|
||||
)
|
||||
})) {
|
||||
Ok(Ok(otel)) => otel,
|
||||
Ok(Err(err)) => {
|
||||
eprintln!("Could not create otel exporter: {err}");
|
||||
None
|
||||
let otel = config.and_then(|config| {
|
||||
match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
|
||||
codex_core::otel_init::build_provider(
|
||||
config,
|
||||
env!("CARGO_PKG_VERSION"),
|
||||
Some(EXEC_SERVER_OTEL_SERVICE_NAME),
|
||||
EXEC_SERVER_DEFAULT_ANALYTICS_ENABLED,
|
||||
)
|
||||
})) {
|
||||
Ok(Ok(otel)) => otel,
|
||||
Ok(Err(err)) => {
|
||||
eprintln!("Could not create otel exporter: {err}");
|
||||
None
|
||||
}
|
||||
Err(_) => {
|
||||
eprintln!("Could not create otel exporter: panicked during initialization");
|
||||
None
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
eprintln!("Could not create otel exporter: panicked during initialization");
|
||||
None
|
||||
}
|
||||
};
|
||||
});
|
||||
codex_core::otel_init::record_process_start(otel.as_ref(), EXEC_SERVER_OTEL_SERVICE_NAME);
|
||||
|
||||
let otel_logger_layer = otel.as_ref().and_then(|otel| otel.logger_layer());
|
||||
|
||||
@@ -1,6 +1,15 @@
|
||||
use std::io::Read as _;
|
||||
use std::io::Write as _;
|
||||
use std::net::TcpListener;
|
||||
use std::path::Path;
|
||||
use std::process::Stdio;
|
||||
use std::sync::mpsc;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
use anyhow::Result;
|
||||
use predicates::prelude::PredicateBooleanExt;
|
||||
use predicates::str::contains;
|
||||
use tempfile::TempDir;
|
||||
|
||||
@@ -33,3 +42,234 @@ foo = "bar"
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn local_exec_server_ignores_invalid_config_without_strict_config() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
std::fs::write(codex_home.path().join("config.toml"), "not valid toml = [")?;
|
||||
|
||||
let mut cmd = codex_command(codex_home.path())?;
|
||||
cmd.args(["exec-server", "--listen", "stdio"])
|
||||
.assert()
|
||||
.success()
|
||||
.stderr(contains("not valid toml").not());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn local_exec_server_exports_real_otel_metrics() -> Result<()> {
|
||||
let collector = TestCollector::start()?;
|
||||
let codex_home = TempDir::new()?;
|
||||
std::fs::write(
|
||||
codex_home.path().join("config.toml"),
|
||||
format!(
|
||||
r#"
|
||||
[analytics]
|
||||
enabled = true
|
||||
|
||||
[otel]
|
||||
environment = "test"
|
||||
metrics_exporter = {{ otlp-http = {{ endpoint = "{}/v1/metrics", protocol = "json" }} }}
|
||||
"#,
|
||||
collector.base_url
|
||||
),
|
||||
)?;
|
||||
|
||||
let mut cmd = codex_command(codex_home.path())?;
|
||||
cmd.args(["exec-server", "--listen", "stdio"])
|
||||
.write_stdin(
|
||||
r#"{"id":1,"method":"initialize","params":{"clientName":"otel-test","resumeSessionId":null}}"#,
|
||||
)
|
||||
.assert()
|
||||
.success();
|
||||
|
||||
let requests = collector.finish()?;
|
||||
let metrics = requests
|
||||
.iter()
|
||||
.filter(|request| request.path == "/v1/metrics")
|
||||
.map(|request| request.body.as_str())
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n");
|
||||
assert!(
|
||||
metrics.contains("exec_server.connections.active"),
|
||||
"{metrics}"
|
||||
);
|
||||
assert!(metrics.contains("exec_server.requests.total"), "{metrics}");
|
||||
assert!(metrics.contains("initialize"), "{metrics}");
|
||||
assert!(
|
||||
metrics.contains("success") || metrics.contains("disconnected"),
|
||||
"{metrics}"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remote_exec_server_preserves_websocket_error_in_stderr() -> Result<()> {
|
||||
let failed_websocket_listener = TcpListener::bind("127.0.0.1:0")?;
|
||||
let failed_websocket_url = format!("ws://{}", failed_websocket_listener.local_addr()?);
|
||||
drop(failed_websocket_listener);
|
||||
|
||||
let registry = TestRegistry::start(&failed_websocket_url)?;
|
||||
let codex_home = TempDir::new()?;
|
||||
let mut cmd = std::process::Command::new(codex_utils_cargo_bin::cargo_bin("codex")?);
|
||||
cmd.env("CODEX_HOME", codex_home.path())
|
||||
.env("CODEX_API_KEY", "test-key")
|
||||
.env("RUST_LOG", "codex_exec_server=warn")
|
||||
.args([
|
||||
"exec-server",
|
||||
"--remote",
|
||||
®istry.base_url,
|
||||
"--environment-id",
|
||||
"env-test",
|
||||
])
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped());
|
||||
|
||||
let mut child = cmd.spawn()?;
|
||||
thread::sleep(Duration::from_millis(500));
|
||||
let _ = child.kill();
|
||||
let output = child.wait_with_output()?;
|
||||
registry.finish()?;
|
||||
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
assert!(
|
||||
stderr.contains("failed to connect remote exec-server websocket"),
|
||||
"{stderr}"
|
||||
);
|
||||
assert!(stderr.contains("Connection refused"), "{stderr}");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct CapturedRequest {
|
||||
path: String,
|
||||
body: String,
|
||||
}
|
||||
|
||||
struct TestRegistry {
|
||||
base_url: String,
|
||||
server: thread::JoinHandle<Result<()>>,
|
||||
}
|
||||
|
||||
impl TestRegistry {
|
||||
fn start(websocket_url: &str) -> Result<Self> {
|
||||
let listener = TcpListener::bind("127.0.0.1:0")?;
|
||||
let addr = listener.local_addr()?;
|
||||
let websocket_url = websocket_url.to_string();
|
||||
let server = thread::spawn(move || {
|
||||
let (mut stream, _) = listener.accept()?;
|
||||
let _request = read_http_request(&mut stream)?;
|
||||
let body = format!(r#"{{"environment_id":"env-test","url":"{websocket_url}"}}"#);
|
||||
let response = format!(
|
||||
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
|
||||
body.len()
|
||||
);
|
||||
stream.write_all(response.as_bytes())?;
|
||||
Ok(())
|
||||
});
|
||||
Ok(Self {
|
||||
base_url: format!("http://{addr}"),
|
||||
server,
|
||||
})
|
||||
}
|
||||
|
||||
fn finish(self) -> Result<()> {
|
||||
self.server
|
||||
.join()
|
||||
.map_err(|_| anyhow::anyhow!("registry thread panicked"))?
|
||||
}
|
||||
}
|
||||
|
||||
struct TestCollector {
|
||||
base_url: String,
|
||||
requests: mpsc::Receiver<Vec<CapturedRequest>>,
|
||||
server: thread::JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl TestCollector {
|
||||
fn start() -> Result<Self> {
|
||||
let listener = TcpListener::bind("127.0.0.1:0")?;
|
||||
let addr = listener.local_addr()?;
|
||||
listener.set_nonblocking(true)?;
|
||||
let (tx, requests) = mpsc::channel();
|
||||
let server = thread::spawn(move || {
|
||||
let deadline = Instant::now() + Duration::from_secs(3);
|
||||
let mut captured = Vec::new();
|
||||
while Instant::now() < deadline {
|
||||
match listener.accept() {
|
||||
Ok((mut stream, _)) => {
|
||||
if let Ok(request) = read_http_request(&mut stream) {
|
||||
captured.push(request);
|
||||
}
|
||||
let _ = stream.write_all(
|
||||
b"HTTP/1.1 202 Accepted\r\nContent-Length: 0\r\nConnection: close\r\n\r\n",
|
||||
);
|
||||
}
|
||||
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
|
||||
thread::sleep(Duration::from_millis(10));
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
let _ = tx.send(captured);
|
||||
});
|
||||
Ok(Self {
|
||||
base_url: format!("http://{addr}"),
|
||||
requests,
|
||||
server,
|
||||
})
|
||||
}
|
||||
|
||||
fn finish(self) -> Result<Vec<CapturedRequest>> {
|
||||
self.server
|
||||
.join()
|
||||
.map_err(|_| anyhow::anyhow!("collector thread panicked"))?;
|
||||
Ok(self.requests.recv_timeout(Duration::from_secs(1))?)
|
||||
}
|
||||
}
|
||||
|
||||
fn read_http_request(stream: &mut std::net::TcpStream) -> std::io::Result<CapturedRequest> {
|
||||
stream.set_read_timeout(Some(Duration::from_secs(2)))?;
|
||||
let mut bytes = Vec::new();
|
||||
let mut scratch = [0_u8; 8192];
|
||||
let header_end = loop {
|
||||
let read = stream.read(&mut scratch)?;
|
||||
if read == 0 {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::UnexpectedEof,
|
||||
"request closed before headers",
|
||||
));
|
||||
}
|
||||
bytes.extend_from_slice(&scratch[..read]);
|
||||
if let Some(header_end) = bytes.windows(4).position(|window| window == b"\r\n\r\n") {
|
||||
break header_end;
|
||||
}
|
||||
};
|
||||
let headers = String::from_utf8_lossy(&bytes[..header_end]);
|
||||
let mut lines = headers.split("\r\n");
|
||||
let path = lines
|
||||
.next()
|
||||
.and_then(|line| line.split_whitespace().nth(1))
|
||||
.unwrap_or_default()
|
||||
.to_string();
|
||||
let content_length = lines
|
||||
.filter_map(|line| line.split_once(':'))
|
||||
.find(|(key, _)| key.eq_ignore_ascii_case("content-length"))
|
||||
.and_then(|(_, value)| value.trim().parse::<usize>().ok())
|
||||
.unwrap_or_default();
|
||||
let mut body = bytes[header_end + 4..].to_vec();
|
||||
while body.len() < content_length {
|
||||
let read = stream.read(&mut scratch)?;
|
||||
if read == 0 {
|
||||
break;
|
||||
}
|
||||
body.extend_from_slice(&scratch[..read]);
|
||||
}
|
||||
body.truncate(content_length);
|
||||
Ok(CapturedRequest {
|
||||
path,
|
||||
body: String::from_utf8_lossy(&body).into_owned(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -18,91 +18,10 @@ use crate::server::ConnectionProcessor;
|
||||
|
||||
const ERROR_BODY_PREVIEW_BYTES: usize = 4096;
|
||||
|
||||
macro_rules! remote_event {
|
||||
($log:ident, $level:ident, $event_name:literal, $message:literal) => {{
|
||||
$log!($message);
|
||||
tracing::event!(
|
||||
target: "codex_otel.log_only",
|
||||
tracing::Level::$level,
|
||||
event.name = $event_name,
|
||||
$message
|
||||
);
|
||||
tracing::event!(
|
||||
target: "codex_otel.trace_safe",
|
||||
tracing::Level::$level,
|
||||
event.name = $event_name,
|
||||
$message
|
||||
);
|
||||
}};
|
||||
($log:ident, $level:ident, $attempt:expr, $event_name:literal, $message:literal) => {{
|
||||
$log!(attempt = $attempt, $message);
|
||||
tracing::event!(
|
||||
target: "codex_otel.log_only",
|
||||
tracing::Level::$level,
|
||||
event.name = $event_name,
|
||||
attempt = $attempt,
|
||||
$message
|
||||
);
|
||||
tracing::event!(
|
||||
target: "codex_otel.trace_safe",
|
||||
tracing::Level::$level,
|
||||
event.name = $event_name,
|
||||
attempt = $attempt,
|
||||
$message
|
||||
);
|
||||
}};
|
||||
($log:ident, $level:ident, with_error $error:expr, $event_name:literal, $message:literal) => {{
|
||||
let _ = &$error;
|
||||
$log!($message);
|
||||
tracing::event!(
|
||||
target: "codex_otel.log_only",
|
||||
tracing::Level::$level,
|
||||
event.name = $event_name,
|
||||
$message
|
||||
);
|
||||
tracing::event!(
|
||||
target: "codex_otel.trace_safe",
|
||||
tracing::Level::$level,
|
||||
event.name = $event_name,
|
||||
$message
|
||||
);
|
||||
}};
|
||||
($log:ident, $level:ident, $attempt:expr, with_error $error:expr, $event_name:literal, $message:literal) => {{
|
||||
let _ = &$error;
|
||||
$log!(attempt = $attempt, $message);
|
||||
tracing::event!(
|
||||
target: "codex_otel.log_only",
|
||||
tracing::Level::$level,
|
||||
event.name = $event_name,
|
||||
attempt = $attempt,
|
||||
$message
|
||||
);
|
||||
tracing::event!(
|
||||
target: "codex_otel.trace_safe",
|
||||
tracing::Level::$level,
|
||||
event.name = $event_name,
|
||||
attempt = $attempt,
|
||||
$message
|
||||
);
|
||||
}};
|
||||
($log:ident, $level:ident, $attempt:expr, with_backoff_ms $backoff_ms:expr, $event_name:literal, $message:literal) => {{
|
||||
$log!(attempt = $attempt, backoff_ms = $backoff_ms, $message);
|
||||
tracing::event!(
|
||||
target: "codex_otel.log_only",
|
||||
tracing::Level::$level,
|
||||
event.name = $event_name,
|
||||
attempt = $attempt,
|
||||
backoff_ms = $backoff_ms,
|
||||
$message
|
||||
);
|
||||
tracing::event!(
|
||||
target: "codex_otel.trace_safe",
|
||||
tracing::Level::$level,
|
||||
event.name = $event_name,
|
||||
attempt = $attempt,
|
||||
backoff_ms = $backoff_ms,
|
||||
$message
|
||||
);
|
||||
macro_rules! emit_remote_otel_event {
|
||||
($level:ident, $($fields:tt)*) => {{
|
||||
tracing::event!(target: "codex_otel.log_only", tracing::Level::$level, $($fields)*);
|
||||
tracing::event!(target: "codex_otel.trace_safe", tracing::Level::$level, $($fields)*);
|
||||
}};
|
||||
}
|
||||
|
||||
@@ -237,53 +156,61 @@ pub async fn run_remote_environment(
|
||||
let response = match client.register_environment(&config.environment_id).await {
|
||||
Ok(response) => response,
|
||||
Err(err) => {
|
||||
remote_event!(
|
||||
warn,
|
||||
warn!(error = %err, "failed to register remote exec-server environment");
|
||||
emit_remote_otel_event!(
|
||||
WARN,
|
||||
with_error err,
|
||||
"codex.exec_server.remote_environment_registration_failed",
|
||||
event.name = "codex.exec_server.remote_environment_registration_failed",
|
||||
"failed to register remote exec-server environment"
|
||||
);
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
eprintln!("codex exec-server remote environment registered");
|
||||
remote_event!(
|
||||
info,
|
||||
info!("codex exec-server remote environment registered");
|
||||
emit_remote_otel_event!(
|
||||
INFO,
|
||||
"codex.exec_server.remote_environment_registered",
|
||||
event.name = "codex.exec_server.remote_environment_registered",
|
||||
"codex exec-server remote environment registered"
|
||||
);
|
||||
|
||||
match connect_async(response.url.as_str()).await {
|
||||
Ok((websocket, _)) => {
|
||||
connection_attempt += 1;
|
||||
remote_event!(
|
||||
info,
|
||||
info!(
|
||||
attempt = connection_attempt,
|
||||
"connected remote exec-server websocket"
|
||||
);
|
||||
emit_remote_otel_event!(
|
||||
INFO,
|
||||
connection_attempt,
|
||||
"codex.exec_server.remote_websocket_connected",
|
||||
event.name = "codex.exec_server.remote_websocket_connected",
|
||||
attempt = connection_attempt,
|
||||
"connected remote exec-server websocket"
|
||||
);
|
||||
backoff = Duration::from_secs(1);
|
||||
run_multiplexed_environment(websocket, processor.clone()).await;
|
||||
config.telemetry.relay_reconnect("disconnected");
|
||||
remote_event!(
|
||||
warn,
|
||||
warn!(
|
||||
attempt = connection_attempt,
|
||||
"remote exec-server websocket disconnected; retrying"
|
||||
);
|
||||
emit_remote_otel_event!(
|
||||
WARN,
|
||||
connection_attempt,
|
||||
"codex.exec_server.remote_websocket_disconnected",
|
||||
event.name = "codex.exec_server.remote_websocket_disconnected",
|
||||
attempt = connection_attempt,
|
||||
"remote exec-server websocket disconnected; retrying"
|
||||
);
|
||||
}
|
||||
Err(err) => {
|
||||
connection_attempt += 1;
|
||||
remote_event!(
|
||||
warn,
|
||||
warn!(
|
||||
attempt = connection_attempt,
|
||||
error = %err,
|
||||
"failed to connect remote exec-server websocket"
|
||||
);
|
||||
emit_remote_otel_event!(
|
||||
WARN,
|
||||
connection_attempt,
|
||||
with_error err,
|
||||
"codex.exec_server.remote_websocket_connect_failed",
|
||||
event.name = "codex.exec_server.remote_websocket_connect_failed",
|
||||
attempt = connection_attempt,
|
||||
"failed to connect remote exec-server websocket"
|
||||
);
|
||||
config.telemetry.relay_reconnect("connect_failed");
|
||||
@@ -291,12 +218,15 @@ pub async fn run_remote_environment(
|
||||
}
|
||||
|
||||
let backoff_ms = backoff.as_millis();
|
||||
remote_event!(
|
||||
info,
|
||||
info!(
|
||||
attempt = connection_attempt,
|
||||
backoff_ms, "retrying remote exec-server websocket"
|
||||
);
|
||||
emit_remote_otel_event!(
|
||||
INFO,
|
||||
connection_attempt,
|
||||
with_backoff_ms backoff_ms,
|
||||
"codex.exec_server.remote_websocket_retrying",
|
||||
event.name = "codex.exec_server.remote_websocket_retrying",
|
||||
attempt = connection_attempt,
|
||||
backoff_ms,
|
||||
"retrying remote exec-server websocket"
|
||||
);
|
||||
sleep(backoff).await;
|
||||
|
||||
@@ -6,6 +6,9 @@ use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::JSONRPCRequest;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_otel::MetricsConfig;
|
||||
use opentelemetry_sdk::metrics::InMemoryMetricExporter;
|
||||
use opentelemetry_sdk::metrics::data::ScopeMetrics;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
@@ -112,6 +115,76 @@ async fn stdio_listen_transport_serves_initialize() {
|
||||
.expect("stdio transport should not fail");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn stdio_listen_transport_emits_connection_and_request_metrics() {
|
||||
let exporter = InMemoryMetricExporter::default();
|
||||
let metrics = codex_otel::MetricsClient::new(MetricsConfig::in_memory(
|
||||
"test",
|
||||
"codex-exec-server",
|
||||
env!("CARGO_PKG_VERSION"),
|
||||
exporter.clone(),
|
||||
))
|
||||
.expect("metrics");
|
||||
let telemetry = crate::ExecServerTelemetry::new(Some(metrics.clone()));
|
||||
let (mut client_writer, server_reader) = duplex(1 << 20);
|
||||
let (server_writer, client_reader) = duplex(1 << 20);
|
||||
let server_task = tokio::spawn(run_stdio_connection_with_io(
|
||||
server_reader,
|
||||
server_writer,
|
||||
test_runtime_paths(),
|
||||
telemetry,
|
||||
));
|
||||
let mut client_lines = BufReader::new(client_reader).lines();
|
||||
|
||||
write_jsonrpc_line(
|
||||
&mut client_writer,
|
||||
&JSONRPCMessage::Request(JSONRPCRequest {
|
||||
id: RequestId::Integer(1),
|
||||
method: INITIALIZE_METHOD.to_string(),
|
||||
params: Some(
|
||||
serde_json::to_value(InitializeParams {
|
||||
client_name: "exec-server-metrics-test".to_string(),
|
||||
resume_session_id: None,
|
||||
})
|
||||
.expect("initialize params should serialize"),
|
||||
),
|
||||
trace: None,
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
let _response = timeout(Duration::from_secs(1), client_lines.next_line())
|
||||
.await
|
||||
.expect("initialize response should arrive")
|
||||
.expect("initialize response read should succeed")
|
||||
.expect("initialize response should be present");
|
||||
|
||||
drop(client_writer);
|
||||
drop(client_lines);
|
||||
timeout(Duration::from_secs(1), server_task)
|
||||
.await
|
||||
.expect("stdio transport should finish after client disconnect")
|
||||
.expect("stdio transport task should join")
|
||||
.expect("stdio transport should not fail");
|
||||
metrics.shutdown().expect("shutdown metrics");
|
||||
|
||||
let metric_names = exporter
|
||||
.get_finished_metrics()
|
||||
.expect("finished metrics")
|
||||
.into_iter()
|
||||
.flat_map(|metrics| {
|
||||
metrics
|
||||
.scope_metrics()
|
||||
.flat_map(ScopeMetrics::metrics)
|
||||
.map(|metric| metric.name().to_string())
|
||||
.collect::<Vec<_>>()
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
assert!(metric_names.contains(&"exec_server.connections.active".to_string()));
|
||||
assert!(metric_names.contains(&"exec_server.connections.total".to_string()));
|
||||
assert!(metric_names.contains(&"exec_server.requests.total".to_string()));
|
||||
assert!(metric_names.contains(&"exec_server.request.duration".to_string()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_listen_url_accepts_websocket_url() {
|
||||
let transport =
|
||||
|
||||
Reference in New Issue
Block a user