mirror of
https://github.com/openai/codex.git
synced 2026-04-24 22:54:54 +00:00
feat(app-server, core): return threads by created_at or updated_at (#9247)
Add support for returning threads ordered by either `created_at` OR `updated_at`, descending. Previously core always returned threads ordered by `created_at`.

This PR:
- updates core to be able to list threads by `updated_at` OR `created_at` descending, based on what the caller wants
- also updates `thread/list` in app-server to expose this (defaulting to `created_at` if not specified)

All existing codepaths (app-server, TUI) still default to `created_at`, so no behavior change is expected with this PR.

**Implementation**

Sorting by `updated_at` is a bit nontrivial (whereas `created_at` is easy due to the way we structure the folders and filenames on disk, which are all based on `created_at`). The most naive way to do this without introducing a cache file or sqlite DB (which we would have to implement/maintain) is to scan files in reverse `created_at` order on disk, looking at each file's mtime (last modified timestamp according to the filesystem), until we reach `MAX_SCAN_FILES` (currently set to 10,000). Then we can return the N most recently updated threads.

Based on some quick and dirty benchmarking on my machine with ~1000 rollout files, calling `thread/list` with limit 50, the `updated_at` path is slower, as expected, due to all the I/O:
- updated-at: average 103.10 ms
- created-at: average 41.10 ms

Those absolute numbers aren't a big deal IMO, but we can certainly optimize this in a followup if needed by introducing more state stored on disk.

**Caveat**

There's also a limitation: any files beyond the `MAX_SCAN_FILES` most recently created will be excluded, which means that if a user continues a REALLY old thread, it may not be included. In practice that should not be too big of an issue. If a user makes...
- 1000 rollouts/day → threads older than 10 days won't show up
- 100 rollouts/day → ~100 days

If this becomes a problem for some reason, that is even more motivation to implement an `updated_at` cache.
This commit is contained in:
@@ -6,6 +6,7 @@ use crate::models::supported_models;
|
||||
use crate::outgoing_message::OutgoingMessageSender;
|
||||
use crate::outgoing_message::OutgoingNotification;
|
||||
use chrono::DateTime;
|
||||
use chrono::SecondsFormat;
|
||||
use chrono::Utc;
|
||||
use codex_app_server_protocol::Account;
|
||||
use codex_app_server_protocol::AccountLoginCompletedNotification;
|
||||
@@ -99,6 +100,7 @@ use codex_app_server_protocol::ThreadLoadedListResponse;
|
||||
use codex_app_server_protocol::ThreadResumeParams;
|
||||
use codex_app_server_protocol::ThreadResumeResponse;
|
||||
use codex_app_server_protocol::ThreadRollbackParams;
|
||||
use codex_app_server_protocol::ThreadSortKey;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::ThreadStartedNotification;
|
||||
@@ -123,6 +125,7 @@ use codex_core::NewThread;
|
||||
use codex_core::RolloutRecorder;
|
||||
use codex_core::SessionMeta;
|
||||
use codex_core::ThreadManager;
|
||||
use codex_core::ThreadSortKey as CoreThreadSortKey;
|
||||
use codex_core::auth::CLIENT_ID;
|
||||
use codex_core::auth::login_with_api_key;
|
||||
use codex_core::config::Config;
|
||||
@@ -1598,6 +1601,7 @@ impl CodexMessageProcessor {
|
||||
let ThreadListParams {
|
||||
cursor,
|
||||
limit,
|
||||
sort_key,
|
||||
model_providers,
|
||||
} = params;
|
||||
|
||||
@@ -1605,8 +1609,12 @@ impl CodexMessageProcessor {
|
||||
.map(|value| value as usize)
|
||||
.unwrap_or(THREAD_LIST_DEFAULT_LIMIT)
|
||||
.clamp(1, THREAD_LIST_MAX_LIMIT);
|
||||
let core_sort_key = match sort_key.unwrap_or(ThreadSortKey::CreatedAt) {
|
||||
ThreadSortKey::CreatedAt => CoreThreadSortKey::CreatedAt,
|
||||
ThreadSortKey::UpdatedAt => CoreThreadSortKey::UpdatedAt,
|
||||
};
|
||||
let (summaries, next_cursor) = match self
|
||||
.list_threads_common(requested_page_size, cursor, model_providers)
|
||||
.list_threads_common(requested_page_size, cursor, model_providers, core_sort_key)
|
||||
.await
|
||||
{
|
||||
Ok(r) => r,
|
||||
@@ -2171,7 +2179,12 @@ impl CodexMessageProcessor {
|
||||
.clamp(1, THREAD_LIST_MAX_LIMIT);
|
||||
|
||||
match self
|
||||
.list_threads_common(requested_page_size, cursor, model_providers)
|
||||
.list_threads_common(
|
||||
requested_page_size,
|
||||
cursor,
|
||||
model_providers,
|
||||
CoreThreadSortKey::UpdatedAt,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok((items, next_cursor)) => {
|
||||
@@ -2189,8 +2202,18 @@ impl CodexMessageProcessor {
|
||||
requested_page_size: usize,
|
||||
cursor: Option<String>,
|
||||
model_providers: Option<Vec<String>>,
|
||||
sort_key: CoreThreadSortKey,
|
||||
) -> Result<(Vec<ConversationSummary>, Option<String>), JSONRPCErrorError> {
|
||||
let mut cursor_obj: Option<RolloutCursor> = cursor.as_ref().and_then(|s| parse_cursor(s));
|
||||
let mut cursor_obj: Option<RolloutCursor> = match cursor.as_ref() {
|
||||
Some(cursor_str) => {
|
||||
Some(parse_cursor(cursor_str).ok_or_else(|| JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("invalid cursor: {cursor_str}"),
|
||||
data: None,
|
||||
})?)
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
let mut last_cursor = cursor_obj.clone();
|
||||
let mut remaining = requested_page_size;
|
||||
let mut items = Vec::with_capacity(requested_page_size);
|
||||
@@ -2214,6 +2237,7 @@ impl CodexMessageProcessor {
|
||||
&self.config.codex_home,
|
||||
page_size,
|
||||
cursor_obj.as_ref(),
|
||||
sort_key,
|
||||
INTERACTIVE_SESSION_SOURCES,
|
||||
model_provider_filter.as_deref(),
|
||||
fallback_provider.as_str(),
|
||||
@@ -2229,6 +2253,7 @@ impl CodexMessageProcessor {
|
||||
.items
|
||||
.into_iter()
|
||||
.filter_map(|it| {
|
||||
let updated_at = it.updated_at.clone();
|
||||
let session_meta_line = it.head.first().and_then(|first| {
|
||||
serde_json::from_value::<SessionMetaLine>(first.clone()).ok()
|
||||
})?;
|
||||
@@ -2238,6 +2263,7 @@ impl CodexMessageProcessor {
|
||||
&session_meta_line.meta,
|
||||
session_meta_line.git.as_ref(),
|
||||
fallback_provider.as_str(),
|
||||
updated_at,
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
@@ -3982,12 +4008,19 @@ pub(crate) async fn read_summary_from_rollout(
|
||||
git,
|
||||
} = session_meta_line;
|
||||
|
||||
let created_at = if session_meta.timestamp.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(session_meta.timestamp.as_str())
|
||||
};
|
||||
let updated_at = read_updated_at(path, created_at).await;
|
||||
if let Some(summary) = extract_conversation_summary(
|
||||
path.to_path_buf(),
|
||||
&head,
|
||||
&session_meta,
|
||||
git.as_ref(),
|
||||
fallback_provider,
|
||||
updated_at.clone(),
|
||||
) {
|
||||
return Ok(summary);
|
||||
}
|
||||
@@ -4002,10 +4035,12 @@ pub(crate) async fn read_summary_from_rollout(
|
||||
.clone()
|
||||
.unwrap_or_else(|| fallback_provider.to_string());
|
||||
let git_info = git.as_ref().map(map_git_info);
|
||||
let updated_at = updated_at.or_else(|| timestamp.clone());
|
||||
|
||||
Ok(ConversationSummary {
|
||||
conversation_id: session_meta.id,
|
||||
timestamp,
|
||||
updated_at,
|
||||
path: path.to_path_buf(),
|
||||
preview: String::new(),
|
||||
model_provider,
|
||||
@@ -4040,6 +4075,7 @@ fn extract_conversation_summary(
|
||||
session_meta: &SessionMeta,
|
||||
git: Option<&CoreGitInfo>,
|
||||
fallback_provider: &str,
|
||||
updated_at: Option<String>,
|
||||
) -> Option<ConversationSummary> {
|
||||
let preview = head
|
||||
.iter()
|
||||
@@ -4065,10 +4101,12 @@ fn extract_conversation_summary(
|
||||
.clone()
|
||||
.unwrap_or_else(|| fallback_provider.to_string());
|
||||
let git_info = git.map(map_git_info);
|
||||
let updated_at = updated_at.or_else(|| timestamp.clone());
|
||||
|
||||
Some(ConversationSummary {
|
||||
conversation_id,
|
||||
timestamp,
|
||||
updated_at,
|
||||
path,
|
||||
preview: preview.to_string(),
|
||||
model_provider,
|
||||
@@ -4095,12 +4133,25 @@ fn parse_datetime(timestamp: Option<&str>) -> Option<DateTime<Utc>> {
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns the last-modified time of the file at `path` as an RFC 3339 UTC
/// string with second precision and a `Z` suffix.
///
/// Failures to read the file metadata or its modification time are not
/// reported as errors; in that case the function falls back to `created_at`
/// (copied into an owned `String`). Returns `None` only when the modification
/// time is unavailable and `created_at` is also `None`.
async fn read_updated_at(path: &Path, created_at: Option<&str>) -> Option<String> {
|
||||
let updated_at = tokio::fs::metadata(path)
|
||||
.await
|
||||
.ok()
|
||||
.and_then(|meta| meta.modified().ok())
|
||||
.map(|modified| {
|
||||
let updated_at: DateTime<Utc> = modified.into();
|
||||
updated_at.to_rfc3339_opts(SecondsFormat::Secs, true)
|
||||
});
|
||||
// Best effort: when no mtime could be obtained, use the creation timestamp instead.
updated_at.or_else(|| created_at.map(str::to_string))
|
||||
}
|
||||
|
||||
pub(crate) fn summary_to_thread(summary: ConversationSummary) -> Thread {
|
||||
let ConversationSummary {
|
||||
conversation_id,
|
||||
path,
|
||||
preview,
|
||||
timestamp,
|
||||
updated_at,
|
||||
model_provider,
|
||||
cwd,
|
||||
cli_version,
|
||||
@@ -4109,6 +4160,7 @@ pub(crate) fn summary_to_thread(summary: ConversationSummary) -> Thread {
|
||||
} = summary;
|
||||
|
||||
let created_at = parse_datetime(timestamp.as_deref());
|
||||
let updated_at = parse_datetime(updated_at.as_deref()).or(created_at);
|
||||
let git_info = git_info.map(|info| ApiGitInfo {
|
||||
sha: info.sha,
|
||||
branch: info.branch,
|
||||
@@ -4120,6 +4172,7 @@ pub(crate) fn summary_to_thread(summary: ConversationSummary) -> Thread {
|
||||
preview,
|
||||
model_provider,
|
||||
created_at: created_at.map(|dt| dt.timestamp()).unwrap_or(0),
|
||||
updated_at: updated_at.map(|dt| dt.timestamp()).unwrap_or(0),
|
||||
path,
|
||||
cwd,
|
||||
cli_version,
|
||||
@@ -4174,13 +4227,20 @@ mod tests {
|
||||
|
||||
let session_meta = serde_json::from_value::<SessionMeta>(head[0].clone())?;
|
||||
|
||||
let summary =
|
||||
extract_conversation_summary(path.clone(), &head, &session_meta, None, "test-provider")
|
||||
.expect("summary");
|
||||
let summary = extract_conversation_summary(
|
||||
path.clone(),
|
||||
&head,
|
||||
&session_meta,
|
||||
None,
|
||||
"test-provider",
|
||||
timestamp.clone(),
|
||||
)
|
||||
.expect("summary");
|
||||
|
||||
let expected = ConversationSummary {
|
||||
conversation_id,
|
||||
timestamp,
|
||||
timestamp: timestamp.clone(),
|
||||
updated_at: timestamp,
|
||||
path,
|
||||
preview: "Count to 5".to_string(),
|
||||
model_provider: "test-provider".to_string(),
|
||||
@@ -4200,6 +4260,7 @@ mod tests {
|
||||
use codex_protocol::protocol::RolloutLine;
|
||||
use codex_protocol::protocol::SessionMetaLine;
|
||||
use std::fs;
|
||||
use std::fs::FileTimes;
|
||||
|
||||
let temp_dir = TempDir::new()?;
|
||||
let path = temp_dir.path().join("rollout.jsonl");
|
||||
@@ -4223,12 +4284,19 @@ mod tests {
|
||||
};
|
||||
|
||||
fs::write(&path, format!("{}\n", serde_json::to_string(&line)?))?;
|
||||
let parsed = chrono::DateTime::parse_from_rfc3339(&timestamp)?.with_timezone(&Utc);
|
||||
let times = FileTimes::new().set_modified(parsed.into());
|
||||
std::fs::OpenOptions::new()
|
||||
.append(true)
|
||||
.open(&path)?
|
||||
.set_times(times)?;
|
||||
|
||||
let summary = read_summary_from_rollout(path.as_path(), "fallback").await?;
|
||||
|
||||
let expected = ConversationSummary {
|
||||
conversation_id,
|
||||
timestamp: Some(timestamp),
|
||||
timestamp: Some(timestamp.clone()),
|
||||
updated_at: Some("2025-09-05T16:53:11Z".to_string()),
|
||||
path: path.clone(),
|
||||
preview: String::new(),
|
||||
model_provider: "fallback".to_string(),
|
||||
|
||||
Reference in New Issue
Block a user