Compare commits

...

11 Commits

Author SHA1 Message Date
Eric Traut
73af41cf2c codex: drain rollout queue on shutdown (#16599) 2026-04-08 22:25:36 -07:00
Eric Traut
3ad47058d4 codex: harden wait-agent shutdown test (#16599) 2026-04-08 22:25:36 -07:00
Eric Traut
c37a0c8314 codex: emit shutdown complete after durability errors (#16599) 2026-04-08 22:25:36 -07:00
Eric Traut
fc9ae98657 codex: checkpoint WAL after rollout shutdown errors (#16599) 2026-04-08 22:25:36 -07:00
Eric Traut
93eb7ec719 codex: distinguish shutdown durability failures (#16599) 2026-04-08 22:25:36 -07:00
Eric Traut
7b22f4a97d codex: address PR review feedback (#16989) 2026-04-08 22:25:36 -07:00
Eric Traut
ba96d37df4 codex: fix CI failure on PR #16989 2026-04-08 22:25:35 -07:00
Eric Traut
7615a2a93a codex: surface shutdown durability failures (#16599) 2026-04-08 22:25:35 -07:00
Eric Traut
fea9f89215 codex: address PR review feedback (#16989) 2026-04-08 22:25:35 -07:00
Eric Traut
b7669fc7e9 codex: address PR review feedback (#16989) 2026-04-08 22:25:35 -07:00
Eric Traut
4891934519 Durably flush shutdown state 2026-04-08 22:25:35 -07:00
9 changed files with 382 additions and 106 deletions

View File

@@ -402,6 +402,7 @@ enum AppListLoadResult {
/// Outcome of waiting for a single thread's shutdown to finish.
enum ThreadShutdownResult {
    /// Shutdown completed cleanly within the timeout.
    Complete,
    /// Shutdown ran, but reported a durability error; the message comes from
    /// `CodexErr::ShutdownFailed`.
    ShutdownFailed(String),
    /// `shutdown_and_wait` returned some other error (logged as a failure to
    /// submit the Shutdown op to the thread).
    SubmitFailed,
    /// The thread did not finish shutting down before the timeout elapsed.
    TimedOut,
}
@@ -2171,6 +2172,9 @@ impl CodexMessageProcessor {
for thread_id in report.submit_failed {
warn!("failed to submit Shutdown to thread {thread_id}");
}
for (thread_id, error) in report.shutdown_failed {
warn!("thread {thread_id} shutdown completed with error: {error}");
}
for thread_id in report.timed_out {
warn!("timed out waiting for thread {thread_id} to shut down");
}
@@ -5433,6 +5437,9 @@ impl CodexMessageProcessor {
async fn wait_for_thread_shutdown(thread: &Arc<CodexThread>) -> ThreadShutdownResult {
match tokio::time::timeout(Duration::from_secs(10), thread.shutdown_and_wait()).await {
Ok(Ok(())) => ThreadShutdownResult::Complete,
Ok(Err(CodexErr::ShutdownFailed(message))) => {
ThreadShutdownResult::ShutdownFailed(message)
}
Ok(Err(_)) => ThreadShutdownResult::SubmitFailed,
Err(_) => ThreadShutdownResult::TimedOut,
}
@@ -5515,40 +5522,41 @@ impl CodexMessageProcessor {
let thread_manager = self.thread_manager.clone();
let thread_watch_manager = self.thread_watch_manager.clone();
tokio::spawn(async move {
match Self::wait_for_thread_shutdown(&thread).await {
ThreadShutdownResult::Complete => {
if thread_manager.remove_thread(&thread_id).await.is_none() {
info!(
"thread {thread_id} was already removed before unsubscribe finalized"
);
thread_watch_manager
.remove_thread(&thread_id.to_string())
.await;
pending_thread_unloads.lock().await.remove(&thread_id);
return;
}
thread_watch_manager
.remove_thread(&thread_id.to_string())
.await;
let notification = ThreadClosedNotification {
thread_id: thread_id.to_string(),
};
outgoing
.send_server_notification(ServerNotification::ThreadClosed(
notification,
))
.await;
pending_thread_unloads.lock().await.remove(&thread_id);
}
let shutdown_error = match Self::wait_for_thread_shutdown(&thread).await {
ThreadShutdownResult::Complete => None,
ThreadShutdownResult::ShutdownFailed(error) => Some(error),
ThreadShutdownResult::SubmitFailed => {
pending_thread_unloads.lock().await.remove(&thread_id);
warn!("failed to submit Shutdown to thread {thread_id}");
return;
}
ThreadShutdownResult::TimedOut => {
pending_thread_unloads.lock().await.remove(&thread_id);
warn!("thread {thread_id} shutdown timed out; leaving thread loaded");
return;
}
};
if let Some(error) = shutdown_error {
error!("thread {thread_id} shutdown completed with error: {error}");
}
if thread_manager.remove_thread(&thread_id).await.is_none() {
info!("thread {thread_id} was already removed before unsubscribe finalized");
thread_watch_manager
.remove_thread(&thread_id.to_string())
.await;
pending_thread_unloads.lock().await.remove(&thread_id);
return;
}
thread_watch_manager
.remove_thread(&thread_id.to_string())
.await;
let notification = ThreadClosedNotification {
thread_id: thread_id.to_string(),
};
outgoing
.send_server_notification(ServerNotification::ThreadClosed(notification))
.await;
pending_thread_unloads.lock().await.remove(&thread_id);
});
}
@@ -5635,6 +5643,9 @@ impl CodexMessageProcessor {
info!("thread {thread_id} was active; shutting down");
match Self::wait_for_thread_shutdown(&conversation).await {
ThreadShutdownResult::Complete => {}
ThreadShutdownResult::ShutdownFailed(error) => {
error!("thread {thread_id} shutdown completed with error: {error}");
}
ThreadShutdownResult::SubmitFailed => {
error!(
"failed to submit Shutdown to thread {thread_id}; proceeding with archive"

View File

@@ -747,7 +747,10 @@ impl Codex {
Err(err) => return Err(err),
}
session_loop_termination.await;
Ok(())
match self.session.shutdown_failure.borrow().clone() {
Some(message) => Err(CodexErr::ShutdownFailed(message)),
_ => Ok(()),
}
}
pub async fn next_event(&self) -> CodexResult<Event> {
@@ -821,6 +824,7 @@ pub(crate) struct Session {
pub(crate) conversation_id: ThreadId,
tx_event: Sender<Event>,
agent_status: watch::Sender<AgentStatus>,
shutdown_failure: watch::Sender<Option<String>>,
out_of_band_elicitation_paused: watch::Sender<bool>,
state: Mutex<SessionState>,
/// Serializes rebuild/apply cycles for the running proxy; each cycle
@@ -2039,12 +2043,14 @@ impl Session {
));
let (out_of_band_elicitation_paused, _out_of_band_elicitation_paused_rx) =
watch::channel(false);
let (shutdown_failure, _shutdown_failure_rx) = watch::channel(None);
let (mailbox, mailbox_rx) = Mailbox::new();
let sess = Arc::new(Session {
conversation_id,
tx_event: tx_event.clone(),
agent_status,
shutdown_failure,
out_of_band_elicitation_paused,
state: Mutex::new(state),
managed_network_proxy_refresh_lock: Mutex::new(()),
@@ -5629,6 +5635,7 @@ mod handlers {
// Gracefully flush and shutdown rollout recorder on session end so tests
// that inspect the rollout file do not race with the background writer.
let mut shutdown_failures = Vec::new();
let recorder_opt = {
let mut guard = sess.services.rollout.lock().await;
guard.take()
@@ -5637,15 +5644,37 @@ mod handlers {
&& let Err(e) = rec.shutdown().await
{
warn!("failed to shutdown rollout recorder: {e}");
let message = "Failed to shutdown rollout recorder".to_string();
shutdown_failures.push(message.clone());
let event = Event {
id: sub_id.clone(),
msg: EventMsg::Error(ErrorEvent {
message: "Failed to shutdown rollout recorder".to_string(),
message,
codex_error_info: Some(CodexErrorInfo::Other),
}),
};
sess.send_event_raw(event).await;
}
if let Some(state_db) = sess.services.state_db.as_deref()
&& let Err(e) = state_db.checkpoint_wal().await
{
warn!("failed to checkpoint state db WAL during shutdown: {e}");
let message = "Failed to checkpoint state database WAL".to_string();
shutdown_failures.push(message.clone());
let event = Event {
id: sub_id.clone(),
msg: EventMsg::Error(ErrorEvent {
message,
codex_error_info: Some(CodexErrorInfo::Other),
}),
};
sess.send_event_raw(event).await;
}
if !shutdown_failures.is_empty() {
sess.shutdown_failure
.send_replace(Some(shutdown_failures.join("; ")));
}
let event = Event {
id: sub_id,

View File

@@ -2949,6 +2949,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
conversation_id,
tx_event,
agent_status: agent_status_tx,
shutdown_failure: watch::channel(None).0,
out_of_band_elicitation_paused: watch::channel(false).0,
state: Mutex::new(state),
managed_network_proxy_refresh_lock: Mutex::new(()),
@@ -3494,6 +3495,42 @@ async fn shutdown_and_wait_waits_when_shutdown_is_already_in_progress() {
.expect("shutdown waiter");
}
#[tokio::test]
// Verifies that a shutdown failure recorded on the session's
// `shutdown_failure` watch channel is surfaced by `shutdown_and_wait` as
// `CodexErr::ShutdownFailed` carrying the same message.
async fn shutdown_and_wait_returns_shutdown_error_status() {
    let (session, _turn_context) = make_session_and_context().await;
    let (tx_sub, rx_sub) = async_channel::bounded(4);
    let (_tx_event, rx_event) = async_channel::unbounded();
    let (_agent_status_tx, agent_status) = watch::channel(AgentStatus::PendingInit);
    let error_message = "shutdown durability failed".to_string();
    let status_message = error_message.clone();
    let session = Arc::new(session);
    let session_for_loop = Arc::clone(&session);
    // Stand-in session loop: consume the Shutdown submission, then record a
    // shutdown failure before the loop task terminates.
    let session_loop_handle = tokio::spawn(async move {
        let shutdown: Submission = rx_sub.recv().await.expect("shutdown submission");
        assert_eq!(shutdown.op, Op::Shutdown);
        session_for_loop
            .shutdown_failure
            .send_replace(Some(status_message));
    });
    let codex = Arc::new(Codex {
        tx_sub,
        rx_event,
        agent_status,
        session,
        session_loop_termination: session_loop_termination_from_handle(session_loop_handle),
    });
    // The recorded failure must propagate out of shutdown_and_wait as an error.
    let err = codex
        .shutdown_and_wait()
        .await
        .expect_err("shutdown error status should propagate");
    match err {
        CodexErr::ShutdownFailed(message) => assert_eq!(message, error_message),
        other => panic!("unexpected error: {other:?}"),
    }
}
#[tokio::test]
async fn shutdown_and_wait_shuts_down_cached_guardian_subagent() {
let (parent_session, parent_turn_context) = make_session_and_context().await;
@@ -3793,6 +3830,7 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx(
conversation_id,
tx_event,
agent_status: agent_status_tx,
shutdown_failure: watch::channel(None).0,
out_of_band_elicitation_paused: watch::channel(false).0,
state: Mutex::new(state),
managed_network_proxy_refresh_lock: Mutex::new(()),

View File

@@ -179,12 +179,14 @@ impl From<usize> for ForkSnapshot {
/// Summary of a bulk thread shutdown, grouping each thread by how its
/// shutdown ended. Each vector is sorted by the thread id's string form
/// before the report is returned.
#[derive(Debug, Default, PartialEq, Eq)]
pub struct ThreadShutdownReport {
    /// Threads that shut down cleanly.
    pub completed: Vec<ThreadId>,
    /// Threads whose shutdown completed but reported a durability error
    /// (thread id paired with the error message).
    pub shutdown_failed: Vec<(ThreadId, String)>,
    /// Threads for which the Shutdown op could not be submitted.
    pub submit_failed: Vec<ThreadId>,
    /// Threads that did not finish shutting down before the timeout.
    pub timed_out: Vec<ThreadId>,
}
/// Per-thread shutdown outcome, used internally while draining the set of
/// concurrent shutdown futures; mirrors the buckets of `ThreadShutdownReport`.
enum ShutdownOutcome {
    /// Shutdown finished cleanly within the timeout.
    Complete,
    /// Shutdown ran but reported an error (from `CodexErr::ShutdownFailed`).
    ShutdownFailed(String),
    /// `shutdown_and_wait` returned some other error.
    SubmitFailed,
    /// The timeout elapsed before shutdown finished.
    TimedOut,
}
@@ -605,6 +607,9 @@ impl ThreadManager {
let outcome = match tokio::time::timeout(timeout, thread.shutdown_and_wait()).await
{
Ok(Ok(())) => ShutdownOutcome::Complete,
Ok(Err(CodexErr::ShutdownFailed(message))) => {
ShutdownOutcome::ShutdownFailed(message)
}
Ok(Err(_)) => ShutdownOutcome::SubmitFailed,
Err(_) => ShutdownOutcome::TimedOut,
};
@@ -616,6 +621,9 @@ impl ThreadManager {
while let Some((thread_id, outcome)) = shutdowns.next().await {
match outcome {
ShutdownOutcome::Complete => report.completed.push(thread_id),
ShutdownOutcome::ShutdownFailed(message) => {
report.shutdown_failed.push((thread_id, message));
}
ShutdownOutcome::SubmitFailed => report.submit_failed.push(thread_id),
ShutdownOutcome::TimedOut => report.timed_out.push(thread_id),
}
@@ -625,10 +633,16 @@ impl ThreadManager {
for thread_id in &report.completed {
tracked_threads.remove(thread_id);
}
for (thread_id, _) in &report.shutdown_failed {
tracked_threads.remove(thread_id);
}
report
.completed
.sort_by_key(std::string::ToString::to_string);
report
.shutdown_failed
.sort_by_key(|(thread_id, _)| thread_id.to_string());
report
.submit_failed
.sort_by_key(std::string::ToString::to_string);

View File

@@ -2473,14 +2473,12 @@ async fn wait_agent_returns_final_status_without_timeout() {
.await
.expect("subscribe should succeed");
let _ = thread
thread
.thread
.submit(Op::Shutdown {})
.shutdown_and_wait()
.await
.expect("shutdown should submit");
let _ = timeout(Duration::from_secs(1), status_rx.changed())
.await
.expect("shutdown status should arrive");
.expect("shutdown should complete");
assert_eq!(status_rx.borrow_and_update().clone(), AgentStatus::Shutdown);
let invocation = invocation(
Arc::new(session),

View File

@@ -137,6 +137,8 @@ pub enum CodexErr {
UnsupportedOperation(String),
#[error("{0}")]
RefreshTokenFailed(RefreshTokenFailedError),
#[error("shutdown failed: {0}")]
ShutdownFailed(String),
#[error("Fatal error: {0}")]
Fatal(String),
// -----------------------------------------------------------------
@@ -171,6 +173,7 @@ impl CodexErr {
| CodexErr::Interrupted
| CodexErr::EnvVar(_)
| CodexErr::Fatal(_)
| CodexErr::ShutdownFailed(_)
| CodexErr::UsageNotIncluded
| CodexErr::QuotaExceeded
| CodexErr::InvalidImageRequest()

View File

@@ -98,10 +98,10 @@ enum RolloutCmd {
},
/// Ensure all prior writes are processed; respond when flushed.
Flush {
ack: oneshot::Sender<()>,
ack: oneshot::Sender<std::io::Result<()>>,
},
Shutdown {
ack: oneshot::Sender<()>,
ack: oneshot::Sender<std::io::Result<()>>,
},
}
@@ -525,7 +525,7 @@ impl RolloutRecorder {
.await
.map_err(|e| IoError::other(format!("failed to queue rollout flush: {e}")))?;
rx.await
.map_err(|e| IoError::other(format!("failed waiting for rollout flush: {e}")))
.map_err(|e| IoError::other(format!("failed waiting for rollout flush: {e}")))?
}
pub async fn load_rollout_items(
@@ -613,9 +613,9 @@ impl RolloutRecorder {
pub async fn shutdown(&self) -> std::io::Result<()> {
let (tx_done, rx_done) = oneshot::channel();
match self.tx.send(RolloutCmd::Shutdown { ack: tx_done }).await {
Ok(_) => rx_done
.await
.map_err(|e| IoError::other(format!("failed waiting for rollout shutdown: {e}")))?,
Ok(_) => rx_done.await.map_err(|e| {
IoError::other(format!("failed waiting for rollout shutdown: {e}"))
})??,
Err(e) => {
warn!("failed to send rollout shutdown command: {e}");
return Err(IoError::other(format!(
@@ -746,18 +746,10 @@ async fn rollout_writer(
while let Some(cmd) = rx.recv().await {
match cmd {
RolloutCmd::AddItems(items) => {
if items.is_empty() {
continue;
}
if writer.is_none() {
buffered_items.extend(items);
continue;
}
write_and_reconcile_items(
writer.as_mut(),
items.as_slice(),
write_or_buffer_rollout_items(
&mut writer,
&mut buffered_items,
items,
&rollout_path,
state_db_ctx.as_deref(),
state_builder.as_ref(),
@@ -766,70 +758,104 @@ async fn rollout_writer(
.await?;
}
RolloutCmd::Persist { ack } => {
if writer.is_none() {
let result = async {
let Some(log_file_info) = deferred_log_file_info.take() else {
return Err(IoError::other(
"deferred rollout recorder missing log file metadata",
));
};
let file = open_log_file(log_file_info.path.as_path())?;
writer = Some(JsonlWriter {
file: tokio::fs::File::from_std(file),
});
let result = async {
materialize_writer_if_needed(
&mut writer,
&mut deferred_log_file_info,
&mut meta,
&mut buffered_items,
&cwd,
&rollout_path,
state_db_ctx.as_deref(),
&mut state_builder,
default_provider.as_str(),
generate_memories,
)
.await?;
flush_writer(writer.as_mut()).await
}
.await;
if let Some(session_meta) = meta.take() {
write_session_meta(
writer.as_mut(),
session_meta,
&cwd,
&rollout_path,
state_db_ctx.as_deref(),
&mut state_builder,
default_provider.as_str(),
generate_memories,
)
.await?;
}
if !buffered_items.is_empty() {
write_and_reconcile_items(
writer.as_mut(),
buffered_items.as_slice(),
&rollout_path,
state_db_ctx.as_deref(),
state_builder.as_ref(),
default_provider.as_str(),
)
.await?;
buffered_items.clear();
}
Ok(())
match result {
Ok(()) => {
let _ = ack.send(Ok(()));
}
.await;
if let Err(err) = result {
let kind = err.kind();
let message = err.to_string();
let _ = ack.send(Err(IoError::new(kind, message.clone())));
return Err(IoError::new(kind, message));
Err(err) => {
let return_err = clone_io_error(&err);
let _ = ack.send(Err(err));
return Err(return_err);
}
}
let _ = ack.send(Ok(()));
}
RolloutCmd::Flush { ack } => {
// Deferred fresh threads may not have an initialized file yet.
if let Some(writer) = writer.as_mut()
&& let Err(e) = writer.file.flush().await
{
let _ = ack.send(());
return Err(e);
match flush_writer(writer.as_mut()).await {
Ok(()) => {
let _ = ack.send(Ok(()));
}
Err(err) => {
let return_err = clone_io_error(&err);
let _ = ack.send(Err(err));
return Err(return_err);
}
}
let _ = ack.send(());
}
RolloutCmd::Shutdown { ack } => {
let _ = ack.send(());
rx.close();
let mut shutdown_acks = vec![ack];
let result = async {
while let Some(cmd) = rx.recv().await {
match cmd {
RolloutCmd::AddItems(items) => {
write_or_buffer_rollout_items(
&mut writer,
&mut buffered_items,
items,
&rollout_path,
state_db_ctx.as_deref(),
state_builder.as_ref(),
default_provider.as_str(),
)
.await?;
}
RolloutCmd::Persist { ack }
| RolloutCmd::Flush { ack }
| RolloutCmd::Shutdown { ack } => {
shutdown_acks.push(ack);
}
}
}
materialize_writer_if_needed(
&mut writer,
&mut deferred_log_file_info,
&mut meta,
&mut buffered_items,
&cwd,
&rollout_path,
state_db_ctx.as_deref(),
&mut state_builder,
default_provider.as_str(),
generate_memories,
)
.await?;
sync_writer(writer.as_mut()).await
}
.await;
match result {
Ok(()) => {
for ack in shutdown_acks {
let _ = ack.send(Ok(()));
}
break;
}
Err(err) => {
let return_err = clone_io_error(&err);
for ack in shutdown_acks {
let _ = ack.send(Err(clone_io_error(&err)));
}
return Err(return_err);
}
}
}
}
}
@@ -837,6 +863,111 @@ async fn rollout_writer(
Ok(())
}
/// Append a batch of rollout items: write them through the active writer
/// when one exists, otherwise stash them in `buffered_items` until the
/// writer is materialized. An empty batch is a successful no-op.
async fn write_or_buffer_rollout_items(
    writer: &mut Option<JsonlWriter>,
    buffered_items: &mut Vec<RolloutItem>,
    items: Vec<RolloutItem>,
    rollout_path: &Path,
    state_db_ctx: Option<&StateRuntime>,
    state_builder: Option<&ThreadMetadataBuilder>,
    default_provider: &str,
) -> std::io::Result<()> {
    // Nothing to persist for an empty batch.
    if items.is_empty() {
        return Ok(());
    }
    // Deferred threads have no file yet: keep the items for later replay.
    let Some(_) = writer.as_ref() else {
        buffered_items.extend(items);
        return Ok(());
    };
    write_and_reconcile_items(
        writer.as_mut(),
        items.as_slice(),
        rollout_path,
        state_db_ctx,
        state_builder,
        default_provider,
    )
    .await
}
/// Produce a best-effort copy of an `io::Error`.
///
/// `io::Error` is not `Clone`, so when the same failure must be both sent to
/// an ack channel and returned to the caller it is duplicated manually.
/// OS-backed errors are rebuilt from their raw code so `raw_os_error()` (and
/// the kind/message derived from it) survive the copy; the previous
/// kind-plus-string approach dropped the raw code. Other errors are
/// flattened to their kind plus rendered message.
fn clone_io_error(err: &IoError) -> IoError {
    match err.raw_os_error() {
        // Rebuilding from the raw code preserves kind, message, and the code.
        Some(code) => IoError::from_raw_os_error(code),
        None => IoError::new(err.kind(), err.to_string()),
    }
}
/// Flush buffered bytes and fsync the rollout file so its contents are
/// durable on disk. A `None` writer (deferred thread with no file yet) is a
/// successful no-op.
async fn sync_writer(writer: Option<&mut JsonlWriter>) -> std::io::Result<()> {
    match writer {
        None => Ok(()),
        Some(w) => {
            w.file.flush().await?;
            w.file.sync_all().await
        }
    }
}
/// Flush buffered bytes to the rollout file without forcing an fsync.
/// A `None` writer (deferred thread with no file yet) is a successful no-op.
async fn flush_writer(writer: Option<&mut JsonlWriter>) -> std::io::Result<()> {
    let Some(w) = writer else {
        return Ok(());
    };
    w.file.flush().await
}
#[allow(clippy::too_many_arguments)]
/// Lazily open the rollout log file for a deferred thread and replay any
/// state accumulated before it existed: first the pending session meta,
/// then the buffered rollout items. A no-op when a writer is already open.
///
/// Errors if the deferred metadata is missing, the file cannot be opened,
/// or any of the replay writes fail.
async fn materialize_writer_if_needed(
    writer: &mut Option<JsonlWriter>,
    deferred_log_file_info: &mut Option<LogFileInfo>,
    meta: &mut Option<SessionMeta>,
    buffered_items: &mut Vec<RolloutItem>,
    cwd: &Path,
    rollout_path: &Path,
    state_db_ctx: Option<&StateRuntime>,
    state_builder: &mut Option<ThreadMetadataBuilder>,
    default_provider: &str,
    generate_memories: bool,
) -> std::io::Result<()> {
    // Already materialized: nothing to do.
    if writer.is_some() {
        return Ok(());
    }
    // `take()` consumes the deferred metadata so a second materialization
    // attempt cannot reopen the file.
    let Some(log_file_info) = deferred_log_file_info.take() else {
        return Err(IoError::other(
            "deferred rollout recorder missing log file metadata",
        ));
    };
    let file = open_log_file(log_file_info.path.as_path())?;
    *writer = Some(JsonlWriter {
        file: tokio::fs::File::from_std(file),
    });
    // Session meta must be written before any items; `take()` ensures it is
    // only ever written once.
    if let Some(session_meta) = meta.take() {
        write_session_meta(
            writer.as_mut(),
            session_meta,
            cwd,
            rollout_path,
            state_db_ctx,
            state_builder,
            default_provider,
            generate_memories,
        )
        .await?;
    }
    // Replay items that were buffered while the writer was deferred, then
    // drop them so they are not written twice.
    if !buffered_items.is_empty() {
        write_and_reconcile_items(
            writer.as_mut(),
            buffered_items.as_slice(),
            rollout_path,
            state_db_ctx,
            state_builder.as_ref(),
            default_provider,
        )
        .await?;
        buffered_items.clear();
    }
    Ok(())
}
#[allow(clippy::too_many_arguments)]
async fn write_session_meta(
mut writer: Option<&mut JsonlWriter>,

View File

@@ -60,6 +60,7 @@ pub async fn append_session_index_entry(
line.push('\n');
file.write_all(line.as_bytes()).await?;
file.flush().await?;
file.sync_all().await?;
Ok(())
}

View File

@@ -69,6 +69,10 @@ pub use remote_control::RemoteControlEnrollmentRecord;
// metadata, rather than the exact sum of all persisted SQLite column bytes.
const LOG_PARTITION_SIZE_LIMIT_BYTES: i64 = 10 * 1024 * 1024;
const LOG_PARTITION_ROW_LIMIT: i64 = 1_000;
const CHECKPOINT_WAL_BUSY_MAX_ATTEMPTS: usize = 10;
const CHECKPOINT_WAL_BUSY_TIMEOUT_MS: u64 = 100;
const SQLITE_BUSY_TIMEOUT: Duration = Duration::from_secs(5);
const SQLITE_BUSY_TIMEOUT_MS: u64 = 5_000;
#[derive(Clone)]
pub struct StateRuntime {
@@ -139,6 +143,53 @@ impl StateRuntime {
pub fn codex_home(&self) -> &Path {
self.codex_home.as_path()
}
/// Checkpoint both runtime SQLite databases so WAL contents are persisted to the main files.
///
/// Checkpoints the "state" pool first, then the "logs" pool; the first
/// failure aborts and is returned, so the logs pool is not checkpointed if
/// the state pool fails.
pub async fn checkpoint_wal(&self) -> anyhow::Result<()> {
    checkpoint_wal_pool(self.pool.as_ref(), "state").await?;
    checkpoint_wal_pool(self.logs_pool.as_ref(), "logs").await?;
    Ok(())
}
}
/// Run `PRAGMA wal_checkpoint(TRUNCATE)` on one pool, retrying up to
/// `CHECKPOINT_WAL_BUSY_MAX_ATTEMPTS` times while the checkpoint reports
/// busy, and restoring the connection's normal busy timeout afterwards.
/// `name` only labels error messages.
async fn checkpoint_wal_pool(pool: &SqlitePool, name: &str) -> anyhow::Result<()> {
    let mut conn = pool.acquire().await?;
    // Temporarily shorten the busy timeout so each checkpoint attempt fails
    // fast instead of blocking for the full SQLITE_BUSY_TIMEOUT.
    sqlx::query(&format!(
        "PRAGMA busy_timeout = {CHECKPOINT_WAL_BUSY_TIMEOUT_MS}"
    ))
    .execute(&mut *conn)
    .await?;
    let checkpoint_result = async {
        let mut last_busy_result = None;
        for attempt in 1..=CHECKPOINT_WAL_BUSY_MAX_ATTEMPTS {
            // wal_checkpoint(TRUNCATE) returns one row: (busy, log pages,
            // pages checkpointed). busy == 0 means the WAL fully truncated.
            let row = sqlx::query("PRAGMA wal_checkpoint(TRUNCATE)")
                .fetch_one(&mut *conn)
                .await?;
            let busy: i64 = row.try_get(0)?;
            let log_pages: i64 = row.try_get(1)?;
            let checkpointed_pages: i64 = row.try_get(2)?;
            if busy == 0 {
                return Ok(());
            }
            last_busy_result = Some((checkpointed_pages, log_pages));
            if attempt < CHECKPOINT_WAL_BUSY_MAX_ATTEMPTS {
                // NOTE(review): 50 ms is hard-coded here while the busy
                // timeout uses CHECKPOINT_WAL_BUSY_TIMEOUT_MS (100 ms) —
                // confirm whether this should share a named constant.
                tokio::time::sleep(Duration::from_millis(50)).await;
            }
        }
        // Every attempt reported busy: report the last observed progress.
        let (checkpointed_pages, log_pages) = last_busy_result.unwrap_or((0, 0));
        anyhow::bail!(
            "{name} WAL checkpoint was busy: {checkpointed_pages}/{log_pages} pages checkpointed"
        )
    };
    let checkpoint_result: anyhow::Result<()> = checkpoint_result.await;
    // Always restore the normal busy timeout, even when the checkpoint
    // failed. NOTE(review): a restore failure takes precedence here and
    // masks the checkpoint error — confirm that ordering is intended.
    let restore_result = sqlx::query(&format!("PRAGMA busy_timeout = {SQLITE_BUSY_TIMEOUT_MS}"))
        .execute(&mut *conn)
        .await;
    restore_result?;
    checkpoint_result
}
fn base_sqlite_options(path: &Path) -> SqliteConnectOptions {
@@ -147,7 +198,7 @@ fn base_sqlite_options(path: &Path) -> SqliteConnectOptions {
.create_if_missing(true)
.journal_mode(SqliteJournalMode::Wal)
.synchronous(SqliteSynchronous::Normal)
.busy_timeout(Duration::from_secs(5))
.busy_timeout(SQLITE_BUSY_TIMEOUT)
.log_statements(LevelFilter::Off)
}