remove flakyness to another PR

This commit is contained in:
Ahmed Ibrahim
2025-07-21 20:09:49 -07:00
parent 79dce68a0b
commit b5a12a3fd5
2 changed files with 0 additions and 74 deletions

View File

@@ -807,20 +807,6 @@ async fn submission_loop(
}
}
}
// Gracefully flush and shutdown rollout recorder on session end so tests
// that inspect the rollout file do not race with the background writer.
if let Some(sess_arc) = sess {
let recorder_opt = {
let mut guard = sess_arc.rollout.lock().unwrap();
guard.take()
};
if let Some(rec) = recorder_opt {
if let Err(e) = rec.shutdown().await {
warn!("failed to shutdown rollout recorder: {e}");
}
}
}
debug!("Agent loop exited");
}
@@ -1021,15 +1007,6 @@ async fn run_task(sess: Arc<Session>, sub_id: String, input: Vec<InputItem>) {
}
}
}
// Flush rollout so that all recorded items for this task are durable before TaskComplete.
if let Some(rec) = {
let guard = sess.rollout.lock().unwrap();
guard.as_ref().cloned()
} {
if let Err(e) = rec.sync().await {
warn!("failed to flush rollout at task end: {e}");
}
}
sess.remove_task(&sub_id);
let event = Event {
id: sub_id,

View File

@@ -14,7 +14,6 @@ use time::macros::format_description;
use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc::Sender;
use tokio::sync::mpsc::{self};
use tokio::sync::oneshot;
use tracing::info;
use tracing::warn;
use uuid::Uuid;
@@ -63,10 +62,6 @@ pub(crate) struct RolloutRecorder {
enum RolloutCmd {
AddItems(Vec<ResponseItem>),
UpdateState(SessionStateSnapshot),
Sync {
exit: bool,
ack: oneshot::Sender<()>,
},
}
impl RolloutRecorder {
@@ -205,44 +200,7 @@ impl RolloutRecorder {
info!("Resumed rollout successfully from {path:?}");
Ok((Self { tx }, saved))
}
pub async fn sync(&self) -> std::io::Result<()> {
let (tx_done, rx_done) = oneshot::channel();
if let Err(e) = self
.tx
.send(RolloutCmd::Sync {
exit: false,
ack: tx_done,
})
.await
{
warn!("failed to send rollout sync command: {e}");
return Ok(());
}
rx_done
.await
.map_err(|e| IoError::other(format!("failed waiting for rollout sync: {e}")))
}
pub async fn shutdown(&self) -> std::io::Result<()> {
let (tx_done, rx_done) = oneshot::channel();
if let Err(e) = self
.tx
.send(RolloutCmd::Sync {
exit: true,
ack: tx_done,
})
.await
{
warn!("failed to send rollout shutdown command: {e}");
return Ok(());
}
rx_done
.await
.map_err(|e| IoError::other(format!("failed waiting for rollout shutdown: {e}")))
}
}
struct LogFileInfo {
/// Opened file handle to the rollout file.
file: File,
@@ -348,15 +306,6 @@ async fn rollout_writer(
warn!("Failed to write state: {e}");
}
}
RolloutCmd::Sync { exit, ack } => {
if let Err(e) = file.flush().await {
warn!("Failed to flush on sync: {e}");
}
let _ = ack.send(());
if exit {
break;
}
}
}
}
}