fix: fix readiness subscribe token wrap-around (#8770)

Fixes ReadinessFlag::subscribe to avoid handing out token 0 or duplicate
tokens on i32 wrap-around, adds regression tests, and prevents readiness
gates from getting stuck waiting on an unmarkable or mis-authorized
token.
This commit is contained in:
Thibault Sottiaux
2026-01-06 05:09:02 -08:00
committed by GitHub
parent 4c3d2a5bbe
commit 6346e4f560

View File

@@ -118,26 +118,25 @@ impl Readiness for ReadinessFlag {
return Err(errors::ReadinessError::FlagAlreadyReady);
}
// Generate a token; ensure it's not 0.
let token = Token(self.next_id.fetch_add(1, Ordering::Relaxed));
// Recheck readiness while holding the lock so mark_ready can't flip the flag between the
// check above and inserting the token.
let inserted = self
// check above and inserting the token. Also ensure the token is non-zero and unique in
// the presence of `i32` wrap-around.
let token = self
.with_tokens(|tokens| {
if self.load_ready() {
return false;
return None;
}
loop {
let token = Token(self.next_id.fetch_add(1, Ordering::Relaxed));
if token.0 != 0 && tokens.insert(token) {
return Some(token);
}
}
tokens.insert(token);
true
})
.await?;
if !inserted {
return Err(errors::ReadinessError::FlagAlreadyReady);
}
Ok(token)
token.ok_or(errors::ReadinessError::FlagAlreadyReady)
}
async fn mark_ready(&self, token: Token) -> Result<bool, errors::ReadinessError> {
@@ -199,6 +198,7 @@ mod errors {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::sync::atomic::Ordering;
use super::Readiness;
use super::ReadinessFlag;
@@ -289,4 +289,26 @@ mod tests {
.expect_err("contended subscribe should report a lock failure");
assert_matches!(err, ReadinessError::TokenLockFailed);
}
#[tokio::test]
async fn subscribe_skips_zero_token() -> Result<(), ReadinessError> {
let flag = ReadinessFlag::new();
flag.next_id.store(0, Ordering::Relaxed);
let token = flag.subscribe().await?;
assert_ne!(token, Token(0));
assert!(flag.mark_ready(token).await?);
Ok(())
}
#[tokio::test]
async fn subscribe_avoids_duplicate_tokens() -> Result<(), ReadinessError> {
let flag = ReadinessFlag::new();
let token = flag.subscribe().await?;
flag.next_id.store(token.0, Ordering::Relaxed);
let token2 = flag.subscribe().await?;
assert_ne!(token2, token);
Ok(())
}
}