mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
codex: address PR review feedback (#16989)
This commit is contained in:
@@ -70,6 +70,9 @@ pub use remote_control::RemoteControlEnrollmentRecord;
|
||||
const LOG_PARTITION_SIZE_LIMIT_BYTES: i64 = 10 * 1024 * 1024;
|
||||
const LOG_PARTITION_ROW_LIMIT: i64 = 1_000;
|
||||
const CHECKPOINT_WAL_BUSY_MAX_ATTEMPTS: usize = 10;
|
||||
const CHECKPOINT_WAL_BUSY_TIMEOUT_MS: u64 = 100;
|
||||
const SQLITE_BUSY_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
const SQLITE_BUSY_TIMEOUT_MS: u64 = 5_000;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct StateRuntime {
|
||||
@@ -150,28 +153,43 @@ impl StateRuntime {
|
||||
}
|
||||
|
||||
async fn checkpoint_wal_pool(pool: &SqlitePool, name: &str) -> anyhow::Result<()> {
|
||||
let mut last_busy_result = None;
|
||||
for attempt in 1..=CHECKPOINT_WAL_BUSY_MAX_ATTEMPTS {
|
||||
let row = sqlx::query("PRAGMA wal_checkpoint(TRUNCATE)")
|
||||
.fetch_one(pool)
|
||||
.await?;
|
||||
let busy: i64 = row.try_get(0)?;
|
||||
let log_pages: i64 = row.try_get(1)?;
|
||||
let checkpointed_pages: i64 = row.try_get(2)?;
|
||||
if busy == 0 {
|
||||
return Ok(());
|
||||
let mut conn = pool.acquire().await?;
|
||||
sqlx::query(&format!(
|
||||
"PRAGMA busy_timeout = {CHECKPOINT_WAL_BUSY_TIMEOUT_MS}"
|
||||
))
|
||||
.execute(&mut *conn)
|
||||
.await?;
|
||||
let checkpoint_result = async {
|
||||
let mut last_busy_result = None;
|
||||
for attempt in 1..=CHECKPOINT_WAL_BUSY_MAX_ATTEMPTS {
|
||||
let row = sqlx::query("PRAGMA wal_checkpoint(TRUNCATE)")
|
||||
.fetch_one(&mut *conn)
|
||||
.await?;
|
||||
let busy: i64 = row.try_get(0)?;
|
||||
let log_pages: i64 = row.try_get(1)?;
|
||||
let checkpointed_pages: i64 = row.try_get(2)?;
|
||||
if busy == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
last_busy_result = Some((checkpointed_pages, log_pages));
|
||||
if attempt < CHECKPOINT_WAL_BUSY_MAX_ATTEMPTS {
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
}
|
||||
}
|
||||
|
||||
last_busy_result = Some((checkpointed_pages, log_pages));
|
||||
if attempt < CHECKPOINT_WAL_BUSY_MAX_ATTEMPTS {
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
}
|
||||
}
|
||||
let (checkpointed_pages, log_pages) = last_busy_result.unwrap_or((0, 0));
|
||||
anyhow::bail!(
|
||||
"{name} WAL checkpoint was busy: {checkpointed_pages}/{log_pages} pages checkpointed"
|
||||
)
|
||||
};
|
||||
let checkpoint_result: anyhow::Result<()> = checkpoint_result.await;
|
||||
let restore_result = sqlx::query(&format!("PRAGMA busy_timeout = {SQLITE_BUSY_TIMEOUT_MS}"))
|
||||
.execute(&mut *conn)
|
||||
.await;
|
||||
restore_result?;
|
||||
|
||||
let (checkpointed_pages, log_pages) = last_busy_result.unwrap_or((0, 0));
|
||||
anyhow::bail!(
|
||||
"{name} WAL checkpoint was busy: {checkpointed_pages}/{log_pages} pages checkpointed"
|
||||
)
|
||||
checkpoint_result
|
||||
}
|
||||
|
||||
fn base_sqlite_options(path: &Path) -> SqliteConnectOptions {
|
||||
@@ -180,7 +198,7 @@ fn base_sqlite_options(path: &Path) -> SqliteConnectOptions {
|
||||
.create_if_missing(true)
|
||||
.journal_mode(SqliteJournalMode::Wal)
|
||||
.synchronous(SqliteSynchronous::Normal)
|
||||
.busy_timeout(Duration::from_secs(5))
|
||||
.busy_timeout(SQLITE_BUSY_TIMEOUT)
|
||||
.log_statements(LevelFilter::Off)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user