back comp

This commit is contained in:
Ahmed Ibrahim
2025-09-05 10:56:35 -07:00
parent 777fbba58c
commit 0c732e4a53
3 changed files with 81 additions and 6 deletions

View File

@@ -3,3 +3,5 @@
This crate is designed for utilities that need to be shared across other crates in the workspace, but should not go in `core`.
For narrow utility features, the pattern is to add introduce a new feature under `[features]` in `Cargo.toml` and then gate it with `#[cfg]` in `lib.rs`, as appropriate.
high

View File

@@ -581,14 +581,42 @@ impl Session {
async fn record_initial_history_resumed(&self, items: Vec<RolloutItem>) -> Vec<EventMsg> {
let mut responses: Vec<ResponseItem> = Vec::new();
// Include everything before we see a session meta marker; after that, include only user messages
let before_resume_session = items
.get(0)
.map(|it| !matches!(it, RolloutItem::SessionMeta(..)))
.unwrap_or(true);
let mut msgs = Vec::new();
for item in items.clone() {
if let RolloutItem::ResponseItem(v) = item {
responses.extend(v);
match item {
RolloutItem::ResponseItem(ref v) => {
responses.extend(v.clone());
let new_msgs: Vec<EventMsg> = v
.iter()
.flat_map(|ri| {
map_response_item_to_event_messages(ri, self.show_raw_agent_reasoning)
})
.collect();
if before_resume_session {
msgs.extend(new_msgs);
} else {
msgs.extend(
new_msgs
.into_iter()
.filter(|m| matches!(m, EventMsg::UserMessage(_))),
);
}
}
RolloutItem::Event(events) => msgs.extend(events.iter().map(|e| e.msg.clone())),
RolloutItem::SessionMeta(..) => {}
}
}
if !responses.is_empty() {
self.record_conversation_items_internal(&responses, false).await;
}
msgs
}
/// Sends the given event to the client and records it to the rollout (if enabled).

View File

@@ -38,11 +38,14 @@ use codex_protocol::models::ResponseItem;
pub struct SessionMeta {
pub id: ConversationId,
pub timestamp: String,
pub cwd: String,
pub originator: String,
pub cli_version: String,
pub instructions: Option<String>,
}
#[derive(Serialize)]
struct SessionMetaWithGit {
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct SessionMetaWithGit {
#[serde(flatten)]
meta: SessionMeta,
#[serde(skip_serializing_if = "Option::is_none")]
@@ -87,10 +90,18 @@ pub enum RolloutRecorderParams {
},
}
#[derive(Serialize)]
struct SessionMetaLine<'a> {
record_type: &'static str,
#[serde(flatten)]
meta: &'a SessionMetaWithGit,
}
#[derive(Debug, Clone)]
pub enum RolloutItem {
ResponseItem(Vec<ResponseItem>),
Event(Vec<Event>),
SessionMeta(SessionMetaWithGit),
}
impl<T> From<T> for RolloutItem
@@ -105,6 +116,7 @@ where
enum RolloutCmd {
AddResponseItems(Vec<ResponseItem>),
AddEvent(Vec<Event>),
AddSessionMeta(SessionMetaWithGit),
Shutdown { ack: oneshot::Sender<()> },
}
@@ -160,6 +172,9 @@ impl RolloutRecorder {
Some(SessionMeta {
timestamp,
id: session_id,
cwd: config.cwd.to_string_lossy().to_string(),
originator: config.responses_originator_header.clone(),
cli_version: env!("CARGO_PKG_VERSION").to_string(),
instructions,
}),
)
@@ -193,6 +208,7 @@ impl RolloutRecorder {
match item {
RolloutItem::ResponseItem(items) => self.record_response_items(&items).await,
RolloutItem::Event(events) => self.record_event(&events).await,
RolloutItem::SessionMeta(meta) => self.record_session_meta(&meta).await,
}
}
@@ -231,6 +247,13 @@ impl RolloutRecorder {
.map_err(|e| IoError::other(format!("failed to queue rollout event: {e}")))
}
async fn record_session_meta(&self, meta: &SessionMetaWithGit) -> std::io::Result<()> {
self.tx
.send(RolloutCmd::AddSessionMeta(meta.clone()))
.await
.map_err(|e| IoError::other(format!("failed to queue rollout session meta: {e}")))
}
pub async fn get_rollout_history(path: &Path) -> std::io::Result<InitialHistory> {
info!("Resuming rollout from {path:?}");
tracing::error!("Resuming rollout from {path:?}");
@@ -275,6 +298,16 @@ impl RolloutRecorder {
Err(e) => warn!("failed to parse event: {v:?}, error: {e}"),
}
}
Some("prev_session_meta") | Some("session_meta") => {
let mut meta_val = v.clone();
if let Some(obj) = meta_val.as_object_mut() {
obj.remove("record_type");
}
match serde_json::from_value::<SessionMetaWithGit>(meta_val) {
Ok(meta) => items.push(RolloutItem::SessionMeta(meta)),
Err(e) => warn!("failed to parse prev_session_meta: {v:?}, error: {e}"),
}
}
Some("response") | None => {
match serde_json::from_value::<ResponseItem>(v.clone()) {
Ok(item) => {
@@ -392,9 +425,13 @@ async fn rollout_writer(
meta: session_meta,
git: git_info,
};
// Write the SessionMeta as the first item in the file
writer.write_line(&session_meta_with_git).await?;
writer
.write_line(&SessionMetaLine {
record_type: "session_meta",
meta: &session_meta_with_git,
})
.await?;
}
// Process rollout commands
@@ -423,6 +460,14 @@ async fn rollout_writer(
.await?;
}
}
RolloutCmd::AddSessionMeta(meta) => {
writer
.write_line(&SessionMetaLine {
record_type: "prev_session_meta",
meta: &meta,
})
.await?;
}
RolloutCmd::Shutdown { ack } => {
let _ = ack.send(());
}