Leverage state DB metadata for thread summaries (#10621)

Summary:
- read conversation summaries and cwd info from the state DB when
possible so we no longer rely on rollout files for metadata and avoid
extra I/O
- persist CLI version in thread metadata, surface it through summary
builders, and add the necessary DB migration hooks
- simplify thread listing by using enriched state DB data directly
rather than reading rollout heads

Testing:
- Not run (not requested)
This commit is contained in:
jif-oai
2026-02-05 16:39:11 +00:00
committed by GitHub
parent 68e82e5dc9
commit 9ee746afd6
14 changed files with 748 additions and 408 deletions

View File

@@ -186,7 +186,8 @@ use codex_core::rollout_date_parts;
use codex_core::sandboxing::SandboxPermissions;
use codex_core::skills::remote::download_remote_skill;
use codex_core::skills::remote::list_remote_skills;
use codex_core::state_db::get_state_db;
use codex_core::state_db::StateDbHandle;
use codex_core::state_db::open_if_present;
use codex_core::token_data::parse_id_token;
use codex_core::windows_sandbox::WindowsSandboxLevelExt;
use codex_feedback::CodexFeedback;
@@ -1976,7 +1977,11 @@ impl CodexMessageProcessor {
let rollout_path_display = archived_path.display().to_string();
let fallback_provider = self.config.model_provider_id.clone();
let state_db_ctx = get_state_db(&self.config, None).await;
let state_db_ctx = open_if_present(
&self.config.codex_home,
self.config.model_provider_id.as_str(),
)
.await;
let archived_folder = self
.config
.codex_home
@@ -2285,23 +2290,44 @@ impl CodexMessageProcessor {
}
};
let rollout_path =
match find_thread_path_by_id_str(&self.config.codex_home, &thread_uuid.to_string())
.await
{
Ok(Some(path)) => Some(path),
Ok(None) => None,
Err(err) => {
self.send_invalid_request_error(
request_id,
format!("failed to locate thread id {thread_uuid}: {err}"),
)
.await;
return;
}
};
let db_summary = read_summary_from_state_db_by_thread_id(&self.config, thread_uuid).await;
let mut rollout_path = db_summary.as_ref().map(|summary| summary.path.clone());
if rollout_path.is_none() || include_turns {
rollout_path =
match find_thread_path_by_id_str(&self.config.codex_home, &thread_uuid.to_string())
.await
{
Ok(Some(path)) => Some(path),
Ok(None) => {
if include_turns {
None
} else {
rollout_path
}
}
Err(err) => {
self.send_invalid_request_error(
request_id,
format!("failed to locate thread id {thread_uuid}: {err}"),
)
.await;
return;
}
};
}
let mut thread = if let Some(rollout_path) = rollout_path.as_ref() {
if include_turns && rollout_path.is_none() && db_summary.is_some() {
self.send_internal_error(
request_id,
format!("failed to locate rollout for thread {thread_uuid}"),
)
.await;
return;
}
let mut thread = if let Some(summary) = db_summary {
summary_to_thread(summary)
} else if let Some(rollout_path) = rollout_path.as_ref() {
let fallback_provider = self.config.model_provider_id.as_str();
match read_summary_from_rollout(rollout_path, fallback_provider).await {
Ok(summary) => summary_to_thread(summary),
@@ -2608,8 +2634,8 @@ impl CodexMessageProcessor {
developer_instructions,
} = params;
let rollout_path = if let Some(path) = path {
path
let (rollout_path, source_thread_id) = if let Some(path) = path {
(path, None)
} else {
let existing_thread_id = match ThreadId::from_string(&thread_id) {
Ok(id) => id,
@@ -2630,7 +2656,7 @@ impl CodexMessageProcessor {
)
.await
{
Ok(Some(p)) => p,
Ok(Some(p)) => (p, Some(existing_thread_id)),
Ok(None) => {
self.send_invalid_request_error(
request_id,
@@ -2650,14 +2676,9 @@ impl CodexMessageProcessor {
}
};
let history_cwd = match read_session_meta_line(&rollout_path).await {
Ok(meta_line) => Some(meta_line.meta.cwd),
Err(err) => {
let rollout_path = rollout_path.display();
warn!("failed to read session metadata from rollout {rollout_path}: {err}");
None
}
};
let history_cwd =
read_history_cwd_from_state_db(&self.config, source_thread_id, rollout_path.as_path())
.await;
// Persist windows sandbox feature.
let mut cli_overrides = cli_overrides.unwrap_or_default();
@@ -2807,6 +2828,15 @@ impl CodexMessageProcessor {
request_id: RequestId,
params: GetConversationSummaryParams,
) {
if let GetConversationSummaryParams::ThreadId { conversation_id } = &params
&& let Some(summary) =
read_summary_from_state_db_by_thread_id(&self.config, *conversation_id).await
{
let response = GetConversationSummaryResponse { summary };
self.outgoing.send_response(request_id, response).await;
return;
}
let path = match params {
GetConversationSummaryParams::RolloutPath { rollout_path } => {
if rollout_path.is_relative() {
@@ -2931,6 +2961,11 @@ impl CodexMessageProcessor {
let fallback_provider = self.config.model_provider_id.clone();
let (allowed_sources_vec, source_kind_filter) = compute_source_filters(source_kinds);
let allowed_sources = allowed_sources_vec.as_slice();
let state_db_ctx = open_if_present(
&self.config.codex_home,
self.config.model_provider_id.as_str(),
)
.await;
while remaining > 0 {
let page_size = remaining.min(THREAD_LIST_MAX_LIMIT);
@@ -2968,31 +3003,26 @@ impl CodexMessageProcessor {
})?
};
let mut filtered = page
.items
.into_iter()
.filter_map(|it| {
let updated_at = it.updated_at.clone();
let session_meta_line = it.head.first().and_then(|first| {
serde_json::from_value::<SessionMetaLine>(first.clone()).ok()
})?;
extract_conversation_summary(
it.path,
&it.head,
&session_meta_line.meta,
session_meta_line.git.as_ref(),
fallback_provider.as_str(),
updated_at,
)
})
.filter(|summary| {
source_kind_filter
.as_ref()
.is_none_or(|filter| source_kind_matches(&summary.source, filter))
})
.collect::<Vec<_>>();
if filtered.len() > remaining {
filtered.truncate(remaining);
let mut filtered = Vec::with_capacity(page.items.len());
for it in page.items {
let Some(summary) = summary_from_thread_list_item(
it,
fallback_provider.as_str(),
state_db_ctx.as_ref(),
)
.await
else {
continue;
};
if source_kind_filter
.as_ref()
.is_none_or(|filter| source_kind_matches(&summary.source, filter))
{
filtered.push(summary);
if filtered.len() >= remaining {
break;
}
}
}
items.extend(filtered);
remaining = requested_page_size.saturating_sub(items.len());
@@ -3581,13 +3611,13 @@ impl CodexMessageProcessor {
} = params;
// Derive a Config using the same logic as new conversation, honoring overrides if provided.
let rollout_path = if let Some(path) = path {
path
let (rollout_path, source_thread_id) = if let Some(path) = path {
(path, None)
} else if let Some(conversation_id) = conversation_id {
match find_thread_path_by_id_str(&self.config.codex_home, &conversation_id.to_string())
.await
{
Ok(Some(found_path)) => found_path,
Ok(Some(found_path)) => (found_path, Some(conversation_id)),
Ok(None) => {
self.send_invalid_request_error(
request_id,
@@ -3614,14 +3644,9 @@ impl CodexMessageProcessor {
return;
};
let history_cwd = match read_session_meta_line(&rollout_path).await {
Ok(meta_line) => Some(meta_line.meta.cwd),
Err(err) => {
let rollout_path = rollout_path.display();
warn!("failed to read session metadata from rollout {rollout_path}: {err}");
None
}
};
let history_cwd =
read_history_cwd_from_state_db(&self.config, source_thread_id, rollout_path.as_path())
.await;
let (typesafe_overrides, request_overrides) = match overrides {
Some(overrides) => {
@@ -3909,7 +3934,11 @@ impl CodexMessageProcessor {
}
if state_db_ctx.is_none() {
state_db_ctx = get_state_db(&self.config, None).await;
state_db_ctx = open_if_present(
&self.config.codex_home,
self.config.model_provider_id.as_str(),
)
.await;
}
// Move the rollout file to archived.
@@ -5094,6 +5123,168 @@ async fn derive_config_for_cwd(
.await
}
async fn read_history_cwd_from_state_db(
config: &Config,
thread_id: Option<ThreadId>,
rollout_path: &Path,
) -> Option<PathBuf> {
if let Some(state_db_ctx) =
open_if_present(&config.codex_home, config.model_provider_id.as_str()).await
&& let Some(thread_id) = thread_id
&& let Ok(Some(metadata)) = state_db_ctx.get_thread(thread_id).await
{
return Some(metadata.cwd);
}
match read_session_meta_line(rollout_path).await {
Ok(meta_line) => Some(meta_line.meta.cwd),
Err(err) => {
let rollout_path = rollout_path.display();
warn!("failed to read session metadata from rollout {rollout_path}: {err}");
None
}
}
}
async fn read_summary_from_state_db_by_thread_id(
config: &Config,
thread_id: ThreadId,
) -> Option<ConversationSummary> {
let state_db_ctx = open_if_present(&config.codex_home, config.model_provider_id.as_str()).await;
read_summary_from_state_db_context_by_thread_id(state_db_ctx.as_ref(), thread_id).await
}
async fn read_summary_from_state_db_context_by_thread_id(
state_db_ctx: Option<&StateDbHandle>,
thread_id: ThreadId,
) -> Option<ConversationSummary> {
let state_db_ctx = state_db_ctx?;
let metadata = match state_db_ctx.get_thread(thread_id).await {
Ok(Some(metadata)) => metadata,
Ok(None) | Err(_) => return None,
};
Some(summary_from_state_db_metadata(
metadata.id,
metadata.rollout_path,
metadata.first_user_message,
metadata
.created_at
.to_rfc3339_opts(SecondsFormat::Secs, true),
metadata
.updated_at
.to_rfc3339_opts(SecondsFormat::Secs, true),
metadata.model_provider,
metadata.cwd,
metadata.cli_version,
metadata.source,
metadata.git_sha,
metadata.git_branch,
metadata.git_origin_url,
))
}
async fn summary_from_thread_list_item(
it: codex_core::ThreadItem,
fallback_provider: &str,
state_db_ctx: Option<&StateDbHandle>,
) -> Option<ConversationSummary> {
if let Some(thread_id) = it.thread_id {
let timestamp = it.created_at.clone();
let updated_at = it.updated_at.clone().or_else(|| timestamp.clone());
let model_provider = it
.model_provider
.clone()
.unwrap_or_else(|| fallback_provider.to_string());
let cwd = it.cwd?;
let cli_version = it.cli_version.unwrap_or_default();
let source = it
.source
.unwrap_or(codex_protocol::protocol::SessionSource::Unknown);
return Some(ConversationSummary {
conversation_id: thread_id,
path: it.path,
preview: it.first_user_message.unwrap_or_default(),
timestamp,
updated_at,
model_provider,
cwd,
cli_version,
source,
git_info: if it.git_sha.is_none()
&& it.git_branch.is_none()
&& it.git_origin_url.is_none()
{
None
} else {
Some(ConversationGitInfo {
sha: it.git_sha,
branch: it.git_branch,
origin_url: it.git_origin_url,
})
},
});
}
if let Some(thread_id) = thread_id_from_rollout_path(it.path.as_path()) {
return read_summary_from_state_db_context_by_thread_id(state_db_ctx, thread_id).await;
}
None
}
fn thread_id_from_rollout_path(path: &Path) -> Option<ThreadId> {
let file_name = path.file_name()?.to_str()?;
let stem = file_name.strip_suffix(".jsonl")?;
if stem.len() < 37 {
return None;
}
let uuid_start = stem.len().saturating_sub(36);
if !stem[..uuid_start].ends_with('-') {
return None;
}
ThreadId::from_string(&stem[uuid_start..]).ok()
}
#[allow(clippy::too_many_arguments)]
fn summary_from_state_db_metadata(
conversation_id: ThreadId,
path: PathBuf,
first_user_message: Option<String>,
timestamp: String,
updated_at: String,
model_provider: String,
cwd: PathBuf,
cli_version: String,
source: String,
git_sha: Option<String>,
git_branch: Option<String>,
git_origin_url: Option<String>,
) -> ConversationSummary {
let preview = first_user_message.unwrap_or_default();
let source = serde_json::from_value(serde_json::Value::String(source))
.unwrap_or(codex_protocol::protocol::SessionSource::Unknown);
let git_info = if git_sha.is_none() && git_branch.is_none() && git_origin_url.is_none() {
None
} else {
Some(ConversationGitInfo {
sha: git_sha,
branch: git_branch,
origin_url: git_origin_url,
})
};
ConversationSummary {
conversation_id,
path,
preview,
timestamp: Some(timestamp),
updated_at: Some(updated_at),
model_provider,
cwd,
cli_version,
source,
git_info,
}
}
pub(crate) async fn read_summary_from_rollout(
path: &Path,
fallback_provider: &str,