mirror of
https://github.com/openai/codex.git
synced 2026-05-03 02:46:39 +00:00
[codex] Route live thread writes through ThreadStore (#18882)
Begin migrating the thread write codepaths to ThreadStore. This starts using ThreadStore inside of core session code, not only in the app server code. Rework the interfaces around thread recording/persistence. We're left with the following: * `ThreadManager`: owns the process-level registry of loaded threads and handles cross-thread orchestration: start, resume, fork, lookup, remove, and route ops to running CodexThreads. * `CodexThread`: represents one loaded/running thread from the outside. It is the handle app-server and callers use to submit ops, inspect session metadata, and shut the thread down. * `LiveThread`: session-owned persistence lifecycle handle for one active thread. Core session code uses it to append rollout items, materialize lazy persistence, flush, shutdown, discard init-failed writers, and load that thread’s persisted history. * `ThreadStore`: storage backend abstraction. It answers “how are threads persisted, read, listed, updated, archived?” Local and remote implementations live behind this trait. * `LocalThreadStore`: local ThreadStore implementation. It owns the file/sqlite-specific details and keeps RolloutRecorder as a local implementation detail. This is a few too many Thread abstractions for my liking, but they do all represent different concepts / needs / layers. Migration note: in places where the core code explicitly requires a path, rather than a thread ID, throw an error if we're running with a remote store. Cover the new local live-writer lifecycle with focused tests and preserve app-server thread-start behavior, including ephemeral pathless sessions.
This commit is contained in:
176
codex-rs/thread-store/src/live_thread.rs
Normal file
176
codex-rs/thread-store/src/live_thread.rs
Normal file
@@ -0,0 +1,176 @@
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::ThreadMemoryMode;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::AppendThreadItemsParams;
|
||||
use crate::CreateThreadParams;
|
||||
use crate::LoadThreadHistoryParams;
|
||||
use crate::LocalThreadStore;
|
||||
use crate::ResumeThreadParams;
|
||||
use crate::StoredThreadHistory;
|
||||
use crate::ThreadMetadataPatch;
|
||||
use crate::ThreadStore;
|
||||
use crate::ThreadStoreResult;
|
||||
use crate::UpdateThreadMetadataParams;
|
||||
|
||||
/// Handle for an active thread's persistence lifecycle.
|
||||
///
|
||||
/// `LiveThread` keeps lifecycle decisions with the caller while delegating storage details to
|
||||
/// [`ThreadStore`]. Local stores may use a rollout file internally and remote stores may use a
|
||||
/// service, but session code should only need this handle for the active thread.
|
||||
#[derive(Clone)]
|
||||
pub struct LiveThread {
|
||||
thread_id: ThreadId,
|
||||
thread_store: Arc<dyn ThreadStore>,
|
||||
}
|
||||
|
||||
/// Owns a live thread while session initialization is still fallible.
|
||||
///
|
||||
/// If initialization returns early after persistence has been opened, dropping this guard discards
|
||||
/// the live writer without forcing lazy in-memory state to become durable. Call [`commit`] once the
|
||||
/// session owns the live thread for normal operation.
|
||||
pub struct LiveThreadInitGuard {
|
||||
live_thread: Option<LiveThread>,
|
||||
}
|
||||
|
||||
impl LiveThreadInitGuard {
|
||||
pub fn new(live_thread: Option<LiveThread>) -> Self {
|
||||
Self { live_thread }
|
||||
}
|
||||
|
||||
pub fn as_ref(&self) -> Option<&LiveThread> {
|
||||
self.live_thread.as_ref()
|
||||
}
|
||||
|
||||
pub fn commit(&mut self) {
|
||||
self.live_thread = None;
|
||||
}
|
||||
|
||||
pub async fn discard(&mut self) {
|
||||
let Some(live_thread) = self.live_thread.take() else {
|
||||
return;
|
||||
};
|
||||
if let Err(err) = live_thread.discard().await {
|
||||
warn!("failed to discard thread persistence for failed session init: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for LiveThreadInitGuard {
|
||||
fn drop(&mut self) {
|
||||
let Some(live_thread) = self.live_thread.take() else {
|
||||
return;
|
||||
};
|
||||
let Ok(handle) = tokio::runtime::Handle::try_current() else {
|
||||
warn!("failed to discard thread persistence for failed session init: no Tokio runtime");
|
||||
return;
|
||||
};
|
||||
handle.spawn(async move {
|
||||
if let Err(err) = live_thread.discard().await {
|
||||
warn!("failed to discard thread persistence for failed session init: {err}");
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
impl LiveThread {
|
||||
pub async fn create(
|
||||
thread_store: Arc<dyn ThreadStore>,
|
||||
params: CreateThreadParams,
|
||||
) -> ThreadStoreResult<Self> {
|
||||
let thread_id = params.thread_id;
|
||||
thread_store.create_thread(params).await?;
|
||||
Ok(Self {
|
||||
thread_id,
|
||||
thread_store,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn resume(
|
||||
thread_store: Arc<dyn ThreadStore>,
|
||||
params: ResumeThreadParams,
|
||||
) -> ThreadStoreResult<Self> {
|
||||
let thread_id = params.thread_id;
|
||||
thread_store.resume_thread(params).await?;
|
||||
Ok(Self {
|
||||
thread_id,
|
||||
thread_store,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn append_items(&self, items: &[RolloutItem]) -> ThreadStoreResult<()> {
|
||||
self.thread_store
|
||||
.append_items(AppendThreadItemsParams {
|
||||
thread_id: self.thread_id,
|
||||
items: items.to_vec(),
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn persist(&self) -> ThreadStoreResult<()> {
|
||||
self.thread_store.persist_thread(self.thread_id).await
|
||||
}
|
||||
|
||||
pub async fn flush(&self) -> ThreadStoreResult<()> {
|
||||
self.thread_store.flush_thread(self.thread_id).await
|
||||
}
|
||||
|
||||
pub async fn shutdown(&self) -> ThreadStoreResult<()> {
|
||||
self.thread_store.shutdown_thread(self.thread_id).await
|
||||
}
|
||||
|
||||
pub async fn discard(&self) -> ThreadStoreResult<()> {
|
||||
self.thread_store.discard_thread(self.thread_id).await
|
||||
}
|
||||
|
||||
pub async fn load_history(
|
||||
&self,
|
||||
include_archived: bool,
|
||||
) -> ThreadStoreResult<StoredThreadHistory> {
|
||||
self.thread_store
|
||||
.load_history(LoadThreadHistoryParams {
|
||||
thread_id: self.thread_id,
|
||||
include_archived,
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn update_memory_mode(
|
||||
&self,
|
||||
mode: ThreadMemoryMode,
|
||||
include_archived: bool,
|
||||
) -> ThreadStoreResult<()> {
|
||||
self.thread_store
|
||||
.update_thread_metadata(UpdateThreadMetadataParams {
|
||||
thread_id: self.thread_id,
|
||||
patch: ThreadMetadataPatch {
|
||||
memory_mode: Some(mode),
|
||||
..Default::default()
|
||||
},
|
||||
include_archived,
|
||||
})
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns the live local rollout path for legacy local-only callers.
|
||||
///
|
||||
/// Remote stores do not expose rollout files, so they return `Ok(None)`.
|
||||
pub async fn local_rollout_path(&self) -> ThreadStoreResult<Option<PathBuf>> {
|
||||
let Some(local_store) = self
|
||||
.thread_store
|
||||
.as_any()
|
||||
.downcast_ref::<LocalThreadStore>()
|
||||
else {
|
||||
return Ok(None);
|
||||
};
|
||||
local_store
|
||||
.live_rollout_path(self.thread_id)
|
||||
.await
|
||||
.map(Some)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user