Migrate fork and resume reads to thread store (#18900)

- Route cold thread/resume and thread/fork source loading through
ThreadStore reads instead of direct rollout path operations
- Keep lookups that explicitly specify a rollout-path using the local
thread store methods but return an invalid-request error for remote
ThreadStore configurations
- Add some additional unit tests for code path coverage
This commit is contained in:
Tom
2026-04-24 13:51:37 -07:00
committed by GitHub
parent 13e0ec1614
commit 0a9b559c0b
18 changed files with 966 additions and 410 deletions

View File

@@ -3,7 +3,9 @@ use codex_core::NewThread;
use codex_core::parse_turn_item;
use codex_protocol::items::TurnItem;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::ResumedHistory;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::user_input::UserInput;
@@ -66,27 +68,9 @@ async fn fork_thread_twice_drops_to_first_message() {
// GetHistory flushes before returning the path; no wait needed.
// Helper: read rollout items (excluding SessionMeta) from a JSONL path.
let read_items = |p: &std::path::Path| -> Vec<RolloutItem> {
let text = std::fs::read_to_string(p).expect("read rollout file");
let mut items: Vec<RolloutItem> = Vec::new();
for line in text.lines() {
if line.trim().is_empty() {
continue;
}
let v: serde_json::Value = serde_json::from_str(line).expect("jsonl line");
let rl: RolloutLine = serde_json::from_value(v).expect("rollout line");
match rl.item {
RolloutItem::SessionMeta(_) => {}
other => items.push(other),
}
}
items
};
// Compute expected prefixes after each fork by truncating base rollout
// strictly before the nth user input (0-based).
let base_items = read_items(&base_path);
let base_items = read_rollout_items(&base_path);
let find_user_input_positions = |items: &[RolloutItem]| -> Vec<usize> {
let mut pos = Vec::new();
for (i, it) in items.iter().enumerate() {
@@ -126,7 +110,7 @@ async fn fork_thread_twice_drops_to_first_message() {
let fork1_path = codex_fork1.rollout_path().expect("rollout path");
// GetHistory on fork1 flushed; the file is ready.
let fork1_items = read_items(&fork1_path);
let fork1_items = read_rollout_items(&fork1_path);
pretty_assertions::assert_eq!(
serde_json::to_value(&fork1_items).unwrap(),
serde_json::to_value(&expected_after_first).unwrap()
@@ -149,16 +133,114 @@ async fn fork_thread_twice_drops_to_first_message() {
let fork2_path = codex_fork2.rollout_path().expect("rollout path");
// GetHistory on fork2 flushed; the file is ready.
let fork1_items = read_items(&fork1_path);
let fork1_items = read_rollout_items(&fork1_path);
let fork1_user_inputs = find_user_input_positions(&fork1_items);
let cut_last_on_fork1 = fork1_user_inputs
.get(fork1_user_inputs.len().saturating_sub(1))
.copied()
.unwrap_or(0);
let expected_after_second: Vec<RolloutItem> = fork1_items[..cut_last_on_fork1].to_vec();
let fork2_items = read_items(&fork2_path);
let fork2_items = read_rollout_items(&fork2_path);
pretty_assertions::assert_eq!(
serde_json::to_value(&fork2_items).unwrap(),
serde_json::to_value(&expected_after_second).unwrap()
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn fork_thread_from_history_does_not_require_source_rollout_path() {
skip_if_no_network!();
let server = MockServer::start().await;
let sse = sse(vec![ev_response_created("resp"), ev_completed("resp")]);
Mock::given(method("POST"))
.and(path("/v1/responses"))
.respond_with(
ResponseTemplate::new(200)
.insert_header("content-type", "text/event-stream")
.set_body_raw(sse, "text/event-stream"),
)
.expect(1)
.mount(&server)
.await;
let mut builder = test_codex();
let test = builder.build(&server).await.expect("create conversation");
let codex = test.codex.clone();
let thread_manager = test.thread_manager.clone();
codex
.submit(Op::UserInput {
environments: None,
items: vec![UserInput::Text {
text: "fork me from stored history".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
responsesapi_client_metadata: None,
})
.await
.unwrap();
let _ = wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let source_path = codex.rollout_path().expect("source rollout path");
let source_items = read_rollout_items(&source_path);
let NewThread {
thread: forked_thread,
..
} = thread_manager
.fork_thread_from_history(
ForkSnapshot::Interrupted,
test.config,
InitialHistory::Resumed(ResumedHistory {
conversation_id: test.session_configured.session_id,
history: source_items.clone(),
rollout_path: None,
}),
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await
.expect("fork from stored history");
let forked_path = forked_thread.rollout_path().expect("forked rollout path");
let forked_items = read_rollout_items(&forked_path);
let forked_items = forked_items
.iter()
.map(|item| serde_json::to_value(item).unwrap())
.collect::<Vec<_>>();
let source_items = source_items
.iter()
.map(|item| serde_json::to_value(item).unwrap())
.collect::<Vec<_>>();
assert!(
forked_items.starts_with(&source_items),
"forked history should start with the supplied source history"
);
}
fn read_rollout_items(path: &std::path::Path) -> Vec<RolloutItem> {
let text = match std::fs::read_to_string(path) {
Ok(text) => text,
Err(err) => panic!("failed to read rollout file {}: {err}", path.display()),
};
let mut items: Vec<RolloutItem> = Vec::new();
for line in text.lines() {
if line.trim().is_empty() {
continue;
}
let v: serde_json::Value = match serde_json::from_str(line) {
Ok(value) => value,
Err(err) => panic!("failed to parse rollout JSON line `{line}`: {err}"),
};
let rl: RolloutLine = match serde_json::from_value(v) {
Ok(line) => line,
Err(err) => panic!("failed to parse rollout line `{line}`: {err}"),
};
match rl.item {
RolloutItem::SessionMeta(_) => {}
other => items.push(other),
}
}
items
}

View File

@@ -74,7 +74,7 @@ fn resume_history(
time_to_first_token_ms: None,
})),
],
rollout_path: rollout_path.to_path_buf(),
rollout_path: Some(rollout_path.to_path_buf()),
})
}