Compare commits

...

2 Commits

Author SHA1 Message Date
Yaroslav Volovich
fe81f39312 codex: address PR review feedback (#12577) 2026-02-23 16:29:24 +00:00
Yaroslav Volovich
47914f9f8a app-server: support turn-based thread forks 2026-02-23 13:47:41 +00:00
6 changed files with 670 additions and 20 deletions

View File

@@ -19,6 +19,10 @@ export type ThreadForkParams = {threadId: string, /**
* If specified, the thread_id param will be ignored.
*/
path?: string | null, /**
* [UNSTABLE] Fork after the specified historical turn (inclusive).
* When omitted, the full thread history is copied.
*/
forkAfterTurnId?: string | null, /**
* Configuration overrides for the forked thread, if any.
*/
model?: string | null, modelProvider?: string | null, cwd?: string | null, approvalPolicy?: AskForApproval | null, sandbox?: SandboxMode | null, config?: { [key in string]?: JsonValue } | null, baseInstructions?: string | null, developerInstructions?: string | null, /**

View File

@@ -51,22 +51,40 @@ use codex_protocol::protocol::ExecCommandStatus as CoreExecCommandStatus;
#[cfg(test)]
use codex_protocol::protocol::PatchApplyStatus as CorePatchApplyStatus;
#[derive(Debug, Clone, PartialEq)]
pub struct ThreadHistoryBuildResult {
pub turns: Vec<Turn>,
/// True when any turn id had to be synthesized during replay because the
/// rollout history lacked explicit turn lifecycle events.
pub has_synthetic_turn_ids: bool,
}
/// Convert persisted [`RolloutItem`] entries into a sequence of [`Turn`] values and
/// annotate whether any turn ids were synthesized during replay.
///
/// When available, this uses `TurnContext.turn_id` as the canonical turn id so
/// resumed/rebuilt thread history preserves the original turn identifiers.
pub fn build_thread_history_from_rollout_items(items: &[RolloutItem]) -> ThreadHistoryBuildResult {
let mut builder = ThreadHistoryBuilder::new();
for item in items {
builder.handle_rollout_item(item);
}
builder.finish_result()
}
/// Convert persisted [`RolloutItem`] entries into a sequence of [`Turn`] values.
///
/// When available, this uses `TurnContext.turn_id` as the canonical turn id so
/// resumed/rebuilt thread history preserves the original turn identifiers.
pub fn build_turns_from_rollout_items(items: &[RolloutItem]) -> Vec<Turn> {
let mut builder = ThreadHistoryBuilder::new();
for item in items {
builder.handle_rollout_item(item);
}
builder.finish()
build_thread_history_from_rollout_items(items).turns
}
pub struct ThreadHistoryBuilder {
turns: Vec<Turn>,
current_turn: Option<PendingTurn>,
next_item_index: i64,
has_synthetic_turn_ids: bool,
}
impl Default for ThreadHistoryBuilder {
@@ -81,6 +99,7 @@ impl ThreadHistoryBuilder {
turns: Vec::new(),
current_turn: None,
next_item_index: 1,
has_synthetic_turn_ids: false,
}
}
@@ -88,9 +107,16 @@ impl ThreadHistoryBuilder {
*self = Self::new();
}
pub fn finish(mut self) -> Vec<Turn> {
pub fn finish(self) -> Vec<Turn> {
self.finish_result().turns
}
pub fn finish_result(mut self) -> ThreadHistoryBuildResult {
self.finish_current_turn();
self.turns
ThreadHistoryBuildResult {
turns: self.turns,
has_synthetic_turn_ids: self.has_synthetic_turn_ids,
}
}
pub fn active_turn_snapshot(&self) -> Option<Turn> {
@@ -809,8 +835,15 @@ impl ThreadHistoryBuilder {
}
fn new_turn(&mut self, id: Option<String>) -> PendingTurn {
let id = match id {
Some(id) => id,
None => {
self.has_synthetic_turn_ids = true;
Uuid::now_v7().to_string()
}
};
PendingTurn {
id: id.unwrap_or_else(|| Uuid::now_v7().to_string()),
id,
items: Vec::new(),
error: None,
status: TurnStatus::Completed,
@@ -1127,6 +1160,57 @@ mod tests {
);
}
#[test]
fn reports_synthetic_turn_ids_for_legacy_history() {
let items = vec![
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
message: "legacy".into(),
images: None,
text_elements: Vec::new(),
local_images: Vec::new(),
})),
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
message: "reply".into(),
phase: None,
})),
];
let result = build_thread_history_from_rollout_items(&items);
assert_eq!(result.turns.len(), 1);
assert_eq!(result.has_synthetic_turn_ids, true);
}
#[test]
fn reports_no_synthetic_turn_ids_when_turn_boundaries_are_explicit() {
let turn_id = "turn-explicit";
let items = vec![
RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent {
turn_id: turn_id.to_string(),
model_context_window: Some(128_000),
collaboration_mode_kind: Default::default(),
})),
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
message: "modern".into(),
images: None,
text_elements: Vec::new(),
local_images: Vec::new(),
})),
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
message: "reply".into(),
phase: None,
})),
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: turn_id.to_string(),
last_agent_message: Some("reply".into()),
})),
];
let result = build_thread_history_from_rollout_items(&items);
assert_eq!(result.turns.len(), 1);
assert_eq!(result.turns[0].id, turn_id);
assert_eq!(result.has_synthetic_turn_ids, false);
}
#[test]
fn ignores_non_plan_item_lifecycle_events() {
let turn_id = "turn-1";

View File

@@ -1708,6 +1708,12 @@ pub struct ThreadForkParams {
#[ts(optional = nullable)]
pub path: Option<PathBuf>,
/// [UNSTABLE] Fork after the specified historical turn (inclusive).
/// When omitted, the full thread history is copied.
#[experimental("thread/fork.forkAfterTurnId")]
#[ts(optional = nullable)]
pub fork_after_turn_id: Option<String>,
/// Configuration overrides for the forked thread, if any.
#[ts(optional = nullable)]
pub model: Option<String>,

View File

@@ -121,7 +121,7 @@ Example with notification opt-out:
- `thread/start` — create a new thread; emits `thread/started` and auto-subscribes you to turn/item events for that thread.
- `thread/resume` — reopen an existing thread by id so subsequent `turn/start` calls append to it.
- `thread/fork` — fork an existing thread into a new thread id by copying the stored history; emits `thread/started` and auto-subscribes you to turn/item events for the new thread.
- `thread/fork` — fork an existing thread into a new thread id by copying the stored history. Experimental `forkAfterTurnId` lets clients fork from a selected historical turn (modern turn-id-stable histories only); emits `thread/started` and auto-subscribes you to turn/item events for the new thread.
- `thread/list` — page through stored rollouts; supports cursor-based pagination and optional `modelProviders`, `sourceKinds`, `archived`, and `cwd` filters. Each returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded.
- `thread/loaded/list` — list the thread ids currently loaded in memory.
- `thread/read` — read a stored thread by id without resuming it; optionally include turns via `includeTurns`. The returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded.
@@ -208,7 +208,7 @@ To continue a stored session, call `thread/resume` with the `thread.id` you prev
{ "id": 11, "result": { "thread": { "id": "thr_123", } } }
```
To branch from a stored session, call `thread/fork` with the `thread.id`. This creates a new thread id and emits a `thread/started` notification for it:
To branch from a stored session, call `thread/fork` with the `thread.id`. This creates a new thread id and emits a `thread/started` notification for it. To fork from a selected prior turn instead of the full history, pass experimental `forkAfterTurnId`:
```json
{ "method": "thread/fork", "id": 12, "params": { "threadId": "thr_123" } }
@@ -216,6 +216,14 @@ To branch from a stored session, call `thread/fork` with the `thread.id`. This c
{ "method": "thread/started", "params": { "thread": { } } }
```
```json
{ "method": "thread/fork", "id": 13, "params": {
"threadId": "thr_123",
"forkAfterTurnId": "turn_abc",
"cwd": "/Users/me/project-worktree"
} }
```
Experimental API: `thread/start`, `thread/resume`, and `thread/fork` accept `persistExtendedHistory: true` to persist a richer subset of ThreadItems for non-lossy history when calling `thread/read`, `thread/resume`, and `thread/fork` later. This does not backfill events that were not persisted previously.
### Example: List threads (with pagination & filters)

View File

@@ -167,6 +167,7 @@ use codex_app_server_protocol::WindowsSandboxSetupCompletedNotification;
use codex_app_server_protocol::WindowsSandboxSetupMode;
use codex_app_server_protocol::WindowsSandboxSetupStartParams;
use codex_app_server_protocol::WindowsSandboxSetupStartResponse;
use codex_app_server_protocol::build_thread_history_from_rollout_items;
use codex_app_server_protocol::build_turns_from_rollout_items;
use codex_backend_client::Client as BackendClient;
use codex_chatgpt::connectors;
@@ -3269,6 +3270,7 @@ impl CodexMessageProcessor {
let ThreadForkParams {
thread_id,
path,
fork_after_turn_id,
model,
model_provider,
cwd,
@@ -3382,21 +3384,71 @@ impl CodexMessageProcessor {
};
let fallback_model_provider = config.model_provider_id.clone();
let anchored_fork_history = if let Some(fork_after_turn_id) = fork_after_turn_id.as_deref()
{
let source_items = match read_rollout_items_from_rollout(rollout_path.as_path()).await {
Ok(items) => items,
Err(err) => {
let (code, message) = match err.kind() {
std::io::ErrorKind::NotFound => (
INVALID_REQUEST_ERROR_CODE,
format!("failed to load rollout `{}`: {err}", rollout_path.display()),
),
_ => (
INTERNAL_ERROR_CODE,
format!(
"failed to read rollout `{}` for thread fork: {err}",
rollout_path.display()
),
),
};
let error = JSONRPCErrorError {
code,
message,
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};
match resolve_thread_fork_history_from_anchor(
source_items.as_slice(),
fork_after_turn_id,
) {
Ok(history_items) => Some(InitialHistory::Forked(history_items)),
Err(message) => {
self.send_invalid_request_error(request_id, message).await;
return;
}
}
} else {
None
};
let NewThread {
thread_id,
session_configured,
..
} = match self
.thread_manager
.fork_thread(
usize::MAX,
config,
rollout_path.clone(),
persist_extended_history,
)
.await
{
} = match if let Some(initial_history) = anchored_fork_history {
self.thread_manager
.resume_thread_with_history(
config,
initial_history,
self.auth_manager.clone(),
persist_extended_history,
)
.await
} else {
self.thread_manager
.fork_thread(
usize::MAX,
config,
rollout_path.clone(),
persist_extended_history,
)
.await
} {
Ok(thread) => thread,
Err(err) => {
let (code, message) = match err {
@@ -6630,6 +6682,99 @@ async fn sync_default_client_residency_requirement(
}
}
fn resolve_thread_fork_history_from_anchor(
rollout_items: &[RolloutItem],
fork_after_turn_id: &str,
) -> Result<Vec<RolloutItem>, String> {
let history = build_thread_history_from_rollout_items(rollout_items);
if history.has_synthetic_turn_ids {
return Err(
"turn-based forking is not supported for legacy thread history; use full thread/fork"
.to_string(),
);
}
let Some(target_turn_idx) = history
.turns
.iter()
.position(|turn| turn.id == fork_after_turn_id)
else {
return Err(format!(
"fork turn not found in source thread history: {fork_after_turn_id}"
));
};
let target_turn = &history.turns[target_turn_idx];
let has_user_message = target_turn
.items
.iter()
.any(|item| matches!(item, ThreadItem::UserMessage { .. }));
let has_agent_message = target_turn
.items
.iter()
.any(|item| matches!(item, ThreadItem::AgentMessage { .. }));
if matches!(target_turn.status, TurnStatus::InProgress) {
return Err("fork turn must be completed/interrupted/failed, not in progress".to_string());
}
if !has_user_message {
return Err("fork turn must contain a user message".to_string());
}
if !has_agent_message {
return Err("fork turn must contain an agent message".to_string());
}
let Some(next_turn) = history.turns.get(target_turn_idx.saturating_add(1)) else {
return Ok(rollout_items.to_vec());
};
let next_turn_id = next_turn.id.as_str();
let next_turn_started_idx = rollout_items
.iter()
.position(|item| {
matches!(
item,
RolloutItem::EventMsg(EventMsg::TurnStarted(payload))
if payload.turn_id == next_turn_id
)
})
.ok_or_else(|| {
format!("failed to locate boundary after fork turn: missing next turn `{next_turn_id}`")
})?;
let target_terminal_idx = rollout_items.iter().rposition(|item| {
matches!(
item,
RolloutItem::EventMsg(EventMsg::TurnComplete(payload))
if payload.turn_id == fork_after_turn_id
) || matches!(
item,
RolloutItem::EventMsg(EventMsg::TurnAborted(payload))
if payload.turn_id.as_deref() == Some(fork_after_turn_id)
)
});
let next_user_boundary_idx = target_terminal_idx
.and_then(|terminal_idx| {
rollout_items
.iter()
.enumerate()
.skip(terminal_idx.saturating_add(1))
.find_map(|(idx, item)| is_user_turn_rollout_boundary(item).then_some(idx))
})
.unwrap_or(usize::MAX);
let cut_idx = next_turn_started_idx.min(next_user_boundary_idx);
Ok(rollout_items[..cut_idx].to_vec())
}
fn is_user_turn_rollout_boundary(item: &RolloutItem) -> bool {
match item {
RolloutItem::ResponseItem(ResponseItem::Message { role, .. }) => role == "user",
RolloutItem::EventMsg(EventMsg::UserMessage(_)) => true,
_ => false,
}
}
/// Derive the effective [`Config`] by layering three override sources.
///
/// Precedence (lowest to highest):

View File

@@ -2,6 +2,7 @@ use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_fake_rollout;
use app_test_support::create_mock_responses_server_repeating_assistant;
use app_test_support::rollout_path;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCNotification;
@@ -17,11 +18,26 @@ use codex_app_server_protocol::ThreadStartedNotification;
use codex_app_server_protocol::ThreadStatus;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::UserInput;
use codex_protocol::ThreadId;
use codex_protocol::models::MessagePhase;
use codex_protocol::protocol::AgentMessageEvent;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SessionMeta;
use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::TurnCompleteEvent;
use codex_protocol::protocol::TurnStartedEvent;
use codex_protocol::protocol::UserMessageEvent;
use pretty_assertions::assert_eq;
use serde_json::Value;
use serde_json::json;
use std::fs;
use std::fs::FileTimes;
use std::path::Path;
use std::path::PathBuf;
use tempfile::TempDir;
use tokio::time::timeout;
use uuid::Uuid;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
@@ -191,6 +207,235 @@ async fn thread_fork_rejects_unmaterialized_thread() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn thread_fork_can_fork_after_selected_turn() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let conversation_id = create_fake_rollout_with_explicit_turns(
codex_home.path(),
"2025-01-05T12-00-01",
"2025-01-05T12:00:01Z",
&[
ExplicitTurnFixture {
turn_id: "turn-1",
user_text: Some("u1"),
agent_text: Some("a1"),
state: FixtureTurnState::Completed,
},
ExplicitTurnFixture {
turn_id: "turn-2",
user_text: Some("u2"),
agent_text: Some("a2"),
state: FixtureTurnState::Completed,
},
ExplicitTurnFixture {
// Explicit turn with no user message exercises the exact cut-after-turn logic.
turn_id: "turn-3-empty",
user_text: None,
agent_text: None,
state: FixtureTurnState::Completed,
},
],
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let fork_cwd = codex_home.path().join("fork-worktree");
fs::create_dir_all(&fork_cwd)?;
let fork_id = mcp
.send_thread_fork_request(ThreadForkParams {
thread_id: conversation_id,
fork_after_turn_id: Some("turn-2".to_string()),
cwd: Some(fork_cwd.display().to_string()),
..Default::default()
})
.await?;
let fork_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(fork_id)),
)
.await??;
let ThreadForkResponse { thread, cwd, .. } = to_response::<ThreadForkResponse>(fork_resp)?;
assert_eq!(cwd, fork_cwd);
assert_eq!(thread.cwd, fork_cwd);
assert_eq!(thread.turns.len(), 2, "later turns should be truncated");
assert_eq!(thread.turns[0].id, "turn-1");
assert_eq!(thread.turns[1].id, "turn-2");
assert_turn_user_text(&thread.turns[0].items, "u1");
assert_turn_user_text(&thread.turns[1].items, "u2");
Ok(())
}
#[tokio::test]
async fn thread_fork_rejects_unknown_turn_anchor() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let conversation_id = create_fake_rollout_with_explicit_turns(
codex_home.path(),
"2025-01-05T12-00-02",
"2025-01-05T12:00:02Z",
&[ExplicitTurnFixture {
turn_id: "turn-1",
user_text: Some("u1"),
agent_text: Some("a1"),
state: FixtureTurnState::Completed,
}],
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let fork_id = mcp
.send_thread_fork_request(ThreadForkParams {
thread_id: conversation_id,
fork_after_turn_id: Some("missing-turn".to_string()),
..Default::default()
})
.await?;
let fork_err: JSONRPCError = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(fork_id)),
)
.await??;
assert!(
fork_err.error.message.contains("fork turn not found"),
"unexpected fork error: {}",
fork_err.error.message
);
Ok(())
}
#[tokio::test]
async fn thread_fork_rejects_legacy_turn_anchor() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let conversation_id = create_fake_rollout(
codex_home.path(),
"2025-01-05T12-00-03",
"2025-01-05T12:00:03Z",
"legacy preview",
Some("mock_provider"),
None,
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let fork_id = mcp
.send_thread_fork_request(ThreadForkParams {
thread_id: conversation_id,
fork_after_turn_id: Some("legacy-turn".to_string()),
..Default::default()
})
.await?;
let fork_err: JSONRPCError = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(fork_id)),
)
.await??;
assert!(
fork_err.error.message.contains("legacy thread history"),
"unexpected fork error: {}",
fork_err.error.message
);
Ok(())
}
#[tokio::test]
async fn thread_fork_rejects_in_progress_turn_anchor() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let conversation_id = create_fake_rollout_with_explicit_turns(
codex_home.path(),
"2025-01-05T12-00-04",
"2025-01-05T12:00:04Z",
&[ExplicitTurnFixture {
turn_id: "turn-in-progress",
user_text: Some("u1"),
agent_text: Some("a1"),
state: FixtureTurnState::InProgress,
}],
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let fork_id = mcp
.send_thread_fork_request(ThreadForkParams {
thread_id: conversation_id,
fork_after_turn_id: Some("turn-in-progress".to_string()),
..Default::default()
})
.await?;
let fork_err: JSONRPCError = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(fork_id)),
)
.await??;
assert!(
fork_err.error.message.contains("not in progress"),
"unexpected fork error: {}",
fork_err.error.message
);
Ok(())
}
#[tokio::test]
async fn thread_fork_rejects_turn_anchor_without_agent_message() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let conversation_id = create_fake_rollout_with_explicit_turns(
codex_home.path(),
"2025-01-05T12-00-05",
"2025-01-05T12:00:05Z",
&[ExplicitTurnFixture {
turn_id: "turn-no-agent",
user_text: Some("u1"),
agent_text: None,
state: FixtureTurnState::Completed,
}],
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let fork_id = mcp
.send_thread_fork_request(ThreadForkParams {
thread_id: conversation_id,
fork_after_turn_id: Some("turn-no-agent".to_string()),
..Default::default()
})
.await?;
let fork_err: JSONRPCError = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(fork_id)),
)
.await??;
assert!(
fork_err.error.message.contains("agent message"),
"unexpected fork error: {}",
fork_err.error.message
);
Ok(())
}
// Helper to create a config.toml pointing at the mock model server.
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
@@ -214,3 +459,161 @@ stream_max_retries = 0
),
)
}
#[derive(Clone, Copy)]
enum FixtureTurnState {
Completed,
InProgress,
}
#[derive(Clone, Copy)]
struct ExplicitTurnFixture<'a> {
turn_id: &'a str,
user_text: Option<&'a str>,
agent_text: Option<&'a str>,
state: FixtureTurnState,
}
fn assert_turn_user_text(items: &[ThreadItem], expected: &str) {
match items.first() {
Some(ThreadItem::UserMessage { content, .. }) => assert_eq!(
content,
&vec![UserInput::Text {
text: expected.to_string(),
text_elements: Vec::new(),
}]
),
other => panic!("expected first turn item to be a user message, got {other:?}"),
}
}
fn create_fake_rollout_with_explicit_turns(
codex_home: &Path,
filename_ts: &str,
meta_rfc3339: &str,
turns: &[ExplicitTurnFixture<'_>],
) -> Result<String> {
let uuid = Uuid::new_v4();
let uuid_str = uuid.to_string();
let conversation_id = ThreadId::from_string(&uuid_str)?;
let file_path = rollout_path(codex_home, filename_ts, &uuid_str);
let dir = file_path
.parent()
.ok_or_else(|| anyhow::anyhow!("missing rollout parent directory"))?;
fs::create_dir_all(dir)?;
let meta = SessionMeta {
id: conversation_id,
forked_from_id: None,
timestamp: meta_rfc3339.to_string(),
cwd: PathBuf::from("/"),
originator: "codex".to_string(),
cli_version: "0.0.0".to_string(),
source: codex_protocol::protocol::SessionSource::Cli,
agent_nickname: None,
agent_role: None,
model_provider: Some("mock_provider".to_string()),
base_instructions: None,
dynamic_tools: None,
};
let mut lines = vec![rollout_line(
meta_rfc3339,
RolloutItem::SessionMeta(SessionMetaLine { meta, git: None }),
)?];
for (idx, turn) in turns.iter().enumerate() {
if let Some(user_text) = turn.user_text {
lines.push(
json!({
"timestamp": meta_rfc3339,
"type":"response_item",
"payload": {
"type":"message",
"role":"user",
"content":[{"type":"input_text","text": user_text}]
}
})
.to_string(),
);
}
lines.push(rollout_line(
meta_rfc3339,
RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent {
turn_id: turn.turn_id.to_string(),
model_context_window: Some(128_000),
collaboration_mode_kind: Default::default(),
})),
)?);
if let Some(user_text) = turn.user_text {
lines.push(rollout_line(
meta_rfc3339,
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
message: user_text.to_string(),
images: None,
local_images: Vec::new(),
text_elements: Vec::new(),
})),
)?);
}
if let Some(agent_text) = turn.agent_text {
lines.push(rollout_line(
meta_rfc3339,
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
message: agent_text.to_string(),
phase: Some(MessagePhase::FinalAnswer),
})),
)?);
}
if matches!(turn.state, FixtureTurnState::Completed) {
let last_agent_message = turn.agent_text.map(str::to_string);
lines.push(rollout_line(
meta_rfc3339,
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: turn.turn_id.to_string(),
last_agent_message,
})),
)?);
}
if idx == 0 && turn.agent_text.is_some() {
lines.push(
json!({
"timestamp": meta_rfc3339,
"type":"response_item",
"payload": {
"type":"message",
"role":"assistant",
"content":[{"type":"output_text","text": turn.agent_text.unwrap_or("")}]
}
})
.to_string(),
);
}
}
fs::write(&file_path, lines.join("\n") + "\n")?;
let parsed = chrono::DateTime::parse_from_rfc3339(meta_rfc3339)?.with_timezone(&chrono::Utc);
let times = FileTimes::new().set_modified(parsed.into());
fs::OpenOptions::new()
.append(true)
.open(&file_path)?
.set_times(times)?;
Ok(uuid_str)
}
fn rollout_line(timestamp: &str, item: RolloutItem) -> Result<String> {
let mut line = serde_json::Map::new();
line.insert(
"timestamp".to_string(),
Value::String(timestamp.to_string()),
);
let item_value = serde_json::to_value(item)?;
let Value::Object(item_map) = item_value else {
anyhow::bail!("rollout item did not serialize as an object");
};
line.extend(item_map);
Ok(Value::Object(line).to_string())
}