first step

This commit is contained in:
Ruslan Nigmatullin
2026-02-27 18:24:21 -08:00
committed by Anton Panasenko
parent 6f05d8d735
commit c8e2b46acd
11 changed files with 1002 additions and 50 deletions

View File

@@ -6213,6 +6213,12 @@
"null"
]
},
"experimental_app_server_remote_control_url": {
"type": [
"string",
"null"
]
},
"forced_chatgpt_workspace_id": {
"type": [
"string",

View File

@@ -2857,6 +2857,12 @@
"null"
]
},
"experimental_app_server_remote_control_url": {
"type": [
"string",
"null"
]
},
"forced_chatgpt_workspace_id": {
"type": [
"string",

View File

@@ -233,6 +233,12 @@
"null"
]
},
"experimental_app_server_remote_control_url": {
"type": [
"string",
"null"
]
},
"forced_chatgpt_workspace_id": {
"type": [
"string",

View File

@@ -20,4 +20,4 @@ export type Config = {model: string | null, review_model: string | null, model_c
* [UNSTABLE] Optional default for where approval requests are routed for
* review.
*/
approvals_reviewer: ApprovalsReviewer | null, sandbox_mode: SandboxMode | null, sandbox_workspace_write: SandboxWorkspaceWrite | null, forced_chatgpt_workspace_id: string | null, forced_login_method: ForcedLoginMethod | null, web_search: WebSearchMode | null, tools: ToolsV2 | null, profile: string | null, profiles: { [key in string]?: ProfileV2 }, instructions: string | null, developer_instructions: string | null, compact_prompt: string | null, model_reasoning_effort: ReasoningEffort | null, model_reasoning_summary: ReasoningSummary | null, model_verbosity: Verbosity | null, service_tier: ServiceTier | null, analytics: AnalyticsConfig | null} & ({ [key in string]?: number | string | boolean | Array<JsonValue> | { [key in string]?: JsonValue } | null });
approvals_reviewer: ApprovalsReviewer | null, sandbox_mode: SandboxMode | null, sandbox_workspace_write: SandboxWorkspaceWrite | null, forced_chatgpt_workspace_id: string | null, forced_login_method: ForcedLoginMethod | null, web_search: WebSearchMode | null, tools: ToolsV2 | null, profile: string | null, profiles: { [key in string]?: ProfileV2 }, instructions: string | null, developer_instructions: string | null, compact_prompt: string | null, model_reasoning_effort: ReasoningEffort | null, model_reasoning_summary: ReasoningSummary | null, model_verbosity: Verbosity | null, service_tier: ServiceTier | null, analytics: AnalyticsConfig | null, experimental_app_server_remote_control_url: string | null} & ({ [key in string]?: number | string | boolean | Array<JsonValue> | { [key in string]?: JsonValue } | null });

View File

@@ -717,6 +717,8 @@ pub struct Config {
#[experimental("config/read.apps")]
#[serde(default)]
pub apps: Option<AppsConfig>,
#[experimental("config/read.experimental_app_server_remote_control_url")]
pub experimental_app_server_remote_control_url: Option<String>,
#[serde(default, flatten)]
pub additional: HashMap<String, JsonValue>,
}
@@ -6782,6 +6784,7 @@ mod tests {
service_tier: None,
analytics: None,
apps: None,
experimental_app_server_remote_control_url: None,
additional: HashMap::new(),
});
@@ -6815,6 +6818,7 @@ mod tests {
service_tier: None,
analytics: None,
apps: None,
experimental_app_server_remote_control_url: None,
additional: HashMap::new(),
});
@@ -6870,6 +6874,7 @@ mod tests {
service_tier: None,
analytics: None,
apps: None,
experimental_app_server_remote_control_url: None,
additional: HashMap::new(),
});
@@ -6919,6 +6924,7 @@ mod tests {
service_tier: None,
analytics: None,
apps: None,
experimental_app_server_remote_control_url: None,
additional: HashMap::new(),
});

View File

