mirror of
https://github.com/openai/codex.git
synced 2026-05-24 21:14:51 +00:00
## Summary This change lets `forced_chatgpt_workspace_id` accept multiple workspace IDs instead of a single value. It keeps the existing config key name, adds backward-compatible parsing for a single string in `config.toml`, and normalizes the setting into an allowed workspace list across login enforcement, app-server config surfaces, and local ChatGPT auth helpers. ## Why Workspace-restricted deployments may need to allow more than one ChatGPT workspace without dropping the guardrail entirely. ## Server-side impact Codex's local server and app-server protocol needed changes because they previously assumed a single workspace ID. The local login flow now matches the auth backend interface by sending the allowed workspace list as a single comma-separated `allowed_workspace_id` query parameter. ## Validation This was tested with: - A single workspace config - With multi-workspace configs - With multiple workspaces in the config - The user only being a part of a subset of them All were successful. Automated coverage: - `cargo test -p codex-login` - `cargo test -p codex-app-server-protocol` - `cargo test -p codex-tui local_chatgpt_auth` - `cargo test --locked -p codex-app-server login_account_chatgpt_includes_forced_workspace_allowlist_query_param`
955 lines
32 KiB
Rust
955 lines
32 KiB
Rust
use super::*;
|
|
|
|
// Duration before a browser ChatGPT login attempt is abandoned.
|
|
const LOGIN_CHATGPT_TIMEOUT: Duration = Duration::from_secs(10 * 60);
|
|
// The override is intentionally available only in debug builds, matching the login path below.
|
|
#[cfg(debug_assertions)]
|
|
const LOGIN_ISSUER_OVERRIDE_ENV_VAR: &str = "CODEX_APP_SERVER_LOGIN_ISSUER";
|
|
|
|
enum ActiveLogin {
|
|
Browser {
|
|
shutdown_handle: ShutdownHandle,
|
|
login_id: Uuid,
|
|
},
|
|
DeviceCode {
|
|
cancel: CancellationToken,
|
|
login_id: Uuid,
|
|
},
|
|
}
|
|
|
|
impl ActiveLogin {
|
|
fn login_id(&self) -> Uuid {
|
|
match self {
|
|
ActiveLogin::Browser { login_id, .. } | ActiveLogin::DeviceCode { login_id, .. } => {
|
|
*login_id
|
|
}
|
|
}
|
|
}
|
|
|
|
fn cancel(&self) {
|
|
match self {
|
|
ActiveLogin::Browser {
|
|
shutdown_handle, ..
|
|
} => shutdown_handle.shutdown(),
|
|
ActiveLogin::DeviceCode { cancel, .. } => cancel.cancel(),
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Clone, Copy, Debug)]
|
|
enum CancelLoginError {
|
|
NotFound,
|
|
}
|
|
|
|
enum RefreshTokenRequestOutcome {
|
|
NotAttemptedOrSucceeded,
|
|
FailedTransiently,
|
|
FailedPermanently,
|
|
}
|
|
|
|
impl Drop for ActiveLogin {
|
|
fn drop(&mut self) {
|
|
self.cancel();
|
|
}
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
pub(crate) struct AccountRequestProcessor {
|
|
auth_manager: Arc<AuthManager>,
|
|
thread_manager: Arc<ThreadManager>,
|
|
outgoing: Arc<OutgoingMessageSender>,
|
|
config: Arc<Config>,
|
|
config_manager: ConfigManager,
|
|
active_login: Arc<Mutex<Option<ActiveLogin>>>,
|
|
}
|
|
|
|
impl AccountRequestProcessor {
|
|
pub(crate) fn new(
|
|
auth_manager: Arc<AuthManager>,
|
|
thread_manager: Arc<ThreadManager>,
|
|
outgoing: Arc<OutgoingMessageSender>,
|
|
config: Arc<Config>,
|
|
config_manager: ConfigManager,
|
|
) -> Self {
|
|
Self {
|
|
auth_manager,
|
|
thread_manager,
|
|
outgoing,
|
|
config,
|
|
config_manager,
|
|
active_login: Arc::new(Mutex::new(None)),
|
|
}
|
|
}
|
|
|
|
pub(crate) async fn login_account(
|
|
&self,
|
|
request_id: ConnectionRequestId,
|
|
params: LoginAccountParams,
|
|
) -> Result<Option<ClientResponsePayload>, JSONRPCErrorError> {
|
|
self.login_v2(request_id, params).await.map(|()| None)
|
|
}
|
|
|
|
pub(crate) async fn logout_account(
|
|
&self,
|
|
request_id: ConnectionRequestId,
|
|
) -> Result<Option<ClientResponsePayload>, JSONRPCErrorError> {
|
|
self.logout_v2(request_id).await.map(|()| None)
|
|
}
|
|
|
|
pub(crate) async fn cancel_login_account(
|
|
&self,
|
|
params: CancelLoginAccountParams,
|
|
) -> Result<Option<ClientResponsePayload>, JSONRPCErrorError> {
|
|
self.cancel_login_response(params)
|
|
.await
|
|
.map(|response| Some(response.into()))
|
|
}
|
|
|
|
pub(crate) async fn get_account(
|
|
&self,
|
|
params: GetAccountParams,
|
|
) -> Result<Option<ClientResponsePayload>, JSONRPCErrorError> {
|
|
self.get_account_response(params)
|
|
.await
|
|
.map(|response| Some(response.into()))
|
|
}
|
|
|
|
pub(crate) async fn get_auth_status(
|
|
&self,
|
|
params: GetAuthStatusParams,
|
|
) -> Result<Option<ClientResponsePayload>, JSONRPCErrorError> {
|
|
self.get_auth_status_response(params)
|
|
.await
|
|
.map(|response| Some(response.into()))
|
|
}
|
|
|
|
pub(crate) async fn get_account_rate_limits(
|
|
&self,
|
|
) -> Result<Option<ClientResponsePayload>, JSONRPCErrorError> {
|
|
self.get_account_rate_limits_response()
|
|
.await
|
|
.map(|response| Some(response.into()))
|
|
}
|
|
|
|
pub(crate) async fn send_add_credits_nudge_email(
|
|
&self,
|
|
params: SendAddCreditsNudgeEmailParams,
|
|
) -> Result<Option<ClientResponsePayload>, JSONRPCErrorError> {
|
|
self.send_add_credits_nudge_email_response(params)
|
|
.await
|
|
.map(|response| Some(response.into()))
|
|
}
|
|
|
|
pub(crate) async fn cancel_active_login(&self) {
|
|
let mut guard = self.active_login.lock().await;
|
|
if let Some(active_login) = guard.take() {
|
|
drop(active_login);
|
|
}
|
|
}
|
|
|
|
pub(crate) fn clear_external_auth(&self) {
|
|
self.auth_manager.clear_external_auth();
|
|
}
|
|
|
|
fn current_account_updated_notification(&self) -> AccountUpdatedNotification {
|
|
let auth = self.auth_manager.auth_cached();
|
|
AccountUpdatedNotification {
|
|
auth_mode: auth.as_ref().map(CodexAuth::api_auth_mode),
|
|
plan_type: auth.as_ref().and_then(CodexAuth::account_plan_type),
|
|
}
|
|
}
|
|
|
|
async fn maybe_refresh_remote_installed_plugins_cache_for_current_config(
|
|
config_manager: &ConfigManager,
|
|
thread_manager: &Arc<ThreadManager>,
|
|
auth: Option<CodexAuth>,
|
|
) {
|
|
match config_manager
|
|
.load_latest_config(/*fallback_cwd*/ None)
|
|
.await
|
|
{
|
|
Ok(config) => {
|
|
let refresh_thread_manager = Arc::clone(thread_manager);
|
|
let refresh_config_manager = config_manager.clone();
|
|
thread_manager
|
|
.plugins_manager()
|
|
.maybe_start_remote_installed_plugins_cache_refresh(
|
|
&config.plugins_config_input(),
|
|
auth,
|
|
Some(Arc::new(move || {
|
|
Self::spawn_effective_plugins_changed_task(
|
|
Arc::clone(&refresh_thread_manager),
|
|
refresh_config_manager.clone(),
|
|
);
|
|
})),
|
|
);
|
|
}
|
|
Err(err) => {
|
|
warn!(
|
|
"failed to reload config after account changed, skipping remote installed plugins cache refresh: {err}"
|
|
);
|
|
}
|
|
}
|
|
}
|
|
|
|
fn spawn_effective_plugins_changed_task(
|
|
thread_manager: Arc<ThreadManager>,
|
|
config_manager: ConfigManager,
|
|
) {
|
|
tokio::spawn(async move {
|
|
thread_manager.plugins_manager().clear_cache();
|
|
thread_manager.skills_manager().clear_cache();
|
|
if thread_manager.list_thread_ids().await.is_empty() {
|
|
return;
|
|
}
|
|
crate::mcp_refresh::queue_best_effort_refresh(&thread_manager, &config_manager).await;
|
|
});
|
|
}
|
|
|
|
async fn login_v2(
|
|
&self,
|
|
request_id: ConnectionRequestId,
|
|
params: LoginAccountParams,
|
|
) -> Result<(), JSONRPCErrorError> {
|
|
match params {
|
|
LoginAccountParams::ApiKey { api_key } => {
|
|
self.login_api_key_v2(request_id, LoginApiKeyParams { api_key })
|
|
.await;
|
|
}
|
|
LoginAccountParams::Chatgpt {
|
|
codex_streamlined_login,
|
|
} => {
|
|
self.login_chatgpt_v2(request_id, codex_streamlined_login)
|
|
.await;
|
|
}
|
|
LoginAccountParams::ChatgptDeviceCode => {
|
|
self.login_chatgpt_device_code_v2(request_id).await;
|
|
}
|
|
LoginAccountParams::ChatgptAuthTokens {
|
|
access_token,
|
|
chatgpt_account_id,
|
|
chatgpt_plan_type,
|
|
} => {
|
|
self.login_chatgpt_auth_tokens(
|
|
request_id,
|
|
access_token,
|
|
chatgpt_account_id,
|
|
chatgpt_plan_type,
|
|
)
|
|
.await;
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
fn external_auth_active_error(&self) -> JSONRPCErrorError {
|
|
invalid_request(
|
|
"External auth is active. Use account/login/start (chatgptAuthTokens) to update it or account/logout to clear it.",
|
|
)
|
|
}
|
|
|
|
async fn login_api_key_common(
|
|
&self,
|
|
params: &LoginApiKeyParams,
|
|
) -> std::result::Result<(), JSONRPCErrorError> {
|
|
if self.auth_manager.is_external_chatgpt_auth_active() {
|
|
return Err(self.external_auth_active_error());
|
|
}
|
|
|
|
if matches!(
|
|
self.config.forced_login_method,
|
|
Some(ForcedLoginMethod::Chatgpt)
|
|
) {
|
|
return Err(invalid_request(
|
|
"API key login is disabled. Use ChatGPT login instead.",
|
|
));
|
|
}
|
|
|
|
// Cancel any active login attempt.
|
|
{
|
|
let mut guard = self.active_login.lock().await;
|
|
if let Some(active) = guard.take() {
|
|
drop(active);
|
|
}
|
|
}
|
|
|
|
match login_with_api_key(
|
|
&self.config.codex_home,
|
|
¶ms.api_key,
|
|
self.config.cli_auth_credentials_store_mode,
|
|
) {
|
|
Ok(()) => {
|
|
self.auth_manager.reload().await;
|
|
Ok(())
|
|
}
|
|
Err(err) => Err(internal_error(format!("failed to save api key: {err}"))),
|
|
}
|
|
}
|
|
|
|
async fn login_api_key_v2(&self, request_id: ConnectionRequestId, params: LoginApiKeyParams) {
|
|
let result = self
|
|
.login_api_key_common(¶ms)
|
|
.await
|
|
.map(|()| LoginAccountResponse::ApiKey {});
|
|
let logged_in = result.is_ok();
|
|
self.outgoing.send_result(request_id, result).await;
|
|
|
|
if logged_in {
|
|
self.send_login_success_notifications(/*login_id*/ None)
|
|
.await;
|
|
}
|
|
}
|
|
|
|
// Build options for a ChatGPT login attempt; performs validation.
|
|
async fn login_chatgpt_common(
|
|
&self,
|
|
codex_streamlined_login: bool,
|
|
) -> std::result::Result<LoginServerOptions, JSONRPCErrorError> {
|
|
let config = self.config.as_ref();
|
|
|
|
if self.auth_manager.is_external_chatgpt_auth_active() {
|
|
return Err(self.external_auth_active_error());
|
|
}
|
|
|
|
if matches!(config.forced_login_method, Some(ForcedLoginMethod::Api)) {
|
|
return Err(invalid_request(
|
|
"ChatGPT login is disabled. Use API key login instead.",
|
|
));
|
|
}
|
|
|
|
let opts = LoginServerOptions {
|
|
open_browser: false,
|
|
codex_streamlined_login,
|
|
..LoginServerOptions::new(
|
|
config.codex_home.to_path_buf(),
|
|
CLIENT_ID.to_string(),
|
|
config.forced_chatgpt_workspace_id.clone(),
|
|
config.cli_auth_credentials_store_mode,
|
|
)
|
|
};
|
|
#[cfg(debug_assertions)]
|
|
let opts = {
|
|
let mut opts = opts;
|
|
if let Ok(issuer) = std::env::var(LOGIN_ISSUER_OVERRIDE_ENV_VAR)
|
|
&& !issuer.trim().is_empty()
|
|
{
|
|
opts.issuer = issuer;
|
|
}
|
|
opts
|
|
};
|
|
|
|
Ok(opts)
|
|
}
|
|
|
|
fn login_chatgpt_device_code_start_error(err: IoError) -> JSONRPCErrorError {
|
|
let is_not_found = err.kind() == std::io::ErrorKind::NotFound;
|
|
if is_not_found {
|
|
invalid_request(err.to_string())
|
|
} else {
|
|
internal_error(format!("failed to request device code: {err}"))
|
|
}
|
|
}
|
|
|
|
async fn login_chatgpt_v2(
|
|
&self,
|
|
request_id: ConnectionRequestId,
|
|
codex_streamlined_login: bool,
|
|
) {
|
|
let result = self.login_chatgpt_response(codex_streamlined_login).await;
|
|
self.outgoing.send_result(request_id, result).await;
|
|
}
|
|
|
|
async fn login_chatgpt_response(
|
|
&self,
|
|
codex_streamlined_login: bool,
|
|
) -> Result<LoginAccountResponse, JSONRPCErrorError> {
|
|
let opts = self.login_chatgpt_common(codex_streamlined_login).await?;
|
|
let server = run_login_server(opts)
|
|
.map_err(|err| internal_error(format!("failed to start login server: {err}")))?;
|
|
let login_id = Uuid::new_v4();
|
|
let shutdown_handle = server.cancel_handle();
|
|
|
|
// Replace active login if present.
|
|
{
|
|
let mut guard = self.active_login.lock().await;
|
|
if let Some(existing) = guard.take() {
|
|
drop(existing);
|
|
}
|
|
*guard = Some(ActiveLogin::Browser {
|
|
shutdown_handle: shutdown_handle.clone(),
|
|
login_id,
|
|
});
|
|
}
|
|
|
|
let outgoing_clone = self.outgoing.clone();
|
|
let config_manager = self.config_manager.clone();
|
|
let thread_manager = Arc::clone(&self.thread_manager);
|
|
let chatgpt_base_url = self.config.chatgpt_base_url.clone();
|
|
let active_login = self.active_login.clone();
|
|
let auth_url = server.auth_url.clone();
|
|
tokio::spawn(async move {
|
|
let (success, error_msg) = match tokio::time::timeout(
|
|
LOGIN_CHATGPT_TIMEOUT,
|
|
server.block_until_done(),
|
|
)
|
|
.await
|
|
{
|
|
Ok(Ok(())) => (true, None),
|
|
Ok(Err(err)) => (false, Some(format!("Login server error: {err}"))),
|
|
Err(_elapsed) => {
|
|
shutdown_handle.shutdown();
|
|
(false, Some("Login timed out".to_string()))
|
|
}
|
|
};
|
|
|
|
Self::send_chatgpt_login_completion_notifications(
|
|
&outgoing_clone,
|
|
config_manager,
|
|
thread_manager,
|
|
chatgpt_base_url,
|
|
login_id,
|
|
success,
|
|
error_msg,
|
|
)
|
|
.await;
|
|
|
|
// Clear the active login if it matches this attempt. It may have been replaced or cancelled.
|
|
let mut guard = active_login.lock().await;
|
|
if guard.as_ref().map(ActiveLogin::login_id) == Some(login_id) {
|
|
*guard = None;
|
|
}
|
|
});
|
|
|
|
Ok(LoginAccountResponse::Chatgpt {
|
|
login_id: login_id.to_string(),
|
|
auth_url,
|
|
})
|
|
}
|
|
|
|
async fn login_chatgpt_device_code_v2(&self, request_id: ConnectionRequestId) {
|
|
let result = self.login_chatgpt_device_code_response().await;
|
|
self.outgoing.send_result(request_id, result).await;
|
|
}
|
|
|
|
async fn login_chatgpt_device_code_response(
|
|
&self,
|
|
) -> Result<LoginAccountResponse, JSONRPCErrorError> {
|
|
let opts = self
|
|
.login_chatgpt_common(/*codex_streamlined_login*/ false)
|
|
.await?;
|
|
let device_code = request_device_code(&opts)
|
|
.await
|
|
.map_err(Self::login_chatgpt_device_code_start_error)?;
|
|
let login_id = Uuid::new_v4();
|
|
let cancel = CancellationToken::new();
|
|
|
|
{
|
|
let mut guard = self.active_login.lock().await;
|
|
if let Some(existing) = guard.take() {
|
|
drop(existing);
|
|
}
|
|
*guard = Some(ActiveLogin::DeviceCode {
|
|
cancel: cancel.clone(),
|
|
login_id,
|
|
});
|
|
}
|
|
|
|
let verification_url = device_code.verification_url.clone();
|
|
let user_code = device_code.user_code.clone();
|
|
|
|
let outgoing_clone = self.outgoing.clone();
|
|
let config_manager = self.config_manager.clone();
|
|
let thread_manager = Arc::clone(&self.thread_manager);
|
|
let chatgpt_base_url = self.config.chatgpt_base_url.clone();
|
|
let active_login = self.active_login.clone();
|
|
tokio::spawn(async move {
|
|
let (success, error_msg) = tokio::select! {
|
|
_ = cancel.cancelled() => {
|
|
(false, Some("Login was not completed".to_string()))
|
|
}
|
|
r = complete_device_code_login(opts, device_code) => {
|
|
match r {
|
|
Ok(()) => (true, None),
|
|
Err(err) => (false, Some(err.to_string())),
|
|
}
|
|
}
|
|
};
|
|
|
|
Self::send_chatgpt_login_completion_notifications(
|
|
&outgoing_clone,
|
|
config_manager,
|
|
thread_manager,
|
|
chatgpt_base_url,
|
|
login_id,
|
|
success,
|
|
error_msg,
|
|
)
|
|
.await;
|
|
|
|
let mut guard = active_login.lock().await;
|
|
if guard.as_ref().map(ActiveLogin::login_id) == Some(login_id) {
|
|
*guard = None;
|
|
}
|
|
});
|
|
|
|
Ok(LoginAccountResponse::ChatgptDeviceCode {
|
|
login_id: login_id.to_string(),
|
|
verification_url,
|
|
user_code,
|
|
})
|
|
}
|
|
|
|
async fn cancel_login_chatgpt_common(
|
|
&self,
|
|
login_id: Uuid,
|
|
) -> std::result::Result<(), CancelLoginError> {
|
|
let mut guard = self.active_login.lock().await;
|
|
if guard.as_ref().map(ActiveLogin::login_id) == Some(login_id) {
|
|
if let Some(active) = guard.take() {
|
|
drop(active);
|
|
}
|
|
Ok(())
|
|
} else {
|
|
Err(CancelLoginError::NotFound)
|
|
}
|
|
}
|
|
|
|
async fn cancel_login_response(
|
|
&self,
|
|
params: CancelLoginAccountParams,
|
|
) -> Result<CancelLoginAccountResponse, JSONRPCErrorError> {
|
|
let login_id = params.login_id;
|
|
let uuid = Uuid::parse_str(&login_id)
|
|
.map_err(|_| invalid_request(format!("invalid login id: {login_id}")))?;
|
|
let status = match self.cancel_login_chatgpt_common(uuid).await {
|
|
Ok(()) => CancelLoginAccountStatus::Canceled,
|
|
Err(CancelLoginError::NotFound) => CancelLoginAccountStatus::NotFound,
|
|
};
|
|
Ok(CancelLoginAccountResponse { status })
|
|
}
|
|
|
|
async fn login_chatgpt_auth_tokens(
|
|
&self,
|
|
request_id: ConnectionRequestId,
|
|
access_token: String,
|
|
chatgpt_account_id: String,
|
|
chatgpt_plan_type: Option<String>,
|
|
) {
|
|
let result = self
|
|
.login_chatgpt_auth_tokens_response(access_token, chatgpt_account_id, chatgpt_plan_type)
|
|
.await;
|
|
let logged_in = result.is_ok();
|
|
self.outgoing.send_result(request_id, result).await;
|
|
|
|
if logged_in {
|
|
self.send_login_success_notifications(/*login_id*/ None)
|
|
.await;
|
|
}
|
|
}
|
|
|
|
async fn login_chatgpt_auth_tokens_response(
|
|
&self,
|
|
access_token: String,
|
|
chatgpt_account_id: String,
|
|
chatgpt_plan_type: Option<String>,
|
|
) -> Result<LoginAccountResponse, JSONRPCErrorError> {
|
|
if matches!(
|
|
self.config.forced_login_method,
|
|
Some(ForcedLoginMethod::Api)
|
|
) {
|
|
return Err(invalid_request(
|
|
"External ChatGPT auth is disabled. Use API key login instead.",
|
|
));
|
|
}
|
|
|
|
// Cancel any active login attempt to avoid persisting managed auth state.
|
|
{
|
|
let mut guard = self.active_login.lock().await;
|
|
if let Some(active) = guard.take() {
|
|
drop(active);
|
|
}
|
|
}
|
|
|
|
if let Some(expected_workspaces) = self.config.forced_chatgpt_workspace_id.as_deref()
|
|
&& !expected_workspaces.contains(&chatgpt_account_id)
|
|
{
|
|
return Err(invalid_request(format!(
|
|
"External auth must use one of workspace(s) {expected_workspaces:?}, but received {chatgpt_account_id:?}.",
|
|
)));
|
|
}
|
|
|
|
login_with_chatgpt_auth_tokens(
|
|
&self.config.codex_home,
|
|
&access_token,
|
|
&chatgpt_account_id,
|
|
chatgpt_plan_type.as_deref(),
|
|
)
|
|
.map_err(|err| internal_error(format!("failed to set external auth: {err}")))?;
|
|
self.auth_manager.reload().await;
|
|
self.config_manager.replace_cloud_requirements_loader(
|
|
self.auth_manager.clone(),
|
|
self.config.chatgpt_base_url.clone(),
|
|
);
|
|
self.config_manager
|
|
.sync_default_client_residency_requirement()
|
|
.await;
|
|
|
|
Ok(LoginAccountResponse::ChatgptAuthTokens {})
|
|
}
|
|
|
|
async fn send_login_success_notifications(&self, login_id: Option<Uuid>) {
|
|
Self::maybe_refresh_remote_installed_plugins_cache_for_current_config(
|
|
&self.config_manager,
|
|
&self.thread_manager,
|
|
self.auth_manager.auth_cached(),
|
|
)
|
|
.await;
|
|
|
|
let payload_login_completed = AccountLoginCompletedNotification {
|
|
login_id: login_id.map(|id| id.to_string()),
|
|
success: true,
|
|
error: None,
|
|
};
|
|
self.outgoing
|
|
.send_server_notification(ServerNotification::AccountLoginCompleted(
|
|
payload_login_completed,
|
|
))
|
|
.await;
|
|
|
|
self.outgoing
|
|
.send_server_notification(ServerNotification::AccountUpdated(
|
|
self.current_account_updated_notification(),
|
|
))
|
|
.await;
|
|
}
|
|
|
|
async fn send_chatgpt_login_completion_notifications(
|
|
outgoing: &OutgoingMessageSender,
|
|
config_manager: ConfigManager,
|
|
thread_manager: Arc<ThreadManager>,
|
|
chatgpt_base_url: String,
|
|
login_id: Uuid,
|
|
success: bool,
|
|
error_msg: Option<String>,
|
|
) {
|
|
let payload_v2 = AccountLoginCompletedNotification {
|
|
login_id: Some(login_id.to_string()),
|
|
success,
|
|
error: error_msg,
|
|
};
|
|
outgoing
|
|
.send_server_notification(ServerNotification::AccountLoginCompleted(payload_v2))
|
|
.await;
|
|
|
|
if success {
|
|
let auth_manager = thread_manager.auth_manager();
|
|
auth_manager.reload().await;
|
|
config_manager
|
|
.replace_cloud_requirements_loader(auth_manager.clone(), chatgpt_base_url);
|
|
config_manager
|
|
.sync_default_client_residency_requirement()
|
|
.await;
|
|
|
|
let auth = auth_manager.auth_cached();
|
|
Self::maybe_refresh_remote_installed_plugins_cache_for_current_config(
|
|
&config_manager,
|
|
&thread_manager,
|
|
auth.clone(),
|
|
)
|
|
.await;
|
|
let payload_v2 = AccountUpdatedNotification {
|
|
auth_mode: auth.as_ref().map(CodexAuth::api_auth_mode),
|
|
plan_type: auth.as_ref().and_then(CodexAuth::account_plan_type),
|
|
};
|
|
outgoing
|
|
.send_server_notification(ServerNotification::AccountUpdated(payload_v2))
|
|
.await;
|
|
}
|
|
}
|
|
|
|
async fn logout_common(&self) -> std::result::Result<Option<AuthMode>, JSONRPCErrorError> {
|
|
// Cancel any active login attempt.
|
|
{
|
|
let mut guard = self.active_login.lock().await;
|
|
if let Some(active) = guard.take() {
|
|
drop(active);
|
|
}
|
|
}
|
|
|
|
match self.auth_manager.logout_with_revoke().await {
|
|
Ok(_) => {}
|
|
Err(err) => {
|
|
return Err(internal_error(format!("logout failed: {err}")));
|
|
}
|
|
}
|
|
|
|
Self::maybe_refresh_remote_installed_plugins_cache_for_current_config(
|
|
&self.config_manager,
|
|
&self.thread_manager,
|
|
self.auth_manager.auth_cached(),
|
|
)
|
|
.await;
|
|
|
|
// Reflect the current auth method after logout (likely None).
|
|
Ok(self
|
|
.auth_manager
|
|
.auth_cached()
|
|
.as_ref()
|
|
.map(CodexAuth::api_auth_mode))
|
|
}
|
|
|
|
async fn logout_v2(&self, request_id: ConnectionRequestId) -> Result<(), JSONRPCErrorError> {
|
|
let result = self.logout_common().await;
|
|
let account_updated =
|
|
result
|
|
.as_ref()
|
|
.ok()
|
|
.cloned()
|
|
.map(|auth_mode| AccountUpdatedNotification {
|
|
auth_mode,
|
|
plan_type: None,
|
|
});
|
|
self.outgoing
|
|
.send_result(request_id, result.map(|_| LogoutAccountResponse {}))
|
|
.await;
|
|
|
|
if let Some(payload) = account_updated {
|
|
self.outgoing
|
|
.send_server_notification(ServerNotification::AccountUpdated(payload))
|
|
.await;
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
async fn refresh_token_if_requested(&self, do_refresh: bool) -> RefreshTokenRequestOutcome {
|
|
if self.auth_manager.is_external_chatgpt_auth_active() {
|
|
return RefreshTokenRequestOutcome::NotAttemptedOrSucceeded;
|
|
}
|
|
if do_refresh && let Err(err) = self.auth_manager.refresh_token().await {
|
|
let failed_reason = err.failed_reason();
|
|
if failed_reason.is_none() {
|
|
tracing::warn!("failed to refresh token while getting account: {err}");
|
|
return RefreshTokenRequestOutcome::FailedTransiently;
|
|
}
|
|
return RefreshTokenRequestOutcome::FailedPermanently;
|
|
}
|
|
RefreshTokenRequestOutcome::NotAttemptedOrSucceeded
|
|
}
|
|
|
|
async fn get_auth_status_response(
|
|
&self,
|
|
params: GetAuthStatusParams,
|
|
) -> Result<GetAuthStatusResponse, JSONRPCErrorError> {
|
|
let include_token = params.include_token.unwrap_or(false);
|
|
let do_refresh = params.refresh_token.unwrap_or(false);
|
|
|
|
self.refresh_token_if_requested(do_refresh).await;
|
|
|
|
// Determine whether auth is required based on the active model provider.
|
|
// If a custom provider is configured with `requires_openai_auth == false`,
|
|
// then no auth step is required; otherwise, default to requiring auth.
|
|
let requires_openai_auth = self.config.model_provider.requires_openai_auth;
|
|
|
|
let response = if !requires_openai_auth {
|
|
GetAuthStatusResponse {
|
|
auth_method: None,
|
|
auth_token: None,
|
|
requires_openai_auth: Some(false),
|
|
}
|
|
} else {
|
|
let auth = if do_refresh {
|
|
self.auth_manager.auth_cached()
|
|
} else {
|
|
self.auth_manager.auth().await
|
|
};
|
|
match auth {
|
|
Some(auth) => {
|
|
let permanent_refresh_failure =
|
|
self.auth_manager.refresh_failure_for_auth(&auth).is_some();
|
|
let auth_mode = auth.api_auth_mode();
|
|
let (reported_auth_method, token_opt) =
|
|
if matches!(auth, CodexAuth::AgentIdentity(_))
|
|
|| include_token && permanent_refresh_failure
|
|
{
|
|
(Some(auth_mode), None)
|
|
} else {
|
|
match auth.get_token() {
|
|
Ok(token) if !token.is_empty() => {
|
|
let tok = if include_token { Some(token) } else { None };
|
|
(Some(auth_mode), tok)
|
|
}
|
|
Ok(_) => (None, None),
|
|
Err(err) => {
|
|
tracing::warn!("failed to get token for auth status: {err}");
|
|
(None, None)
|
|
}
|
|
}
|
|
};
|
|
GetAuthStatusResponse {
|
|
auth_method: reported_auth_method,
|
|
auth_token: token_opt,
|
|
requires_openai_auth: Some(true),
|
|
}
|
|
}
|
|
None => GetAuthStatusResponse {
|
|
auth_method: None,
|
|
auth_token: None,
|
|
requires_openai_auth: Some(true),
|
|
},
|
|
}
|
|
};
|
|
|
|
Ok(response)
|
|
}
|
|
|
|
async fn get_account_response(
|
|
&self,
|
|
params: GetAccountParams,
|
|
) -> Result<GetAccountResponse, JSONRPCErrorError> {
|
|
let do_refresh = params.refresh_token;
|
|
|
|
self.refresh_token_if_requested(do_refresh).await;
|
|
|
|
let provider = create_model_provider(
|
|
self.config.model_provider.clone(),
|
|
Some(self.auth_manager.clone()),
|
|
);
|
|
let account_state = match provider.account_state() {
|
|
Ok(account_state) => account_state,
|
|
Err(ProviderAccountError::MissingChatgptAccountDetails) => {
|
|
return Err(invalid_request(
|
|
"email and plan type are required for chatgpt authentication",
|
|
));
|
|
}
|
|
};
|
|
let account = account_state.account.map(Account::from);
|
|
|
|
Ok(GetAccountResponse {
|
|
account,
|
|
requires_openai_auth: account_state.requires_openai_auth,
|
|
})
|
|
}
|
|
|
|
async fn get_account_rate_limits_response(
|
|
&self,
|
|
) -> Result<GetAccountRateLimitsResponse, JSONRPCErrorError> {
|
|
self.fetch_account_rate_limits()
|
|
.await
|
|
.map(
|
|
|(rate_limits, rate_limits_by_limit_id)| GetAccountRateLimitsResponse {
|
|
rate_limits: rate_limits.into(),
|
|
rate_limits_by_limit_id: Some(
|
|
rate_limits_by_limit_id
|
|
.into_iter()
|
|
.map(|(limit_id, snapshot)| (limit_id, snapshot.into()))
|
|
.collect(),
|
|
),
|
|
},
|
|
)
|
|
}
|
|
|
|
async fn send_add_credits_nudge_email_response(
|
|
&self,
|
|
params: SendAddCreditsNudgeEmailParams,
|
|
) -> Result<SendAddCreditsNudgeEmailResponse, JSONRPCErrorError> {
|
|
self.send_add_credits_nudge_email_inner(params)
|
|
.await
|
|
.map(|status| SendAddCreditsNudgeEmailResponse { status })
|
|
}
|
|
|
|
async fn send_add_credits_nudge_email_inner(
|
|
&self,
|
|
params: SendAddCreditsNudgeEmailParams,
|
|
) -> Result<AddCreditsNudgeEmailStatus, JSONRPCErrorError> {
|
|
let Some(auth) = self.auth_manager.auth().await else {
|
|
return Err(invalid_request(
|
|
"codex account authentication required to notify workspace owner",
|
|
));
|
|
};
|
|
|
|
if !auth.uses_codex_backend() {
|
|
return Err(invalid_request(
|
|
"chatgpt authentication required to notify workspace owner",
|
|
));
|
|
}
|
|
|
|
let client = BackendClient::from_auth(self.config.chatgpt_base_url.clone(), &auth)
|
|
.map_err(|err| internal_error(format!("failed to construct backend client: {err}")))?;
|
|
|
|
match client
|
|
.send_add_credits_nudge_email(Self::backend_credit_type(params.credit_type))
|
|
.await
|
|
{
|
|
Ok(()) => Ok(AddCreditsNudgeEmailStatus::Sent),
|
|
Err(err) if err.status().is_some_and(|status| status.as_u16() == 429) => {
|
|
Ok(AddCreditsNudgeEmailStatus::CooldownActive)
|
|
}
|
|
Err(err) => Err(internal_error(format!(
|
|
"failed to notify workspace owner: {err}"
|
|
))),
|
|
}
|
|
}
|
|
|
|
fn backend_credit_type(value: AddCreditsNudgeCreditType) -> BackendAddCreditsNudgeCreditType {
|
|
match value {
|
|
AddCreditsNudgeCreditType::Credits => BackendAddCreditsNudgeCreditType::Credits,
|
|
AddCreditsNudgeCreditType::UsageLimit => BackendAddCreditsNudgeCreditType::UsageLimit,
|
|
}
|
|
}
|
|
|
|
async fn fetch_account_rate_limits(
|
|
&self,
|
|
) -> Result<
|
|
(
|
|
CoreRateLimitSnapshot,
|
|
HashMap<String, CoreRateLimitSnapshot>,
|
|
),
|
|
JSONRPCErrorError,
|
|
> {
|
|
let Some(auth) = self.auth_manager.auth().await else {
|
|
return Err(invalid_request(
|
|
"codex account authentication required to read rate limits",
|
|
));
|
|
};
|
|
|
|
if !auth.uses_codex_backend() {
|
|
return Err(invalid_request(
|
|
"chatgpt authentication required to read rate limits",
|
|
));
|
|
}
|
|
|
|
let client = BackendClient::from_auth(self.config.chatgpt_base_url.clone(), &auth)
|
|
.map_err(|err| internal_error(format!("failed to construct backend client: {err}")))?;
|
|
|
|
let snapshots = client
|
|
.get_rate_limits_many()
|
|
.await
|
|
.map_err(|err| internal_error(format!("failed to fetch codex rate limits: {err}")))?;
|
|
if snapshots.is_empty() {
|
|
return Err(internal_error(
|
|
"failed to fetch codex rate limits: no snapshots returned",
|
|
));
|
|
}
|
|
|
|
let rate_limits_by_limit_id: HashMap<String, CoreRateLimitSnapshot> = snapshots
|
|
.iter()
|
|
.cloned()
|
|
.map(|snapshot| {
|
|
let limit_id = snapshot
|
|
.limit_id
|
|
.clone()
|
|
.unwrap_or_else(|| "codex".to_string());
|
|
(limit_id, snapshot)
|
|
})
|
|
.collect();
|
|
|
|
let primary = snapshots
|
|
.iter()
|
|
.find(|snapshot| snapshot.limit_id.as_deref() == Some("codex"))
|
|
.cloned()
|
|
.unwrap_or_else(|| snapshots[0].clone());
|
|
|
|
Ok((primary, rate_limits_by_limit_id))
|
|
}
|
|
}
|