mirror of
https://github.com/openai/codex.git
synced 2026-02-23 17:23:47 +00:00
Compare commits
2 Commits
latest-alp
...
codex/turn
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fe81f39312 | ||
|
|
47914f9f8a |
@@ -19,6 +19,10 @@ export type ThreadForkParams = {threadId: string, /**
|
||||
* If specified, the thread_id param will be ignored.
|
||||
*/
|
||||
path?: string | null, /**
|
||||
* [UNSTABLE] Fork after the specified historical turn (inclusive).
|
||||
* When omitted, the full thread history is copied.
|
||||
*/
|
||||
forkAfterTurnId?: string | null, /**
|
||||
* Configuration overrides for the forked thread, if any.
|
||||
*/
|
||||
model?: string | null, modelProvider?: string | null, cwd?: string | null, approvalPolicy?: AskForApproval | null, sandbox?: SandboxMode | null, config?: { [key in string]?: JsonValue } | null, baseInstructions?: string | null, developerInstructions?: string | null, /**
|
||||
|
||||
@@ -51,22 +51,40 @@ use codex_protocol::protocol::ExecCommandStatus as CoreExecCommandStatus;
|
||||
#[cfg(test)]
|
||||
use codex_protocol::protocol::PatchApplyStatus as CorePatchApplyStatus;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct ThreadHistoryBuildResult {
|
||||
pub turns: Vec<Turn>,
|
||||
/// True when any turn id had to be synthesized during replay because the
|
||||
/// rollout history lacked explicit turn lifecycle events.
|
||||
pub has_synthetic_turn_ids: bool,
|
||||
}
|
||||
|
||||
/// Convert persisted [`RolloutItem`] entries into a sequence of [`Turn`] values and
|
||||
/// annotate whether any turn ids were synthesized during replay.
|
||||
///
|
||||
/// When available, this uses `TurnContext.turn_id` as the canonical turn id so
|
||||
/// resumed/rebuilt thread history preserves the original turn identifiers.
|
||||
pub fn build_thread_history_from_rollout_items(items: &[RolloutItem]) -> ThreadHistoryBuildResult {
|
||||
let mut builder = ThreadHistoryBuilder::new();
|
||||
for item in items {
|
||||
builder.handle_rollout_item(item);
|
||||
}
|
||||
builder.finish_result()
|
||||
}
|
||||
|
||||
/// Convert persisted [`RolloutItem`] entries into a sequence of [`Turn`] values.
|
||||
///
|
||||
/// When available, this uses `TurnContext.turn_id` as the canonical turn id so
|
||||
/// resumed/rebuilt thread history preserves the original turn identifiers.
|
||||
pub fn build_turns_from_rollout_items(items: &[RolloutItem]) -> Vec<Turn> {
|
||||
let mut builder = ThreadHistoryBuilder::new();
|
||||
for item in items {
|
||||
builder.handle_rollout_item(item);
|
||||
}
|
||||
builder.finish()
|
||||
build_thread_history_from_rollout_items(items).turns
|
||||
}
|
||||
|
||||
pub struct ThreadHistoryBuilder {
|
||||
turns: Vec<Turn>,
|
||||
current_turn: Option<PendingTurn>,
|
||||
next_item_index: i64,
|
||||
has_synthetic_turn_ids: bool,
|
||||
}
|
||||
|
||||
impl Default for ThreadHistoryBuilder {
|
||||
@@ -81,6 +99,7 @@ impl ThreadHistoryBuilder {
|
||||
turns: Vec::new(),
|
||||
current_turn: None,
|
||||
next_item_index: 1,
|
||||
has_synthetic_turn_ids: false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -88,9 +107,16 @@ impl ThreadHistoryBuilder {
|
||||
*self = Self::new();
|
||||
}
|
||||
|
||||
pub fn finish(mut self) -> Vec<Turn> {
|
||||
pub fn finish(self) -> Vec<Turn> {
|
||||
self.finish_result().turns
|
||||
}
|
||||
|
||||
pub fn finish_result(mut self) -> ThreadHistoryBuildResult {
|
||||
self.finish_current_turn();
|
||||
self.turns
|
||||
ThreadHistoryBuildResult {
|
||||
turns: self.turns,
|
||||
has_synthetic_turn_ids: self.has_synthetic_turn_ids,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn active_turn_snapshot(&self) -> Option<Turn> {
|
||||
@@ -809,8 +835,15 @@ impl ThreadHistoryBuilder {
|
||||
}
|
||||
|
||||
fn new_turn(&mut self, id: Option<String>) -> PendingTurn {
|
||||
let id = match id {
|
||||
Some(id) => id,
|
||||
None => {
|
||||
self.has_synthetic_turn_ids = true;
|
||||
Uuid::now_v7().to_string()
|
||||
}
|
||||
};
|
||||
PendingTurn {
|
||||
id: id.unwrap_or_else(|| Uuid::now_v7().to_string()),
|
||||
id,
|
||||
items: Vec::new(),
|
||||
error: None,
|
||||
status: TurnStatus::Completed,
|
||||
@@ -1127,6 +1160,57 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reports_synthetic_turn_ids_for_legacy_history() {
|
||||
let items = vec![
|
||||
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
|
||||
message: "legacy".into(),
|
||||
images: None,
|
||||
text_elements: Vec::new(),
|
||||
local_images: Vec::new(),
|
||||
})),
|
||||
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
|
||||
message: "reply".into(),
|
||||
phase: None,
|
||||
})),
|
||||
];
|
||||
|
||||
let result = build_thread_history_from_rollout_items(&items);
|
||||
assert_eq!(result.turns.len(), 1);
|
||||
assert_eq!(result.has_synthetic_turn_ids, true);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reports_no_synthetic_turn_ids_when_turn_boundaries_are_explicit() {
|
||||
let turn_id = "turn-explicit";
|
||||
let items = vec![
|
||||
RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent {
|
||||
turn_id: turn_id.to_string(),
|
||||
model_context_window: Some(128_000),
|
||||
collaboration_mode_kind: Default::default(),
|
||||
})),
|
||||
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
|
||||
message: "modern".into(),
|
||||
images: None,
|
||||
text_elements: Vec::new(),
|
||||
local_images: Vec::new(),
|
||||
})),
|
||||
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
|
||||
message: "reply".into(),
|
||||
phase: None,
|
||||
})),
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: turn_id.to_string(),
|
||||
last_agent_message: Some("reply".into()),
|
||||
})),
|
||||
];
|
||||
|
||||
let result = build_thread_history_from_rollout_items(&items);
|
||||
assert_eq!(result.turns.len(), 1);
|
||||
assert_eq!(result.turns[0].id, turn_id);
|
||||
assert_eq!(result.has_synthetic_turn_ids, false);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ignores_non_plan_item_lifecycle_events() {
|
||||
let turn_id = "turn-1";
|
||||
|
||||
@@ -1708,6 +1708,12 @@ pub struct ThreadForkParams {
|
||||
#[ts(optional = nullable)]
|
||||
pub path: Option<PathBuf>,
|
||||
|
||||
/// [UNSTABLE] Fork after the specified historical turn (inclusive).
|
||||
/// When omitted, the full thread history is copied.
|
||||
#[experimental("thread/fork.forkAfterTurnId")]
|
||||
#[ts(optional = nullable)]
|
||||
pub fork_after_turn_id: Option<String>,
|
||||
|
||||
/// Configuration overrides for the forked thread, if any.
|
||||
#[ts(optional = nullable)]
|
||||
pub model: Option<String>,
|
||||
|
||||
@@ -121,7 +121,7 @@ Example with notification opt-out:
|
||||
|
||||
- `thread/start` — create a new thread; emits `thread/started` and auto-subscribes you to turn/item events for that thread.
|
||||
- `thread/resume` — reopen an existing thread by id so subsequent `turn/start` calls append to it.
|
||||
- `thread/fork` — fork an existing thread into a new thread id by copying the stored history; emits `thread/started` and auto-subscribes you to turn/item events for the new thread.
|
||||
- `thread/fork` — fork an existing thread into a new thread id by copying the stored history. Experimental `forkAfterTurnId` lets clients fork from a selected historical turn (modern turn-id-stable histories only); emits `thread/started` and auto-subscribes you to turn/item events for the new thread.
|
||||
- `thread/list` — page through stored rollouts; supports cursor-based pagination and optional `modelProviders`, `sourceKinds`, `archived`, and `cwd` filters. Each returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded.
|
||||
- `thread/loaded/list` — list the thread ids currently loaded in memory.
|
||||
- `thread/read` — read a stored thread by id without resuming it; optionally include turns via `includeTurns`. The returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded.
|
||||
@@ -208,7 +208,7 @@ To continue a stored session, call `thread/resume` with the `thread.id` you prev
|
||||
{ "id": 11, "result": { "thread": { "id": "thr_123", … } } }
|
||||
```
|
||||
|
||||
To branch from a stored session, call `thread/fork` with the `thread.id`. This creates a new thread id and emits a `thread/started` notification for it:
|
||||
To branch from a stored session, call `thread/fork` with the `thread.id`. This creates a new thread id and emits a `thread/started` notification for it. To fork from a selected prior turn instead of the full history, pass experimental `forkAfterTurnId`:
|
||||
|
||||
```json
|
||||
{ "method": "thread/fork", "id": 12, "params": { "threadId": "thr_123" } }
|
||||
@@ -216,6 +216,14 @@ To branch from a stored session, call `thread/fork` with the `thread.id`. This c
|
||||
{ "method": "thread/started", "params": { "thread": { … } } }
|
||||
```
|
||||
|
||||
```json
|
||||
{ "method": "thread/fork", "id": 13, "params": {
|
||||
"threadId": "thr_123",
|
||||
"forkAfterTurnId": "turn_abc",
|
||||
"cwd": "/Users/me/project-worktree"
|
||||
} }
|
||||
```
|
||||
|
||||
Experimental API: `thread/start`, `thread/resume`, and `thread/fork` accept `persistExtendedHistory: true` to persist a richer subset of ThreadItems for non-lossy history when calling `thread/read`, `thread/resume`, and `thread/fork` later. This does not backfill events that were not persisted previously.
|
||||
|
||||
### Example: List threads (with pagination & filters)
|
||||
|
||||
@@ -167,6 +167,7 @@ use codex_app_server_protocol::WindowsSandboxSetupCompletedNotification;
|
||||
use codex_app_server_protocol::WindowsSandboxSetupMode;
|
||||
use codex_app_server_protocol::WindowsSandboxSetupStartParams;
|
||||
use codex_app_server_protocol::WindowsSandboxSetupStartResponse;
|
||||
use codex_app_server_protocol::build_thread_history_from_rollout_items;
|
||||
use codex_app_server_protocol::build_turns_from_rollout_items;
|
||||
use codex_backend_client::Client as BackendClient;
|
||||
use codex_chatgpt::connectors;
|
||||
@@ -3269,6 +3270,7 @@ impl CodexMessageProcessor {
|
||||
let ThreadForkParams {
|
||||
thread_id,
|
||||
path,
|
||||
fork_after_turn_id,
|
||||
model,
|
||||
model_provider,
|
||||
cwd,
|
||||
@@ -3382,21 +3384,71 @@ impl CodexMessageProcessor {
|
||||
};
|
||||
|
||||
let fallback_model_provider = config.model_provider_id.clone();
|
||||
let anchored_fork_history = if let Some(fork_after_turn_id) = fork_after_turn_id.as_deref()
|
||||
{
|
||||
let source_items = match read_rollout_items_from_rollout(rollout_path.as_path()).await {
|
||||
Ok(items) => items,
|
||||
Err(err) => {
|
||||
let (code, message) = match err.kind() {
|
||||
std::io::ErrorKind::NotFound => (
|
||||
INVALID_REQUEST_ERROR_CODE,
|
||||
format!("failed to load rollout `{}`: {err}", rollout_path.display()),
|
||||
),
|
||||
_ => (
|
||||
INTERNAL_ERROR_CODE,
|
||||
format!(
|
||||
"failed to read rollout `{}` for thread fork: {err}",
|
||||
rollout_path.display()
|
||||
),
|
||||
),
|
||||
};
|
||||
let error = JSONRPCErrorError {
|
||||
code,
|
||||
message,
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
match resolve_thread_fork_history_from_anchor(
|
||||
source_items.as_slice(),
|
||||
fork_after_turn_id,
|
||||
) {
|
||||
Ok(history_items) => Some(InitialHistory::Forked(history_items)),
|
||||
Err(message) => {
|
||||
self.send_invalid_request_error(request_id, message).await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let NewThread {
|
||||
thread_id,
|
||||
session_configured,
|
||||
..
|
||||
} = match self
|
||||
.thread_manager
|
||||
.fork_thread(
|
||||
usize::MAX,
|
||||
config,
|
||||
rollout_path.clone(),
|
||||
persist_extended_history,
|
||||
)
|
||||
.await
|
||||
{
|
||||
} = match if let Some(initial_history) = anchored_fork_history {
|
||||
self.thread_manager
|
||||
.resume_thread_with_history(
|
||||
config,
|
||||
initial_history,
|
||||
self.auth_manager.clone(),
|
||||
persist_extended_history,
|
||||
)
|
||||
.await
|
||||
} else {
|
||||
self.thread_manager
|
||||
.fork_thread(
|
||||
usize::MAX,
|
||||
config,
|
||||
rollout_path.clone(),
|
||||
persist_extended_history,
|
||||
)
|
||||
.await
|
||||
} {
|
||||
Ok(thread) => thread,
|
||||
Err(err) => {
|
||||
let (code, message) = match err {
|
||||
@@ -6630,6 +6682,99 @@ async fn sync_default_client_residency_requirement(
|
||||
}
|
||||
}
|
||||
|
||||
fn resolve_thread_fork_history_from_anchor(
|
||||
rollout_items: &[RolloutItem],
|
||||
fork_after_turn_id: &str,
|
||||
) -> Result<Vec<RolloutItem>, String> {
|
||||
let history = build_thread_history_from_rollout_items(rollout_items);
|
||||
if history.has_synthetic_turn_ids {
|
||||
return Err(
|
||||
"turn-based forking is not supported for legacy thread history; use full thread/fork"
|
||||
.to_string(),
|
||||
);
|
||||
}
|
||||
|
||||
let Some(target_turn_idx) = history
|
||||
.turns
|
||||
.iter()
|
||||
.position(|turn| turn.id == fork_after_turn_id)
|
||||
else {
|
||||
return Err(format!(
|
||||
"fork turn not found in source thread history: {fork_after_turn_id}"
|
||||
));
|
||||
};
|
||||
let target_turn = &history.turns[target_turn_idx];
|
||||
let has_user_message = target_turn
|
||||
.items
|
||||
.iter()
|
||||
.any(|item| matches!(item, ThreadItem::UserMessage { .. }));
|
||||
let has_agent_message = target_turn
|
||||
.items
|
||||
.iter()
|
||||
.any(|item| matches!(item, ThreadItem::AgentMessage { .. }));
|
||||
|
||||
if matches!(target_turn.status, TurnStatus::InProgress) {
|
||||
return Err("fork turn must be completed/interrupted/failed, not in progress".to_string());
|
||||
}
|
||||
if !has_user_message {
|
||||
return Err("fork turn must contain a user message".to_string());
|
||||
}
|
||||
if !has_agent_message {
|
||||
return Err("fork turn must contain an agent message".to_string());
|
||||
}
|
||||
|
||||
let Some(next_turn) = history.turns.get(target_turn_idx.saturating_add(1)) else {
|
||||
return Ok(rollout_items.to_vec());
|
||||
};
|
||||
let next_turn_id = next_turn.id.as_str();
|
||||
|
||||
let next_turn_started_idx = rollout_items
|
||||
.iter()
|
||||
.position(|item| {
|
||||
matches!(
|
||||
item,
|
||||
RolloutItem::EventMsg(EventMsg::TurnStarted(payload))
|
||||
if payload.turn_id == next_turn_id
|
||||
)
|
||||
})
|
||||
.ok_or_else(|| {
|
||||
format!("failed to locate boundary after fork turn: missing next turn `{next_turn_id}`")
|
||||
})?;
|
||||
|
||||
let target_terminal_idx = rollout_items.iter().rposition(|item| {
|
||||
matches!(
|
||||
item,
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(payload))
|
||||
if payload.turn_id == fork_after_turn_id
|
||||
) || matches!(
|
||||
item,
|
||||
RolloutItem::EventMsg(EventMsg::TurnAborted(payload))
|
||||
if payload.turn_id.as_deref() == Some(fork_after_turn_id)
|
||||
)
|
||||
});
|
||||
|
||||
let next_user_boundary_idx = target_terminal_idx
|
||||
.and_then(|terminal_idx| {
|
||||
rollout_items
|
||||
.iter()
|
||||
.enumerate()
|
||||
.skip(terminal_idx.saturating_add(1))
|
||||
.find_map(|(idx, item)| is_user_turn_rollout_boundary(item).then_some(idx))
|
||||
})
|
||||
.unwrap_or(usize::MAX);
|
||||
let cut_idx = next_turn_started_idx.min(next_user_boundary_idx);
|
||||
|
||||
Ok(rollout_items[..cut_idx].to_vec())
|
||||
}
|
||||
|
||||
fn is_user_turn_rollout_boundary(item: &RolloutItem) -> bool {
|
||||
match item {
|
||||
RolloutItem::ResponseItem(ResponseItem::Message { role, .. }) => role == "user",
|
||||
RolloutItem::EventMsg(EventMsg::UserMessage(_)) => true,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Derive the effective [`Config`] by layering three override sources.
|
||||
///
|
||||
/// Precedence (lowest to highest):
|
||||
|
||||
@@ -2,6 +2,7 @@ use anyhow::Result;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_fake_rollout;
|
||||
use app_test_support::create_mock_responses_server_repeating_assistant;
|
||||
use app_test_support::rollout_path;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::JSONRPCError;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
@@ -17,11 +18,26 @@ use codex_app_server_protocol::ThreadStartedNotification;
|
||||
use codex_app_server_protocol::ThreadStatus;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use codex_app_server_protocol::UserInput;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::models::MessagePhase;
|
||||
use codex_protocol::protocol::AgentMessageEvent;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::SessionMeta;
|
||||
use codex_protocol::protocol::SessionMetaLine;
|
||||
use codex_protocol::protocol::TurnCompleteEvent;
|
||||
use codex_protocol::protocol::TurnStartedEvent;
|
||||
use codex_protocol::protocol::UserMessageEvent;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::Value;
|
||||
use serde_json::json;
|
||||
use std::fs;
|
||||
use std::fs::FileTimes;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
use uuid::Uuid;
|
||||
|
||||
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
|
||||
|
||||
@@ -191,6 +207,235 @@ async fn thread_fork_rejects_unmaterialized_thread() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_fork_can_fork_after_selected_turn() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let conversation_id = create_fake_rollout_with_explicit_turns(
|
||||
codex_home.path(),
|
||||
"2025-01-05T12-00-01",
|
||||
"2025-01-05T12:00:01Z",
|
||||
&[
|
||||
ExplicitTurnFixture {
|
||||
turn_id: "turn-1",
|
||||
user_text: Some("u1"),
|
||||
agent_text: Some("a1"),
|
||||
state: FixtureTurnState::Completed,
|
||||
},
|
||||
ExplicitTurnFixture {
|
||||
turn_id: "turn-2",
|
||||
user_text: Some("u2"),
|
||||
agent_text: Some("a2"),
|
||||
state: FixtureTurnState::Completed,
|
||||
},
|
||||
ExplicitTurnFixture {
|
||||
// Explicit turn with no user message exercises the exact cut-after-turn logic.
|
||||
turn_id: "turn-3-empty",
|
||||
user_text: None,
|
||||
agent_text: None,
|
||||
state: FixtureTurnState::Completed,
|
||||
},
|
||||
],
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let fork_cwd = codex_home.path().join("fork-worktree");
|
||||
fs::create_dir_all(&fork_cwd)?;
|
||||
|
||||
let fork_id = mcp
|
||||
.send_thread_fork_request(ThreadForkParams {
|
||||
thread_id: conversation_id,
|
||||
fork_after_turn_id: Some("turn-2".to_string()),
|
||||
cwd: Some(fork_cwd.display().to_string()),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let fork_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(fork_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadForkResponse { thread, cwd, .. } = to_response::<ThreadForkResponse>(fork_resp)?;
|
||||
|
||||
assert_eq!(cwd, fork_cwd);
|
||||
assert_eq!(thread.cwd, fork_cwd);
|
||||
assert_eq!(thread.turns.len(), 2, "later turns should be truncated");
|
||||
assert_eq!(thread.turns[0].id, "turn-1");
|
||||
assert_eq!(thread.turns[1].id, "turn-2");
|
||||
assert_turn_user_text(&thread.turns[0].items, "u1");
|
||||
assert_turn_user_text(&thread.turns[1].items, "u2");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_fork_rejects_unknown_turn_anchor() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let conversation_id = create_fake_rollout_with_explicit_turns(
|
||||
codex_home.path(),
|
||||
"2025-01-05T12-00-02",
|
||||
"2025-01-05T12:00:02Z",
|
||||
&[ExplicitTurnFixture {
|
||||
turn_id: "turn-1",
|
||||
user_text: Some("u1"),
|
||||
agent_text: Some("a1"),
|
||||
state: FixtureTurnState::Completed,
|
||||
}],
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let fork_id = mcp
|
||||
.send_thread_fork_request(ThreadForkParams {
|
||||
thread_id: conversation_id,
|
||||
fork_after_turn_id: Some("missing-turn".to_string()),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let fork_err: JSONRPCError = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_error_message(RequestId::Integer(fork_id)),
|
||||
)
|
||||
.await??;
|
||||
|
||||
assert!(
|
||||
fork_err.error.message.contains("fork turn not found"),
|
||||
"unexpected fork error: {}",
|
||||
fork_err.error.message
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_fork_rejects_legacy_turn_anchor() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let conversation_id = create_fake_rollout(
|
||||
codex_home.path(),
|
||||
"2025-01-05T12-00-03",
|
||||
"2025-01-05T12:00:03Z",
|
||||
"legacy preview",
|
||||
Some("mock_provider"),
|
||||
None,
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let fork_id = mcp
|
||||
.send_thread_fork_request(ThreadForkParams {
|
||||
thread_id: conversation_id,
|
||||
fork_after_turn_id: Some("legacy-turn".to_string()),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let fork_err: JSONRPCError = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_error_message(RequestId::Integer(fork_id)),
|
||||
)
|
||||
.await??;
|
||||
|
||||
assert!(
|
||||
fork_err.error.message.contains("legacy thread history"),
|
||||
"unexpected fork error: {}",
|
||||
fork_err.error.message
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_fork_rejects_in_progress_turn_anchor() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let conversation_id = create_fake_rollout_with_explicit_turns(
|
||||
codex_home.path(),
|
||||
"2025-01-05T12-00-04",
|
||||
"2025-01-05T12:00:04Z",
|
||||
&[ExplicitTurnFixture {
|
||||
turn_id: "turn-in-progress",
|
||||
user_text: Some("u1"),
|
||||
agent_text: Some("a1"),
|
||||
state: FixtureTurnState::InProgress,
|
||||
}],
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let fork_id = mcp
|
||||
.send_thread_fork_request(ThreadForkParams {
|
||||
thread_id: conversation_id,
|
||||
fork_after_turn_id: Some("turn-in-progress".to_string()),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let fork_err: JSONRPCError = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_error_message(RequestId::Integer(fork_id)),
|
||||
)
|
||||
.await??;
|
||||
|
||||
assert!(
|
||||
fork_err.error.message.contains("not in progress"),
|
||||
"unexpected fork error: {}",
|
||||
fork_err.error.message
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_fork_rejects_turn_anchor_without_agent_message() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let conversation_id = create_fake_rollout_with_explicit_turns(
|
||||
codex_home.path(),
|
||||
"2025-01-05T12-00-05",
|
||||
"2025-01-05T12:00:05Z",
|
||||
&[ExplicitTurnFixture {
|
||||
turn_id: "turn-no-agent",
|
||||
user_text: Some("u1"),
|
||||
agent_text: None,
|
||||
state: FixtureTurnState::Completed,
|
||||
}],
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let fork_id = mcp
|
||||
.send_thread_fork_request(ThreadForkParams {
|
||||
thread_id: conversation_id,
|
||||
fork_after_turn_id: Some("turn-no-agent".to_string()),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let fork_err: JSONRPCError = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_error_message(RequestId::Integer(fork_id)),
|
||||
)
|
||||
.await??;
|
||||
|
||||
assert!(
|
||||
fork_err.error.message.contains("agent message"),
|
||||
"unexpected fork error: {}",
|
||||
fork_err.error.message
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Helper to create a config.toml pointing at the mock model server.
|
||||
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
|
||||
let config_toml = codex_home.join("config.toml");
|
||||
@@ -214,3 +459,161 @@ stream_max_retries = 0
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
enum FixtureTurnState {
|
||||
Completed,
|
||||
InProgress,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
struct ExplicitTurnFixture<'a> {
|
||||
turn_id: &'a str,
|
||||
user_text: Option<&'a str>,
|
||||
agent_text: Option<&'a str>,
|
||||
state: FixtureTurnState,
|
||||
}
|
||||
|
||||
fn assert_turn_user_text(items: &[ThreadItem], expected: &str) {
|
||||
match items.first() {
|
||||
Some(ThreadItem::UserMessage { content, .. }) => assert_eq!(
|
||||
content,
|
||||
&vec![UserInput::Text {
|
||||
text: expected.to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}]
|
||||
),
|
||||
other => panic!("expected first turn item to be a user message, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
fn create_fake_rollout_with_explicit_turns(
|
||||
codex_home: &Path,
|
||||
filename_ts: &str,
|
||||
meta_rfc3339: &str,
|
||||
turns: &[ExplicitTurnFixture<'_>],
|
||||
) -> Result<String> {
|
||||
let uuid = Uuid::new_v4();
|
||||
let uuid_str = uuid.to_string();
|
||||
let conversation_id = ThreadId::from_string(&uuid_str)?;
|
||||
let file_path = rollout_path(codex_home, filename_ts, &uuid_str);
|
||||
let dir = file_path
|
||||
.parent()
|
||||
.ok_or_else(|| anyhow::anyhow!("missing rollout parent directory"))?;
|
||||
fs::create_dir_all(dir)?;
|
||||
|
||||
let meta = SessionMeta {
|
||||
id: conversation_id,
|
||||
forked_from_id: None,
|
||||
timestamp: meta_rfc3339.to_string(),
|
||||
cwd: PathBuf::from("/"),
|
||||
originator: "codex".to_string(),
|
||||
cli_version: "0.0.0".to_string(),
|
||||
source: codex_protocol::protocol::SessionSource::Cli,
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
model_provider: Some("mock_provider".to_string()),
|
||||
base_instructions: None,
|
||||
dynamic_tools: None,
|
||||
};
|
||||
let mut lines = vec![rollout_line(
|
||||
meta_rfc3339,
|
||||
RolloutItem::SessionMeta(SessionMetaLine { meta, git: None }),
|
||||
)?];
|
||||
|
||||
for (idx, turn) in turns.iter().enumerate() {
|
||||
if let Some(user_text) = turn.user_text {
|
||||
lines.push(
|
||||
json!({
|
||||
"timestamp": meta_rfc3339,
|
||||
"type":"response_item",
|
||||
"payload": {
|
||||
"type":"message",
|
||||
"role":"user",
|
||||
"content":[{"type":"input_text","text": user_text}]
|
||||
}
|
||||
})
|
||||
.to_string(),
|
||||
);
|
||||
}
|
||||
|
||||
lines.push(rollout_line(
|
||||
meta_rfc3339,
|
||||
RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent {
|
||||
turn_id: turn.turn_id.to_string(),
|
||||
model_context_window: Some(128_000),
|
||||
collaboration_mode_kind: Default::default(),
|
||||
})),
|
||||
)?);
|
||||
if let Some(user_text) = turn.user_text {
|
||||
lines.push(rollout_line(
|
||||
meta_rfc3339,
|
||||
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
|
||||
message: user_text.to_string(),
|
||||
images: None,
|
||||
local_images: Vec::new(),
|
||||
text_elements: Vec::new(),
|
||||
})),
|
||||
)?);
|
||||
}
|
||||
if let Some(agent_text) = turn.agent_text {
|
||||
lines.push(rollout_line(
|
||||
meta_rfc3339,
|
||||
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
|
||||
message: agent_text.to_string(),
|
||||
phase: Some(MessagePhase::FinalAnswer),
|
||||
})),
|
||||
)?);
|
||||
}
|
||||
if matches!(turn.state, FixtureTurnState::Completed) {
|
||||
let last_agent_message = turn.agent_text.map(str::to_string);
|
||||
lines.push(rollout_line(
|
||||
meta_rfc3339,
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: turn.turn_id.to_string(),
|
||||
last_agent_message,
|
||||
})),
|
||||
)?);
|
||||
}
|
||||
|
||||
if idx == 0 && turn.agent_text.is_some() {
|
||||
lines.push(
|
||||
json!({
|
||||
"timestamp": meta_rfc3339,
|
||||
"type":"response_item",
|
||||
"payload": {
|
||||
"type":"message",
|
||||
"role":"assistant",
|
||||
"content":[{"type":"output_text","text": turn.agent_text.unwrap_or("")}]
|
||||
}
|
||||
})
|
||||
.to_string(),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fs::write(&file_path, lines.join("\n") + "\n")?;
|
||||
let parsed = chrono::DateTime::parse_from_rfc3339(meta_rfc3339)?.with_timezone(&chrono::Utc);
|
||||
let times = FileTimes::new().set_modified(parsed.into());
|
||||
fs::OpenOptions::new()
|
||||
.append(true)
|
||||
.open(&file_path)?
|
||||
.set_times(times)?;
|
||||
Ok(uuid_str)
|
||||
}
|
||||
|
||||
fn rollout_line(timestamp: &str, item: RolloutItem) -> Result<String> {
|
||||
let mut line = serde_json::Map::new();
|
||||
line.insert(
|
||||
"timestamp".to_string(),
|
||||
Value::String(timestamp.to_string()),
|
||||
);
|
||||
|
||||
let item_value = serde_json::to_value(item)?;
|
||||
let Value::Object(item_map) = item_value else {
|
||||
anyhow::bail!("rollout item did not serialize as an object");
|
||||
};
|
||||
line.extend(item_map);
|
||||
|
||||
Ok(Value::Object(line).to_string())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user