mirror of
https://github.com/openai/codex.git
synced 2026-04-24 06:35:50 +00:00
fix create-api-key flow cleanup
Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
@@ -240,6 +240,7 @@ use codex_git_utils::git_diff_to_remote;
|
||||
use codex_git_utils::resolve_root_git_project_for_trust;
|
||||
use codex_login::AuthManager;
|
||||
use codex_login::CLIENT_ID;
|
||||
use codex_login::CREATE_API_KEY_OAUTH_TIMEOUT;
|
||||
use codex_login::CodexAuth;
|
||||
use codex_login::OPENAI_API_KEY_ENV_VAR;
|
||||
use codex_login::PendingCreateApiKey;
|
||||
@@ -315,6 +316,7 @@ use std::sync::RwLock;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use std::time::SystemTime;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::broadcast;
|
||||
@@ -370,7 +372,12 @@ enum ActiveLogin {
|
||||
}
|
||||
|
||||
struct ActiveCreateApiKey {
|
||||
flow_id: Uuid,
|
||||
thread_id: ThreadId,
|
||||
started_at: Instant,
|
||||
auth_url: String,
|
||||
callback_port: u16,
|
||||
shutdown_handle: ShutdownHandle,
|
||||
pending: Option<PendingCreateApiKey>,
|
||||
}
|
||||
|
||||
@@ -417,9 +424,7 @@ impl Drop for ActiveLogin {
|
||||
|
||||
impl Drop for ActiveCreateApiKey {
|
||||
fn drop(&mut self) {
|
||||
if let Some(pending) = self.pending.as_ref() {
|
||||
pending.shutdown();
|
||||
}
|
||||
self.shutdown_handle.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2773,6 +2778,29 @@ impl CodexMessageProcessor {
|
||||
return;
|
||||
}
|
||||
|
||||
// If a previous client started this flow and never called finish, do not
|
||||
// try to bind fixed port 5000 until the old callback server is dropped.
|
||||
let existing_response = {
|
||||
let mut active_create_api_key = self.active_create_api_key.lock().await;
|
||||
if let Some(active) = active_create_api_key.as_ref()
|
||||
&& active.thread_id == thread_uuid
|
||||
&& active.started_at.elapsed() < CREATE_API_KEY_OAUTH_TIMEOUT
|
||||
&& active.pending.is_some()
|
||||
{
|
||||
Some(ThreadCreateApiKeyStartResponse::Started {
|
||||
auth_url: active.auth_url.clone(),
|
||||
callback_port: active.callback_port,
|
||||
})
|
||||
} else {
|
||||
drop(active_create_api_key.take());
|
||||
None
|
||||
}
|
||||
};
|
||||
if let Some(response) = existing_response {
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
return;
|
||||
}
|
||||
|
||||
let pending = match start_create_api_key() {
|
||||
Ok(pending) => pending,
|
||||
Err(err) => {
|
||||
@@ -2786,14 +2814,33 @@ impl CodexMessageProcessor {
|
||||
};
|
||||
let auth_url = pending.auth_url().to_string();
|
||||
let callback_port = pending.callback_port();
|
||||
let shutdown_handle = pending.shutdown_handle();
|
||||
let flow_id = Uuid::new_v4();
|
||||
|
||||
let mut active_create_api_key = self.active_create_api_key.lock().await;
|
||||
*active_create_api_key = Some(ActiveCreateApiKey {
|
||||
flow_id,
|
||||
thread_id: thread_uuid,
|
||||
started_at: Instant::now(),
|
||||
auth_url: auth_url.clone(),
|
||||
callback_port,
|
||||
shutdown_handle,
|
||||
pending: Some(pending),
|
||||
});
|
||||
drop(active_create_api_key);
|
||||
|
||||
let active_create_api_key = Arc::clone(&self.active_create_api_key);
|
||||
tokio::spawn(async move {
|
||||
tokio::time::sleep(CREATE_API_KEY_OAUTH_TIMEOUT).await;
|
||||
let mut active_create_api_key = active_create_api_key.lock().await;
|
||||
if active_create_api_key
|
||||
.as_ref()
|
||||
.is_some_and(|active| active.flow_id == flow_id)
|
||||
{
|
||||
drop(active_create_api_key.take());
|
||||
}
|
||||
});
|
||||
|
||||
self.outgoing
|
||||
.send_response(
|
||||
request_id,
|
||||
@@ -2820,7 +2867,7 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
};
|
||||
|
||||
let pending = {
|
||||
let (flow_id, pending) = {
|
||||
let mut active_create_api_key = self.active_create_api_key.lock().await;
|
||||
let Some(mut active) = active_create_api_key.take() else {
|
||||
drop(active_create_api_key);
|
||||
@@ -2847,6 +2894,7 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
|
||||
let Some(pending) = active.pending.take() else {
|
||||
*active_create_api_key = Some(active);
|
||||
drop(active_create_api_key);
|
||||
self.outgoing
|
||||
.send_error(
|
||||
@@ -2863,14 +2911,32 @@ impl CodexMessageProcessor {
|
||||
return;
|
||||
};
|
||||
|
||||
pending
|
||||
// Keep an active marker while finish waits so a later start request
|
||||
// can cancel/restart this flow instead of failing on port 5000.
|
||||
let flow_id = active.flow_id;
|
||||
*active_create_api_key = Some(active);
|
||||
(flow_id, pending)
|
||||
};
|
||||
|
||||
let outgoing = Arc::clone(&self.outgoing);
|
||||
let active_create_api_key = Arc::clone(&self.active_create_api_key);
|
||||
tokio::spawn(async move {
|
||||
let result = pending.finish().await;
|
||||
let flow_was_active = {
|
||||
let mut active_create_api_key = active_create_api_key.lock().await;
|
||||
if active_create_api_key
|
||||
.as_ref()
|
||||
.is_some_and(|active| active.flow_id == flow_id)
|
||||
{
|
||||
drop(active_create_api_key.take());
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
};
|
||||
|
||||
match result {
|
||||
Ok(created) => {
|
||||
Ok(created) if flow_was_active => {
|
||||
let response = ThreadCreateApiKeyFinishResponse {
|
||||
organization_id: created.organization_id,
|
||||
organization_title: created.organization_title,
|
||||
@@ -2886,6 +2952,18 @@ impl CodexMessageProcessor {
|
||||
.await;
|
||||
outgoing.send_response(request_id, response).await;
|
||||
}
|
||||
Ok(_) => {
|
||||
outgoing
|
||||
.send_error(
|
||||
request_id,
|
||||
JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: "API key creation was cancelled.".to_string(),
|
||||
data: None,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
outgoing
|
||||
.send_error(
|
||||
|
||||
@@ -10,6 +10,7 @@ use url::Url;
|
||||
|
||||
use crate::oauth_callback_server::AuthorizationCodeServer;
|
||||
use crate::oauth_callback_server::PortConflictStrategy;
|
||||
use crate::oauth_callback_server::ShutdownHandle;
|
||||
use crate::oauth_callback_server::start_authorization_code_server;
|
||||
use crate::pkce::PkceCodes;
|
||||
|
||||
@@ -27,8 +28,8 @@ const USER_AGENT: &str = "Codex-Create-API-Key/1.0";
|
||||
const PROJECT_API_KEY_NAME: &str = "Codex CLI";
|
||||
const PROJECT_POLL_INTERVAL_SECONDS: u64 = 10;
|
||||
const PROJECT_POLL_TIMEOUT_SECONDS: u64 = 60;
|
||||
const OAUTH_TIMEOUT_SECONDS: u64 = 15 * 60;
|
||||
const HTTP_TIMEOUT_SECONDS: u64 = 30;
|
||||
pub const CREATE_API_KEY_OAUTH_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
struct CreateApiKeyOptions {
|
||||
@@ -65,14 +66,14 @@ impl PendingCreateApiKey {
|
||||
self.callback_server.open_browser()
|
||||
}
|
||||
|
||||
pub fn shutdown(&self) {
|
||||
self.callback_server.shutdown();
|
||||
pub fn shutdown_handle(&self) -> ShutdownHandle {
|
||||
self.callback_server.shutdown_handle.clone()
|
||||
}
|
||||
|
||||
pub async fn finish(self) -> Result<CreatedApiKey, CreateApiKeyError> {
|
||||
let code = self
|
||||
.callback_server
|
||||
.wait_for_code(Duration::from_secs(OAUTH_TIMEOUT_SECONDS))
|
||||
.wait_for_code(CREATE_API_KEY_OAUTH_TIMEOUT)
|
||||
.await
|
||||
.map_err(|err| CreateApiKeyError::message(err.to_string()))?;
|
||||
create_api_key_from_authorization_code(
|
||||
|
||||
@@ -11,6 +11,7 @@ mod pkce;
|
||||
mod server;
|
||||
|
||||
pub use codex_client::BuildCustomCaTransportError as BuildLoginHttpClientError;
|
||||
pub use create_api_key::CREATE_API_KEY_OAUTH_TIMEOUT;
|
||||
pub use create_api_key::CreateApiKeyError;
|
||||
pub use create_api_key::CreatedApiKey;
|
||||
pub use create_api_key::PendingCreateApiKey;
|
||||
|
||||
@@ -53,7 +53,7 @@ pub(crate) struct AuthorizationCodeServer {
|
||||
pub redirect_uri: String,
|
||||
code_verifier: String,
|
||||
server_handle: tokio::task::JoinHandle<io::Result<String>>,
|
||||
shutdown_handle: ShutdownHandle,
|
||||
pub(crate) shutdown_handle: ShutdownHandle,
|
||||
}
|
||||
|
||||
impl AuthorizationCodeServer {
|
||||
@@ -65,10 +65,6 @@ impl AuthorizationCodeServer {
|
||||
&self.code_verifier
|
||||
}
|
||||
|
||||
pub(crate) fn shutdown(&self) {
|
||||
self.shutdown_handle.shutdown();
|
||||
}
|
||||
|
||||
pub async fn wait_for_code(self, timeout: Duration) -> io::Result<String> {
|
||||
let AuthorizationCodeServer {
|
||||
server_handle,
|
||||
|
||||
Reference in New Issue
Block a user