Files
codex/codex-rs/thread-store/src/live_thread.rs
Tom 33d24b0df5 codex: migrate (more) app-server thread history reads to ThreadStore (#20575)
Migrate token usage replay, rollback responses, and detached review
setup (a special case of forking) to be served from ThreadStore reads
rather direct rollout files.

- replay restored token usage from already-loaded `RolloutItem` history
instead of reopening `Thread.path`
- rebuild rollback responses from loaded `ThreadStore` snapshots and
history
- start detached reviews from store-backed parent history and stored
review-thread metadata
- remove obsolete app-server rollout-summary helper code that became
dead after the store-backed migration
- preserve response/notification ordering for resume, fork, rollback,
and detached review flows
- add integration test coverage for the affected paths
2026-05-04 21:16:50 -07:00

207 lines
6.0 KiB
Rust

use std::path::PathBuf;
use std::sync::Arc;
use codex_protocol::ThreadId;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::ThreadMemoryMode;
use tracing::warn;
use crate::AppendThreadItemsParams;
use crate::CreateThreadParams;
use crate::LoadThreadHistoryParams;
use crate::LocalThreadStore;
use crate::ReadThreadParams;
use crate::ResumeThreadParams;
use crate::StoredThread;
use crate::StoredThreadHistory;
use crate::ThreadMetadataPatch;
use crate::ThreadStore;
use crate::ThreadStoreResult;
use crate::UpdateThreadMetadataParams;
/// Handle for an active thread's persistence lifecycle.
///
/// `LiveThread` keeps lifecycle decisions with the caller while delegating storage details to
/// [`ThreadStore`]. Local stores may use a rollout file internally and remote stores may use a
/// service, but session code should only need this handle for the active thread.
#[derive(Clone)]
pub struct LiveThread {
thread_id: ThreadId,
thread_store: Arc<dyn ThreadStore>,
}
/// Owns a live thread while session initialization is still fallible.
///
/// If initialization returns early after persistence has been opened, dropping this guard discards
/// the live writer without forcing lazy in-memory state to become durable. Call [`commit`] once the
/// session owns the live thread for normal operation.
pub struct LiveThreadInitGuard {
live_thread: Option<LiveThread>,
}
impl LiveThreadInitGuard {
pub fn new(live_thread: Option<LiveThread>) -> Self {
Self { live_thread }
}
pub fn as_ref(&self) -> Option<&LiveThread> {
self.live_thread.as_ref()
}
pub fn commit(&mut self) {
self.live_thread = None;
}
pub async fn discard(&mut self) {
let Some(live_thread) = self.live_thread.take() else {
return;
};
if let Err(err) = live_thread.discard().await {
warn!("failed to discard thread persistence for failed session init: {err}");
}
}
}
impl Drop for LiveThreadInitGuard {
fn drop(&mut self) {
let Some(live_thread) = self.live_thread.take() else {
return;
};
let Ok(handle) = tokio::runtime::Handle::try_current() else {
warn!("failed to discard thread persistence for failed session init: no Tokio runtime");
return;
};
handle.spawn(async move {
if let Err(err) = live_thread.discard().await {
warn!("failed to discard thread persistence for failed session init: {err}");
}
});
}
}
impl LiveThread {
pub async fn create(
thread_store: Arc<dyn ThreadStore>,
params: CreateThreadParams,
) -> ThreadStoreResult<Self> {
let thread_id = params.thread_id;
thread_store.create_thread(params).await?;
Ok(Self {
thread_id,
thread_store,
})
}
pub async fn resume(
thread_store: Arc<dyn ThreadStore>,
params: ResumeThreadParams,
) -> ThreadStoreResult<Self> {
let thread_id = params.thread_id;
thread_store.resume_thread(params).await?;
Ok(Self {
thread_id,
thread_store,
})
}
pub async fn append_items(&self, items: &[RolloutItem]) -> ThreadStoreResult<()> {
self.thread_store
.append_items(AppendThreadItemsParams {
thread_id: self.thread_id,
items: items.to_vec(),
})
.await
}
pub async fn persist(&self) -> ThreadStoreResult<()> {
self.thread_store.persist_thread(self.thread_id).await
}
pub async fn flush(&self) -> ThreadStoreResult<()> {
self.thread_store.flush_thread(self.thread_id).await
}
pub async fn shutdown(&self) -> ThreadStoreResult<()> {
self.thread_store.shutdown_thread(self.thread_id).await
}
pub async fn discard(&self) -> ThreadStoreResult<()> {
self.thread_store.discard_thread(self.thread_id).await
}
pub async fn load_history(
&self,
include_archived: bool,
) -> ThreadStoreResult<StoredThreadHistory> {
self.thread_store
.load_history(LoadThreadHistoryParams {
thread_id: self.thread_id,
include_archived,
})
.await
}
pub async fn read_thread(
&self,
include_archived: bool,
include_history: bool,
) -> ThreadStoreResult<StoredThread> {
self.thread_store
.read_thread(ReadThreadParams {
thread_id: self.thread_id,
include_archived,
include_history,
})
.await
}
pub async fn update_memory_mode(
&self,
mode: ThreadMemoryMode,
include_archived: bool,
) -> ThreadStoreResult<()> {
self.thread_store
.update_thread_metadata(UpdateThreadMetadataParams {
thread_id: self.thread_id,
patch: ThreadMetadataPatch {
memory_mode: Some(mode),
..Default::default()
},
include_archived,
})
.await?;
Ok(())
}
pub async fn update_metadata(
&self,
patch: ThreadMetadataPatch,
include_archived: bool,
) -> ThreadStoreResult<StoredThread> {
self.thread_store
.update_thread_metadata(UpdateThreadMetadataParams {
thread_id: self.thread_id,
patch,
include_archived,
})
.await
}
/// Returns the live local rollout path for legacy local-only callers.
///
/// Remote stores do not expose rollout files, so they return `Ok(None)`.
pub async fn local_rollout_path(&self) -> ThreadStoreResult<Option<PathBuf>> {
let Some(local_store) = self
.thread_store
.as_any()
.downcast_ref::<LocalThreadStore>()
else {
return Ok(None);
};
local_store
.live_rollout_path(self.thread_id)
.await
.map(Some)
}
}