Durably flush shutdown state

This commit is contained in:
Eric Traut
2026-04-06 23:34:28 -07:00
parent 24c598e8a9
commit 6a44a244d1
4 changed files with 80 additions and 21 deletions

View File

@@ -5557,6 +5557,11 @@ mod handlers {
};
sess.send_event_raw(event).await;
}
if let Some(state_db) = sess.services.state_db.as_deref()
&& let Err(e) = state_db.checkpoint_wal().await
{
warn!("failed to checkpoint state db WAL during shutdown: {e}");
}
let event = Event {
id: sub_id,

View File

@@ -94,14 +94,14 @@ pub enum RolloutRecorderParams {
/// Commands sent over the channel to the rollout writer task.
///
/// Every variant except `AddItems` carries the sending half of a oneshot
/// channel; the requester awaits the receiving half to learn when (and whether)
/// the command was handled.
// NOTE(review): each `ack` field is listed twice in this view — presumably the
// pre-change (`Sender<()>`) and post-change (`Sender<std::io::Result<()>>`)
// lines of a diff shown without +/- markers. Confirm against the real file.
enum RolloutCmd {
/// Rollout items handed to the writer task.
AddItems(Vec<RolloutItem>),
/// Request to persist the rollout; the requester waits on `ack` for the outcome.
Persist {
ack: oneshot::Sender<()>,
ack: oneshot::Sender<std::io::Result<()>>,
},
/// Ensure all prior writes are processed; respond when flushed.
Flush {
ack: oneshot::Sender<()>,
ack: oneshot::Sender<std::io::Result<()>>,
},
/// Request to stop the writer task; the requester waits on `ack` before returning.
Shutdown {
ack: oneshot::Sender<()>,
ack: oneshot::Sender<std::io::Result<()>>,
},
}
@@ -514,7 +514,7 @@ impl RolloutRecorder {
.await
.map_err(|e| IoError::other(format!("failed to queue rollout persist: {e}")))?;
rx.await
.map_err(|e| IoError::other(format!("failed waiting for rollout persist: {e}")))
.map_err(|e| IoError::other(format!("failed waiting for rollout persist: {e}")))?
}
/// Flush all queued writes and wait until they are committed by the writer task.
@@ -525,7 +525,7 @@ impl RolloutRecorder {
.await
.map_err(|e| IoError::other(format!("failed to queue rollout flush: {e}")))?;
rx.await
.map_err(|e| IoError::other(format!("failed waiting for rollout flush: {e}")))
.map_err(|e| IoError::other(format!("failed waiting for rollout flush: {e}")))?
}
pub async fn load_rollout_items(
@@ -613,9 +613,9 @@ impl RolloutRecorder {
pub async fn shutdown(&self) -> std::io::Result<()> {
let (tx_done, rx_done) = oneshot::channel();
match self.tx.send(RolloutCmd::Shutdown { ack: tx_done }).await {
Ok(_) => rx_done
.await
.map_err(|e| IoError::other(format!("failed waiting for rollout shutdown: {e}")))?,
Ok(_) => rx_done.await.map_err(|e| {
IoError::other(format!("failed waiting for rollout shutdown: {e}"))
})??,
Err(e) => {
warn!("failed to send rollout shutdown command: {e}");
return Err(IoError::other(format!(
@@ -805,36 +805,78 @@ async fn rollout_writer(
buffered_items.clear();
}
flush_writer(writer.as_mut()).await?;
Ok(())
}
.await;
if let Err(err) = result {
let _ = ack.send(());
return Err(err);
match result {
Ok(()) => {
let _ = ack.send(Ok(()));
}
Err(err) => {
let return_err = clone_io_error(&err);
let _ = ack.send(Err(err));
return Err(return_err);
}
}
} else if let Err(err) = flush_writer(writer.as_mut()).await {
let return_err = clone_io_error(&err);
let _ = ack.send(Err(err));
return Err(return_err);
} else {
let _ = ack.send(Ok(()));
}
let _ = ack.send(());
}
RolloutCmd::Flush { ack } => {
// Deferred fresh threads may not have an initialized file yet.
if let Some(writer) = writer.as_mut()
&& let Err(e) = writer.file.flush().await
{
let _ = ack.send(());
return Err(e);
match flush_writer(writer.as_mut()).await {
Ok(()) => {
let _ = ack.send(Ok(()));
}
Err(err) => {
let return_err = clone_io_error(&err);
let _ = ack.send(Err(err));
return Err(return_err);
}
}
let _ = ack.send(());
}
RolloutCmd::Shutdown { ack } => {
let _ = ack.send(());
}
RolloutCmd::Shutdown { ack } => match sync_writer(writer.as_mut()).await {
Ok(()) => {
let _ = ack.send(Ok(()));
break;
}
Err(err) => {
let return_err = clone_io_error(&err);
let _ = ack.send(Err(err));
return Err(return_err);
}
},
}
}
Ok(())
}
/// Builds an independent copy of `err`, since `std::io::Error` is not `Clone`.
///
/// The writer task needs the same failure twice: one copy is sent back through
/// the ack channel and the other is returned from the task.
///
/// Errors backed by an OS error code are rebuilt with `from_raw_os_error` so the
/// copy still answers `raw_os_error()` (for example to tell "disk full" from
/// other failures). All other errors are rebuilt from their kind and display
/// text. In both cases the copy has the same `kind()` and the same message.
fn clone_io_error(err: &IoError) -> IoError {
    match err.raw_os_error() {
        Some(code) => IoError::from_raw_os_error(code),
        None => IoError::new(err.kind(), err.to_string()),
    }
}
/// Flushes the writer's file and then calls `sync_all` on it.
///
/// With no writer (`None`) this does nothing and returns `Ok(())`.
/// The first I/O error from either step is returned and the remaining step is
/// skipped.
async fn sync_writer(writer: Option<&mut JsonlWriter>) -> std::io::Result<()> {
    // Nothing to sync when no writer was supplied.
    let Some(jsonl) = writer else {
        return Ok(());
    };

    jsonl.file.flush().await?;
    jsonl.file.sync_all().await?;
    Ok(())
}
/// Flushes the writer's file when a writer is present.
///
/// `None` is accepted and treated as success; a flush error is returned to the
/// caller unchanged.
async fn flush_writer(writer: Option<&mut JsonlWriter>) -> std::io::Result<()> {
    match writer {
        None => Ok(()),
        Some(jsonl) => {
            jsonl.file.flush().await?;
            Ok(())
        }
    }
}
#[allow(clippy::too_many_arguments)]
async fn write_session_meta(
mut writer: Option<&mut JsonlWriter>,

View File

@@ -60,6 +60,7 @@ pub async fn append_session_index_entry(
line.push('\n');
file.write_all(line.as_bytes()).await?;
file.flush().await?;
file.sync_all().await?;
Ok(())
}

View File

@@ -139,6 +139,17 @@ impl StateRuntime {
/// Returns the stored `codex_home` location as a borrowed path.
pub fn codex_home(&self) -> &Path {
    let home = &self.codex_home;
    home.as_path()
}
/// Runs `PRAGMA wal_checkpoint(TRUNCATE)` on both runtime SQLite pools.
///
/// The main pool is checkpointed first, then the logs pool.
///
/// # Errors
///
/// Returns the first error reported while executing the pragma. If the main
/// pool fails, the logs pool is not attempted.
// NOTE(review): SQLite reports a blocked checkpoint through the pragma's result
// row rather than as an error, and `execute` discards that row — confirm that
// ignoring it is acceptable here.
pub async fn checkpoint_wal(&self) -> anyhow::Result<()> {
    const CHECKPOINT_SQL: &str = "PRAGMA wal_checkpoint(TRUNCATE)";

    sqlx::query(CHECKPOINT_SQL)
        .execute(self.pool.as_ref())
        .await?;

    sqlx::query(CHECKPOINT_SQL)
        .execute(self.logs_pool.as_ref())
        .await?;

    Ok(())
}
}
fn base_sqlite_options(path: &Path) -> SqliteConnectOptions {