Compare commits

...

2 Commits

Author SHA1 Message Date
Max Johnson
18bdfb5454 build: patch rules_rust process_wrapper metadata handling
Co-authored-by: Codex <noreply@openai.com>
2026-03-06 12:21:00 -08:00
Max Johnson
f9491590ec app-server: add websocket listener health checks 2026-03-06 11:22:33 -08:00
7 changed files with 153 additions and 57 deletions

View File

@@ -42,6 +42,12 @@ bazel_dep(name = "rules_platform", version = "0.1.0")
bazel_dep(name = "rules_rs", version = "0.0.40")
rules_rust = use_extension("@rules_rs//rs/experimental:rules_rust.bzl", "rules_rust")
rules_rust.patch(
patches = [
"//patches:rules_rs_process_wrapper_emit_metadata.patch",
],
strip = 1,
)
use_repo(rules_rust, "rules_rust")
toolchains = use_extension("@rules_rs//rs/experimental/toolchains:module_extension.bzl", "toolchains")

4
codex-rs/Cargo.lock generated
View File

@@ -827,6 +827,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b52af3cb4058c895d37317bb27508dccc8e5f2d39454016b297bf4a400597b8"
dependencies = [
"axum-core",
"base64 0.22.1",
"bytes",
"form_urlencoded",
"futures-util",
@@ -845,8 +846,10 @@ dependencies = [
"serde_json",
"serde_path_to_error",
"serde_urlencoded",
"sha1",
"sync_wrapper",
"tokio",
"tokio-tungstenite",
"tower",
"tower-layer",
"tower-service",
@@ -1440,6 +1443,7 @@ dependencies = [
"futures",
"owo-colors",
"pretty_assertions",
"reqwest",
"rmcp",
"serde",
"serde_json",

View File

@@ -18,6 +18,12 @@ workspace = true
[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
axum = { workspace = true, default-features = false, features = [
"http1",
"json",
"tokio",
"ws",
] }
codex-arg0 = { workspace = true }
codex-cloud-requirements = { workspace = true }
codex-core = { workspace = true }
@@ -59,15 +65,11 @@ uuid = { workspace = true, features = ["serde", "v7"] }
[dev-dependencies]
app_test_support = { workspace = true }
axum = { workspace = true, default-features = false, features = [
"http1",
"json",
"tokio",
] }
base64 = { workspace = true }
core_test_support = { workspace = true }
codex-utils-cargo-bin = { workspace = true }
pretty_assertions = { workspace = true }
reqwest = { workspace = true, features = ["rustls-tls"] }
rmcp = { workspace = true, default-features = false, features = [
"elicitation",
"server",

View File

@@ -26,6 +26,11 @@ Supported transports:
- stdio (`--listen stdio://`, default): newline-delimited JSON (JSONL)
- websocket (`--listen ws://IP:PORT`): one JSON-RPC message per websocket text frame (**experimental / unsupported**)
When running with `--listen ws://IP:PORT`, the same listener also serves basic HTTP health probes:
- `GET /readyz` returns `200 OK` once the listener is accepting new connections.
- `GET /healthz` currently always returns `200 OK`.
Websocket transport is currently experimental and unsupported. Do not rely on it for production workloads.
Tracing/log output:

View File

@@ -4,6 +4,16 @@ use crate::outgoing_message::ConnectionId;
use crate::outgoing_message::OutgoingEnvelope;
use crate::outgoing_message::OutgoingError;
use crate::outgoing_message::OutgoingMessage;
use axum::Router;
use axum::extract::ConnectInfo;
use axum::extract::State;
use axum::extract::ws::Message as WebSocketMessage;
use axum::extract::ws::WebSocket;
use axum::extract::ws::WebSocketUpgrade;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::routing::any;
use axum::routing::get;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::ServerRequest;
@@ -28,12 +38,8 @@ use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
use tokio::io::{self};
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio_tungstenite::accept_async_with_config;
use tokio_tungstenite::tungstenite::Message as WebSocketMessage;
use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use tracing::error;
@@ -55,9 +61,15 @@ fn print_websocket_startup_banner(addr: SocketAddr) {
let title = colorize("codex app-server (WebSockets)", Style::new().bold().cyan());
let listening_label = colorize("listening on:", Style::new().dimmed());
let listen_url = colorize(&format!("ws://{addr}"), Style::new().green());
let ready_label = colorize("readyz:", Style::new().dimmed());
let ready_url = colorize(&format!("http://{addr}/readyz"), Style::new().green());
let health_label = colorize("healthz:", Style::new().dimmed());
let health_url = colorize(&format!("http://{addr}/healthz"), Style::new().green());
let note_label = colorize("note:", Style::new().dimmed());
eprintln!("{title}");
eprintln!(" {listening_label} {listen_url}");
eprintln!(" {ready_label} {ready_url}");
eprintln!(" {health_label} {health_url}");
if addr.ip().is_loopback() {
eprintln!(
" {note_label} binds localhost only (use SSH port-forwarding for remote access)"
@@ -69,6 +81,28 @@ fn print_websocket_startup_banner(addr: SocketAddr) {
}
}
#[derive(Clone)]
struct WebSocketListenerState {
transport_event_tx: mpsc::Sender<TransportEvent>,
connection_counter: Arc<AtomicU64>,
}
async fn health_check_handler() -> StatusCode {
StatusCode::OK
}
async fn websocket_upgrade_handler(
websocket: WebSocketUpgrade,
ConnectInfo(peer_addr): ConnectInfo<SocketAddr>,
State(state): State<WebSocketListenerState>,
) -> impl IntoResponse {
let connection_id = ConnectionId(state.connection_counter.fetch_add(1, Ordering::Relaxed));
info!(%peer_addr, "websocket client connected");
websocket.on_upgrade(move |stream| async move {
run_websocket_connection(connection_id, stream, state.transport_event_tx).await;
})
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum AppServerTransport {
Stdio,
@@ -279,54 +313,34 @@ pub(crate) async fn start_websocket_acceptor(
print_websocket_startup_banner(local_addr);
info!("app-server websocket listening on ws://{local_addr}");
let connection_counter = Arc::new(AtomicU64::new(1));
let router = Router::new()
.route("/readyz", get(health_check_handler))
.route("/healthz", get(health_check_handler))
.fallback(any(websocket_upgrade_handler))
.with_state(WebSocketListenerState {
transport_event_tx,
connection_counter: Arc::new(AtomicU64::new(1)),
});
let server = axum::serve(
listener,
router.into_make_service_with_connect_info::<SocketAddr>(),
)
.with_graceful_shutdown(async move {
shutdown_token.cancelled().await;
});
Ok(tokio::spawn(async move {
loop {
tokio::select! {
_ = shutdown_token.cancelled() => {
info!("websocket acceptor shutting down");
break;
}
accept_result = listener.accept() => {
match accept_result {
Ok((stream, peer_addr)) => {
info!(%peer_addr, "websocket client connected");
let connection_id =
ConnectionId(connection_counter.fetch_add(1, Ordering::Relaxed));
let transport_event_tx_for_connection = transport_event_tx.clone();
tokio::spawn(async move {
run_websocket_connection(
connection_id,
stream,
transport_event_tx_for_connection,
)
.await;
});
}
Err(err) => {
error!("failed to accept websocket connection: {err}");
}
}
}
}
if let Err(err) = server.await {
error!("websocket acceptor failed: {err}");
}
info!("websocket acceptor shutting down");
}))
}
async fn run_websocket_connection(
connection_id: ConnectionId,
stream: TcpStream,
websocket_stream: WebSocket,
transport_event_tx: mpsc::Sender<TransportEvent>,
) {
let websocket_stream =
match accept_async_with_config(stream, Some(WebSocketConfig::default())).await {
Ok(stream) => stream,
Err(err) => {
warn!("failed to complete websocket handshake: {err}");
return;
}
};
let (writer_tx, writer_rx) = mpsc::channel::<OutgoingMessage>(CHANNEL_CAPACITY);
let writer_tx_for_reader = writer_tx.clone();
let disconnect_token = CancellationToken::new();
@@ -377,10 +391,7 @@ async fn run_websocket_connection(
}
async fn run_websocket_outbound_loop(
mut websocket_writer: futures::stream::SplitSink<
tokio_tungstenite::WebSocketStream<TcpStream>,
WebSocketMessage,
>,
mut websocket_writer: futures::stream::SplitSink<WebSocket, WebSocketMessage>,
mut writer_rx: mpsc::Receiver<OutgoingMessage>,
mut writer_control_rx: mpsc::Receiver<WebSocketMessage>,
disconnect_token: CancellationToken,
@@ -414,9 +425,7 @@ async fn run_websocket_outbound_loop(
}
async fn run_websocket_inbound_loop(
mut websocket_reader: futures::stream::SplitStream<
tokio_tungstenite::WebSocketStream<TcpStream>,
>,
mut websocket_reader: futures::stream::SplitStream<WebSocket>,
transport_event_tx: mpsc::Sender<TransportEvent>,
writer_tx_for_reader: mpsc::Sender<OutgoingMessage>,
writer_control_tx: mpsc::Sender<WebSocketMessage>,
@@ -435,7 +444,7 @@ async fn run_websocket_inbound_loop(
&transport_event_tx,
&writer_tx_for_reader,
connection_id,
&text,
text.as_ref(),
)
.await
{
@@ -457,7 +466,6 @@ async fn run_websocket_inbound_loop(
Some(Ok(WebSocketMessage::Binary(_))) => {
warn!("dropping unsupported binary websocket message");
}
Some(Ok(WebSocketMessage::Frame(_))) => {}
Some(Err(err)) => {
warn!("websocket receive error: {err}");
break;

View File

@@ -11,6 +11,7 @@ use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use futures::SinkExt;
use futures::StreamExt;
use reqwest::StatusCode;
use serde_json::json;
use std::net::SocketAddr;
use std::path::Path;
@@ -78,6 +79,34 @@ async fn websocket_transport_routes_per_connection_handshake_and_responses() ->
Ok(())
}
#[tokio::test]
async fn websocket_transport_serves_health_endpoints_on_same_listener() -> Result<()> {
let server = create_mock_responses_server_sequence_unchecked(Vec::new()).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri(), "never")?;
let bind_addr = reserve_local_addr()?;
let mut process = spawn_websocket_server(codex_home.path(), bind_addr).await?;
let client = reqwest::Client::new();
let readyz = http_get(&client, bind_addr, "/readyz").await?;
assert_eq!(readyz.status(), StatusCode::OK);
let healthz = http_get(&client, bind_addr, "/healthz").await?;
assert_eq!(healthz.status(), StatusCode::OK);
let mut ws = connect_websocket(bind_addr).await?;
send_initialize_request(&mut ws, 1, "ws_health_client").await?;
let init = read_response_for_id(&mut ws, 1).await?;
assert_eq!(init.id, RequestId::Integer(1));
process
.kill()
.await
.context("failed to stop websocket app-server process")?;
Ok(())
}
pub(super) async fn spawn_websocket_server(
codex_home: &Path,
bind_addr: SocketAddr,
@@ -132,6 +161,30 @@ pub(super) async fn connect_websocket(bind_addr: SocketAddr) -> Result<WsClient>
}
}
async fn http_get(
client: &reqwest::Client,
bind_addr: SocketAddr,
path: &str,
) -> Result<reqwest::Response> {
let deadline = Instant::now() + Duration::from_secs(10);
loop {
match client
.get(format!("http://{bind_addr}{path}"))
.send()
.await
.with_context(|| format!("failed to GET http://{bind_addr}{path}"))
{
Ok(response) => return Ok(response),
Err(err) => {
if Instant::now() >= deadline {
bail!("failed to GET http://{bind_addr}{path}: {err}");
}
sleep(Duration::from_millis(50)).await;
}
}
}
}
pub(super) async fn send_initialize_request(
stream: &mut WsClient,
id: i64,

View File

@@ -0,0 +1,18 @@
diff --git a/util/process_wrapper/rustc.rs b/util/process_wrapper/rustc.rs
index ddb82c9..69052ce 100644
--- a/util/process_wrapper/rustc.rs
+++ b/util/process_wrapper/rustc.rs
@@ -64,10 +64,12 @@ pub(crate) fn process_json(line: String, error_format: ErrorFormat) -> LineResult {
let parsed: JsonValue = line
.parse()
.map_err(|_| "error parsing rustc output as json".to_owned())?;
Ok(match parsed.try_into() {
+ Ok(RustcMessage::Emit(emit)) if emit == "metadata" => LineOutput::Terminate,
+ Ok(RustcMessage::Emit(_)) => LineOutput::Skip,
Ok(RustcMessage::Message(rendered)) => {
output_based_on_error_format(line, rendered, error_format)
}
- _ => LineOutput::Skip,
+ Err(()) => LineOutput::Skip,
})
}