Compare commits

...

5 Commits

Author SHA1 Message Date
Charles Cunningham
1357f36a25 Preserve full-history fork semantics
Co-authored-by: Codex <noreply@openai.com>
2026-03-23 13:54:03 -07:00
Charles Cunningham
5dac3c0f23 codex: route fork snapshot through core test support
Co-authored-by: Codex <noreply@openai.com>
2026-03-23 13:30:45 -07:00
Charles Cunningham
5f67c17144 codex: fix external fork snapshot test type
Co-authored-by: Codex <noreply@openai.com>
2026-03-23 13:22:25 -07:00
Charles Cunningham
7370d01767 codex: fix PR CI fallout on latest main
Co-authored-by: Codex <noreply@openai.com>
2026-03-23 13:14:32 -07:00
Charles Cunningham
f3d1732242 core: add fork snapshot compatibility
Co-authored-by: Codex <noreply@openai.com>
2026-03-23 13:13:54 -07:00
11 changed files with 79 additions and 18 deletions

View File

@@ -183,6 +183,7 @@ use codex_core::AuthManager;
use codex_core::CodexAuth;
use codex_core::CodexThread;
use codex_core::Cursor as RolloutCursor;
use codex_core::ForkSnapshot;
use codex_core::NewThread;
use codex_core::RolloutRecorder;
use codex_core::SessionMeta;
@@ -4039,7 +4040,7 @@ impl CodexMessageProcessor {
} = match self
.thread_manager
.fork_thread(
usize::MAX,
ForkSnapshot::FullHistory,
config,
rollout_path.clone(),
persist_extended_history,
@@ -6508,7 +6509,7 @@ impl CodexMessageProcessor {
} = self
.thread_manager
.fork_thread(
usize::MAX,
ForkSnapshot::FullHistory,
config,
rollout_path,
/*persist_extended_history*/ false,

View File

@@ -79,6 +79,7 @@ use codex_protocol::protocol::ConversationAudioParams;
use codex_protocol::protocol::RealtimeAudioFrame;
use codex_protocol::protocol::Submission;
use codex_protocol::protocol::W3cTraceContext;
use core_test_support::codex_core::ForkSnapshot;
use core_test_support::context_snapshot;
use core_test_support::context_snapshot::ContextSnapshotOptions;
use core_test_support::context_snapshot::ContextSnapshotRenderMode;
@@ -1172,7 +1173,13 @@ async fn fork_startup_context_then_first_turn_diff_snapshot() -> anyhow::Result<
codex_config::Constrained::allow_any(AskForApproval::UnlessTrusted);
let forked = initial
.thread_manager
.fork_thread(usize::MAX, fork_config, rollout_path, false, None)
.fork_thread(
ForkSnapshot::FullHistory,
fork_config,
rollout_path,
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await?;
let collaboration_mode = CollaborationMode {

View File

@@ -97,6 +97,7 @@ mod seatbelt_permissions;
mod thread_manager;
pub mod web_search;
pub mod windows_sandbox_read_grants;
pub use thread_manager::ForkSnapshot;
pub use thread_manager::NewThread;
pub use thread_manager::ThreadManager;
#[deprecated(note = "use ThreadManager")]

View File

@@ -128,6 +128,14 @@ pub struct NewThread {
pub session_configured: SessionConfiguredEvent,
}
/// Describes how a thread fork should seed the new thread's history.
pub enum ForkSnapshot {
/// Cut strictly before the nth user message (0-based).
TruncateBeforeNthUserMessage(usize),
/// Keep the full source history.
FullHistory,
}
#[derive(Debug, Default, PartialEq, Eq)]
pub struct ThreadShutdownReport {
pub completed: Vec<ThreadId>,
@@ -543,20 +551,32 @@ impl ThreadManager {
report
}
/// Fork an existing thread by taking messages up to the given position (not including
/// the message at the given position) and starting a new thread with identical
/// configuration (unless overridden by the caller's `config`). The new thread will have
/// a fresh id. Pass `usize::MAX` to keep the full rollout history.
/// Fork an existing thread by snapshotting rollout history according to
/// `snapshot` and starting a new thread with identical configuration
/// (unless overridden by the caller's `config`). The new thread will have
/// a fresh id.
pub async fn fork_thread(
&self,
nth_user_message: usize,
snapshot: ForkSnapshot,
config: Config,
path: PathBuf,
persist_extended_history: bool,
parent_trace: Option<W3cTraceContext>,
) -> CodexResult<NewThread> {
let history = RolloutRecorder::get_rollout_history(&path).await?;
let history = truncate_before_nth_user_message(history, nth_user_message);
let history = match snapshot {
ForkSnapshot::TruncateBeforeNthUserMessage(nth_user_message) => {
truncate_before_nth_user_message(history, nth_user_message)
}
ForkSnapshot::FullHistory => {
let rollout_items = history.get_rollout_items();
if rollout_items.is_empty() {
InitialHistory::New
} else {
InitialHistory::Forked(rollout_items)
}
}
};
Box::pin(self.state.spawn_thread(
config,
history,

View File

@@ -1,5 +1,7 @@
#![expect(clippy::expect_used)]
pub use codex_core;
use anyhow::Context as _;
use anyhow::ensure;
use codex_arg0::Arg0PathEntryGuard;

View File

@@ -12,6 +12,7 @@ use super::compact::FIRST_REPLY;
use super::compact::SUMMARY_TEXT;
use anyhow::Result;
use codex_core::CodexThread;
use codex_core::ForkSnapshot;
use codex_core::ThreadManager;
use codex_core::compact::SUMMARIZATION_PROMPT;
use codex_core::config::Config;
@@ -701,8 +702,14 @@ async fn fork_thread(
path: std::path::PathBuf,
nth_user_message: usize,
) -> Arc<CodexThread> {
Box::pin(manager.fork_thread(nth_user_message, config.clone(), path, false, None))
.await
.expect("fork conversation")
.thread
Box::pin(manager.fork_thread(
ForkSnapshot::TruncateBeforeNthUserMessage(nth_user_message),
config.clone(),
path,
/*persist_extended_history*/ false,
/*parent_trace*/ None,
))
.await
.expect("fork conversation")
.thread
}

View File

@@ -1,3 +1,4 @@
use codex_core::ForkSnapshot;
use codex_core::NewThread;
use codex_core::parse_turn_item;
use codex_protocol::items::TurnItem;
@@ -110,7 +111,13 @@ async fn fork_thread_twice_drops_to_first_message() {
thread: codex_fork1,
..
} = thread_manager
.fork_thread(1, config_for_fork.clone(), base_path.clone(), false, None)
.fork_thread(
ForkSnapshot::TruncateBeforeNthUserMessage(1),
config_for_fork.clone(),
base_path.clone(),
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await
.expect("fork 1");
@@ -129,7 +136,13 @@ async fn fork_thread_twice_drops_to_first_message() {
thread: codex_fork2,
..
} = thread_manager
.fork_thread(0, config_for_fork.clone(), fork1_path.clone(), false, None)
.fork_thread(
ForkSnapshot::TruncateBeforeNthUserMessage(0),
config_for_fork.clone(),
fork1_path.clone(),
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await
.expect("fork 2");

View File

@@ -1,4 +1,5 @@
use anyhow::Result;
use codex_core::ForkSnapshot;
use codex_core::config::Constrained;
use codex_execpolicy::Policy;
use codex_protocol::models::DeveloperInstructions;
@@ -419,7 +420,13 @@ async fn resume_and_fork_append_permissions_messages() -> Result<()> {
fork_config.permissions.approval_policy = Constrained::allow_any(AskForApproval::UnlessTrusted);
let forked = initial
.thread_manager
.fork_thread(usize::MAX, fork_config, rollout_path, false, None)
.fork_thread(
ForkSnapshot::FullHistory,
fork_config,
rollout_path,
/*persist_extended_history*/ false,
/*parent_trace*/ None,
)
.await?;
forked
.thread

View File

@@ -57,6 +57,7 @@ use codex_app_server_protocol::RequestId;
use codex_arg0::Arg0DispatchPaths;
use codex_core::AuthManager;
use codex_core::CodexAuth;
use codex_core::ForkSnapshot;
use codex_core::ThreadManager;
use codex_core::config::Config;
use codex_core::config::ConfigBuilder;
@@ -2502,7 +2503,7 @@ impl App {
);
let forked = thread_manager
.fork_thread(
usize::MAX,
ForkSnapshot::FullHistory,
config.clone(),
target_session.path.clone(),
/*persist_extended_history*/ false,
@@ -2925,7 +2926,7 @@ impl App {
match self
.server
.fork_thread(
usize::MAX,
ForkSnapshot::FullHistory,
self.config.clone(),
path.clone(),
/*persist_extended_history*/ false,

View File

@@ -7194,6 +7194,7 @@ fn plugins_test_detail(
short_description: None,
interface: None,
path: PathBuf::from(format!("/skills/{name}/SKILL.md")),
enabled: true,
})
.collect(),
apps: apps

View File

@@ -7791,6 +7791,7 @@ fn plugins_test_detail(
short_description: None,
interface: None,
path: PathBuf::from(format!("/skills/{name}/SKILL.md")),
enabled: true,
})
.collect(),
apps: apps