Compare commits

...

1 Commits

Author SHA1 Message Date
Tom Wiltzius
fa657c2524 Add thread store interface 2026-04-14 09:45:14 -07:00
9 changed files with 579 additions and 0 deletions

11
codex-rs/Cargo.lock generated
View File

@@ -2822,6 +2822,17 @@ dependencies = [
"tempfile",
]
[[package]]
name = "codex-thread-store"
version = "0.0.0"
dependencies = [
"async-trait",
"chrono",
"codex-protocol",
"serde",
"thiserror 2.0.18",
]
[[package]]
name = "codex-tools"
version = "0.0.0"

View File

@@ -87,6 +87,7 @@ members = [
"state",
"terminal-detection",
"test-binary-support",
"thread-store",
"codex-experimental-api-macros",
"plugin",
]
@@ -165,6 +166,7 @@ codex-state = { path = "state" }
codex-stdio-to-uds = { path = "stdio-to-uds" }
codex-terminal-detection = { path = "terminal-detection" }
codex-test-binary-support = { path = "test-binary-support" }
codex-thread-store = { path = "thread-store" }
codex-tools = { path = "tools" }
codex-tui = { path = "tui" }
codex-utils-absolute-path = { path = "utils/absolute-path" }

View File

@@ -0,0 +1,6 @@
load("//:defs.bzl", "codex_rust_crate")
codex_rust_crate(
name = "thread-store",
crate_name = "codex_thread_store",
)

View File

@@ -0,0 +1,19 @@
[package]
edition.workspace = true
license.workspace = true
name = "codex-thread-store"
version.workspace = true
[lib]
name = "codex_thread_store"
path = "src/lib.rs"
[lints]
workspace = true
[dependencies]
async-trait = { workspace = true }
chrono = { workspace = true, features = ["serde"] }
codex-protocol = { workspace = true }
serde = { workspace = true, features = ["derive"] }
thiserror = { workspace = true }

View File

@@ -0,0 +1,50 @@
use codex_protocol::ThreadId;
/// Result type returned by thread-store operations.
pub type ThreadStoreResult<T> = Result<T, ThreadStoreError>;
/// Error type shared by thread-store implementations.
#[derive(Debug, thiserror::Error)]
pub enum ThreadStoreError {
/// The requested thread does not exist in this store.
#[error("thread {thread_id} not found")]
ThreadNotFound {
/// Thread id requested by the caller.
thread_id: ThreadId,
},
/// The request is syntactically valid but unsupported by this store.
#[error("unsupported thread-store operation `{operation}`")]
Unsupported {
/// Stable operation label suitable for logs and tests.
operation: &'static str,
},
/// The caller supplied invalid request data.
#[error("invalid thread-store request: {message}")]
InvalidRequest {
/// User-facing explanation of the invalid request.
message: String,
},
/// The operation conflicted with current store state.
#[error("thread-store conflict: {message}")]
Conflict {
/// User-facing explanation of the conflict.
message: String,
},
/// The backing store is temporarily unavailable.
#[error("thread store is unavailable: {message}")]
Unavailable {
/// User-facing explanation of the availability failure.
message: String,
},
/// Catch-all for implementation failures that do not fit a more specific category.
#[error("thread-store internal error: {message}")]
Internal {
/// User-facing explanation of the implementation failure.
message: String,
},
}

View File

