recorder: document Flush as a channel barrier (OS flush), not fsync; clarify writer ack semantics

This commit is contained in:
Ahmed Ibrahim
2025-09-08 11:34:52 -07:00
parent e9b78c9296
commit 5f18406e8d

View File

@@ -117,10 +117,37 @@ impl From<Event> for RolloutItem {
}
}
/// Convenience helpers to extract typed items from a list of rollout items.
pub trait RolloutItemSliceExt {
fn get_response_items(&self) -> Vec<ResponseItem>;
fn get_events(&self) -> Vec<Event>;
}
impl RolloutItemSliceExt for [RolloutItem] {
fn get_response_items(&self) -> Vec<ResponseItem> {
self.iter()
.filter_map(|it| match it {
RolloutItem::ResponseItem(ri) => Some(ri.clone()),
_ => None,
})
.collect()
}
fn get_events(&self) -> Vec<Event> {
self.iter()
.filter_map(|it| match it {
RolloutItem::Event(ev) => Some(ev.clone()),
_ => None,
})
.collect()
}
}
enum RolloutCmd {
AddResponseItems(Vec<ResponseItem>),
AddEvents(Vec<Event>),
AddSessionMeta(SessionMetaWithGit),
Flush { ack: oneshot::Sender<()> },
Shutdown { ack: oneshot::Sender<()> },
}
@@ -195,6 +222,28 @@ impl RolloutRecorder {
}
}
/// Ensure all writes up to this point have been processed by the writer task.
///
/// This is a sequencing barrier for readers that plan to open and read the
/// rollout file immediately after calling this method. The background writer
/// processes the channel serially; when it dequeues `Flush`, all prior
/// `AddResponseItems`/`AddEvents`/`AddSessionMeta` have already been written
/// via `write_line`, which calls `file.flush()` (OSbuffer flush).
///
/// Note: this does NOT perform an fsync (`sync_data`/`sync_all`). If durable
/// persistence is required (powerloss safety), we should add that here or
/// provide a separate method.
pub async fn flush(&self) -> std::io::Result<()> {
let (tx_done, rx_done) = oneshot::channel();
self.tx
.send(RolloutCmd::Flush { ack: tx_done })
.await
.map_err(|e| IoError::other(format!("failed to queue rollout flush: {e}")))?;
rx_done
.await
.map_err(|e| IoError::other(format!("failed waiting for rollout flush: {e}")))
}
async fn record_response_item(&self, item: &ResponseItem) -> std::io::Result<()> {
// Note that function calls may look a bit strange if they are
// "fully qualified MCP tool calls," so we could consider
@@ -435,6 +484,11 @@ async fn rollout_writer(
.await?;
}
}
// Sequencing barrier: by the time we handle `Flush`, all previously
// queued writes have been applied and flushed to OS buffers.
RolloutCmd::Flush { ack } => {
let _ = ack.send(());
}
RolloutCmd::AddSessionMeta(meta) => {
writer
.write_line(&SessionMetaLine {