@@ -27,6 +27,7 @@ use crate::transport::ConnectionState;
use crate::transport::OutboundConnectionState;
use crate::transport::TransportEvent;
use crate::transport::route_outgoing_envelope;
use crate::transport::start_remote_control;
use crate::transport::start_stdio_connection;
use crate::transport::start_websocket_acceptor;
use codex_app_server_protocol::ConfigLayerSource;
@@ -353,37 +354,6 @@ pub async fn run_main_with_transport(
let (outbound_control_tx, mut outbound_control_rx) =
mpsc::channel::<OutboundControlEvent>(CHANNEL_CAPACITY);
enum TransportRuntime {
Stdio,
WebSocket {
accept_handle: JoinHandle<()>,
shutdown_token: CancellationToken,
},
}
let mut stdio_handles = Vec::<JoinHandle<()>>::new();
let transport_runtime = match transport {
AppServerTransport::Stdio => {
start_stdio_connection(transport_event_tx.clone(), &mut stdio_handles).await?;
TransportRuntime::Stdio
}
AppServerTransport::WebSocket { bind_address } => {
let shutdown_token = CancellationToken::new();
let accept_handle = start_websocket_acceptor(
bind_address,
transport_event_tx.clone(),
shutdown_token.clone(),
)
.await?;
TransportRuntime::WebSocket {
accept_handle,
shutdown_token,
}
}
};
let single_client_mode = matches!(&transport_runtime, TransportRuntime::Stdio);
let shutdown_when_no_connections = single_client_mode;
let graceful_signal_restart_enabled = !single_client_mode;
// Parse CLI overrides once and derive the base Config eagerly so later
// components do not need to work with raw TOML values.
let cli_kv_overrides = cli_config_overrides.parse_overrides().map_err(|e| {
@@ -539,6 +509,40 @@ pub async fn run_main_with_transport(
}
}
let transport_shutdown_token = CancellationToken::new();
let mut transport_accept_handles = Vec::<JoinHandle<()>>::new();
let single_client_mode = match transport {
AppServerTransport::Stdio => {
start_stdio_connection(transport_event_tx.clone(), &mut transport_accept_handles)
.await?;
true
}
AppServerTransport::WebSocket { bind_address } => {
let accept_handle = start_websocket_acceptor(
bind_address,
transport_event_tx.clone(),
transport_shutdown_token.clone(),
)
.await?;
transport_accept_handles.push(accept_handle);
false
}
};
let shutdown_when_no_connections = single_client_mode;
let graceful_signal_restart_enabled = !single_client_mode;
let remote_control_url = config.experimental_app_server_remote_control_url.clone();
if let Some(remote_control_url) = remote_control_url {
let accept_handle = start_remote_control(
remote_control_url,
transport_event_tx.clone(),
transport_shutdown_token.clone(),
)
.await?;
transport_accept_handles.push(accept_handle);
}
let outbound_handle = tokio::spawn(async move {
let mut outbound_connections = HashMap::<ConnectionId, OutboundConnectionState>::new();
loop {
@@ -619,10 +623,7 @@ pub async fn run_main_with_transport(
let mut thread_created_rx = processor.thread_created_receiver();
let mut running_turn_count_rx = processor.subscribe_running_assistant_turn_count();
let mut connections = HashMap::<ConnectionId, ConnectionState>::new();
let websocket_accept_shutdown = match &transport_runtime {
TransportRuntime::WebSocket { shutdown_token, .. } => Some(shutdown_token.clone()),
TransportRuntime::Stdio => None,
};
let transport_shutdown_token = transport_shutdown_token.clone();
async move {
let mut listen_for_threads = true;
let mut shutdown_state = ShutdownState::default();
@@ -635,9 +636,7 @@ pub async fn run_main_with_transport(
shutdown_state.update(running_turn_count, connections.len()),
ShutdownAction::Finish
) {
if let Some(shutdown_token) = &websocket_accept_shutdown {
shutdown_token.cancel();
}
transport_shutdown_token.cancel();
let _ = outbound_control_tx
.send(OutboundControlEvent::DisconnectAll)
.await;
@@ -833,16 +832,8 @@ pub async fn run_main_with_transport(
let _ = processor_handle.await;
let _ = outbound_handle.await;
if let TransportRuntime::WebSocket {
accept_handle,
shutdown_token,
} = transport_runtime
{
shutdown_token.cancel();
let _ = accept_handle.await;
}
for handle in stdio_handles {
transport_shutdown_token.cancel();
for handle in transport_accept_handles {
let _ = handle.await;
}

View File

@@ -22,6 +22,8 @@ use futures::StreamExt;
use owo_colors::OwoColorize;
use owo_colors::Stream;
use owo_colors::Style;
use serde::Deserialize;
use serde::Serialize;
use std::collections::HashMap;
use std::collections::HashSet;
use std::io::ErrorKind;
@@ -40,6 +42,11 @@ use tokio::io::{self};
use tokio::net::TcpListener;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio::time::Duration;
use tokio::time::Instant;
use tokio::time::MissedTickBehavior;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message as TungsteniteMessage;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use tracing::error;
@@ -50,6 +57,10 @@ use tracing::warn;
/// is a balance between throughput and memory usage - 128 messages should be
/// plenty for an interactive CLI.
pub(crate) const CHANNEL_CAPACITY: usize = 128;
const REMOTE_CONTROL_CLIENT_IDLE_TIMEOUT: Duration = Duration::from_secs(10 * 60);
const REMOTE_CONTROL_IDLE_SWEEP_INTERVAL: Duration = Duration::from_secs(30);
const REMOTE_CONTROL_RECONNECT_INITIAL_BACKOFF: Duration = Duration::from_secs(1);
const REMOTE_CONTROL_RECONNECT_MAX_BACKOFF: Duration = Duration::from_secs(30);
fn colorize(text: &str, style: Style) -> String {
text.if_supports_color(Stream::Stderr, |value| value.style(style))
@@ -308,6 +319,12 @@ pub(crate) async fn start_stdio_connection(
Ok(())
}
static CONNECTION_ID_COUNTER: AtomicU64 = AtomicU64::new(1);
fn next_connection_id() -> ConnectionId {
ConnectionId(CONNECTION_ID_COUNTER.fetch_add(1, Ordering::Relaxed))
}
pub(crate) async fn start_websocket_acceptor(
bind_address: SocketAddr,
transport_event_tx: mpsc::Sender<TransportEvent>,
@@ -691,6 +708,453 @@ pub(crate) async fn route_outgoing_envelope(
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct ClientId(pub String);
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ClientEvent {
ClientMessage {
#[serde(rename = "clientId")]
client_id: ClientId,
message: JSONRPCMessage,
},
Ping {
#[serde(rename = "clientId")]
client_id: ClientId,
},
ClientClosed {
#[serde(rename = "clientId")]
client_id: ClientId,
},
}
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ServerEvent {
ServerMessage {
#[serde(rename = "clientId")]
client_id: ClientId,
message: Box<OutgoingMessage>,
},
Pong {
#[serde(rename = "clientId")]
client_id: ClientId,
status: PongStatus,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PongStatus {
Active,
Unknown,
}
struct RemoteControlClientState {
connection_id: ConnectionId,
disconnect_token: CancellationToken,
last_activity_at: Instant,
}
pub(crate) async fn start_remote_control(
remote_control_url: String,
transport_event_tx: mpsc::Sender<TransportEvent>,
shutdown_token: CancellationToken,
) -> IoResult<JoinHandle<()>> {
let remote_control_url = normalize_remote_control_url(&remote_control_url)?;
Ok(tokio::spawn(async move {
let local_shutdown_token = shutdown_token.child_token();
let (client_event_tx, client_event_rx) = mpsc::channel(CHANNEL_CAPACITY);
let (server_event_tx, server_event_rx) = mpsc::channel(CHANNEL_CAPACITY);
let (writer_exited_tx, writer_exited_rx) = mpsc::channel(CHANNEL_CAPACITY);
let mut websocket_task = tokio::spawn(run_remote_control_websocket_loop(
remote_control_url,
client_event_tx,
server_event_rx,
local_shutdown_token.clone(),
));
let mut manager_task = tokio::spawn(run_remote_control_manager(
transport_event_tx,
client_event_rx,
server_event_tx,
writer_exited_tx,
writer_exited_rx,
local_shutdown_token.clone(),
));
tokio::select! {
_ = local_shutdown_token.cancelled() => {}
_ = &mut websocket_task => {
local_shutdown_token.cancel();
}
_ = &mut manager_task => {
local_shutdown_token.cancel();
}
}
let _ = websocket_task.await;
let _ = manager_task.await;
}))
}
fn normalize_remote_control_url(remote_control_url: &str) -> IoResult<String> {
if remote_control_url.starts_with("ws://") || remote_control_url.starts_with("wss://") {
return Ok(remote_control_url.to_string());
}
if let Some(rest) = remote_control_url.strip_prefix("http://") {
return Ok(format!("ws://{rest}"));
}
if let Some(rest) = remote_control_url.strip_prefix("https://") {
return Ok(format!("wss://{rest}"));
}
Err(std::io::Error::new(
ErrorKind::InvalidInput,
format!(
"invalid remote control URL `{remote_control_url}`; expected ws://, wss://, http://, or https://"
),
))
}
async fn run_remote_control_manager(
transport_event_tx: mpsc::Sender<TransportEvent>,
mut client_event_rx: mpsc::Receiver<ClientEvent>,
server_event_tx: mpsc::Sender<ServerEvent>,
writer_exited_tx: mpsc::Sender<ClientId>,
mut writer_exited_rx: mpsc::Receiver<ClientId>,
shutdown_token: CancellationToken,
) {
let mut clients = HashMap::<ClientId, RemoteControlClientState>::new();
let mut idle_sweep = tokio::time::interval(REMOTE_CONTROL_IDLE_SWEEP_INTERVAL);
idle_sweep.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop {
tokio::select! {
_ = shutdown_token.cancelled() => {
break;
}
_ = idle_sweep.tick() => {
if !close_expired_remote_control_clients(&transport_event_tx, &mut clients).await {
break;
}
}
writer_exited = writer_exited_rx.recv() => {
let Some(client_id) = writer_exited else {
break;
};
if !close_remote_control_client(&transport_event_tx, &mut clients, &client_id).await {
break;
}
}
client_event = client_event_rx.recv() => {
let Some(client_event) = client_event else {
break;
};
match client_event {
ClientEvent::ClientMessage { client_id, message } => {
if let Some(connection_id) = clients.get_mut(&client_id).map(|client| {
client.last_activity_at = Instant::now();
client.connection_id
}) {
if transport_event_tx
.send(TransportEvent::IncomingMessage {
connection_id,
message,
})
.await
.is_err()
{
break;
}
continue;
}
if !remote_control_message_starts_connection(&message) {
continue;
}
let connection_id = next_connection_id();
let (writer_tx, writer_rx) = mpsc::channel::<OutgoingMessage>(CHANNEL_CAPACITY);
let disconnect_token = CancellationToken::new();
if transport_event_tx
.send(TransportEvent::ConnectionOpened {
connection_id,
writer: writer_tx,
allow_legacy_notifications: false,
disconnect_sender: Some(disconnect_token.clone()),
})
.await
.is_err()
{
break;
}
tokio::spawn(run_remote_control_client_outbound(
client_id.clone(),
writer_rx,
server_event_tx.clone(),
writer_exited_tx.clone(),
disconnect_token.clone(),
));
clients.insert(
client_id,
RemoteControlClientState {
connection_id,
disconnect_token,
last_activity_at: Instant::now(),
},
);
if transport_event_tx
.send(TransportEvent::IncomingMessage {
connection_id,
message,
})
.await
.is_err()
{
break;
}
}
ClientEvent::Ping { client_id } => {
let status = match clients.get_mut(&client_id) {
Some(client) => {
client.last_activity_at = Instant::now();
PongStatus::Active
}
None => PongStatus::Unknown,
};
if server_event_tx
.send(ServerEvent::Pong { client_id, status })
.await
.is_err()
{
break;
}
}
ClientEvent::ClientClosed { client_id } => {
if !close_remote_control_client(&transport_event_tx, &mut clients, &client_id).await {
break;
}
}
}
}
}
}
while let Some(client_id) = clients.keys().next().cloned() {
if !close_remote_control_client(&transport_event_tx, &mut clients, &client_id).await {
break;
}
}
}
fn remote_control_message_starts_connection(message: &JSONRPCMessage) -> bool {
matches!(
message,
JSONRPCMessage::Request(codex_app_server_protocol::JSONRPCRequest { method, .. })
if method == "initialize"
)
}
fn remote_control_client_is_alive(client: &RemoteControlClientState, now: Instant) -> bool {
now.duration_since(client.last_activity_at) < REMOTE_CONTROL_CLIENT_IDLE_TIMEOUT
}
async fn close_expired_remote_control_clients(
transport_event_tx: &mpsc::Sender<TransportEvent>,
clients: &mut HashMap<ClientId, RemoteControlClientState>,
) -> bool {
let now = Instant::now();
let expired_client_ids: Vec<ClientId> = clients
.iter()
.filter_map(|(client_id, client)| {
(!remote_control_client_is_alive(client, now)).then_some(client_id.clone())
})
.collect();
for client_id in expired_client_ids {
if !close_remote_control_client(transport_event_tx, clients, &client_id).await {
return false;
}
}
true
}
async fn close_remote_control_client(
transport_event_tx: &mpsc::Sender<TransportEvent>,
clients: &mut HashMap<ClientId, RemoteControlClientState>,
client_id: &ClientId,
) -> bool {
let Some(client) = clients.remove(client_id) else {
return true;
};
client.disconnect_token.cancel();
transport_event_tx
.send(TransportEvent::ConnectionClosed {
connection_id: client.connection_id,
})
.await
.is_ok()
}
async fn run_remote_control_client_outbound(
client_id: ClientId,
mut writer_rx: mpsc::Receiver<OutgoingMessage>,
server_event_tx: mpsc::Sender<ServerEvent>,
writer_exited_tx: mpsc::Sender<ClientId>,
disconnect_token: CancellationToken,
) {
loop {
tokio::select! {
_ = disconnect_token.cancelled() => {
break;
}
outgoing_message = writer_rx.recv() => {
let Some(outgoing_message) = outgoing_message else {
break;
};
if server_event_tx
.send(ServerEvent::ServerMessage {
client_id: client_id.clone(),
message: Box::new(outgoing_message),
})
.await
.is_err()
{
break;
}
}
}
}
let _ = writer_exited_tx.send(client_id).await;
}
async fn run_remote_control_websocket_loop(
remote_control_url: String,
client_event_tx: mpsc::Sender<ClientEvent>,
mut server_event_rx: mpsc::Receiver<ServerEvent>,
shutdown_token: CancellationToken,
) {
let mut reconnect_backoff = REMOTE_CONTROL_RECONNECT_INITIAL_BACKOFF;
let mut wait_before_connect = false;
let mut pending_server_event = None::<ServerEvent>;
loop {
let slept_before_connect = if wait_before_connect {
tokio::select! {
_ = shutdown_token.cancelled() => {
break;
}
_ = tokio::time::sleep(reconnect_backoff) => {}
}
true
} else {
wait_before_connect = true;
false
};
let websocket_stream = tokio::select! {
_ = shutdown_token.cancelled() => {
break;
}
connect_result = connect_async(remote_control_url.as_str()) => {
match connect_result {
Ok((websocket_stream, _response)) => {
reconnect_backoff = REMOTE_CONTROL_RECONNECT_INITIAL_BACKOFF;
info!("connected to app-server remote control websocket: {remote_control_url}");
websocket_stream
}
Err(err) => {
warn!("failed to connect app-server remote control websocket `{remote_control_url}`: {err}");
if slept_before_connect {
reconnect_backoff = reconnect_backoff
.saturating_mul(2)
.min(REMOTE_CONTROL_RECONNECT_MAX_BACKOFF);
}
continue;
}
}
}
};
let (mut websocket_writer, mut websocket_reader) = websocket_stream.split();
loop {
if let Some(server_event) = pending_server_event.take() {
let payload = match serde_json::to_string(&server_event) {
Ok(payload) => payload,
Err(err) => {
error!("failed to serialize remote-control server event: {err}");
continue;
}
};
if let Err(err) = websocket_writer
.send(TungsteniteMessage::Text(payload.into()))
.await
{
warn!("remote control websocket send failed: {err}");
pending_server_event = Some(server_event);
break;
}
continue;
}
tokio::select! {
_ = shutdown_token.cancelled() => {
return;
}
incoming_message = websocket_reader.next() => {
match incoming_message {
Some(Ok(TungsteniteMessage::Text(text))) => {
match serde_json::from_str::<ClientEvent>(&text) {
Ok(client_event) => {
if client_event_tx.send(client_event).await.is_err() {
return;
}
}
Err(err) => {
warn!("failed to deserialize remote-control client event: {err}");
}
}
}
Some(Ok(TungsteniteMessage::Ping(payload))) => {
if let Err(err) = websocket_writer
.send(TungsteniteMessage::Pong(payload))
.await
{
warn!("remote control websocket pong failed: {err}");
break;
}
}
Some(Ok(TungsteniteMessage::Pong(_))) => {}
Some(Ok(TungsteniteMessage::Binary(_))) => {
warn!("dropping unsupported binary remote-control websocket message");
}
Some(Ok(TungsteniteMessage::Frame(_))) => {}
Some(Ok(TungsteniteMessage::Close(_))) | None => {
warn!("remote control websocket disconnected");
break;
}
Some(Err(err)) => {
warn!("remote control websocket receive error: {err}");
break;
}
}
}
server_event = server_event_rx.recv() => {
let Some(server_event) = server_event else {
return;
};
pending_server_event = Some(server_event);
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -700,8 +1164,10 @@ mod tests {
use pretty_assertions::assert_eq;
use serde_json::json;
use std::path::PathBuf;
use tokio::time::Duration;
use tokio::net::TcpStream;
use tokio::time::timeout;
use tokio_tungstenite::WebSocketStream;
use tokio_tungstenite::accept_async;
fn absolute_path(path: &str) -> AbsolutePathBuf {
AbsolutePathBuf::from_absolute_path(path).expect("absolute path")
@@ -1349,4 +1815,379 @@ mod tests {
| Err(tokio::sync::mpsc::error::TryRecvError::Disconnected)
));
}
#[test]
fn normalize_remote_control_url_rewrites_http_schemes() {
assert_eq!(
normalize_remote_control_url("ws://example.com/control").expect("valid ws url"),
"ws://example.com/control"
);
assert_eq!(
normalize_remote_control_url("wss://example.com/control").expect("valid wss url"),
"wss://example.com/control"
);
assert_eq!(
normalize_remote_control_url("http://example.com/control").expect("valid http url"),
"ws://example.com/control"
);
assert_eq!(
normalize_remote_control_url("https://example.com/control").expect("valid https url"),
"wss://example.com/control"
);
}
#[test]
fn normalize_remote_control_url_rejects_unsupported_schemes() {
let err = normalize_remote_control_url("ftp://example.com/control")
.expect_err("unsupported scheme should fail");
assert_eq!(
err.to_string(),
"invalid remote control URL `ftp://example.com/control`; expected ws://, wss://, http://, or https://"
);
}
#[test]
fn remote_control_client_is_alive_respects_activity_timeout() {
let base = Instant::now();
let client = RemoteControlClientState {
connection_id: ConnectionId(11),
disconnect_token: CancellationToken::new(),
last_activity_at: base,
};
assert!(remote_control_client_is_alive(
&client,
base + REMOTE_CONTROL_CLIENT_IDLE_TIMEOUT - Duration::from_millis(1)
));
assert!(!remote_control_client_is_alive(
&client,
base + REMOTE_CONTROL_CLIENT_IDLE_TIMEOUT
));
}
#[tokio::test]
async fn remote_control_transport_manages_virtual_clients_and_routes_messages() {
let listener = TcpListener::bind("127.0.0.1:0")
.await
.expect("listener should bind");
let remote_control_url = format!(
"ws://{}",
listener
.local_addr()
.expect("listener should have a local addr")
);
let (transport_event_tx, mut transport_event_rx) =
mpsc::channel::<TransportEvent>(CHANNEL_CAPACITY);
let shutdown_token = CancellationToken::new();
let remote_handle = start_remote_control(
remote_control_url,
transport_event_tx,
shutdown_token.clone(),
)
.await
.expect("remote control should start");
let mut websocket = accept_remote_control_connection(&listener).await;
let client_id = ClientId("client-1".to_string());
send_client_event(
&mut websocket,
ClientEvent::Ping {
client_id: client_id.clone(),
},
)
.await;
assert_eq!(
read_server_event(&mut websocket).await,
json!({
"type": "pong",
"clientId": "client-1",
"status": "unknown",
})
);
send_client_event(
&mut websocket,
ClientEvent::ClientMessage {
client_id: client_id.clone(),
message: JSONRPCMessage::Notification(
codex_app_server_protocol::JSONRPCNotification {
method: "initialized".to_string(),
params: None,
},
),
},
)
.await;
assert!(
timeout(Duration::from_millis(100), transport_event_rx.recv())
.await
.is_err(),
"non-initialize client messages should be ignored before connection creation"
);
let initialize_message =
JSONRPCMessage::Request(codex_app_server_protocol::JSONRPCRequest {
id: codex_app_server_protocol::RequestId::Integer(1),
method: "initialize".to_string(),
params: Some(json!({
"clientInfo": {
"name": "remote-test-client",
"version": "0.1.0"
}
})),
trace: None,
});
send_client_event(
&mut websocket,
ClientEvent::ClientMessage {
client_id: client_id.clone(),
message: initialize_message.clone(),
},
)
.await;
let (connection_id, writer) =
match timeout(Duration::from_secs(5), transport_event_rx.recv())
.await
.expect("connection open should arrive in time")
.expect("connection open should exist")
{
TransportEvent::ConnectionOpened {
connection_id,
writer,
..
} => (connection_id, writer),
other => panic!("expected connection open event, got {other:?}"),
};
match timeout(Duration::from_secs(5), transport_event_rx.recv())
.await
.expect("initialize message should arrive in time")
.expect("initialize message should exist")
{
TransportEvent::IncomingMessage {
connection_id: incoming_connection_id,
message,
} => {
assert_eq!(incoming_connection_id, connection_id);
assert_eq!(message, initialize_message);
}
other => panic!("expected initialize incoming message, got {other:?}"),
}
let followup_message =
JSONRPCMessage::Notification(codex_app_server_protocol::JSONRPCNotification {
method: "initialized".to_string(),
params: None,
});
send_client_event(
&mut websocket,
ClientEvent::ClientMessage {
client_id: client_id.clone(),
message: followup_message.clone(),
},
)
.await;
match timeout(Duration::from_secs(5), transport_event_rx.recv())
.await
.expect("followup message should arrive in time")
.expect("followup message should exist")
{
TransportEvent::IncomingMessage {
connection_id: incoming_connection_id,
message,
} => {
assert_eq!(incoming_connection_id, connection_id);
assert_eq!(message, followup_message);
}
other => panic!("expected followup incoming message, got {other:?}"),
}
send_client_event(
&mut websocket,
ClientEvent::Ping {
client_id: client_id.clone(),
},
)
.await;
assert_eq!(
read_server_event(&mut websocket).await,
json!({
"type": "pong",
"clientId": "client-1",
"status": "active",
})
);
writer
.send(OutgoingMessage::Notification(
crate::outgoing_message::OutgoingNotification {
method: "codex/event/test".to_string(),
params: Some(json!({ "ok": true })),
},
))
.await
.expect("remote writer should accept outgoing message");
assert_eq!(
read_server_event(&mut websocket).await,
json!({
"type": "server_message",
"clientId": "client-1",
"message": {
"method": "codex/event/test",
"params": {
"ok": true,
}
}
})
);
send_client_event(
&mut websocket,
ClientEvent::ClientClosed {
client_id: client_id.clone(),
},
)
.await;
match timeout(Duration::from_secs(5), transport_event_rx.recv())
.await
.expect("connection close should arrive in time")
.expect("connection close should exist")
{
TransportEvent::ConnectionClosed {
connection_id: closed_connection_id,
} => {
assert_eq!(closed_connection_id, connection_id);
}
other => panic!("expected connection close event, got {other:?}"),
}
send_client_event(&mut websocket, ClientEvent::Ping { client_id }).await;
assert_eq!(
read_server_event(&mut websocket).await,
json!({
"type": "pong",
"clientId": "client-1",
"status": "unknown",
})
);
shutdown_token.cancel();
let _ = remote_handle.await;
}
#[tokio::test]
async fn remote_control_transport_reconnects_after_disconnect() {
let listener = TcpListener::bind("127.0.0.1:0")
.await
.expect("listener should bind");
let remote_control_url = format!(
"ws://{}",
listener
.local_addr()
.expect("listener should have a local addr")
);
let (transport_event_tx, mut transport_event_rx) =
mpsc::channel::<TransportEvent>(CHANNEL_CAPACITY);
let shutdown_token = CancellationToken::new();
let remote_handle = start_remote_control(
remote_control_url,
transport_event_tx,
shutdown_token.clone(),
)
.await
.expect("remote control should start");
let mut first_websocket = accept_remote_control_connection(&listener).await;
first_websocket
.close(None)
.await
.expect("first websocket should close");
drop(first_websocket);
let mut second_websocket = accept_remote_control_connection(&listener).await;
send_client_event(
&mut second_websocket,
ClientEvent::ClientMessage {
client_id: ClientId("client-2".to_string()),
message: JSONRPCMessage::Request(codex_app_server_protocol::JSONRPCRequest {
id: codex_app_server_protocol::RequestId::Integer(2),
method: "initialize".to_string(),
params: Some(json!({
"clientInfo": {
"name": "remote-test-client",
"version": "0.1.0"
}
})),
trace: None,
}),
},
)
.await;
match timeout(Duration::from_secs(5), transport_event_rx.recv())
.await
.expect("reconnected initialize should arrive in time")
.expect("reconnected initialize should exist")
{
TransportEvent::ConnectionOpened { .. } => {}
other => panic!("expected connection open after reconnect, got {other:?}"),
}
shutdown_token.cancel();
let _ = remote_handle.await;
}
async fn accept_remote_control_connection(
listener: &TcpListener,
) -> WebSocketStream<TcpStream> {
let (stream, _) = timeout(Duration::from_secs(5), listener.accept())
.await
.expect("remote control should connect in time")
.expect("listener accept should succeed");
accept_async(stream)
.await
.expect("websocket handshake should succeed")
}
async fn send_client_event(
websocket: &mut WebSocketStream<TcpStream>,
client_event: ClientEvent,
) {
let payload = serde_json::to_string(&client_event).expect("client event should serialize");
websocket
.send(TungsteniteMessage::Text(payload.into()))
.await
.expect("client event should send");
}
async fn read_server_event(websocket: &mut WebSocketStream<TcpStream>) -> serde_json::Value {
loop {
let frame = timeout(Duration::from_secs(5), websocket.next())
.await
.expect("server event should arrive in time")
.expect("websocket should stay open")
.expect("websocket frame should be readable");
match frame {
TungsteniteMessage::Text(text) => {
return serde_json::from_str(text.as_ref())
.expect("server event should deserialize");
}
TungsteniteMessage::Ping(payload) => {
websocket
.send(TungsteniteMessage::Pong(payload))
.await
.expect("websocket pong should send");
}
TungsteniteMessage::Pong(_) => {}
TungsteniteMessage::Close(frame) => {
panic!("unexpected websocket close frame: {frame:?}");
}
TungsteniteMessage::Binary(_) => {
panic!("unexpected binary websocket frame");
}
TungsteniteMessage::Frame(_) => {}
}
}
}
}

View File

@@ -88,6 +88,58 @@ sandbox_mode = "workspace-write"
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn config_read_includes_experimental_app_server_remote_control_url() -> Result<()> {
let codex_home = TempDir::new()?;
write_config(
&codex_home,
r#"
experimental_app_server_remote_control_url = "https://example.com/remote-control"
"#,
)?;
let codex_home_path = codex_home.path().canonicalize()?;
let user_file = AbsolutePathBuf::try_from(codex_home_path.join("config.toml"))?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_config_read_request(ConfigReadParams {
include_layers: true,
cwd: None,
})
.await?;
let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let ConfigReadResponse {
config,
origins,
layers,
} = to_response(resp)?;
assert_eq!(
config.experimental_app_server_remote_control_url.as_deref(),
Some("https://example.com/remote-control")
);
assert_eq!(
origins
.get("experimental_app_server_remote_control_url")
.expect("origin")
.name,
ConfigLayerSource::User {
file: user_file.clone(),
}
);
let layers = layers.expect("layers present");
assert_layers_user_then_optional_system(&layers, user_file)?;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn config_read_includes_tools() -> Result<()> {
let codex_home = TempDir::new()?;

View File

@@ -1850,6 +1850,10 @@
"description": "When true, disables burst-paste detection for typed input entirely. All characters are inserted as they are received, and no buffering or placeholder replacement will occur for fast keypress bursts.",
"type": "boolean"
},
"experimental_app_server_remote_control_url": {
"description": "Experimental / do not use. Provides the app-server remote control URL. The app-server uses the presence of this field to opt into remote mode.",
"type": "string"
},
"experimental_compact_prompt_file": {
"$ref": "#/definitions/AbsolutePathBuf"
},

View File

@@ -4252,6 +4252,7 @@ fn test_precedence_fixture_with_o3_profile() -> std::io::Result<()> {
realtime: RealtimeConfig::default(),
experimental_realtime_ws_backend_prompt: None,
experimental_realtime_ws_startup_context: None,
experimental_app_server_remote_control_url: None,
base_instructions: None,
developer_instructions: None,
compact_prompt: None,
@@ -4391,6 +4392,7 @@ fn test_precedence_fixture_with_gpt3_profile() -> std::io::Result<()> {
realtime: RealtimeConfig::default(),
experimental_realtime_ws_backend_prompt: None,
experimental_realtime_ws_startup_context: None,
experimental_app_server_remote_control_url: None,
base_instructions: None,
developer_instructions: None,
compact_prompt: None,
@@ -4528,6 +4530,7 @@ fn test_precedence_fixture_with_zdr_profile() -> std::io::Result<()> {
realtime: RealtimeConfig::default(),
experimental_realtime_ws_backend_prompt: None,
experimental_realtime_ws_startup_context: None,
experimental_app_server_remote_control_url: None,
base_instructions: None,
developer_instructions: None,
compact_prompt: None,
@@ -4651,6 +4654,7 @@ fn test_precedence_fixture_with_gpt5_profile() -> std::io::Result<()> {
realtime: RealtimeConfig::default(),
experimental_realtime_ws_backend_prompt: None,
experimental_realtime_ws_startup_context: None,
experimental_app_server_remote_control_url: None,
base_instructions: None,
developer_instructions: None,
compact_prompt: None,
@@ -5912,6 +5916,34 @@ experimental_realtime_ws_startup_context = "startup context from config"
Ok(())
}
#[test]
fn experimental_app_server_remote_control_url_loads_from_config_toml() -> std::io::Result<()> {
let cfg: ConfigToml = toml::from_str(
r#"
experimental_app_server_remote_control_url = "https://example.com/remote-control"
"#,
)
.expect("TOML deserialization should succeed");
assert_eq!(
cfg.experimental_app_server_remote_control_url.as_deref(),
Some("https://example.com/remote-control")
);
let codex_home = TempDir::new()?;
let config = Config::load_from_base_config_with_overrides(
cfg,
ConfigOverrides::default(),
codex_home.path().to_path_buf(),
)?;
assert_eq!(
config.experimental_app_server_remote_control_url.as_deref(),
Some("https://example.com/remote-control")
);
Ok(())
}
#[test]
fn experimental_realtime_ws_model_loads_from_config_toml() -> std::io::Result<()> {
let cfg: ConfigToml = toml::from_str(

View File

@@ -495,6 +495,9 @@ pub struct Config {
/// instructions inserted into developer messages when realtime becomes
/// active.
pub experimental_realtime_start_instructions: Option<String>,
/// Experimental / do not use. Provides the app-server remote control URL.
/// The app-server uses the presence of this field to opt into remote mode.
pub experimental_app_server_remote_control_url: Option<String>,
/// When set, restricts ChatGPT login to a specific workspace identifier.
pub forced_chatgpt_workspace_id: Option<String>,
@@ -1394,6 +1397,9 @@ pub struct ConfigToml {
/// instructions inserted into developer messages when realtime becomes
/// active.
pub experimental_realtime_start_instructions: Option<String>,
/// Experimental / do not use. Provides the app-server remote control URL.
/// The app-server uses the presence of this field to opt into remote mode.
pub experimental_app_server_remote_control_url: Option<String>,
pub projects: Option<HashMap<String, ProjectConfig>>,
/// Controls the web search tool mode: disabled, cached, or live.
@@ -2711,6 +2717,8 @@ impl Config {
experimental_realtime_ws_backend_prompt: cfg.experimental_realtime_ws_backend_prompt,
experimental_realtime_ws_startup_context: cfg.experimental_realtime_ws_startup_context,
experimental_realtime_start_instructions: cfg.experimental_realtime_start_instructions,
experimental_app_server_remote_control_url: cfg
.experimental_app_server_remote_control_url,
forced_chatgpt_workspace_id,
forced_login_method,
include_apply_patch_tool: include_apply_patch_tool_flag,