mirror of
https://github.com/openai/codex.git
synced 2026-02-27 03:03:47 +00:00
Compare commits
7 Commits
main
...
codex/turn
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
baf0a90cae | ||
|
|
d7602c3081 | ||
|
|
5d91f2b2c2 | ||
|
|
d52f1a5d37 | ||
|
|
e0ab8c645c | ||
|
|
d2f8b95bbf | ||
|
|
8b57fdd973 |
@@ -19,6 +19,15 @@ export type ThreadForkParams = {threadId: string, /**
|
||||
* If specified, the thread_id param will be ignored.
|
||||
*/
|
||||
path?: string | null, /**
|
||||
* [UNSTABLE] Fork after the specified historical turn (inclusive).
|
||||
* When omitted, the full thread history is copied.
|
||||
*
|
||||
* The value should come from `thread/read(includeTurns=true)` for the same source thread.
|
||||
* Requests are rejected for legacy histories whose replayed turn ids are not stable, for
|
||||
* in-progress turns, and for turns that do not yet contain both a user message and an agent
|
||||
* response.
|
||||
*/
|
||||
forkAfterTurnId?: string | null, /**
|
||||
* Configuration overrides for the forked thread, if any.
|
||||
*/
|
||||
model?: string | null, modelProvider?: string | null, cwd?: string | null, approvalPolicy?: AskForApproval | null, sandbox?: SandboxMode | null, config?: { [key in string]?: JsonValue } | null, baseInstructions?: string | null, developerInstructions?: string | null, /**
|
||||
|
||||
@@ -1,3 +1,16 @@
|
||||
//! Reconstruct app-server turn history from persisted rollout items.
|
||||
//!
|
||||
//! This module translates the low-level rollout log into the higher-level `Turn` / `ThreadItem`
|
||||
//! structures returned by app-server APIs. The same reducer is used both for replaying stored
|
||||
//! rollouts and for incrementally rebuilding the current turn from live `EventMsg` values.
|
||||
//! Callers must feed items in recorded order; reordering lifecycle events can silently attach work
|
||||
//! to the wrong turn while still producing a superficially valid transcript.
|
||||
//!
|
||||
//! Modern rollouts persist explicit turn lifecycle events, which means replay can preserve stable
|
||||
//! turn ids. Legacy rollouts may only contain message/tool events, so replay has to synthesize
|
||||
//! turn ids while rebuilding them. Consumers that need durable turn anchors, such as turn-based
|
||||
//! `thread/fork`, should reject those synthesized ids rather than treating them as stable.
|
||||
|
||||
use crate::protocol::v2::CollabAgentState;
|
||||
use crate::protocol::v2::CollabAgentTool;
|
||||
use crate::protocol::v2::CollabAgentToolCallStatus;
|
||||
@@ -57,7 +70,10 @@ use codex_protocol::protocol::PatchApplyStatus as CorePatchApplyStatus;
|
||||
/// Convert persisted [`RolloutItem`] entries into a sequence of [`Turn`] values.
|
||||
///
|
||||
/// When available, this uses `TurnContext.turn_id` as the canonical turn id so
|
||||
/// resumed/rebuilt thread history preserves the original turn identifiers.
|
||||
/// resumed/rebuilt thread history preserves the original turn identifiers. Callers that also need
|
||||
/// to distinguish between stable and replay-synthesized turn ids should drive
|
||||
/// [`ThreadHistoryBuilder`] directly and inspect [`ThreadHistoryBuilder::has_synthetic_turn_ids`]
|
||||
/// before consuming the final turn list.
|
||||
pub fn build_turns_from_rollout_items(items: &[RolloutItem]) -> Vec<Turn> {
|
||||
let mut builder = ThreadHistoryBuilder::new();
|
||||
for item in items {
|
||||
@@ -66,10 +82,19 @@ pub fn build_turns_from_rollout_items(items: &[RolloutItem]) -> Vec<Turn> {
|
||||
builder.finish()
|
||||
}
|
||||
|
||||
/// Stateful reducer that groups rollout lifecycle items into app-server turns.
|
||||
///
|
||||
/// The builder owns the in-progress turn boundary while replay is in flight, then emits the
|
||||
/// finalized `Turn` list once all items have been applied. It also tracks whether replay had to
|
||||
/// synthesize any turn ids so app-server can share the same reduction rules for stored history
|
||||
/// and live notifications without exposing that metadata in every convenience return type.
|
||||
/// Reusing a builder across unrelated transcripts without calling [`Self::reset`] can leak late
|
||||
/// events from one transcript into the next.
|
||||
pub struct ThreadHistoryBuilder {
|
||||
turns: Vec<Turn>,
|
||||
current_turn: Option<PendingTurn>,
|
||||
next_item_index: i64,
|
||||
has_synthetic_turn_ids: bool,
|
||||
}
|
||||
|
||||
impl Default for ThreadHistoryBuilder {
|
||||
@@ -79,23 +104,47 @@ impl Default for ThreadHistoryBuilder {
|
||||
}
|
||||
|
||||
impl ThreadHistoryBuilder {
|
||||
/// Create an empty reducer with no active turn.
|
||||
///
|
||||
/// The builder expects to receive the rollout from the beginning of a transcript, in order.
|
||||
/// Starting midway through a rollout can still produce `Turn` values, but turn boundaries and
|
||||
/// synthetic-id detection may no longer describe the actual persisted history.
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
turns: Vec::new(),
|
||||
current_turn: None,
|
||||
next_item_index: 1,
|
||||
has_synthetic_turn_ids: false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Clear all accumulated turns and return the builder to its initial state.
|
||||
pub fn reset(&mut self) {
|
||||
*self = Self::new();
|
||||
}
|
||||
|
||||
pub fn finish(mut self) -> Vec<Turn> {
|
||||
self.finish_current_turn();
|
||||
self.turns
|
||||
/// Finalize replay and return only the reconstructed turns.
|
||||
pub fn finish(self) -> Vec<Turn> {
|
||||
let mut this = self;
|
||||
this.finish_current_turn();
|
||||
this.turns
|
||||
}
|
||||
|
||||
/// Return whether replay had to synthesize any turn ids so far.
|
||||
///
|
||||
/// This becomes true when replay encounters legacy history without explicit turn lifecycle
|
||||
/// events and must mint replacement ids while rebuilding turns. Callers that want to use turn
|
||||
/// ids as stable anchors should check this before trusting replayed ids.
|
||||
pub fn has_synthetic_turn_ids(&self) -> bool {
|
||||
self.has_synthetic_turn_ids
|
||||
}
|
||||
|
||||
/// Return the current turn view without consuming the builder.
|
||||
///
|
||||
/// When a turn is actively streaming, this returns that in-progress turn. Otherwise it falls
|
||||
/// back to the most recently completed turn. This is useful for live UI updates, but a caller
|
||||
/// that treats the result as committed persisted history can accidentally display incomplete
|
||||
/// state after a partially applied event stream.
|
||||
pub fn active_turn_snapshot(&self) -> Option<Turn> {
|
||||
self.current_turn
|
||||
.as_ref()
|
||||
@@ -103,6 +152,7 @@ impl ThreadHistoryBuilder {
|
||||
.or_else(|| self.turns.last().cloned())
|
||||
}
|
||||
|
||||
/// Return whether replay is currently inside an unfinished turn boundary.
|
||||
pub fn has_active_turn(&self) -> bool {
|
||||
self.current_turn.is_some()
|
||||
}
|
||||
@@ -169,6 +219,12 @@ impl ThreadHistoryBuilder {
|
||||
}
|
||||
}
|
||||
|
||||
/// Apply one persisted rollout line to the reducer.
|
||||
///
|
||||
/// Unlike [`Self::handle_event`], this accepts the outer `RolloutItem` wrapper so replay can
|
||||
/// preserve rollback markers and explicit turn boundaries exactly as they were recorded on
|
||||
/// disk. Feed items in file order; replaying a suffix or shuffling lines can still produce
|
||||
/// syntactically valid turns while silently attaching items to the wrong exchange.
|
||||
pub fn handle_rollout_item(&mut self, item: &RolloutItem) {
|
||||
match item {
|
||||
RolloutItem::EventMsg(event) => self.handle_event(event),
|
||||
@@ -861,8 +917,15 @@ impl ThreadHistoryBuilder {
|
||||
}
|
||||
|
||||
fn new_turn(&mut self, id: Option<String>) -> PendingTurn {
|
||||
let id = match id {
|
||||
Some(id) => id,
|
||||
None => {
|
||||
self.has_synthetic_turn_ids = true;
|
||||
Uuid::now_v7().to_string()
|
||||
}
|
||||
};
|
||||
PendingTurn {
|
||||
id: id.unwrap_or_else(|| Uuid::now_v7().to_string()),
|
||||
id,
|
||||
items: Vec::new(),
|
||||
error: None,
|
||||
status: TurnStatus::Completed,
|
||||
@@ -950,6 +1013,10 @@ fn render_review_output_text(output: &ReviewOutputEvent) -> String {
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert core patch-apply change records into the app-server wire format.
|
||||
///
|
||||
/// The returned list is sorted by path so repeated replays of the same patch event stay stable for
|
||||
/// clients and snapshot tests.
|
||||
pub fn convert_patch_changes(
|
||||
changes: &HashMap<std::path::PathBuf, codex_protocol::protocol::FileChange>,
|
||||
) -> Vec<FileUpdateChange> {
|
||||
@@ -1198,6 +1265,66 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reports_synthetic_turn_ids_for_legacy_history() {
|
||||
let items = vec![
|
||||
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
|
||||
message: "legacy".into(),
|
||||
images: None,
|
||||
text_elements: Vec::new(),
|
||||
local_images: Vec::new(),
|
||||
})),
|
||||
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
|
||||
message: "reply".into(),
|
||||
phase: None,
|
||||
})),
|
||||
];
|
||||
|
||||
let mut builder = ThreadHistoryBuilder::new();
|
||||
for item in &items {
|
||||
builder.handle_rollout_item(item);
|
||||
}
|
||||
|
||||
assert!(builder.has_synthetic_turn_ids());
|
||||
assert_eq!(builder.finish().len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reports_no_synthetic_turn_ids_when_turn_boundaries_are_explicit() {
|
||||
let turn_id = "turn-explicit";
|
||||
let items = vec![
|
||||
RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent {
|
||||
turn_id: turn_id.to_string(),
|
||||
model_context_window: Some(128_000),
|
||||
collaboration_mode_kind: Default::default(),
|
||||
})),
|
||||
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
|
||||
message: "modern".into(),
|
||||
images: None,
|
||||
text_elements: Vec::new(),
|
||||
local_images: Vec::new(),
|
||||
})),
|
||||
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
|
||||
message: "reply".into(),
|
||||
phase: None,
|
||||
})),
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: turn_id.to_string(),
|
||||
last_agent_message: Some("reply".into()),
|
||||
})),
|
||||
];
|
||||
|
||||
let mut builder = ThreadHistoryBuilder::new();
|
||||
for item in &items {
|
||||
builder.handle_rollout_item(item);
|
||||
}
|
||||
|
||||
assert!(!builder.has_synthetic_turn_ids());
|
||||
let turns = builder.finish();
|
||||
assert_eq!(turns.len(), 1);
|
||||
assert_eq!(turns[0].id, turn_id);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ignores_non_plan_item_lifecycle_events() {
|
||||
let turn_id = "turn-1";
|
||||
|
||||
@@ -1869,6 +1869,17 @@ pub struct ThreadForkParams {
|
||||
#[ts(optional = nullable)]
|
||||
pub path: Option<PathBuf>,
|
||||
|
||||
/// [UNSTABLE] Fork after the specified historical turn (inclusive).
|
||||
/// When omitted, the full thread history is copied.
|
||||
///
|
||||
/// The value should come from `thread/read(includeTurns=true)` for the same source thread.
|
||||
/// Requests are rejected for legacy histories whose replayed turn ids are not stable, for
|
||||
/// in-progress turns, and for turns that do not yet contain both a user message and an agent
|
||||
/// response.
|
||||
#[experimental("thread/fork.forkAfterTurnId")]
|
||||
#[ts(optional = nullable)]
|
||||
pub fork_after_turn_id: Option<String>,
|
||||
|
||||
/// Configuration overrides for the forked thread, if any.
|
||||
#[ts(optional = nullable)]
|
||||
pub model: Option<String>,
|
||||
|
||||
@@ -121,7 +121,7 @@ Example with notification opt-out:
|
||||
|
||||
- `thread/start` — create a new thread; emits `thread/started` and auto-subscribes you to turn/item events for that thread.
|
||||
- `thread/resume` — reopen an existing thread by id so subsequent `turn/start` calls append to it.
|
||||
- `thread/fork` — fork an existing thread into a new thread id by copying the stored history; emits `thread/started` and auto-subscribes you to turn/item events for the new thread.
|
||||
- `thread/fork` — fork an existing thread into a new thread id by copying the stored history. Experimental `forkAfterTurnId` lets clients fork from a selected historical turn (modern turn-id-stable histories only); emits `thread/started` and auto-subscribes you to turn/item events for the new thread.
|
||||
- `thread/list` — page through stored rollouts; supports cursor-based pagination and optional `modelProviders`, `sourceKinds`, `archived`, `cwd`, and `searchTerm` filters. Each returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded.
|
||||
- `thread/loaded/list` — list the thread ids currently loaded in memory.
|
||||
- `thread/read` — read a stored thread by id without resuming it; optionally include turns via `includeTurns`. The returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded.
|
||||
@@ -215,7 +215,7 @@ To continue a stored session, call `thread/resume` with the `thread.id` you prev
|
||||
{ "id": 11, "result": { "thread": { "id": "thr_123", … } } }
|
||||
```
|
||||
|
||||
To branch from a stored session, call `thread/fork` with the `thread.id`. This creates a new thread id and emits a `thread/started` notification for it:
|
||||
To branch from a stored session, call `thread/fork` with the `thread.id`. This creates a new thread id and emits a `thread/started` notification for it. To fork from a selected prior turn instead of the full history, pass experimental `forkAfterTurnId`:
|
||||
|
||||
```json
|
||||
{ "method": "thread/fork", "id": 12, "params": { "threadId": "thr_123" } }
|
||||
@@ -223,6 +223,19 @@ To branch from a stored session, call `thread/fork` with the `thread.id`. This c
|
||||
{ "method": "thread/started", "params": { "thread": { … } } }
|
||||
```
|
||||
|
||||
```json
|
||||
{ "method": "thread/fork", "id": 13, "params": {
|
||||
"threadId": "thr_123",
|
||||
"forkAfterTurnId": "turn_abc",
|
||||
"cwd": "/Users/me/project-worktree"
|
||||
} }
|
||||
```
|
||||
|
||||
The `forkAfterTurnId` value should come from `thread/read(includeTurns=true)` for that source
|
||||
thread. Anchored forks are rejected for legacy histories that only have replay-synthesized turn
|
||||
ids, for turns that are still in progress, and for turns that do not yet include both the user's
|
||||
input and the model's response.
|
||||
|
||||
Experimental API: `thread/start`, `thread/resume`, and `thread/fork` accept `persistExtendedHistory: true` to persist a richer subset of ThreadItems for non-lossy history when calling `thread/read`, `thread/resume`, and `thread/fork` later. This does not backfill events that were not persisted previously.
|
||||
|
||||
### Example: List threads (with pagination & filters)
|
||||
|
||||
@@ -131,6 +131,7 @@ use codex_app_server_protocol::ThreadCompactStartParams;
|
||||
use codex_app_server_protocol::ThreadCompactStartResponse;
|
||||
use codex_app_server_protocol::ThreadForkParams;
|
||||
use codex_app_server_protocol::ThreadForkResponse;
|
||||
use codex_app_server_protocol::ThreadHistoryBuilder;
|
||||
use codex_app_server_protocol::ThreadItem;
|
||||
use codex_app_server_protocol::ThreadListParams;
|
||||
use codex_app_server_protocol::ThreadListResponse;
|
||||
@@ -2237,7 +2238,7 @@ impl CodexMessageProcessor {
|
||||
return;
|
||||
};
|
||||
|
||||
let (_, thread) = match self.load_thread(&thread_id).await {
|
||||
let (thread_id, thread) = match self.load_thread(&thread_id).await {
|
||||
Ok(v) => v,
|
||||
Err(error) => {
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
@@ -2245,17 +2246,54 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
};
|
||||
|
||||
let expected_name = name.clone();
|
||||
|
||||
if let Err(err) = thread.submit(Op::SetThreadName { name }).await {
|
||||
self.send_internal_error(request_id, format!("failed to set thread name: {err}"))
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
|
||||
if let Err(err) = self
|
||||
.wait_for_thread_name_persisted(thread_id, &expected_name)
|
||||
.await
|
||||
{
|
||||
self.send_internal_error(
|
||||
request_id,
|
||||
format!("failed to observe persisted thread name: {err}"),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
|
||||
self.outgoing
|
||||
.send_response(request_id, ThreadSetNameResponse {})
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn wait_for_thread_name_persisted(
|
||||
&self,
|
||||
thread_id: ThreadId,
|
||||
expected_name: &str,
|
||||
) -> std::io::Result<()> {
|
||||
let codex_home = self.config.codex_home.clone();
|
||||
tokio::time::timeout(Duration::from_secs(2), async {
|
||||
loop {
|
||||
match find_thread_name_by_id(&codex_home, &thread_id).await? {
|
||||
Some(name) if name == expected_name => return Ok(()),
|
||||
_ => tokio::time::sleep(Duration::from_millis(10)).await,
|
||||
}
|
||||
}
|
||||
})
|
||||
.await
|
||||
.map_err(|_| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::TimedOut,
|
||||
format!("timed out waiting for thread name {expected_name:?}"),
|
||||
)
|
||||
})?
|
||||
}
|
||||
|
||||
async fn thread_unarchive(
|
||||
&mut self,
|
||||
request_id: ConnectionRequestId,
|
||||
@@ -3324,6 +3362,7 @@ impl CodexMessageProcessor {
|
||||
let ThreadForkParams {
|
||||
thread_id,
|
||||
path,
|
||||
fork_after_turn_id,
|
||||
model,
|
||||
model_provider,
|
||||
cwd,
|
||||
@@ -3437,21 +3476,71 @@ impl CodexMessageProcessor {
|
||||
};
|
||||
|
||||
let fallback_model_provider = config.model_provider_id.clone();
|
||||
let anchored_fork_history = if let Some(fork_after_turn_id) = fork_after_turn_id.as_deref()
|
||||
{
|
||||
let source_items = match read_rollout_items_from_rollout(rollout_path.as_path()).await {
|
||||
Ok(items) => items,
|
||||
Err(err) => {
|
||||
let (code, message) = match err.kind() {
|
||||
std::io::ErrorKind::NotFound => (
|
||||
INVALID_REQUEST_ERROR_CODE,
|
||||
format!("failed to load rollout `{}`: {err}", rollout_path.display()),
|
||||
),
|
||||
_ => (
|
||||
INTERNAL_ERROR_CODE,
|
||||
format!(
|
||||
"failed to read rollout `{}` for thread fork: {err}",
|
||||
rollout_path.display()
|
||||
),
|
||||
),
|
||||
};
|
||||
let error = JSONRPCErrorError {
|
||||
code,
|
||||
message,
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
match resolve_thread_fork_history_from_anchor(
|
||||
source_items.as_slice(),
|
||||
fork_after_turn_id,
|
||||
) {
|
||||
Ok(history_items) => Some(InitialHistory::Forked(history_items)),
|
||||
Err(message) => {
|
||||
self.send_invalid_request_error(request_id, message).await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let NewThread {
|
||||
thread_id,
|
||||
session_configured,
|
||||
..
|
||||
} = match self
|
||||
.thread_manager
|
||||
.fork_thread(
|
||||
usize::MAX,
|
||||
config,
|
||||
rollout_path.clone(),
|
||||
persist_extended_history,
|
||||
)
|
||||
.await
|
||||
{
|
||||
} = match if let Some(initial_history) = anchored_fork_history {
|
||||
self.thread_manager
|
||||
.resume_thread_with_history(
|
||||
config,
|
||||
initial_history,
|
||||
self.auth_manager.clone(),
|
||||
persist_extended_history,
|
||||
)
|
||||
.await
|
||||
} else {
|
||||
self.thread_manager
|
||||
.fork_thread(
|
||||
usize::MAX,
|
||||
config,
|
||||
rollout_path.clone(),
|
||||
persist_extended_history,
|
||||
)
|
||||
.await
|
||||
} {
|
||||
Ok(thread) => thread,
|
||||
Err(err) => {
|
||||
let (code, message) = match err {
|
||||
@@ -6869,6 +6958,112 @@ async fn sync_default_client_residency_requirement(
|
||||
}
|
||||
}
|
||||
|
||||
/// Return the exact rollout prefix that should seed a turn-anchored `thread/fork`.
|
||||
///
|
||||
/// The selected turn stays in the returned history, while every later turn is removed. The helper
|
||||
/// validates that the anchor is a stable, already-materialized exchange: legacy replay-synthesized
|
||||
/// ids are rejected, the turn must not still be in progress, and the turn must include both a
|
||||
/// user message and an agent response. The cutoff uses both the next explicit `TurnStarted` event
|
||||
/// and the next user-message boundary so the fork cannot accidentally retain trailing non-user
|
||||
/// events that conceptually belong to a later turn.
|
||||
fn resolve_thread_fork_history_from_anchor(
|
||||
rollout_items: &[RolloutItem],
|
||||
fork_after_turn_id: &str,
|
||||
) -> Result<Vec<RolloutItem>, String> {
|
||||
let mut history_builder = ThreadHistoryBuilder::new();
|
||||
for item in rollout_items {
|
||||
history_builder.handle_rollout_item(item);
|
||||
}
|
||||
if history_builder.has_synthetic_turn_ids() {
|
||||
return Err(
|
||||
"turn-based forking is not supported for legacy thread history; use full thread/fork"
|
||||
.to_string(),
|
||||
);
|
||||
}
|
||||
let turns = history_builder.finish();
|
||||
|
||||
let Some(target_turn_idx) = turns.iter().position(|turn| turn.id == fork_after_turn_id) else {
|
||||
return Err(format!(
|
||||
"fork turn not found in source thread history: {fork_after_turn_id}"
|
||||
));
|
||||
};
|
||||
let target_turn = &turns[target_turn_idx];
|
||||
let has_user_message = target_turn
|
||||
.items
|
||||
.iter()
|
||||
.any(|item| matches!(item, ThreadItem::UserMessage { .. }));
|
||||
let has_agent_message = target_turn
|
||||
.items
|
||||
.iter()
|
||||
.any(|item| matches!(item, ThreadItem::AgentMessage { .. }));
|
||||
|
||||
if matches!(target_turn.status, TurnStatus::InProgress) {
|
||||
return Err("fork turn must be completed/interrupted/failed, not in progress".to_string());
|
||||
}
|
||||
if !has_user_message {
|
||||
return Err("fork turn must contain a user message".to_string());
|
||||
}
|
||||
if !has_agent_message {
|
||||
return Err("fork turn must contain an agent message".to_string());
|
||||
}
|
||||
|
||||
let Some(next_turn) = turns.get(target_turn_idx.saturating_add(1)) else {
|
||||
return Ok(rollout_items.to_vec());
|
||||
};
|
||||
let next_turn_id = next_turn.id.as_str();
|
||||
|
||||
let next_turn_started_idx = rollout_items
|
||||
.iter()
|
||||
.position(|item| {
|
||||
matches!(
|
||||
item,
|
||||
RolloutItem::EventMsg(EventMsg::TurnStarted(payload))
|
||||
if payload.turn_id == next_turn_id
|
||||
)
|
||||
})
|
||||
.ok_or_else(|| {
|
||||
format!("failed to locate boundary after fork turn: missing next turn `{next_turn_id}`")
|
||||
})?;
|
||||
|
||||
let target_terminal_idx = rollout_items.iter().rposition(|item| {
|
||||
matches!(
|
||||
item,
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(payload))
|
||||
if payload.turn_id == fork_after_turn_id
|
||||
) || matches!(
|
||||
item,
|
||||
RolloutItem::EventMsg(EventMsg::TurnAborted(payload))
|
||||
if payload.turn_id.as_deref() == Some(fork_after_turn_id)
|
||||
)
|
||||
});
|
||||
|
||||
let next_user_boundary_idx = target_terminal_idx
|
||||
.and_then(|terminal_idx| {
|
||||
rollout_items
|
||||
.iter()
|
||||
.enumerate()
|
||||
.skip(terminal_idx.saturating_add(1))
|
||||
.find_map(|(idx, item)| is_user_turn_rollout_boundary(item).then_some(idx))
|
||||
})
|
||||
.unwrap_or(usize::MAX);
|
||||
let cut_idx = next_turn_started_idx.min(next_user_boundary_idx);
|
||||
|
||||
Ok(rollout_items[..cut_idx].to_vec())
|
||||
}
|
||||
|
||||
/// Return whether this rollout item starts a new user turn for truncation purposes.
|
||||
///
|
||||
/// Anchored forks use this as a fallback boundary when explicit turn lifecycle events are present
|
||||
/// but a later turn also emitted standalone user-message markers that should not leak into the
|
||||
/// forked prefix.
|
||||
fn is_user_turn_rollout_boundary(item: &RolloutItem) -> bool {
|
||||
match item {
|
||||
RolloutItem::ResponseItem(ResponseItem::Message { role, .. }) => role == "user",
|
||||
RolloutItem::EventMsg(EventMsg::UserMessage(_)) => true,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Derive the effective [`Config`] by layering three override sources.
|
||||
///
|
||||
/// Precedence (lowest to highest):
|
||||
|
||||
@@ -372,8 +372,36 @@ async fn list_apps_emits_updates_and_returns_after_both_lists_load() -> Result<(
|
||||
is_enabled: true,
|
||||
}];
|
||||
|
||||
let first_update = read_app_list_updated_notification(&mut mcp).await?;
|
||||
assert_eq!(first_update.data, expected_accessible);
|
||||
let expected_directory = vec![
|
||||
AppInfo {
|
||||
id: "alpha".to_string(),
|
||||
name: "Alpha".to_string(),
|
||||
description: Some("Alpha connector".to_string()),
|
||||
logo_url: Some("https://example.com/alpha.png".to_string()),
|
||||
logo_url_dark: None,
|
||||
distribution_channel: None,
|
||||
branding: alpha_branding.clone(),
|
||||
app_metadata: alpha_app_metadata.clone(),
|
||||
labels: alpha_labels.clone(),
|
||||
install_url: Some("https://chatgpt.com/apps/alpha/alpha".to_string()),
|
||||
is_accessible: false,
|
||||
is_enabled: true,
|
||||
},
|
||||
AppInfo {
|
||||
id: "beta".to_string(),
|
||||
name: "beta".to_string(),
|
||||
description: None,
|
||||
logo_url: None,
|
||||
logo_url_dark: None,
|
||||
distribution_channel: None,
|
||||
branding: None,
|
||||
app_metadata: None,
|
||||
labels: None,
|
||||
install_url: Some("https://chatgpt.com/apps/beta/beta".to_string()),
|
||||
is_accessible: false,
|
||||
is_enabled: true,
|
||||
},
|
||||
];
|
||||
|
||||
let expected_merged = vec![
|
||||
AppInfo {
|
||||
@@ -406,8 +434,26 @@ async fn list_apps_emits_updates_and_returns_after_both_lists_load() -> Result<(
|
||||
},
|
||||
];
|
||||
|
||||
let second_update = read_app_list_updated_notification(&mut mcp).await?;
|
||||
assert_eq!(second_update.data, expected_merged);
|
||||
let mut saw_merged_update = false;
|
||||
for update_idx in 0..2 {
|
||||
let update = read_app_list_updated_notification(&mut mcp).await?;
|
||||
if update.data == expected_accessible || update.data == expected_directory {
|
||||
continue;
|
||||
}
|
||||
if update.data == expected_merged {
|
||||
saw_merged_update = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
panic!(
|
||||
"unexpected app/list/updated payload at update {update_idx}: {:?}",
|
||||
update.data
|
||||
);
|
||||
}
|
||||
assert!(
|
||||
saw_merged_update,
|
||||
"expected a merged app/list/updated payload before the response"
|
||||
);
|
||||
|
||||
let response: JSONRPCResponse = timeout(
|
||||
DEFAULT_TIMEOUT,
|
||||
|
||||
@@ -2,6 +2,7 @@ use anyhow::Result;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_fake_rollout;
|
||||
use app_test_support::create_mock_responses_server_repeating_assistant;
|
||||
use app_test_support::rollout_path;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::JSONRPCError;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
@@ -17,11 +18,26 @@ use codex_app_server_protocol::ThreadStartedNotification;
|
||||
use codex_app_server_protocol::ThreadStatus;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use codex_app_server_protocol::UserInput;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::models::MessagePhase;
|
||||
use codex_protocol::protocol::AgentMessageEvent;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::SessionMeta;
|
||||
use codex_protocol::protocol::SessionMetaLine;
|
||||
use codex_protocol::protocol::TurnCompleteEvent;
|
||||
use codex_protocol::protocol::TurnStartedEvent;
|
||||
use codex_protocol::protocol::UserMessageEvent;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::Value;
|
||||
use serde_json::json;
|
||||
use std::fs;
|
||||
use std::fs::FileTimes;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
use uuid::Uuid;
|
||||
|
||||
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
|
||||
|
||||
@@ -191,6 +207,235 @@ async fn thread_fork_rejects_unmaterialized_thread() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_fork_can_fork_after_selected_turn() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let conversation_id = create_fake_rollout_with_explicit_turns(
|
||||
codex_home.path(),
|
||||
"2025-01-05T12-00-01",
|
||||
"2025-01-05T12:00:01Z",
|
||||
&[
|
||||
ExplicitTurnFixture {
|
||||
turn_id: "turn-1",
|
||||
user_text: Some("u1"),
|
||||
agent_text: Some("a1"),
|
||||
state: FixtureTurnState::Completed,
|
||||
},
|
||||
ExplicitTurnFixture {
|
||||
turn_id: "turn-2",
|
||||
user_text: Some("u2"),
|
||||
agent_text: Some("a2"),
|
||||
state: FixtureTurnState::Completed,
|
||||
},
|
||||
ExplicitTurnFixture {
|
||||
// Explicit turn with no user message exercises the exact cut-after-turn logic.
|
||||
turn_id: "turn-3-empty",
|
||||
user_text: None,
|
||||
agent_text: None,
|
||||
state: FixtureTurnState::Completed,
|
||||
},
|
||||
],
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let fork_cwd = codex_home.path().join("fork-worktree");
|
||||
fs::create_dir_all(&fork_cwd)?;
|
||||
|
||||
let fork_id = mcp
|
||||
.send_thread_fork_request(ThreadForkParams {
|
||||
thread_id: conversation_id,
|
||||
fork_after_turn_id: Some("turn-2".to_string()),
|
||||
cwd: Some(fork_cwd.display().to_string()),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let fork_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(fork_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadForkResponse { thread, cwd, .. } = to_response::<ThreadForkResponse>(fork_resp)?;
|
||||
|
||||
assert_eq!(cwd, fork_cwd);
|
||||
assert_eq!(thread.cwd, fork_cwd);
|
||||
assert_eq!(thread.turns.len(), 2, "later turns should be truncated");
|
||||
assert_eq!(thread.turns[0].id, "turn-1");
|
||||
assert_eq!(thread.turns[1].id, "turn-2");
|
||||
assert_turn_user_text(&thread.turns[0].items, "u1");
|
||||
assert_turn_user_text(&thread.turns[1].items, "u2");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_fork_rejects_unknown_turn_anchor() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let conversation_id = create_fake_rollout_with_explicit_turns(
|
||||
codex_home.path(),
|
||||
"2025-01-05T12-00-02",
|
||||
"2025-01-05T12:00:02Z",
|
||||
&[ExplicitTurnFixture {
|
||||
turn_id: "turn-1",
|
||||
user_text: Some("u1"),
|
||||
agent_text: Some("a1"),
|
||||
state: FixtureTurnState::Completed,
|
||||
}],
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let fork_id = mcp
|
||||
.send_thread_fork_request(ThreadForkParams {
|
||||
thread_id: conversation_id,
|
||||
fork_after_turn_id: Some("missing-turn".to_string()),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let fork_err: JSONRPCError = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_error_message(RequestId::Integer(fork_id)),
|
||||
)
|
||||
.await??;
|
||||
|
||||
assert!(
|
||||
fork_err.error.message.contains("fork turn not found"),
|
||||
"unexpected fork error: {}",
|
||||
fork_err.error.message
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_fork_rejects_legacy_turn_anchor() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let conversation_id = create_fake_rollout(
|
||||
codex_home.path(),
|
||||
"2025-01-05T12-00-03",
|
||||
"2025-01-05T12:00:03Z",
|
||||
"legacy preview",
|
||||
Some("mock_provider"),
|
||||
None,
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let fork_id = mcp
|
||||
.send_thread_fork_request(ThreadForkParams {
|
||||
thread_id: conversation_id,
|
||||
fork_after_turn_id: Some("legacy-turn".to_string()),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let fork_err: JSONRPCError = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_error_message(RequestId::Integer(fork_id)),
|
||||
)
|
||||
.await??;
|
||||
|
||||
assert!(
|
||||
fork_err.error.message.contains("legacy thread history"),
|
||||
"unexpected fork error: {}",
|
||||
fork_err.error.message
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_fork_rejects_in_progress_turn_anchor() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let conversation_id = create_fake_rollout_with_explicit_turns(
|
||||
codex_home.path(),
|
||||
"2025-01-05T12-00-04",
|
||||
"2025-01-05T12:00:04Z",
|
||||
&[ExplicitTurnFixture {
|
||||
turn_id: "turn-in-progress",
|
||||
user_text: Some("u1"),
|
||||
agent_text: Some("a1"),
|
||||
state: FixtureTurnState::InProgress,
|
||||
}],
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let fork_id = mcp
|
||||
.send_thread_fork_request(ThreadForkParams {
|
||||
thread_id: conversation_id,
|
||||
fork_after_turn_id: Some("turn-in-progress".to_string()),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let fork_err: JSONRPCError = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_error_message(RequestId::Integer(fork_id)),
|
||||
)
|
||||
.await??;
|
||||
|
||||
assert!(
|
||||
fork_err.error.message.contains("not in progress"),
|
||||
"unexpected fork error: {}",
|
||||
fork_err.error.message
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_fork_rejects_turn_anchor_without_agent_message() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let conversation_id = create_fake_rollout_with_explicit_turns(
|
||||
codex_home.path(),
|
||||
"2025-01-05T12-00-05",
|
||||
"2025-01-05T12:00:05Z",
|
||||
&[ExplicitTurnFixture {
|
||||
turn_id: "turn-no-agent",
|
||||
user_text: Some("u1"),
|
||||
agent_text: None,
|
||||
state: FixtureTurnState::Completed,
|
||||
}],
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let fork_id = mcp
|
||||
.send_thread_fork_request(ThreadForkParams {
|
||||
thread_id: conversation_id,
|
||||
fork_after_turn_id: Some("turn-no-agent".to_string()),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let fork_err: JSONRPCError = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_error_message(RequestId::Integer(fork_id)),
|
||||
)
|
||||
.await??;
|
||||
|
||||
assert!(
|
||||
fork_err.error.message.contains("agent message"),
|
||||
"unexpected fork error: {}",
|
||||
fork_err.error.message
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Helper to create a config.toml pointing at the mock model server.
|
||||
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
|
||||
let config_toml = codex_home.join("config.toml");
|
||||
@@ -214,3 +459,161 @@ stream_max_retries = 0
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
enum FixtureTurnState {
|
||||
Completed,
|
||||
InProgress,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
struct ExplicitTurnFixture<'a> {
|
||||
turn_id: &'a str,
|
||||
user_text: Option<&'a str>,
|
||||
agent_text: Option<&'a str>,
|
||||
state: FixtureTurnState,
|
||||
}
|
||||
|
||||
fn assert_turn_user_text(items: &[ThreadItem], expected: &str) {
|
||||
match items.first() {
|
||||
Some(ThreadItem::UserMessage { content, .. }) => assert_eq!(
|
||||
content,
|
||||
&vec![UserInput::Text {
|
||||
text: expected.to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}]
|
||||
),
|
||||
other => panic!("expected first turn item to be a user message, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
fn create_fake_rollout_with_explicit_turns(
|
||||
codex_home: &Path,
|
||||
filename_ts: &str,
|
||||
meta_rfc3339: &str,
|
||||
turns: &[ExplicitTurnFixture<'_>],
|
||||
) -> Result<String> {
|
||||
let uuid = Uuid::new_v4();
|
||||
let uuid_str = uuid.to_string();
|
||||
let conversation_id = ThreadId::from_string(&uuid_str)?;
|
||||
let file_path = rollout_path(codex_home, filename_ts, &uuid_str);
|
||||
let dir = file_path
|
||||
.parent()
|
||||
.ok_or_else(|| anyhow::anyhow!("missing rollout parent directory"))?;
|
||||
fs::create_dir_all(dir)?;
|
||||
|
||||
let meta = SessionMeta {
|
||||
id: conversation_id,
|
||||
forked_from_id: None,
|
||||
timestamp: meta_rfc3339.to_string(),
|
||||
cwd: PathBuf::from("/"),
|
||||
originator: "codex".to_string(),
|
||||
cli_version: "0.0.0".to_string(),
|
||||
source: codex_protocol::protocol::SessionSource::Cli,
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
model_provider: Some("mock_provider".to_string()),
|
||||
base_instructions: None,
|
||||
dynamic_tools: None,
|
||||
};
|
||||
let mut lines = vec![rollout_line(
|
||||
meta_rfc3339,
|
||||
RolloutItem::SessionMeta(SessionMetaLine { meta, git: None }),
|
||||
)?];
|
||||
|
||||
for (idx, turn) in turns.iter().enumerate() {
|
||||
if let Some(user_text) = turn.user_text {
|
||||
lines.push(
|
||||
json!({
|
||||
"timestamp": meta_rfc3339,
|
||||
"type":"response_item",
|
||||
"payload": {
|
||||
"type":"message",
|
||||
"role":"user",
|
||||
"content":[{"type":"input_text","text": user_text}]
|
||||
}
|
||||
})
|
||||
.to_string(),
|
||||
);
|
||||
}
|
||||
|
||||
lines.push(rollout_line(
|
||||
meta_rfc3339,
|
||||
RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent {
|
||||
turn_id: turn.turn_id.to_string(),
|
||||
model_context_window: Some(128_000),
|
||||
collaboration_mode_kind: Default::default(),
|
||||
})),
|
||||
)?);
|
||||
if let Some(user_text) = turn.user_text {
|
||||
lines.push(rollout_line(
|
||||
meta_rfc3339,
|
||||
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
|
||||
message: user_text.to_string(),
|
||||
images: None,
|
||||
local_images: Vec::new(),
|
||||
text_elements: Vec::new(),
|
||||
})),
|
||||
)?);
|
||||
}
|
||||
if let Some(agent_text) = turn.agent_text {
|
||||
lines.push(rollout_line(
|
||||
meta_rfc3339,
|
||||
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
|
||||
message: agent_text.to_string(),
|
||||
phase: Some(MessagePhase::FinalAnswer),
|
||||
})),
|
||||
)?);
|
||||
}
|
||||
if matches!(turn.state, FixtureTurnState::Completed) {
|
||||
let last_agent_message = turn.agent_text.map(str::to_string);
|
||||
lines.push(rollout_line(
|
||||
meta_rfc3339,
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: turn.turn_id.to_string(),
|
||||
last_agent_message,
|
||||
})),
|
||||
)?);
|
||||
}
|
||||
|
||||
if idx == 0 && turn.agent_text.is_some() {
|
||||
lines.push(
|
||||
json!({
|
||||
"timestamp": meta_rfc3339,
|
||||
"type":"response_item",
|
||||
"payload": {
|
||||
"type":"message",
|
||||
"role":"assistant",
|
||||
"content":[{"type":"output_text","text": turn.agent_text.unwrap_or("")}]
|
||||
}
|
||||
})
|
||||
.to_string(),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fs::write(&file_path, lines.join("\n") + "\n")?;
|
||||
let parsed = chrono::DateTime::parse_from_rfc3339(meta_rfc3339)?.with_timezone(&chrono::Utc);
|
||||
let times = FileTimes::new().set_modified(parsed.into());
|
||||
fs::OpenOptions::new()
|
||||
.append(true)
|
||||
.open(&file_path)?
|
||||
.set_times(times)?;
|
||||
Ok(uuid_str)
|
||||
}
|
||||
|
||||
fn rollout_line(timestamp: &str, item: RolloutItem) -> Result<String> {
|
||||
let mut line = serde_json::Map::new();
|
||||
line.insert(
|
||||
"timestamp".to_string(),
|
||||
Value::String(timestamp.to_string()),
|
||||
);
|
||||
|
||||
let item_value = serde_json::to_value(item)?;
|
||||
let Value::Object(item_map) = item_value else {
|
||||
anyhow::bail!("rollout item did not serialize as an object");
|
||||
};
|
||||
line.extend(item_map);
|
||||
|
||||
Ok(Value::Object(line).to_string())
|
||||
}
|
||||
|
||||
@@ -348,7 +348,7 @@ impl RolloutRecorder {
|
||||
default_provider,
|
||||
)
|
||||
.await?;
|
||||
if let Some(path) = select_resume_path(&page, filter_cwd) {
|
||||
if let Some(path) = select_resume_path(&page, filter_cwd).await {
|
||||
return Ok(Some(path));
|
||||
}
|
||||
cursor = page.next_cursor;
|
||||
@@ -961,23 +961,36 @@ impl From<codex_state::ThreadsPage> for ThreadsPage {
|
||||
}
|
||||
}
|
||||
|
||||
fn select_resume_path(page: &ThreadsPage, filter_cwd: Option<&Path>) -> Option<PathBuf> {
|
||||
async fn select_resume_path(page: &ThreadsPage, filter_cwd: Option<&Path>) -> Option<PathBuf> {
|
||||
match filter_cwd {
|
||||
Some(cwd) => page.items.iter().find_map(|item| {
|
||||
if item
|
||||
.cwd
|
||||
.as_ref()
|
||||
.is_some_and(|session_cwd| cwd_matches(session_cwd, cwd))
|
||||
{
|
||||
Some(item.path.clone())
|
||||
} else {
|
||||
None
|
||||
Some(cwd) => {
|
||||
for item in &page.items {
|
||||
let effective_cwd = latest_turn_context_cwd(item.path.as_path())
|
||||
.await
|
||||
.or_else(|| item.cwd.clone());
|
||||
if effective_cwd
|
||||
.as_ref()
|
||||
.is_some_and(|session_cwd| cwd_matches(session_cwd, cwd))
|
||||
{
|
||||
return Some(item.path.clone());
|
||||
}
|
||||
}
|
||||
}),
|
||||
None
|
||||
}
|
||||
None => page.items.first().map(|item| item.path.clone()),
|
||||
}
|
||||
}
|
||||
|
||||
async fn latest_turn_context_cwd(rollout_path: &Path) -> Option<PathBuf> {
|
||||
let (items, _thread_id, _parse_errors) = RolloutRecorder::load_rollout_items(rollout_path)
|
||||
.await
|
||||
.ok()?;
|
||||
items.iter().rev().find_map(|item| match item {
|
||||
RolloutItem::TurnContext(turn_context) => Some(turn_context.cwd.clone()),
|
||||
_ => None,
|
||||
})
|
||||
}
|
||||
|
||||
fn select_resume_path_from_db_page(
|
||||
page: &codex_state::ThreadsPage,
|
||||
filter_cwd: Option<&Path>,
|
||||
@@ -1010,8 +1023,13 @@ mod tests {
|
||||
use crate::config::ConfigBuilder;
|
||||
use crate::features::Feature;
|
||||
use chrono::TimeZone;
|
||||
use codex_protocol::config_types::ReasoningSummary;
|
||||
use codex_protocol::protocol::AgentMessageEvent;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::RolloutLine;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
use codex_protocol::protocol::TurnContextItem;
|
||||
use codex_protocol::protocol::UserMessageEvent;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::fs::File;
|
||||
@@ -1054,6 +1072,35 @@ mod tests {
|
||||
Ok(path)
|
||||
}
|
||||
|
||||
fn append_turn_context(path: &Path, ts: &str, cwd: &Path) -> std::io::Result<()> {
|
||||
let mut file = fs::OpenOptions::new().append(true).open(path)?;
|
||||
let line = RolloutLine {
|
||||
timestamp: ts.to_string(),
|
||||
item: RolloutItem::TurnContext(TurnContextItem {
|
||||
turn_id: None,
|
||||
cwd: cwd.to_path_buf(),
|
||||
approval_policy: AskForApproval::Never,
|
||||
sandbox_policy: SandboxPolicy::new_read_only_policy(),
|
||||
network: None,
|
||||
model: "test-model".to_string(),
|
||||
personality: None,
|
||||
collaboration_mode: None,
|
||||
effort: None,
|
||||
summary: ReasoningSummary::Auto,
|
||||
user_instructions: None,
|
||||
developer_instructions: None,
|
||||
final_output_json_schema: None,
|
||||
truncation_policy: None,
|
||||
}),
|
||||
};
|
||||
writeln!(
|
||||
file,
|
||||
"{}",
|
||||
serde_json::to_string(&line).map_err(std::io::Error::other)?
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn recorder_materializes_only_after_explicit_persist() -> std::io::Result<()> {
|
||||
let home = TempDir::new().expect("temp dir");
|
||||
@@ -1320,4 +1367,35 @@ mod tests {
|
||||
assert_eq!(repaired_path, Some(real_path));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn find_latest_thread_path_uses_latest_turn_context_cwd_without_sqlite()
|
||||
-> std::io::Result<()> {
|
||||
let home = TempDir::new().expect("temp dir");
|
||||
let mut config = ConfigBuilder::default()
|
||||
.codex_home(home.path().to_path_buf())
|
||||
.build()
|
||||
.await?;
|
||||
config.features.disable(Feature::Sqlite);
|
||||
|
||||
let latest_cwd = home.path().join("latest-cwd");
|
||||
let rollout_path =
|
||||
write_session_file(home.path(), "2025-01-03T12-00-00", Uuid::from_u128(9012))?;
|
||||
append_turn_context(&rollout_path, "2025-01-03T12-00-01", latest_cwd.as_path())?;
|
||||
|
||||
let found = RolloutRecorder::find_latest_thread_path(
|
||||
&config,
|
||||
1,
|
||||
None,
|
||||
ThreadSortKey::UpdatedAt,
|
||||
&[],
|
||||
None,
|
||||
config.model_provider_id.as_str(),
|
||||
Some(latest_cwd.as_path()),
|
||||
)
|
||||
.await?;
|
||||
|
||||
assert_eq!(found, Some(rollout_path));
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -483,3 +483,28 @@ macro_rules! skip_if_windows {
|
||||
}
|
||||
}};
|
||||
}
|
||||
|
||||
pub fn is_github_actions_linux_aarch64() -> bool {
|
||||
cfg!(all(target_os = "linux", target_arch = "aarch64"))
|
||||
&& ::std::env::var("GITHUB_ACTIONS").is_ok()
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! skip_if_github_actions_linux_aarch64 {
|
||||
() => {{
|
||||
if $crate::is_github_actions_linux_aarch64() {
|
||||
println!(
|
||||
"Skipping test because the default Linux sandbox is unavailable on GitHub Actions linux-aarch64."
|
||||
);
|
||||
return;
|
||||
}
|
||||
}};
|
||||
($return_value:expr $(,)?) => {{
|
||||
if $crate::is_github_actions_linux_aarch64() {
|
||||
println!(
|
||||
"Skipping test because the default Linux sandbox is unavailable on GitHub Actions linux-aarch64."
|
||||
);
|
||||
return $return_value;
|
||||
}
|
||||
}};
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ use core_test_support::responses::mount_sse_once;
|
||||
use core_test_support::responses::mount_sse_sequence;
|
||||
use core_test_support::responses::sse;
|
||||
use core_test_support::responses::start_mock_server;
|
||||
use core_test_support::skip_if_github_actions_linux_aarch64;
|
||||
use core_test_support::test_codex::test_codex;
|
||||
use core_test_support::wait_for_event;
|
||||
use regex_lite::Regex;
|
||||
@@ -70,6 +71,8 @@ async fn interrupt_long_running_tool_emits_turn_aborted() {
|
||||
/// responses server, and ensures the model receives the synthesized abort.
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn interrupt_tool_records_history_entries() {
|
||||
skip_if_github_actions_linux_aarch64!();
|
||||
|
||||
let command = "sleep 60";
|
||||
let call_id = "call-history";
|
||||
|
||||
@@ -143,7 +146,7 @@ async fn interrupt_tool_records_history_entries() {
|
||||
let output = response_mock
|
||||
.function_call_output_text(call_id)
|
||||
.expect("missing function_call_output text");
|
||||
let re = Regex::new(r"^Wall time: ([0-9]+(?:\.[0-9])?) seconds\naborted by user$")
|
||||
let re = Regex::new(r"Wall time: ([0-9]+(?:\.[0-9]+)?) seconds\r?\naborted by user")
|
||||
.expect("compile regex");
|
||||
let captures = re.captures(&output);
|
||||
assert_matches!(
|
||||
|
||||
@@ -31,6 +31,7 @@ use core_test_support::responses::ev_response_created;
|
||||
use core_test_support::responses::mount_sse_once;
|
||||
use core_test_support::responses::sse;
|
||||
use core_test_support::responses::start_mock_server;
|
||||
use core_test_support::skip_if_github_actions_linux_aarch64;
|
||||
use core_test_support::skip_if_no_network;
|
||||
use core_test_support::test_codex::TestCodex;
|
||||
use core_test_support::test_codex::test_codex;
|
||||
@@ -1573,6 +1574,7 @@ fn scenarios() -> Vec<ScenarioSpec> {
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn approval_matrix_covers_all_modes() -> Result<()> {
|
||||
skip_if_github_actions_linux_aarch64!(Ok(()));
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
for scenario in scenarios() {
|
||||
|
||||
@@ -20,6 +20,7 @@ use core_test_support::responses::mount_sse_once;
|
||||
use core_test_support::responses::mount_sse_sequence;
|
||||
use core_test_support::responses::sse;
|
||||
use core_test_support::responses::start_mock_server;
|
||||
use core_test_support::skip_if_github_actions_linux_aarch64;
|
||||
use core_test_support::skip_if_no_network;
|
||||
use core_test_support::streaming_sse::StreamingSseChunk;
|
||||
use core_test_support::streaming_sse::start_streaming_sse_server;
|
||||
@@ -141,6 +142,7 @@ async fn read_file_tools_run_in_parallel() -> anyhow::Result<()> {
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn shell_tools_run_in_parallel() -> anyhow::Result<()> {
|
||||
skip_if_github_actions_linux_aarch64!(Ok(()));
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
|
||||
@@ -21,6 +21,7 @@ use core_test_support::responses::mount_sse_once;
|
||||
use core_test_support::responses::mount_sse_sequence;
|
||||
use core_test_support::responses::sse;
|
||||
use core_test_support::responses::start_mock_server;
|
||||
use core_test_support::skip_if_github_actions_linux_aarch64;
|
||||
use core_test_support::skip_if_no_network;
|
||||
use core_test_support::test_codex::test_codex;
|
||||
use regex_lite::Regex;
|
||||
@@ -190,6 +191,7 @@ async fn shell_escalated_permissions_rejected_then_ok() -> Result<()> {
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn sandbox_denied_shell_returns_original_output() -> Result<()> {
|
||||
skip_if_github_actions_linux_aarch64!(Ok(()));
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
|
||||
@@ -24,6 +24,7 @@ use core_test_support::responses::ev_response_created;
|
||||
use core_test_support::responses::mount_sse_sequence;
|
||||
use core_test_support::responses::sse;
|
||||
use core_test_support::responses::start_mock_server;
|
||||
use core_test_support::skip_if_github_actions_linux_aarch64;
|
||||
use core_test_support::skip_if_no_network;
|
||||
use core_test_support::skip_if_sandbox;
|
||||
use core_test_support::skip_if_windows;
|
||||
@@ -2493,6 +2494,7 @@ PY
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn unified_exec_runs_under_sandbox() -> Result<()> {
|
||||
skip_if_github_actions_linux_aarch64!(Ok(()));
|
||||
skip_if_no_network!(Ok(()));
|
||||
skip_if_sandbox!(Ok(()));
|
||||
skip_if_windows!(Ok(()));
|
||||
|
||||
@@ -122,6 +122,31 @@ fn is_bwrap_unavailable_output(output: &codex_core::exec::ExecToolCallOutput) ->
|
||||
|| output.stderr.text.contains("Invalid argument")))
|
||||
}
|
||||
|
||||
fn is_landlock_restrict_output(output: &codex_core::exec::ExecToolCallOutput) -> bool {
|
||||
output.stderr.text.contains("Sandbox(LandlockRestrict)")
|
||||
}
|
||||
|
||||
async fn should_skip_workspace_write_tests() -> bool {
|
||||
match run_cmd_result_with_writable_roots(
|
||||
&["bash", "-lc", "true"],
|
||||
&[],
|
||||
LONG_TIMEOUT_MS,
|
||||
false,
|
||||
false,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(_) => false,
|
||||
Err(CodexErr::Sandbox(SandboxErr::Denied { output, .. })) => {
|
||||
is_landlock_restrict_output(&output)
|
||||
}
|
||||
// A hung probe does not give us actionable signal; skip rather than
|
||||
// fail a suite that is already running in a degraded sandbox.
|
||||
Err(CodexErr::Sandbox(SandboxErr::Timeout { .. })) => true,
|
||||
Err(err) => panic!("workspace-write sandbox probe failed unexpectedly: {err:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
async fn should_skip_bwrap_tests() -> bool {
|
||||
match run_cmd_result_with_writable_roots(
|
||||
&["bash", "-lc", "true"],
|
||||
@@ -159,6 +184,10 @@ fn expect_denied(
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_root_read() {
|
||||
if should_skip_workspace_write_tests().await {
|
||||
eprintln!("skipping workspace-write test: landlock restrictions are unavailable");
|
||||
return;
|
||||
}
|
||||
run_cmd(&["ls", "-l", "/bin"], &[], SHORT_TIMEOUT_MS).await;
|
||||
}
|
||||
|
||||
@@ -265,6 +294,11 @@ async fn bwrap_preserves_writable_dev_shm_bind_mount() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_writable_root() {
|
||||
if should_skip_workspace_write_tests().await {
|
||||
eprintln!("skipping workspace-write test: landlock restrictions are unavailable");
|
||||
return;
|
||||
}
|
||||
|
||||
let tmpdir = tempfile::tempdir().unwrap();
|
||||
let file_path = tmpdir.path().join("test");
|
||||
run_cmd(
|
||||
@@ -283,6 +317,11 @@ async fn test_writable_root() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_no_new_privs_is_enabled() {
|
||||
if should_skip_workspace_write_tests().await {
|
||||
eprintln!("skipping workspace-write test: landlock restrictions are unavailable");
|
||||
return;
|
||||
}
|
||||
|
||||
let output = run_cmd_output(
|
||||
&["bash", "-lc", "grep '^NoNewPrivs:' /proc/self/status"],
|
||||
&[],
|
||||
@@ -301,9 +340,17 @@ async fn test_no_new_privs_is_enabled() {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[should_panic(expected = "Sandbox(Timeout")]
|
||||
async fn test_timeout() {
|
||||
run_cmd(&["sleep", "2"], &[], 50).await;
|
||||
if should_skip_workspace_write_tests().await {
|
||||
eprintln!("skipping workspace-write test: landlock restrictions are unavailable");
|
||||
return;
|
||||
}
|
||||
|
||||
let result = run_cmd_result_with_writable_roots(&["sleep", "2"], &[], 50, false, false).await;
|
||||
assert!(
|
||||
matches!(result, Err(CodexErr::Sandbox(SandboxErr::Timeout { .. }))),
|
||||
"expected sandbox timeout, got {result:?}"
|
||||
);
|
||||
}
|
||||
|
||||
/// Helper that runs `cmd` under the Linux sandbox and asserts that the command
|
||||
|
||||
Reference in New Issue
Block a user