Compare commits

...

4 Commits

Author SHA1 Message Date
starr-openai
519997aa20 Fix exec-server argument-comment lint
Co-authored-by: Codex <noreply@openai.com>
2026-04-30 19:38:01 -07:00
starr-openai
08bbd154ed Fix exec-server clippy failures
Co-authored-by: Codex <noreply@openai.com>
2026-04-30 19:25:56 -07:00
starr-openai
0e64d282d2 Fix exec-server Bazel status build
Co-authored-by: Codex <noreply@openai.com>
2026-04-30 19:19:23 -07:00
starr-openai
dfe19d244a Add exec-server status endpoints
Expose lightweight health, readiness, JSON status, and Prometheus metrics endpoints from the existing exec-server listener.

Keep status data aggregate-only so the endpoint does not expose commands, paths, environment variables, session IDs, or process IDs.

Co-authored-by: Codex <noreply@openai.com>
2026-04-30 19:13:00 -07:00
13 changed files with 889 additions and 54 deletions

View File

@@ -3,6 +3,7 @@ load("//:defs.bzl", "codex_rust_crate")
codex_rust_crate(
name = "exec-server",
crate_name = "codex_exec_server",
deps_extra = ["@crates//:axum"],
# Keep the crate's integration tests single-threaded under Bazel because
# they install process-global test-binary dispatch state, and the remote
# exec-server cases already rely on serialization around the full CLI path.

View File

@@ -13,6 +13,12 @@ workspace = true
[dependencies]
arc-swap = { workspace = true }
async-trait = { workspace = true }
axum = { workspace = true, default-features = false, features = [
"http1",
"json",
"tokio",
"ws",
] }
base64 = { workspace = true }
bytes = { workspace = true }
codex-app-server-protocol = { workspace = true }

View File

@@ -27,6 +27,22 @@ Wire framing:
- websocket: one JSON-RPC message per websocket text frame
## Status endpoints
When listening on `ws://IP:PORT`, the same TCP listener also serves a small
HTTP operations surface:
- `GET /healthz`: returns `200 OK` with `ok\n` when the process can answer.
- `GET /readyz`: returns `200 OK` with `ready\n` when required helper paths are
usable, otherwise `503 Service Unavailable` with `not ready\n`.
- `GET /status`: returns a JSON summary with service/version, readiness,
uptime, connection/session/process/request counters, and capability flags.
The response intentionally avoids command lines, environment variables,
working directories, session ids, process ids, user names, and local paths.
- `GET /metrics`: returns low-cardinality Prometheus text metrics for uptime,
connections, sessions, processes, and JSON-RPC request totals by fixed method
name and result.
## Lifecycle
Each connection follows this sequence:

View File

@@ -1,3 +1,5 @@
use axum::extract::ws::Message as AxumWebSocketMessage;
use axum::extract::ws::WebSocket as AxumWebSocket;
use codex_app_server_protocol::JSONRPCMessage;
use futures::SinkExt;
use futures::StreamExt;
@@ -34,6 +36,139 @@ pub(crate) struct JsonRpcConnection {
}
impl JsonRpcConnection {
/// Wrap an already-upgraded axum websocket in a `JsonRpcConnection`.
///
/// Spawns two tasks that own the split halves of the socket:
/// - a reader task that parses each Text/Binary frame as a `JSONRPCMessage`
///   and forwards it on the incoming channel,
/// - a writer task that serializes outbound messages and sends them as Text
///   frames.
///
/// Both tasks report terminal conditions through `send_disconnected`, which
/// also flips the `disconnected` watch channel so in-flight request handlers
/// can bail out. `connection_label` is only used to build human-readable
/// error strings.
pub(crate) fn from_axum_websocket(stream: AxumWebSocket, connection_label: String) -> Self {
    let (outgoing_tx, mut outgoing_rx) = mpsc::channel(CHANNEL_CAPACITY);
    let (incoming_tx, incoming_rx) = mpsc::channel(CHANNEL_CAPACITY);
    let (disconnected_tx, disconnected_rx) = watch::channel(false);
    let (mut websocket_writer, mut websocket_reader) = stream.split();
    let reader_label = connection_label.clone();
    let incoming_tx_for_reader = incoming_tx.clone();
    let disconnected_tx_for_reader = disconnected_tx.clone();
    let reader_task = tokio::spawn(async move {
        loop {
            match websocket_reader.next().await {
                // Text frames are the normal wire format: one JSON-RPC
                // message per frame.
                Some(Ok(AxumWebSocketMessage::Text(text))) => {
                    match serde_json::from_str::<JSONRPCMessage>(text.as_ref()) {
                        Ok(message) => {
                            // Receiver gone means the connection owner shut
                            // down; stop reading.
                            if incoming_tx_for_reader
                                .send(JsonRpcConnectionEvent::Message(message))
                                .await
                                .is_err()
                            {
                                break;
                            }
                        }
                        Err(err) => {
                            // Malformed JSON does not kill the connection;
                            // surface it as an event and keep reading.
                            send_malformed_message(
                                &incoming_tx_for_reader,
                                Some(format!(
                                    "failed to parse websocket JSON-RPC message from {reader_label}: {err}"
                                )),
                            )
                            .await;
                        }
                    }
                }
                // Binary frames are accepted too and parsed the same way.
                Some(Ok(AxumWebSocketMessage::Binary(bytes))) => {
                    match serde_json::from_slice::<JSONRPCMessage>(bytes.as_ref()) {
                        Ok(message) => {
                            if incoming_tx_for_reader
                                .send(JsonRpcConnectionEvent::Message(message))
                                .await
                                .is_err()
                            {
                                break;
                            }
                        }
                        Err(err) => {
                            send_malformed_message(
                                &incoming_tx_for_reader,
                                Some(format!(
                                    "failed to parse websocket JSON-RPC message from {reader_label}: {err}"
                                )),
                            )
                            .await;
                        }
                    }
                }
                // Graceful close from the peer: report a reasonless
                // disconnect and stop.
                Some(Ok(AxumWebSocketMessage::Close(_))) => {
                    send_disconnected(
                        &incoming_tx_for_reader,
                        &disconnected_tx_for_reader,
                        /*reason*/ None,
                    )
                    .await;
                    break;
                }
                // Keepalive frames carry no JSON-RPC payload; axum answers
                // pings at the protocol layer, so nothing to do here.
                Some(Ok(AxumWebSocketMessage::Ping(_)))
                | Some(Ok(AxumWebSocketMessage::Pong(_))) => {}
                // Transport-level read error: report with a reason and stop.
                Some(Err(err)) => {
                    send_disconnected(
                        &incoming_tx_for_reader,
                        &disconnected_tx_for_reader,
                        Some(format!(
                            "failed to read websocket JSON-RPC message from {reader_label}: {err}"
                        )),
                    )
                    .await;
                    break;
                }
                // Stream exhausted without an explicit Close frame.
                None => {
                    send_disconnected(
                        &incoming_tx_for_reader,
                        &disconnected_tx_for_reader,
                        /*reason*/ None,
                    )
                    .await;
                    break;
                }
            }
        }
    });
    let writer_task = tokio::spawn(async move {
        // Drain outbound messages until the sender side is dropped or a
        // write/serialize failure makes the connection unusable.
        while let Some(message) = outgoing_rx.recv().await {
            match serialize_jsonrpc_message(&message) {
                Ok(encoded) => {
                    // Outbound messages are always sent as Text frames.
                    if let Err(err) = websocket_writer
                        .send(AxumWebSocketMessage::Text(encoded.into()))
                        .await
                    {
                        send_disconnected(
                            &incoming_tx,
                            &disconnected_tx,
                            Some(format!(
                                "failed to write websocket JSON-RPC message to {connection_label}: {err}"
                            )),
                        )
                        .await;
                        break;
                    }
                }
                Err(err) => {
                    // Serialization failure is treated as fatal for the
                    // connection, same as a transport write error.
                    send_disconnected(
                        &incoming_tx,
                        &disconnected_tx,
                        Some(format!(
                            "failed to serialize JSON-RPC message for {connection_label}: {err}"
                        )),
                    )
                    .await;
                    break;
                }
            }
        }
    });
    Self {
        outgoing_tx,
        incoming_rx,
        disconnected_rx,
        // Retained so the connection owner can abort/await both halves.
        task_handles: vec![reader_task, writer_task],
    }
}
#[cfg(test)]
pub(crate) fn from_stdio<R, W>(reader: R, writer: W, connection_label: String) -> Self
where

View File

@@ -45,6 +45,8 @@ use crate::rpc::RpcServerOutboundMessage;
use crate::rpc::internal_error;
use crate::rpc::invalid_params;
use crate::rpc::invalid_request;
use crate::server::status::ExecServerStatusState;
use crate::server::status::ProcessStatusSnapshot;
const RETAINED_OUTPUT_BYTES_PER_PROCESS: usize = 1024 * 1024;
const NOTIFICATION_CHANNEL_CAPACITY: usize = 256;
@@ -84,6 +86,7 @@ enum ProcessEntry {
struct Inner {
notifications: std::sync::RwLock<Option<RpcNotificationSender>>,
processes: Mutex<HashMap<ProcessId, ProcessEntry>>,
status: Arc<ExecServerStatusState>,
}
#[derive(Clone)]
@@ -103,16 +106,34 @@ impl Default for LocalProcess {
let (outgoing_tx, mut outgoing_rx) =
mpsc::channel::<RpcServerOutboundMessage>(NOTIFICATION_CHANNEL_CAPACITY);
tokio::spawn(async move { while outgoing_rx.recv().await.is_some() {} });
Self::new(RpcNotificationSender::new(outgoing_tx))
let current_exe = match std::env::current_exe() {
Ok(current_exe) => current_exe,
Err(err) => panic!("current executable should resolve: {err}"),
};
let runtime_paths = match crate::ExecServerRuntimePaths::new(
current_exe,
/*codex_linux_sandbox_exe*/ None,
) {
Ok(runtime_paths) => runtime_paths,
Err(err) => panic!("current executable should be absolute: {err}"),
};
Self::new(
RpcNotificationSender::new(outgoing_tx),
ExecServerStatusState::new(runtime_paths),
)
}
}
impl LocalProcess {
pub(crate) fn new(notifications: RpcNotificationSender) -> Self {
pub(crate) fn new(
notifications: RpcNotificationSender,
status: Arc<ExecServerStatusState>,
) -> Self {
Self {
inner: Arc::new(Inner {
notifications: std::sync::RwLock::new(Some(notifications)),
processes: Mutex::new(HashMap::new()),
status,
}),
}
}
@@ -142,6 +163,21 @@ impl LocalProcess {
*notification_sender = notifications;
}
/// Count the managed processes in each lifecycle bucket for status reporting.
///
/// Holds the process-table lock only while counting; the returned snapshot is
/// aggregate-only (no process ids or command data), matching the status
/// endpoint's privacy constraints.
pub(crate) async fn status_snapshot(&self) -> ProcessStatusSnapshot {
    let processes = self.inner.processes.lock().await;
    let mut snapshot = ProcessStatusSnapshot::default();
    for process in processes.values() {
        match process {
            // Entry reserved but the child is not running yet.
            ProcessEntry::Starting => snapshot.starting += 1,
            // Exited but still retained in the table (presumably so buffered
            // output remains readable -- see RETAINED_OUTPUT_BYTES_PER_PROCESS).
            ProcessEntry::Running(process) if process.exit_code.is_some() => {
                snapshot.exited_retained += 1;
            }
            // No exit code recorded: the process is still running.
            ProcessEntry::Running(_) => snapshot.running += 1,
        }
    }
    snapshot
}
async fn start_process(
&self,
params: ExecParams,
@@ -229,6 +265,7 @@ impl LocalProcess {
})),
);
}
self.inner.status.process_started();
tokio::spawn(stream_output(
process_id.clone(),

View File

@@ -4,6 +4,7 @@ mod process_handler;
mod processor;
mod registry;
mod session_registry;
pub(crate) mod status;
mod transport;
pub(crate) use handler::ExecServerHandler;

View File

@@ -77,7 +77,7 @@ fn test_runtime_paths() -> ExecServerRuntimePaths {
async fn initialized_handler() -> Arc<ExecServerHandler> {
let (outgoing_tx, _outgoing_rx) = mpsc::channel(16);
let registry = SessionRegistry::new();
let registry = SessionRegistry::new_for_tests();
let handler = Arc::new(ExecServerHandler::new(
registry,
RpcNotificationSender::new(outgoing_tx),
@@ -155,7 +155,7 @@ async fn terminate_reports_false_after_process_exit() {
#[tokio::test]
async fn long_poll_read_fails_after_session_resume() {
let (first_tx, _first_rx) = mpsc::channel(16);
let registry = SessionRegistry::new();
let registry = SessionRegistry::new_for_tests();
let first_handler = Arc::new(ExecServerHandler::new(
Arc::clone(&registry),
RpcNotificationSender::new(first_tx),
@@ -228,7 +228,7 @@ async fn long_poll_read_fails_after_session_resume() {
#[tokio::test]
async fn active_session_resume_is_rejected() {
let (first_tx, _first_rx) = mpsc::channel(16);
let registry = SessionRegistry::new();
let registry = SessionRegistry::new_for_tests();
let first_handler = Arc::new(ExecServerHandler::new(
Arc::clone(&registry),
RpcNotificationSender::new(first_tx),
@@ -272,7 +272,7 @@ async fn active_session_resume_is_rejected() {
async fn output_and_exit_are_retained_after_notification_receiver_closes() {
let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
let handler = Arc::new(ExecServerHandler::new(
SessionRegistry::new(),
SessionRegistry::new_for_tests(),
RpcNotificationSender::new(outgoing_tx),
test_runtime_paths(),
));

View File

@@ -1,3 +1,5 @@
use std::sync::Arc;
use codex_app_server_protocol::JSONRPCErrorError;
use crate::local_process::LocalProcess;
@@ -10,6 +12,8 @@ use crate::protocol::TerminateResponse;
use crate::protocol::WriteParams;
use crate::protocol::WriteResponse;
use crate::rpc::RpcNotificationSender;
use crate::server::status::ExecServerStatusState;
use crate::server::status::ProcessStatusSnapshot;
#[derive(Clone)]
pub(crate) struct ProcessHandler {
@@ -17,9 +21,12 @@ pub(crate) struct ProcessHandler {
}
impl ProcessHandler {
pub(crate) fn new(notifications: RpcNotificationSender) -> Self {
pub(crate) fn new(
notifications: RpcNotificationSender,
status: Arc<ExecServerStatusState>,
) -> Self {
Self {
process: LocalProcess::new(notifications),
process: LocalProcess::new(notifications, status),
}
}
@@ -31,6 +38,10 @@ impl ProcessHandler {
self.process.set_notification_sender(notifications);
}
/// Delegate to the underlying `LocalProcess` to get aggregate process counts
/// for the status endpoints.
pub(crate) async fn status_snapshot(&self) -> ProcessStatusSnapshot {
    self.process.status_snapshot().await
}
pub(crate) async fn exec(&self, params: ExecParams) -> Result<ExecResponse, JSONRPCErrorError> {
self.process.exec(params).await
}

View File

@@ -16,28 +16,51 @@ use crate::rpc::method_not_found;
use crate::server::ExecServerHandler;
use crate::server::registry::build_router;
use crate::server::session_registry::SessionRegistry;
use crate::server::status::ExecServerStatusState;
use crate::server::status::StatusResponse;
#[derive(Clone)]
pub(crate) struct ConnectionProcessor {
session_registry: Arc<SessionRegistry>,
runtime_paths: ExecServerRuntimePaths,
status: Arc<ExecServerStatusState>,
}
impl ConnectionProcessor {
pub(crate) fn new(runtime_paths: ExecServerRuntimePaths) -> Self {
pub(crate) fn new(
runtime_paths: ExecServerRuntimePaths,
status: Arc<ExecServerStatusState>,
) -> Self {
Self {
session_registry: SessionRegistry::new(),
session_registry: SessionRegistry::new(Arc::clone(&status)),
runtime_paths,
status,
}
}
pub(crate) async fn run_connection(&self, connection: JsonRpcConnection) {
self.status.connection_opened();
run_connection(
connection,
Arc::clone(&self.session_registry),
self.runtime_paths.clone(),
Arc::clone(&self.status),
)
.await;
self.status.connection_closed();
}
pub(crate) async fn status_snapshot(&self) -> StatusResponse {
let (sessions, processes) = self.session_registry.status_snapshot().await;
self.status.snapshot(sessions, processes).await
}
pub(crate) async fn readiness(&self) -> Result<(), String> {
self.status.readiness().await
}
pub(crate) fn render_prometheus_metrics(&self, snapshot: &StatusResponse) -> String {
self.status.render_prometheus_metrics(snapshot)
}
}
@@ -45,6 +68,7 @@ async fn run_connection(
connection: JsonRpcConnection,
session_registry: Arc<SessionRegistry>,
runtime_paths: ExecServerRuntimePaths,
status: Arc<ExecServerStatusState>,
) {
let router = Arc::new(build_router());
let (json_outgoing_tx, mut incoming_rx, mut disconnected_rx, connection_tasks) =
@@ -96,6 +120,7 @@ async fn run_connection(
JsonRpcConnectionEvent::Message(message) => match message {
codex_app_server_protocol::JSONRPCMessage::Request(request) => {
if let Some(route) = router.request_route(request.method.as_str()) {
let method = request.method.clone();
let message = tokio::select! {
message = route(Arc::clone(&handler), request) => message,
_ = disconnected_rx.changed() => {
@@ -103,23 +128,37 @@ async fn run_connection(
break;
}
};
match &message {
Some(RpcServerOutboundMessage::Error { .. }) => {
status.request_failed(&method);
}
Some(RpcServerOutboundMessage::Response { .. }) | None => {
status.request_succeeded(&method);
}
Some(RpcServerOutboundMessage::Notification(_)) => {
status.request_succeeded(&method);
}
}
if let Some(message) = message
&& outgoing_tx.send(message).await.is_err()
{
break;
}
} else if outgoing_tx
.send(RpcServerOutboundMessage::Error {
request_id: request.id,
error: method_not_found(format!(
"exec-server stub does not implement `{}` yet",
request.method
)),
})
.await
.is_err()
{
break;
} else {
status.request_failed(&request.method);
if outgoing_tx
.send(RpcServerOutboundMessage::Error {
request_id: request.id,
error: method_not_found(format!(
"exec-server stub does not implement `{}` yet",
request.method
)),
})
.await
.is_err()
{
break;
}
}
}
codex_app_server_protocol::JSONRPCMessage::Notification(notification) => {
@@ -218,10 +257,11 @@ mod tests {
use crate::protocol::TerminateParams;
use crate::protocol::TerminateResponse;
use crate::server::session_registry::SessionRegistry;
use crate::server::status::ExecServerStatusState;
#[tokio::test]
async fn transport_disconnect_detaches_session_during_in_flight_read() {
let registry = SessionRegistry::new();
let registry = SessionRegistry::new_for_tests();
let (mut first_writer, mut first_lines, first_task) =
spawn_test_connection(Arc::clone(&registry), "first");
@@ -317,7 +357,9 @@ mod tests {
let (server_writer, client_reader) = duplex(1 << 20);
let connection =
JsonRpcConnection::from_stdio(server_reader, server_writer, label.to_string());
let task = tokio::spawn(run_connection(connection, registry, test_runtime_paths()));
let runtime_paths = test_runtime_paths();
let status = ExecServerStatusState::new(runtime_paths.clone());
let task = tokio::spawn(run_connection(connection, registry, runtime_paths, status));
(client_writer, BufReader::new(client_reader).lines(), task)
}

View File

@@ -10,6 +10,9 @@ use uuid::Uuid;
use crate::rpc::RpcNotificationSender;
use crate::rpc::invalid_request;
use crate::server::process_handler::ProcessHandler;
use crate::server::status::ExecServerStatusState;
use crate::server::status::ProcessStatusSnapshot;
use crate::server::status::SessionStatusSnapshot;
#[cfg(test)]
const DETACHED_SESSION_TTL: Duration = Duration::from_millis(200);
@@ -18,6 +21,7 @@ const DETACHED_SESSION_TTL: Duration = Duration::from_secs(10);
pub(crate) struct SessionRegistry {
sessions: Mutex<HashMap<String, Arc<SessionEntry>>>,
status: Arc<ExecServerStatusState>,
}
struct SessionEntry {
@@ -49,12 +53,29 @@ pub(crate) struct SessionHandle {
}
impl SessionRegistry {
pub(crate) fn new() -> Arc<Self> {
pub(crate) fn new(status: Arc<ExecServerStatusState>) -> Arc<Self> {
Arc::new(Self {
sessions: Mutex::new(HashMap::new()),
status,
})
}
/// Test-only constructor that derives a real `ExecServerStatusState` from the
/// current test binary, so tests do not need to thread runtime paths through.
///
/// Panics when the current executable cannot be resolved or is not an
/// absolute path -- both indicate a broken test environment.
#[cfg(test)]
pub(crate) fn new_for_tests() -> Arc<Self> {
    let current_exe = std::env::current_exe()
        .unwrap_or_else(|err| panic!("current executable should resolve: {err}"));
    let runtime_paths =
        crate::ExecServerRuntimePaths::new(current_exe, /*codex_linux_sandbox_exe*/ None)
            .unwrap_or_else(|err| panic!("current executable should be absolute: {err}"));
    Self::new(ExecServerStatusState::new(runtime_paths))
}
pub(crate) async fn attach(
self: &Arc<Self>,
resume_session_id: Option<String>,
@@ -94,10 +115,11 @@ impl SessionRegistry {
let session_id = Uuid::new_v4().to_string();
let entry = Arc::new(SessionEntry::new(
session_id.clone(),
ProcessHandler::new(notifications),
ProcessHandler::new(notifications, Arc::clone(&self.status)),
connection_id,
));
sessions.insert(session_id, Arc::clone(&entry));
self.status.session_created();
Ok(AttachOutcome::Attached(entry))
}
};
@@ -134,13 +156,25 @@ impl SessionRegistry {
entry.process.shutdown().await;
}
}
}
impl Default for SessionRegistry {
fn default() -> Self {
Self {
sessions: Mutex::new(HashMap::new()),
pub(crate) async fn status_snapshot(&self) -> (SessionStatusSnapshot, ProcessStatusSnapshot) {
let entries = {
let sessions = self.sessions.lock().await;
sessions.values().cloned().collect::<Vec<_>>()
};
let mut sessions = SessionStatusSnapshot::default();
let mut processes = ProcessStatusSnapshot::default();
for entry in entries {
match entry.attachment_status() {
SessionAttachmentStatus::Active => sessions.active += 1,
SessionAttachmentStatus::Detached => sessions.detached += 1,
}
let process = entry.process.status_snapshot().await;
processes.starting += process.starting;
processes.running += process.running;
processes.exited_retained += process.exited_retained;
}
(sessions, processes)
}
}
@@ -221,6 +255,23 @@ impl SessionEntry {
.detached_expires_at
.is_some_and(|deadline| now >= deadline)
}
/// Classify this session as `Active` when some connection currently holds it,
/// otherwise `Detached`. Recovers the attachment lock even if poisoned so
/// status reporting never panics.
fn attachment_status(&self) -> SessionAttachmentStatus {
    let attachment = self
        .attachment
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    match attachment.current_connection_id {
        Some(_) => SessionAttachmentStatus::Active,
        None => SessionAttachmentStatus::Detached,
    }
}
}
/// Whether a session currently has a connection attached (`Active`) or is
/// waiting for a resume (`Detached`). Used only for status aggregation.
enum SessionAttachmentStatus {
    Active,
    Detached,
}
impl SessionHandle {

View File

@@ -0,0 +1,342 @@
use std::collections::BTreeMap;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::time::Instant;
use serde::Serialize;
use crate::ExecServerRuntimePaths;
const SERVICE_NAME: &str = "codex-exec-server";
const UNKNOWN_METHOD_LABEL: &str = "__unknown__";
/// Aggregate session counts reported by the status endpoints; intentionally
/// carries no session ids or other identifying data.
#[derive(Default, Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) struct SessionStatusSnapshot {
    // Sessions with a connection currently attached.
    pub(crate) active: u64,
    // Sessions whose connection dropped and that await a resume.
    pub(crate) detached: u64,
}
/// Aggregate process counts reported by the status endpoints; intentionally
/// carries no process ids, command lines, or paths.
#[derive(Default, Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) struct ProcessStatusSnapshot {
    // Entries reserved but not yet running.
    pub(crate) starting: u64,
    // Processes currently running (no exit code recorded).
    pub(crate) running: u64,
    // Processes that exited but are still retained for output reads.
    pub(crate) exited_retained: u64,
}
/// Key for per-method request counters: a fixed method label plus a result
/// label ("ok"/"error"). `Ord` so the BTreeMap renders metrics in a stable
/// order.
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
struct RequestMetricKey {
    // One of the known protocol method names, or UNKNOWN_METHOD_LABEL to keep
    // label cardinality bounded.
    method: String,
    result: &'static str,
}
/// Shared, lock-light counters backing the /status and /metrics endpoints.
///
/// All counters are relaxed atomics -- they are independent tallies, not a
/// consistent transaction, so readers may observe a momentarily skewed mix.
#[derive(Debug)]
pub(crate) struct ExecServerStatusState {
    // Baseline for uptime reporting.
    started_at: Instant,
    // Helper binary locations checked by `readiness`.
    runtime_paths: ExecServerRuntimePaths,
    // Gauge: connections currently open.
    active_connections: AtomicU64,
    // Counters: cumulative totals since process start.
    total_connections: AtomicU64,
    total_sessions_created: AtomicU64,
    total_processes_started: AtomicU64,
    total_requests: AtomicU64,
    total_request_successes: AtomicU64,
    total_request_failures: AtomicU64,
    // Per-(method, result) request counts; BTreeMap keeps metric output
    // deterministically ordered.
    requests_by_method: StdMutex<BTreeMap<RequestMetricKey, u64>>,
}
impl ExecServerStatusState {
    /// Build a fresh status tracker: all counters start at zero and uptime is
    /// measured from this call.
    pub(crate) fn new(runtime_paths: ExecServerRuntimePaths) -> Arc<Self> {
        Arc::new(Self {
            started_at: Instant::now(),
            runtime_paths,
            active_connections: AtomicU64::new(0),
            total_connections: AtomicU64::new(0),
            total_sessions_created: AtomicU64::new(0),
            total_processes_started: AtomicU64::new(0),
            total_requests: AtomicU64::new(0),
            total_request_successes: AtomicU64::new(0),
            total_request_failures: AtomicU64::new(0),
            requests_by_method: StdMutex::new(BTreeMap::new()),
        })
    }

    /// Record a newly opened connection: bumps the active gauge and the
    /// cumulative total together.
    pub(crate) fn connection_opened(&self) {
        self.active_connections.fetch_add(1, Ordering::Relaxed);
        self.total_connections.fetch_add(1, Ordering::Relaxed);
    }

    /// Record a connection closing. Callers must pair this with
    /// `connection_opened`; an unpaired call would wrap the unsigned gauge.
    pub(crate) fn connection_closed(&self) {
        self.active_connections.fetch_sub(1, Ordering::Relaxed);
    }

    /// Bump the cumulative session-creation counter.
    pub(crate) fn session_created(&self) {
        self.total_sessions_created.fetch_add(1, Ordering::Relaxed);
    }

    /// Bump the cumulative process-start counter.
    pub(crate) fn process_started(&self) {
        self.total_processes_started.fetch_add(1, Ordering::Relaxed);
    }

    /// Record a request that produced a non-error outcome for `method`.
    pub(crate) fn request_succeeded(&self, method: &str) {
        self.record_request(method, "ok");
        self.total_request_successes.fetch_add(1, Ordering::Relaxed);
    }

    /// Record a request that produced an error outcome for `method`.
    pub(crate) fn request_failed(&self, method: &str) {
        self.record_request(method, "error");
        self.total_request_failures.fetch_add(1, Ordering::Relaxed);
    }

    /// Readiness check: every configured helper binary must exist as a
    /// regular file. Returns the first failure's reason.
    pub(crate) async fn readiness(&self) -> Result<(), String> {
        helper_path_ready(self.runtime_paths.codex_self_exe.as_ref()).await?;
        // The Linux sandbox helper is optional; only check it when set.
        if let Some(path) = &self.runtime_paths.codex_linux_sandbox_exe {
            helper_path_ready(path.as_ref()).await?;
        }
        Ok(())
    }

    /// Assemble the JSON status document from the internal counters plus the
    /// caller-supplied session/process snapshots (which come from the
    /// registries, not from this struct).
    pub(crate) async fn snapshot(
        &self,
        sessions: SessionStatusSnapshot,
        processes: ProcessStatusSnapshot,
    ) -> StatusResponse {
        // Readiness is re-evaluated per snapshot so /status reflects the
        // current helper-path state.
        let status = if self.readiness().await.is_ok() {
            ServiceStatus::Ready
        } else {
            ServiceStatus::NotReady
        };
        StatusResponse {
            service: SERVICE_NAME.to_string(),
            status,
            version: env!("CARGO_PKG_VERSION").to_string(),
            uptime_seconds: self.started_at.elapsed().as_secs(),
            connections: ConnectionStatus {
                active: self.active_connections.load(Ordering::Relaxed),
                total: self.total_connections.load(Ordering::Relaxed),
            },
            sessions: SessionStatus {
                active: sessions.active,
                detached: sessions.detached,
                total_created: self.total_sessions_created.load(Ordering::Relaxed),
            },
            processes: ProcessStatus {
                starting: processes.starting,
                running: processes.running,
                exited_retained: processes.exited_retained,
                total_started: self.total_processes_started.load(Ordering::Relaxed),
            },
            requests: RequestStatus {
                total: self.total_requests.load(Ordering::Relaxed),
                succeeded: self.total_request_successes.load(Ordering::Relaxed),
                failed: self.total_request_failures.load(Ordering::Relaxed),
            },
            // Static capability flags for this build.
            capabilities: CapabilitiesStatus {
                process: true,
                filesystem: true,
                http: true,
                metrics: true,
            },
        }
    }

    /// Render the Prometheus text-format body for /metrics from a previously
    /// taken `snapshot`, plus the per-method request counters held here.
    pub(crate) fn render_prometheus_metrics(&self, snapshot: &StatusResponse) -> String {
        // Clone the map so the lock is not held while formatting.
        let request_metrics = self
            .requests_by_method
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .clone();
        let mut output = String::new();
        output.push_str(
            "# HELP codex_exec_server_uptime_seconds Seconds since the exec-server started.\n",
        );
        output.push_str("# TYPE codex_exec_server_uptime_seconds gauge\n");
        output.push_str(&format!(
            "codex_exec_server_uptime_seconds {}\n",
            snapshot.uptime_seconds
        ));
        output.push_str(
            "# HELP codex_exec_server_connections Current and cumulative websocket connections.\n",
        );
        output.push_str("# TYPE codex_exec_server_connections gauge\n");
        output.push_str(&format!(
            "codex_exec_server_connections{{state=\"active\"}} {}\n",
            snapshot.connections.active
        ));
        output.push_str("# TYPE codex_exec_server_connections_total counter\n");
        output.push_str(&format!(
            "codex_exec_server_connections_total {}\n",
            snapshot.connections.total
        ));
        output.push_str(
            "# HELP codex_exec_server_sessions Current and cumulative exec-server sessions.\n",
        );
        output.push_str("# TYPE codex_exec_server_sessions gauge\n");
        output.push_str(&format!(
            "codex_exec_server_sessions{{state=\"active\"}} {}\n",
            snapshot.sessions.active
        ));
        output.push_str(&format!(
            "codex_exec_server_sessions{{state=\"detached\"}} {}\n",
            snapshot.sessions.detached
        ));
        output.push_str("# TYPE codex_exec_server_sessions_created_total counter\n");
        output.push_str(&format!(
            "codex_exec_server_sessions_created_total {}\n",
            snapshot.sessions.total_created
        ));
        output.push_str("# HELP codex_exec_server_processes Current managed process counts.\n");
        output.push_str("# TYPE codex_exec_server_processes gauge\n");
        output.push_str(&format!(
            "codex_exec_server_processes{{state=\"starting\"}} {}\n",
            snapshot.processes.starting
        ));
        output.push_str(&format!(
            "codex_exec_server_processes{{state=\"running\"}} {}\n",
            snapshot.processes.running
        ));
        output.push_str(&format!(
            "codex_exec_server_processes{{state=\"exited_retained\"}} {}\n",
            snapshot.processes.exited_retained
        ));
        output.push_str("# TYPE codex_exec_server_processes_started_total counter\n");
        output.push_str(&format!(
            "codex_exec_server_processes_started_total {}\n",
            snapshot.processes.total_started
        ));
        output.push_str("# HELP codex_exec_server_requests_total JSON-RPC requests handled by method and result.\n");
        output.push_str("# TYPE codex_exec_server_requests_total counter\n");
        // BTreeMap iteration gives a stable, sorted label order.
        for (key, value) in request_metrics {
            output.push_str(&format!(
                "codex_exec_server_requests_total{{method=\"{}\",result=\"{}\"}} {}\n",
                key.method, key.result, value
            ));
        }
        output
    }

    /// Bump the overall request counter and the per-(method, result) counter.
    /// Unrecognized method names are collapsed into one label so metric
    /// cardinality stays bounded even for attacker-chosen method strings.
    fn record_request(&self, method: &str, result: &'static str) {
        self.total_requests.fetch_add(1, Ordering::Relaxed);
        let method = match method {
            crate::protocol::INITIALIZE_METHOD
            | crate::protocol::EXEC_METHOD
            | crate::protocol::EXEC_READ_METHOD
            | crate::protocol::EXEC_WRITE_METHOD
            | crate::protocol::EXEC_TERMINATE_METHOD
            | crate::protocol::FS_READ_FILE_METHOD
            | crate::protocol::FS_WRITE_FILE_METHOD
            | crate::protocol::FS_CREATE_DIRECTORY_METHOD
            | crate::protocol::FS_GET_METADATA_METHOD
            | crate::protocol::FS_READ_DIRECTORY_METHOD
            | crate::protocol::FS_REMOVE_METHOD
            | crate::protocol::FS_COPY_METHOD
            | crate::protocol::HTTP_REQUEST_METHOD => method,
            _ => UNKNOWN_METHOD_LABEL,
        };
        let mut requests_by_method = self
            .requests_by_method
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        *requests_by_method
            .entry(RequestMetricKey {
                method: method.to_string(),
                result,
            })
            .or_insert(0) += 1;
    }
}
/// Check that a required helper binary at `path` exists and is a regular file.
///
/// Returns `Err` with a short human-readable reason otherwise. Note the error
/// text does not include the path itself, keeping the readiness output free of
/// local paths.
async fn helper_path_ready(path: &std::path::Path) -> Result<(), String> {
    match tokio::fs::metadata(path).await {
        Ok(metadata) if metadata.is_file() => Ok(()),
        Ok(_) => Err("helper path is not a file".to_string()),
        Err(err) => Err(format!("helper path is unavailable: {err}")),
    }
}
/// JSON body returned by `GET /status`; aggregate-only by design (no command
/// lines, paths, session ids, or process ids).
#[derive(Debug, Clone, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct StatusResponse {
    // Fixed service identifier (SERVICE_NAME).
    pub(crate) service: String,
    pub(crate) status: ServiceStatus,
    // Crate version baked in at compile time.
    pub(crate) version: String,
    pub(crate) uptime_seconds: u64,
    pub(crate) connections: ConnectionStatus,
    pub(crate) sessions: SessionStatus,
    pub(crate) processes: ProcessStatus,
    pub(crate) requests: RequestStatus,
    pub(crate) capabilities: CapabilitiesStatus,
}
/// Overall readiness verdict; serializes as "ready" / "notReady" due to the
/// camelCase rename.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) enum ServiceStatus {
    Ready,
    NotReady,
}
/// Connection counters in the /status document.
#[derive(Debug, Clone, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct ConnectionStatus {
    // Currently open connections.
    pub(crate) active: u64,
    // Cumulative connections accepted since start.
    pub(crate) total: u64,
}
/// Session counters in the /status document.
#[derive(Debug, Clone, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct SessionStatus {
    pub(crate) active: u64,
    pub(crate) detached: u64,
    // Cumulative sessions created since start.
    pub(crate) total_created: u64,
}
/// Process counters in the /status document.
#[derive(Debug, Clone, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct ProcessStatus {
    pub(crate) starting: u64,
    pub(crate) running: u64,
    // Exited processes still retained for output reads.
    pub(crate) exited_retained: u64,
    // Cumulative processes started since start.
    pub(crate) total_started: u64,
}
/// JSON-RPC request totals in the /status document.
#[derive(Debug, Clone, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct RequestStatus {
    pub(crate) total: u64,
    pub(crate) succeeded: u64,
    pub(crate) failed: u64,
}
/// Capability flags advertised by /status; currently all hard-coded to true
/// in `snapshot` for this build.
#[derive(Debug, Clone, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct CapabilitiesStatus {
    pub(crate) process: bool,
    pub(crate) filesystem: bool,
    pub(crate) http: bool,
    pub(crate) metrics: bool,
}
#[cfg(test)]
mod tests {
    use super::ExecServerStatusState;
    use crate::ExecServerRuntimePaths;

    /// A nonexistent required helper path must fail readiness with the
    /// "unavailable" reason produced by `helper_path_ready`.
    #[tokio::test]
    async fn readiness_rejects_missing_required_helper_path() {
        let tempdir = tempfile::tempdir().expect("tempdir");
        // Deliberately never created, so metadata() fails.
        let missing = tempdir.path().join("missing-codex");
        let runtime_paths =
            ExecServerRuntimePaths::new(missing, /*codex_linux_sandbox_exe*/ None)
                .expect("runtime paths");
        let status = ExecServerStatusState::new(runtime_paths);
        let error = status
            .readiness()
            .await
            .expect_err("missing helper should make exec-server not ready");
        assert!(error.contains("helper path is unavailable"));
    }
}

View File

@@ -1,12 +1,23 @@
use std::io::Write as _;
use std::net::SocketAddr;
use std::sync::Arc;
use axum::Json;
use axum::Router;
use axum::extract::ConnectInfo;
use axum::extract::State;
use axum::extract::ws::WebSocketUpgrade;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::response::Response;
use axum::routing::any;
use axum::routing::get;
use tokio::net::TcpListener;
use tokio_tungstenite::accept_async;
use tracing::warn;
use crate::ExecServerRuntimePaths;
use crate::connection::JsonRpcConnection;
use crate::server::processor::ConnectionProcessor;
use crate::server::status::ExecServerStatusState;
pub const DEFAULT_LISTEN_URL: &str = "ws://127.0.0.1:0";
@@ -61,34 +72,70 @@ async fn run_websocket_listener(
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let listener = TcpListener::bind(bind_address).await?;
let local_addr = listener.local_addr()?;
let processor = ConnectionProcessor::new(runtime_paths);
let status_state = ExecServerStatusState::new(runtime_paths.clone());
let processor = ConnectionProcessor::new(runtime_paths, status_state);
tracing::info!("codex-exec-server listening on ws://{local_addr}");
println!("ws://{local_addr}");
std::io::stdout().flush()?;
eprintln!("codex exec-server listening on ws://{local_addr}");
eprintln!(" readyz: http://{local_addr}/readyz");
eprintln!(" healthz: http://{local_addr}/healthz");
eprintln!(" status: http://{local_addr}/status");
eprintln!(" metrics: http://{local_addr}/metrics");
loop {
let (stream, peer_addr) = listener.accept().await?;
let processor = processor.clone();
tokio::spawn(async move {
match accept_async(stream).await {
Ok(websocket) => {
processor
.run_connection(JsonRpcConnection::from_websocket(
websocket,
format!("exec-server websocket {peer_addr}"),
))
.await;
}
Err(err) => {
warn!(
"failed to accept exec-server websocket connection from {peer_addr}: {err}"
);
}
}
});
let router = Router::new()
.route("/healthz", get(healthz))
.route("/readyz", get(readyz))
.route("/status", get(status))
.route("/metrics", get(metrics))
.fallback(any(websocket_upgrade))
.with_state(Arc::new(processor));
axum::serve(
listener,
router.into_make_service_with_connect_info::<SocketAddr>(),
)
.await?;
Ok(())
}
/// Liveness probe: replies `200 OK` with a fixed `ok\n` body whenever the
/// process can answer HTTP at all.
async fn healthz() -> impl IntoResponse {
    let body = "ok\n";
    (StatusCode::OK, body)
}
/// Readiness probe: `200 OK`/`ready\n` when the processor's readiness check
/// passes, otherwise `503`/`not ready\n`. The failure reason is deliberately
/// not included in the body.
async fn readyz(State(processor): State<Arc<ConnectionProcessor>>) -> impl IntoResponse {
    if processor.readiness().await.is_ok() {
        (StatusCode::OK, "ready\n")
    } else {
        (StatusCode::SERVICE_UNAVAILABLE, "not ready\n")
    }
}
/// `GET /status`: return the aggregate status snapshot as JSON.
async fn status(State(processor): State<Arc<ConnectionProcessor>>) -> impl IntoResponse {
    let snapshot = processor.status_snapshot().await;
    Json(snapshot)
}
/// `GET /metrics`: render the current snapshot in Prometheus text format with
/// the conventional exposition content-type.
async fn metrics(State(processor): State<Arc<ConnectionProcessor>>) -> impl IntoResponse {
    let snapshot = processor.status_snapshot().await;
    let body = processor.render_prometheus_metrics(&snapshot);
    let headers = [("content-type", "text/plain; version=0.0.4")];
    (headers, body)
}
/// Fallback route: upgrade any non-status request to a websocket and hand the
/// resulting stream to the connection processor. The peer address is only used
/// to build the human-readable connection label.
async fn websocket_upgrade(
    websocket: WebSocketUpgrade,
    ConnectInfo(peer_addr): ConnectInfo<SocketAddr>,
    State(processor): State<Arc<ConnectionProcessor>>,
) -> Response {
    websocket
        // on_upgrade runs after the HTTP response is sent; the spawned future
        // owns the connection for its whole lifetime.
        .on_upgrade(move |websocket| async move {
            processor
                .run_connection(JsonRpcConnection::from_axum_websocket(
                    websocket,
                    format!("exec-server websocket {peer_addr}"),
                ))
                .await;
        })
        .into_response()
}
#[cfg(test)]
#[path = "transport_tests.rs"]
mod transport_tests;

View File

@@ -5,12 +5,150 @@ mod common;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCResponse;
use codex_exec_server::ExecResponse;
use codex_exec_server::InitializeParams;
use codex_exec_server::InitializeResponse;
use codex_exec_server::ProcessId;
use common::exec_server::exec_server;
use pretty_assertions::assert_eq;
use reqwest::StatusCode;
use uuid::Uuid;
// End-to-end check that the HTTP status surface shares the websocket listener.
//
// The assertions on request/session/process counters depend on the exact
// sequence of JSON-RPC calls made below (initialize, then process/start), so
// do not reorder the steps.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exec_server_serves_status_endpoints_on_same_listener() -> anyhow::Result<()> {
    let mut server = exec_server().await?;
    let client = reqwest::Client::new();
    // Liveness probe works before any JSON-RPC traffic has been sent.
    let healthz = client
        .get(http_url(server.websocket_url(), "/healthz"))
        .send()
        .await?;
    assert_eq!(healthz.status(), StatusCode::OK);
    assert_eq!(healthz.text().await?, "ok\n");
    // Readiness probe reports ready for a freshly started server.
    let readyz = client
        .get(http_url(server.websocket_url(), "/readyz"))
        .send()
        .await?;
    assert_eq!(readyz.status(), StatusCode::OK);
    assert_eq!(readyz.text().await?, "ready\n");
    // Baseline snapshot: the harness's own websocket counts as the one active
    // connection, but no session has been initialized yet.
    let initial_status: serde_json::Value = client
        .get(http_url(server.websocket_url(), "/status"))
        .send()
        .await?
        .json()
        .await?;
    assert_eq!(initial_status["service"], "codex-exec-server");
    assert_eq!(initial_status["status"], "ready");
    assert_eq!(
        initial_status["connections"]["active"],
        serde_json::json!(1)
    );
    assert_eq!(initial_status["sessions"]["active"], serde_json::json!(0));
    // First counted request: initialize a session over JSON-RPC.
    let initialize_id = server
        .send_request(
            "initialize",
            serde_json::to_value(InitializeParams {
                client_name: "exec-server-test".to_string(),
                resume_session_id: None,
            })?,
        )
        .await?;
    let response = server
        .wait_for_event(|event| {
            matches!(
                event,
                JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if id == &initialize_id
            )
        })
        .await?;
    let JSONRPCMessage::Response(JSONRPCResponse { result, .. }) = response else {
        panic!("expected initialize response");
    };
    let initialize_response: InitializeResponse = serde_json::from_value(result)?;
    // The session id must be a well-formed UUID.
    Uuid::parse_str(&initialize_response.session_id)?;
    server
        .send_notification("initialized", serde_json::json!({}))
        .await?;
    // Second counted request: start a long-lived process so it is still
    // running when we sample /status below.
    let process_start_id = server
        .send_request(
            "process/start",
            serde_json::json!({
                "processId": "proc-status",
                "argv": ["sh", "-c", "sleep 5"],
                "cwd": std::env::current_dir()?,
                "env": {},
                "tty": false,
                "pipeStdin": false,
                "arg0": null
            }),
        )
        .await?;
    let response = server
        .wait_for_event(|event| {
            matches!(
                event,
                JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if id == &process_start_id
            )
        })
        .await?;
    let JSONRPCMessage::Response(JSONRPCResponse { result, .. }) = response else {
        panic!("expected process/start response");
    };
    let process_start_response: ExecResponse = serde_json::from_value(result)?;
    assert_eq!(
        process_start_response,
        ExecResponse {
            process_id: ProcessId::from("proc-status")
        }
    );
    // Snapshot after activity: one active session, one running process, and
    // exactly the two successful requests issued above.
    let status_after_process: serde_json::Value = client
        .get(http_url(server.websocket_url(), "/status"))
        .send()
        .await?
        .json()
        .await?;
    assert_eq!(
        status_after_process["sessions"]["active"],
        serde_json::json!(1)
    );
    assert_eq!(
        status_after_process["processes"]["running"],
        serde_json::json!(1)
    );
    assert_eq!(
        status_after_process["processes"]["totalStarted"],
        serde_json::json!(1)
    );
    assert_eq!(
        status_after_process["requests"]["succeeded"],
        serde_json::json!(2)
    );
    // Prometheus endpoint reflects the same counters in text format.
    let metrics = client
        .get(http_url(server.websocket_url(), "/metrics"))
        .send()
        .await?
        .text()
        .await?;
    assert!(metrics.contains("codex_exec_server_uptime_seconds"));
    assert!(metrics.contains("codex_exec_server_connections{state=\"active\"} 1"));
    assert!(
        metrics.contains("codex_exec_server_requests_total{method=\"initialize\",result=\"ok\"} 1")
    );
    assert!(
        metrics
            .contains("codex_exec_server_requests_total{method=\"process/start\",result=\"ok\"} 1")
    );
    server.shutdown().await?;
    Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exec_server_reports_malformed_websocket_json_and_keeps_running() -> anyhow::Result<()> {
let mut server = exec_server().await?;
@@ -60,3 +198,11 @@ async fn exec_server_reports_malformed_websocket_json_and_keeps_running() -> any
server.shutdown().await?;
Ok(())
}
/// Rewrites the harness's `ws://host:port` URL into an `http://` URL for
/// `path`, so the status endpoints can be hit over plain HTTP.
///
/// # Panics
///
/// Panics if `websocket_url` does not start with `ws://`, since that would
/// mean the test harness itself is broken rather than the server under test.
fn http_url(websocket_url: &str, path: &str) -> String {
    // `let else` replaces the match-that-panics, and the stray trailing
    // comma inside `format!` (a clippy target in this crate) is gone.
    let Some(http_authority) = websocket_url.strip_prefix("ws://") else {
        panic!("exec-server harness should expose a ws:// URL: {websocket_url}");
    };
    format!("http://{http_authority}{path}")
}