Add app-server transport layer with websocket support (#10693)

- Adds --listen <URL> to codex app-server with two listen modes:
      - stdio:// (default, existing behavior)
      - ws://IP:PORT (new websocket transport)
  - Refactors message routing to be connection-aware:
- Tracks per-connection session state (initialize/experimental
capability)
      - Routes responses/errors to the originating connection
- Broadcasts server notifications/requests to initialized connections
- Updates initialization semantics to be per connection (not
process-global), and updates app-server docs accordingly.
- Adds websocket accept/read/write handling (JSON-RPC per text frame,
ping/pong handling, connection lifecycle events).

Testing

- Unit tests for transport URL parsing and targeted response/error
routing.
  - New websocket integration test validating:
      - per-connection initialization requirements
      - no cross-connection response leakage
      - same request IDs on different connections route independently.
This commit is contained in:
Max Johnson
2026-02-05 12:56:34 -08:00
committed by GitHub
parent 428a9f6035
commit 8473096efb
13 changed files with 1403 additions and 308 deletions

4
codex-rs/Cargo.lock generated
View File

@@ -1219,6 +1219,7 @@ dependencies = [
"axum",
"base64 0.22.1",
"chrono",
"clap",
"codex-app-server-protocol",
"codex-arg0",
"codex-backend-client",
@@ -1233,8 +1234,10 @@ dependencies = [
"codex-protocol",
"codex-rmcp-client",
"codex-utils-absolute-path",
"codex-utils-cargo-bin",
"codex-utils-json-to-toml",
"core_test_support",
"futures",
"os_info",
"pretty_assertions",
"rmcp",
@@ -1245,6 +1248,7 @@ dependencies = [
"tempfile",
"time",
"tokio",
"tokio-tungstenite",
"toml 0.9.11+spec-1.1.0",
"tracing",
"tracing-subscriber",

View File

@@ -33,6 +33,8 @@ codex-rmcp-client = { workspace = true }
codex-utils-absolute-path = { workspace = true }
codex-utils-json-to-toml = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true, features = ["derive"] }
futures = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
tempfile = { workspace = true }
@@ -45,6 +47,7 @@ tokio = { workspace = true, features = [
"rt-multi-thread",
"signal",
] }
tokio-tungstenite = { workspace = true }
tracing = { workspace = true, features = ["log"] }
tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"] }
uuid = { workspace = true, features = ["serde", "v7"] }
@@ -59,6 +62,7 @@ axum = { workspace = true, default-features = false, features = [
base64 = { workspace = true }
codex-execpolicy = { workspace = true }
core_test_support = { workspace = true }
codex-utils-cargo-bin = { workspace = true }
os_info = { workspace = true }
pretty_assertions = { workspace = true }
rmcp = { workspace = true, default-features = false, features = [
@@ -66,5 +70,6 @@ rmcp = { workspace = true, default-features = false, features = [
"transport-streamable-http-server",
] }
serial_test = { workspace = true }
tokio-tungstenite = { workspace = true }
wiremock = { workspace = true }
shlex = { workspace = true }

View File

@@ -19,7 +19,14 @@
## Protocol
Similar to [MCP](https://modelcontextprotocol.io/), `codex app-server` supports bidirectional communication, streaming JSONL over stdio. The protocol is JSON-RPC 2.0, though the `"jsonrpc":"2.0"` header is omitted.
Similar to [MCP](https://modelcontextprotocol.io/), `codex app-server` supports bidirectional communication using JSON-RPC 2.0 messages (with the `"jsonrpc":"2.0"` header omitted on the wire).
Supported transports:
- stdio (`--listen stdio://`, default): newline-delimited JSON (JSONL)
- websocket (`--listen ws://IP:PORT`): one JSON-RPC message per websocket text frame (**experimental / unsupported**)
Websocket transport is currently experimental and unsupported. Do not rely on it for production workloads.
## Message Schema
@@ -42,7 +49,7 @@ Use the thread APIs to create, list, or archive conversations. Drive a conversat
## Lifecycle Overview
- Initialize once: Immediately after launching the codex app-server process, send an `initialize` request with your client metadata, then emit an `initialized` notification. Any other request before this handshake gets rejected.
- Initialize once per connection: Immediately after opening a transport connection, send an `initialize` request with your client metadata, then emit an `initialized` notification. Any other request on that connection before this handshake gets rejected.
- Start (or resume) a thread: Call `thread/start` to open a fresh conversation. The response returns the thread object and youll also get a `thread/started` notification. If youre continuing an existing conversation, call `thread/resume` with its ID instead. If you want to branch from an existing conversation, call `thread/fork` to create a new thread id with copied history.
- Begin a turn: To send user input, call `turn/start` with the target `threadId` and the user's input. Optional fields let you override model, cwd, sandbox policy, etc. This immediately returns the new turn object and triggers a `turn/started` notification.
- Stream events: After `turn/start`, keep reading JSON-RPC notifications on stdout. Youll see `item/started`, `item/completed`, deltas like `item/agentMessage/delta`, tool progress, etc. These represent streaming model output plus any side effects (commands, tool calls, reasoning notes).
@@ -50,7 +57,7 @@ Use the thread APIs to create, list, or archive conversations. Drive a conversat
## Initialization
Clients must send a single `initialize` request before invoking any other method, then acknowledge with an `initialized` notification. The server returns the user agent string it will present to upstream services; subsequent requests issued before initialization receive a `"Not initialized"` error, and repeated `initialize` calls receive an `"Already initialized"` error.
Clients must send a single `initialize` request per transport connection before invoking any other method on that connection, then acknowledge with an `initialized` notification. The server returns the user agent string it will present to upstream services; subsequent requests issued before initialization receive a `"Not initialized"` error, and repeated `initialize` calls on the same connection receive an `"Already initialized"` error.
Applications building on top of `codex app-server` should identify themselves via the `clientInfo` parameter.

View File

@@ -1093,7 +1093,7 @@ pub(crate) async fn apply_bespoke_event_handling(
),
data: None,
};
outgoing.send_error(request_id, error).await;
outgoing.send_error(request_id.clone(), error).await;
return;
}
}
@@ -1107,7 +1107,7 @@ pub(crate) async fn apply_bespoke_event_handling(
),
data: None,
};
outgoing.send_error(request_id, error).await;
outgoing.send_error(request_id.clone(), error).await;
return;
}
};
@@ -1831,6 +1831,7 @@ async fn construct_mcp_tool_call_end_notification(
mod tests {
use super::*;
use crate::CHANNEL_CAPACITY;
use crate::outgoing_message::OutgoingEnvelope;
use crate::outgoing_message::OutgoingMessage;
use crate::outgoing_message::OutgoingMessageSender;
use anyhow::Result;
@@ -1858,6 +1859,21 @@ mod tests {
Arc::new(Mutex::new(HashMap::new()))
}
async fn recv_broadcast_message(
rx: &mut mpsc::Receiver<OutgoingEnvelope>,
) -> Result<OutgoingMessage> {
let envelope = rx
.recv()
.await
.ok_or_else(|| anyhow!("should send one message"))?;
match envelope {
OutgoingEnvelope::Broadcast { message } => Ok(message),
OutgoingEnvelope::ToConnection { connection_id, .. } => {
bail!("unexpected targeted message for connection {connection_id:?}")
}
}
}
#[test]
fn file_change_accept_for_session_maps_to_approved_for_session() {
let (decision, completion_status) =
@@ -1910,10 +1926,7 @@ mod tests {
)
.await;
let msg = rx
.recv()
.await
.ok_or_else(|| anyhow!("should send one notification"))?;
let msg = recv_broadcast_message(&mut rx).await?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => {
assert_eq!(n.turn.id, event_turn_id);
@@ -1952,10 +1965,7 @@ mod tests {
)
.await;
let msg = rx
.recv()
.await
.ok_or_else(|| anyhow!("should send one notification"))?;
let msg = recv_broadcast_message(&mut rx).await?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => {
assert_eq!(n.turn.id, event_turn_id);
@@ -1994,10 +2004,7 @@ mod tests {
)
.await;
let msg = rx
.recv()
.await
.ok_or_else(|| anyhow!("should send one notification"))?;
let msg = recv_broadcast_message(&mut rx).await?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => {
assert_eq!(n.turn.id, event_turn_id);
@@ -2046,10 +2053,7 @@ mod tests {
)
.await;
let msg = rx
.recv()
.await
.ok_or_else(|| anyhow!("should send one notification"))?;
let msg = recv_broadcast_message(&mut rx).await?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::TurnPlanUpdated(n)) => {
assert_eq!(n.thread_id, conversation_id.to_string());
@@ -2117,10 +2121,7 @@ mod tests {
)
.await;
let first = rx
.recv()
.await
.ok_or_else(|| anyhow!("expected usage notification"))?;
let first = recv_broadcast_message(&mut rx).await?;
match first {
OutgoingMessage::AppServerNotification(
ServerNotification::ThreadTokenUsageUpdated(payload),
@@ -2136,10 +2137,7 @@ mod tests {
other => bail!("unexpected notification: {other:?}"),
}
let second = rx
.recv()
.await
.ok_or_else(|| anyhow!("expected rate limit notification"))?;
let second = recv_broadcast_message(&mut rx).await?;
match second {
OutgoingMessage::AppServerNotification(
ServerNotification::AccountRateLimitsUpdated(payload),
@@ -2276,10 +2274,7 @@ mod tests {
.await;
// Verify: A turn 1
let msg = rx
.recv()
.await
.ok_or_else(|| anyhow!("should send first notification"))?;
let msg = recv_broadcast_message(&mut rx).await?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => {
assert_eq!(n.turn.id, a_turn1);
@@ -2297,10 +2292,7 @@ mod tests {
}
// Verify: B turn 1
let msg = rx
.recv()
.await
.ok_or_else(|| anyhow!("should send second notification"))?;
let msg = recv_broadcast_message(&mut rx).await?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => {
assert_eq!(n.turn.id, b_turn1);
@@ -2318,10 +2310,7 @@ mod tests {
}
// Verify: A turn 2
let msg = rx
.recv()
.await
.ok_or_else(|| anyhow!("should send third notification"))?;
let msg = recv_broadcast_message(&mut rx).await?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => {
assert_eq!(n.turn.id, a_turn2);
@@ -2487,10 +2476,7 @@ mod tests {
)
.await;
let msg = rx
.recv()
.await
.ok_or_else(|| anyhow!("should send one notification"))?;
let msg = recv_broadcast_message(&mut rx).await?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::TurnDiffUpdated(
notification,

File diff suppressed because it is too large Load Diff

View File

@@ -8,14 +8,24 @@ use codex_core::config::ConfigBuilder;
use codex_core::config_loader::CloudRequirementsLoader;
use codex_core::config_loader::ConfigLayerStackOrdering;
use codex_core::config_loader::LoaderOverrides;
use std::collections::HashMap;
use std::io::ErrorKind;
use std::io::Result as IoResult;
use std::path::PathBuf;
use std::sync::Arc;
use crate::message_processor::MessageProcessor;
use crate::message_processor::MessageProcessorArgs;
use crate::outgoing_message::OutgoingMessage;
use crate::outgoing_message::ConnectionId;
use crate::outgoing_message::OutgoingEnvelope;
use crate::outgoing_message::OutgoingMessageSender;
use crate::transport::CHANNEL_CAPACITY;
use crate::transport::ConnectionState;
use crate::transport::TransportEvent;
use crate::transport::has_initialized_connections;
use crate::transport::route_outgoing_envelope;
use crate::transport::start_stdio_connection;
use crate::transport::start_websocket_acceptor;
use codex_app_server_protocol::ConfigLayerSource;
use codex_app_server_protocol::ConfigWarningNotification;
use codex_app_server_protocol::JSONRPCMessage;
@@ -26,13 +36,9 @@ use codex_core::check_execpolicy_for_warnings;
use codex_core::config_loader::ConfigLoadError;
use codex_core::config_loader::TextRange as CoreTextRange;
use codex_feedback::CodexFeedback;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
use tokio::io::{self};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use toml::Value as TomlValue;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::warn;
@@ -51,11 +57,9 @@ mod fuzzy_file_search;
mod message_processor;
mod models;
mod outgoing_message;
mod transport;
/// Size of the bounded channels used to communicate between tasks. The value
/// is a balance between throughput and memory usage 128 messages should be
/// plenty for an interactive CLI.
const CHANNEL_CAPACITY: usize = 128;
pub use crate::transport::AppServerTransport;
fn config_warning_from_error(
summary: impl Into<String>,
@@ -173,32 +177,39 @@ pub async fn run_main(
loader_overrides: LoaderOverrides,
default_analytics_enabled: bool,
) -> IoResult<()> {
// Set up channels.
let (incoming_tx, mut incoming_rx) = mpsc::channel::<JSONRPCMessage>(CHANNEL_CAPACITY);
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingMessage>(CHANNEL_CAPACITY);
run_main_with_transport(
codex_linux_sandbox_exe,
cli_config_overrides,
loader_overrides,
default_analytics_enabled,
AppServerTransport::Stdio,
)
.await
}
// Task: read from stdin, push to `incoming_tx`.
let stdin_reader_handle = tokio::spawn({
async move {
let stdin = io::stdin();
let reader = BufReader::new(stdin);
let mut lines = reader.lines();
pub async fn run_main_with_transport(
codex_linux_sandbox_exe: Option<PathBuf>,
cli_config_overrides: CliConfigOverrides,
loader_overrides: LoaderOverrides,
default_analytics_enabled: bool,
transport: AppServerTransport,
) -> IoResult<()> {
let (transport_event_tx, mut transport_event_rx) =
mpsc::channel::<TransportEvent>(CHANNEL_CAPACITY);
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingEnvelope>(CHANNEL_CAPACITY);
while let Some(line) = lines.next_line().await.unwrap_or_default() {
match serde_json::from_str::<JSONRPCMessage>(&line) {
Ok(msg) => {
if incoming_tx.send(msg).await.is_err() {
// Receiver gone nothing left to do.
break;
}
}
Err(e) => error!("Failed to deserialize JSONRPCMessage: {e}"),
}
}
debug!("stdin reader finished (EOF)");
let mut stdio_handles = Vec::<JoinHandle<()>>::new();
let mut websocket_accept_handle = None;
match transport {
AppServerTransport::Stdio => {
start_stdio_connection(transport_event_tx.clone(), &mut stdio_handles).await?;
}
});
AppServerTransport::WebSocket { bind_address } => {
websocket_accept_handle =
Some(start_websocket_acceptor(bind_address, transport_event_tx.clone()).await?);
}
}
let shutdown_when_no_connections = matches!(transport, AppServerTransport::Stdio);
// Parse CLI overrides once and derive the base Config eagerly so later
// components do not need to work with raw TOML values.
@@ -327,15 +338,14 @@ pub async fn run_main(
}
}
// Task: process incoming messages.
let processor_handle = tokio::spawn({
let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx);
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx));
let cli_overrides: Vec<(String, TomlValue)> = cli_kv_overrides.clone();
let loader_overrides = loader_overrides_for_config_api;
let mut processor = MessageProcessor::new(MessageProcessorArgs {
outgoing: outgoing_message_sender,
codex_linux_sandbox_exe,
config: std::sync::Arc::new(config),
config: Arc::new(config),
cli_overrides,
loader_overrides,
cloud_requirements: cloud_requirements.clone(),
@@ -343,25 +353,65 @@ pub async fn run_main(
config_warnings,
});
let mut thread_created_rx = processor.thread_created_receiver();
let mut connections = HashMap::<ConnectionId, ConnectionState>::new();
async move {
let mut listen_for_threads = true;
loop {
tokio::select! {
msg = incoming_rx.recv() => {
let Some(msg) = msg else {
event = transport_event_rx.recv() => {
let Some(event) = event else {
break;
};
match msg {
JSONRPCMessage::Request(r) => processor.process_request(r).await,
JSONRPCMessage::Response(r) => processor.process_response(r).await,
JSONRPCMessage::Notification(n) => processor.process_notification(n).await,
JSONRPCMessage::Error(e) => processor.process_error(e).await,
match event {
TransportEvent::ConnectionOpened { connection_id, writer } => {
connections.insert(connection_id, ConnectionState::new(writer));
}
TransportEvent::ConnectionClosed { connection_id } => {
connections.remove(&connection_id);
if shutdown_when_no_connections && connections.is_empty() {
break;
}
}
TransportEvent::IncomingMessage { connection_id, message } => {
match message {
JSONRPCMessage::Request(request) => {
let Some(connection_state) = connections.get_mut(&connection_id) else {
warn!("dropping request from unknown connection: {:?}", connection_id);
continue;
};
processor
.process_request(
connection_id,
request,
&mut connection_state.session,
)
.await;
}
JSONRPCMessage::Response(response) => {
processor.process_response(response).await;
}
JSONRPCMessage::Notification(notification) => {
processor.process_notification(notification).await;
}
JSONRPCMessage::Error(err) => {
processor.process_error(err).await;
}
}
}
}
}
envelope = outgoing_rx.recv() => {
let Some(envelope) = envelope else {
break;
};
route_outgoing_envelope(&mut connections, envelope).await;
}
created = thread_created_rx.recv(), if listen_for_threads => {
match created {
Ok(thread_id) => {
processor.try_attach_thread_listener(thread_id).await;
if has_initialized_connections(&connections) {
processor.try_attach_thread_listener(thread_id).await;
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
// TODO(jif) handle lag.
@@ -382,33 +432,17 @@ pub async fn run_main(
}
});
// Task: write outgoing messages to stdout.
let stdout_writer_handle = tokio::spawn(async move {
let mut stdout = io::stdout();
while let Some(outgoing_message) = outgoing_rx.recv().await {
let Ok(value) = serde_json::to_value(outgoing_message) else {
error!("Failed to convert OutgoingMessage to JSON value");
continue;
};
match serde_json::to_string(&value) {
Ok(mut json) => {
json.push('\n');
if let Err(e) = stdout.write_all(json.as_bytes()).await {
error!("Failed to write to stdout: {e}");
break;
}
}
Err(e) => error!("Failed to serialize JSONRPCMessage: {e}"),
}
}
drop(transport_event_tx);
info!("stdout writer exited (channel closed)");
});
let _ = processor_handle.await;
// Wait for all tasks to finish. The typical exit path is the stdin reader
// hitting EOF which, once it drops `incoming_tx`, propagates shutdown to
// the processor and then to the stdout task.
let _ = tokio::join!(stdin_reader_handle, processor_handle, stdout_writer_handle);
if let Some(handle) = websocket_accept_handle {
handle.abort();
}
for handle in stdio_handles {
let _ = handle.await;
}
Ok(())
}

View File

@@ -1,4 +1,6 @@
use codex_app_server::run_main;
use clap::Parser;
use codex_app_server::AppServerTransport;
use codex_app_server::run_main_with_transport;
use codex_arg0::arg0_dispatch_or_else;
use codex_common::CliConfigOverrides;
use codex_core::config_loader::LoaderOverrides;
@@ -8,19 +10,34 @@ use std::path::PathBuf;
// managed config file without writing to /etc.
const MANAGED_CONFIG_PATH_ENV_VAR: &str = "CODEX_APP_SERVER_MANAGED_CONFIG_PATH";
#[derive(Debug, Parser)]
struct AppServerArgs {
/// Transport endpoint URL. Supported values: `stdio://` (default),
/// `ws://IP:PORT`.
#[arg(
long = "listen",
value_name = "URL",
default_value = AppServerTransport::DEFAULT_LISTEN_URL
)]
listen: AppServerTransport,
}
fn main() -> anyhow::Result<()> {
arg0_dispatch_or_else(|codex_linux_sandbox_exe| async move {
let args = AppServerArgs::parse();
let managed_config_path = managed_config_path_from_debug_env();
let loader_overrides = LoaderOverrides {
managed_config_path,
..Default::default()
};
let transport = args.listen;
run_main(
run_main_with_transport(
codex_linux_sandbox_exe,
CliConfigOverrides::default(),
loader_overrides,
false,
transport,
)
.await?;
Ok(())

View File

@@ -1,12 +1,12 @@
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use crate::codex_message_processor::CodexMessageProcessor;
use crate::codex_message_processor::CodexMessageProcessorArgs;
use crate::config_api::ConfigApi;
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
use crate::outgoing_message::ConnectionId;
use crate::outgoing_message::ConnectionRequestId;
use crate::outgoing_message::OutgoingMessageSender;
use async_trait::async_trait;
use codex_app_server_protocol::ChatgptAuthTokensRefreshParams;
@@ -25,7 +25,6 @@ use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequestPayload;
use codex_app_server_protocol::experimental_required_message;
@@ -110,13 +109,17 @@ pub(crate) struct MessageProcessor {
codex_message_processor: CodexMessageProcessor,
config_api: ConfigApi,
config: Arc<Config>,
initialized: bool,
experimental_api_enabled: Arc<AtomicBool>,
config_warnings: Vec<ConfigWarningNotification>,
config_warnings: Arc<Vec<ConfigWarningNotification>>,
}
#[derive(Debug, Default)]
pub(crate) struct ConnectionSessionState {
pub(crate) initialized: bool,
experimental_api_enabled: bool,
}
pub(crate) struct MessageProcessorArgs {
pub(crate) outgoing: OutgoingMessageSender,
pub(crate) outgoing: Arc<OutgoingMessageSender>,
pub(crate) codex_linux_sandbox_exe: Option<PathBuf>,
pub(crate) config: Arc<Config>,
pub(crate) cli_overrides: Vec<(String, TomlValue)>,
@@ -140,8 +143,6 @@ impl MessageProcessor {
feedback,
config_warnings,
} = args;
let outgoing = Arc::new(outgoing);
let experimental_api_enabled = Arc::new(AtomicBool::new(false));
let auth_manager = AuthManager::shared(
config.codex_home.clone(),
false,
@@ -178,14 +179,20 @@ impl MessageProcessor {
codex_message_processor,
config_api,
config,
initialized: false,
experimental_api_enabled,
config_warnings,
config_warnings: Arc::new(config_warnings),
}
}
pub(crate) async fn process_request(&mut self, request: JSONRPCRequest) {
let request_id = request.id.clone();
pub(crate) async fn process_request(
&mut self,
connection_id: ConnectionId,
request: JSONRPCRequest,
session: &mut ConnectionSessionState,
) {
let request_id = ConnectionRequestId {
connection_id,
request_id: request.id.clone(),
};
let request_json = match serde_json::to_value(&request) {
Ok(request_json) => request_json,
Err(err) => {
@@ -216,7 +223,11 @@ impl MessageProcessor {
// Handle Initialize internally so CodexMessageProcessor does not have to concern
// itself with the `initialized` bool.
ClientRequest::Initialize { request_id, params } => {
if self.initialized {
let request_id = ConnectionRequestId {
connection_id,
request_id,
};
if session.initialized {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "Already initialized".to_string(),
@@ -225,12 +236,16 @@ impl MessageProcessor {
self.outgoing.send_error(request_id, error).await;
return;
} else {
let experimental_api_enabled = params
// TODO(maxj): Revisit capability scoping for `experimental_api_enabled`.
// Current behavior is per-connection. Reviewer feedback notes this can
// create odd cross-client behavior (for example dynamic tool calls on a
// shared thread when another connected client did not opt into
// experimental API). Proposed direction is instance-global first-write-wins
// with initialize-time mismatch rejection.
session.experimental_api_enabled = params
.capabilities
.as_ref()
.is_some_and(|cap| cap.experimental_api);
self.experimental_api_enabled
.store(experimental_api_enabled, Ordering::Relaxed);
let ClientInfo {
name,
title: _title,
@@ -246,7 +261,7 @@ impl MessageProcessor {
),
data: None,
};
self.outgoing.send_error(request_id, error).await;
self.outgoing.send_error(request_id.clone(), error).await;
return;
}
SetOriginatorError::AlreadyInitialized => {
@@ -267,22 +282,20 @@ impl MessageProcessor {
let response = InitializeResponse { user_agent };
self.outgoing.send_response(request_id, response).await;
self.initialized = true;
if !self.config_warnings.is_empty() {
for notification in self.config_warnings.drain(..) {
self.outgoing
.send_server_notification(ServerNotification::ConfigWarning(
notification,
))
.await;
}
session.initialized = true;
for notification in self.config_warnings.iter().cloned() {
self.outgoing
.send_server_notification(ServerNotification::ConfigWarning(
notification,
))
.await;
}
return;
}
}
_ => {
if !self.initialized {
if !session.initialized {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "Not initialized".to_string(),
@@ -295,7 +308,7 @@ impl MessageProcessor {
}
if let Some(reason) = codex_request.experimental_reason()
&& !self.experimental_api_enabled.load(Ordering::Relaxed)
&& !session.experimental_api_enabled
{
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
@@ -308,22 +321,49 @@ impl MessageProcessor {
match codex_request {
ClientRequest::ConfigRead { request_id, params } => {
self.handle_config_read(request_id, params).await;
self.handle_config_read(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
}
ClientRequest::ConfigValueWrite { request_id, params } => {
self.handle_config_value_write(request_id, params).await;
self.handle_config_value_write(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
}
ClientRequest::ConfigBatchWrite { request_id, params } => {
self.handle_config_batch_write(request_id, params).await;
self.handle_config_batch_write(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
}
ClientRequest::ConfigRequirementsRead {
request_id,
params: _,
} => {
self.handle_config_requirements_read(request_id).await;
self.handle_config_requirements_read(ConnectionRequestId {
connection_id,
request_id,
})
.await;
}
other => {
self.codex_message_processor.process_request(other).await;
self.codex_message_processor
.process_request(connection_id, other)
.await;
}
}
}
@@ -339,9 +379,6 @@ impl MessageProcessor {
}
pub(crate) async fn try_attach_thread_listener(&mut self, thread_id: ThreadId) {
if !self.initialized {
return;
}
self.codex_message_processor
.try_attach_thread_listener(thread_id)
.await;
@@ -360,7 +397,7 @@ impl MessageProcessor {
self.outgoing.notify_client_error(err.id, err.error).await;
}
async fn handle_config_read(&self, request_id: RequestId, params: ConfigReadParams) {
async fn handle_config_read(&self, request_id: ConnectionRequestId, params: ConfigReadParams) {
match self.config_api.read(params).await {
Ok(response) => self.outgoing.send_response(request_id, response).await,
Err(error) => self.outgoing.send_error(request_id, error).await,
@@ -369,7 +406,7 @@ impl MessageProcessor {
async fn handle_config_value_write(
&self,
request_id: RequestId,
request_id: ConnectionRequestId,
params: ConfigValueWriteParams,
) {
match self.config_api.write_value(params).await {
@@ -380,7 +417,7 @@ impl MessageProcessor {
async fn handle_config_batch_write(
&self,
request_id: RequestId,
request_id: ConnectionRequestId,
params: ConfigBatchWriteParams,
) {
match self.config_api.batch_write(params).await {
@@ -389,7 +426,7 @@ impl MessageProcessor {
}
}
async fn handle_config_requirements_read(&self, request_id: RequestId) {
async fn handle_config_requirements_read(&self, request_id: ConnectionRequestId) {
match self.config_api.config_requirements_read().await {
Ok(response) => self.outgoing.send_response(request_id, response).await,
Err(error) => self.outgoing.send_error(request_id, error).await,

View File

@@ -19,17 +19,39 @@ use crate::error_code::INTERNAL_ERROR_CODE;
#[cfg(test)]
use codex_protocol::account::PlanType;
/// Stable identifier for a transport connection.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub(crate) struct ConnectionId(pub(crate) u64);
/// Stable identifier for a client request scoped to a transport connection.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub(crate) struct ConnectionRequestId {
pub(crate) connection_id: ConnectionId,
pub(crate) request_id: RequestId,
}
#[derive(Debug, Clone)]
pub(crate) enum OutgoingEnvelope {
ToConnection {
connection_id: ConnectionId,
message: OutgoingMessage,
},
Broadcast {
message: OutgoingMessage,
},
}
/// Sends messages to the client and manages request callbacks.
pub(crate) struct OutgoingMessageSender {
next_request_id: AtomicI64,
sender: mpsc::Sender<OutgoingMessage>,
next_server_request_id: AtomicI64,
sender: mpsc::Sender<OutgoingEnvelope>,
request_id_to_callback: Mutex<HashMap<RequestId, oneshot::Sender<Result>>>,
}
impl OutgoingMessageSender {
pub(crate) fn new(sender: mpsc::Sender<OutgoingMessage>) -> Self {
pub(crate) fn new(sender: mpsc::Sender<OutgoingEnvelope>) -> Self {
Self {
next_request_id: AtomicI64::new(0),
next_server_request_id: AtomicI64::new(0),
sender,
request_id_to_callback: Mutex::new(HashMap::new()),
}
@@ -47,7 +69,7 @@ impl OutgoingMessageSender {
&self,
request: ServerRequestPayload,
) -> (RequestId, oneshot::Receiver<Result>) {
let id = RequestId::Integer(self.next_request_id.fetch_add(1, Ordering::Relaxed));
let id = RequestId::Integer(self.next_server_request_id.fetch_add(1, Ordering::Relaxed));
let outgoing_message_id = id.clone();
let (tx_approve, rx_approve) = oneshot::channel();
{
@@ -57,7 +79,13 @@ impl OutgoingMessageSender {
let outgoing_message =
OutgoingMessage::Request(request.request_with_id(outgoing_message_id.clone()));
if let Err(err) = self.sender.send(outgoing_message).await {
if let Err(err) = self
.sender
.send(OutgoingEnvelope::Broadcast {
message: outgoing_message,
})
.await
{
warn!("failed to send request {outgoing_message_id:?} to client: {err:?}");
let mut request_id_to_callback = self.request_id_to_callback.lock().await;
request_id_to_callback.remove(&outgoing_message_id);
@@ -107,17 +135,31 @@ impl OutgoingMessageSender {
entry.is_some()
}
pub(crate) async fn send_response<T: Serialize>(&self, id: RequestId, response: T) {
pub(crate) async fn send_response<T: Serialize>(
&self,
request_id: ConnectionRequestId,
response: T,
) {
match serde_json::to_value(response) {
Ok(result) => {
let outgoing_message = OutgoingMessage::Response(OutgoingResponse { id, result });
if let Err(err) = self.sender.send(outgoing_message).await {
let outgoing_message = OutgoingMessage::Response(OutgoingResponse {
id: request_id.request_id,
result,
});
if let Err(err) = self
.sender
.send(OutgoingEnvelope::ToConnection {
connection_id: request_id.connection_id,
message: outgoing_message,
})
.await
{
warn!("failed to send response to client: {err:?}");
}
}
Err(err) => {
self.send_error(
id,
request_id,
JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to serialize response: {err}"),
@@ -132,7 +174,9 @@ impl OutgoingMessageSender {
pub(crate) async fn send_server_notification(&self, notification: ServerNotification) {
if let Err(err) = self
.sender
.send(OutgoingMessage::AppServerNotification(notification))
.send(OutgoingEnvelope::Broadcast {
message: OutgoingMessage::AppServerNotification(notification),
})
.await
{
warn!("failed to send server notification to client: {err:?}");
@@ -143,14 +187,34 @@ impl OutgoingMessageSender {
/// [`OutgoingMessage::Notification`] should be removed.
pub(crate) async fn send_notification(&self, notification: OutgoingNotification) {
let outgoing_message = OutgoingMessage::Notification(notification);
if let Err(err) = self.sender.send(outgoing_message).await {
if let Err(err) = self
.sender
.send(OutgoingEnvelope::Broadcast {
message: outgoing_message,
})
.await
{
warn!("failed to send notification to client: {err:?}");
}
}
pub(crate) async fn send_error(&self, id: RequestId, error: JSONRPCErrorError) {
let outgoing_message = OutgoingMessage::Error(OutgoingError { id, error });
if let Err(err) = self.sender.send(outgoing_message).await {
pub(crate) async fn send_error(
&self,
request_id: ConnectionRequestId,
error: JSONRPCErrorError,
) {
let outgoing_message = OutgoingMessage::Error(OutgoingError {
id: request_id.request_id,
error,
});
if let Err(err) = self
.sender
.send(OutgoingEnvelope::ToConnection {
connection_id: request_id.connection_id,
message: outgoing_message,
})
.await
{
warn!("failed to send error to client: {err:?}");
}
}
@@ -190,6 +254,8 @@ pub(crate) struct OutgoingError {
#[cfg(test)]
mod tests {
use std::time::Duration;
use codex_app_server_protocol::AccountLoginCompletedNotification;
use codex_app_server_protocol::AccountRateLimitsUpdatedNotification;
use codex_app_server_protocol::AccountUpdatedNotification;
@@ -200,6 +266,7 @@ mod tests {
use codex_app_server_protocol::RateLimitWindow;
use pretty_assertions::assert_eq;
use serde_json::json;
use tokio::time::timeout;
use uuid::Uuid;
use super::*;
@@ -336,4 +403,75 @@ mod tests {
"ensure the notification serializes correctly"
);
}
#[tokio::test]
async fn send_response_routes_to_target_connection() {
let (tx, mut rx) = mpsc::channel::<OutgoingEnvelope>(4);
let outgoing = OutgoingMessageSender::new(tx);
let request_id = ConnectionRequestId {
connection_id: ConnectionId(42),
request_id: RequestId::Integer(7),
};
outgoing
.send_response(request_id.clone(), json!({ "ok": true }))
.await;
let envelope = timeout(Duration::from_secs(1), rx.recv())
.await
.expect("should receive envelope before timeout")
.expect("channel should contain one message");
match envelope {
OutgoingEnvelope::ToConnection {
connection_id,
message,
} => {
assert_eq!(connection_id, ConnectionId(42));
let OutgoingMessage::Response(response) = message else {
panic!("expected response message");
};
assert_eq!(response.id, request_id.request_id);
assert_eq!(response.result, json!({ "ok": true }));
}
other => panic!("expected targeted response envelope, got: {other:?}"),
}
}
#[tokio::test]
async fn send_error_routes_to_target_connection() {
let (tx, mut rx) = mpsc::channel::<OutgoingEnvelope>(4);
let outgoing = OutgoingMessageSender::new(tx);
let request_id = ConnectionRequestId {
connection_id: ConnectionId(9),
request_id: RequestId::Integer(3),
};
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: "boom".to_string(),
data: None,
};
outgoing.send_error(request_id.clone(), error.clone()).await;
let envelope = timeout(Duration::from_secs(1), rx.recv())
.await
.expect("should receive envelope before timeout")
.expect("channel should contain one message");
match envelope {
OutgoingEnvelope::ToConnection {
connection_id,
message,
} => {
assert_eq!(connection_id, ConnectionId(9));
let OutgoingMessage::Error(outgoing_error) = message else {
panic!("expected error message");
};
assert_eq!(outgoing_error.id, RequestId::Integer(3));
assert_eq!(outgoing_error.error, error);
}
other => panic!("expected targeted error envelope, got: {other:?}"),
}
}
}

View File

@@ -0,0 +1,424 @@
use crate::message_processor::ConnectionSessionState;
use crate::outgoing_message::ConnectionId;
use crate::outgoing_message::OutgoingEnvelope;
use crate::outgoing_message::OutgoingMessage;
use codex_app_server_protocol::JSONRPCMessage;
use futures::SinkExt;
use futures::StreamExt;
use std::collections::HashMap;
use std::io::ErrorKind;
use std::io::Result as IoResult;
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
use tokio::io::{self};
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio_tungstenite::accept_async;
use tokio_tungstenite::tungstenite::Message as WebSocketMessage;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::warn;
/// Size of the bounded channels used to communicate between tasks. The value
/// is a balance between throughput and memory usage - 128 messages should be
/// plenty for an interactive CLI.
pub(crate) const CHANNEL_CAPACITY: usize = 128;
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum AppServerTransport {
Stdio,
WebSocket { bind_address: SocketAddr },
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum AppServerTransportParseError {
UnsupportedListenUrl(String),
InvalidWebSocketListenUrl(String),
}
impl std::fmt::Display for AppServerTransportParseError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
AppServerTransportParseError::UnsupportedListenUrl(listen_url) => write!(
f,
"unsupported --listen URL `{listen_url}`; expected `stdio://` or `ws://IP:PORT`"
),
AppServerTransportParseError::InvalidWebSocketListenUrl(listen_url) => write!(
f,
"invalid websocket --listen URL `{listen_url}`; expected `ws://IP:PORT`"
),
}
}
}
impl std::error::Error for AppServerTransportParseError {}
impl AppServerTransport {
pub const DEFAULT_LISTEN_URL: &'static str = "stdio://";
pub fn from_listen_url(listen_url: &str) -> Result<Self, AppServerTransportParseError> {
if listen_url == Self::DEFAULT_LISTEN_URL {
return Ok(Self::Stdio);
}
if let Some(socket_addr) = listen_url.strip_prefix("ws://") {
let bind_address = socket_addr.parse::<SocketAddr>().map_err(|_| {
AppServerTransportParseError::InvalidWebSocketListenUrl(listen_url.to_string())
})?;
return Ok(Self::WebSocket { bind_address });
}
Err(AppServerTransportParseError::UnsupportedListenUrl(
listen_url.to_string(),
))
}
}
impl FromStr for AppServerTransport {
type Err = AppServerTransportParseError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Self::from_listen_url(s)
}
}
#[derive(Debug)]
pub(crate) enum TransportEvent {
ConnectionOpened {
connection_id: ConnectionId,
writer: mpsc::Sender<OutgoingMessage>,
},
ConnectionClosed {
connection_id: ConnectionId,
},
IncomingMessage {
connection_id: ConnectionId,
message: JSONRPCMessage,
},
}
pub(crate) struct ConnectionState {
pub(crate) writer: mpsc::Sender<OutgoingMessage>,
pub(crate) session: ConnectionSessionState,
}
impl ConnectionState {
pub(crate) fn new(writer: mpsc::Sender<OutgoingMessage>) -> Self {
Self {
writer,
session: ConnectionSessionState::default(),
}
}
}
pub(crate) async fn start_stdio_connection(
transport_event_tx: mpsc::Sender<TransportEvent>,
stdio_handles: &mut Vec<JoinHandle<()>>,
) -> IoResult<()> {
let connection_id = ConnectionId(0);
let (writer_tx, mut writer_rx) = mpsc::channel::<OutgoingMessage>(CHANNEL_CAPACITY);
transport_event_tx
.send(TransportEvent::ConnectionOpened {
connection_id,
writer: writer_tx,
})
.await
.map_err(|_| std::io::Error::new(ErrorKind::BrokenPipe, "processor unavailable"))?;
let transport_event_tx_for_reader = transport_event_tx.clone();
stdio_handles.push(tokio::spawn(async move {
let stdin = io::stdin();
let reader = BufReader::new(stdin);
let mut lines = reader.lines();
loop {
match lines.next_line().await {
Ok(Some(line)) => {
if !forward_incoming_message(
&transport_event_tx_for_reader,
connection_id,
&line,
)
.await
{
break;
}
}
Ok(None) => break,
Err(err) => {
error!("Failed reading stdin: {err}");
break;
}
}
}
let _ = transport_event_tx_for_reader
.send(TransportEvent::ConnectionClosed { connection_id })
.await;
debug!("stdin reader finished (EOF)");
}));
stdio_handles.push(tokio::spawn(async move {
let mut stdout = io::stdout();
while let Some(outgoing_message) = writer_rx.recv().await {
let Some(mut json) = serialize_outgoing_message(outgoing_message) else {
continue;
};
json.push('\n');
if let Err(err) = stdout.write_all(json.as_bytes()).await {
error!("Failed to write to stdout: {err}");
break;
}
}
info!("stdout writer exited (channel closed)");
}));
Ok(())
}
pub(crate) async fn start_websocket_acceptor(
bind_address: SocketAddr,
transport_event_tx: mpsc::Sender<TransportEvent>,
) -> IoResult<JoinHandle<()>> {
let listener = TcpListener::bind(bind_address).await?;
let local_addr = listener.local_addr()?;
info!("app-server websocket listening on ws://{local_addr}");
let connection_counter = Arc::new(AtomicU64::new(1));
Ok(tokio::spawn(async move {
loop {
match listener.accept().await {
Ok((stream, _peer_addr)) => {
let connection_id =
ConnectionId(connection_counter.fetch_add(1, Ordering::Relaxed));
let transport_event_tx_for_connection = transport_event_tx.clone();
tokio::spawn(async move {
run_websocket_connection(
connection_id,
stream,
transport_event_tx_for_connection,
)
.await;
});
}
Err(err) => {
error!("failed to accept websocket connection: {err}");
}
}
}
}))
}
async fn run_websocket_connection(
connection_id: ConnectionId,
stream: TcpStream,
transport_event_tx: mpsc::Sender<TransportEvent>,
) {
let websocket_stream = match accept_async(stream).await {
Ok(stream) => stream,
Err(err) => {
warn!("failed to complete websocket handshake: {err}");
return;
}
};
let (writer_tx, mut writer_rx) = mpsc::channel::<OutgoingMessage>(CHANNEL_CAPACITY);
if transport_event_tx
.send(TransportEvent::ConnectionOpened {
connection_id,
writer: writer_tx,
})
.await
.is_err()
{
return;
}
let (mut websocket_writer, mut websocket_reader) = websocket_stream.split();
loop {
tokio::select! {
outgoing_message = writer_rx.recv() => {
let Some(outgoing_message) = outgoing_message else {
break;
};
let Some(json) = serialize_outgoing_message(outgoing_message) else {
continue;
};
if websocket_writer.send(WebSocketMessage::Text(json.into())).await.is_err() {
break;
}
}
incoming_message = websocket_reader.next() => {
match incoming_message {
Some(Ok(WebSocketMessage::Text(text))) => {
if !forward_incoming_message(&transport_event_tx, connection_id, &text).await {
break;
}
}
Some(Ok(WebSocketMessage::Ping(payload))) => {
if websocket_writer.send(WebSocketMessage::Pong(payload)).await.is_err() {
break;
}
}
Some(Ok(WebSocketMessage::Pong(_))) => {}
Some(Ok(WebSocketMessage::Close(_))) | None => break,
Some(Ok(WebSocketMessage::Binary(_))) => {
warn!("dropping unsupported binary websocket message");
}
Some(Ok(WebSocketMessage::Frame(_))) => {}
Some(Err(err)) => {
warn!("websocket receive error: {err}");
break;
}
}
}
}
}
let _ = transport_event_tx
.send(TransportEvent::ConnectionClosed { connection_id })
.await;
}
async fn forward_incoming_message(
transport_event_tx: &mpsc::Sender<TransportEvent>,
connection_id: ConnectionId,
payload: &str,
) -> bool {
match serde_json::from_str::<JSONRPCMessage>(payload) {
Ok(message) => transport_event_tx
.send(TransportEvent::IncomingMessage {
connection_id,
message,
})
.await
.is_ok(),
Err(err) => {
error!("Failed to deserialize JSONRPCMessage: {err}");
true
}
}
}
fn serialize_outgoing_message(outgoing_message: OutgoingMessage) -> Option<String> {
let value = match serde_json::to_value(outgoing_message) {
Ok(value) => value,
Err(err) => {
error!("Failed to convert OutgoingMessage to JSON value: {err}");
return None;
}
};
match serde_json::to_string(&value) {
Ok(json) => Some(json),
Err(err) => {
error!("Failed to serialize JSONRPCMessage: {err}");
None
}
}
}
pub(crate) async fn route_outgoing_envelope(
connections: &mut HashMap<ConnectionId, ConnectionState>,
envelope: OutgoingEnvelope,
) {
match envelope {
OutgoingEnvelope::ToConnection {
connection_id,
message,
} => {
let Some(connection_state) = connections.get(&connection_id) else {
warn!(
"dropping message for disconnected connection: {:?}",
connection_id
);
return;
};
if connection_state.writer.send(message).await.is_err() {
connections.remove(&connection_id);
}
}
OutgoingEnvelope::Broadcast { message } => {
let target_connections: Vec<ConnectionId> = connections
.iter()
.filter_map(|(connection_id, connection_state)| {
if connection_state.session.initialized {
Some(*connection_id)
} else {
None
}
})
.collect();
for connection_id in target_connections {
let Some(connection_state) = connections.get(&connection_id) else {
continue;
};
if connection_state.writer.send(message.clone()).await.is_err() {
connections.remove(&connection_id);
}
}
}
}
}
pub(crate) fn has_initialized_connections(
connections: &HashMap<ConnectionId, ConnectionState>,
) -> bool {
connections
.values()
.any(|connection| connection.session.initialized)
}
#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
#[test]
fn app_server_transport_parses_stdio_listen_url() {
let transport = AppServerTransport::from_listen_url(AppServerTransport::DEFAULT_LISTEN_URL)
.expect("stdio listen URL should parse");
assert_eq!(transport, AppServerTransport::Stdio);
}
#[test]
fn app_server_transport_parses_websocket_listen_url() {
let transport = AppServerTransport::from_listen_url("ws://127.0.0.1:1234")
.expect("websocket listen URL should parse");
assert_eq!(
transport,
AppServerTransport::WebSocket {
bind_address: "127.0.0.1:1234".parse().expect("valid socket address"),
}
);
}
#[test]
fn app_server_transport_rejects_invalid_websocket_listen_url() {
let err = AppServerTransport::from_listen_url("ws://localhost:1234")
.expect_err("hostname bind address should be rejected");
assert_eq!(
err.to_string(),
"invalid websocket --listen URL `ws://localhost:1234`; expected `ws://IP:PORT`"
);
}
#[test]
fn app_server_transport_rejects_unsupported_listen_url() {
let err = AppServerTransport::from_listen_url("http://127.0.0.1:1234")
.expect_err("unsupported scheme should fail");
assert_eq!(
err.to_string(),
"unsupported --listen URL `http://127.0.0.1:1234`; expected `stdio://` or `ws://IP:PORT`"
);
}
}

View File

@@ -0,0 +1,263 @@
use anyhow::Context;
use anyhow::Result;
use anyhow::bail;
use app_test_support::create_mock_responses_server_sequence_unchecked;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use futures::SinkExt;
use futures::StreamExt;
use serde_json::json;
use std::net::SocketAddr;
use std::path::Path;
use std::process::Stdio;
use tempfile::TempDir;
use tokio::io::AsyncBufReadExt;
use tokio::process::Child;
use tokio::process::Command;
use tokio::time::Duration;
use tokio::time::Instant;
use tokio::time::sleep;
use tokio::time::timeout;
use tokio_tungstenite::MaybeTlsStream;
use tokio_tungstenite::WebSocketStream;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message as WebSocketMessage;
const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(5);
type WsClient = WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>;
#[tokio::test]
async fn websocket_transport_routes_per_connection_handshake_and_responses() -> Result<()> {
let server = create_mock_responses_server_sequence_unchecked(Vec::new()).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri(), "never")?;
let bind_addr = reserve_local_addr()?;
let mut process = spawn_websocket_server(codex_home.path(), bind_addr).await?;
let mut ws1 = connect_websocket(bind_addr).await?;
let mut ws2 = connect_websocket(bind_addr).await?;
send_initialize_request(&mut ws1, 1, "ws_client_one").await?;
let first_init = read_response_for_id(&mut ws1, 1).await?;
assert_eq!(first_init.id, RequestId::Integer(1));
// Initialize responses are request-scoped and must not leak to other
// connections.
assert_no_message(&mut ws2, Duration::from_millis(250)).await?;
send_config_read_request(&mut ws2, 2).await?;
let not_initialized = read_error_for_id(&mut ws2, 2).await?;
assert_eq!(not_initialized.error.message, "Not initialized");
send_initialize_request(&mut ws2, 3, "ws_client_two").await?;
let second_init = read_response_for_id(&mut ws2, 3).await?;
assert_eq!(second_init.id, RequestId::Integer(3));
// Same request-id on different connections must route independently.
send_config_read_request(&mut ws1, 77).await?;
send_config_read_request(&mut ws2, 77).await?;
let ws1_config = read_response_for_id(&mut ws1, 77).await?;
let ws2_config = read_response_for_id(&mut ws2, 77).await?;
assert_eq!(ws1_config.id, RequestId::Integer(77));
assert_eq!(ws2_config.id, RequestId::Integer(77));
assert!(ws1_config.result.get("config").is_some());
assert!(ws2_config.result.get("config").is_some());
process
.kill()
.await
.context("failed to stop websocket app-server process")?;
Ok(())
}
async fn spawn_websocket_server(codex_home: &Path, bind_addr: SocketAddr) -> Result<Child> {
let program = codex_utils_cargo_bin::cargo_bin("codex-app-server")
.context("should find app-server binary")?;
let mut cmd = Command::new(program);
cmd.arg("--listen")
.arg(format!("ws://{bind_addr}"))
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::piped())
.env("CODEX_HOME", codex_home)
.env("RUST_LOG", "debug");
let mut process = cmd
.kill_on_drop(true)
.spawn()
.context("failed to spawn websocket app-server process")?;
if let Some(stderr) = process.stderr.take() {
let mut stderr_reader = tokio::io::BufReader::new(stderr).lines();
tokio::spawn(async move {
while let Ok(Some(line)) = stderr_reader.next_line().await {
eprintln!("[websocket app-server stderr] {line}");
}
});
}
Ok(process)
}
fn reserve_local_addr() -> Result<SocketAddr> {
let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
let addr = listener.local_addr()?;
drop(listener);
Ok(addr)
}
async fn connect_websocket(bind_addr: SocketAddr) -> Result<WsClient> {
let url = format!("ws://{bind_addr}");
let deadline = Instant::now() + Duration::from_secs(10);
loop {
match connect_async(&url).await {
Ok((stream, _response)) => return Ok(stream),
Err(err) => {
if Instant::now() >= deadline {
bail!("failed to connect websocket to {url}: {err}");
}
sleep(Duration::from_millis(50)).await;
}
}
}
}
async fn send_initialize_request(stream: &mut WsClient, id: i64, client_name: &str) -> Result<()> {
let params = InitializeParams {
client_info: ClientInfo {
name: client_name.to_string(),
title: Some("WebSocket Test Client".to_string()),
version: "0.1.0".to_string(),
},
capabilities: None,
};
send_request(
stream,
"initialize",
id,
Some(serde_json::to_value(params)?),
)
.await
}
async fn send_config_read_request(stream: &mut WsClient, id: i64) -> Result<()> {
send_request(
stream,
"config/read",
id,
Some(json!({ "includeLayers": false })),
)
.await
}
async fn send_request(
stream: &mut WsClient,
method: &str,
id: i64,
params: Option<serde_json::Value>,
) -> Result<()> {
let message = JSONRPCMessage::Request(JSONRPCRequest {
id: RequestId::Integer(id),
method: method.to_string(),
params,
});
send_jsonrpc(stream, message).await
}
async fn send_jsonrpc(stream: &mut WsClient, message: JSONRPCMessage) -> Result<()> {
let payload = serde_json::to_string(&message)?;
stream
.send(WebSocketMessage::Text(payload.into()))
.await
.context("failed to send websocket frame")
}
async fn read_response_for_id(stream: &mut WsClient, id: i64) -> Result<JSONRPCResponse> {
let target_id = RequestId::Integer(id);
loop {
let message = read_jsonrpc_message(stream).await?;
if let JSONRPCMessage::Response(response) = message
&& response.id == target_id
{
return Ok(response);
}
}
}
async fn read_error_for_id(stream: &mut WsClient, id: i64) -> Result<JSONRPCError> {
let target_id = RequestId::Integer(id);
loop {
let message = read_jsonrpc_message(stream).await?;
if let JSONRPCMessage::Error(err) = message
&& err.id == target_id
{
return Ok(err);
}
}
}
async fn read_jsonrpc_message(stream: &mut WsClient) -> Result<JSONRPCMessage> {
loop {
let frame = timeout(DEFAULT_READ_TIMEOUT, stream.next())
.await
.context("timed out waiting for websocket frame")?
.context("websocket stream ended unexpectedly")?
.context("failed to read websocket frame")?;
match frame {
WebSocketMessage::Text(text) => return Ok(serde_json::from_str(text.as_ref())?),
WebSocketMessage::Ping(payload) => {
stream.send(WebSocketMessage::Pong(payload)).await?;
}
WebSocketMessage::Pong(_) => {}
WebSocketMessage::Close(frame) => {
bail!("websocket closed unexpectedly: {frame:?}")
}
WebSocketMessage::Binary(_) => bail!("unexpected binary websocket frame"),
WebSocketMessage::Frame(_) => {}
}
}
}
async fn assert_no_message(stream: &mut WsClient, wait_for: Duration) -> Result<()> {
match timeout(wait_for, stream.next()).await {
Ok(Some(Ok(frame))) => bail!("unexpected frame while waiting for silence: {frame:?}"),
Ok(Some(Err(err))) => bail!("unexpected websocket read error: {err}"),
Ok(None) => bail!("websocket closed unexpectedly while waiting for silence"),
Err(_) => Ok(()),
}
}
fn create_config_toml(
codex_home: &Path,
server_uri: &str,
approval_policy: &str,
) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(
config_toml,
format!(
r#"
model = "mock-model"
approval_policy = "{approval_policy}"
sandbox_mode = "read-only"
model_provider = "mock_provider"
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "responses"
request_max_retries = 0
stream_max_retries = 0
"#
),
)
}

View File

@@ -4,6 +4,7 @@ mod app_list;
mod collaboration_mode_list;
mod compaction;
mod config_rpc;
mod connection_handling_websocket;
mod dynamic_tools;
mod experimental_api;
mod experimental_feature_list;

View File

@@ -306,6 +306,15 @@ struct AppServerCommand {
#[command(subcommand)]
subcommand: Option<AppServerSubcommand>,
/// Transport endpoint URL. Supported values: `stdio://` (default),
/// `ws://IP:PORT`.
#[arg(
long = "listen",
value_name = "URL",
default_value = codex_app_server::AppServerTransport::DEFAULT_LISTEN_URL
)]
listen: codex_app_server::AppServerTransport,
/// Controls whether analytics are enabled by default.
///
/// Analytics are disabled by default for app-server. Users have to explicitly opt in
@@ -587,11 +596,13 @@ async fn cli_main(codex_linux_sandbox_exe: Option<PathBuf>) -> anyhow::Result<()
}
Some(Subcommand::AppServer(app_server_cli)) => match app_server_cli.subcommand {
None => {
codex_app_server::run_main(
let transport = app_server_cli.listen;
codex_app_server::run_main_with_transport(
codex_linux_sandbox_exe,
root_config_overrides,
codex_core::config_loader::LoaderOverrides::default(),
app_server_cli.analytics_default_enabled,
transport,
)
.await?;
}
@@ -1328,6 +1339,10 @@ mod tests {
fn app_server_analytics_default_disabled_without_flag() {
let app_server = app_server_from_args(["codex", "app-server"].as_ref());
assert!(!app_server.analytics_default_enabled);
assert_eq!(
app_server.listen,
codex_app_server::AppServerTransport::Stdio
);
}
#[test]
@@ -1337,6 +1352,36 @@ mod tests {
assert!(app_server.analytics_default_enabled);
}
#[test]
fn app_server_listen_websocket_url_parses() {
let app_server = app_server_from_args(
["codex", "app-server", "--listen", "ws://127.0.0.1:4500"].as_ref(),
);
assert_eq!(
app_server.listen,
codex_app_server::AppServerTransport::WebSocket {
bind_address: "127.0.0.1:4500".parse().expect("valid socket address"),
}
);
}
#[test]
fn app_server_listen_stdio_url_parses() {
let app_server =
app_server_from_args(["codex", "app-server", "--listen", "stdio://"].as_ref());
assert_eq!(
app_server.listen,
codex_app_server::AppServerTransport::Stdio
);
}
#[test]
fn app_server_listen_invalid_url_fails_to_parse() {
let parse_result =
MultitoolCli::try_parse_from(["codex", "app-server", "--listen", "http://foo"]);
assert!(parse_result.is_err());
}
#[test]
fn features_enable_parses_feature_name() {
let cli = MultitoolCli::try_parse_from(["codex", "features", "enable", "unified_exec"])