Compare commits

...

1 Commits

Author SHA1 Message Date
jif-oai
131e35119b feat: memories 1 2026-02-02 20:57:17 +00:00
5 changed files with 290 additions and 0 deletions

View File

@@ -0,0 +1,20 @@
CREATE TABLE memory_summaries (
thread_id TEXT PRIMARY KEY,
cwd TEXT NOT NULL,
summary TEXT NOT NULL,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
);
CREATE INDEX idx_memory_summaries_cwd_updated_at
ON memory_summaries(cwd, updated_at DESC, thread_id);
CREATE TABLE memory_summary_locks (
cwd TEXT PRIMARY KEY,
owner_id TEXT NOT NULL,
acquired_at INTEGER NOT NULL,
expires_at INTEGER NOT NULL
);
CREATE INDEX idx_memory_summary_locks_expires_at
ON memory_summary_locks(expires_at);

View File

@@ -14,6 +14,8 @@ mod runtime;
pub use model::LogEntry;
pub use model::LogQuery;
pub use model::LogRow;
pub use model::MemorySummary;
pub use model::MemorySummaryLock;
/// Preferred entrypoint: owns configuration and metrics.
pub use runtime::StateRuntime;

View File

@@ -0,0 +1,19 @@
use codex_protocol::ThreadId;
use std::path::PathBuf;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MemorySummary {
pub thread_id: ThreadId,
pub cwd: PathBuf,
pub summary: String,
pub created_at: i64,
pub updated_at: i64,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MemorySummaryLock {
pub cwd: PathBuf,
pub owner_id: String,
pub acquired_at: i64,
pub expires_at: i64,
}

View File

@@ -1,9 +1,12 @@
mod log;
mod memory_summary;
mod thread_metadata;
pub use log::LogEntry;
pub use log::LogQuery;
pub use log::LogRow;
pub use memory_summary::MemorySummary;
pub use memory_summary::MemorySummaryLock;
pub use thread_metadata::Anchor;
pub use thread_metadata::BackfillStats;
pub use thread_metadata::ExtractionOutcome;

View File

@@ -8,6 +8,8 @@ use crate::ThreadMetadataBuilder;
use crate::ThreadsPage;
use crate::apply_rollout_item;
use crate::migrations::MIGRATOR;
use crate::model::MemorySummary;
use crate::model::MemorySummaryLock;
use crate::model::ThreadRow;
use crate::model::anchor_from_item;
use crate::model::datetime_to_epoch_seconds;
@@ -275,6 +277,199 @@ FROM threads
Ok(max_id.unwrap_or(0))
}
/// Upsert a memory summary for a thread.
pub async fn upsert_memory_summary(&self, summary: &MemorySummary) -> anyhow::Result<()> {
sqlx::query(
r#"
INSERT INTO memory_summaries (
thread_id,
cwd,
summary,
created_at,
updated_at
) VALUES (?, ?, ?, ?, ?)
ON CONFLICT(thread_id) DO UPDATE SET
cwd = excluded.cwd,
summary = excluded.summary,
updated_at = excluded.updated_at
"#,
)
.bind(summary.thread_id.to_string())
.bind(summary.cwd.to_string_lossy().as_ref())
.bind(&summary.summary)
.bind(summary.created_at)
.bind(summary.updated_at)
.execute(self.pool.as_ref())
.await?;
Ok(())
}
/// Fetch the most recent memory summaries for a given cwd.
pub async fn list_memory_summaries_for_cwd(
&self,
cwd: &Path,
limit: usize,
) -> anyhow::Result<Vec<MemorySummary>> {
let rows = sqlx::query(
r#"
SELECT thread_id, cwd, summary, created_at, updated_at
FROM memory_summaries
WHERE cwd = ?
ORDER BY updated_at DESC, thread_id DESC
LIMIT ?
"#,
)
.bind(cwd.to_string_lossy().as_ref())
.bind(limit as i64)
.fetch_all(self.pool.as_ref())
.await?;
rows.into_iter()
.map(|row| {
let thread_id: String = row.try_get("thread_id")?;
Ok(MemorySummary {
thread_id: ThreadId::try_from(thread_id)?,
cwd: PathBuf::from(row.try_get::<String, _>("cwd")?),
summary: row.try_get("summary")?,
created_at: row.try_get("created_at")?,
updated_at: row.try_get("updated_at")?,
})
})
.collect()
}
/// Fetch memory summaries for a set of thread ids.
pub async fn list_memory_summaries_for_threads(
&self,
thread_ids: &[ThreadId],
) -> anyhow::Result<Vec<MemorySummary>> {
if thread_ids.is_empty() {
return Ok(Vec::new());
}
let mut builder = QueryBuilder::<Sqlite>::new(
"SELECT thread_id, cwd, summary, created_at, updated_at FROM memory_summaries WHERE thread_id IN (",
);
let mut separated = builder.separated(", ");
for thread_id in thread_ids {
separated.push_bind(thread_id.to_string());
}
separated.push_unseparated(")");
let rows = builder.build().fetch_all(self.pool.as_ref()).await?;
rows.into_iter()
.map(|row| {
let thread_id: String = row.try_get("thread_id")?;
Ok(MemorySummary {
thread_id: ThreadId::try_from(thread_id)?,
cwd: PathBuf::from(row.try_get::<String, _>("cwd")?),
summary: row.try_get("summary")?,
created_at: row.try_get("created_at")?,
updated_at: row.try_get("updated_at")?,
})
})
.collect()
}
/// Acquire a per-cwd summary lock if it is free or expired.
pub async fn try_acquire_memory_summary_lock(
&self,
cwd: &Path,
owner_id: &str,
now_ts: i64,
ttl_secs: i64,
) -> anyhow::Result<bool> {
let expires_at = now_ts.saturating_add(ttl_secs);
let mut txn = self.pool.begin().await?;
let updated = sqlx::query(
r#"
UPDATE memory_summary_locks
SET owner_id = ?, acquired_at = ?, expires_at = ?
WHERE cwd = ? AND expires_at <= ?
"#,
)
.bind(owner_id)
.bind(now_ts)
.bind(expires_at)
.bind(cwd.to_string_lossy().as_ref())
.bind(now_ts)
.execute(&mut *txn)
.await?
.rows_affected();
if updated == 0 {
let inserted = sqlx::query(
r#"
INSERT OR IGNORE INTO memory_summary_locks (cwd, owner_id, acquired_at, expires_at)
VALUES (?, ?, ?, ?)
"#,
)
.bind(cwd.to_string_lossy().as_ref())
.bind(owner_id)
.bind(now_ts)
.bind(expires_at)
.execute(&mut *txn)
.await?
.rows_affected();
if inserted == 0 {
txn.rollback().await?;
return Ok(false);
}
}
txn.commit().await?;
Ok(true)
}
/// Release a per-cwd summary lock if owned by the caller.
pub async fn release_memory_summary_lock(
&self,
cwd: &Path,
owner_id: &str,
) -> anyhow::Result<()> {
sqlx::query(
r#"
DELETE FROM memory_summary_locks
WHERE cwd = ? AND owner_id = ?
"#,
)
.bind(cwd.to_string_lossy().as_ref())
.bind(owner_id)
.execute(self.pool.as_ref())
.await?;
Ok(())
}
/// Fetch the current memory summary lock, if any.
pub async fn get_memory_summary_lock(
&self,
cwd: &Path,
) -> anyhow::Result<Option<MemorySummaryLock>> {
let row = sqlx::query(
r#"
SELECT cwd, owner_id, acquired_at, expires_at
FROM memory_summary_locks
WHERE cwd = ?
"#,
)
.bind(cwd.to_string_lossy().as_ref())
.fetch_optional(self.pool.as_ref())
.await?;
let Some(row) = row else {
return Ok(None);
};
Ok(Some(MemorySummaryLock {
cwd: PathBuf::from(row.try_get::<String, _>("cwd")?),
owner_id: row.try_get("owner_id")?,
acquired_at: row.try_get("acquired_at")?,
expires_at: row.try_get("expires_at")?,
}))
}
/// List thread ids using the underlying database (no rollout scanning).
pub async fn list_thread_ids(
&self,
@@ -305,6 +500,57 @@ FROM threads
.collect()
}
/// List recent threads for a cwd using the underlying database (no rollout scanning).
pub async fn list_recent_threads_for_cwd(
&self,
cwd: &Path,
limit: usize,
sort_key: SortKey,
allowed_sources: &[String],
model_providers: Option<&[String]>,
archived_only: bool,
) -> anyhow::Result<Vec<ThreadMetadata>> {
let mut builder = QueryBuilder::<Sqlite>::new(
r#"
SELECT
id,
rollout_path,
created_at,
updated_at,
source,
model_provider,
cwd,
title,
sandbox_policy,
approval_mode,
tokens_used,
has_user_event,
archived_at,
git_sha,
git_branch,
git_origin_url
FROM threads
"#,
);
push_thread_filters(
&mut builder,
archived_only,
allowed_sources,
model_providers,
None,
sort_key,
);
let cwd_value = cwd.to_string_lossy();
builder.push(" AND cwd = ");
builder.push_bind(cwd_value.as_ref());
push_thread_order_and_limit(&mut builder, sort_key, limit);
let rows = builder.build().fetch_all(self.pool.as_ref()).await?;
rows.into_iter()
.map(|row| ThreadRow::try_from_row(&row).and_then(ThreadMetadata::try_from))
.collect()
}
/// Insert or replace thread metadata directly.
pub async fn upsert_thread(&self, metadata: &crate::ThreadMetadata) -> anyhow::Result<()> {
sqlx::query(