mirror of
https://github.com/openai/codex.git
synced 2026-05-23 20:44:50 +00:00
Fixes #20792 ## Why `/goal`-first threads are valid resumable threads, but they can be missing from `codex resume` and app recents because discovery depends on metadata derived from a normal first user message. PR #21489 attempted to fix this by using the goal objective as `first_user_message`. Review feedback pointed out that `first_user_message` does more than provide visible text today: it gates listing, supplies preview text, and participates in deciding whether a later title should surface as a distinct thread name. Reusing it for the goal objective could leave a `/goal`-first thread with `first_user_message=<goal>` and `title=<later prompt>`, even though the goal should only provide the initial visible preview. This PR follows that feedback by keeping `first_user_message` as is and introducing a new `preview` field to separate concerns. The `preview` field is populated from the first user message or the goal objective. We can extend it in the future to include other sources. ## What Changed - Added internal thread `preview` metadata in `codex-state`, including a SQLite migration that backfills from `first_user_message` and from existing `thread_goals` objectives when needed. - Treated `ThreadGoalUpdated` as preview-bearing metadata so goal-first threads can be listed and searched without mutating `first_user_message`. - Updated rollout listing, state queries, thread-store conversion, and app-server mapping to use preview metadata while continuing to expose the existing public `preview` field. - Preserved title/name distinctness behavior around literal `first_user_message`, so a later normal prompt after `/goal` does not surface as a separate name just because the goal supplied the initial preview. - Preserved compatibility for older/internal metadata writes by deriving preview from `first_user_message` when explicit preview metadata is absent. 
## Verification - Manually verified that a thread that starts with `/goal <objective>` shows up in the resume picker.
436 lines
15 KiB
Rust
436 lines
15 KiB
Rust
use std::collections::HashMap;
|
|
use std::collections::HashSet;
|
|
|
|
use codex_protocol::ThreadId;
|
|
use codex_rollout::RolloutConfig;
|
|
use codex_rollout::RolloutRecorder;
|
|
use codex_rollout::find_thread_names_by_ids;
|
|
use codex_rollout::parse_cursor;
|
|
|
|
use super::LocalThreadStore;
|
|
use super::helpers::distinct_thread_metadata_title;
|
|
use super::helpers::set_thread_name_from_title;
|
|
use super::helpers::stored_thread_from_rollout_item;
|
|
use crate::ListThreadsParams;
|
|
use crate::SortDirection;
|
|
use crate::ThreadPage;
|
|
use crate::ThreadSortKey;
|
|
use crate::ThreadStoreError;
|
|
use crate::ThreadStoreResult;
|
|
|
|
pub(super) async fn list_threads(
|
|
store: &LocalThreadStore,
|
|
params: ListThreadsParams,
|
|
) -> ThreadStoreResult<ThreadPage> {
|
|
let cursor = params
|
|
.cursor
|
|
.as_deref()
|
|
.map(|cursor| {
|
|
parse_cursor(cursor).ok_or_else(|| ThreadStoreError::InvalidRequest {
|
|
message: format!("invalid cursor: {cursor}"),
|
|
})
|
|
})
|
|
.transpose()?;
|
|
let sort_key = match params.sort_key {
|
|
ThreadSortKey::CreatedAt => codex_rollout::ThreadSortKey::CreatedAt,
|
|
ThreadSortKey::UpdatedAt => codex_rollout::ThreadSortKey::UpdatedAt,
|
|
};
|
|
let sort_direction = match params.sort_direction {
|
|
SortDirection::Asc => codex_rollout::SortDirection::Asc,
|
|
SortDirection::Desc => codex_rollout::SortDirection::Desc,
|
|
};
|
|
let state_db = store.state_db().await;
|
|
let rollout_config = RolloutConfig {
|
|
codex_home: store.config.codex_home.clone(),
|
|
sqlite_home: store.config.sqlite_home.clone(),
|
|
cwd: store.config.codex_home.clone(),
|
|
model_provider_id: store.config.default_model_provider_id.clone(),
|
|
generate_memories: false,
|
|
};
|
|
let page = list_rollout_threads(
|
|
state_db,
|
|
&rollout_config,
|
|
store.config.default_model_provider_id.as_str(),
|
|
¶ms,
|
|
cursor.as_ref(),
|
|
sort_key,
|
|
sort_direction,
|
|
)
|
|
.await?;
|
|
|
|
let next_cursor = page
|
|
.next_cursor
|
|
.as_ref()
|
|
.and_then(|cursor| serde_json::to_value(cursor).ok())
|
|
.and_then(|value| value.as_str().map(str::to_owned));
|
|
let mut items = page
|
|
.items
|
|
.into_iter()
|
|
.filter_map(|item| {
|
|
stored_thread_from_rollout_item(
|
|
item,
|
|
params.archived,
|
|
store.config.default_model_provider_id.as_str(),
|
|
)
|
|
})
|
|
.collect::<Vec<_>>();
|
|
|
|
let thread_ids = items
|
|
.iter()
|
|
.map(|thread| thread.thread_id)
|
|
.collect::<HashSet<_>>();
|
|
let mut names = HashMap::<ThreadId, String>::with_capacity(thread_ids.len());
|
|
if let Some(state_db_ctx) = store.state_db().await {
|
|
for &thread_id in &thread_ids {
|
|
let Ok(Some(metadata)) = state_db_ctx.get_thread(thread_id).await else {
|
|
continue;
|
|
};
|
|
if let Some(title) = distinct_thread_metadata_title(&metadata) {
|
|
names.insert(thread_id, title);
|
|
}
|
|
}
|
|
}
|
|
if names.len() < thread_ids.len()
|
|
&& let Ok(legacy_names) =
|
|
find_thread_names_by_ids(store.config.codex_home.as_path(), &thread_ids).await
|
|
{
|
|
for (thread_id, title) in legacy_names {
|
|
names.entry(thread_id).or_insert(title);
|
|
}
|
|
}
|
|
for thread in &mut items {
|
|
if let Some(title) = names.get(&thread.thread_id).cloned() {
|
|
set_thread_name_from_title(thread, title);
|
|
}
|
|
}
|
|
|
|
Ok(ThreadPage { items, next_cursor })
|
|
}
|
|
|
|
async fn list_rollout_threads(
|
|
state_db: Option<codex_rollout::StateDbHandle>,
|
|
config: &RolloutConfig,
|
|
default_model_provider_id: &str,
|
|
params: &ListThreadsParams,
|
|
cursor: Option<&codex_rollout::Cursor>,
|
|
sort_key: codex_rollout::ThreadSortKey,
|
|
sort_direction: codex_rollout::SortDirection,
|
|
) -> ThreadStoreResult<codex_rollout::ThreadsPage> {
|
|
let page = if params.use_state_db_only && params.archived {
|
|
RolloutRecorder::list_archived_threads_from_state_db(
|
|
state_db,
|
|
config,
|
|
params.page_size,
|
|
cursor,
|
|
sort_key,
|
|
sort_direction,
|
|
params.allowed_sources.as_slice(),
|
|
params.model_providers.as_deref(),
|
|
params.cwd_filters.as_deref(),
|
|
default_model_provider_id,
|
|
params.search_term.as_deref(),
|
|
)
|
|
.await
|
|
} else if params.use_state_db_only {
|
|
RolloutRecorder::list_threads_from_state_db(
|
|
state_db,
|
|
config,
|
|
params.page_size,
|
|
cursor,
|
|
sort_key,
|
|
sort_direction,
|
|
params.allowed_sources.as_slice(),
|
|
params.model_providers.as_deref(),
|
|
params.cwd_filters.as_deref(),
|
|
default_model_provider_id,
|
|
params.search_term.as_deref(),
|
|
)
|
|
.await
|
|
} else if params.archived {
|
|
RolloutRecorder::list_archived_threads(
|
|
state_db,
|
|
config,
|
|
params.page_size,
|
|
cursor,
|
|
sort_key,
|
|
sort_direction,
|
|
params.allowed_sources.as_slice(),
|
|
params.model_providers.as_deref(),
|
|
params.cwd_filters.as_deref(),
|
|
default_model_provider_id,
|
|
params.search_term.as_deref(),
|
|
)
|
|
.await
|
|
} else {
|
|
RolloutRecorder::list_threads(
|
|
state_db,
|
|
config,
|
|
params.page_size,
|
|
cursor,
|
|
sort_key,
|
|
sort_direction,
|
|
params.allowed_sources.as_slice(),
|
|
params.model_providers.as_deref(),
|
|
params.cwd_filters.as_deref(),
|
|
default_model_provider_id,
|
|
params.search_term.as_deref(),
|
|
)
|
|
.await
|
|
};
|
|
page.map_err(|err| ThreadStoreError::Internal {
|
|
message: format!("failed to list threads: {err}"),
|
|
})
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use chrono::Utc;
    use codex_protocol::ThreadId;
    use codex_protocol::protocol::SessionSource;
    use pretty_assertions::assert_eq;
    use std::fs;
    use tempfile::TempDir;
    use uuid::Uuid;

    use super::*;
    use crate::ThreadStore;
    use crate::local::LocalThreadStore;
    use crate::local::test_support::test_config;
    use crate::local::test_support::write_archived_session_file;
    use crate::local::test_support::write_session_file;
    use crate::local::test_support::write_session_file_with;

    /// Baseline request used by every test: first page of ten active threads,
    /// newest-created first, with no filters, no search term, and the state DB
    /// not forced. Individual tests override fields with struct-update syntax.
    fn first_page_params() -> ListThreadsParams {
        ListThreadsParams {
            page_size: 10,
            cursor: None,
            sort_key: ThreadSortKey::CreatedAt,
            sort_direction: SortDirection::Desc,
            allowed_sources: Vec::new(),
            model_providers: None,
            cwd_filters: None,
            archived: false,
            search_term: None,
            use_state_db_only: false,
        }
    }

    /// Parses the thread id that corresponds to `uuid`.
    fn thread_id_for(uuid: Uuid) -> ThreadId {
        ThreadId::from_string(&uuid.to_string()).expect("valid thread id")
    }

    /// Collects the thread ids of `page` in listing order.
    fn listed_ids(page: &ThreadPage) -> Vec<ThreadId> {
        page.items.iter().map(|item| item.thread_id).collect()
    }

    #[tokio::test]
    async fn list_threads_uses_default_provider_when_rollout_omits_provider() {
        let home = TempDir::new().expect("temp dir");
        let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None);
        let session_dir = home.path().join("sessions/2025/01/03");
        write_session_file_with(
            home.path(),
            session_dir,
            "2025-01-03T12-00-00",
            Uuid::from_u128(102),
            "Hello from user",
            /*model_provider*/ None,
        )
        .expect("session file");

        let page = store
            .list_threads(first_page_params())
            .await
            .expect("thread listing");

        assert_eq!(page.items.len(), 1);
        assert_eq!(page.items[0].model_provider, "test-provider");
    }

    #[tokio::test]
    async fn list_threads_preserves_sqlite_title_search_results() {
        let home = TempDir::new().expect("temp dir");
        let config = test_config(home.path());
        let thread_id = thread_id_for(Uuid::from_u128(103));
        let rollout_path = home.path().join("rollout-title-search.jsonl");
        fs::write(&rollout_path, "").expect("placeholder rollout file");

        let runtime = codex_state::StateRuntime::init(
            home.path().to_path_buf(),
            config.default_model_provider_id.clone(),
        )
        .await
        .expect("state db should initialize");
        let store = LocalThreadStore::new(config.clone(), Some(runtime.clone()));
        runtime
            .mark_backfill_complete(/*last_watermark*/ None)
            .await
            .expect("backfill should be complete");

        // Seed a row whose title, not its first user message, matches the
        // search term used below.
        let mut builder = codex_state::ThreadMetadataBuilder::new(
            thread_id,
            rollout_path,
            Utc::now(),
            SessionSource::Cli,
        );
        builder.model_provider = Some(config.default_model_provider_id.clone());
        builder.cwd = home.path().to_path_buf();
        builder.cli_version = Some("test_version".to_string());
        let mut metadata = builder.build(config.default_model_provider_id.as_str());
        metadata.title = "needle title".to_string();
        metadata.first_user_message = Some("plain preview".to_string());
        metadata.preview = metadata.first_user_message.clone();
        runtime
            .upsert_thread(&metadata)
            .await
            .expect("state db upsert should succeed");

        let page = store
            .list_threads(ListThreadsParams {
                search_term: Some("needle".to_string()),
                use_state_db_only: true,
                ..first_page_params()
            })
            .await
            .expect("thread listing");

        assert_eq!(listed_ids(&page), vec![thread_id]);
        assert_eq!(
            page.items[0].first_user_message.as_deref(),
            Some("plain preview")
        );
    }

    #[tokio::test]
    async fn list_threads_selects_active_or_archived_collection() {
        let home = TempDir::new().expect("temp dir");
        let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None);
        let active_uuid = Uuid::from_u128(105);
        let archived_uuid = Uuid::from_u128(106);
        write_session_file(home.path(), "2025-01-03T12-00-00", active_uuid)
            .expect("active session file");
        write_archived_session_file(home.path(), "2025-01-03T13-00-00", archived_uuid)
            .expect("archived session file");

        let active = store
            .list_threads(first_page_params())
            .await
            .expect("active listing");
        let archived = store
            .list_threads(ListThreadsParams {
                archived: true,
                ..first_page_params()
            })
            .await
            .expect("archived listing");

        let active_id = thread_id_for(active_uuid);
        let archived_id = thread_id_for(archived_uuid);
        assert_eq!(listed_ids(&active), vec![active_id]);
        assert_eq!(listed_ids(&archived), vec![archived_id]);
        assert_eq!(active.items[0].archived_at, None);
        assert_eq!(
            archived.items[0].archived_at,
            Some(archived.items[0].updated_at)
        );
    }

    #[tokio::test]
    async fn list_threads_returns_local_rollout_summary() {
        let home = TempDir::new().expect("temp dir");
        let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None);
        let uuid = Uuid::from_u128(101);
        let path =
            write_session_file(home.path(), "2025-01-03T12-00-00", uuid).expect("session file");

        let page = store
            .list_threads(ListThreadsParams {
                allowed_sources: vec![SessionSource::Cli],
                model_providers: Some(vec!["test-provider".to_string()]),
                ..first_page_params()
            })
            .await
            .expect("thread listing");

        let thread_id = thread_id_for(uuid);
        assert_eq!(page.next_cursor, None);
        assert_eq!(page.items.len(), 1);

        let summary = &page.items[0];
        assert_eq!(summary.thread_id, thread_id);
        assert_eq!(summary.rollout_path, Some(path));
        assert_eq!(summary.preview, "Hello from user");
        assert_eq!(
            summary.first_user_message.as_deref(),
            Some("Hello from user")
        );
        assert_eq!(summary.model_provider, "test-provider");
        assert_eq!(summary.cli_version, "test_version");
        assert_eq!(summary.source, SessionSource::Cli);
    }

    #[tokio::test]
    async fn list_threads_rejects_invalid_cursor() {
        let home = TempDir::new().expect("temp dir");
        let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None);

        let err = store
            .list_threads(ListThreadsParams {
                cursor: Some("not-a-cursor".to_string()),
                ..first_page_params()
            })
            .await
            .expect_err("invalid cursor should fail");

        assert!(matches!(err, ThreadStoreError::InvalidRequest { .. }));
    }
}
|