revert recorder

This commit is contained in:
jif-oai
2026-03-06 11:37:17 +01:00
parent 7753a33a58
commit 1da0c25b0a

View File

@@ -18,8 +18,6 @@ use time::format_description::well_known::Rfc3339;
use time::macros::format_description;
use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc::Sender;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::mpsc::{self};
use tokio::sync::oneshot;
use tracing::info;
@@ -57,6 +55,7 @@ use codex_protocol::protocol::RolloutLine;
use codex_protocol::protocol::SessionMeta;
use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::SessionSource;
use codex_state::StateRuntime;
use codex_state::ThreadMetadataBuilder;
/// Records all [`ResponseItem`]s for a session and flushes them to disk after
@@ -97,7 +96,7 @@ enum RolloutCmd {
Persist {
ack: oneshot::Sender<()>,
},
/// Ensure all prior file writes are processed; respond when flushed.
/// Ensure all prior writes are processed; respond when flushed.
Flush {
ack: oneshot::Sender<()>,
},
@@ -106,13 +105,6 @@ enum RolloutCmd {
},
}
type StateDbSyncPayload = (
PathBuf,
Option<ThreadMetadataBuilder>,
Vec<RolloutItem>,
Option<String>,
);
impl RolloutRecorderParams {
pub fn new(
conversation_id: ThreadId,
@@ -457,15 +449,6 @@ impl RolloutRecorder {
// future will yield, which is fine we only need to ensure we do not
// perform *blocking* I/O on the caller's thread.
let (tx, rx) = mpsc::channel::<RolloutCmd>(256);
let state_db_tx = state_db_ctx.as_ref().map(|state_db| {
let (tx, rx) = mpsc::unbounded_channel::<StateDbSyncPayload>();
tokio::task::spawn(state_db_sync_writer(
state_db.clone(),
rx,
config.model_provider_id.clone(),
));
tx
});
// Spawn a Tokio task that owns the file handle and performs async
// writes. Using `tokio::fs::File` keeps everything on the async I/O
// driver instead of blocking the runtime.
@@ -473,12 +456,12 @@ impl RolloutRecorder {
file,
deferred_log_file_info,
rx,
state_db_tx,
meta,
cwd,
rollout_path.clone(),
state_db_ctx.clone(),
state_builder,
state_db_ctx.is_some(),
config.model_provider_id.clone(),
config.memories.generate_memories,
));
@@ -726,12 +709,12 @@ async fn rollout_writer(
file: Option<tokio::fs::File>,
mut deferred_log_file_info: Option<LogFileInfo>,
mut rx: mpsc::Receiver<RolloutCmd>,
state_db_tx: Option<UnboundedSender<StateDbSyncPayload>>,
mut meta: Option<SessionMeta>,
cwd: std::path::PathBuf,
rollout_path: PathBuf,
state_db_ctx: Option<StateDbHandle>,
mut state_builder: Option<ThreadMetadataBuilder>,
has_state_db: bool,
default_provider: String,
generate_memories: bool,
) -> std::io::Result<()> {
let mut writer = file.map(|file| JsonlWriter { file });
@@ -747,12 +730,12 @@ async fn rollout_writer(
{
write_session_meta(
writer.as_mut(),
state_db_tx.as_ref(),
session_meta,
&cwd,
&rollout_path,
state_db_ctx.as_deref(),
&mut state_builder,
has_state_db,
default_provider.as_str(),
generate_memories,
)
.await?;
@@ -773,10 +756,11 @@ async fn rollout_writer(
write_and_reconcile_items(
writer.as_mut(),
state_db_tx.as_ref(),
items.as_slice(),
&rollout_path,
state_db_ctx.as_deref(),
state_builder.as_ref(),
default_provider.as_str(),
)
.await?;
}
@@ -796,12 +780,12 @@ async fn rollout_writer(
if let Some(session_meta) = meta.take() {
write_session_meta(
writer.as_mut(),
state_db_tx.as_ref(),
session_meta,
&cwd,
&rollout_path,
state_db_ctx.as_deref(),
&mut state_builder,
has_state_db,
default_provider.as_str(),
generate_memories,
)
.await?;
@@ -810,10 +794,11 @@ async fn rollout_writer(
if !buffered_items.is_empty() {
write_and_reconcile_items(
writer.as_mut(),
state_db_tx.as_ref(),
buffered_items.as_slice(),
&rollout_path,
state_db_ctx.as_deref(),
state_builder.as_ref(),
default_provider.as_str(),
)
.await?;
buffered_items.clear();
@@ -853,12 +838,12 @@ async fn rollout_writer(
#[allow(clippy::too_many_arguments)]
async fn write_session_meta(
mut writer: Option<&mut JsonlWriter>,
state_db_tx: Option<&UnboundedSender<StateDbSyncPayload>>,
session_meta: SessionMeta,
cwd: &Path,
rollout_path: &Path,
state_db_ctx: Option<&StateRuntime>,
state_builder: &mut Option<ThreadMetadataBuilder>,
has_state_db: bool,
default_provider: &str,
generate_memories: bool,
) -> std::io::Result<()> {
let git_info = collect_git_info(cwd).await;
@@ -866,7 +851,7 @@ async fn write_session_meta(
meta: session_meta,
git: git_info,
};
if has_state_db {
if state_db_ctx.is_some() {
*state_builder = metadata::builder_from_session_meta(&session_meta_line, rollout_path);
}
@@ -874,82 +859,49 @@ async fn write_session_meta(
if let Some(writer) = writer.as_mut() {
writer.write_rollout_item(&rollout_item).await?;
}
enqueue_thread_state_sync(
state_db_tx,
sync_thread_state_after_write(
state_db_ctx,
rollout_path,
state_builder.as_ref(),
std::slice::from_ref(&rollout_item),
default_provider,
(!generate_memories).then_some("disabled"),
)?;
)
.await;
Ok(())
}
async fn write_and_reconcile_items(
mut writer: Option<&mut JsonlWriter>,
state_db_tx: Option<&UnboundedSender<StateDbSyncPayload>>,
items: &[RolloutItem],
rollout_path: &Path,
state_db_ctx: Option<&StateRuntime>,
state_builder: Option<&ThreadMetadataBuilder>,
default_provider: &str,
) -> std::io::Result<()> {
if let Some(writer) = writer.as_mut() {
for item in items {
writer.write_rollout_item(item).await?;
}
}
enqueue_thread_state_sync(state_db_tx, rollout_path, state_builder, items, None)?;
Ok(())
}
fn enqueue_thread_state_sync(
state_db_tx: Option<&UnboundedSender<StateDbSyncPayload>>,
rollout_path: &Path,
state_builder: Option<&ThreadMetadataBuilder>,
items: &[RolloutItem],
new_thread_memory_mode: Option<&str>,
) -> std::io::Result<()> {
if items.is_empty() && new_thread_memory_mode.is_none() {
return Ok(());
}
let Some(state_db_tx) = state_db_tx else {
return Ok(());
};
state_db_tx
.send((
rollout_path.to_path_buf(),
state_builder.cloned(),
items.to_vec(),
new_thread_memory_mode.map(str::to_owned),
))
.map_err(|e| IoError::other(format!("failed to queue state db sync: {e}")))
}
async fn state_db_sync_writer(
state_db_ctx: StateDbHandle,
mut rx: UnboundedReceiver<StateDbSyncPayload>,
default_provider: String,
) -> std::io::Result<()> {
while let Some((rollout_path, state_builder, items, new_thread_memory_mode)) = rx.recv().await {
sync_thread_state_after_write(
Some(state_db_ctx.as_ref()),
rollout_path.as_path(),
default_provider.as_str(),
state_builder.as_ref(),
items.as_slice(),
new_thread_memory_mode.as_deref(),
)
.await;
}
sync_thread_state_after_write(
state_db_ctx,
rollout_path,
state_builder,
items,
default_provider,
None,
)
.await;
Ok(())
}
async fn sync_thread_state_after_write(
state_db_ctx: Option<&codex_state::StateRuntime>,
state_db_ctx: Option<&StateRuntime>,
rollout_path: &Path,
default_provider: &str,
state_builder: Option<&ThreadMetadataBuilder>,
items: &[RolloutItem],
default_provider: &str,
new_thread_memory_mode: Option<&str>,
) {
let updated_at = Utc::now();
@@ -1419,9 +1371,9 @@ mod tests {
sync_thread_state_after_write(
Some(state_db.as_ref()),
rollout_path.as_path(),
config.model_provider_id.as_str(),
Some(&builder),
items.as_slice(),
config.model_provider_id.as_str(),
None,
)
.await;