From 92e658df03d076e3ca8a60236e6e3c3dd03b807a Mon Sep 17 00:00:00 2001 From: Francis Chalissery Date: Mon, 18 May 2026 17:53:26 -0700 Subject: [PATCH] Add SQLite-backed thread content search --- .../app-server/tests/suite/v2/thread_list.rs | 34 +++- codex-rs/rollout/src/metadata.rs | 179 +++++++++++++++--- codex-rs/rollout/src/recorder.rs | 3 + codex-rs/rollout/src/recorder_tests.rs | 19 +- codex-rs/rollout/src/state_db.rs | 36 ++++ .../state/migrations/0033_thread_search.sql | 12 ++ .../0034_thread_search_backfill_state.sql | 23 +++ codex-rs/state/src/extract.rs | 71 +++++++ codex-rs/state/src/lib.rs | 2 + codex-rs/state/src/model/thread_metadata.rs | 2 + codex-rs/state/src/runtime/backfill.rs | 109 +++++++++++ codex-rs/state/src/runtime/threads.rs | 85 ++++++++- 12 files changed, 532 insertions(+), 43 deletions(-) create mode 100644 codex-rs/state/migrations/0033_thread_search.sql create mode 100644 codex-rs/state/migrations/0034_thread_search_backfill_state.sql diff --git a/codex-rs/app-server/tests/suite/v2/thread_list.rs b/codex-rs/app-server/tests/suite/v2/thread_list.rs index 80254d8f47..8d474d94ef 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_list.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_list.rs @@ -35,6 +35,7 @@ use codex_protocol::protocol::SessionSource as CoreSessionSource; use codex_protocol::protocol::SubAgentSource; use core_test_support::responses; use pretty_assertions::assert_eq; +use serde_json::json; use std::cmp::Reverse; use std::fs; use std::fs::FileTimes; @@ -174,6 +175,27 @@ fn set_rollout_cwd(path: &Path, cwd: &Path) -> Result<()> { Ok(()) } +fn append_user_message_to_rollout(path: &Path, timestamp: &str, message: &str) -> Result<()> { + let mut content = fs::read_to_string(path)?; + content.push_str( + format!( + "{}\n", + json!({ + "timestamp": timestamp, + "type": "event_msg", + "payload": { + "type": "user_message", + "message": message, + "kind": "plain" + } + }) + ) + .as_str(), + ); + fs::write(path, content)?; + Ok(()) +} + #[tokio::test] async fn thread_list_basic_empty() -> Result<()> { let codex_home = TempDir::new()?; @@ -592,10 +614,20 @@ sqlite = true codex_home.path(), "2025-01-02T12-00-00", "2025-01-02T12:00:00Z", - "needle suffix", + "no body hit in preview", Some("mock_provider"), /*git_info*/ None, )?; + append_user_message_to_rollout( + rollout_path( + codex_home.path(), + "2025-01-02T12-00-00", + newer_match.as_str(), + ) + .as_path(), + "2025-01-02T12:05:00Z", + "later message with needle suffix", + )?; // `thread/list` applies `search_term` on the sqlite fast path. This test creates // rollouts manually, so mark the DB backfill complete and then run an unsearched diff --git a/codex-rs/rollout/src/metadata.rs b/codex-rs/rollout/src/metadata.rs index 2dd2df3a41..3d67b10f55 100644 --- a/codex-rs/rollout/src/metadata.rs +++ b/codex-rs/rollout/src/metadata.rs @@ -23,6 +23,7 @@ use codex_state::DB_METRIC_BACKFILL_DURATION_MS; use codex_state::ExtractionOutcome; use codex_state::ThreadMetadataBuilder; use codex_state::apply_rollout_item; +use codex_state::thread_search_text_from_rollout_items; use std::path::Path; use std::path::PathBuf; use tracing::info; @@ -121,6 +122,7 @@ pub async fn extract_metadata_from_rollout( } Ok(ExtractionOutcome { metadata, + search_text: thread_search_text_from_rollout_items(items.as_slice()), memory_mode: items.iter().rev().find_map(|item| match item { RolloutItem::SessionMeta(meta_line) => meta_line.meta.memory_mode.clone(), RolloutItem::ResponseItem(_) @@ -210,33 +212,8 @@ pub(crate) async fn backfill_sessions_with_lease( } } - let sessions_root = codex_home.join(SESSIONS_SUBDIR); - let archived_root = codex_home.join(ARCHIVED_SESSIONS_SUBDIR); - let mut rollout_paths: Vec = Vec::new(); - for (root, archived) in [(sessions_root, false), (archived_root, true)] { - if !tokio::fs::try_exists(&root).await.unwrap_or(false) { - continue; - } - match collect_rollout_paths(&root).await { - Ok(paths) => { - rollout_paths.extend(paths.into_iter().map(|path| BackfillRolloutPath { - watermark: backfill_watermark_for_path(codex_home, &path), - path, - archived, - })); - } - Err(err) => { - warn!( - "failed to collect rollout paths under {}: {err}", - root.display() - ); - } - } - } - rollout_paths.sort_by(|a, b| a.watermark.cmp(&b.watermark)); - if let Some(last_watermark) = backfill_state.last_watermark.as_deref() { - rollout_paths.retain(|entry| entry.watermark.as_str() > last_watermark); - } + let rollout_paths = + rollout_paths_after_watermark(codex_home, backfill_state.last_watermark.as_deref()).await; let mut stats = BackfillStats { scanned: 0, @@ -259,6 +236,7 @@ pub(crate) async fn backfill_sessions_with_lease( ); } let mut metadata = outcome.metadata; + let search_text = outcome.search_text; metadata.cwd = normalize_cwd_for_state_db(&metadata.cwd); let memory_mode = outcome.memory_mode.unwrap_or_else(|| "enabled".to_string()); if let Ok(Some(existing_metadata)) = runtime.get_thread(metadata.id).await { @@ -274,6 +252,17 @@ pub(crate) async fn backfill_sessions_with_lease( stats.failed = stats.failed.saturating_add(1); warn!("failed to upsert rollout {}: {err}", rollout.path.display()); } else { + if let Err(err) = runtime + .replace_thread_search_text(metadata.id, search_text.as_slice()) + .await + { + stats.failed = stats.failed.saturating_add(1); + warn!( + "failed to index rollout search text {}: {err}", + rollout.path.display() + ); + continue; + } if let Err(err) = runtime .set_thread_memory_mode(metadata.id, memory_mode.as_str()) .await @@ -369,6 +358,108 @@ pub(crate) async fn backfill_sessions_with_lease( } } +pub(crate) async fn backfill_thread_search( + runtime: &codex_state::StateRuntime, + codex_home: &Path, + default_provider: &str, +) { + let backfill_state = match runtime.get_thread_search_backfill_state().await { + Ok(state) => state, + Err(err) => { + warn!( + "failed to read thread search backfill state at {}: {err}", + codex_home.display() + ); + return; + } + }; + if backfill_state.status == BackfillStatus::Complete { + return; + } + let claimed = match runtime + .try_claim_thread_search_backfill(BACKFILL_LEASE_SECONDS) + .await + { + Ok(claimed) => claimed, + Err(err) => { + warn!( + "failed to claim thread search backfill at {}: {err}", + codex_home.display() + ); + return; + } + }; + if !claimed { + return; + } + + let rollout_paths = + rollout_paths_after_watermark(codex_home, backfill_state.last_watermark.as_deref()).await; + let mut stats = BackfillStats { + scanned: 0, + upserted: 0, + failed: 0, + }; + let mut last_watermark = backfill_state.last_watermark.clone(); + for batch in rollout_paths.chunks(BACKFILL_BATCH_SIZE) { + for rollout in batch { + stats.scanned = stats.scanned.saturating_add(1); + match extract_metadata_from_rollout(&rollout.path, default_provider).await { + Ok(outcome) => { + if let Err(err) = runtime + .replace_thread_search_text( + outcome.metadata.id, + outcome.search_text.as_slice(), + ) + .await + { + stats.failed = stats.failed.saturating_add(1); + warn!( + "failed to backfill thread search text {}: {err}", + rollout.path.display() + ); + } else { + stats.upserted = stats.upserted.saturating_add(1); + } + } + Err(err) => { + stats.failed = stats.failed.saturating_add(1); + warn!( + "failed to extract thread search rollout {}: {err}", + rollout.path.display() + ); + } + } + } + if let Some(last_entry) = batch.last() { + if let Err(err) = runtime + .checkpoint_thread_search_backfill(last_entry.watermark.as_str()) + .await + { + warn!( + "failed to checkpoint thread search backfill at {}: {err}", + codex_home.display() + ); + } else { + last_watermark = Some(last_entry.watermark.clone()); + } + } + } + if let Err(err) = runtime + .mark_thread_search_backfill_complete(last_watermark.as_deref()) + .await + { + warn!( + "failed to mark thread search backfill complete at {}: {err}", + codex_home.display() + ); + } + info!( + "thread search backfill scanned={}, indexed={}, failed={}", + stats.scanned, stats.upserted, stats.failed + ); +} + #[derive(Debug, Clone)] struct BackfillRolloutPath { watermark: String, @@ -376,6 +467,40 @@ struct BackfillRolloutPath { archived: bool, } +async fn rollout_paths_after_watermark( + codex_home: &Path, + last_watermark: Option<&str>, +) -> Vec { + let sessions_root = codex_home.join(SESSIONS_SUBDIR); + let archived_root = codex_home.join(ARCHIVED_SESSIONS_SUBDIR); + let mut rollout_paths: Vec = Vec::new(); + for (root, archived) in [(sessions_root, false), (archived_root, true)] { + if !tokio::fs::try_exists(&root).await.unwrap_or(false) { + continue; + } + match collect_rollout_paths(&root).await { + Ok(paths) => { + rollout_paths.extend(paths.into_iter().map(|path| BackfillRolloutPath { + watermark: backfill_watermark_for_path(codex_home, &path), + path, + archived, + })); + } + Err(err) => { + warn!( + "failed to collect rollout paths under {}: {err}", + root.display() + ); + } + } + } + rollout_paths.sort_by(|a, b| a.watermark.cmp(&b.watermark)); + if let Some(last_watermark) = last_watermark { + rollout_paths.retain(|entry| entry.watermark.as_str() > last_watermark); + } + rollout_paths +} + fn backfill_watermark_for_path(codex_home: &Path, path: &Path) -> String { path.strip_prefix(codex_home) .unwrap_or(path) diff --git a/codex-rs/rollout/src/recorder.rs b/codex-rs/rollout/src/recorder.rs index 369c9f20a8..be9804b940 100644 --- a/codex-rs/rollout/src/recorder.rs +++ b/codex-rs/rollout/src/recorder.rs @@ -1771,6 +1771,9 @@ async fn sync_thread_state_after_write( || items .iter() .any(codex_state::rollout_item_affects_thread_metadata) + || items + .iter() + .any(codex_state::rollout_item_affects_thread_search) { state_db::apply_rollout_items( state_db_ctx, diff --git a/codex-rs/rollout/src/recorder_tests.rs b/codex-rs/rollout/src/recorder_tests.rs index 2063020be2..1911301a11 100644 --- a/codex-rs/rollout/src/recorder_tests.rs +++ b/codex-rs/rollout/src/recorder_tests.rs @@ -7,6 +7,7 @@ use codex_protocol::ThreadId; use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig; use codex_protocol::models::ResponseItem; use codex_protocol::protocol::AgentMessageEvent; +use codex_protocol::protocol::AgentReasoningEvent; use codex_protocol::protocol::AskForApproval; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::RolloutItem; @@ -582,11 +583,9 @@ async fn metadata_irrelevant_events_coalesce_state_db_updated_at() -> std::io::R let initial_first_user_message = initial_thread.first_user_message.clone(); recorder - .record_items(&[RolloutItem::EventMsg(EventMsg::AgentMessage( - AgentMessageEvent { - message: "assistant text".to_string(), - phase: None, - memory_citation: None, + .record_items(&[RolloutItem::EventMsg(EventMsg::AgentReasoning( + AgentReasoningEvent { + text: "metadata and search irrelevant reasoning".to_string(), }, ))]) .await?; @@ -595,7 +594,7 @@ async fn metadata_irrelevant_events_coalesce_state_db_updated_at() -> std::io::R let updated_thread = state_db .get_thread(thread_id) .await - .expect("thread should load after agent message") + .expect("thread should load after reasoning") .expect("thread should still exist"); assert_eq!(updated_thread.updated_at, initial_updated_at); @@ -608,11 +607,9 @@ async fn metadata_irrelevant_events_coalesce_state_db_updated_at() -> std::io::R tokio::time::sleep(THREAD_UPDATED_AT_TOUCH_INTERVAL + Duration::from_millis(10)).await; recorder - .record_items(&[RolloutItem::EventMsg(EventMsg::AgentMessage( - AgentMessageEvent { - message: "more assistant text".to_string(), - phase: None, - memory_citation: None, + .record_items(&[RolloutItem::EventMsg(EventMsg::AgentReasoning( + AgentReasoningEvent { + text: "later metadata and search irrelevant reasoning".to_string(), }, ))]) .await?; diff --git a/codex-rs/rollout/src/state_db.rs b/codex-rs/rollout/src/state_db.rs index d039e16d68..9d72fc17de 100644 --- a/codex-rs/rollout/src/state_db.rs +++ b/codex-rs/rollout/src/state_db.rs @@ -125,6 +125,11 @@ async fn try_init_with_roots_inner( ) })?; if backfill_state.status == codex_state::BackfillStatus::Complete { + start_thread_search_backfill( + runtime.clone(), + codex_home.clone(), + default_model_provider_id.clone(), + ); return Ok(runtime); } @@ -151,6 +156,11 @@ async fn try_init_with_roots_inner( ) })?; if backfill_state.status == codex_state::BackfillStatus::Complete { + start_thread_search_backfill( + runtime.clone(), + codex_home.clone(), + default_model_provider_id.clone(), + ); return Ok(runtime); } if wait_started.elapsed() >= STARTUP_BACKFILL_WAIT_TIMEOUT { @@ -178,6 +188,21 @@ async fn try_init_with_roots_inner( } } +fn start_thread_search_backfill( + runtime: StateDbHandle, + codex_home: PathBuf, + default_model_provider_id: String, +) { + tokio::spawn(async move { + metadata::backfill_thread_search( + runtime.as_ref(), + codex_home.as_path(), + default_model_provider_id.as_str(), + ) + .await; + }); +} + fn emit_startup_warning(message: &str) { warn!("{message}"); if !tracing::dispatcher::has_been_set() { @@ -485,6 +510,7 @@ pub async fn reconcile_rollout( } }; let mut metadata = outcome.metadata; + let search_text = outcome.search_text; let memory_mode = outcome.memory_mode.unwrap_or_else(|| "enabled".to_string()); metadata.cwd = normalize_cwd_for_state_db(&metadata.cwd); if let Ok(Some(existing_metadata)) = ctx.get_thread(metadata.id).await { @@ -506,6 +532,16 @@ pub async fn reconcile_rollout( ); return; } + if let Err(err) = ctx + .replace_thread_search_text(metadata.id, search_text.as_slice()) + .await + { + warn!( + "state db reconcile_rollout search index failed {}: {err}", + rollout_path.display() + ); + return; + } if let Err(err) = ctx .set_thread_memory_mode(metadata.id, memory_mode.as_str()) .await diff --git a/codex-rs/state/migrations/0033_thread_search.sql b/codex-rs/state/migrations/0033_thread_search.sql new file mode 100644 index 0000000000..17b8d00467 --- /dev/null +++ b/codex-rs/state/migrations/0033_thread_search.sql @@ -0,0 +1,12 @@ +CREATE VIRTUAL TABLE thread_search USING fts5( + thread_id UNINDEXED, + body +); + +UPDATE backfill_state +SET + status = 'pending', + last_watermark = NULL, + last_success_at = NULL, + updated_at = CAST(strftime('%s', 'now') AS INTEGER) +WHERE id = 1; diff --git a/codex-rs/state/migrations/0034_thread_search_backfill_state.sql b/codex-rs/state/migrations/0034_thread_search_backfill_state.sql new file mode 100644 index 0000000000..42a6295813 --- /dev/null +++ b/codex-rs/state/migrations/0034_thread_search_backfill_state.sql @@ -0,0 +1,23 @@ +CREATE TABLE thread_search_backfill_state ( + id INTEGER PRIMARY KEY CHECK (id = 1), + status TEXT NOT NULL, + last_watermark TEXT, + last_success_at INTEGER, + updated_at INTEGER NOT NULL +); + +INSERT INTO thread_search_backfill_state ( + id, + status, + last_watermark, + last_success_at, + updated_at +) +VALUES ( + 1, + 'pending', + NULL, + NULL, + CAST(strftime('%s', 'now') AS INTEGER) +) +ON CONFLICT(id) DO NOTHING; diff --git a/codex-rs/state/src/extract.rs b/codex-rs/state/src/extract.rs index d815d444ce..ab5ec01962 100644 --- a/codex-rs/state/src/extract.rs +++ b/codex-rs/state/src/extract.rs @@ -1,4 +1,5 @@ use crate::model::ThreadMetadata; +use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseItem; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::RolloutItem; @@ -42,6 +43,58 @@ pub fn rollout_item_affects_thread_metadata(item: &RolloutItem) -> bool { } } +/// Return whether this rollout item contributes user-visible conversation text to search. +pub fn rollout_item_affects_thread_search(item: &RolloutItem) -> bool { + match item { + RolloutItem::EventMsg(EventMsg::AgentMessage(agent)) => !agent.message.trim().is_empty(), + RolloutItem::EventMsg(EventMsg::UserMessage(user)) => { + !strip_user_message_prefix(user.message.as_str()).is_empty() + } + RolloutItem::ResponseItem(ResponseItem::Message { role, content, .. }) => { + matches!(role.as_str(), "assistant" | "user") + && content.iter().any(content_item_has_search_text) + } + RolloutItem::SessionMeta(_) + | RolloutItem::TurnContext(_) + | RolloutItem::EventMsg(_) + | RolloutItem::ResponseItem(_) + | RolloutItem::Compacted(_) => false, + } +} + +/// Extract searchable user and assistant text from rollout items. +pub fn thread_search_text_from_rollout_items(items: &[RolloutItem]) -> Vec { + let mut chunks = Vec::new(); + for item in items { + match item { + RolloutItem::EventMsg(EventMsg::AgentMessage(agent)) => { + push_search_text(&mut chunks, agent.message.as_str()); + } + RolloutItem::EventMsg(EventMsg::UserMessage(user)) => { + push_search_text( + &mut chunks, + strip_user_message_prefix(user.message.as_str()), + ); + } + RolloutItem::ResponseItem(ResponseItem::Message { role, content, .. }) + if matches!(role.as_str(), "assistant" | "user") => + { + for content_item in content { + if let Some(text) = content_item_search_text(content_item) { + push_search_text(&mut chunks, text); + } + } + } + RolloutItem::SessionMeta(_) + | RolloutItem::TurnContext(_) + | RolloutItem::EventMsg(_) + | RolloutItem::ResponseItem(_) + | RolloutItem::Compacted(_) => {} + } + } + chunks +} + fn apply_session_meta_from_item(metadata: &mut ThreadMetadata, meta_line: &SessionMetaLine) { if metadata.id != meta_line.meta.id { // Ignore session_meta lines that don't match the canonical thread ID, @@ -125,6 +178,24 @@ fn strip_user_message_prefix(text: &str) -> &str { } } +fn content_item_has_search_text(item: &ContentItem) -> bool { + content_item_search_text(item).is_some_and(|text| !text.trim().is_empty()) +} + +fn content_item_search_text(item: &ContentItem) -> Option<&str> { + match item { + ContentItem::InputText { text } | ContentItem::OutputText { text } => Some(text.as_str()), + ContentItem::InputImage { .. } => None, + } +} + +fn push_search_text(chunks: &mut Vec, text: &str) { + let text = text.trim(); + if !text.is_empty() { + chunks.push(text.to_string()); + } +} + fn user_message_preview(user: &UserMessageEvent) -> Option { let message = strip_user_message_prefix(user.message.as_str()); if !message.is_empty() { diff --git a/codex-rs/state/src/lib.rs b/codex-rs/state/src/lib.rs index ea9a2b089d..a2e2ccde3f 100644 --- a/codex-rs/state/src/lib.rs +++ b/codex-rs/state/src/lib.rs @@ -23,6 +23,8 @@ pub use runtime::StateRuntime; /// Most consumers should prefer [`StateRuntime`]. pub use extract::apply_rollout_item; pub use extract::rollout_item_affects_thread_metadata; +pub use extract::rollout_item_affects_thread_search; +pub use extract::thread_search_text_from_rollout_items; pub use model::AgentJob; pub use model::AgentJobCreateParams; pub use model::AgentJobItem; diff --git a/codex-rs/state/src/model/thread_metadata.rs b/codex-rs/state/src/model/thread_metadata.rs index f0b8d315e5..4695d6ae59 100644 --- a/codex-rs/state/src/model/thread_metadata.rs +++ b/codex-rs/state/src/model/thread_metadata.rs @@ -50,6 +50,8 @@ pub struct ThreadsPage { pub struct ExtractionOutcome { /// The extracted thread metadata. pub metadata: ThreadMetadata, + /// Searchable conversation text extracted from rollout history. + pub search_text: Vec, /// The explicit thread memory mode from rollout metadata, if present. pub memory_mode: Option, /// The number of rollout lines that failed to parse. diff --git a/codex-rs/state/src/runtime/backfill.rs b/codex-rs/state/src/runtime/backfill.rs index 2bfec0a88f..b189e79b63 100644 --- a/codex-rs/state/src/runtime/backfill.rs +++ b/codex-rs/state/src/runtime/backfill.rs @@ -107,6 +107,115 @@ WHERE id = 1 r#" INSERT INTO backfill_state (id, status, last_watermark, last_success_at, updated_at) VALUES (?, ?, NULL, NULL, ?) +ON CONFLICT(id) DO NOTHING + "#, + ) + .bind(1_i64) + .bind(crate::BackfillStatus::Pending.as_str()) + .bind(Utc::now().timestamp()) + .execute(self.pool.as_ref()) + .await?; + Ok(()) + } + + /// Read the persisted background thread-search backfill state. + pub async fn get_thread_search_backfill_state(&self) -> anyhow::Result { + self.ensure_thread_search_backfill_state_row().await?; + let row = sqlx::query( + r#" +SELECT status, last_watermark, last_success_at +FROM thread_search_backfill_state +WHERE id = 1 + "#, + ) + .fetch_one(self.pool.as_ref()) + .await?; + crate::BackfillState::try_from_row(&row) + } + + /// Attempt to claim ownership of the background thread-search backfill. + pub async fn try_claim_thread_search_backfill( + &self, + lease_seconds: i64, + ) -> anyhow::Result { + self.ensure_thread_search_backfill_state_row().await?; + let now = Utc::now().timestamp(); + let lease_cutoff = now.saturating_sub(lease_seconds.max(0)); + let result = sqlx::query( + r#" +UPDATE thread_search_backfill_state +SET status = ?, updated_at = ? +WHERE id = 1 + AND status != ? + AND (status != ? OR updated_at <= ?) + "#, + ) + .bind(crate::BackfillStatus::Running.as_str()) + .bind(now) + .bind(crate::BackfillStatus::Complete.as_str()) + .bind(crate::BackfillStatus::Running.as_str()) + .bind(lease_cutoff) + .execute(self.pool.as_ref()) + .await?; + Ok(result.rows_affected() == 1) + } + + /// Persist background thread-search backfill progress. + pub async fn checkpoint_thread_search_backfill(&self, watermark: &str) -> anyhow::Result<()> { + self.ensure_thread_search_backfill_state_row().await?; + sqlx::query( + r#" +UPDATE thread_search_backfill_state +SET status = ?, last_watermark = ?, updated_at = ? +WHERE id = 1 + "#, + ) + .bind(crate::BackfillStatus::Running.as_str()) + .bind(watermark) + .bind(Utc::now().timestamp()) + .execute(self.pool.as_ref()) + .await?; + Ok(()) + } + + /// Mark the background thread-search backfill as complete. + pub async fn mark_thread_search_backfill_complete( + &self, + last_watermark: Option<&str>, + ) -> anyhow::Result<()> { + self.ensure_thread_search_backfill_state_row().await?; + let now = Utc::now().timestamp(); + sqlx::query( + r#" +UPDATE thread_search_backfill_state +SET + status = ?, + last_watermark = COALESCE(?, last_watermark), + last_success_at = ?, + updated_at = ? +WHERE id = 1 + "#, + ) + .bind(crate::BackfillStatus::Complete.as_str()) + .bind(last_watermark) + .bind(now) + .bind(now) + .execute(self.pool.as_ref()) + .await?; + Ok(()) + } + + async fn ensure_thread_search_backfill_state_row(&self) -> anyhow::Result<()> { + sqlx::query( + r#" +INSERT INTO thread_search_backfill_state ( + id, + status, + last_watermark, + last_success_at, + updated_at +) +VALUES (?, ?, NULL, NULL, ?) ON CONFLICT(id) DO NOTHING "#, ) diff --git a/codex-rs/state/src/runtime/threads.rs b/codex-rs/state/src/runtime/threads.rs index 925fb45288..a6a2825cba 100644 --- a/codex-rs/state/src/runtime/threads.rs +++ b/codex-rs/state/src/runtime/threads.rs @@ -584,6 +584,51 @@ ON CONFLICT(id) DO NOTHING Ok(result.rows_affected() > 0) } + /// Replace all indexed conversation text for a thread. + pub async fn replace_thread_search_text( + &self, + thread_id: ThreadId, + chunks: &[String], + ) -> anyhow::Result<()> { + let thread_id = thread_id.to_string(); + let mut tx = self.pool.begin().await?; + sqlx::query("DELETE FROM thread_search WHERE thread_id = ?") + .bind(thread_id.as_str()) + .execute(&mut *tx) + .await?; + for chunk in chunks { + sqlx::query("INSERT INTO thread_search (thread_id, body) VALUES (?, ?)") + .bind(thread_id.as_str()) + .bind(chunk.as_str()) + .execute(&mut *tx) + .await?; + } + tx.commit().await?; + Ok(()) + } + + /// Append newly persisted conversation text to the thread search index. + pub async fn append_thread_search_text( + &self, + thread_id: ThreadId, + chunks: &[String], + ) -> anyhow::Result<()> { + if chunks.is_empty() { + return Ok(()); + } + let thread_id = thread_id.to_string(); + let mut tx = self.pool.begin().await?; + for chunk in chunks { + sqlx::query("INSERT INTO thread_search (thread_id, body) VALUES (?, ?)") + .bind(thread_id.as_str()) + .bind(chunk.as_str()) + .execute(&mut *tx) + .await?; + } + tx.commit().await?; + Ok(()) + } + pub async fn touch_thread_updated_at( &self, thread_id: ThreadId, @@ -876,6 +921,9 @@ ON CONFLICT(thread_id, position) DO NOTHING self.upsert_thread(&metadata).await }; upsert_result?; + let search_text = crate::thread_search_text_from_rollout_items(items); + self.append_thread_search_text(builder.id, search_text.as_slice()) + .await?; if let Some(memory_mode) = extract_memory_mode(items) && let Err(err) = self .set_thread_memory_mode(builder.id, memory_mode.as_str()) @@ -943,10 +991,17 @@ ON CONFLICT(thread_id, position) DO NOTHING /// Delete a thread metadata row by id. pub async fn delete_thread(&self, thread_id: ThreadId) -> anyhow::Result { - let result = sqlx::query("DELETE FROM threads WHERE id = ?") - .bind(thread_id.to_string()) - .execute(self.pool.as_ref()) + let thread_id = thread_id.to_string(); + let mut tx = self.pool.begin().await?; + sqlx::query("DELETE FROM thread_search WHERE thread_id = ?") + .bind(thread_id.as_str()) + .execute(&mut *tx) .await?; + let result = sqlx::query("DELETE FROM threads WHERE id = ?") + .bind(thread_id.as_str()) + .execute(&mut *tx) + .await?; + tx.commit().await?; Ok(result.rows_affected()) } } @@ -1105,7 +1160,15 @@ pub(super) fn push_thread_filters<'a>( builder.push_bind(search_term); builder.push(") > 0 OR instr(threads.preview, "); builder.push_bind(search_term); - builder.push(") > 0)"); + builder.push(") > 0"); + if let Some(search_query) = thread_search_query(search_term) { + builder.push( + " OR threads.id IN (SELECT thread_id FROM thread_search WHERE thread_search MATCH ", + ); + builder.push_bind(search_query); + builder.push(")"); + } + builder.push(")"); } if let Some(anchor) = anchor { let anchor_ts = datetime_to_epoch_millis(anchor.ts); @@ -1157,6 +1220,20 @@ fn metadata_preview(metadata: &crate::ThreadMetadata) -> &str { .unwrap_or_default() } +fn thread_search_query(search_term: &str) -> Option { + let terms = search_term + .split_whitespace() + .filter_map(|term| { + let term = term.trim(); + if term.is_empty() { + return None; + } + Some(format!("\"{}\"*", term.replace('"', "\"\""))) + }) + .collect::>(); + (!terms.is_empty()).then(|| terms.join(" AND ")) +} + #[cfg(test)] mod tests { use super::*;