mirror of
https://github.com/openai/codex.git
synced 2026-06-01 19:02:59 +00:00
feat(app-server): update remote control APIs for better UX (#22877)
## Why To help improve `codex remote-control` CLI UX which I plan to do in a followup, this PR adds `server-name` to the various remote control APIs: - `remoteControl/enable` - `remoteControl/disable` - `remoteControl/status/changed` Also, add a `remoteControl/status/read` API. This will be helpful in the Codex App.
This commit is contained in:
@@ -204,7 +204,8 @@ Example with notification opt-out:
|
||||
- `app/list` — list available apps.
|
||||
- `remoteControl/enable` — experimental; enable remote control for the current app-server process and return the current remote-control status snapshot. The caller is responsible for persisting the desired setting outside app-server.
|
||||
- `remoteControl/disable` — experimental; disable remote control for the current app-server process and return the current remote-control status snapshot. This does not revoke already enrolled controller devices.
|
||||
- `remoteControl/status/changed` — notification emitted when the remote-control status or client-visible environment id changes. `status` is one of `disabled`, `connecting`, `connected`, or `errored`; `environmentId` is a string when the app-server has a current enrollment and `null` when that enrollment is cleared, invalidated, or remote control is disabled. Newly initialized app-server clients always receive the current status snapshot.
|
||||
- `remoteControl/status/read` — experimental; read the current remote-control status snapshot. `status` is one of `disabled`, `connecting`, `connected`, or `errored`; `serverName` is the local machine name used by this app-server process; `environmentId` is a string when the app-server has a current enrollment and `null` when that enrollment is cleared, invalidated, or remote control is disabled.
|
||||
- `remoteControl/status/changed` — notification emitted when the remote-control status or client-visible environment id changes. `status` is one of `disabled`, `connecting`, `connected`, or `errored`; `serverName` is the local machine name used by this app-server process; `environmentId` is a string when the app-server has a current enrollment and `null` when that enrollment is cleared, invalidated, or remote control is disabled. Newly initialized app-server clients always receive the current status snapshot.
|
||||
- `skills/config/write` — write user-level skill config by name or absolute path.
|
||||
- `plugin/install` — install a plugin from a discovered marketplace entry, rejecting marketplace entries marked unavailable for install, install MCPs if any, and return the effective plugin auth policy plus any apps that still need auth (**under development; do not call from production clients yet**).
|
||||
- `plugin/uninstall` — uninstall a local plugin by `pluginId` in `<plugin>@<marketplace>` form by removing its cached files and clearing its user-level config entry, or uninstall a remote ChatGPT plugin by backend `pluginId` by forwarding the uninstall to the ChatGPT plugin backend and removing any downloaded remote-plugin cache (**under development; do not call from production clients yet**).
|
||||
|
||||
@@ -41,7 +41,6 @@ use codex_analytics::AppServerRpcTransport;
|
||||
use codex_app_server_protocol::ConfigLayerSource;
|
||||
use codex_app_server_protocol::ConfigWarningNotification;
|
||||
use codex_app_server_protocol::JSONRPCMessage;
|
||||
use codex_app_server_protocol::RemoteControlStatusChangedNotification;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::TextPosition as AppTextPosition;
|
||||
use codex_app_server_protocol::TextRange as AppTextRange;
|
||||
@@ -1003,14 +1002,9 @@ pub async fn run_main_with_transport_options(
|
||||
continue;
|
||||
}
|
||||
remote_control_status = status.clone();
|
||||
let notification = ServerNotification::RemoteControlStatusChanged(status);
|
||||
initialize_notification_sender
|
||||
.send_server_notification(ServerNotification::RemoteControlStatusChanged(
|
||||
RemoteControlStatusChangedNotification {
|
||||
status: status.status,
|
||||
installation_id: status.installation_id,
|
||||
environment_id: status.environment_id,
|
||||
},
|
||||
))
|
||||
.send_server_notification(notification)
|
||||
.await;
|
||||
}
|
||||
created = thread_created_rx.recv(), if listen_for_threads => {
|
||||
|
||||
@@ -897,6 +897,10 @@ impl MessageProcessor {
|
||||
.remote_control_processor
|
||||
.disable()
|
||||
.map(|response| Some(response.into())),
|
||||
ClientRequest::RemoteControlStatusRead { .. } => self
|
||||
.remote_control_processor
|
||||
.status_read()
|
||||
.map(|response| Some(response.into())),
|
||||
ClientRequest::ConfigRequirementsRead { params: _, .. } => self
|
||||
.config_processor
|
||||
.config_requirements_read()
|
||||
|
||||
@@ -5,6 +5,7 @@ use crate::transport::RemoteControlUnavailable;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
use codex_app_server_protocol::RemoteControlDisableResponse;
|
||||
use codex_app_server_protocol::RemoteControlEnableResponse;
|
||||
use codex_app_server_protocol::RemoteControlStatusReadResponse;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct RemoteControlRequestProcessor {
|
||||
@@ -31,6 +32,16 @@ impl RemoteControlRequestProcessor {
|
||||
Ok(RemoteControlDisableResponse::from(handle.disable()))
|
||||
}
|
||||
|
||||
pub(crate) fn status_read(&self) -> Result<RemoteControlStatusReadResponse, JSONRPCErrorError> {
|
||||
let status = self.handle()?.status();
|
||||
Ok(RemoteControlStatusReadResponse {
|
||||
status: status.status,
|
||||
server_name: status.server_name,
|
||||
installation_id: status.installation_id,
|
||||
environment_id: status.environment_id,
|
||||
})
|
||||
}
|
||||
|
||||
fn handle(&self) -> Result<&RemoteControlHandle, JSONRPCErrorError> {
|
||||
self.remote_control_handle
|
||||
.as_ref()
|
||||
|
||||
@@ -582,6 +582,12 @@ impl McpProcess {
|
||||
.await
|
||||
}
|
||||
|
||||
/// Send a `remoteControl/status/read` JSON-RPC request.
|
||||
pub async fn send_remote_control_status_read_request(&mut self) -> anyhow::Result<i64> {
|
||||
self.send_request("remoteControl/status/read", /*params*/ None)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Send an `app/list` JSON-RPC request.
|
||||
pub async fn send_apps_list_request(&mut self, params: AppsListParams) -> anyhow::Result<i64> {
|
||||
let params = Some(serde_json::to_value(params)?);
|
||||
|
||||
@@ -1,14 +1,27 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use app_test_support::ChatGptAuthFixture;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::to_response;
|
||||
use app_test_support::write_chatgpt_auth;
|
||||
use app_test_support::write_mock_responses_config_toml_with_chatgpt_base_url;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RemoteControlConnectionStatus;
|
||||
use codex_app_server_protocol::RemoteControlDisableResponse;
|
||||
use codex_app_server_protocol::RemoteControlEnableResponse;
|
||||
use codex_app_server_protocol::RemoteControlStatusReadResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_config::types::AuthCredentialsStoreMode;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tempfile::TempDir;
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::timeout;
|
||||
|
||||
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
@@ -28,6 +41,28 @@ async fn remote_control_disable_returns_disabled_status() -> Result<()> {
|
||||
let received: RemoteControlDisableResponse = to_response(response)?;
|
||||
|
||||
assert_eq!(received.status, RemoteControlConnectionStatus::Disabled);
|
||||
assert!(!received.server_name.is_empty());
|
||||
assert_eq!(received.environment_id, None);
|
||||
assert!(!received.installation_id.is_empty());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn remote_control_status_read_returns_disabled_status() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let request_id = mcp.send_remote_control_status_read_request().await?;
|
||||
let response: JSONRPCResponse = timeout(
|
||||
DEFAULT_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
|
||||
)
|
||||
.await??;
|
||||
let received: RemoteControlStatusReadResponse = to_response(response)?;
|
||||
|
||||
assert_eq!(received.status, RemoteControlConnectionStatus::Disabled);
|
||||
assert!(!received.server_name.is_empty());
|
||||
assert_eq!(received.environment_id, None);
|
||||
assert!(!received.installation_id.is_empty());
|
||||
Ok(())
|
||||
@@ -36,6 +71,7 @@ async fn remote_control_disable_returns_disabled_status() -> Result<()> {
|
||||
#[tokio::test]
|
||||
async fn remote_control_enable_returns_connecting_status() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
let _backend = BlockingRemoteControlBackend::start(codex_home.path()).await?;
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
@@ -48,7 +84,117 @@ async fn remote_control_enable_returns_connecting_status() -> Result<()> {
|
||||
let received: RemoteControlEnableResponse = to_response(response)?;
|
||||
|
||||
assert_eq!(received.status, RemoteControlConnectionStatus::Connecting);
|
||||
assert!(!received.server_name.is_empty());
|
||||
assert_eq!(received.environment_id, None);
|
||||
assert!(!received.installation_id.is_empty());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn remote_control_status_read_returns_connecting_status_after_enable() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
let mut backend = BlockingRemoteControlBackend::start(codex_home.path()).await?;
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let request_id = mcp.send_remote_control_enable_request().await?;
|
||||
let _: JSONRPCResponse = timeout(
|
||||
DEFAULT_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
|
||||
)
|
||||
.await??;
|
||||
|
||||
let enroll_request = timeout(DEFAULT_TIMEOUT, backend.wait_for_enroll_request()).await??;
|
||||
assert_eq!(
|
||||
enroll_request,
|
||||
"POST /backend-api/wham/remote/control/server/enroll HTTP/1.1"
|
||||
);
|
||||
|
||||
let request_id = mcp.send_remote_control_status_read_request().await?;
|
||||
let response: JSONRPCResponse = timeout(
|
||||
DEFAULT_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
|
||||
)
|
||||
.await??;
|
||||
let received: RemoteControlStatusReadResponse = to_response(response)?;
|
||||
|
||||
assert_eq!(received.status, RemoteControlConnectionStatus::Connecting);
|
||||
assert!(!received.server_name.is_empty());
|
||||
assert_eq!(received.environment_id, None);
|
||||
assert!(!received.installation_id.is_empty());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct BlockingRemoteControlBackend {
|
||||
enroll_request_rx: Option<oneshot::Receiver<Result<String>>>,
|
||||
server_task: JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl BlockingRemoteControlBackend {
|
||||
async fn start(codex_home: &std::path::Path) -> Result<Self> {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await?;
|
||||
let remote_control_url = format!("http://{}/backend-api/", listener.local_addr()?);
|
||||
write_mock_responses_config_toml_with_chatgpt_base_url(
|
||||
codex_home,
|
||||
&remote_control_url,
|
||||
&remote_control_url,
|
||||
)?;
|
||||
write_chatgpt_auth(
|
||||
codex_home,
|
||||
ChatGptAuthFixture::new("chatgpt-token")
|
||||
.account_id("account_id")
|
||||
.chatgpt_account_id("account_id"),
|
||||
AuthCredentialsStoreMode::File,
|
||||
)?;
|
||||
|
||||
let (enroll_request_tx, enroll_request_rx) = oneshot::channel();
|
||||
let server_task = tokio::spawn(async move {
|
||||
match read_enroll_request(listener).await {
|
||||
Ok((request_line, _reader)) => {
|
||||
let _ = enroll_request_tx.send(Ok(request_line));
|
||||
std::future::pending::<()>().await;
|
||||
}
|
||||
Err(err) => {
|
||||
let _ = enroll_request_tx.send(Err(err));
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(Self {
|
||||
enroll_request_rx: Some(enroll_request_rx),
|
||||
server_task,
|
||||
})
|
||||
}
|
||||
|
||||
async fn wait_for_enroll_request(&mut self) -> Result<String> {
|
||||
let rx = self
|
||||
.enroll_request_rx
|
||||
.take()
|
||||
.context("enroll request should only be awaited once")?;
|
||||
rx.await?
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for BlockingRemoteControlBackend {
|
||||
fn drop(&mut self) {
|
||||
self.server_task.abort();
|
||||
}
|
||||
}
|
||||
|
||||
async fn read_enroll_request(listener: TcpListener) -> Result<(String, BufReader<TcpStream>)> {
|
||||
let (stream, _) = listener.accept().await?;
|
||||
let mut reader = BufReader::new(stream);
|
||||
|
||||
let mut request_line = String::new();
|
||||
reader.read_line(&mut request_line).await?;
|
||||
|
||||
loop {
|
||||
let mut line = String::new();
|
||||
reader.read_line(&mut line).await?;
|
||||
if line == "\r\n" {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok((request_line.trim_end().to_string(), reader))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user