@@ -0,0 +1,38 @@
//! Storage-neutral thread persistence interfaces.
//!
//! Application code should treat [`codex_protocol::ThreadId`] as the only durable thread handle.
//! Implementations are responsible for resolving that id to local rollout files, RPC requests, or
//! any other backing store.
mod error;
mod recorder;
mod store;
mod types;
pub use error::ThreadStoreError;
pub use error::ThreadStoreResult;
pub use recorder::ThreadRecorder;
pub use store::ThreadStore;
pub use types::AppendThreadItemsParams;
pub use types::ArchiveThreadParams;
pub use types::CreateThreadParams;
pub use types::FindThreadByNameParams;
pub use types::FindThreadSpawnByPathParams;
pub use types::GitInfoPatch;
pub use types::ListThreadSpawnEdgesParams;
pub use types::ListThreadsParams;
pub use types::LoadThreadHistoryParams;
pub use types::OptionalStringPatch;
pub use types::ReadThreadParams;
pub use types::ResolveLegacyPathParams;
pub use types::ResumeThreadRecorderParams;
pub use types::SetThreadNameParams;
pub use types::StoredThread;
pub use types::StoredThreadHistory;
pub use types::ThreadEventPersistenceMode;
pub use types::ThreadMetadataPatch;
pub use types::ThreadPage;
pub use types::ThreadSortKey;
pub use types::ThreadSpawnEdge;
pub use types::ThreadSpawnEdgeStatus;
pub use types::UpdateThreadMetadataParams;

View File

@@ -0,0 +1,24 @@
use async_trait::async_trait;
use codex_protocol::ThreadId;
use codex_protocol::protocol::RolloutItem;
use crate::ThreadStoreResult;
/// Live append handle for a thread.
#[async_trait]
pub trait ThreadRecorder: Send + Sync {
/// Returns the thread id this recorder appends to.
fn thread_id(&self) -> ThreadId;
/// Queues items for persistence according to this recorder's filtering policy.
async fn record_items(&self, items: &[RolloutItem]) -> ThreadStoreResult<()>;
/// Materializes the thread if persistence is lazy, then persists all queued items.
async fn persist(&self) -> ThreadStoreResult<()>;
/// Flushes all queued items and returns once they are durable/readable.
async fn flush(&self) -> ThreadStoreResult<()>;
/// Flushes pending items and closes the recorder.
async fn shutdown(&self) -> ThreadStoreResult<()>;
}

View File

@@ -0,0 +1,103 @@
use std::path::Path;
use async_trait::async_trait;
use codex_protocol::ThreadId;
use crate::AppendThreadItemsParams;
use crate::ArchiveThreadParams;
use crate::CreateThreadParams;
use crate::FindThreadByNameParams;
use crate::FindThreadSpawnByPathParams;
use crate::ListThreadSpawnEdgesParams;
use crate::ListThreadsParams;
use crate::LoadThreadHistoryParams;
use crate::ReadThreadParams;
use crate::ResolveLegacyPathParams;
use crate::ResumeThreadRecorderParams;
use crate::SetThreadNameParams;
use crate::StoredThread;
use crate::StoredThreadHistory;
use crate::ThreadPage;
use crate::ThreadRecorder;
use crate::ThreadSpawnEdge;
use crate::ThreadStoreResult;
use crate::UpdateThreadMetadataParams;
/// Storage-neutral thread persistence boundary.
#[async_trait]
pub trait ThreadStore: Send + Sync {
/// Creates a new thread and returns a live recorder for future appends.
async fn create_thread(
&self,
params: CreateThreadParams,
) -> ThreadStoreResult<Box<dyn ThreadRecorder>>;
/// Reopens a live recorder for an existing thread.
async fn resume_thread_recorder(
&self,
params: ResumeThreadRecorderParams,
) -> ThreadStoreResult<Box<dyn ThreadRecorder>>;
/// Appends items to a stored thread outside the live-recorder path.
async fn append_items(&self, params: AppendThreadItemsParams) -> ThreadStoreResult<()>;
/// Loads persisted history for resume, fork, rollback, and memory jobs.
async fn load_history(
&self,
params: LoadThreadHistoryParams,
) -> ThreadStoreResult<StoredThreadHistory>;
/// Reads a thread summary and optionally its persisted history.
async fn read_thread(&self, params: ReadThreadParams) -> ThreadStoreResult<StoredThread>;
/// Lists stored threads matching the supplied filters.
async fn list_threads(&self, params: ListThreadsParams) -> ThreadStoreResult<ThreadPage>;
/// Finds the newest thread whose user-facing name exactly matches the supplied name.
async fn find_thread_by_name(
&self,
params: FindThreadByNameParams,
) -> ThreadStoreResult<Option<StoredThread>>;
/// Sets a user-facing thread name.
async fn set_thread_name(&self, params: SetThreadNameParams) -> ThreadStoreResult<()>;
/// Applies a mutable metadata patch and returns the updated thread.
async fn update_thread_metadata(
&self,
params: UpdateThreadMetadataParams,
) -> ThreadStoreResult<StoredThread>;
/// Archives a thread.
async fn archive_thread(&self, params: ArchiveThreadParams) -> ThreadStoreResult<()>;
/// Unarchives a thread and returns its updated metadata.
async fn unarchive_thread(
&self,
params: ArchiveThreadParams,
) -> ThreadStoreResult<StoredThread>;
/// Resolves a legacy rollout path to a thread id, if this store supports local path lookup.
async fn resolve_legacy_path(
&self,
params: ResolveLegacyPathParams,
) -> ThreadStoreResult<Option<ThreadId>>;
/// Persists or replaces a thread-spawn parent-child edge.
async fn upsert_thread_spawn_edge(&self, edge: ThreadSpawnEdge) -> ThreadStoreResult<()>;
/// Lists thread-spawn children or descendants.
async fn list_thread_spawn_edges(
&self,
params: ListThreadSpawnEdgesParams,
) -> ThreadStoreResult<Vec<ThreadSpawnEdge>>;
/// Finds a thread-spawn child or descendant by canonical agent path.
async fn find_thread_spawn_by_path(
&self,
params: FindThreadSpawnByPathParams,
) -> ThreadStoreResult<Option<ThreadId>>;
/// Returns true if this store can resolve the supplied legacy path.
fn supports_legacy_path(&self, path: &Path) -> bool;
}

