mirror of
https://github.com/openai/codex.git
synced 2026-05-15 00:32:51 +00:00
Route ThreadManager rollout path reads through thread store (#21265)
- Route ThreadManager rollout-path resume/fork through ThreadStore history reads. - Add in-memory store coverage proving path-addressed reads are used. This isn't strictly necessary for the ThreadStore migration, since these ThreadManager methods _only_ work for path-based lookups, but I'm trying to migrate all the rollout recorder callsites to use the threadstore were possible for consistency.
This commit is contained in:
@@ -7,7 +7,6 @@ use crate::environment_selection::default_thread_environment_selections;
|
||||
use crate::environment_selection::resolve_environment_selections;
|
||||
use crate::file_watcher::FileWatcher;
|
||||
use crate::mcp::McpManager;
|
||||
use crate::rollout::RolloutRecorder;
|
||||
use crate::rollout::truncation;
|
||||
use crate::session::Codex;
|
||||
use crate::session::CodexSpawnArgs;
|
||||
@@ -41,6 +40,7 @@ use codex_protocol::protocol::Event;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::InitialHistory;
|
||||
use codex_protocol::protocol::Op;
|
||||
use codex_protocol::protocol::ResumedHistory;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::SessionConfiguredEvent;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
@@ -55,6 +55,7 @@ use codex_state::DirectionalThreadSpawnEdgeStatus;
|
||||
use codex_thread_store::InMemoryThreadStore;
|
||||
use codex_thread_store::LocalThreadStore;
|
||||
use codex_thread_store::LocalThreadStoreConfig;
|
||||
use codex_thread_store::ReadThreadByRolloutPathParams;
|
||||
use codex_thread_store::ReadThreadParams;
|
||||
use codex_thread_store::RemoteThreadStore;
|
||||
use codex_thread_store::StoredThread;
|
||||
@@ -615,7 +616,7 @@ impl ThreadManager {
|
||||
auth_manager: Arc<AuthManager>,
|
||||
parent_trace: Option<W3cTraceContext>,
|
||||
) -> CodexResult<NewThread> {
|
||||
let initial_history = RolloutRecorder::get_rollout_history(&rollout_path).await?;
|
||||
let initial_history = self.initial_history_from_rollout_path(rollout_path).await?;
|
||||
Box::pin(self.resume_thread_with_history(
|
||||
config,
|
||||
initial_history,
|
||||
@@ -687,7 +688,7 @@ impl ThreadManager {
|
||||
auth_manager: Arc<AuthManager>,
|
||||
user_shell_override: crate::shell::Shell,
|
||||
) -> CodexResult<NewThread> {
|
||||
let initial_history = RolloutRecorder::get_rollout_history(&rollout_path).await?;
|
||||
let initial_history = self.initial_history_from_rollout_path(rollout_path).await?;
|
||||
let environments = default_thread_environment_selections(
|
||||
self.state.environment_manager.as_ref(),
|
||||
&config.cwd,
|
||||
@@ -784,7 +785,7 @@ impl ThreadManager {
|
||||
S: Into<ForkSnapshot>,
|
||||
{
|
||||
let snapshot = snapshot.into();
|
||||
let history = RolloutRecorder::get_rollout_history(&path).await?;
|
||||
let history = self.initial_history_from_rollout_path(path).await?;
|
||||
self.fork_thread_from_history(
|
||||
snapshot,
|
||||
config,
|
||||
@@ -796,6 +797,24 @@ impl ThreadManager {
|
||||
.await
|
||||
}
|
||||
|
||||
async fn initial_history_from_rollout_path(
|
||||
&self,
|
||||
rollout_path: PathBuf,
|
||||
) -> CodexResult<InitialHistory> {
|
||||
let requested_rollout_path = rollout_path.clone();
|
||||
let stored_thread = self
|
||||
.state
|
||||
.thread_store
|
||||
.read_thread_by_rollout_path(ReadThreadByRolloutPathParams {
|
||||
rollout_path,
|
||||
include_archived: true,
|
||||
include_history: true,
|
||||
})
|
||||
.await
|
||||
.map_err(thread_store_rollout_read_error)?;
|
||||
stored_thread_to_initial_history(stored_thread, Some(requested_rollout_path))
|
||||
}
|
||||
|
||||
/// Fork an existing thread from already-loaded store history.
|
||||
pub async fn fork_thread_from_history<S>(
|
||||
&self,
|
||||
@@ -1280,6 +1299,31 @@ impl ThreadManagerState {
|
||||
}
|
||||
}
|
||||
|
||||
fn stored_thread_to_initial_history(
|
||||
stored_thread: StoredThread,
|
||||
rollout_path: Option<PathBuf>,
|
||||
) -> CodexResult<InitialHistory> {
|
||||
let thread_id = stored_thread.thread_id;
|
||||
let history = stored_thread.history.ok_or_else(|| {
|
||||
CodexErr::Fatal(format!(
|
||||
"thread {thread_id} did not include persisted history"
|
||||
))
|
||||
})?;
|
||||
Ok(InitialHistory::Resumed(ResumedHistory {
|
||||
conversation_id: thread_id,
|
||||
history: history.items,
|
||||
rollout_path: rollout_path.or(stored_thread.rollout_path),
|
||||
}))
|
||||
}
|
||||
|
||||
fn thread_store_rollout_read_error(err: ThreadStoreError) -> CodexErr {
|
||||
match err {
|
||||
ThreadStoreError::ThreadNotFound { thread_id } => CodexErr::ThreadNotFound(thread_id),
|
||||
ThreadStoreError::InvalidRequest { message } => CodexErr::InvalidRequest(message),
|
||||
err => CodexErr::Fatal(format!("failed to read thread by rollout path: {err}")),
|
||||
}
|
||||
}
|
||||
|
||||
/// Return a fork snapshot cut strictly before the nth user message (0-based).
|
||||
///
|
||||
/// Out-of-range values keep the full committed history at a turn boundary, but
|
||||
|
||||
@@ -16,6 +16,7 @@ use codex_protocol::openai_models::ModelsResponse;
|
||||
use codex_protocol::protocol::AgentMessageEvent;
|
||||
use codex_protocol::protocol::InitialHistory;
|
||||
use codex_protocol::protocol::InternalSessionSource;
|
||||
use codex_protocol::protocol::ResumedHistory;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::ThreadSource;
|
||||
use codex_protocol::protocol::TurnStartedEvent;
|
||||
@@ -730,6 +731,111 @@ async fn resume_stopped_thread_from_rollout_preserves_thread_source() {
|
||||
.expect("shutdown resumed thread");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn rollout_path_resume_and_fork_read_history_through_thread_store() {
|
||||
let temp_dir = tempdir().expect("tempdir");
|
||||
let mut config = test_config().await;
|
||||
config.codex_home = temp_dir.path().join("codex-home").abs();
|
||||
config.cwd = config.codex_home.abs();
|
||||
config.experimental_thread_store = ThreadStoreConfig::InMemory {
|
||||
id: format!("thread-manager-{}", uuid::Uuid::new_v4()),
|
||||
};
|
||||
std::fs::create_dir_all(&config.codex_home).expect("create codex home");
|
||||
|
||||
let auth_manager =
|
||||
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
|
||||
let state_db = init_state_db(&config).await;
|
||||
let thread_store = thread_store_from_config(&config, state_db.clone());
|
||||
let in_memory_store = thread_store
|
||||
.as_any()
|
||||
.downcast_ref::<InMemoryThreadStore>()
|
||||
.expect("configured in-memory store");
|
||||
let manager = ThreadManager::new(
|
||||
&config,
|
||||
auth_manager.clone(),
|
||||
SessionSource::Exec,
|
||||
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
|
||||
/*analytics_events_client*/ None,
|
||||
thread_store.clone(),
|
||||
state_db,
|
||||
TEST_INSTALLATION_ID.to_string(),
|
||||
);
|
||||
|
||||
let source = manager
|
||||
.start_thread(config.clone())
|
||||
.await
|
||||
.expect("start source thread");
|
||||
source
|
||||
.thread
|
||||
.shutdown_and_wait()
|
||||
.await
|
||||
.expect("shutdown source thread");
|
||||
let _ = manager.remove_thread(&source.thread_id).await;
|
||||
|
||||
let rollout_path = config
|
||||
.codex_home
|
||||
.join("rollouts/source.jsonl")
|
||||
.to_path_buf();
|
||||
let resumed = manager
|
||||
.resume_thread_with_history(
|
||||
config.clone(),
|
||||
InitialHistory::Resumed(ResumedHistory {
|
||||
conversation_id: source.thread_id,
|
||||
history: vec![RolloutItem::ResponseItem(user_msg("hello"))],
|
||||
rollout_path: Some(rollout_path.clone()),
|
||||
}),
|
||||
auth_manager.clone(),
|
||||
/*persist_extended_history*/ false,
|
||||
/*parent_trace*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("seed rollout path in store");
|
||||
resumed
|
||||
.thread
|
||||
.shutdown_and_wait()
|
||||
.await
|
||||
.expect("shutdown seeded resumed thread");
|
||||
let _ = manager.remove_thread(&resumed.thread_id).await;
|
||||
|
||||
let resumed_from_path = manager
|
||||
.resume_thread_from_rollout(
|
||||
config.clone(),
|
||||
rollout_path.clone(),
|
||||
auth_manager,
|
||||
/*parent_trace*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("resume from rollout path");
|
||||
assert_eq!(resumed_from_path.thread_id, resumed.thread_id);
|
||||
|
||||
let forked = manager
|
||||
.fork_thread(
|
||||
ForkSnapshot::Interrupted,
|
||||
config,
|
||||
rollout_path,
|
||||
/*thread_source*/ None,
|
||||
/*persist_extended_history*/ false,
|
||||
/*parent_trace*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("fork from rollout path");
|
||||
assert_ne!(forked.thread_id, resumed.thread_id);
|
||||
|
||||
let calls = in_memory_store.calls().await;
|
||||
assert_eq!(calls.read_thread_by_rollout_path, 2);
|
||||
|
||||
resumed_from_path
|
||||
.thread
|
||||
.shutdown_and_wait()
|
||||
.await
|
||||
.expect("shutdown path-resumed thread");
|
||||
forked
|
||||
.thread
|
||||
.shutdown_and_wait()
|
||||
.await
|
||||
.expect("shutdown forked thread");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn new_uses_active_provider_for_model_refresh() {
|
||||
let server = MockServer::start().await;
|
||||
|
||||
@@ -256,10 +256,16 @@ fn stored_thread_from_state(
|
||||
items: history_items.clone(),
|
||||
});
|
||||
let name = state.names.get(&thread_id).cloned().flatten();
|
||||
let rollout_path = state
|
||||
.rollout_paths
|
||||
.iter()
|
||||
.find_map(|(path, mapped_thread_id)| {
|
||||
(*mapped_thread_id == thread_id).then(|| path.clone())
|
||||
});
|
||||
|
||||
Ok(StoredThread {
|
||||
thread_id,
|
||||
rollout_path: None,
|
||||
rollout_path,
|
||||
forked_from_id: created.forked_from_id,
|
||||
preview: String::new(),
|
||||
name,
|
||||
|
||||
Reference in New Issue
Block a user