Compare commits

...

1 Commit

Author SHA1 Message Date
Ruslan Nigmatullin
403caeace6 Make remote control auth errors recoverable 2026-04-10 23:10:05 -07:00
5 changed files with 383 additions and 46 deletions

View File

@@ -27,6 +27,8 @@ use crate::outgoing_message::QueuedOutgoingMessage;
use crate::transport::CHANNEL_CAPACITY;
use crate::transport::ConnectionState;
use crate::transport::OutboundConnectionState;
use crate::transport::RemoteControlAuthStartup;
use crate::transport::RemoteControlStartup;
use crate::transport::TransportEvent;
use crate::transport::auth::policy_from_settings;
use crate::transport::route_outgoing_envelope;
@@ -572,6 +574,16 @@ pub async fn run_main_with_transport(
));
}
let remote_control_startup = if remote_control_enabled {
let auth = if transport_accept_handles.is_empty() {
RemoteControlAuthStartup::RequireReady
} else {
RemoteControlAuthStartup::AllowRecoverable
};
RemoteControlStartup::Enabled { auth }
} else {
RemoteControlStartup::Disabled
};
let (remote_control_accept_handle, remote_control_handle) = start_remote_control(
config.chatgpt_base_url.clone(),
state_db.clone(),
@@ -579,7 +591,7 @@ pub async fn run_main_with_transport(
transport_event_tx.clone(),
transport_shutdown_token.clone(),
app_server_client_name_rx,
remote_control_enabled,
remote_control_startup,
)
.await?;
transport_accept_handles.push(remote_control_accept_handle);

View File

@@ -33,7 +33,9 @@ mod remote_control;
mod stdio;
mod websocket;
pub(crate) use remote_control::RemoteControlAuthStartup;
pub(crate) use remote_control::RemoteControlHandle;
pub(crate) use remote_control::RemoteControlStartup;
pub(crate) use remote_control::start_remote_control;
pub(crate) use stdio::start_stdio_connection;
pub(crate) use websocket::start_websocket_acceptor;

View File

@@ -45,6 +45,18 @@ impl RemoteControlHandle {
}
}
/// Selects which authentication check `start_remote_control` runs when remote
/// control starts enabled.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum RemoteControlAuthStartup {
/// Startup runs `validate_remote_control_auth`. `run_main_with_transport`
/// picks this when it already holds other transport accept handles.
AllowRecoverable,
/// Startup runs `validate_remote_control_auth_ready`, which reports a
/// `WouldBlock` auth error as `PermissionDenied`. `run_main_with_transport`
/// picks this when it holds no other transport accept handles.
RequireReady,
}
/// Startup mode handed to `start_remote_control`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum RemoteControlStartup {
/// Remote control starts disabled: the remote control URL is not normalized
/// and no auth validation runs at startup.
Disabled,
/// Remote control starts enabled: the URL is normalized and `auth` selects
/// which auth validation runs before the background task is spawned.
Enabled { auth: RemoteControlAuthStartup },
}
pub(crate) async fn start_remote_control(
remote_control_url: String,
state_db: Option<Arc<StateRuntime>>,
@@ -52,17 +64,26 @@ pub(crate) async fn start_remote_control(
transport_event_tx: mpsc::Sender<TransportEvent>,
shutdown_token: CancellationToken,
app_server_client_name_rx: Option<oneshot::Receiver<String>>,
initial_enabled: bool,
startup: RemoteControlStartup,
) -> io::Result<(JoinHandle<()>, RemoteControlHandle)> {
let remote_control_target = if initial_enabled {
Some(normalize_remote_control_url(&remote_control_url)?)
} else {
None
let remote_control_target = match startup {
RemoteControlStartup::Enabled { .. } => {
Some(normalize_remote_control_url(&remote_control_url)?)
}
RemoteControlStartup::Disabled => None,
};
if initial_enabled {
validate_remote_control_auth(&auth_manager).await?;
if let RemoteControlStartup::Enabled { auth } = startup {
match auth {
RemoteControlAuthStartup::AllowRecoverable => {
validate_remote_control_auth(&auth_manager).await?;
}
RemoteControlAuthStartup::RequireReady => {
validate_remote_control_auth_ready(&auth_manager).await?;
}
}
}
let initial_enabled = matches!(startup, RemoteControlStartup::Enabled { .. });
let (enabled_tx, enabled_rx) = watch::channel(initial_enabled);
let join_handle = tokio::spawn(async move {
RemoteControlWebsocket::new(
@@ -96,5 +117,18 @@ pub(crate) async fn validate_remote_control_auth(
}
}
/// Checks that remote control auth can be loaded right now.
///
/// Returns `Ok(())` when `load_remote_control_auth` succeeds. A `WouldBlock`
/// failure is re-reported as `PermissionDenied` carrying the same message;
/// every other error is returned unchanged.
pub(crate) async fn validate_remote_control_auth_ready(
    auth_manager: &Arc<AuthManager>,
) -> io::Result<()> {
    load_remote_control_auth(auth_manager)
        .await
        .map(|_| ())
        .map_err(|failure| match failure.kind() {
            // Promote the "not ready yet" kind to a hard failure, keeping the text.
            io::ErrorKind::WouldBlock => {
                io::Error::new(io::ErrorKind::PermissionDenied, failure.to_string())
            }
            _ => failure,
        })
}
#[cfg(test)]
mod tests;

View File

@@ -99,6 +99,24 @@ fn remote_control_auth_dot_json(account_id: Option<&str>) -> AuthDotJson {
}
}
/// Builds an `AuthDotJson` fixture in API-key mode: a fixed test key, no
/// tokens and no refresh timestamp.
fn remote_control_api_key_auth_dot_json() -> AuthDotJson {
    let api_key = String::from("sk-test-api-key");
    AuthDotJson {
        auth_mode: Some(AuthMode::ApiKey),
        openai_api_key: Some(api_key),
        tokens: None,
        last_refresh: None,
    }
}
/// Builds an `AuthDotJson` fixture in ChatGPT mode that carries no tokens and
/// no API key, with `last_refresh` set to the current UTC time.
fn remote_control_invalid_chatgpt_auth_dot_json() -> AuthDotJson {
    let refreshed_at = chrono::Utc::now();
    AuthDotJson {
        last_refresh: Some(refreshed_at),
        tokens: None,
        openai_api_key: None,
        auth_mode: Some(AuthMode::Chatgpt),
    }
}
async fn remote_control_state_runtime(codex_home: &TempDir) -> Arc<StateRuntime> {
StateRuntime::init(codex_home.path().to_path_buf(), "test-provider".to_string())
.await
@@ -129,7 +147,9 @@ async fn remote_control_transport_manages_virtual_clients_and_routes_messages()
transport_event_tx,
shutdown_token.clone(),
/*app_server_client_name_rx*/ None,
/*initial_enabled*/ true,
RemoteControlStartup::Enabled {
auth: RemoteControlAuthStartup::AllowRecoverable,
},
)
.await
.expect("remote control should start");
@@ -394,7 +414,9 @@ async fn remote_control_transport_reconnects_after_disconnect() {
transport_event_tx,
shutdown_token.clone(),
/*app_server_client_name_rx*/ None,
/*initial_enabled*/ true,
RemoteControlStartup::Enabled {
auth: RemoteControlAuthStartup::AllowRecoverable,
},
)
.await
.expect("remote control should start");
@@ -466,7 +488,7 @@ async fn remote_control_start_allows_remote_control_invalid_url_when_disabled()
transport_event_tx,
shutdown_token.clone(),
/*app_server_client_name_rx*/ None,
/*initial_enabled*/ false,
RemoteControlStartup::Disabled,
)
.await
.expect("disabled remote control should not validate the URL at startup");
@@ -478,6 +500,104 @@ async fn remote_control_start_allows_remote_control_invalid_url_when_disabled()
.expect("remote control task should join");
}
// API-key auth passes `validate_remote_control_auth`, but starting remote
// control with `RequireReady` must fail with `PermissionDenied`.
#[tokio::test]
async fn remote_control_auth_validation_treats_api_key_auth_as_recoverable_unless_only_transport() {
    let codex_home = TempDir::new().expect("temp dir should create");
    let home_dir = codex_home.path();
    save_auth(
        home_dir,
        &remote_control_api_key_auth_dot_json(),
        AuthCredentialsStoreMode::File,
    )
    .expect("api key auth should save");
    let auth_manager = AuthManager::shared(
        home_dir.to_path_buf(),
        /*enable_codex_api_key_env*/ false,
        AuthCredentialsStoreMode::File,
    );

    // Lenient check: API-key auth is tolerated.
    validate_remote_control_auth(&auth_manager)
        .await
        .expect("api key auth should be recoverable while another transport is available");

    // Strict startup: the same auth must be rejected.
    let (events_tx, _events_rx) = mpsc::channel::<TransportEvent>(CHANNEL_CAPACITY);
    let shutdown = CancellationToken::new();
    let startup = RemoteControlStartup::Enabled {
        auth: RemoteControlAuthStartup::RequireReady,
    };
    let start_result = start_remote_control(
        "http://127.0.0.1:1/backend-api/".to_string(),
        /*state_db*/ None,
        auth_manager,
        events_tx,
        shutdown.clone(),
        /*app_server_client_name_rx*/ None,
        startup,
    )
    .await;
    let failure = match start_result {
        Err(failure) => failure,
        Ok((remote_task, _remote_handle)) => {
            // Stop the spawned task before reporting the unexpected success.
            shutdown.cancel();
            let _ = remote_task.await;
            panic!("api key auth should fail when remote control is the only startup transport");
        }
    };

    assert_eq!(failure.kind(), std::io::ErrorKind::PermissionDenied);
    assert_eq!(
        failure.to_string(),
        "remote control requires ChatGPT authentication; API key auth is not supported"
    );
}
// ChatGPT auth without token data passes `validate_remote_control_auth`, but
// starting remote control with `RequireReady` must fail with `PermissionDenied`.
#[tokio::test]
async fn remote_control_auth_validation_treats_invalid_chatgpt_auth_as_recoverable() {
    let codex_home = TempDir::new().expect("temp dir should create");
    let home_dir = codex_home.path();
    save_auth(
        home_dir,
        &remote_control_invalid_chatgpt_auth_dot_json(),
        AuthCredentialsStoreMode::File,
    )
    .expect("invalid ChatGPT auth should save");
    let auth_manager = AuthManager::shared(
        home_dir.to_path_buf(),
        /*enable_codex_api_key_env*/ false,
        AuthCredentialsStoreMode::File,
    );

    // Lenient check: the unreadable token is tolerated.
    validate_remote_control_auth(&auth_manager)
        .await
        .expect("invalid ChatGPT auth should be recoverable while another transport is available");

    // Strict startup: the same auth must be rejected.
    let (events_tx, _events_rx) = mpsc::channel::<TransportEvent>(CHANNEL_CAPACITY);
    let shutdown = CancellationToken::new();
    let startup = RemoteControlStartup::Enabled {
        auth: RemoteControlAuthStartup::RequireReady,
    };
    let start_result = start_remote_control(
        "http://127.0.0.1:1/backend-api/".to_string(),
        /*state_db*/ None,
        auth_manager,
        events_tx,
        shutdown.clone(),
        /*app_server_client_name_rx*/ None,
        startup,
    )
    .await;
    let failure = match start_result {
        Err(failure) => failure,
        Ok((remote_task, _remote_handle)) => {
            // Stop the spawned task before reporting the unexpected success.
            shutdown.cancel();
            let _ = remote_task.await;
            panic!(
                "invalid ChatGPT auth should fail when remote control is the only startup transport"
            );
        }
    };

    assert_eq!(failure.kind(), std::io::ErrorKind::PermissionDenied);
    assert_eq!(
        failure.to_string(),
        "remote control cannot read ChatGPT authentication token: Token data is not available."
    );
}
#[tokio::test]
async fn remote_control_handle_set_enabled_stops_and_restarts_connections() {
let listener = TcpListener::bind("127.0.0.1:0")
@@ -495,7 +615,9 @@ async fn remote_control_handle_set_enabled_stops_and_restarts_connections() {
transport_event_tx,
shutdown_token.clone(),
/*app_server_client_name_rx*/ None,
/*initial_enabled*/ true,
RemoteControlStartup::Enabled {
auth: RemoteControlAuthStartup::AllowRecoverable,
},
)
.await
.expect("remote control should start");
@@ -548,7 +670,9 @@ async fn remote_control_transport_clears_outgoing_buffer_when_backend_acks() {
transport_event_tx,
shutdown_token.clone(),
/*app_server_client_name_rx*/ None,
/*initial_enabled*/ true,
RemoteControlStartup::Enabled {
auth: RemoteControlAuthStartup::AllowRecoverable,
},
)
.await
.expect("remote control should start");
@@ -715,7 +839,9 @@ async fn remote_control_http_mode_enrolls_before_connecting() {
transport_event_tx,
shutdown_token.clone(),
/*app_server_client_name_rx*/ None,
/*initial_enabled*/ true,
RemoteControlStartup::Enabled {
auth: RemoteControlAuthStartup::AllowRecoverable,
},
)
.await
.expect("remote control should start");
@@ -932,7 +1058,9 @@ async fn remote_control_http_mode_reuses_persisted_enrollment_before_reenrolling
transport_event_tx,
shutdown_token.clone(),
/*app_server_client_name_rx*/ None,
/*initial_enabled*/ true,
RemoteControlStartup::Enabled {
auth: RemoteControlAuthStartup::AllowRecoverable,
},
)
.await
.expect("remote control should start");
@@ -999,7 +1127,9 @@ async fn remote_control_stdio_mode_waits_for_client_name_before_connecting() {
transport_event_tx,
shutdown_token.clone(),
Some(app_server_client_name_rx),
/*initial_enabled*/ true,
RemoteControlStartup::Enabled {
auth: RemoteControlAuthStartup::AllowRecoverable,
},
)
.await
.expect("remote control should start");
@@ -1056,7 +1186,9 @@ async fn remote_control_waits_for_account_id_before_enrolling() {
transport_event_tx,
shutdown_token.clone(),
/*app_server_client_name_rx*/ None,
/*initial_enabled*/ true,
RemoteControlStartup::Enabled {
auth: RemoteControlAuthStartup::AllowRecoverable,
},
)
.await
.expect("remote control should start before account id is available");
@@ -1096,6 +1228,77 @@ async fn remote_control_waits_for_account_id_before_enrolling() {
let _ = remote_task.await;
}
// Verifies that remote control started with API-key auth makes no backend
// connection, then enrolls and connects once ChatGPT auth is saved into the
// same codex home.
#[tokio::test]
async fn remote_control_reloads_api_key_auth_until_chatgpt_auth_available() {
// Local listener standing in for the remote control backend.
let listener = TcpListener::bind("127.0.0.1:0")
.await
.expect("listener should bind");
let remote_control_url = remote_control_url_for_listener(&listener);
let codex_home = TempDir::new().expect("temp dir should create");
// Begin with API-key auth stored on disk.
save_auth(
codex_home.path(),
&remote_control_api_key_auth_dot_json(),
AuthCredentialsStoreMode::File,
)
.expect("api key auth should save");
let state_db = remote_control_state_runtime(&codex_home).await;
let auth_manager = AuthManager::shared(
codex_home.path().to_path_buf(),
/*enable_codex_api_key_env*/ false,
AuthCredentialsStoreMode::File,
);
// The receiver is kept bound (not `_`) so the channel stays open for the test.
let (transport_event_tx, _transport_event_rx) =
mpsc::channel::<TransportEvent>(CHANNEL_CAPACITY);
let shutdown_token = CancellationToken::new();
// `AllowRecoverable` startup is expected to succeed despite the API-key auth.
let (remote_task, _remote_handle) = start_remote_control(
remote_control_url,
Some(state_db),
auth_manager,
transport_event_tx,
shutdown_token.clone(),
/*app_server_client_name_rx*/ None,
RemoteControlStartup::Enabled {
auth: RemoteControlAuthStartup::AllowRecoverable,
},
)
.await
.expect("remote control should start while API key auth is recoverable");
// No connection may reach the listener within 100 ms while auth is unusable.
timeout(Duration::from_millis(100), listener.accept())
.await
.expect_err("remote control should wait until ChatGPT auth is available");
// Replace the stored auth; the helper presumably yields ChatGPT auth with
// the given account id (see `remote_control_auth_dot_json`).
save_auth(
codex_home.path(),
&remote_control_auth_dot_json(Some("account_id")),
AuthCredentialsStoreMode::File,
)
.expect("ChatGPT auth should save");
// The first request observed afterwards must be the enroll POST.
let enroll_request = accept_http_request(&listener).await;
assert_eq!(
enroll_request.request_line,
"POST /backend-api/wham/remote/control/server/enroll HTTP/1.1"
);
respond_with_json(
enroll_request.stream,
json!({ "server_id": "srv_e_ready", "environment_id": "env_ready" }),
)
.await;
// The following backend connection must carry the server id returned above.
let (handshake_request, _websocket) = accept_remote_control_backend_connection(&listener).await;
assert_eq!(
handshake_request
.headers
.get("x-codex-server-id")
.map(String::as_str),
Some("srv_e_ready")
);
// Stop the background task and wait for it to finish.
shutdown_token.cancel();
let _ = remote_task.await;
}
#[tokio::test]
async fn remote_control_http_mode_clears_stale_persisted_enrollment_after_404() {
let listener = TcpListener::bind("127.0.0.1:0")
@@ -1139,7 +1342,9 @@ async fn remote_control_http_mode_clears_stale_persisted_enrollment_after_404()
transport_event_tx,
shutdown_token.clone(),
/*app_server_client_name_rx*/ None,
/*initial_enabled*/ true,
RemoteControlStartup::Enabled {
auth: RemoteControlAuthStartup::AllowRecoverable,
},
)
.await
.expect("remote control should start");

View File

@@ -667,11 +667,10 @@ pub(crate) async fn load_remote_control_auth(
auth_manager: &Arc<AuthManager>,
) -> io::Result<RemoteControlConnectionAuth> {
let mut reloaded = false;
let auth = loop {
loop {
let Some(auth) = auth_manager.auth().await else {
if reloaded {
return Err(io::Error::new(
ErrorKind::PermissionDenied,
return Err(recoverable_remote_control_auth_error(
"remote control requires ChatGPT authentication",
));
}
@@ -680,32 +679,48 @@ pub(crate) async fn load_remote_control_auth(
continue;
};
if !auth.is_chatgpt_auth() {
break auth;
if reloaded {
return Err(recoverable_remote_control_auth_error(
"remote control requires ChatGPT authentication; API key auth is not supported",
));
}
auth_manager.reload();
reloaded = true;
continue;
}
if auth.get_account_id().is_none() && !reloaded {
auth_manager.reload();
reloaded = true;
continue;
}
break auth;
};
if !auth.is_chatgpt_auth() {
return Err(io::Error::new(
ErrorKind::PermissionDenied,
"remote control requires ChatGPT authentication; API key auth is not supported",
));
}
Ok(RemoteControlConnectionAuth {
bearer_token: auth.get_token().map_err(io::Error::other)?,
account_id: auth.get_account_id().ok_or_else(|| {
io::Error::new(
ErrorKind::WouldBlock,
let bearer_token = match auth.get_token() {
Ok(bearer_token) => bearer_token,
Err(err) => {
if reloaded {
return Err(recoverable_remote_control_auth_error(format!(
"remote control cannot read ChatGPT authentication token: {err}"
)));
}
auth_manager.reload();
reloaded = true;
continue;
}
};
let account_id = auth.get_account_id().ok_or_else(|| {
recoverable_remote_control_auth_error(
"remote control enrollment is waiting for a ChatGPT account id",
)
})?,
})
})?;
return Ok(RemoteControlConnectionAuth {
bearer_token,
account_id,
});
}
}
/// Builds the `io::Error` used for remote control auth failures that may
/// resolve later: kind `WouldBlock`, with `message` as its text.
fn recoverable_remote_control_auth_error(message: impl Into<String>) -> io::Error {
    let description: String = message.into();
    io::Error::new(ErrorKind::WouldBlock, description)
}
pub(super) async fn connect_remote_control_websocket(
@@ -754,12 +769,15 @@ pub(super) async fn connect_remote_control_websocket(
let new_enrollment = match enroll_remote_control_server(remote_control_target, &auth).await
{
Ok(new_enrollment) => new_enrollment,
Err(err)
if err.kind() == ErrorKind::PermissionDenied
&& recover_remote_control_auth(auth_recovery).await =>
{
return Err(io::Error::other(format!(
"{err}; retrying after auth recovery"
Err(err) if err.kind() == ErrorKind::PermissionDenied => {
if recover_remote_control_auth(auth_recovery).await {
return Err(io::Error::other(format!(
"{err}; retrying after auth recovery"
)));
}
reload_remote_control_auth(auth_manager, auth_recovery);
return Err(recoverable_remote_control_auth_error(format!(
"{err}; waiting for valid ChatGPT authentication"
)));
}
Err(err) => return Err(err),
@@ -825,12 +843,19 @@ pub(super) async fn connect_remote_control_websocket(
tungstenite::Error::Http(response)
if matches!(response.status().as_u16(), 401 | 403) =>
{
let message = format!(
"remote control websocket auth failed with HTTP {}",
response.status()
);
if recover_remote_control_auth(auth_recovery).await {
return Err(io::Error::other(format!(
"remote control websocket auth failed with HTTP {}; retrying after auth recovery",
response.status()
"{message}; retrying after auth recovery"
)));
}
reload_remote_control_auth(auth_manager, auth_recovery);
return Err(recoverable_remote_control_auth_error(format!(
"{message}; waiting for valid ChatGPT authentication"
)));
}
_ => {}
}
@@ -866,6 +891,14 @@ async fn recover_remote_control_auth(auth_recovery: &mut UnauthorizedRecovery) -
}
}
/// Reloads auth through `auth_manager`, then overwrites `auth_recovery` with a
/// recovery handle obtained from the manager after that reload.
fn reload_remote_control_auth(
    auth_manager: &Arc<AuthManager>,
    auth_recovery: &mut UnauthorizedRecovery,
) {
    auth_manager.reload();
    // Fetch the replacement only after the reload has run.
    let fresh_recovery = auth_manager.unauthorized_recovery();
    *auth_recovery = fresh_recovery;
}
fn format_remote_control_websocket_connect_error(
websocket_url: &str,
err: &tungstenite::Error,
@@ -1171,6 +1204,57 @@ mod tests {
);
}
// Verifies that a 401 from the backend during the connect attempt surfaces as
// a `WouldBlock` error with the "waiting for valid ChatGPT authentication"
// message, and that the auth manager reports no auth afterwards.
#[tokio::test]
async fn connect_remote_control_websocket_treats_exhausted_unauthorized_as_recoverable() {
// Local listener standing in for the remote control backend.
let listener = TcpListener::bind("127.0.0.1:0")
.await
.expect("listener should bind");
let remote_control_url = remote_control_url_for_listener(&listener);
let remote_control_target =
normalize_remote_control_url(&remote_control_url).expect("target should parse");
let codex_home = TempDir::new().expect("temp dir should create");
let state_db = remote_control_state_runtime(&codex_home).await;
let auth_manager = remote_control_auth_manager();
let mut auth_recovery = auth_manager.unauthorized_recovery();
// An enrollment is supplied up front; presumably this lets the connect call
// go straight to the websocket request instead of enrolling first — confirm
// against `connect_remote_control_websocket`.
let mut enrollment = Some(RemoteControlEnrollment {
account_id: "account_id".to_string(),
environment_id: "env_test".to_string(),
server_id: "srv_e_test".to_string(),
server_name: "test-server".to_string(),
});
// Fake backend: accept one request, check its request line, answer 401.
let server_task = tokio::spawn(async move {
let (stream, request_line) = accept_http_request(&listener).await;
assert_eq!(
request_line,
"GET /backend-api/wham/remote/control/server HTTP/1.1"
);
respond_with_status_and_headers(stream, "401 Unauthorized", &[], "unauthorized").await;
});
let err = connect_remote_control_websocket(
&remote_control_target,
Some(state_db.as_ref()),
&auth_manager,
&mut auth_recovery,
&mut enrollment,
/*subscribe_cursor*/ None,
/*app_server_client_name*/ None,
)
.await
.expect_err("unauthorized response should remain recoverable after recovery is exhausted");
// Joining the server task propagates any assertion failure raised inside it.
server_task.await.expect("server task should succeed");
assert_eq!(err.kind(), ErrorKind::WouldBlock);
assert_eq!(
err.to_string(),
"remote control websocket auth failed with HTTP 401 Unauthorized; waiting for valid ChatGPT authentication"
);
// NOTE(review): presumably the reload finds no stored credentials behind
// `remote_control_auth_manager()`, so `auth()` becomes `None` — confirm
// against that helper.
assert!(
auth_manager.auth().await.is_none(),
"exhausted unauthorized recovery should reload auth so future runtime changes can be observed"
);
}
#[tokio::test]
async fn run_remote_control_websocket_loop_shutdown_cancels_reconnect_backoff() {
let listener = TcpListener::bind("127.0.0.1:0")