use std::path::PathBuf; use std::sync::Arc; use codex_protocol::ThreadId; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::ThreadMemoryMode; use tracing::warn; use crate::AppendThreadItemsParams; use crate::CreateThreadParams; use crate::LoadThreadHistoryParams; use crate::LocalThreadStore; use crate::ResumeThreadParams; use crate::StoredThreadHistory; use crate::ThreadMetadataPatch; use crate::ThreadStore; use crate::ThreadStoreResult; use crate::UpdateThreadMetadataParams; /// Handle for an active thread's persistence lifecycle. /// /// `LiveThread` keeps lifecycle decisions with the caller while delegating storage details to /// [`ThreadStore`]. Local stores may use a rollout file internally and remote stores may use a /// service, but session code should only need this handle for the active thread. #[derive(Clone)] pub struct LiveThread { thread_id: ThreadId, thread_store: Arc, } /// Owns a live thread while session initialization is still fallible. /// /// If initialization returns early after persistence has been opened, dropping this guard discards /// the live writer without forcing lazy in-memory state to become durable. Call [`commit`] once the /// session owns the live thread for normal operation. pub struct LiveThreadInitGuard { live_thread: Option, } impl LiveThreadInitGuard { pub fn new(live_thread: Option) -> Self { Self { live_thread } } pub fn as_ref(&self) -> Option<&LiveThread> { self.live_thread.as_ref() } pub fn commit(&mut self) { self.live_thread = None; } pub async fn discard(&mut self) { let Some(live_thread) = self.live_thread.take() else { return; }; if let Err(err) = live_thread.discard().await { warn!("failed to discard thread persistence for failed session init: {err}"); } } } impl Drop for LiveThreadInitGuard { fn drop(&mut self) { let Some(live_thread) = self.live_thread.take() else { return; }; let Ok(handle) = tokio::runtime::Handle::try_current() else { warn!("failed to discard thread persistence for failed session init: no Tokio runtime"); return; }; handle.spawn(async move { if let Err(err) = live_thread.discard().await { warn!("failed to discard thread persistence for failed session init: {err}"); } }); } } impl LiveThread { pub async fn create( thread_store: Arc, params: CreateThreadParams, ) -> ThreadStoreResult { let thread_id = params.thread_id; thread_store.create_thread(params).await?; Ok(Self { thread_id, thread_store, }) } pub async fn resume( thread_store: Arc, params: ResumeThreadParams, ) -> ThreadStoreResult { let thread_id = params.thread_id; thread_store.resume_thread(params).await?; Ok(Self { thread_id, thread_store, }) } pub async fn append_items(&self, items: &[RolloutItem]) -> ThreadStoreResult<()> { self.thread_store .append_items(AppendThreadItemsParams { thread_id: self.thread_id, items: items.to_vec(), }) .await } pub async fn persist(&self) -> ThreadStoreResult<()> { self.thread_store.persist_thread(self.thread_id).await } pub async fn flush(&self) -> ThreadStoreResult<()> { self.thread_store.flush_thread(self.thread_id).await } pub async fn shutdown(&self) -> ThreadStoreResult<()> { self.thread_store.shutdown_thread(self.thread_id).await } pub async fn discard(&self) -> ThreadStoreResult<()> { self.thread_store.discard_thread(self.thread_id).await } pub async fn load_history( &self, include_archived: bool, ) -> ThreadStoreResult { self.thread_store .load_history(LoadThreadHistoryParams { thread_id: self.thread_id, include_archived, }) .await } pub async fn update_memory_mode( &self, mode: ThreadMemoryMode, include_archived: bool, ) -> ThreadStoreResult<()> { self.thread_store .update_thread_metadata(UpdateThreadMetadataParams { thread_id: self.thread_id, patch: ThreadMetadataPatch { memory_mode: Some(mode), ..Default::default() }, include_archived, }) .await?; Ok(()) } /// Returns the live local rollout path for legacy local-only callers. /// /// Remote stores do not expose rollout files, so they return `Ok(None)`. pub async fn local_rollout_path(&self) -> ThreadStoreResult> { let Some(local_store) = self .thread_store .as_any() .downcast_ref::() else { return Ok(None); }; local_store .live_rollout_path(self.thread_id) .await .map(Some) } }