View File

@@ -0,0 +1,326 @@
use std::path::PathBuf;
use chrono::DateTime;
use chrono::Utc;
use codex_protocol::ThreadId;
use codex_protocol::config_types::ServiceTier;
use codex_protocol::dynamic_tools::DynamicToolSpec;
use codex_protocol::models::BaseInstructions;
use codex_protocol::openai_models::ReasoningEffort;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::GitInfo;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::TokenUsage;
use serde::Deserialize;
use serde::Serialize;
/// Controls how many event variants should be persisted for future replay.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub enum ThreadEventPersistenceMode {
/// Persist only the legacy minimal replay surface.
#[default]
Limited,
/// Persist the richer event surface used by app-server history reconstruction.
Extended,
}
/// Parameters required to create a persisted thread and its recorder.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CreateThreadParams {
/// Thread id generated by Codex before opening persistence.
pub thread_id: ThreadId,
/// Source thread id when this thread is created as a fork.
pub forked_from_id: Option<ThreadId>,
/// Runtime source for the thread.
pub source: SessionSource,
/// Working directory captured at thread start.
pub cwd: PathBuf,
/// Originator value captured in session metadata.
pub originator: String,
/// CLI/app version creating the thread.
pub cli_version: String,
/// Model provider id selected for the thread.
pub model_provider: String,
/// Optional model selected at thread start.
pub model: Option<String>,
/// Optional service tier selected at thread start.
pub service_tier: Option<ServiceTier>,
/// Optional reasoning effort selected at thread start.
pub reasoning_effort: Option<ReasoningEffort>,
/// Approval mode selected at thread start.
pub approval_mode: AskForApproval,
/// Sandbox policy selected at thread start.
pub sandbox_policy: SandboxPolicy,
/// Base instructions persisted in session metadata.
pub base_instructions: BaseInstructions,
/// Dynamic tools available to the thread at startup.
pub dynamic_tools: Vec<DynamicToolSpec>,
/// Optional git metadata captured by the caller.
pub git_info: Option<GitInfo>,
/// Whether the recorder should persist the extended event surface.
pub event_persistence_mode: ThreadEventPersistenceMode,
}
/// Parameters required to reopen persistence for an existing thread.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ResumeThreadRecorderParams {
/// Existing thread id whose future items should be appended.
pub thread_id: ThreadId,
/// Whether archived threads may be reopened.
pub include_archived: bool,
/// Whether the recorder should persist the extended event surface.
pub event_persistence_mode: ThreadEventPersistenceMode,
}
/// Parameters for appending rollout items outside a live recorder.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AppendThreadItemsParams {
/// Thread id to append to.
pub thread_id: ThreadId,
/// Items to append in order.
pub items: Vec<RolloutItem>,
/// Optional idempotency key supplied by callers that may retry the same append.
pub idempotency_key: Option<String>,
/// Optional timestamp override for the thread's updated-at value.
pub updated_at: Option<DateTime<Utc>>,
}
/// Parameters for loading persisted history for resume, fork, rollback, and memory jobs.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct LoadThreadHistoryParams {
/// Thread id to load.
pub thread_id: ThreadId,
/// Whether archived threads are eligible.
pub include_archived: bool,
}
/// Persisted rollout history for a thread, without any filesystem path requirement.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct StoredThreadHistory {
/// Thread id represented by the history.
pub thread_id: ThreadId,
/// Persisted rollout items in replay order.
pub items: Vec<RolloutItem>,
}
/// Parameters for reading a thread summary and optionally its replay history.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ReadThreadParams {
/// Thread id to read.
pub thread_id: ThreadId,
/// Whether archived threads are eligible.
pub include_archived: bool,
/// Whether persisted rollout items should be included in the response.
pub include_history: bool,
}
/// The sort key to use when listing stored threads.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub enum ThreadSortKey {
/// Sort by the thread creation timestamp.
#[default]
CreatedAt,
/// Sort by the thread last-update timestamp.
UpdatedAt,
}
/// Parameters for listing threads.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ListThreadsParams {
/// Maximum number of threads to return.
pub page_size: usize,
/// Opaque cursor returned by a previous list call.
pub cursor: Option<String>,
/// Sort order requested by the caller.
pub sort_key: ThreadSortKey,
/// Allowed session sources. Empty means implementation default.
pub allowed_sources: Vec<SessionSource>,
/// Optional model provider filter. `None` means implementation default, while an empty vector
/// means all providers.
pub model_providers: Option<Vec<String>>,
/// Whether archived threads should be listed instead of active threads.
pub archived: bool,
/// Optional exact working-directory filter.
pub cwd: Option<PathBuf>,
/// Optional substring/full-text search term for thread title/preview.
pub search_term: Option<String>,
}
/// A page of stored thread records.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ThreadPage {
/// Threads returned for this page.
pub items: Vec<StoredThread>,
/// Opaque cursor to continue listing.
pub next_cursor: Option<String>,
/// Number of backing rows/files/items scanned, when available.
pub scanned: Option<usize>,
}
/// Store-owned thread metadata used by list/read/resume responses.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct StoredThread {
/// Thread id.
pub thread_id: ThreadId,
/// Source thread id when this thread was forked from another thread.
pub forked_from_id: Option<ThreadId>,
/// Best available user-facing preview, usually the first user message.
pub preview: String,
/// Optional user-facing thread name/title.
pub name: Option<String>,
/// Model provider id associated with the thread.
pub model_provider: String,
/// Latest observed model, if known.
pub model: Option<String>,
/// Latest observed service tier, if known.
pub service_tier: Option<ServiceTier>,
/// Latest observed reasoning effort, if known.
pub reasoning_effort: Option<ReasoningEffort>,
/// Thread creation timestamp.
pub created_at: DateTime<Utc>,
/// Thread last-update timestamp.
pub updated_at: DateTime<Utc>,
/// Thread archive timestamp, if archived.
pub archived_at: Option<DateTime<Utc>>,
/// Working directory captured for the thread.
pub cwd: PathBuf,
/// Version of the CLI/app that created the thread.
pub cli_version: String,
/// Runtime source for the thread.
pub source: SessionSource,
/// Optional random nickname for thread-spawn sub-agents.
pub agent_nickname: Option<String>,
/// Optional role for thread-spawn sub-agents.
pub agent_role: Option<String>,
/// Optional canonical path for thread-spawn sub-agents.
pub agent_path: Option<String>,
/// Optional Git metadata captured for the thread.
pub git_info: Option<GitInfo>,
/// Approval mode captured for the thread.
pub approval_mode: AskForApproval,
/// Sandbox policy captured for the thread.
pub sandbox_policy: SandboxPolicy,
/// Last observed token usage.
pub token_usage: Option<TokenUsage>,
/// First user message observed for this thread, if any.
pub first_user_message: Option<String>,
/// Persisted history, populated only when requested.
pub history: Option<StoredThreadHistory>,
}
/// Parameters for setting a user-facing thread name.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct SetThreadNameParams {
/// Thread id to update.
pub thread_id: ThreadId,
/// Normalized thread name.
pub name: String,
}
/// Optional field patch where omission leaves a value unchanged and `Some(None)` clears it.
pub type OptionalStringPatch = Option<Option<String>>;
/// Patch for thread Git metadata.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct GitInfoPatch {
/// Replacement commit SHA, clear request, or no-op.
pub sha: OptionalStringPatch,
/// Replacement branch name, clear request, or no-op.
pub branch: OptionalStringPatch,
/// Replacement origin URL, clear request, or no-op.
pub origin_url: OptionalStringPatch,
}
/// Patch for mutable thread metadata.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct ThreadMetadataPatch {
/// Optional user-facing thread title.
pub name: OptionalStringPatch,
/// Optional Git metadata patch.
pub git_info: Option<GitInfoPatch>,
}
/// Parameters for patching mutable thread metadata.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct UpdateThreadMetadataParams {
/// Thread id to update.
pub thread_id: ThreadId,
/// Patch to apply.
pub patch: ThreadMetadataPatch,
}
/// Parameters for archiving or unarchiving a thread.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ArchiveThreadParams {
/// Thread id to archive or unarchive.
pub thread_id: ThreadId,
}
/// Parameters for finding a thread by its exact user-facing name.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct FindThreadByNameParams {
/// Exact normalized thread name to find.
pub name: String,
/// Whether archived threads are eligible.
pub include_archived: bool,
/// Optional exact working-directory filter.
pub cwd: Option<PathBuf>,
/// Allowed session sources. Empty means implementation default.
pub allowed_sources: Vec<SessionSource>,
/// Optional model provider filter. `None` means implementation default, while an empty vector
/// means all providers.
pub model_providers: Option<Vec<String>>,
}
/// Parameters for resolving an old filesystem rollout path to a thread id.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ResolveLegacyPathParams {
/// Filesystem rollout path supplied by legacy callers.
pub path: PathBuf,
/// Whether archived threads are eligible.
pub include_archived: bool,
}
/// Status of a parent-child thread-spawn edge.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ThreadSpawnEdgeStatus {
/// Child thread is still active from the parent thread's perspective.
Open,
/// Child thread has been closed from the parent thread's perspective.
Closed,
}
/// Parent-child relationship created by thread-spawn tools.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ThreadSpawnEdge {
/// Parent thread id.
pub parent_thread_id: ThreadId,
/// Child thread id.
pub child_thread_id: ThreadId,
/// Current edge status.
pub status: ThreadSpawnEdgeStatus,
}
/// Parameters for listing thread-spawn children or descendants.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ListThreadSpawnEdgesParams {
/// Parent thread id to list from.
pub thread_id: ThreadId,
/// Whether descendants should be traversed recursively.
pub recursive: bool,
/// Optional status filter.
pub status: Option<ThreadSpawnEdgeStatus>,
}
/// Parameters for finding a thread-spawn child or descendant by canonical agent path.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct FindThreadSpawnByPathParams {
/// Parent thread id to search from.
pub thread_id: ThreadId,
/// Whether descendants should be searched recursively.
pub recursive: bool,
/// Canonical agent path to find.
pub agent_path: String,
}