This commit is contained in:
Ahmed Ibrahim
2025-09-08 11:54:02 -07:00
parent b7c95b57fd
commit 2354594eeb
2 changed files with 126 additions and 96 deletions

View File

@@ -10,7 +10,6 @@ use codex_protocol::mcp_protocol::ConversationId;
use codex_protocol::protocol::Event;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value;
use time::OffsetDateTime;
use time::format_description::FormatItem;
use time::macros::format_description;
@@ -52,7 +51,7 @@ pub struct SessionMetaWithGit {
git: Option<GitInfo>,
}
#[derive(Serialize, Deserialize, Default, Clone, Debug)]
#[derive(Serialize, Deserialize, Default, Clone)]
pub struct SessionStateSnapshot {}
#[derive(Serialize, Deserialize, Default, Clone)]
@@ -80,22 +79,36 @@ pub struct RolloutRecorder {
path: PathBuf,
}
#[derive(Clone)]
pub enum RolloutRecorderParams {
Create {
conversation_id: ConversationId,
instructions: Option<String>,
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(tag = "record_type", rename_all = "snake_case")]
enum TaggedLine {
Response {
#[serde(flatten)]
item: ResponseItem,
},
Resume {
path: PathBuf,
Event {
#[serde(flatten)]
event: Event,
},
SessionMeta {
#[serde(flatten)]
meta: SessionMetaWithGit,
},
PrevSessionMeta {
#[serde(flatten)]
meta: SessionMetaWithGit,
},
State {
#[serde(flatten)]
state: SessionStateSnapshot,
},
}
#[derive(Serialize)]
struct SessionMetaLine<'a> {
record_type: &'static str,
#[derive(Serialize, Deserialize, Debug, Clone)]
struct TimestampedLine {
timestamp: String,
#[serde(flatten)]
meta: &'a SessionMetaWithGit,
record: TaggedLine,
}
#[derive(Debug, Clone)]
@@ -151,19 +164,6 @@ enum RolloutCmd {
Shutdown { ack: oneshot::Sender<()> },
}
impl RolloutRecorderParams {
pub fn new(conversation_id: ConversationId, instructions: Option<String>) -> Self {
Self::Create {
conversation_id,
instructions,
}
}
pub fn resume(path: PathBuf) -> Self {
Self::Resume { path }
}
}
impl RolloutRecorder {
pub fn path(&self) -> &Path {
&self.path
@@ -193,16 +193,21 @@ impl RolloutRecorder {
path,
} = create_log_file(config, conversation_id)?;
let timestamp_format: &[FormatItem] = format_description!(
"[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z"
);
let fixed_offset = timestamp.offset();
let timestamp_format: &[FormatItem] =
format_description!("[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]");
let timestamp = timestamp
.to_offset(time::UtcOffset::UTC)
.to_offset(fixed_offset)
.format(timestamp_format)
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
(
let cwd = config.cwd.to_path_buf();
let (tx, rx) = mpsc::channel(100);
tokio::task::spawn(rollout_writer(
tokio::fs::File::from_std(file),
rx,
Some(SessionMeta {
timestamp,
id: session_id,
@@ -211,7 +216,11 @@ impl RolloutRecorder {
cli_version: env!("CARGO_PKG_VERSION").to_string(),
instructions,
}),
)
cwd,
fixed_offset,
));
Ok(Self { tx, path })
}
pub(crate) async fn record_items(&self, item: RolloutItem) -> std::io::Result<()> {
@@ -298,47 +307,32 @@ impl RolloutRecorder {
if line.trim().is_empty() {
continue;
}
let v: Value = match serde_json::from_str(line) {
Ok(v) => v,
Err(_) => continue,
};
match v.get("record_type").and_then(|rt| rt.as_str()) {
Some("state") => continue,
Some("event") => {
let mut ev_val = v.clone();
if let Some(obj) = ev_val.as_object_mut() {
obj.remove("record_type");
}
match serde_json::from_value::<Event>(ev_val) {
Ok(ev) => items.push(RolloutItem::Event(ev)),
Err(e) => warn!("failed to parse event: {v:?}, error: {e}"),
match serde_json::from_str::<TimestampedLine>(line) {
Ok(TimestampedLine {
record: TaggedLine::State { .. },
..
}) => {}
Ok(TimestampedLine {
record: TaggedLine::Event { event },
..
}) => items.push(RolloutItem::Event(event)),
Ok(TimestampedLine {
record: TaggedLine::SessionMeta { meta },
..
})
| Ok(TimestampedLine {
record: TaggedLine::PrevSessionMeta { meta },
..
}) => items.push(RolloutItem::SessionMeta(meta)),
Ok(TimestampedLine {
record: TaggedLine::Response { item },
..
}) => {
if is_persisted_response_item(&item) {
items.push(RolloutItem::ResponseItem(item));
}
}
Some("prev_session_meta") | Some("session_meta") => {
let mut meta_val = v.clone();
if let Some(obj) = meta_val.as_object_mut() {
obj.remove("record_type");
}
match serde_json::from_value::<SessionMetaWithGit>(meta_val) {
Ok(meta) => items.push(RolloutItem::SessionMeta(meta)),
Err(e) => warn!("failed to parse prev_session_meta: {v:?}, error: {e}"),
}
}
Some("response") | None => {
match serde_json::from_value::<ResponseItem>(v.clone()) {
Ok(item) => {
if is_persisted_response_item(&item) {
items.push(RolloutItem::ResponseItem(item));
}
}
Err(e) => {
warn!("failed to parse response item: {v:?}, error: {e}");
}
}
}
Some(other) => {
warn!("unknown record_type in rollout: {other}");
}
Err(_) => warn!("failed to parse rollout line: {line}"),
}
}
@@ -435,8 +429,9 @@ async fn rollout_writer(
mut rx: mpsc::Receiver<RolloutCmd>,
mut meta: Option<SessionMeta>,
cwd: std::path::PathBuf,
fixed_offset: time::UtcOffset,
) -> std::io::Result<()> {
let mut writer = JsonlWriter { file };
let mut writer = JsonlWriter { file, fixed_offset };
// If we have a meta, collect git info asynchronously and write meta first
if let Some(session_meta) = meta.take() {
@@ -447,9 +442,8 @@ async fn rollout_writer(
};
// Write the SessionMeta as the first item in the file
writer
.write_line(&SessionMetaLine {
record_type: "session_meta",
meta: &session_meta_with_git,
.write_tagged(TaggedLine::SessionMeta {
meta: session_meta_with_git,
})
.await?;
}
@@ -460,24 +454,13 @@ async fn rollout_writer(
RolloutCmd::AddResponseItems(items) => {
for item in items {
if is_persisted_response_item(&item) {
writer.write_line(&item).await?;
writer.write_tagged(TaggedLine::Response { item }).await?;
}
}
}
RolloutCmd::AddEvents(events) => {
for event in events {
#[derive(Serialize)]
struct EventLine<'a> {
record_type: &'static str,
#[serde(flatten)]
event: &'a Event,
}
writer
.write_line(&EventLine {
record_type: "event",
event: &event,
})
.await?;
writer.write_tagged(TaggedLine::Event { event }).await?;
}
}
// Sequencing barrier: by the time we handle `Flush`, all previously
@@ -487,10 +470,7 @@ async fn rollout_writer(
}
RolloutCmd::AddSessionMeta(meta) => {
writer
.write_line(&SessionMetaLine {
record_type: "prev_session_meta",
meta: &meta,
})
.write_tagged(TaggedLine::PrevSessionMeta { meta })
.await?;
}
RolloutCmd::Shutdown { ack } => {
@@ -504,6 +484,7 @@ async fn rollout_writer(
struct JsonlWriter {
file: tokio::fs::File,
fixed_offset: time::UtcOffset,
}
impl JsonlWriter {
@@ -514,4 +495,15 @@ impl JsonlWriter {
self.file.flush().await?;
Ok(())
}
async fn write_tagged(&mut self, record: TaggedLine) -> std::io::Result<()> {
let ts_format: &[FormatItem] =
format_description!("[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]");
let now = time::OffsetDateTime::now_utc().to_offset(self.fixed_offset);
let timestamp = now
.format(ts_format)
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
let line = TimestampedLine { timestamp, record };
self.write_line(&line).await
}
}

View File

@@ -127,7 +127,14 @@ async fn resume_includes_initial_messages_and_sends_prior_items() {
writeln!(
f,
"{}",
json!({"meta":"test","instructions":"be nice", "id": Uuid::new_v4(), "timestamp": "2024-01-01T00:00:00Z"})
json!({
"record_type": "session_meta",
"id": Uuid::new_v4(),
"timestamp": "2024-01-01T00:00:00Z",
"cwd": tmpdir.path().to_string_lossy(),
"originator": "test",
"cli_version": "0.0.0-test"
})
)
.unwrap();
@@ -139,7 +146,17 @@ async fn resume_includes_initial_messages_and_sends_prior_items() {
text: "resumed user message".to_string(),
}],
};
writeln!(f, "{}", serde_json::to_string(&prior_user).unwrap()).unwrap();
let mut prior_user_obj = serde_json::to_value(&prior_user)
.unwrap()
.as_object()
.unwrap()
.clone();
prior_user_obj.insert("record_type".to_string(), serde_json::json!("response"));
prior_user_obj.insert(
"timestamp".to_string(),
serde_json::json!("2025-01-01T00:00:00.000+00:00"),
);
writeln!(f, "{}", serde_json::Value::Object(prior_user_obj)).unwrap();
// Prior item: system message (excluded from API history)
let prior_system = codex_protocol::models::ResponseItem::Message {
@@ -149,7 +166,17 @@ async fn resume_includes_initial_messages_and_sends_prior_items() {
text: "resumed system instruction".to_string(),
}],
};
writeln!(f, "{}", serde_json::to_string(&prior_system).unwrap()).unwrap();
let mut prior_system_obj = serde_json::to_value(&prior_system)
.unwrap()
.as_object()
.unwrap()
.clone();
prior_system_obj.insert("record_type".to_string(), serde_json::json!("response"));
prior_system_obj.insert(
"timestamp".to_string(),
serde_json::json!("2025-01-01T00:00:00.000+00:00"),
);
writeln!(f, "{}", serde_json::Value::Object(prior_system_obj)).unwrap();
// Prior item: assistant message
let prior_item = codex_protocol::models::ResponseItem::Message {
@@ -159,11 +186,22 @@ async fn resume_includes_initial_messages_and_sends_prior_items() {
text: "resumed assistant message".to_string(),
}],
};
writeln!(f, "{}", serde_json::to_string(&prior_item).unwrap()).unwrap();
let mut prior_item_obj = serde_json::to_value(&prior_item)
.unwrap()
.as_object()
.unwrap()
.clone();
prior_item_obj.insert("record_type".to_string(), serde_json::json!("response"));
prior_item_obj.insert(
"timestamp".to_string(),
serde_json::json!("2025-01-01T00:00:00.000+00:00"),
);
writeln!(f, "{}", serde_json::Value::Object(prior_item_obj)).unwrap();
let prior_item_event = EventMsg::AgentMessage(AgentMessageEvent {
message: "resumed assistant message".to_string(),
});
let prior_event_line = serde_json::json!({
"timestamp": "2025-01-01T00:00:00.000+00:00",
"record_type": "event",
"id": "resume-1",
"msg": prior_item_event,