mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
fix create-api-key flow cancellation races
Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
@@ -2779,26 +2779,50 @@ impl CodexMessageProcessor {
|
||||
|
||||
// If a previous client started this flow and never called finish, do not
|
||||
// try to bind fixed port 5000 until the old callback server is dropped.
|
||||
let existing_response = {
|
||||
let (existing_response, error_message, pending_to_cancel) = {
|
||||
let mut active_create_api_key = self.active_create_api_key.lock().await;
|
||||
if let Some(active) = active_create_api_key.as_ref()
|
||||
&& active.thread_id == thread_uuid
|
||||
&& active.started_at.elapsed() < CREATE_API_KEY_OAUTH_TIMEOUT
|
||||
&& active.pending.is_some()
|
||||
{
|
||||
Some(ThreadCreateApiKeyStartResponse::Started {
|
||||
auth_url: active.auth_url.clone(),
|
||||
callback_port: active.callback_port,
|
||||
})
|
||||
(
|
||||
Some(ThreadCreateApiKeyStartResponse::Started {
|
||||
auth_url: active.auth_url.clone(),
|
||||
callback_port: active.callback_port,
|
||||
}),
|
||||
None,
|
||||
None,
|
||||
)
|
||||
} else if let Some(active) = active_create_api_key.as_ref()
|
||||
&& active.pending.is_none()
|
||||
{
|
||||
(
|
||||
None,
|
||||
Some(format!(
|
||||
"API key creation is already in progress for thread {}",
|
||||
active.thread_id
|
||||
)),
|
||||
None,
|
||||
)
|
||||
} else {
|
||||
drop(active_create_api_key.take());
|
||||
None
|
||||
let pending_to_cancel = active_create_api_key
|
||||
.take()
|
||||
.and_then(|mut active| active.pending.take());
|
||||
(None, None, pending_to_cancel)
|
||||
}
|
||||
};
|
||||
if let Some(response) = existing_response {
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
return;
|
||||
}
|
||||
if let Some(message) = error_message {
|
||||
self.send_invalid_request_error(request_id, message).await;
|
||||
return;
|
||||
}
|
||||
if let Some(pending) = pending_to_cancel {
|
||||
pending.cancel().await;
|
||||
}
|
||||
|
||||
let pending = match start_create_api_key() {
|
||||
Ok(pending) => pending,
|
||||
@@ -2831,12 +2855,21 @@ impl CodexMessageProcessor {
|
||||
let active_create_api_key = Arc::clone(&self.active_create_api_key);
|
||||
tokio::spawn(async move {
|
||||
tokio::time::sleep(CREATE_API_KEY_OAUTH_TIMEOUT).await;
|
||||
let mut active_create_api_key = active_create_api_key.lock().await;
|
||||
if active_create_api_key
|
||||
.as_ref()
|
||||
.is_some_and(|active| active.flow_id == flow_id)
|
||||
{
|
||||
drop(active_create_api_key.take());
|
||||
let pending_to_cancel = {
|
||||
let mut active_create_api_key = active_create_api_key.lock().await;
|
||||
if active_create_api_key
|
||||
.as_ref()
|
||||
.is_some_and(|active| active.flow_id == flow_id && active.pending.is_some())
|
||||
{
|
||||
active_create_api_key
|
||||
.take()
|
||||
.and_then(|mut active| active.pending.take())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
};
|
||||
if let Some(pending) = pending_to_cancel {
|
||||
pending.cancel().await;
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
@@ -70,6 +70,10 @@ impl PendingCreateApiKey {
|
||||
self.callback_server.shutdown_handle.clone()
|
||||
}
|
||||
|
||||
pub async fn cancel(self) {
|
||||
self.callback_server.shutdown_and_wait().await;
|
||||
}
|
||||
|
||||
pub async fn finish(self) -> Result<CreatedApiKey, CreateApiKeyError> {
|
||||
let code = self
|
||||
.callback_server
|
||||
|
||||
@@ -51,6 +51,7 @@ pub(crate) struct AuthorizationCodeServer {
|
||||
pub auth_url: String,
|
||||
pub actual_port: u16,
|
||||
pub redirect_uri: String,
|
||||
server: Arc<Server>,
|
||||
code_verifier: String,
|
||||
server_handle: tokio::task::JoinHandle<io::Result<String>>,
|
||||
pub(crate) shutdown_handle: ShutdownHandle,
|
||||
@@ -65,8 +66,21 @@ impl AuthorizationCodeServer {
|
||||
&self.code_verifier
|
||||
}
|
||||
|
||||
pub(crate) async fn shutdown_and_wait(self) {
|
||||
let AuthorizationCodeServer {
|
||||
server,
|
||||
server_handle,
|
||||
shutdown_handle,
|
||||
..
|
||||
} = self;
|
||||
shutdown_handle.shutdown();
|
||||
server.unblock();
|
||||
let _ = server_handle.await;
|
||||
}
|
||||
|
||||
pub async fn wait_for_code(self, timeout: Duration) -> io::Result<String> {
|
||||
let AuthorizationCodeServer {
|
||||
server,
|
||||
server_handle,
|
||||
shutdown_handle,
|
||||
..
|
||||
@@ -81,6 +95,7 @@ impl AuthorizationCodeServer {
|
||||
}
|
||||
_ = tokio::time::sleep(timeout) => {
|
||||
shutdown_handle.shutdown();
|
||||
server.unblock();
|
||||
let _ = server_handle.await;
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::TimedOut,
|
||||
@@ -115,7 +130,7 @@ where
|
||||
}
|
||||
};
|
||||
let (server_handle, shutdown_handle) = spawn_callback_server_loop(
|
||||
server,
|
||||
server.clone(),
|
||||
rx,
|
||||
"Authentication was not completed",
|
||||
move |url_raw| {
|
||||
@@ -129,6 +144,7 @@ where
|
||||
auth_url,
|
||||
actual_port,
|
||||
redirect_uri,
|
||||
server,
|
||||
code_verifier: pkce.code_verifier,
|
||||
server_handle,
|
||||
shutdown_handle,
|
||||
|
||||
Reference in New Issue
Block a user