diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index feda91ce2f..a19832717d 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -7,7 +7,6 @@ use crate::environment_selection::default_thread_environment_selections; use crate::environment_selection::resolve_environment_selections; use crate::file_watcher::FileWatcher; use crate::mcp::McpManager; -use crate::rollout::RolloutRecorder; use crate::rollout::truncation; use crate::session::Codex; use crate::session::CodexSpawnArgs; @@ -41,6 +40,7 @@ use codex_protocol::protocol::Event; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::InitialHistory; use codex_protocol::protocol::Op; +use codex_protocol::protocol::ResumedHistory; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionConfiguredEvent; use codex_protocol::protocol::SessionSource; @@ -55,6 +55,7 @@ use codex_state::DirectionalThreadSpawnEdgeStatus; use codex_thread_store::InMemoryThreadStore; use codex_thread_store::LocalThreadStore; use codex_thread_store::LocalThreadStoreConfig; +use codex_thread_store::ReadThreadByRolloutPathParams; use codex_thread_store::ReadThreadParams; use codex_thread_store::RemoteThreadStore; use codex_thread_store::StoredThread; @@ -615,7 +616,7 @@ impl ThreadManager { auth_manager: Arc, parent_trace: Option, ) -> CodexResult { - let initial_history = RolloutRecorder::get_rollout_history(&rollout_path).await?; + let initial_history = self.initial_history_from_rollout_path(rollout_path).await?; Box::pin(self.resume_thread_with_history( config, initial_history, @@ -687,7 +688,7 @@ impl ThreadManager { auth_manager: Arc, user_shell_override: crate::shell::Shell, ) -> CodexResult { - let initial_history = RolloutRecorder::get_rollout_history(&rollout_path).await?; + let initial_history = self.initial_history_from_rollout_path(rollout_path).await?; let environments = default_thread_environment_selections( self.state.environment_manager.as_ref(), &config.cwd, @@ -784,7 +785,7 @@ impl ThreadManager { S: Into, { let snapshot = snapshot.into(); - let history = RolloutRecorder::get_rollout_history(&path).await?; + let history = self.initial_history_from_rollout_path(path).await?; self.fork_thread_from_history( snapshot, config, @@ -796,6 +797,24 @@ impl ThreadManager { .await } + async fn initial_history_from_rollout_path( + &self, + rollout_path: PathBuf, + ) -> CodexResult { + let requested_rollout_path = rollout_path.clone(); + let stored_thread = self + .state + .thread_store + .read_thread_by_rollout_path(ReadThreadByRolloutPathParams { + rollout_path, + include_archived: true, + include_history: true, + }) + .await + .map_err(thread_store_rollout_read_error)?; + stored_thread_to_initial_history(stored_thread, Some(requested_rollout_path)) + } + /// Fork an existing thread from already-loaded store history. pub async fn fork_thread_from_history( &self, @@ -1280,6 +1299,31 @@ impl ThreadManagerState { } } +fn stored_thread_to_initial_history( + stored_thread: StoredThread, + rollout_path: Option, +) -> CodexResult { + let thread_id = stored_thread.thread_id; + let history = stored_thread.history.ok_or_else(|| { + CodexErr::Fatal(format!( + "thread {thread_id} did not include persisted history" + )) + })?; + Ok(InitialHistory::Resumed(ResumedHistory { + conversation_id: thread_id, + history: history.items, + rollout_path: rollout_path.or(stored_thread.rollout_path), + })) +} + +fn thread_store_rollout_read_error(err: ThreadStoreError) -> CodexErr { + match err { + ThreadStoreError::ThreadNotFound { thread_id } => CodexErr::ThreadNotFound(thread_id), + ThreadStoreError::InvalidRequest { message } => CodexErr::InvalidRequest(message), + err => CodexErr::Fatal(format!("failed to read thread by rollout path: {err}")), + } +} + /// Return a fork snapshot cut strictly before the nth user message (0-based). /// /// Out-of-range values keep the full committed history at a turn boundary, but diff --git a/codex-rs/core/src/thread_manager_tests.rs b/codex-rs/core/src/thread_manager_tests.rs index 17ac81a93b..0834c18e21 100644 --- a/codex-rs/core/src/thread_manager_tests.rs +++ b/codex-rs/core/src/thread_manager_tests.rs @@ -16,6 +16,7 @@ use codex_protocol::openai_models::ModelsResponse; use codex_protocol::protocol::AgentMessageEvent; use codex_protocol::protocol::InitialHistory; use codex_protocol::protocol::InternalSessionSource; +use codex_protocol::protocol::ResumedHistory; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::ThreadSource; use codex_protocol::protocol::TurnStartedEvent; @@ -730,6 +731,111 @@ async fn resume_stopped_thread_from_rollout_preserves_thread_source() { .expect("shutdown resumed thread"); } +#[tokio::test] +async fn rollout_path_resume_and_fork_read_history_through_thread_store() { + let temp_dir = tempdir().expect("tempdir"); + let mut config = test_config().await; + config.codex_home = temp_dir.path().join("codex-home").abs(); + config.cwd = config.codex_home.abs(); + config.experimental_thread_store = ThreadStoreConfig::InMemory { + id: format!("thread-manager-{}", uuid::Uuid::new_v4()), + }; + std::fs::create_dir_all(&config.codex_home).expect("create codex home"); + + let auth_manager = + AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing()); + let state_db = init_state_db(&config).await; + let thread_store = thread_store_from_config(&config, state_db.clone()); + let in_memory_store = thread_store + .as_any() + .downcast_ref::() + .expect("configured in-memory store"); + let manager = ThreadManager::new( + &config, + auth_manager.clone(), + SessionSource::Exec, + Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), + /*analytics_events_client*/ None, + thread_store.clone(), + state_db, + TEST_INSTALLATION_ID.to_string(), + ); + + let source = manager + .start_thread(config.clone()) + .await + .expect("start source thread"); + source + .thread + .shutdown_and_wait() + .await + .expect("shutdown source thread"); + let _ = manager.remove_thread(&source.thread_id).await; + + let rollout_path = config + .codex_home + .join("rollouts/source.jsonl") + .to_path_buf(); + let resumed = manager + .resume_thread_with_history( + config.clone(), + InitialHistory::Resumed(ResumedHistory { + conversation_id: source.thread_id, + history: vec![RolloutItem::ResponseItem(user_msg("hello"))], + rollout_path: Some(rollout_path.clone()), + }), + auth_manager.clone(), + /*persist_extended_history*/ false, + /*parent_trace*/ None, + ) + .await + .expect("seed rollout path in store"); + resumed + .thread + .shutdown_and_wait() + .await + .expect("shutdown seeded resumed thread"); + let _ = manager.remove_thread(&resumed.thread_id).await; + + let resumed_from_path = manager + .resume_thread_from_rollout( + config.clone(), + rollout_path.clone(), + auth_manager, + /*parent_trace*/ None, + ) + .await + .expect("resume from rollout path"); + assert_eq!(resumed_from_path.thread_id, resumed.thread_id); + + let forked = manager + .fork_thread( + ForkSnapshot::Interrupted, + config, + rollout_path, + /*thread_source*/ None, + /*persist_extended_history*/ false, + /*parent_trace*/ None, + ) + .await + .expect("fork from rollout path"); + assert_ne!(forked.thread_id, resumed.thread_id); + + let calls = in_memory_store.calls().await; + assert_eq!(calls.read_thread_by_rollout_path, 2); + + resumed_from_path + .thread + .shutdown_and_wait() + .await + .expect("shutdown path-resumed thread"); + forked + .thread + .shutdown_and_wait() + .await + .expect("shutdown forked thread"); +} + #[tokio::test] async fn new_uses_active_provider_for_model_refresh() { let server = MockServer::start().await; diff --git a/codex-rs/thread-store/src/in_memory.rs b/codex-rs/thread-store/src/in_memory.rs index fca3d21e62..77739260b8 100644 --- a/codex-rs/thread-store/src/in_memory.rs +++ b/codex-rs/thread-store/src/in_memory.rs @@ -256,10 +256,16 @@ fn stored_thread_from_state( items: history_items.clone(), }); let name = state.names.get(&thread_id).cloned().flatten(); + let rollout_path = state + .rollout_paths + .iter() + .find_map(|(path, mapped_thread_id)| { + (*mapped_thread_id == thread_id).then(|| path.clone()) + }); Ok(StoredThread { thread_id, - rollout_path: None, + rollout_path, forked_from_id: created.forked_from_id, preview: String::new(), name,