mirror of
https://github.com/openai/codex.git
synced 2026-05-29 23:40:29 +00:00
feat(app-server): migrate remote control to server tokens (#24141)
## Why `codex-backend` now authenticates remote-control server websocket connections with short-lived server tokens instead of the user's ChatGPT access token. `app-server` needs to mint and refresh those server tokens without persisting them, so a restart can reconnect from durable enrollment identity while keeping the bearer token memory-only. ## What Changed Updated the remote-control transport to consume `remote_control_token` and `expires_at` from server enroll responses and added `/server/refresh` support for persisted enrollments or expiring cached tokens. Websocket handshakes now send `Authorization: Bearer <remote_control_token>` with the existing server identity headers, and no longer send the ChatGPT bearer token or `chatgpt-account-id` on that websocket path. The in-memory enrollment state now owns the ephemeral server token cache, while SQLite still persists only `server_id`, `environment_id`, and `server_name`. Websocket `401`/`403` clears only the cached token for refresh on reconnect; websocket or refresh `404` clears stale persisted enrollment and re-enrolls. Response body previews redact `remote_control_token` before surfacing parse errors. ## Verification - `just test -p codex-app-server-transport` - Manual prod smoke with an isolated `CODEX_HOME`: `codex remote-control --json -c 'chatgpt_base_url="https://chatgpt.com/backend-api"'` reached `status:"connected"` with `environmentId:"env_i_6a17d9f1d764832986da2e80f4554f1b"`.
This commit is contained in:
@@ -1,18 +1,24 @@
|
||||
use super::protocol::EnrollRemoteServerRequest;
|
||||
use super::protocol::EnrollRemoteServerResponse;
|
||||
use super::protocol::RefreshRemoteServerRequest;
|
||||
use super::protocol::RemoteControlTarget;
|
||||
use axum::http::HeaderMap;
|
||||
use codex_api::SharedAuthProvider;
|
||||
use codex_login::default_client::build_reqwest_client;
|
||||
use codex_state::RemoteControlEnrollmentRecord;
|
||||
use codex_state::StateRuntime;
|
||||
use serde::Serialize;
|
||||
use serde::de::DeserializeOwned;
|
||||
use std::io;
|
||||
use std::io::ErrorKind;
|
||||
use time::OffsetDateTime;
|
||||
use time::format_description::well_known::Rfc3339;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
|
||||
const REMOTE_CONTROL_ENROLL_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
|
||||
const REMOTE_CONTROL_RESPONSE_BODY_MAX_BYTES: usize = 4096;
|
||||
const REMOTE_CONTROL_SERVER_TOKEN_REFRESH_SKEW_SECS: i64 = 30;
|
||||
|
||||
const REQUEST_ID_HEADER: &str = "x-request-id";
|
||||
const OAI_REQUEST_ID_HEADER: &str = "x-oai-request-id";
|
||||
@@ -26,6 +32,24 @@ pub(super) struct RemoteControlEnrollment {
|
||||
pub(super) environment_id: String,
|
||||
pub(super) server_id: String,
|
||||
pub(super) server_name: String,
|
||||
pub(super) remote_control_token: Option<String>,
|
||||
pub(super) expires_at: Option<OffsetDateTime>,
|
||||
}
|
||||
|
||||
impl RemoteControlEnrollment {
|
||||
pub(super) fn should_refresh_server_token(&self) -> bool {
|
||||
self.remote_control_token.is_none()
|
||||
|| self.expires_at.is_none_or(|expires_at| {
|
||||
expires_at.unix_timestamp()
|
||||
<= OffsetDateTime::now_utc().unix_timestamp()
|
||||
+ REMOTE_CONTROL_SERVER_TOKEN_REFRESH_SKEW_SECS
|
||||
})
|
||||
}
|
||||
|
||||
pub(super) fn clear_server_token(&mut self) {
|
||||
self.remote_control_token = None;
|
||||
self.expires_at = None;
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) struct RemoteControlConnectionAuth {
|
||||
@@ -81,6 +105,8 @@ pub(super) async fn load_persisted_remote_control_enrollment(
|
||||
environment_id: enrollment.environment_id,
|
||||
server_id: enrollment.server_id,
|
||||
server_name: enrollment.server_name,
|
||||
remote_control_token: None,
|
||||
expires_at: None,
|
||||
}))
|
||||
}
|
||||
None => {
|
||||
@@ -164,19 +190,33 @@ pub(crate) fn preview_remote_control_response_body(body: &[u8]) -> String {
|
||||
if trimmed.is_empty() {
|
||||
return "<empty>".to_string();
|
||||
}
|
||||
if trimmed.len() <= REMOTE_CONTROL_RESPONSE_BODY_MAX_BYTES {
|
||||
return trimmed.to_string();
|
||||
let redacted = redact_remote_control_response_body(trimmed);
|
||||
if redacted.len() <= REMOTE_CONTROL_RESPONSE_BODY_MAX_BYTES {
|
||||
return redacted;
|
||||
}
|
||||
|
||||
let mut cut = REMOTE_CONTROL_RESPONSE_BODY_MAX_BYTES;
|
||||
while !trimmed.is_char_boundary(cut) {
|
||||
while !redacted.is_char_boundary(cut) {
|
||||
cut = cut.saturating_sub(1);
|
||||
}
|
||||
let mut truncated = trimmed[..cut].to_string();
|
||||
let mut truncated = redacted[..cut].to_string();
|
||||
truncated.push_str("...");
|
||||
truncated
|
||||
}
|
||||
|
||||
fn redact_remote_control_response_body(body: &str) -> String {
|
||||
let Ok(mut body_json) = serde_json::from_str::<serde_json::Value>(body) else {
|
||||
return body.to_string();
|
||||
};
|
||||
let Some(body_object) = body_json.as_object_mut() else {
|
||||
return body.to_string();
|
||||
};
|
||||
if let Some(remote_control_token) = body_object.get_mut("remote_control_token") {
|
||||
*remote_control_token = serde_json::Value::String("<redacted>".to_string());
|
||||
}
|
||||
body_json.to_string()
|
||||
}
|
||||
|
||||
pub(crate) fn format_headers(headers: &HeaderMap) -> String {
|
||||
let request_id_str = headers
|
||||
.get(REQUEST_ID_HEADER)
|
||||
@@ -204,58 +244,146 @@ pub(super) async fn enroll_remote_control_server(
|
||||
app_server_version: env!("CARGO_PKG_VERSION"),
|
||||
installation_id: installation_id.to_string(),
|
||||
};
|
||||
let enrollment_response = send_remote_control_server_request::<_, EnrollRemoteServerResponse>(
|
||||
enroll_url,
|
||||
auth,
|
||||
installation_id,
|
||||
&request,
|
||||
"enroll",
|
||||
"server enrollment",
|
||||
)
|
||||
.await?;
|
||||
let mut enrollment = RemoteControlEnrollment {
|
||||
account_id: auth.account_id.clone(),
|
||||
environment_id: enrollment_response.environment_id,
|
||||
server_id: enrollment_response.server_id,
|
||||
server_name: server_name.to_string(),
|
||||
remote_control_token: None,
|
||||
expires_at: None,
|
||||
};
|
||||
update_remote_control_server_token(
|
||||
&mut enrollment,
|
||||
enroll_url,
|
||||
enrollment_response.remote_control_token,
|
||||
enrollment_response.expires_at,
|
||||
)?;
|
||||
Ok(enrollment)
|
||||
}
|
||||
|
||||
pub(super) async fn refresh_remote_control_server(
|
||||
remote_control_target: &RemoteControlTarget,
|
||||
auth: &RemoteControlConnectionAuth,
|
||||
installation_id: &str,
|
||||
enrollment: &mut RemoteControlEnrollment,
|
||||
) -> io::Result<()> {
|
||||
let refresh_url = &remote_control_target.refresh_url;
|
||||
let request = RefreshRemoteServerRequest {
|
||||
server_id: enrollment.server_id.clone(),
|
||||
installation_id: installation_id.to_string(),
|
||||
};
|
||||
let refreshed = send_remote_control_server_request::<_, EnrollRemoteServerResponse>(
|
||||
refresh_url,
|
||||
auth,
|
||||
installation_id,
|
||||
&request,
|
||||
"refresh",
|
||||
"server refresh",
|
||||
)
|
||||
.await?;
|
||||
if refreshed.server_id != enrollment.server_id
|
||||
|| refreshed.environment_id != enrollment.environment_id
|
||||
{
|
||||
return Err(io::Error::other(format!(
|
||||
"remote control server refresh returned mismatched enrollment: expected server_id={}, environment_id={}; got server_id={}, environment_id={}",
|
||||
enrollment.server_id,
|
||||
enrollment.environment_id,
|
||||
refreshed.server_id,
|
||||
refreshed.environment_id
|
||||
)));
|
||||
}
|
||||
|
||||
update_remote_control_server_token(
|
||||
enrollment,
|
||||
refresh_url,
|
||||
refreshed.remote_control_token,
|
||||
refreshed.expires_at,
|
||||
)
|
||||
}
|
||||
|
||||
async fn send_remote_control_server_request<Request, Response>(
|
||||
url: &str,
|
||||
auth: &RemoteControlConnectionAuth,
|
||||
installation_id: &str,
|
||||
request: &Request,
|
||||
action: &str,
|
||||
response_kind: &str,
|
||||
) -> io::Result<Response>
|
||||
where
|
||||
Request: Serialize,
|
||||
Response: DeserializeOwned,
|
||||
{
|
||||
let client = build_reqwest_client();
|
||||
let mut auth_headers = HeaderMap::new();
|
||||
auth.auth_provider.add_auth_headers(&mut auth_headers);
|
||||
let http_request = client
|
||||
.post(enroll_url)
|
||||
let response = client
|
||||
.post(url)
|
||||
.timeout(REMOTE_CONTROL_ENROLL_TIMEOUT)
|
||||
.headers(auth_headers)
|
||||
.header(REMOTE_CONTROL_ACCOUNT_ID_HEADER, &auth.account_id)
|
||||
.header(REMOTE_CONTROL_INSTALLATION_ID_HEADER, installation_id)
|
||||
.json(&request);
|
||||
|
||||
let response = http_request.send().await.map_err(|err| {
|
||||
io::Error::other(format!(
|
||||
"failed to enroll remote control server at `{enroll_url}`: {err}"
|
||||
))
|
||||
})?;
|
||||
.json(request)
|
||||
.send()
|
||||
.await
|
||||
.map_err(|err| {
|
||||
io::Error::other(format!(
|
||||
"failed to {action} remote control server at `{url}`: {err}"
|
||||
))
|
||||
})?;
|
||||
let headers = response.headers().clone();
|
||||
let status = response.status();
|
||||
let body = response.bytes().await.map_err(|err| {
|
||||
io::Error::other(format!(
|
||||
"failed to read remote control enrollment response from `{enroll_url}`: {err}"
|
||||
"failed to read remote control {response_kind} response from `{url}`: {err}"
|
||||
))
|
||||
})?;
|
||||
let body_preview = preview_remote_control_response_body(&body);
|
||||
if !status.is_success() {
|
||||
let headers_str = format_headers(&headers);
|
||||
let error_kind = if matches!(status.as_u16(), 401 | 403) {
|
||||
ErrorKind::PermissionDenied
|
||||
} else {
|
||||
ErrorKind::Other
|
||||
let error_kind = match status.as_u16() {
|
||||
401 | 403 => ErrorKind::PermissionDenied,
|
||||
404 => ErrorKind::NotFound,
|
||||
_ => ErrorKind::Other,
|
||||
};
|
||||
return Err(io::Error::new(
|
||||
error_kind,
|
||||
format!(
|
||||
"remote control server enrollment failed at `{enroll_url}`: HTTP {status}, {headers_str}, body: {body_preview}"
|
||||
"remote control {response_kind} failed at `{url}`: HTTP {status}, {headers_str}, body: {body_preview}"
|
||||
),
|
||||
));
|
||||
}
|
||||
|
||||
let enrollment = serde_json::from_slice::<EnrollRemoteServerResponse>(&body).map_err(|err| {
|
||||
serde_json::from_slice::<Response>(&body).map_err(|err| {
|
||||
let headers_str = format_headers(&headers);
|
||||
io::Error::other(format!(
|
||||
"failed to parse remote control enrollment response from `{enroll_url}`: HTTP {status}, {headers_str}, body: {body_preview}, decode error: {err}"
|
||||
"failed to parse remote control {response_kind} response from `{url}`: HTTP {status}, {headers_str}, body: {body_preview}, decode error: {err}"
|
||||
))
|
||||
})
|
||||
}
|
||||
|
||||
fn update_remote_control_server_token(
|
||||
enrollment: &mut RemoteControlEnrollment,
|
||||
url: &str,
|
||||
token: String,
|
||||
expires_at: String,
|
||||
) -> io::Result<()> {
|
||||
let expires_at = OffsetDateTime::parse(&expires_at, &Rfc3339).map_err(|err| {
|
||||
io::Error::other(format!(
|
||||
"failed to parse remote control server token expiry from `{url}`: {err}"
|
||||
))
|
||||
})?;
|
||||
|
||||
Ok(RemoteControlEnrollment {
|
||||
account_id: auth.account_id.clone(),
|
||||
environment_id: enrollment.environment_id,
|
||||
server_id: enrollment.server_id,
|
||||
server_name: server_name.to_string(),
|
||||
})
|
||||
enrollment.remote_control_token = Some(token);
|
||||
enrollment.expires_at = Some(expires_at);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -281,6 +409,40 @@ mod tests {
|
||||
.expect("state runtime should initialize")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remote_control_enrollment_refreshes_server_token_before_expiry() {
|
||||
let expires_soon = RemoteControlEnrollment {
|
||||
account_id: "account-a".to_string(),
|
||||
environment_id: "env_first".to_string(),
|
||||
server_id: "srv_e_first".to_string(),
|
||||
server_name: "first-server".to_string(),
|
||||
remote_control_token: Some("expires-soon".to_string()),
|
||||
expires_at: Some(OffsetDateTime::now_utc() + time::Duration::seconds(29)),
|
||||
};
|
||||
let expires_later = RemoteControlEnrollment {
|
||||
expires_at: Some(OffsetDateTime::now_utc() + time::Duration::seconds(31)),
|
||||
remote_control_token: Some("expires-later".to_string()),
|
||||
..expires_soon.clone()
|
||||
};
|
||||
|
||||
assert!(expires_soon.should_refresh_server_token());
|
||||
assert!(!expires_later.should_refresh_server_token());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn preview_remote_control_response_body_redacts_server_token() {
|
||||
assert_eq!(
|
||||
serde_json::from_str::<serde_json::Value>(&preview_remote_control_response_body(
|
||||
br#"{"server_id":"srv_e_test","remote_control_token":"secret"}"#
|
||||
))
|
||||
.expect("redacted response preview should stay valid json"),
|
||||
json!({
|
||||
"server_id": "srv_e_test",
|
||||
"remote_control_token": "<redacted>",
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn persisted_remote_control_enrollment_round_trips_by_target_and_account() {
|
||||
let codex_home = TempDir::new().expect("temp dir should create");
|
||||
@@ -295,12 +457,16 @@ mod tests {
|
||||
environment_id: "env_first".to_string(),
|
||||
server_id: "srv_e_first".to_string(),
|
||||
server_name: "first-server".to_string(),
|
||||
remote_control_token: None,
|
||||
expires_at: None,
|
||||
};
|
||||
let second_enrollment = RemoteControlEnrollment {
|
||||
account_id: "account-a".to_string(),
|
||||
environment_id: "env_second".to_string(),
|
||||
server_id: "srv_e_second".to_string(),
|
||||
server_name: "second-server".to_string(),
|
||||
remote_control_token: None,
|
||||
expires_at: None,
|
||||
};
|
||||
|
||||
update_persisted_remote_control_enrollment(
|
||||
@@ -371,12 +537,16 @@ mod tests {
|
||||
environment_id: "env_first".to_string(),
|
||||
server_id: "srv_e_first".to_string(),
|
||||
server_name: "first-server".to_string(),
|
||||
remote_control_token: None,
|
||||
expires_at: None,
|
||||
};
|
||||
let second_enrollment = RemoteControlEnrollment {
|
||||
account_id: "account-a".to_string(),
|
||||
environment_id: "env_second".to_string(),
|
||||
server_id: "srv_e_second".to_string(),
|
||||
server_name: "second-server".to_string(),
|
||||
remote_control_token: None,
|
||||
expires_at: None,
|
||||
};
|
||||
|
||||
update_persisted_remote_control_enrollment(
|
||||
@@ -448,7 +618,8 @@ mod tests {
|
||||
normalize_remote_control_url(&remote_control_url).expect("target should parse");
|
||||
let enroll_url = remote_control_target.enroll_url.clone();
|
||||
let response_body = json!({
|
||||
"error": "not enrolled",
|
||||
"server_id": "srv_e_test",
|
||||
"environment_id": "env_test",
|
||||
});
|
||||
let expected_body = response_body.to_string();
|
||||
let server_task = tokio::spawn(async move {
|
||||
@@ -472,7 +643,7 @@ mod tests {
|
||||
assert_eq!(
|
||||
err.to_string(),
|
||||
format!(
|
||||
"failed to parse remote control enrollment response from `{enroll_url}`: HTTP 200 OK, request-id: <none>, cf-ray: <none>, body: {expected_body}, decode error: missing field `server_id` at line 1 column {}",
|
||||
"failed to parse remote control server enrollment response from `{enroll_url}`: HTTP 200 OK, request-id: <none>, cf-ray: <none>, body: {expected_body}, decode error: missing field `remote_control_token` at line 1 column {}",
|
||||
expected_body.len()
|
||||
)
|
||||
);
|
||||
|
||||
@@ -11,6 +11,7 @@ use url::Url;
|
||||
pub(super) struct RemoteControlTarget {
|
||||
pub(super) websocket_url: String,
|
||||
pub(super) enroll_url: String,
|
||||
pub(super) refresh_url: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
@@ -26,6 +27,14 @@ pub(super) struct EnrollRemoteServerRequest {
|
||||
pub(super) struct EnrollRemoteServerResponse {
|
||||
pub(super) server_id: String,
|
||||
pub(super) environment_id: String,
|
||||
pub(super) remote_control_token: String,
|
||||
pub(super) expires_at: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub(super) struct RefreshRemoteServerRequest {
|
||||
pub(super) server_id: String,
|
||||
pub(super) installation_id: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
||||
@@ -177,6 +186,9 @@ pub(super) fn normalize_remote_control_url(
|
||||
let enroll_url = remote_control_url
|
||||
.join("wham/remote/control/server/enroll")
|
||||
.map_err(map_url_parse_error)?;
|
||||
let refresh_url = remote_control_url
|
||||
.join("wham/remote/control/server/refresh")
|
||||
.map_err(map_url_parse_error)?;
|
||||
let mut websocket_url = remote_control_url
|
||||
.join("wham/remote/control/server")
|
||||
.map_err(map_url_parse_error)?;
|
||||
@@ -194,6 +206,7 @@ pub(super) fn normalize_remote_control_url(
|
||||
Ok(RemoteControlTarget {
|
||||
websocket_url: websocket_url.to_string(),
|
||||
enroll_url: enroll_url.to_string(),
|
||||
refresh_url: refresh_url.to_string(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -212,6 +225,8 @@ mod tests {
|
||||
.to_string(),
|
||||
enroll_url: "https://chatgpt.com/backend-api/wham/remote/control/server/enroll"
|
||||
.to_string(),
|
||||
refresh_url: "https://chatgpt.com/backend-api/wham/remote/control/server/refresh"
|
||||
.to_string(),
|
||||
}
|
||||
);
|
||||
assert_eq!(
|
||||
@@ -224,6 +239,9 @@ mod tests {
|
||||
enroll_url:
|
||||
"https://api.chatgpt-staging.com/backend-api/wham/remote/control/server/enroll"
|
||||
.to_string(),
|
||||
refresh_url:
|
||||
"https://api.chatgpt-staging.com/backend-api/wham/remote/control/server/refresh"
|
||||
.to_string(),
|
||||
}
|
||||
);
|
||||
}
|
||||
@@ -238,6 +256,8 @@ mod tests {
|
||||
.to_string(),
|
||||
enroll_url: "http://localhost:8080/backend-api/wham/remote/control/server/enroll"
|
||||
.to_string(),
|
||||
refresh_url: "http://localhost:8080/backend-api/wham/remote/control/server/refresh"
|
||||
.to_string(),
|
||||
}
|
||||
);
|
||||
assert_eq!(
|
||||
@@ -248,6 +268,9 @@ mod tests {
|
||||
.to_string(),
|
||||
enroll_url: "https://localhost:8443/backend-api/wham/remote/control/server/enroll"
|
||||
.to_string(),
|
||||
refresh_url:
|
||||
"https://localhost:8443/backend-api/wham/remote/control/server/refresh"
|
||||
.to_string(),
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
@@ -58,6 +58,9 @@ use tokio_tungstenite::tungstenite;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
const TEST_INSTALLATION_ID: &str = "11111111-1111-4111-8111-111111111111";
|
||||
const TEST_REMOTE_CONTROL_SERVER_TOKEN: &str = "Remote Control Token";
|
||||
const TEST_REFRESHED_REMOTE_CONTROL_SERVER_TOKEN: &str = "Refreshed Remote Control Token";
|
||||
const TEST_REMOTE_CONTROL_SERVER_TOKEN_EXPIRES_AT: &str = "2999-01-01T00:00:00Z";
|
||||
|
||||
fn remote_control_auth_manager() -> Arc<AuthManager> {
|
||||
auth_manager_from_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
|
||||
@@ -125,6 +128,19 @@ fn test_server_name() -> String {
|
||||
gethostname().to_string_lossy().trim().to_string()
|
||||
}
|
||||
|
||||
fn remote_control_server_token_response(
|
||||
server_id: &str,
|
||||
environment_id: &str,
|
||||
remote_control_token: &str,
|
||||
) -> serde_json::Value {
|
||||
json!({
|
||||
"server_id": server_id,
|
||||
"environment_id": environment_id,
|
||||
"remote_control_token": remote_control_token,
|
||||
"expires_at": TEST_REMOTE_CONTROL_SERVER_TOKEN_EXPIRES_AT,
|
||||
})
|
||||
}
|
||||
|
||||
async fn expect_remote_control_status(
|
||||
status_rx: &mut watch::Receiver<RemoteControlStatusChangedNotification>,
|
||||
expected_status: Option<RemoteControlConnectionStatus>,
|
||||
@@ -203,7 +219,11 @@ async fn remote_control_transport_manages_virtual_clients_and_routes_messages()
|
||||
);
|
||||
respond_with_json(
|
||||
enroll_request.stream,
|
||||
json!({ "server_id": "srv_e_test", "environment_id": "env_test" }),
|
||||
remote_control_server_token_response(
|
||||
"srv_e_test",
|
||||
"env_test",
|
||||
TEST_REMOTE_CONTROL_SERVER_TOKEN,
|
||||
),
|
||||
)
|
||||
.await;
|
||||
let mut websocket = accept_remote_control_connection(&listener).await;
|
||||
@@ -483,17 +503,31 @@ async fn remote_control_transport_reconnects_after_disconnect() {
|
||||
);
|
||||
respond_with_json(
|
||||
enroll_request.stream,
|
||||
json!({ "server_id": "srv_e_test", "environment_id": "env_test" }),
|
||||
remote_control_server_token_response(
|
||||
"srv_e_test",
|
||||
"env_test",
|
||||
TEST_REMOTE_CONTROL_SERVER_TOKEN,
|
||||
),
|
||||
)
|
||||
.await;
|
||||
let mut first_websocket = accept_remote_control_connection(&listener).await;
|
||||
let (first_handshake_request, mut first_websocket) =
|
||||
accept_remote_control_backend_connection(&listener).await;
|
||||
assert_eq!(
|
||||
first_handshake_request.headers.get("authorization"),
|
||||
Some(&format!("Bearer {TEST_REMOTE_CONTROL_SERVER_TOKEN}"))
|
||||
);
|
||||
first_websocket
|
||||
.close(None)
|
||||
.await
|
||||
.expect("first websocket should close");
|
||||
drop(first_websocket);
|
||||
|
||||
let mut second_websocket = accept_remote_control_connection(&listener).await;
|
||||
let (second_handshake_request, mut second_websocket) =
|
||||
accept_remote_control_backend_connection(&listener).await;
|
||||
assert_eq!(
|
||||
second_handshake_request.headers.get("authorization"),
|
||||
Some(&format!("Bearer {TEST_REMOTE_CONTROL_SERVER_TOKEN}"))
|
||||
);
|
||||
expect_remote_control_status(
|
||||
&mut status_rx,
|
||||
/*expected_status*/ None,
|
||||
@@ -537,6 +571,91 @@ async fn remote_control_transport_reconnects_after_disconnect() {
|
||||
let _ = remote_task.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn remote_control_transport_refreshes_server_token_after_websocket_unauthorized() {
|
||||
let listener = TcpListener::bind("127.0.0.1:0")
|
||||
.await
|
||||
.expect("listener should bind");
|
||||
let remote_control_url = remote_control_url_for_listener(&listener);
|
||||
let codex_home = TempDir::new().expect("temp dir should create");
|
||||
let (transport_event_tx, _transport_event_rx) =
|
||||
mpsc::channel::<TransportEvent>(CHANNEL_CAPACITY);
|
||||
let shutdown_token = CancellationToken::new();
|
||||
let (remote_task, remote_handle) = start_remote_control(
|
||||
RemoteControlStartConfig {
|
||||
remote_control_url,
|
||||
installation_id: TEST_INSTALLATION_ID.to_string(),
|
||||
},
|
||||
Some(remote_control_state_runtime(&codex_home).await),
|
||||
remote_control_auth_manager(),
|
||||
transport_event_tx,
|
||||
shutdown_token.clone(),
|
||||
/*app_server_client_name_rx*/ None,
|
||||
/*initial_enabled*/ true,
|
||||
)
|
||||
.await
|
||||
.expect("remote control should start");
|
||||
let mut status_rx = remote_handle.status_receiver();
|
||||
|
||||
let enroll_request = accept_http_request(&listener).await;
|
||||
assert_eq!(
|
||||
enroll_request.request_line,
|
||||
"POST /backend-api/wham/remote/control/server/enroll HTTP/1.1"
|
||||
);
|
||||
respond_with_json(
|
||||
enroll_request.stream,
|
||||
remote_control_server_token_response(
|
||||
"srv_e_test",
|
||||
"env_test",
|
||||
TEST_REMOTE_CONTROL_SERVER_TOKEN,
|
||||
),
|
||||
)
|
||||
.await;
|
||||
|
||||
let websocket_request = accept_http_request(&listener).await;
|
||||
assert_eq!(
|
||||
websocket_request.request_line,
|
||||
"GET /backend-api/wham/remote/control/server HTTP/1.1"
|
||||
);
|
||||
assert_eq!(
|
||||
websocket_request.headers.get("authorization"),
|
||||
Some(&format!("Bearer {TEST_REMOTE_CONTROL_SERVER_TOKEN}"))
|
||||
);
|
||||
respond_with_status(websocket_request.stream, "401 Unauthorized", "").await;
|
||||
|
||||
let refresh_request = accept_http_request(&listener).await;
|
||||
assert_eq!(
|
||||
refresh_request.request_line,
|
||||
"POST /backend-api/wham/remote/control/server/refresh HTTP/1.1"
|
||||
);
|
||||
respond_with_json(
|
||||
refresh_request.stream,
|
||||
remote_control_server_token_response(
|
||||
"srv_e_test",
|
||||
"env_test",
|
||||
TEST_REFRESHED_REMOTE_CONTROL_SERVER_TOKEN,
|
||||
),
|
||||
)
|
||||
.await;
|
||||
|
||||
let (handshake_request, _websocket) = accept_remote_control_backend_connection(&listener).await;
|
||||
expect_remote_control_status(
|
||||
&mut status_rx,
|
||||
/*expected_status*/ None,
|
||||
Some("env_test"),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(
|
||||
handshake_request.headers.get("authorization"),
|
||||
Some(&format!(
|
||||
"Bearer {TEST_REFRESHED_REMOTE_CONTROL_SERVER_TOKEN}"
|
||||
))
|
||||
);
|
||||
|
||||
shutdown_token.cancel();
|
||||
let _ = remote_task.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn remote_control_start_allows_remote_control_invalid_url_when_disabled() {
|
||||
let (transport_event_tx, _transport_event_rx) =
|
||||
@@ -696,7 +815,11 @@ async fn remote_control_handle_enable_disable_stops_and_restarts_connections() {
|
||||
);
|
||||
respond_with_json(
|
||||
enroll_request.stream,
|
||||
json!({ "server_id": "srv_e_test", "environment_id": "env_test" }),
|
||||
remote_control_server_token_response(
|
||||
"srv_e_test",
|
||||
"env_test",
|
||||
TEST_REMOTE_CONTROL_SERVER_TOKEN,
|
||||
),
|
||||
)
|
||||
.await;
|
||||
let mut first_websocket = accept_remote_control_connection(&listener).await;
|
||||
@@ -801,7 +924,11 @@ async fn remote_control_transport_clears_outgoing_buffer_when_backend_acks() {
|
||||
let enroll_request = accept_http_request(&listener).await;
|
||||
respond_with_json(
|
||||
enroll_request.stream,
|
||||
json!({ "server_id": "srv_e_test", "environment_id": "env_test" }),
|
||||
remote_control_server_token_response(
|
||||
"srv_e_test",
|
||||
"env_test",
|
||||
TEST_REMOTE_CONTROL_SERVER_TOKEN,
|
||||
),
|
||||
)
|
||||
.await;
|
||||
let mut first_websocket = accept_remote_control_connection(&listener).await;
|
||||
@@ -1008,7 +1135,11 @@ async fn remote_control_http_mode_enrolls_before_connecting() {
|
||||
);
|
||||
respond_with_json(
|
||||
enroll_request.stream,
|
||||
json!({ "server_id": "srv_e_test", "environment_id": "env_test" }),
|
||||
remote_control_server_token_response(
|
||||
"srv_e_test",
|
||||
"env_test",
|
||||
TEST_REMOTE_CONTROL_SERVER_TOKEN,
|
||||
),
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -1026,13 +1157,13 @@ async fn remote_control_http_mode_enrolls_before_connecting() {
|
||||
);
|
||||
assert_eq!(
|
||||
handshake_request.headers.get("authorization"),
|
||||
Some(&"Bearer Access Token".to_string())
|
||||
Some(&format!("Bearer {TEST_REMOTE_CONTROL_SERVER_TOKEN}"))
|
||||
);
|
||||
assert_eq!(
|
||||
handshake_request
|
||||
.headers
|
||||
.get(REMOTE_CONTROL_ACCOUNT_ID_HEADER),
|
||||
Some(&"account_id".to_string())
|
||||
None
|
||||
);
|
||||
assert_eq!(
|
||||
handshake_request
|
||||
@@ -1172,7 +1303,7 @@ async fn remote_control_http_mode_enrolls_before_connecting() {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn remote_control_http_mode_reuses_persisted_enrollment_before_reenrolling() {
|
||||
async fn remote_control_http_mode_refreshes_persisted_enrollment_before_connecting() {
|
||||
let listener = TcpListener::bind("127.0.0.1:0")
|
||||
.await
|
||||
.expect("listener should bind");
|
||||
@@ -1186,6 +1317,8 @@ async fn remote_control_http_mode_reuses_persisted_enrollment_before_reenrolling
|
||||
environment_id: "env_persisted".to_string(),
|
||||
server_id: "srv_e_persisted".to_string(),
|
||||
server_name: "persisted-server".to_string(),
|
||||
remote_control_token: None,
|
||||
expires_at: None,
|
||||
};
|
||||
update_persisted_remote_control_enrollment(
|
||||
Some(state_db.as_ref()),
|
||||
@@ -1215,6 +1348,33 @@ async fn remote_control_http_mode_reuses_persisted_enrollment_before_reenrolling
|
||||
.await
|
||||
.expect("remote control should start");
|
||||
|
||||
let refresh_request = accept_http_request(&listener).await;
|
||||
assert_eq!(
|
||||
refresh_request.request_line,
|
||||
"POST /backend-api/wham/remote/control/server/refresh HTTP/1.1"
|
||||
);
|
||||
assert_eq!(
|
||||
refresh_request.headers.get("authorization"),
|
||||
Some(&"Bearer Access Token".to_string())
|
||||
);
|
||||
assert_eq!(
|
||||
serde_json::from_str::<serde_json::Value>(&refresh_request.body)
|
||||
.expect("refresh body should deserialize"),
|
||||
json!({
|
||||
"server_id": persisted_enrollment.server_id.clone(),
|
||||
"installation_id": TEST_INSTALLATION_ID,
|
||||
})
|
||||
);
|
||||
respond_with_json(
|
||||
refresh_request.stream,
|
||||
remote_control_server_token_response(
|
||||
&persisted_enrollment.server_id,
|
||||
&persisted_enrollment.environment_id,
|
||||
TEST_REFRESHED_REMOTE_CONTROL_SERVER_TOKEN,
|
||||
),
|
||||
)
|
||||
.await;
|
||||
|
||||
let (handshake_request, _websocket) = accept_remote_control_backend_connection(&listener).await;
|
||||
assert_eq!(
|
||||
handshake_request.path,
|
||||
@@ -1224,6 +1384,12 @@ async fn remote_control_http_mode_reuses_persisted_enrollment_before_reenrolling
|
||||
handshake_request.headers.get("x-codex-server-id"),
|
||||
Some(&persisted_enrollment.server_id)
|
||||
);
|
||||
assert_eq!(
|
||||
handshake_request.headers.get("authorization"),
|
||||
Some(&format!(
|
||||
"Bearer {TEST_REFRESHED_REMOTE_CONTROL_SERVER_TOKEN}"
|
||||
))
|
||||
);
|
||||
assert_eq!(
|
||||
load_persisted_remote_control_enrollment(
|
||||
Some(state_db.as_ref()),
|
||||
@@ -1256,6 +1422,8 @@ async fn remote_control_stdio_mode_waits_for_client_name_before_connecting() {
|
||||
environment_id: "env_persisted".to_string(),
|
||||
server_id: "srv_e_persisted".to_string(),
|
||||
server_name: "persisted-server".to_string(),
|
||||
remote_control_token: None,
|
||||
expires_at: None,
|
||||
};
|
||||
update_persisted_remote_control_enrollment(
|
||||
Some(state_db.as_ref()),
|
||||
@@ -1291,6 +1459,20 @@ async fn remote_control_stdio_mode_waits_for_client_name_before_connecting() {
|
||||
.expect_err("remote control should wait for the stdio client name");
|
||||
|
||||
let _ = app_server_client_name_tx.send(app_server_client_name.to_string());
|
||||
let refresh_request = accept_http_request(&listener).await;
|
||||
assert_eq!(
|
||||
refresh_request.request_line,
|
||||
"POST /backend-api/wham/remote/control/server/refresh HTTP/1.1"
|
||||
);
|
||||
respond_with_json(
|
||||
refresh_request.stream,
|
||||
remote_control_server_token_response(
|
||||
&persisted_enrollment.server_id,
|
||||
&persisted_enrollment.environment_id,
|
||||
TEST_REFRESHED_REMOTE_CONTROL_SERVER_TOKEN,
|
||||
),
|
||||
)
|
||||
.await;
|
||||
let (handshake_request, _websocket) = accept_remote_control_backend_connection(&listener).await;
|
||||
assert_eq!(
|
||||
handshake_request.headers.get("x-codex-server-id"),
|
||||
@@ -1328,6 +1510,8 @@ async fn remote_control_waits_for_account_id_before_enrolling() {
|
||||
environment_id: "env_ready".to_string(),
|
||||
server_id: "srv_e_ready".to_string(),
|
||||
server_name: expected_server_name,
|
||||
remote_control_token: None,
|
||||
expires_at: None,
|
||||
};
|
||||
|
||||
let (transport_event_tx, _transport_event_rx) =
|
||||
@@ -1369,10 +1553,11 @@ async fn remote_control_waits_for_account_id_before_enrolling() {
|
||||
);
|
||||
respond_with_json(
|
||||
enroll_request.stream,
|
||||
json!({
|
||||
"server_id": expected_enrollment.server_id,
|
||||
"environment_id": expected_enrollment.environment_id,
|
||||
}),
|
||||
remote_control_server_token_response(
|
||||
&expected_enrollment.server_id,
|
||||
&expected_enrollment.environment_id,
|
||||
TEST_REMOTE_CONTROL_SERVER_TOKEN,
|
||||
),
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -1387,7 +1572,7 @@ async fn remote_control_waits_for_account_id_before_enrolling() {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn remote_control_http_mode_clears_stale_persisted_enrollment_after_404() {
|
||||
async fn remote_control_http_mode_reenrolls_when_refresh_reports_stale_enrollment() {
|
||||
let listener = TcpListener::bind("127.0.0.1:0")
|
||||
.await
|
||||
.expect("listener should bind");
|
||||
@@ -1402,12 +1587,16 @@ async fn remote_control_http_mode_clears_stale_persisted_enrollment_after_404()
|
||||
environment_id: "env_stale".to_string(),
|
||||
server_id: "srv_e_stale".to_string(),
|
||||
server_name: "stale-server".to_string(),
|
||||
remote_control_token: None,
|
||||
expires_at: None,
|
||||
};
|
||||
let refreshed_enrollment = RemoteControlEnrollment {
|
||||
account_id: "account_id".to_string(),
|
||||
environment_id: "env_refreshed".to_string(),
|
||||
server_id: "srv_e_refreshed".to_string(),
|
||||
server_name: expected_server_name,
|
||||
remote_control_token: None,
|
||||
expires_at: None,
|
||||
};
|
||||
update_persisted_remote_control_enrollment(
|
||||
Some(state_db.as_ref()),
|
||||
@@ -1438,6 +1627,138 @@ async fn remote_control_http_mode_clears_stale_persisted_enrollment_after_404()
|
||||
.expect("remote control should start");
|
||||
let mut status_rx = remote_handle.status_receiver();
|
||||
|
||||
let refresh_request = accept_http_request(&listener).await;
|
||||
assert_eq!(
|
||||
refresh_request.request_line,
|
||||
"POST /backend-api/wham/remote/control/server/refresh HTTP/1.1"
|
||||
);
|
||||
expect_remote_control_status(
|
||||
&mut status_rx,
|
||||
/*expected_status*/ None,
|
||||
Some("env_stale"),
|
||||
)
|
||||
.await;
|
||||
respond_with_status(refresh_request.stream, "404 Not Found", "").await;
|
||||
expect_remote_control_status(
|
||||
&mut status_rx,
|
||||
/*expected_status*/ None,
|
||||
/*expected_environment_id*/ None,
|
||||
)
|
||||
.await;
|
||||
|
||||
let enroll_request = accept_http_request(&listener).await;
|
||||
assert_eq!(
|
||||
enroll_request.request_line,
|
||||
"POST /backend-api/wham/remote/control/server/enroll HTTP/1.1"
|
||||
);
|
||||
respond_with_json(
|
||||
enroll_request.stream,
|
||||
remote_control_server_token_response(
|
||||
&refreshed_enrollment.server_id,
|
||||
&refreshed_enrollment.environment_id,
|
||||
TEST_REMOTE_CONTROL_SERVER_TOKEN,
|
||||
),
|
||||
)
|
||||
.await;
|
||||
|
||||
let (handshake_request, _websocket) = accept_remote_control_backend_connection(&listener).await;
|
||||
expect_remote_control_status(
|
||||
&mut status_rx,
|
||||
/*expected_status*/ None,
|
||||
Some("env_refreshed"),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(
|
||||
handshake_request.headers.get("x-codex-server-id"),
|
||||
Some(&refreshed_enrollment.server_id)
|
||||
);
|
||||
assert_eq!(
|
||||
load_persisted_remote_control_enrollment(
|
||||
Some(state_db.as_ref()),
|
||||
&remote_control_target,
|
||||
"account_id",
|
||||
/*app_server_client_name*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("refreshed enrollment should load"),
|
||||
Some(refreshed_enrollment)
|
||||
);
|
||||
|
||||
shutdown_token.cancel();
|
||||
let _ = remote_task.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn remote_control_http_mode_clears_stale_persisted_enrollment_after_404() {
|
||||
let listener = TcpListener::bind("127.0.0.1:0")
|
||||
.await
|
||||
.expect("listener should bind");
|
||||
let remote_control_url = remote_control_url_for_listener(&listener);
|
||||
let codex_home = TempDir::new().expect("temp dir should create");
|
||||
let state_db = remote_control_state_runtime(&codex_home).await;
|
||||
let remote_control_target =
|
||||
normalize_remote_control_url(&remote_control_url).expect("target should parse");
|
||||
let expected_server_name = gethostname().to_string_lossy().trim().to_string();
|
||||
let stale_enrollment = RemoteControlEnrollment {
|
||||
account_id: "account_id".to_string(),
|
||||
environment_id: "env_stale".to_string(),
|
||||
server_id: "srv_e_stale".to_string(),
|
||||
server_name: "stale-server".to_string(),
|
||||
remote_control_token: None,
|
||||
expires_at: None,
|
||||
};
|
||||
let refreshed_enrollment = RemoteControlEnrollment {
|
||||
account_id: "account_id".to_string(),
|
||||
environment_id: "env_refreshed".to_string(),
|
||||
server_id: "srv_e_refreshed".to_string(),
|
||||
server_name: expected_server_name,
|
||||
remote_control_token: None,
|
||||
expires_at: None,
|
||||
};
|
||||
update_persisted_remote_control_enrollment(
|
||||
Some(state_db.as_ref()),
|
||||
&remote_control_target,
|
||||
"account_id",
|
||||
/*app_server_client_name*/ None,
|
||||
Some(&stale_enrollment),
|
||||
)
|
||||
.await
|
||||
.expect("stale enrollment should save");
|
||||
|
||||
let (transport_event_tx, _transport_event_rx) =
|
||||
mpsc::channel::<TransportEvent>(CHANNEL_CAPACITY);
|
||||
let shutdown_token = CancellationToken::new();
|
||||
let (remote_task, remote_handle) = start_remote_control(
|
||||
RemoteControlStartConfig {
|
||||
remote_control_url,
|
||||
installation_id: TEST_INSTALLATION_ID.to_string(),
|
||||
},
|
||||
Some(state_db.clone()),
|
||||
remote_control_auth_manager_with_home(&codex_home),
|
||||
transport_event_tx,
|
||||
shutdown_token.clone(),
|
||||
/*app_server_client_name_rx*/ None,
|
||||
/*initial_enabled*/ true,
|
||||
)
|
||||
.await
|
||||
.expect("remote control should start");
|
||||
let mut status_rx = remote_handle.status_receiver();
|
||||
|
||||
let refresh_request = accept_http_request(&listener).await;
|
||||
assert_eq!(
|
||||
refresh_request.request_line,
|
||||
"POST /backend-api/wham/remote/control/server/refresh HTTP/1.1"
|
||||
);
|
||||
respond_with_json(
|
||||
refresh_request.stream,
|
||||
remote_control_server_token_response(
|
||||
&stale_enrollment.server_id,
|
||||
&stale_enrollment.environment_id,
|
||||
TEST_REFRESHED_REMOTE_CONTROL_SERVER_TOKEN,
|
||||
),
|
||||
)
|
||||
.await;
|
||||
|
||||
let websocket_request = accept_http_request(&listener).await;
|
||||
assert_eq!(
|
||||
websocket_request.request_line,
|
||||
@@ -1468,10 +1789,11 @@ async fn remote_control_http_mode_clears_stale_persisted_enrollment_after_404()
|
||||
);
|
||||
respond_with_json(
|
||||
enroll_request.stream,
|
||||
json!({
|
||||
"server_id": refreshed_enrollment.server_id,
|
||||
"environment_id": refreshed_enrollment.environment_id,
|
||||
}),
|
||||
remote_control_server_token_response(
|
||||
&refreshed_enrollment.server_id,
|
||||
&refreshed_enrollment.environment_id,
|
||||
TEST_REMOTE_CONTROL_SERVER_TOKEN,
|
||||
),
|
||||
)
|
||||
.await;
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ use crate::transport::remote_control::enroll::enroll_remote_control_server;
|
||||
use crate::transport::remote_control::enroll::format_headers;
|
||||
use crate::transport::remote_control::enroll::load_persisted_remote_control_enrollment;
|
||||
use crate::transport::remote_control::enroll::preview_remote_control_response_body;
|
||||
use crate::transport::remote_control::enroll::refresh_remote_control_server;
|
||||
use crate::transport::remote_control::enroll::update_persisted_remote_control_enrollment;
|
||||
|
||||
use super::protocol::ClientEnvelope;
|
||||
@@ -55,7 +56,6 @@ use tracing::info;
|
||||
use tracing::warn;
|
||||
|
||||
pub(super) const REMOTE_CONTROL_PROTOCOL_VERSION: &str = "3";
|
||||
pub(super) const REMOTE_CONTROL_ACCOUNT_ID_HEADER: &str = "chatgpt-account-id";
|
||||
pub(super) const REMOTE_CONTROL_INSTALLATION_ID_HEADER: &str = "x-codex-installation-id";
|
||||
const REMOTE_CONTROL_SUBSCRIBE_CURSOR_HEADER: &str = "x-codex-subscribe-cursor";
|
||||
const REMOTE_CONTROL_WEBSOCKET_PING_INTERVAL: std::time::Duration =
|
||||
@@ -1119,7 +1119,6 @@ fn set_remote_control_header(
|
||||
fn build_remote_control_websocket_request(
|
||||
websocket_url: &str,
|
||||
enrollment: &RemoteControlEnrollment,
|
||||
auth: &RemoteControlConnectionAuth,
|
||||
installation_id: &str,
|
||||
subscribe_cursor: Option<&str>,
|
||||
) -> io::Result<tungstenite::http::Request<()>> {
|
||||
@@ -1141,10 +1140,17 @@ fn build_remote_control_websocket_request(
|
||||
"x-codex-protocol-version",
|
||||
REMOTE_CONTROL_PROTOCOL_VERSION,
|
||||
)?;
|
||||
let mut auth_headers = tungstenite::http::HeaderMap::new();
|
||||
auth.auth_provider.add_auth_headers(&mut auth_headers);
|
||||
headers.extend(auth_headers);
|
||||
set_remote_control_header(headers, REMOTE_CONTROL_ACCOUNT_ID_HEADER, &auth.account_id)?;
|
||||
set_remote_control_header(
|
||||
headers,
|
||||
"authorization",
|
||||
&format!(
|
||||
"Bearer {}",
|
||||
enrollment
|
||||
.remote_control_token
|
||||
.as_deref()
|
||||
.ok_or_else(|| io::Error::other("missing remote control server token"))?
|
||||
),
|
||||
)?;
|
||||
set_remote_control_header(
|
||||
headers,
|
||||
REMOTE_CONTROL_INSTALLATION_ID_HEADER,
|
||||
@@ -1219,7 +1225,7 @@ fn next_reconnect_delay(reconnect_attempt: &mut u64) -> (std::time::Duration, bo
|
||||
pub(super) async fn connect_remote_control_websocket(
|
||||
remote_control_target: &RemoteControlTarget,
|
||||
state_db: Option<&StateRuntime>,
|
||||
auth_context: RemoteControlAuthContext<'_>,
|
||||
mut auth_context: RemoteControlAuthContext<'_>,
|
||||
enrollment: &mut Option<RemoteControlEnrollment>,
|
||||
connect_options: RemoteControlConnectOptions<'_>,
|
||||
status_publisher: &RemoteControlStatusPublisher,
|
||||
@@ -1228,11 +1234,6 @@ pub(super) async fn connect_remote_control_websocket(
|
||||
tungstenite::http::Response<()>,
|
||||
)> {
|
||||
ensure_rustls_crypto_provider();
|
||||
let RemoteControlAuthContext {
|
||||
auth_manager,
|
||||
auth_recovery,
|
||||
auth_change_rx,
|
||||
} = auth_context;
|
||||
|
||||
let Some(state_db) = state_db else {
|
||||
*enrollment = None;
|
||||
@@ -1242,7 +1243,7 @@ pub(super) async fn connect_remote_control_websocket(
|
||||
));
|
||||
};
|
||||
|
||||
let auth = match load_remote_control_auth(auth_manager).await {
|
||||
let auth = match load_remote_control_auth(auth_context.auth_manager).await {
|
||||
Ok(auth) => auth,
|
||||
Err(err) => {
|
||||
if err.kind() == ErrorKind::PermissionDenied {
|
||||
@@ -1287,52 +1288,87 @@ pub(super) async fn connect_remote_control_websocket(
|
||||
});
|
||||
}
|
||||
|
||||
if enrollment.is_none() {
|
||||
enroll_remote_control_server_if_missing(
|
||||
remote_control_target,
|
||||
state_db,
|
||||
&auth,
|
||||
&mut auth_context,
|
||||
enrollment,
|
||||
connect_options,
|
||||
status_publisher,
|
||||
)
|
||||
.await?;
|
||||
|
||||
if enrollment
|
||||
.as_ref()
|
||||
.ok_or_else(|| io::Error::other("missing remote control enrollment after enrollment step"))?
|
||||
.should_refresh_server_token()
|
||||
{
|
||||
let enrollment_ref = enrollment.as_ref().ok_or_else(|| {
|
||||
io::Error::other("missing remote control enrollment after enrollment step")
|
||||
})?;
|
||||
let server_id = enrollment_ref.server_id.clone();
|
||||
let environment_id = enrollment_ref.environment_id.clone();
|
||||
|
||||
info!(
|
||||
"creating new remote control enrollment: websocket_url={}, enroll_url={}, account_id={}",
|
||||
remote_control_target.websocket_url, remote_control_target.enroll_url, auth.account_id
|
||||
"refreshing remote control server token: websocket_url={}, refresh_url={}, account_id={}, server_id={}, environment_id={}",
|
||||
remote_control_target.websocket_url,
|
||||
remote_control_target.refresh_url,
|
||||
auth.account_id,
|
||||
server_id,
|
||||
environment_id
|
||||
);
|
||||
let new_enrollment = match enroll_remote_control_server(
|
||||
let enrollment_ref = enrollment.as_mut().ok_or_else(|| {
|
||||
io::Error::other("missing remote control enrollment before server refresh")
|
||||
})?;
|
||||
match refresh_remote_control_server(
|
||||
remote_control_target,
|
||||
&auth,
|
||||
connect_options.installation_id,
|
||||
connect_options.server_name,
|
||||
enrollment_ref,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(new_enrollment) => new_enrollment,
|
||||
Ok(()) => {}
|
||||
Err(err) if err.kind() == ErrorKind::NotFound => {
|
||||
info!(
|
||||
"remote control server refresh returned HTTP 404; clearing stale enrollment before re-enrolling: websocket_url={}, account_id={}, server_id={}, environment_id={}",
|
||||
remote_control_target.websocket_url, auth.account_id, server_id, environment_id
|
||||
);
|
||||
clear_remote_control_enrollment(
|
||||
state_db,
|
||||
remote_control_target,
|
||||
&auth.account_id,
|
||||
connect_options.app_server_client_name,
|
||||
enrollment,
|
||||
status_publisher,
|
||||
)
|
||||
.await;
|
||||
enroll_remote_control_server_if_missing(
|
||||
remote_control_target,
|
||||
state_db,
|
||||
&auth,
|
||||
&mut auth_context,
|
||||
enrollment,
|
||||
connect_options,
|
||||
status_publisher,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
Err(err)
|
||||
if err.kind() == ErrorKind::PermissionDenied
|
||||
&& recover_remote_control_auth(auth_recovery, auth_change_rx).await =>
|
||||
&& recover_remote_control_auth(
|
||||
auth_context.auth_recovery,
|
||||
auth_context.auth_change_rx,
|
||||
)
|
||||
.await =>
|
||||
{
|
||||
return Err(io::Error::other(format!(
|
||||
"{err}; retrying after auth recovery"
|
||||
)));
|
||||
}
|
||||
Err(err) => return Err(err),
|
||||
};
|
||||
if let Err(err) = update_persisted_remote_control_enrollment(
|
||||
Some(state_db),
|
||||
remote_control_target,
|
||||
&auth.account_id,
|
||||
connect_options.app_server_client_name,
|
||||
Some(&new_enrollment),
|
||||
)
|
||||
.await
|
||||
{
|
||||
return Err(io::Error::other(format!(
|
||||
"failed to persist remote control enrollment in sqlite state db: {err}"
|
||||
)));
|
||||
}
|
||||
info!(
|
||||
"created new remote control enrollment: websocket_url={}, account_id={}, server_id={}, environment_id={}",
|
||||
remote_control_target.websocket_url,
|
||||
new_enrollment.account_id,
|
||||
new_enrollment.server_id,
|
||||
new_enrollment.environment_id
|
||||
);
|
||||
status_publisher.publish_environment_id(Some(new_enrollment.environment_id.clone()));
|
||||
*enrollment = Some(new_enrollment);
|
||||
}
|
||||
|
||||
let enrollment_ref = enrollment.as_ref().ok_or_else(|| {
|
||||
@@ -1341,7 +1377,6 @@ pub(super) async fn connect_remote_control_websocket(
|
||||
let request = build_remote_control_websocket_request(
|
||||
&remote_control_target.websocket_url,
|
||||
enrollment_ref,
|
||||
&auth,
|
||||
connect_options.installation_id,
|
||||
connect_options.subscribe_cursor,
|
||||
)?;
|
||||
@@ -1373,28 +1408,29 @@ pub(super) async fn connect_remote_control_websocket(
|
||||
enrollment_ref.server_id,
|
||||
enrollment_ref.environment_id
|
||||
);
|
||||
if let Err(clear_err) = update_persisted_remote_control_enrollment(
|
||||
Some(state_db),
|
||||
clear_remote_control_enrollment(
|
||||
state_db,
|
||||
remote_control_target,
|
||||
&auth.account_id,
|
||||
connect_options.app_server_client_name,
|
||||
/*enrollment*/ None,
|
||||
enrollment,
|
||||
status_publisher,
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!(
|
||||
"failed to clear stale remote control enrollment in sqlite state db: {clear_err}"
|
||||
);
|
||||
}
|
||||
*enrollment = None;
|
||||
status_publisher.publish_environment_id(/*environment_id*/ None);
|
||||
.await;
|
||||
}
|
||||
tungstenite::Error::Http(response)
|
||||
if matches!(response.status().as_u16(), 401 | 403)
|
||||
&& recover_remote_control_auth(auth_recovery, auth_change_rx).await =>
|
||||
if matches!(response.status().as_u16(), 401 | 403) =>
|
||||
{
|
||||
enrollment
|
||||
.as_mut()
|
||||
.ok_or_else(|| {
|
||||
io::Error::other(
|
||||
"missing remote control enrollment after websocket auth failure",
|
||||
)
|
||||
})?
|
||||
.clear_server_token();
|
||||
return Err(io::Error::other(format!(
|
||||
"remote control websocket auth failed with HTTP {}; retrying after auth recovery",
|
||||
"remote control websocket auth failed with HTTP {}; refreshing server token before reconnect",
|
||||
response.status()
|
||||
)));
|
||||
}
|
||||
@@ -1410,6 +1446,94 @@ pub(super) async fn connect_remote_control_websocket(
|
||||
}
|
||||
}
|
||||
|
||||
async fn clear_remote_control_enrollment(
|
||||
state_db: &StateRuntime,
|
||||
remote_control_target: &RemoteControlTarget,
|
||||
account_id: &str,
|
||||
app_server_client_name: Option<&str>,
|
||||
enrollment: &mut Option<RemoteControlEnrollment>,
|
||||
status_publisher: &RemoteControlStatusPublisher,
|
||||
) {
|
||||
if let Err(clear_err) = update_persisted_remote_control_enrollment(
|
||||
Some(state_db),
|
||||
remote_control_target,
|
||||
account_id,
|
||||
app_server_client_name,
|
||||
/*enrollment*/ None,
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!("failed to clear stale remote control enrollment in sqlite state db: {clear_err}");
|
||||
}
|
||||
*enrollment = None;
|
||||
status_publisher.publish_environment_id(/*environment_id*/ None);
|
||||
}
|
||||
|
||||
async fn enroll_remote_control_server_if_missing(
|
||||
remote_control_target: &RemoteControlTarget,
|
||||
state_db: &StateRuntime,
|
||||
auth: &RemoteControlConnectionAuth,
|
||||
auth_context: &mut RemoteControlAuthContext<'_>,
|
||||
enrollment: &mut Option<RemoteControlEnrollment>,
|
||||
connect_options: RemoteControlConnectOptions<'_>,
|
||||
status_publisher: &RemoteControlStatusPublisher,
|
||||
) -> io::Result<()> {
|
||||
if enrollment.is_some() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
info!(
|
||||
"creating new remote control enrollment: websocket_url={}, enroll_url={}, account_id={}",
|
||||
remote_control_target.websocket_url, remote_control_target.enroll_url, auth.account_id
|
||||
);
|
||||
let new_enrollment = match enroll_remote_control_server(
|
||||
remote_control_target,
|
||||
auth,
|
||||
connect_options.installation_id,
|
||||
connect_options.server_name,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(new_enrollment) => new_enrollment,
|
||||
Err(err)
|
||||
if err.kind() == ErrorKind::PermissionDenied
|
||||
&& recover_remote_control_auth(
|
||||
auth_context.auth_recovery,
|
||||
auth_context.auth_change_rx,
|
||||
)
|
||||
.await =>
|
||||
{
|
||||
return Err(io::Error::other(format!(
|
||||
"{err}; retrying after auth recovery"
|
||||
)));
|
||||
}
|
||||
Err(err) => return Err(err),
|
||||
};
|
||||
if let Err(err) = update_persisted_remote_control_enrollment(
|
||||
Some(state_db),
|
||||
remote_control_target,
|
||||
&auth.account_id,
|
||||
connect_options.app_server_client_name,
|
||||
Some(&new_enrollment),
|
||||
)
|
||||
.await
|
||||
{
|
||||
return Err(io::Error::other(format!(
|
||||
"failed to persist remote control enrollment in sqlite state db: {err}"
|
||||
)));
|
||||
}
|
||||
info!(
|
||||
"created new remote control enrollment: websocket_url={}, account_id={}, server_id={}, environment_id={}",
|
||||
remote_control_target.websocket_url,
|
||||
new_enrollment.account_id,
|
||||
new_enrollment.server_id,
|
||||
new_enrollment.environment_id
|
||||
);
|
||||
status_publisher.publish_environment_id(Some(new_enrollment.environment_id.clone()));
|
||||
*enrollment = Some(new_enrollment);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn recover_remote_control_auth(
|
||||
auth_recovery: &mut UnauthorizedRecovery,
|
||||
auth_change_rx: &mut watch::Receiver<u64>,
|
||||
@@ -1519,6 +1643,19 @@ mod tests {
|
||||
#[cfg(not(windows))]
|
||||
const TEST_HTTP_ACCEPT_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
const TEST_INSTALLATION_ID: &str = "11111111-1111-4111-8111-111111111111";
|
||||
const TEST_REMOTE_CONTROL_SERVER_TOKEN: &str = "Remote Control Token";
|
||||
|
||||
fn remote_control_enrollment(remote_control_token: Option<&str>) -> RemoteControlEnrollment {
|
||||
RemoteControlEnrollment {
|
||||
account_id: "account_id".to_string(),
|
||||
environment_id: "env_test".to_string(),
|
||||
server_id: "srv_e_test".to_string(),
|
||||
server_name: "test-server".to_string(),
|
||||
remote_control_token: remote_control_token.map(str::to_string),
|
||||
expires_at: remote_control_token
|
||||
.map(|_| time::OffsetDateTime::now_utc() + time::Duration::hours(1)),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn next_reconnect_delay_resets_after_cap() {
|
||||
@@ -1670,12 +1807,9 @@ mod tests {
|
||||
let auth_manager = remote_control_auth_manager();
|
||||
let mut auth_recovery = auth_manager.unauthorized_recovery();
|
||||
let mut auth_change_rx = auth_manager.auth_change_receiver();
|
||||
let mut enrollment = Some(RemoteControlEnrollment {
|
||||
account_id: "account_id".to_string(),
|
||||
environment_id: "env_test".to_string(),
|
||||
server_id: "srv_e_test".to_string(),
|
||||
server_name: "test-server".to_string(),
|
||||
});
|
||||
let mut enrollment = Some(remote_control_enrollment(Some(
|
||||
TEST_REMOTE_CONTROL_SERVER_TOKEN,
|
||||
)));
|
||||
let (status_publisher, status_rx) = remote_control_status_channel();
|
||||
|
||||
let err = match connect_remote_control_websocket(
|
||||
@@ -1715,7 +1849,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn connect_remote_control_websocket_recovers_after_unauthorized_reload() {
|
||||
async fn connect_remote_control_websocket_invalidates_unauthorized_server_token() {
|
||||
let listener = TcpListener::bind("127.0.0.1:0")
|
||||
.await
|
||||
.expect("listener should bind");
|
||||
@@ -1723,35 +1857,14 @@ mod tests {
|
||||
let remote_control_target =
|
||||
normalize_remote_control_url(&remote_control_url).expect("target should parse");
|
||||
let codex_home = TempDir::new().expect("temp dir should create");
|
||||
save_auth(
|
||||
codex_home.path(),
|
||||
&remote_control_auth_dot_json("stale-token"),
|
||||
AuthCredentialsStoreMode::File,
|
||||
)
|
||||
.expect("stale auth should save");
|
||||
let state_db = remote_control_state_runtime(&codex_home).await;
|
||||
let auth_manager = AuthManager::shared(
|
||||
codex_home.path().to_path_buf(),
|
||||
/*enable_codex_api_key_env*/ false,
|
||||
AuthCredentialsStoreMode::File,
|
||||
/*chatgpt_base_url*/ None,
|
||||
)
|
||||
.await;
|
||||
let auth_manager = remote_control_auth_manager();
|
||||
let mut auth_recovery = auth_manager.unauthorized_recovery();
|
||||
let mut auth_change_rx = auth_manager.auth_change_receiver();
|
||||
let mut enrollment = Some(RemoteControlEnrollment {
|
||||
account_id: "account_id".to_string(),
|
||||
environment_id: "env_test".to_string(),
|
||||
server_id: "srv_e_test".to_string(),
|
||||
server_name: "test-server".to_string(),
|
||||
});
|
||||
let mut enrollment = Some(remote_control_enrollment(Some(
|
||||
TEST_REMOTE_CONTROL_SERVER_TOKEN,
|
||||
)));
|
||||
let (status_publisher, status_rx) = remote_control_status_channel();
|
||||
save_auth(
|
||||
codex_home.path(),
|
||||
&remote_control_auth_dot_json("fresh-token"),
|
||||
AuthCredentialsStoreMode::File,
|
||||
)
|
||||
.expect("fresh auth should save");
|
||||
|
||||
let server_task = tokio::spawn(async move {
|
||||
let (stream, request_line) = accept_http_request(&listener).await;
|
||||
@@ -1794,22 +1907,13 @@ mod tests {
|
||||
);
|
||||
assert_eq!(
|
||||
err.to_string(),
|
||||
"remote control websocket auth failed with HTTP 401 Unauthorized; retrying after auth recovery"
|
||||
"remote control websocket auth failed with HTTP 401 Unauthorized; refreshing server token before reconnect"
|
||||
);
|
||||
assert_eq!(
|
||||
auth_manager
|
||||
.auth()
|
||||
.await
|
||||
.expect("auth should remain available")
|
||||
.get_token()
|
||||
.expect("token should be readable"),
|
||||
"fresh-token"
|
||||
);
|
||||
assert!(
|
||||
!auth_change_rx
|
||||
.has_changed()
|
||||
.expect("auth change watch should remain open"),
|
||||
"recovery's own auth reload should not wake the reconnect loop"
|
||||
enrollment,
|
||||
Some(remote_control_enrollment(
|
||||
/*remote_control_token*/ None
|
||||
))
|
||||
);
|
||||
}
|
||||
|
||||
@@ -1905,6 +2009,104 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn connect_remote_control_websocket_recovers_after_unauthorized_refresh() {
|
||||
let listener = TcpListener::bind("127.0.0.1:0")
|
||||
.await
|
||||
.expect("listener should bind");
|
||||
let remote_control_url = remote_control_url_for_listener(&listener);
|
||||
let remote_control_target =
|
||||
normalize_remote_control_url(&remote_control_url).expect("target should parse");
|
||||
let refresh_url = remote_control_target.refresh_url.clone();
|
||||
let server_task = tokio::spawn(async move {
|
||||
let (stream, request_line) = accept_http_request(&listener).await;
|
||||
assert_eq!(
|
||||
request_line,
|
||||
"POST /backend-api/wham/remote/control/server/refresh HTTP/1.1"
|
||||
);
|
||||
respond_with_status_and_headers(stream, "401 Unauthorized", &[], "unauthorized").await;
|
||||
});
|
||||
let codex_home = TempDir::new().expect("temp dir should create");
|
||||
save_auth(
|
||||
codex_home.path(),
|
||||
&remote_control_auth_dot_json("stale-token"),
|
||||
AuthCredentialsStoreMode::File,
|
||||
)
|
||||
.expect("stale auth should save");
|
||||
let state_db = remote_control_state_runtime(&codex_home).await;
|
||||
let auth_manager = AuthManager::shared(
|
||||
codex_home.path().to_path_buf(),
|
||||
/*enable_codex_api_key_env*/ false,
|
||||
AuthCredentialsStoreMode::File,
|
||||
/*chatgpt_base_url*/ None,
|
||||
)
|
||||
.await;
|
||||
let mut auth_recovery = auth_manager.unauthorized_recovery();
|
||||
let mut auth_change_rx = auth_manager.auth_change_receiver();
|
||||
let mut enrollment = Some(remote_control_enrollment(
|
||||
/*remote_control_token*/ None,
|
||||
));
|
||||
let (status_publisher, status_rx) = remote_control_status_channel();
|
||||
save_auth(
|
||||
codex_home.path(),
|
||||
&remote_control_auth_dot_json("fresh-token"),
|
||||
AuthCredentialsStoreMode::File,
|
||||
)
|
||||
.expect("fresh auth should save");
|
||||
|
||||
let err = connect_remote_control_websocket(
|
||||
&remote_control_target,
|
||||
Some(state_db.as_ref()),
|
||||
RemoteControlAuthContext {
|
||||
auth_manager: &auth_manager,
|
||||
auth_recovery: &mut auth_recovery,
|
||||
auth_change_rx: &mut auth_change_rx,
|
||||
},
|
||||
&mut enrollment,
|
||||
RemoteControlConnectOptions {
|
||||
installation_id: TEST_INSTALLATION_ID,
|
||||
server_name: "test-server",
|
||||
subscribe_cursor: None,
|
||||
app_server_client_name: None,
|
||||
},
|
||||
&status_publisher,
|
||||
)
|
||||
.await
|
||||
.expect_err("unauthorized refresh should fail the websocket connect");
|
||||
|
||||
server_task.await.expect("server task should succeed");
|
||||
assert_eq!(
|
||||
status_rx.borrow().clone(),
|
||||
RemoteControlStatusChangedNotification {
|
||||
status: RemoteControlConnectionStatus::Connecting,
|
||||
server_name: "test-server".to_string(),
|
||||
installation_id: TEST_INSTALLATION_ID.to_string(),
|
||||
environment_id: Some("env_test".to_string()),
|
||||
}
|
||||
);
|
||||
assert_eq!(
|
||||
err.to_string(),
|
||||
format!(
|
||||
"remote control server refresh failed at `{refresh_url}`: HTTP 401 Unauthorized, request-id: <none>, cf-ray: <none>, body: unauthorized; retrying after auth recovery"
|
||||
)
|
||||
);
|
||||
assert_eq!(
|
||||
auth_manager
|
||||
.auth()
|
||||
.await
|
||||
.expect("auth should remain available")
|
||||
.get_token()
|
||||
.expect("token should be readable"),
|
||||
"fresh-token"
|
||||
);
|
||||
assert!(
|
||||
!auth_change_rx
|
||||
.has_changed()
|
||||
.expect("auth change watch should remain open"),
|
||||
"recovery's own auth reload should not wake the reconnect loop"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn connect_remote_control_websocket_requires_sqlite_state_db() {
|
||||
let remote_control_target = normalize_remote_control_url("http://127.0.0.1:9/backend-api/")
|
||||
@@ -1912,12 +2114,9 @@ mod tests {
|
||||
let auth_manager = remote_control_auth_manager();
|
||||
let mut auth_recovery = auth_manager.unauthorized_recovery();
|
||||
let mut auth_change_rx = auth_manager.auth_change_receiver();
|
||||
let mut enrollment = Some(RemoteControlEnrollment {
|
||||
account_id: "account_id".to_string(),
|
||||
environment_id: "env_test".to_string(),
|
||||
server_id: "srv_e_test".to_string(),
|
||||
server_name: "test-server".to_string(),
|
||||
});
|
||||
let mut enrollment = Some(remote_control_enrollment(Some(
|
||||
TEST_REMOTE_CONTROL_SERVER_TOKEN,
|
||||
)));
|
||||
let (status_publisher, _status_rx) = remote_control_status_channel();
|
||||
|
||||
let err = connect_remote_control_websocket(
|
||||
@@ -1960,12 +2159,9 @@ mod tests {
|
||||
.await;
|
||||
let mut auth_recovery = auth_manager.unauthorized_recovery();
|
||||
let mut auth_change_rx = auth_manager.auth_change_receiver();
|
||||
let mut enrollment = Some(RemoteControlEnrollment {
|
||||
account_id: "account_id".to_string(),
|
||||
environment_id: "env_test".to_string(),
|
||||
server_id: "srv_e_test".to_string(),
|
||||
server_name: "test-server".to_string(),
|
||||
});
|
||||
let mut enrollment = Some(remote_control_enrollment(Some(
|
||||
TEST_REMOTE_CONTROL_SERVER_TOKEN,
|
||||
)));
|
||||
let (status_publisher, mut status_rx) = remote_control_status_channel();
|
||||
status_publisher.publish_environment_id(Some("env_test".to_string()));
|
||||
status_rx
|
||||
|
||||
Reference in New Issue
Block a user