mirror of
https://github.com/openai/codex.git
synced 2026-05-04 11:26:33 +00:00
Compare commits
4 Commits
codex/wind
...
starr/exec
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
519997aa20 | ||
|
|
08bbd154ed | ||
|
|
0e64d282d2 | ||
|
|
dfe19d244a |
@@ -3,6 +3,7 @@ load("//:defs.bzl", "codex_rust_crate")
|
||||
codex_rust_crate(
|
||||
name = "exec-server",
|
||||
crate_name = "codex_exec_server",
|
||||
deps_extra = ["@crates//:axum"],
|
||||
# Keep the crate's integration tests single-threaded under Bazel because
|
||||
# they install process-global test-binary dispatch state, and the remote
|
||||
# exec-server cases already rely on serialization around the full CLI path.
|
||||
|
||||
@@ -13,6 +13,12 @@ workspace = true
|
||||
[dependencies]
|
||||
arc-swap = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
axum = { workspace = true, default-features = false, features = [
|
||||
"http1",
|
||||
"json",
|
||||
"tokio",
|
||||
"ws",
|
||||
] }
|
||||
base64 = { workspace = true }
|
||||
bytes = { workspace = true }
|
||||
codex-app-server-protocol = { workspace = true }
|
||||
|
||||
@@ -27,6 +27,22 @@ Wire framing:
|
||||
|
||||
- websocket: one JSON-RPC message per websocket text frame
|
||||
|
||||
## Status endpoints
|
||||
|
||||
When listening on `ws://IP:PORT`, the same TCP listener also serves a small
|
||||
HTTP operations surface:
|
||||
|
||||
- `GET /healthz`: returns `200 OK` with `ok\n` when the process can answer.
|
||||
- `GET /readyz`: returns `200 OK` with `ready\n` when required helper paths are
|
||||
usable, otherwise `503 Service Unavailable` with `not ready\n`.
|
||||
- `GET /status`: returns a JSON summary with service/version, readiness,
|
||||
uptime, connection/session/process/request counters, and capability flags.
|
||||
The response intentionally avoids command lines, environment variables,
|
||||
working directories, session ids, process ids, user names, and local paths.
|
||||
- `GET /metrics`: returns low-cardinality Prometheus text metrics for uptime,
|
||||
connections, sessions, processes, and JSON-RPC request totals by fixed method
|
||||
name and result.
|
||||
|
||||
## Lifecycle
|
||||
|
||||
Each connection follows this sequence:
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
use axum::extract::ws::Message as AxumWebSocketMessage;
|
||||
use axum::extract::ws::WebSocket as AxumWebSocket;
|
||||
use codex_app_server_protocol::JSONRPCMessage;
|
||||
use futures::SinkExt;
|
||||
use futures::StreamExt;
|
||||
@@ -34,6 +36,139 @@ pub(crate) struct JsonRpcConnection {
|
||||
}
|
||||
|
||||
impl JsonRpcConnection {
|
||||
pub(crate) fn from_axum_websocket(stream: AxumWebSocket, connection_label: String) -> Self {
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let (incoming_tx, incoming_rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let (disconnected_tx, disconnected_rx) = watch::channel(false);
|
||||
let (mut websocket_writer, mut websocket_reader) = stream.split();
|
||||
|
||||
let reader_label = connection_label.clone();
|
||||
let incoming_tx_for_reader = incoming_tx.clone();
|
||||
let disconnected_tx_for_reader = disconnected_tx.clone();
|
||||
let reader_task = tokio::spawn(async move {
|
||||
loop {
|
||||
match websocket_reader.next().await {
|
||||
Some(Ok(AxumWebSocketMessage::Text(text))) => {
|
||||
match serde_json::from_str::<JSONRPCMessage>(text.as_ref()) {
|
||||
Ok(message) => {
|
||||
if incoming_tx_for_reader
|
||||
.send(JsonRpcConnectionEvent::Message(message))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
send_malformed_message(
|
||||
&incoming_tx_for_reader,
|
||||
Some(format!(
|
||||
"failed to parse websocket JSON-RPC message from {reader_label}: {err}"
|
||||
)),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(Ok(AxumWebSocketMessage::Binary(bytes))) => {
|
||||
match serde_json::from_slice::<JSONRPCMessage>(bytes.as_ref()) {
|
||||
Ok(message) => {
|
||||
if incoming_tx_for_reader
|
||||
.send(JsonRpcConnectionEvent::Message(message))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
send_malformed_message(
|
||||
&incoming_tx_for_reader,
|
||||
Some(format!(
|
||||
"failed to parse websocket JSON-RPC message from {reader_label}: {err}"
|
||||
)),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(Ok(AxumWebSocketMessage::Close(_))) => {
|
||||
send_disconnected(
|
||||
&incoming_tx_for_reader,
|
||||
&disconnected_tx_for_reader,
|
||||
/*reason*/ None,
|
||||
)
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
Some(Ok(AxumWebSocketMessage::Ping(_)))
|
||||
| Some(Ok(AxumWebSocketMessage::Pong(_))) => {}
|
||||
Some(Err(err)) => {
|
||||
send_disconnected(
|
||||
&incoming_tx_for_reader,
|
||||
&disconnected_tx_for_reader,
|
||||
Some(format!(
|
||||
"failed to read websocket JSON-RPC message from {reader_label}: {err}"
|
||||
)),
|
||||
)
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
None => {
|
||||
send_disconnected(
|
||||
&incoming_tx_for_reader,
|
||||
&disconnected_tx_for_reader,
|
||||
/*reason*/ None,
|
||||
)
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let writer_task = tokio::spawn(async move {
|
||||
while let Some(message) = outgoing_rx.recv().await {
|
||||
match serialize_jsonrpc_message(&message) {
|
||||
Ok(encoded) => {
|
||||
if let Err(err) = websocket_writer
|
||||
.send(AxumWebSocketMessage::Text(encoded.into()))
|
||||
.await
|
||||
{
|
||||
send_disconnected(
|
||||
&incoming_tx,
|
||||
&disconnected_tx,
|
||||
Some(format!(
|
||||
"failed to write websocket JSON-RPC message to {connection_label}: {err}"
|
||||
)),
|
||||
)
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
send_disconnected(
|
||||
&incoming_tx,
|
||||
&disconnected_tx,
|
||||
Some(format!(
|
||||
"failed to serialize JSON-RPC message for {connection_label}: {err}"
|
||||
)),
|
||||
)
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Self {
|
||||
outgoing_tx,
|
||||
incoming_rx,
|
||||
disconnected_rx,
|
||||
task_handles: vec![reader_task, writer_task],
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn from_stdio<R, W>(reader: R, writer: W, connection_label: String) -> Self
|
||||
where
|
||||
|
||||
@@ -45,6 +45,8 @@ use crate::rpc::RpcServerOutboundMessage;
|
||||
use crate::rpc::internal_error;
|
||||
use crate::rpc::invalid_params;
|
||||
use crate::rpc::invalid_request;
|
||||
use crate::server::status::ExecServerStatusState;
|
||||
use crate::server::status::ProcessStatusSnapshot;
|
||||
|
||||
const RETAINED_OUTPUT_BYTES_PER_PROCESS: usize = 1024 * 1024;
|
||||
const NOTIFICATION_CHANNEL_CAPACITY: usize = 256;
|
||||
@@ -84,6 +86,7 @@ enum ProcessEntry {
|
||||
struct Inner {
|
||||
notifications: std::sync::RwLock<Option<RpcNotificationSender>>,
|
||||
processes: Mutex<HashMap<ProcessId, ProcessEntry>>,
|
||||
status: Arc<ExecServerStatusState>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -103,16 +106,34 @@ impl Default for LocalProcess {
|
||||
let (outgoing_tx, mut outgoing_rx) =
|
||||
mpsc::channel::<RpcServerOutboundMessage>(NOTIFICATION_CHANNEL_CAPACITY);
|
||||
tokio::spawn(async move { while outgoing_rx.recv().await.is_some() {} });
|
||||
Self::new(RpcNotificationSender::new(outgoing_tx))
|
||||
let current_exe = match std::env::current_exe() {
|
||||
Ok(current_exe) => current_exe,
|
||||
Err(err) => panic!("current executable should resolve: {err}"),
|
||||
};
|
||||
let runtime_paths = match crate::ExecServerRuntimePaths::new(
|
||||
current_exe,
|
||||
/*codex_linux_sandbox_exe*/ None,
|
||||
) {
|
||||
Ok(runtime_paths) => runtime_paths,
|
||||
Err(err) => panic!("current executable should be absolute: {err}"),
|
||||
};
|
||||
Self::new(
|
||||
RpcNotificationSender::new(outgoing_tx),
|
||||
ExecServerStatusState::new(runtime_paths),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl LocalProcess {
|
||||
pub(crate) fn new(notifications: RpcNotificationSender) -> Self {
|
||||
pub(crate) fn new(
|
||||
notifications: RpcNotificationSender,
|
||||
status: Arc<ExecServerStatusState>,
|
||||
) -> Self {
|
||||
Self {
|
||||
inner: Arc::new(Inner {
|
||||
notifications: std::sync::RwLock::new(Some(notifications)),
|
||||
processes: Mutex::new(HashMap::new()),
|
||||
status,
|
||||
}),
|
||||
}
|
||||
}
|
||||
@@ -142,6 +163,21 @@ impl LocalProcess {
|
||||
*notification_sender = notifications;
|
||||
}
|
||||
|
||||
pub(crate) async fn status_snapshot(&self) -> ProcessStatusSnapshot {
|
||||
let processes = self.inner.processes.lock().await;
|
||||
let mut snapshot = ProcessStatusSnapshot::default();
|
||||
for process in processes.values() {
|
||||
match process {
|
||||
ProcessEntry::Starting => snapshot.starting += 1,
|
||||
ProcessEntry::Running(process) if process.exit_code.is_some() => {
|
||||
snapshot.exited_retained += 1;
|
||||
}
|
||||
ProcessEntry::Running(_) => snapshot.running += 1,
|
||||
}
|
||||
}
|
||||
snapshot
|
||||
}
|
||||
|
||||
async fn start_process(
|
||||
&self,
|
||||
params: ExecParams,
|
||||
@@ -229,6 +265,7 @@ impl LocalProcess {
|
||||
})),
|
||||
);
|
||||
}
|
||||
self.inner.status.process_started();
|
||||
|
||||
tokio::spawn(stream_output(
|
||||
process_id.clone(),
|
||||
|
||||
@@ -4,6 +4,7 @@ mod process_handler;
|
||||
mod processor;
|
||||
mod registry;
|
||||
mod session_registry;
|
||||
pub(crate) mod status;
|
||||
mod transport;
|
||||
|
||||
pub(crate) use handler::ExecServerHandler;
|
||||
|
||||
@@ -77,7 +77,7 @@ fn test_runtime_paths() -> ExecServerRuntimePaths {
|
||||
|
||||
async fn initialized_handler() -> Arc<ExecServerHandler> {
|
||||
let (outgoing_tx, _outgoing_rx) = mpsc::channel(16);
|
||||
let registry = SessionRegistry::new();
|
||||
let registry = SessionRegistry::new_for_tests();
|
||||
let handler = Arc::new(ExecServerHandler::new(
|
||||
registry,
|
||||
RpcNotificationSender::new(outgoing_tx),
|
||||
@@ -155,7 +155,7 @@ async fn terminate_reports_false_after_process_exit() {
|
||||
#[tokio::test]
|
||||
async fn long_poll_read_fails_after_session_resume() {
|
||||
let (first_tx, _first_rx) = mpsc::channel(16);
|
||||
let registry = SessionRegistry::new();
|
||||
let registry = SessionRegistry::new_for_tests();
|
||||
let first_handler = Arc::new(ExecServerHandler::new(
|
||||
Arc::clone(®istry),
|
||||
RpcNotificationSender::new(first_tx),
|
||||
@@ -228,7 +228,7 @@ async fn long_poll_read_fails_after_session_resume() {
|
||||
#[tokio::test]
|
||||
async fn active_session_resume_is_rejected() {
|
||||
let (first_tx, _first_rx) = mpsc::channel(16);
|
||||
let registry = SessionRegistry::new();
|
||||
let registry = SessionRegistry::new_for_tests();
|
||||
let first_handler = Arc::new(ExecServerHandler::new(
|
||||
Arc::clone(®istry),
|
||||
RpcNotificationSender::new(first_tx),
|
||||
@@ -272,7 +272,7 @@ async fn active_session_resume_is_rejected() {
|
||||
async fn output_and_exit_are_retained_after_notification_receiver_closes() {
|
||||
let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
|
||||
let handler = Arc::new(ExecServerHandler::new(
|
||||
SessionRegistry::new(),
|
||||
SessionRegistry::new_for_tests(),
|
||||
RpcNotificationSender::new(outgoing_tx),
|
||||
test_runtime_paths(),
|
||||
));
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
|
||||
use crate::local_process::LocalProcess;
|
||||
@@ -10,6 +12,8 @@ use crate::protocol::TerminateResponse;
|
||||
use crate::protocol::WriteParams;
|
||||
use crate::protocol::WriteResponse;
|
||||
use crate::rpc::RpcNotificationSender;
|
||||
use crate::server::status::ExecServerStatusState;
|
||||
use crate::server::status::ProcessStatusSnapshot;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct ProcessHandler {
|
||||
@@ -17,9 +21,12 @@ pub(crate) struct ProcessHandler {
|
||||
}
|
||||
|
||||
impl ProcessHandler {
|
||||
pub(crate) fn new(notifications: RpcNotificationSender) -> Self {
|
||||
pub(crate) fn new(
|
||||
notifications: RpcNotificationSender,
|
||||
status: Arc<ExecServerStatusState>,
|
||||
) -> Self {
|
||||
Self {
|
||||
process: LocalProcess::new(notifications),
|
||||
process: LocalProcess::new(notifications, status),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,6 +38,10 @@ impl ProcessHandler {
|
||||
self.process.set_notification_sender(notifications);
|
||||
}
|
||||
|
||||
pub(crate) async fn status_snapshot(&self) -> ProcessStatusSnapshot {
|
||||
self.process.status_snapshot().await
|
||||
}
|
||||
|
||||
pub(crate) async fn exec(&self, params: ExecParams) -> Result<ExecResponse, JSONRPCErrorError> {
|
||||
self.process.exec(params).await
|
||||
}
|
||||
|
||||
@@ -16,28 +16,51 @@ use crate::rpc::method_not_found;
|
||||
use crate::server::ExecServerHandler;
|
||||
use crate::server::registry::build_router;
|
||||
use crate::server::session_registry::SessionRegistry;
|
||||
use crate::server::status::ExecServerStatusState;
|
||||
use crate::server::status::StatusResponse;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct ConnectionProcessor {
|
||||
session_registry: Arc<SessionRegistry>,
|
||||
runtime_paths: ExecServerRuntimePaths,
|
||||
status: Arc<ExecServerStatusState>,
|
||||
}
|
||||
|
||||
impl ConnectionProcessor {
|
||||
pub(crate) fn new(runtime_paths: ExecServerRuntimePaths) -> Self {
|
||||
pub(crate) fn new(
|
||||
runtime_paths: ExecServerRuntimePaths,
|
||||
status: Arc<ExecServerStatusState>,
|
||||
) -> Self {
|
||||
Self {
|
||||
session_registry: SessionRegistry::new(),
|
||||
session_registry: SessionRegistry::new(Arc::clone(&status)),
|
||||
runtime_paths,
|
||||
status,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn run_connection(&self, connection: JsonRpcConnection) {
|
||||
self.status.connection_opened();
|
||||
run_connection(
|
||||
connection,
|
||||
Arc::clone(&self.session_registry),
|
||||
self.runtime_paths.clone(),
|
||||
Arc::clone(&self.status),
|
||||
)
|
||||
.await;
|
||||
self.status.connection_closed();
|
||||
}
|
||||
|
||||
pub(crate) async fn status_snapshot(&self) -> StatusResponse {
|
||||
let (sessions, processes) = self.session_registry.status_snapshot().await;
|
||||
self.status.snapshot(sessions, processes).await
|
||||
}
|
||||
|
||||
pub(crate) async fn readiness(&self) -> Result<(), String> {
|
||||
self.status.readiness().await
|
||||
}
|
||||
|
||||
pub(crate) fn render_prometheus_metrics(&self, snapshot: &StatusResponse) -> String {
|
||||
self.status.render_prometheus_metrics(snapshot)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -45,6 +68,7 @@ async fn run_connection(
|
||||
connection: JsonRpcConnection,
|
||||
session_registry: Arc<SessionRegistry>,
|
||||
runtime_paths: ExecServerRuntimePaths,
|
||||
status: Arc<ExecServerStatusState>,
|
||||
) {
|
||||
let router = Arc::new(build_router());
|
||||
let (json_outgoing_tx, mut incoming_rx, mut disconnected_rx, connection_tasks) =
|
||||
@@ -96,6 +120,7 @@ async fn run_connection(
|
||||
JsonRpcConnectionEvent::Message(message) => match message {
|
||||
codex_app_server_protocol::JSONRPCMessage::Request(request) => {
|
||||
if let Some(route) = router.request_route(request.method.as_str()) {
|
||||
let method = request.method.clone();
|
||||
let message = tokio::select! {
|
||||
message = route(Arc::clone(&handler), request) => message,
|
||||
_ = disconnected_rx.changed() => {
|
||||
@@ -103,23 +128,37 @@ async fn run_connection(
|
||||
break;
|
||||
}
|
||||
};
|
||||
match &message {
|
||||
Some(RpcServerOutboundMessage::Error { .. }) => {
|
||||
status.request_failed(&method);
|
||||
}
|
||||
Some(RpcServerOutboundMessage::Response { .. }) | None => {
|
||||
status.request_succeeded(&method);
|
||||
}
|
||||
Some(RpcServerOutboundMessage::Notification(_)) => {
|
||||
status.request_succeeded(&method);
|
||||
}
|
||||
}
|
||||
if let Some(message) = message
|
||||
&& outgoing_tx.send(message).await.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
} else if outgoing_tx
|
||||
.send(RpcServerOutboundMessage::Error {
|
||||
request_id: request.id,
|
||||
error: method_not_found(format!(
|
||||
"exec-server stub does not implement `{}` yet",
|
||||
request.method
|
||||
)),
|
||||
})
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
} else {
|
||||
status.request_failed(&request.method);
|
||||
if outgoing_tx
|
||||
.send(RpcServerOutboundMessage::Error {
|
||||
request_id: request.id,
|
||||
error: method_not_found(format!(
|
||||
"exec-server stub does not implement `{}` yet",
|
||||
request.method
|
||||
)),
|
||||
})
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
codex_app_server_protocol::JSONRPCMessage::Notification(notification) => {
|
||||
@@ -218,10 +257,11 @@ mod tests {
|
||||
use crate::protocol::TerminateParams;
|
||||
use crate::protocol::TerminateResponse;
|
||||
use crate::server::session_registry::SessionRegistry;
|
||||
use crate::server::status::ExecServerStatusState;
|
||||
|
||||
#[tokio::test]
|
||||
async fn transport_disconnect_detaches_session_during_in_flight_read() {
|
||||
let registry = SessionRegistry::new();
|
||||
let registry = SessionRegistry::new_for_tests();
|
||||
let (mut first_writer, mut first_lines, first_task) =
|
||||
spawn_test_connection(Arc::clone(®istry), "first");
|
||||
|
||||
@@ -317,7 +357,9 @@ mod tests {
|
||||
let (server_writer, client_reader) = duplex(1 << 20);
|
||||
let connection =
|
||||
JsonRpcConnection::from_stdio(server_reader, server_writer, label.to_string());
|
||||
let task = tokio::spawn(run_connection(connection, registry, test_runtime_paths()));
|
||||
let runtime_paths = test_runtime_paths();
|
||||
let status = ExecServerStatusState::new(runtime_paths.clone());
|
||||
let task = tokio::spawn(run_connection(connection, registry, runtime_paths, status));
|
||||
(client_writer, BufReader::new(client_reader).lines(), task)
|
||||
}
|
||||
|
||||
|
||||
@@ -10,6 +10,9 @@ use uuid::Uuid;
|
||||
use crate::rpc::RpcNotificationSender;
|
||||
use crate::rpc::invalid_request;
|
||||
use crate::server::process_handler::ProcessHandler;
|
||||
use crate::server::status::ExecServerStatusState;
|
||||
use crate::server::status::ProcessStatusSnapshot;
|
||||
use crate::server::status::SessionStatusSnapshot;
|
||||
|
||||
#[cfg(test)]
|
||||
const DETACHED_SESSION_TTL: Duration = Duration::from_millis(200);
|
||||
@@ -18,6 +21,7 @@ const DETACHED_SESSION_TTL: Duration = Duration::from_secs(10);
|
||||
|
||||
pub(crate) struct SessionRegistry {
|
||||
sessions: Mutex<HashMap<String, Arc<SessionEntry>>>,
|
||||
status: Arc<ExecServerStatusState>,
|
||||
}
|
||||
|
||||
struct SessionEntry {
|
||||
@@ -49,12 +53,29 @@ pub(crate) struct SessionHandle {
|
||||
}
|
||||
|
||||
impl SessionRegistry {
|
||||
pub(crate) fn new() -> Arc<Self> {
|
||||
pub(crate) fn new(status: Arc<ExecServerStatusState>) -> Arc<Self> {
|
||||
Arc::new(Self {
|
||||
sessions: Mutex::new(HashMap::new()),
|
||||
status,
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn new_for_tests() -> Arc<Self> {
|
||||
let current_exe = match std::env::current_exe() {
|
||||
Ok(current_exe) => current_exe,
|
||||
Err(err) => panic!("current executable should resolve: {err}"),
|
||||
};
|
||||
let runtime_paths = match crate::ExecServerRuntimePaths::new(
|
||||
current_exe,
|
||||
/*codex_linux_sandbox_exe*/ None,
|
||||
) {
|
||||
Ok(runtime_paths) => runtime_paths,
|
||||
Err(err) => panic!("current executable should be absolute: {err}"),
|
||||
};
|
||||
Self::new(ExecServerStatusState::new(runtime_paths))
|
||||
}
|
||||
|
||||
pub(crate) async fn attach(
|
||||
self: &Arc<Self>,
|
||||
resume_session_id: Option<String>,
|
||||
@@ -94,10 +115,11 @@ impl SessionRegistry {
|
||||
let session_id = Uuid::new_v4().to_string();
|
||||
let entry = Arc::new(SessionEntry::new(
|
||||
session_id.clone(),
|
||||
ProcessHandler::new(notifications),
|
||||
ProcessHandler::new(notifications, Arc::clone(&self.status)),
|
||||
connection_id,
|
||||
));
|
||||
sessions.insert(session_id, Arc::clone(&entry));
|
||||
self.status.session_created();
|
||||
Ok(AttachOutcome::Attached(entry))
|
||||
}
|
||||
};
|
||||
@@ -134,13 +156,25 @@ impl SessionRegistry {
|
||||
entry.process.shutdown().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for SessionRegistry {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
sessions: Mutex::new(HashMap::new()),
|
||||
pub(crate) async fn status_snapshot(&self) -> (SessionStatusSnapshot, ProcessStatusSnapshot) {
|
||||
let entries = {
|
||||
let sessions = self.sessions.lock().await;
|
||||
sessions.values().cloned().collect::<Vec<_>>()
|
||||
};
|
||||
let mut sessions = SessionStatusSnapshot::default();
|
||||
let mut processes = ProcessStatusSnapshot::default();
|
||||
for entry in entries {
|
||||
match entry.attachment_status() {
|
||||
SessionAttachmentStatus::Active => sessions.active += 1,
|
||||
SessionAttachmentStatus::Detached => sessions.detached += 1,
|
||||
}
|
||||
let process = entry.process.status_snapshot().await;
|
||||
processes.starting += process.starting;
|
||||
processes.running += process.running;
|
||||
processes.exited_retained += process.exited_retained;
|
||||
}
|
||||
(sessions, processes)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -221,6 +255,23 @@ impl SessionEntry {
|
||||
.detached_expires_at
|
||||
.is_some_and(|deadline| now >= deadline)
|
||||
}
|
||||
|
||||
fn attachment_status(&self) -> SessionAttachmentStatus {
|
||||
let attachment = self
|
||||
.attachment
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
if attachment.current_connection_id.is_some() {
|
||||
SessionAttachmentStatus::Active
|
||||
} else {
|
||||
SessionAttachmentStatus::Detached
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum SessionAttachmentStatus {
|
||||
Active,
|
||||
Detached,
|
||||
}
|
||||
|
||||
impl SessionHandle {
|
||||
|
||||
342
codex-rs/exec-server/src/server/status.rs
Normal file
342
codex-rs/exec-server/src/server/status.rs
Normal file
@@ -0,0 +1,342 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex as StdMutex;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::time::Instant;
|
||||
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::ExecServerRuntimePaths;
|
||||
|
||||
const SERVICE_NAME: &str = "codex-exec-server";
|
||||
const UNKNOWN_METHOD_LABEL: &str = "__unknown__";
|
||||
|
||||
#[derive(Default, Debug, Clone, Copy, Eq, PartialEq)]
|
||||
pub(crate) struct SessionStatusSnapshot {
|
||||
pub(crate) active: u64,
|
||||
pub(crate) detached: u64,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug, Clone, Copy, Eq, PartialEq)]
|
||||
pub(crate) struct ProcessStatusSnapshot {
|
||||
pub(crate) starting: u64,
|
||||
pub(crate) running: u64,
|
||||
pub(crate) exited_retained: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
|
||||
struct RequestMetricKey {
|
||||
method: String,
|
||||
result: &'static str,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct ExecServerStatusState {
|
||||
started_at: Instant,
|
||||
runtime_paths: ExecServerRuntimePaths,
|
||||
active_connections: AtomicU64,
|
||||
total_connections: AtomicU64,
|
||||
total_sessions_created: AtomicU64,
|
||||
total_processes_started: AtomicU64,
|
||||
total_requests: AtomicU64,
|
||||
total_request_successes: AtomicU64,
|
||||
total_request_failures: AtomicU64,
|
||||
requests_by_method: StdMutex<BTreeMap<RequestMetricKey, u64>>,
|
||||
}
|
||||
|
||||
impl ExecServerStatusState {
|
||||
pub(crate) fn new(runtime_paths: ExecServerRuntimePaths) -> Arc<Self> {
|
||||
Arc::new(Self {
|
||||
started_at: Instant::now(),
|
||||
runtime_paths,
|
||||
active_connections: AtomicU64::new(0),
|
||||
total_connections: AtomicU64::new(0),
|
||||
total_sessions_created: AtomicU64::new(0),
|
||||
total_processes_started: AtomicU64::new(0),
|
||||
total_requests: AtomicU64::new(0),
|
||||
total_request_successes: AtomicU64::new(0),
|
||||
total_request_failures: AtomicU64::new(0),
|
||||
requests_by_method: StdMutex::new(BTreeMap::new()),
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn connection_opened(&self) {
|
||||
self.active_connections.fetch_add(1, Ordering::Relaxed);
|
||||
self.total_connections.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub(crate) fn connection_closed(&self) {
|
||||
self.active_connections.fetch_sub(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub(crate) fn session_created(&self) {
|
||||
self.total_sessions_created.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub(crate) fn process_started(&self) {
|
||||
self.total_processes_started.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub(crate) fn request_succeeded(&self, method: &str) {
|
||||
self.record_request(method, "ok");
|
||||
self.total_request_successes.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub(crate) fn request_failed(&self, method: &str) {
|
||||
self.record_request(method, "error");
|
||||
self.total_request_failures.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub(crate) async fn readiness(&self) -> Result<(), String> {
|
||||
helper_path_ready(self.runtime_paths.codex_self_exe.as_ref()).await?;
|
||||
if let Some(path) = &self.runtime_paths.codex_linux_sandbox_exe {
|
||||
helper_path_ready(path.as_ref()).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn snapshot(
|
||||
&self,
|
||||
sessions: SessionStatusSnapshot,
|
||||
processes: ProcessStatusSnapshot,
|
||||
) -> StatusResponse {
|
||||
let status = if self.readiness().await.is_ok() {
|
||||
ServiceStatus::Ready
|
||||
} else {
|
||||
ServiceStatus::NotReady
|
||||
};
|
||||
StatusResponse {
|
||||
service: SERVICE_NAME.to_string(),
|
||||
status,
|
||||
version: env!("CARGO_PKG_VERSION").to_string(),
|
||||
uptime_seconds: self.started_at.elapsed().as_secs(),
|
||||
connections: ConnectionStatus {
|
||||
active: self.active_connections.load(Ordering::Relaxed),
|
||||
total: self.total_connections.load(Ordering::Relaxed),
|
||||
},
|
||||
sessions: SessionStatus {
|
||||
active: sessions.active,
|
||||
detached: sessions.detached,
|
||||
total_created: self.total_sessions_created.load(Ordering::Relaxed),
|
||||
},
|
||||
processes: ProcessStatus {
|
||||
starting: processes.starting,
|
||||
running: processes.running,
|
||||
exited_retained: processes.exited_retained,
|
||||
total_started: self.total_processes_started.load(Ordering::Relaxed),
|
||||
},
|
||||
requests: RequestStatus {
|
||||
total: self.total_requests.load(Ordering::Relaxed),
|
||||
succeeded: self.total_request_successes.load(Ordering::Relaxed),
|
||||
failed: self.total_request_failures.load(Ordering::Relaxed),
|
||||
},
|
||||
capabilities: CapabilitiesStatus {
|
||||
process: true,
|
||||
filesystem: true,
|
||||
http: true,
|
||||
metrics: true,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn render_prometheus_metrics(&self, snapshot: &StatusResponse) -> String {
|
||||
let request_metrics = self
|
||||
.requests_by_method
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner)
|
||||
.clone();
|
||||
let mut output = String::new();
|
||||
output.push_str(
|
||||
"# HELP codex_exec_server_uptime_seconds Seconds since the exec-server started.\n",
|
||||
);
|
||||
output.push_str("# TYPE codex_exec_server_uptime_seconds gauge\n");
|
||||
output.push_str(&format!(
|
||||
"codex_exec_server_uptime_seconds {}\n",
|
||||
snapshot.uptime_seconds
|
||||
));
|
||||
output.push_str(
|
||||
"# HELP codex_exec_server_connections Current and cumulative websocket connections.\n",
|
||||
);
|
||||
output.push_str("# TYPE codex_exec_server_connections gauge\n");
|
||||
output.push_str(&format!(
|
||||
"codex_exec_server_connections{{state=\"active\"}} {}\n",
|
||||
snapshot.connections.active
|
||||
));
|
||||
output.push_str("# TYPE codex_exec_server_connections_total counter\n");
|
||||
output.push_str(&format!(
|
||||
"codex_exec_server_connections_total {}\n",
|
||||
snapshot.connections.total
|
||||
));
|
||||
output.push_str(
|
||||
"# HELP codex_exec_server_sessions Current and cumulative exec-server sessions.\n",
|
||||
);
|
||||
output.push_str("# TYPE codex_exec_server_sessions gauge\n");
|
||||
output.push_str(&format!(
|
||||
"codex_exec_server_sessions{{state=\"active\"}} {}\n",
|
||||
snapshot.sessions.active
|
||||
));
|
||||
output.push_str(&format!(
|
||||
"codex_exec_server_sessions{{state=\"detached\"}} {}\n",
|
||||
snapshot.sessions.detached
|
||||
));
|
||||
output.push_str("# TYPE codex_exec_server_sessions_created_total counter\n");
|
||||
output.push_str(&format!(
|
||||
"codex_exec_server_sessions_created_total {}\n",
|
||||
snapshot.sessions.total_created
|
||||
));
|
||||
output.push_str("# HELP codex_exec_server_processes Current managed process counts.\n");
|
||||
output.push_str("# TYPE codex_exec_server_processes gauge\n");
|
||||
output.push_str(&format!(
|
||||
"codex_exec_server_processes{{state=\"starting\"}} {}\n",
|
||||
snapshot.processes.starting
|
||||
));
|
||||
output.push_str(&format!(
|
||||
"codex_exec_server_processes{{state=\"running\"}} {}\n",
|
||||
snapshot.processes.running
|
||||
));
|
||||
output.push_str(&format!(
|
||||
"codex_exec_server_processes{{state=\"exited_retained\"}} {}\n",
|
||||
snapshot.processes.exited_retained
|
||||
));
|
||||
output.push_str("# TYPE codex_exec_server_processes_started_total counter\n");
|
||||
output.push_str(&format!(
|
||||
"codex_exec_server_processes_started_total {}\n",
|
||||
snapshot.processes.total_started
|
||||
));
|
||||
output.push_str("# HELP codex_exec_server_requests_total JSON-RPC requests handled by method and result.\n");
|
||||
output.push_str("# TYPE codex_exec_server_requests_total counter\n");
|
||||
for (key, value) in request_metrics {
|
||||
output.push_str(&format!(
|
||||
"codex_exec_server_requests_total{{method=\"{}\",result=\"{}\"}} {}\n",
|
||||
key.method, key.result, value
|
||||
));
|
||||
}
|
||||
output
|
||||
}
|
||||
|
||||
fn record_request(&self, method: &str, result: &'static str) {
|
||||
self.total_requests.fetch_add(1, Ordering::Relaxed);
|
||||
let method = match method {
|
||||
crate::protocol::INITIALIZE_METHOD
|
||||
| crate::protocol::EXEC_METHOD
|
||||
| crate::protocol::EXEC_READ_METHOD
|
||||
| crate::protocol::EXEC_WRITE_METHOD
|
||||
| crate::protocol::EXEC_TERMINATE_METHOD
|
||||
| crate::protocol::FS_READ_FILE_METHOD
|
||||
| crate::protocol::FS_WRITE_FILE_METHOD
|
||||
| crate::protocol::FS_CREATE_DIRECTORY_METHOD
|
||||
| crate::protocol::FS_GET_METADATA_METHOD
|
||||
| crate::protocol::FS_READ_DIRECTORY_METHOD
|
||||
| crate::protocol::FS_REMOVE_METHOD
|
||||
| crate::protocol::FS_COPY_METHOD
|
||||
| crate::protocol::HTTP_REQUEST_METHOD => method,
|
||||
_ => UNKNOWN_METHOD_LABEL,
|
||||
};
|
||||
let mut requests_by_method = self
|
||||
.requests_by_method
|
||||
.lock()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner);
|
||||
*requests_by_method
|
||||
.entry(RequestMetricKey {
|
||||
method: method.to_string(),
|
||||
result,
|
||||
})
|
||||
.or_insert(0) += 1;
|
||||
}
|
||||
}
|
||||
|
||||
async fn helper_path_ready(path: &std::path::Path) -> Result<(), String> {
|
||||
let metadata = tokio::fs::metadata(path)
|
||||
.await
|
||||
.map_err(|err| format!("helper path is unavailable: {err}"))?;
|
||||
if metadata.is_file() {
|
||||
Ok(())
|
||||
} else {
|
||||
Err("helper path is not a file".to_string())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Eq, PartialEq, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub(crate) struct StatusResponse {
|
||||
pub(crate) service: String,
|
||||
pub(crate) status: ServiceStatus,
|
||||
pub(crate) version: String,
|
||||
pub(crate) uptime_seconds: u64,
|
||||
pub(crate) connections: ConnectionStatus,
|
||||
pub(crate) sessions: SessionStatus,
|
||||
pub(crate) processes: ProcessStatus,
|
||||
pub(crate) requests: RequestStatus,
|
||||
pub(crate) capabilities: CapabilitiesStatus,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub(crate) enum ServiceStatus {
|
||||
Ready,
|
||||
NotReady,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Eq, PartialEq, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub(crate) struct ConnectionStatus {
|
||||
pub(crate) active: u64,
|
||||
pub(crate) total: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Eq, PartialEq, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub(crate) struct SessionStatus {
|
||||
pub(crate) active: u64,
|
||||
pub(crate) detached: u64,
|
||||
pub(crate) total_created: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Eq, PartialEq, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub(crate) struct ProcessStatus {
|
||||
pub(crate) starting: u64,
|
||||
pub(crate) running: u64,
|
||||
pub(crate) exited_retained: u64,
|
||||
pub(crate) total_started: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Eq, PartialEq, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub(crate) struct RequestStatus {
|
||||
pub(crate) total: u64,
|
||||
pub(crate) succeeded: u64,
|
||||
pub(crate) failed: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Eq, PartialEq, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub(crate) struct CapabilitiesStatus {
|
||||
pub(crate) process: bool,
|
||||
pub(crate) filesystem: bool,
|
||||
pub(crate) http: bool,
|
||||
pub(crate) metrics: bool,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::ExecServerStatusState;
|
||||
use crate::ExecServerRuntimePaths;
|
||||
|
||||
#[tokio::test]
|
||||
async fn readiness_rejects_missing_required_helper_path() {
|
||||
let tempdir = tempfile::tempdir().expect("tempdir");
|
||||
let missing = tempdir.path().join("missing-codex");
|
||||
let runtime_paths =
|
||||
ExecServerRuntimePaths::new(missing, /*codex_linux_sandbox_exe*/ None)
|
||||
.expect("runtime paths");
|
||||
let status = ExecServerStatusState::new(runtime_paths);
|
||||
|
||||
let error = status
|
||||
.readiness()
|
||||
.await
|
||||
.expect_err("missing helper should make exec-server not ready");
|
||||
assert!(error.contains("helper path is unavailable"));
|
||||
}
|
||||
}
|
||||
@@ -1,12 +1,23 @@
|
||||
use std::io::Write as _;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
|
||||
use axum::Json;
|
||||
use axum::Router;
|
||||
use axum::extract::ConnectInfo;
|
||||
use axum::extract::State;
|
||||
use axum::extract::ws::WebSocketUpgrade;
|
||||
use axum::http::StatusCode;
|
||||
use axum::response::IntoResponse;
|
||||
use axum::response::Response;
|
||||
use axum::routing::any;
|
||||
use axum::routing::get;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio_tungstenite::accept_async;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::ExecServerRuntimePaths;
|
||||
use crate::connection::JsonRpcConnection;
|
||||
use crate::server::processor::ConnectionProcessor;
|
||||
use crate::server::status::ExecServerStatusState;
|
||||
|
||||
pub const DEFAULT_LISTEN_URL: &str = "ws://127.0.0.1:0";
|
||||
|
||||
@@ -61,34 +72,70 @@ async fn run_websocket_listener(
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
let listener = TcpListener::bind(bind_address).await?;
|
||||
let local_addr = listener.local_addr()?;
|
||||
let processor = ConnectionProcessor::new(runtime_paths);
|
||||
let status_state = ExecServerStatusState::new(runtime_paths.clone());
|
||||
let processor = ConnectionProcessor::new(runtime_paths, status_state);
|
||||
tracing::info!("codex-exec-server listening on ws://{local_addr}");
|
||||
println!("ws://{local_addr}");
|
||||
std::io::stdout().flush()?;
|
||||
eprintln!("codex exec-server listening on ws://{local_addr}");
|
||||
eprintln!(" readyz: http://{local_addr}/readyz");
|
||||
eprintln!(" healthz: http://{local_addr}/healthz");
|
||||
eprintln!(" status: http://{local_addr}/status");
|
||||
eprintln!(" metrics: http://{local_addr}/metrics");
|
||||
|
||||
loop {
|
||||
let (stream, peer_addr) = listener.accept().await?;
|
||||
let processor = processor.clone();
|
||||
tokio::spawn(async move {
|
||||
match accept_async(stream).await {
|
||||
Ok(websocket) => {
|
||||
processor
|
||||
.run_connection(JsonRpcConnection::from_websocket(
|
||||
websocket,
|
||||
format!("exec-server websocket {peer_addr}"),
|
||||
))
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"failed to accept exec-server websocket connection from {peer_addr}: {err}"
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
let router = Router::new()
|
||||
.route("/healthz", get(healthz))
|
||||
.route("/readyz", get(readyz))
|
||||
.route("/status", get(status))
|
||||
.route("/metrics", get(metrics))
|
||||
.fallback(any(websocket_upgrade))
|
||||
.with_state(Arc::new(processor));
|
||||
axum::serve(
|
||||
listener,
|
||||
router.into_make_service_with_connect_info::<SocketAddr>(),
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn healthz() -> impl IntoResponse {
|
||||
(StatusCode::OK, "ok\n")
|
||||
}
|
||||
|
||||
async fn readyz(State(processor): State<Arc<ConnectionProcessor>>) -> impl IntoResponse {
|
||||
match processor.readiness().await {
|
||||
Ok(()) => (StatusCode::OK, "ready\n"),
|
||||
Err(_) => (StatusCode::SERVICE_UNAVAILABLE, "not ready\n"),
|
||||
}
|
||||
}
|
||||
|
||||
async fn status(State(processor): State<Arc<ConnectionProcessor>>) -> impl IntoResponse {
|
||||
Json(processor.status_snapshot().await)
|
||||
}
|
||||
|
||||
async fn metrics(State(processor): State<Arc<ConnectionProcessor>>) -> impl IntoResponse {
|
||||
let snapshot = processor.status_snapshot().await;
|
||||
let metrics = processor.render_prometheus_metrics(&snapshot);
|
||||
([("content-type", "text/plain; version=0.0.4")], metrics)
|
||||
}
|
||||
|
||||
async fn websocket_upgrade(
|
||||
websocket: WebSocketUpgrade,
|
||||
ConnectInfo(peer_addr): ConnectInfo<SocketAddr>,
|
||||
State(processor): State<Arc<ConnectionProcessor>>,
|
||||
) -> Response {
|
||||
websocket
|
||||
.on_upgrade(move |websocket| async move {
|
||||
processor
|
||||
.run_connection(JsonRpcConnection::from_axum_websocket(
|
||||
websocket,
|
||||
format!("exec-server websocket {peer_addr}"),
|
||||
))
|
||||
.await;
|
||||
})
|
||||
.into_response()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[path = "transport_tests.rs"]
|
||||
mod transport_tests;
|
||||
|
||||
@@ -5,12 +5,150 @@ mod common;
|
||||
use codex_app_server_protocol::JSONRPCError;
|
||||
use codex_app_server_protocol::JSONRPCMessage;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_exec_server::ExecResponse;
|
||||
use codex_exec_server::InitializeParams;
|
||||
use codex_exec_server::InitializeResponse;
|
||||
use codex_exec_server::ProcessId;
|
||||
use common::exec_server::exec_server;
|
||||
use pretty_assertions::assert_eq;
|
||||
use reqwest::StatusCode;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn exec_server_serves_status_endpoints_on_same_listener() -> anyhow::Result<()> {
|
||||
let mut server = exec_server().await?;
|
||||
let client = reqwest::Client::new();
|
||||
|
||||
let healthz = client
|
||||
.get(http_url(server.websocket_url(), "/healthz"))
|
||||
.send()
|
||||
.await?;
|
||||
assert_eq!(healthz.status(), StatusCode::OK);
|
||||
assert_eq!(healthz.text().await?, "ok\n");
|
||||
|
||||
let readyz = client
|
||||
.get(http_url(server.websocket_url(), "/readyz"))
|
||||
.send()
|
||||
.await?;
|
||||
assert_eq!(readyz.status(), StatusCode::OK);
|
||||
assert_eq!(readyz.text().await?, "ready\n");
|
||||
|
||||
let initial_status: serde_json::Value = client
|
||||
.get(http_url(server.websocket_url(), "/status"))
|
||||
.send()
|
||||
.await?
|
||||
.json()
|
||||
.await?;
|
||||
assert_eq!(initial_status["service"], "codex-exec-server");
|
||||
assert_eq!(initial_status["status"], "ready");
|
||||
assert_eq!(
|
||||
initial_status["connections"]["active"],
|
||||
serde_json::json!(1)
|
||||
);
|
||||
assert_eq!(initial_status["sessions"]["active"], serde_json::json!(0));
|
||||
|
||||
let initialize_id = server
|
||||
.send_request(
|
||||
"initialize",
|
||||
serde_json::to_value(InitializeParams {
|
||||
client_name: "exec-server-test".to_string(),
|
||||
resume_session_id: None,
|
||||
})?,
|
||||
)
|
||||
.await?;
|
||||
let response = server
|
||||
.wait_for_event(|event| {
|
||||
matches!(
|
||||
event,
|
||||
JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if id == &initialize_id
|
||||
)
|
||||
})
|
||||
.await?;
|
||||
let JSONRPCMessage::Response(JSONRPCResponse { result, .. }) = response else {
|
||||
panic!("expected initialize response");
|
||||
};
|
||||
let initialize_response: InitializeResponse = serde_json::from_value(result)?;
|
||||
Uuid::parse_str(&initialize_response.session_id)?;
|
||||
|
||||
server
|
||||
.send_notification("initialized", serde_json::json!({}))
|
||||
.await?;
|
||||
let process_start_id = server
|
||||
.send_request(
|
||||
"process/start",
|
||||
serde_json::json!({
|
||||
"processId": "proc-status",
|
||||
"argv": ["sh", "-c", "sleep 5"],
|
||||
"cwd": std::env::current_dir()?,
|
||||
"env": {},
|
||||
"tty": false,
|
||||
"pipeStdin": false,
|
||||
"arg0": null
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
let response = server
|
||||
.wait_for_event(|event| {
|
||||
matches!(
|
||||
event,
|
||||
JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if id == &process_start_id
|
||||
)
|
||||
})
|
||||
.await?;
|
||||
let JSONRPCMessage::Response(JSONRPCResponse { result, .. }) = response else {
|
||||
panic!("expected process/start response");
|
||||
};
|
||||
let process_start_response: ExecResponse = serde_json::from_value(result)?;
|
||||
assert_eq!(
|
||||
process_start_response,
|
||||
ExecResponse {
|
||||
process_id: ProcessId::from("proc-status")
|
||||
}
|
||||
);
|
||||
|
||||
let status_after_process: serde_json::Value = client
|
||||
.get(http_url(server.websocket_url(), "/status"))
|
||||
.send()
|
||||
.await?
|
||||
.json()
|
||||
.await?;
|
||||
assert_eq!(
|
||||
status_after_process["sessions"]["active"],
|
||||
serde_json::json!(1)
|
||||
);
|
||||
assert_eq!(
|
||||
status_after_process["processes"]["running"],
|
||||
serde_json::json!(1)
|
||||
);
|
||||
assert_eq!(
|
||||
status_after_process["processes"]["totalStarted"],
|
||||
serde_json::json!(1)
|
||||
);
|
||||
assert_eq!(
|
||||
status_after_process["requests"]["succeeded"],
|
||||
serde_json::json!(2)
|
||||
);
|
||||
|
||||
let metrics = client
|
||||
.get(http_url(server.websocket_url(), "/metrics"))
|
||||
.send()
|
||||
.await?
|
||||
.text()
|
||||
.await?;
|
||||
assert!(metrics.contains("codex_exec_server_uptime_seconds"));
|
||||
assert!(metrics.contains("codex_exec_server_connections{state=\"active\"} 1"));
|
||||
assert!(
|
||||
metrics.contains("codex_exec_server_requests_total{method=\"initialize\",result=\"ok\"} 1")
|
||||
);
|
||||
assert!(
|
||||
metrics
|
||||
.contains("codex_exec_server_requests_total{method=\"process/start\",result=\"ok\"} 1")
|
||||
);
|
||||
|
||||
server.shutdown().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn exec_server_reports_malformed_websocket_json_and_keeps_running() -> anyhow::Result<()> {
|
||||
let mut server = exec_server().await?;
|
||||
@@ -60,3 +198,11 @@ async fn exec_server_reports_malformed_websocket_json_and_keeps_running() -> any
|
||||
server.shutdown().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn http_url(websocket_url: &str, path: &str) -> String {
|
||||
let http_authority = match websocket_url.strip_prefix("ws://") {
|
||||
Some(http_authority) => http_authority,
|
||||
None => panic!("exec-server harness should expose a ws:// URL: {websocket_url}"),
|
||||
};
|
||||
format!("http://{http_authority}{path}",)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user