Compare commits

...

7 Commits

Author SHA1 Message Date
David de Regt
0140e3bba7 build fix 2026-04-11 18:37:10 -04:00
David de Regt
a7337fe18f pr feedback/build issues 2026-04-11 14:47:10 -04:00
David de Regt
2b5f3d3baf fix rebase-from-main issues 2026-04-11 14:47:10 -04:00
David de Regt
b505ee8069 move completely from threadsortdirection to sortdirection, update readme 2026-04-11 14:47:10 -04:00
David de Regt
6d7a94e78b add a thread/turns/list call to paginate through turn data 2026-04-11 14:47:10 -04:00
David de Regt
b2f1ff81e8 added a backwards_cursor to let you sync forwards and backwards 2026-04-11 14:45:19 -04:00
David de Regt
ca7a98c5ea [draft] Add early proposal for sort order for thread/list
To improve performance on connection/reconnection to a remote appserver, we want to allow for delta syncs of the thread list, so that you only get what updated since you last connected (in short reconnections, expected to be zero items).  This requires adding an ascending sort option to the thread search (both the sqlite DB query and the filesystem-based query), and then adding a slightly different sortkey for the ascending sort to efficiently support these delta syncs.

Note: This was codexed up, though iterated on a few times to make the implementation as clean as made sense to my untrained eye (not used to this codebase).  There's still models at a few layers for the sortdirection that feel ugly but seems consistent with other search parameters so may be fine.  The biggest ugliness is in the non-sqlite filesystem-backed query mode, where the ascending sort is pretty gross, but I don't think there's a better answer there, and it seems like that path is an edge case.
2026-04-11 14:45:19 -04:00
46 changed files with 3185 additions and 136 deletions

View File

@@ -2569,6 +2569,13 @@
},
"type": "object"
},
"SortDirection": {
"enum": [
"asc",
"desc"
],
"type": "string"
},
"TextElement": {
"properties": {
"byteRange": {
@@ -2764,6 +2771,17 @@
"null"
]
},
"sortDirection": {
"anyOf": [
{
"$ref": "#/definitions/SortDirection"
},
{
"type": "null"
}
],
"description": "Optional sort direction; defaults to descending (newest first)."
},
"sortKey": {
"anyOf": [
{
@@ -3261,6 +3279,44 @@
],
"type": "string"
},
"ThreadTurnsListParams": {
"properties": {
"cursor": {
"description": "Opaque cursor to pass to the next call to continue after the last turn.",
"type": [
"string",
"null"
]
},
"limit": {
"description": "Optional turn page size.",
"format": "uint32",
"minimum": 0.0,
"type": [
"integer",
"null"
]
},
"sortDirection": {
"anyOf": [
{
"$ref": "#/definitions/SortDirection"
},
{
"type": "null"
}
],
"description": "Optional turn pagination direction; defaults to descending."
},
"threadId": {
"type": "string"
}
},
"required": [
"threadId"
],
"type": "object"
},
"ThreadUnarchiveParams": {
"properties": {
"threadId": {
@@ -3952,6 +4008,30 @@
"title": "Thread/readRequest",
"type": "object"
},
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
},
"method": {
"enum": [
"thread/turns/list"
],
"title": "Thread/turns/listRequestMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/ThreadTurnsListParams"
}
},
"required": [
"id",
"method",
"params"
],
"title": "Thread/turns/listRequest",
"type": "object"
},
{
"properties": {
"id": {

View File

@@ -578,6 +578,30 @@
"title": "Thread/readRequest",
"type": "object"
},
{
"properties": {
"id": {
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
"thread/turns/list"
],
"title": "Thread/turns/listRequestMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/v2/ThreadTurnsListParams"
}
},
"required": [
"id",
"method",
"params"
],
"title": "Thread/turns/listRequest",
"type": "object"
},
{
"properties": {
"id": {
@@ -12332,6 +12356,13 @@
"title": "SkillsListResponse",
"type": "object"
},
"SortDirection": {
"enum": [
"asc",
"desc"
],
"type": "string"
},
"SubAgentSource": {
"oneOf": [
{
@@ -13543,6 +13574,17 @@
"null"
]
},
"sortDirection": {
"anyOf": [
{
"$ref": "#/definitions/v2/SortDirection"
},
{
"type": "null"
}
],
"description": "Optional sort direction; defaults to descending (newest first)."
},
"sortKey": {
"anyOf": [
{
@@ -13571,6 +13613,13 @@
"ThreadListResponse": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"backwardsCursor": {
"description": "Opaque cursor to pass as `cursor` when reversing `sortDirection`. This is only populated when the page contains at least one thread. Use it with the opposite `sortDirection`; for timestamp sorts it anchors at the start of the page timestamp so same-second updates are not skipped.",
"type": [
"string",
"null"
]
},
"data": {
"items": {
"$ref": "#/definitions/v2/Thread"
@@ -14585,6 +14634,76 @@
"title": "ThreadTokenUsageUpdatedNotification",
"type": "object"
},
"ThreadTurnsListParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"cursor": {
"description": "Opaque cursor to pass to the next call to continue after the last turn.",
"type": [
"string",
"null"
]
},
"limit": {
"description": "Optional turn page size.",
"format": "uint32",
"minimum": 0.0,
"type": [
"integer",
"null"
]
},
"sortDirection": {
"anyOf": [
{
"$ref": "#/definitions/v2/SortDirection"
},
{
"type": "null"
}
],
"description": "Optional turn pagination direction; defaults to descending."
},
"threadId": {
"type": "string"
}
},
"required": [
"threadId"
],
"title": "ThreadTurnsListParams",
"type": "object"
},
"ThreadTurnsListResponse": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"backwardsCursor": {
"description": "Opaque cursor to pass as `cursor` when reversing `sortDirection`. This is only populated when the page contains at least one turn. Use it with the opposite `sortDirection` to include the anchor turn again and catch updates to that turn.",
"type": [
"string",
"null"
]
},
"data": {
"items": {
"$ref": "#/definitions/v2/Turn"
},
"type": "array"
},
"nextCursor": {
"description": "Opaque cursor to pass to the next call to continue after the last turn. if None, there are no more turns to return.",
"type": [
"string",
"null"
]
}
},
"required": [
"data"
],
"title": "ThreadTurnsListResponse",
"type": "object"
},
"ThreadUnarchiveParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {

View File

@@ -1160,6 +1160,30 @@
"title": "Thread/readRequest",
"type": "object"
},
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
},
"method": {
"enum": [
"thread/turns/list"
],
"title": "Thread/turns/listRequestMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/ThreadTurnsListParams"
}
},
"required": [
"id",
"method",
"params"
],
"title": "Thread/turns/listRequest",
"type": "object"
},
{
"properties": {
"id": {
@@ -10180,6 +10204,13 @@
"title": "SkillsListResponse",
"type": "object"
},
"SortDirection": {
"enum": [
"asc",
"desc"
],
"type": "string"
},
"SubAgentSource": {
"oneOf": [
{
@@ -11391,6 +11422,17 @@
"null"
]
},
"sortDirection": {
"anyOf": [
{
"$ref": "#/definitions/SortDirection"
},
{
"type": "null"
}
],
"description": "Optional sort direction; defaults to descending (newest first)."
},
"sortKey": {
"anyOf": [
{
@@ -11419,6 +11461,13 @@
"ThreadListResponse": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"backwardsCursor": {
"description": "Opaque cursor to pass as `cursor` when reversing `sortDirection`. This is only populated when the page contains at least one thread. Use it with the opposite `sortDirection`; for timestamp sorts it anchors at the start of the page timestamp so same-second updates are not skipped.",
"type": [
"string",
"null"
]
},
"data": {
"items": {
"$ref": "#/definitions/Thread"
@@ -12433,6 +12482,76 @@
"title": "ThreadTokenUsageUpdatedNotification",
"type": "object"
},
"ThreadTurnsListParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"cursor": {
"description": "Opaque cursor to pass to the next call to continue after the last turn.",
"type": [
"string",
"null"
]
},
"limit": {
"description": "Optional turn page size.",
"format": "uint32",
"minimum": 0.0,
"type": [
"integer",
"null"
]
},
"sortDirection": {
"anyOf": [
{
"$ref": "#/definitions/SortDirection"
},
{
"type": "null"
}
],
"description": "Optional turn pagination direction; defaults to descending."
},
"threadId": {
"type": "string"
}
},
"required": [
"threadId"
],
"title": "ThreadTurnsListParams",
"type": "object"
},
"ThreadTurnsListResponse": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"backwardsCursor": {
"description": "Opaque cursor to pass as `cursor` when reversing `sortDirection`. This is only populated when the page contains at least one turn. Use it with the opposite `sortDirection` to include the anchor turn again and catch updates to that turn.",
"type": [
"string",
"null"
]
},
"data": {
"items": {
"$ref": "#/definitions/Turn"
},
"type": "array"
},
"nextCursor": {
"description": "Opaque cursor to pass to the next call to continue after the last turn. if None, there are no more turns to return.",
"type": [
"string",
"null"
]
}
},
"required": [
"data"
],
"title": "ThreadTurnsListResponse",
"type": "object"
},
"ThreadUnarchiveParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {

View File

@@ -1,6 +1,13 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"definitions": {
"SortDirection": {
"enum": [
"asc",
"desc"
],
"type": "string"
},
"ThreadSortKey": {
"enum": [
"created_at",
@@ -72,6 +79,17 @@
"null"
]
},
"sortDirection": {
"anyOf": [
{
"$ref": "#/definitions/SortDirection"
},
{
"type": "null"
}
],
"description": "Optional sort direction; defaults to descending (newest first)."
},
"sortKey": {
"anyOf": [
{

View File

@@ -1931,6 +1931,13 @@
}
},
"properties": {
"backwardsCursor": {
"description": "Opaque cursor to pass as `cursor` when reversing `sortDirection`. This is only populated when the page contains at least one thread. Use it with the opposite `sortDirection`; for timestamp sorts it anchors at the start of the page timestamp so same-second updates are not skipped.",
"type": [
"string",
"null"
]
},
"data": {
"items": {
"$ref": "#/definitions/Thread"

View File

@@ -0,0 +1,49 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"definitions": {
"SortDirection": {
"enum": [
"asc",
"desc"
],
"type": "string"
}
},
"properties": {
"cursor": {
"description": "Opaque cursor to pass to the next call to continue after the last turn.",
"type": [
"string",
"null"
]
},
"limit": {
"description": "Optional turn page size.",
"format": "uint32",
"minimum": 0.0,
"type": [
"integer",
"null"
]
},
"sortDirection": {
"anyOf": [
{
"$ref": "#/definitions/SortDirection"
},
{
"type": "null"
}
],
"description": "Optional turn pagination direction; defaults to descending."
},
"threadId": {
"type": "string"
}
},
"required": [
"threadId"
],
"title": "ThreadTurnsListParams",
"type": "object"
}

File diff suppressed because it is too large Load Diff

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,5 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type SortDirection = "asc" | "desc";

View File

@@ -1,6 +1,7 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { SortDirection } from "./SortDirection";
import type { ThreadSortKey } from "./ThreadSortKey";
import type { ThreadSourceKind } from "./ThreadSourceKind";
@@ -17,6 +18,10 @@ limit?: number | null,
* Optional sort key; defaults to created_at.
*/
sortKey?: ThreadSortKey | null,
/**
* Optional sort direction; defaults to descending (newest first).
*/
sortDirection?: SortDirection | null,
/**
* Optional provider filter; when set, only sessions recorded under these
* providers are returned. When present but empty, includes all providers.

View File

@@ -8,4 +8,11 @@ export type ThreadListResponse = { data: Array<Thread>,
* Opaque cursor to pass to the next call to continue after the last item.
* if None, there are no more items to return.
*/
nextCursor: string | null, };
nextCursor: string | null,
/**
* Opaque cursor to pass as `cursor` when reversing `sortDirection`.
* This is only populated when the page contains at least one thread.
* Use it with the opposite `sortDirection`; for timestamp sorts it anchors
* at the start of the page timestamp so same-second updates are not skipped.
*/
backwardsCursor: string | null, };

View File

@@ -0,0 +1,18 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { SortDirection } from "./SortDirection";
export type ThreadTurnsListParams = { threadId: string,
/**
* Opaque cursor to pass to the next call to continue after the last turn.
*/
cursor?: string | null,
/**
* Optional turn page size.
*/
limit?: number | null,
/**
* Optional turn pagination direction; defaults to descending.
*/
sortDirection?: SortDirection | null, };

View File

@@ -0,0 +1,18 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { Turn } from "./Turn";
export type ThreadTurnsListResponse = { data: Array<Turn>,
/**
* Opaque cursor to pass to the next call to continue after the last turn.
* if None, there are no more turns to return.
*/
nextCursor: string | null,
/**
* Opaque cursor to pass as `cursor` when reversing `sortDirection`.
* This is only populated when the page contains at least one turn.
* Use it with the opposite `sortDirection` to include the anchor turn again
* and catch updates to that turn.
*/
backwardsCursor: string | null, };

View File

@@ -268,6 +268,7 @@ export type { SkillsListEntry } from "./SkillsListEntry";
export type { SkillsListExtraRootsForCwd } from "./SkillsListExtraRootsForCwd";
export type { SkillsListParams } from "./SkillsListParams";
export type { SkillsListResponse } from "./SkillsListResponse";
export type { SortDirection } from "./SortDirection";
export type { TerminalInteractionNotification } from "./TerminalInteractionNotification";
export type { TextElement } from "./TextElement";
export type { TextPosition } from "./TextPosition";
@@ -320,6 +321,8 @@ export type { ThreadStatus } from "./ThreadStatus";
export type { ThreadStatusChangedNotification } from "./ThreadStatusChangedNotification";
export type { ThreadTokenUsage } from "./ThreadTokenUsage";
export type { ThreadTokenUsageUpdatedNotification } from "./ThreadTokenUsageUpdatedNotification";
export type { ThreadTurnsListParams } from "./ThreadTurnsListParams";
export type { ThreadTurnsListResponse } from "./ThreadTurnsListResponse";
export type { ThreadUnarchiveParams } from "./ThreadUnarchiveParams";
export type { ThreadUnarchiveResponse } from "./ThreadUnarchiveResponse";
export type { ThreadUnarchivedNotification } from "./ThreadUnarchivedNotification";

View File

@@ -317,6 +317,10 @@ client_request_definitions! {
params: v2::ThreadReadParams,
response: v2::ThreadReadResponse,
},
ThreadTurnsList => "thread/turns/list" {
params: v2::ThreadTurnsListParams,
response: v2::ThreadTurnsListResponse,
},
SkillsList => "skills/list" {
params: v2::SkillsListParams,
response: v2::SkillsListResponse,

View File

@@ -975,8 +975,15 @@ impl ThreadHistoryBuilder {
}
fn new_turn(&mut self, id: Option<String>) -> PendingTurn {
let id = id.unwrap_or_else(|| {
if self.next_rollout_index == 0 {
Uuid::now_v7().to_string()
} else {
format!("rollout-{}", self.current_rollout_index)
}
});
PendingTurn {
id: id.unwrap_or_else(|| Uuid::now_v7().to_string()),
id,
items: Vec::new(),
error: None,
status: TurnStatus::Completed,
@@ -1622,8 +1629,8 @@ mod tests {
.collect::<Vec<_>>();
let turns = build_turns_from_rollout_items(&items);
assert_eq!(turns.len(), 2);
assert!(Uuid::parse_str(&turns[0].id).is_ok());
assert!(Uuid::parse_str(&turns[1].id).is_ok());
assert_eq!(turns[0].id, "rollout-0");
assert_eq!(turns[1].id, "rollout-5");
assert_ne!(turns[0].id, turns[1].id);
assert_eq!(turns[0].status, TurnStatus::Completed);
assert_eq!(turns[1].status, TurnStatus::Completed);

View File

@@ -3125,6 +3125,9 @@ pub struct ThreadListParams {
/// Optional sort key; defaults to created_at.
#[ts(optional = nullable)]
pub sort_key: Option<ThreadSortKey>,
/// Optional sort direction; defaults to descending (newest first).
#[ts(optional = nullable)]
pub sort_direction: Option<SortDirection>,
/// Optional provider filter; when set, only sessions recorded under these
/// providers are returned. When present but empty, includes all providers.
#[ts(optional = nullable)]
@@ -3172,6 +3175,14 @@ pub enum ThreadSortKey {
UpdatedAt,
}
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "snake_case")]
#[ts(export_to = "v2/")]
pub enum SortDirection {
Asc,
Desc,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
@@ -3180,6 +3191,11 @@ pub struct ThreadListResponse {
/// Opaque cursor to pass to the next call to continue after the last item.
/// if None, there are no more items to return.
pub next_cursor: Option<String>,
/// Opaque cursor to pass as `cursor` when reversing `sortDirection`.
/// This is only populated when the page contains at least one thread.
/// Use it with the opposite `sortDirection`; for timestamp sorts it anchors
/// at the start of the page timestamp so same-second updates are not skipped.
pub backwards_cursor: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema, TS)]
@@ -3245,6 +3261,37 @@ pub struct ThreadReadResponse {
pub thread: Thread,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadTurnsListParams {
pub thread_id: String,
/// Opaque cursor to pass to the next call to continue after the last turn.
#[ts(optional = nullable)]
pub cursor: Option<String>,
/// Optional turn page size.
#[ts(optional = nullable)]
pub limit: Option<u32>,
/// Optional turn pagination direction; defaults to descending.
#[ts(optional = nullable)]
pub sort_direction: Option<SortDirection>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadTurnsListResponse {
pub data: Vec<Turn>,
/// Opaque cursor to pass to the next call to continue after the last turn.
/// if None, there are no more turns to return.
pub next_cursor: Option<String>,
/// Opaque cursor to pass as `cursor` when reversing `sortDirection`.
/// This is only populated when the page contains at least one turn.
/// Use it with the opposite `sortDirection` to include the anchor turn again
/// and catch updates to that turn.
pub backwards_cursor: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]

View File

@@ -1124,6 +1124,7 @@ async fn thread_list(endpoint: &Endpoint, config_overrides: &[String], limit: u3
cursor: None,
limit: Some(limit),
sort_key: None,
sort_direction: None,
model_providers: None,
source_kinds: None,
archived: None,

View File

@@ -139,6 +139,7 @@ Example with notification opt-out:
- `thread/list` — page through stored rollouts; supports cursor-based pagination and optional `modelProviders`, `sourceKinds`, `archived`, `cwd`, and `searchTerm` filters. Each returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded.
- `thread/loaded/list` — list the thread ids currently loaded in memory.
- `thread/read` — read a stored thread by id without resuming it; optionally include turns via `includeTurns`. The returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded.
- `thread/turns/list` — page through a stored thread's turn history without resuming it; supports cursor-based pagination with `sortDirection`, `nextCursor`, and `backwardsCursor`.
- `thread/metadata/update` — patch stored thread metadata in sqlite; currently supports updating persisted `gitInfo` fields and returns the refreshed `thread`.
- `thread/status/changed` — notification emitted when a loaded thread's status changes (`threadId` + new `status`).
- `thread/archive` — move a thread's rollout file into the archived directory; returns `{}` on success and emits `thread/archived`.
@@ -274,11 +275,13 @@ Experimental API: `thread/start`, `thread/resume`, and `thread/fork` accept `per
- `cursor` — opaque string from a prior response; omit for the first page.
- `limit` — server defaults to a reasonable page size if unset.
- `sortKey``created_at` (default) or `updated_at`.
- `sortDirection``desc` (default) or `asc`.
- `modelProviders` — restrict results to specific providers; unset, null, or an empty array will include all providers.
- `sourceKinds` — restrict results to specific sources; omit or pass `[]` for interactive sessions only (`cli`, `vscode`).
- `archived` — when `true`, list archived threads only. When `false` or `null`, list non-archived threads (default).
- `cwd` — restrict results to threads whose session cwd exactly matches this path. Relative paths are resolved against the app-server process cwd before matching.
- `searchTerm` — restrict results to threads whose extracted title contains this substring (case-sensitive).
- Responses include `nextCursor` to continue in the same direction and `backwardsCursor` to pass as `cursor` when reversing `sortDirection`.
- Responses include `agentNickname` and `agentRole` for AgentControl-spawned thread sub-agents when available.
Example:
@@ -294,7 +297,8 @@ Example:
{ "id": "thr_a", "preview": "Create a TUI", "modelProvider": "openai", "createdAt": 1730831111, "updatedAt": 1730831111, "status": { "type": "notLoaded" }, "agentNickname": "Atlas", "agentRole": "explorer" },
{ "id": "thr_b", "preview": "Fix tests", "modelProvider": "openai", "createdAt": 1730750000, "updatedAt": 1730750000, "status": { "type": "notLoaded" } }
],
"nextCursor": "opaque-token-or-null"
"nextCursor": "opaque-token-or-null",
"backwardsCursor": "opaque-token-or-null"
} }
```
@@ -351,7 +355,7 @@ If this was the last subscriber, the server unloads the thread and emits `thread
### Example: Read a thread
Use `thread/read` to fetch a stored thread by id without resuming it. Pass `includeTurns` when you want the rollout history loaded into `thread.turns`. The returned thread includes `agentNickname` and `agentRole` for AgentControl-spawned thread sub-agents when available.
Use `thread/read` to fetch a stored thread by id without resuming it. Pass `includeTurns` when you want the full rollout history loaded into `thread.turns`. The returned thread includes `agentNickname` and `agentRole` for AgentControl-spawned thread sub-agents when available.
```json
{ "method": "thread/read", "id": 22, "params": { "threadId": "thr_123" } }
@@ -367,6 +371,23 @@ Use `thread/read` to fetch a stored thread by id without resuming it. Pass `incl
} }
```
### Example: List thread turns
Use `thread/turns/list` to page a stored thread's turn history without resuming it. By default, results are sorted descending so clients can start at the present and fetch older turns with `nextCursor`. The response also includes `backwardsCursor`; pass it as `cursor` on a later request with `sortDirection: "asc"` to fetch turns newer than the first item from the earlier page.
```json
{ "method": "thread/turns/list", "id": 24, "params": {
"threadId": "thr_123",
"limit": 50,
"sortDirection": "desc"
} }
{ "id": 24, "result": {
"data": [ ... ],
"nextCursor": "older-turns-cursor-or-null",
"backwardsCursor": "newer-turns-cursor-or-null"
} }
```
### Example: Update stored thread metadata
Use `thread/metadata/update` to patch sqlite-backed metadata for a thread without resuming it. Today this supports persisted `gitInfo`; omitted fields are left unchanged, while explicit `null` clears a stored value.

View File

@@ -117,6 +117,7 @@ use codex_app_server_protocol::SkillsConfigWriteParams;
use codex_app_server_protocol::SkillsConfigWriteResponse;
use codex_app_server_protocol::SkillsListParams;
use codex_app_server_protocol::SkillsListResponse;
use codex_app_server_protocol::SortDirection;
use codex_app_server_protocol::Thread;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadArchiveResponse;
@@ -167,6 +168,8 @@ use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStartedNotification;
use codex_app_server_protocol::ThreadStatus;
use codex_app_server_protocol::ThreadTurnsListParams;
use codex_app_server_protocol::ThreadTurnsListResponse;
use codex_app_server_protocol::ThreadUnarchiveParams;
use codex_app_server_protocol::ThreadUnarchiveResponse;
use codex_app_server_protocol::ThreadUnarchivedNotification;
@@ -354,6 +357,8 @@ use crate::thread_state::ThreadStateManager;
const THREAD_LIST_DEFAULT_LIMIT: usize = 25;
const THREAD_LIST_MAX_LIMIT: usize = 100;
const THREAD_TURNS_DEFAULT_LIMIT: usize = 25;
const THREAD_TURNS_MAX_LIMIT: usize = 100;
struct ThreadListFilters {
model_providers: Option<Vec<String>>,
@@ -785,6 +790,10 @@ impl CodexMessageProcessor {
self.thread_read(to_connection_request_id(request_id), params)
.await;
}
ClientRequest::ThreadTurnsList { request_id, params } => {
self.thread_turns_list(to_connection_request_id(request_id), params)
.await;
}
ClientRequest::ThreadShellCommand { request_id, params } => {
self.thread_shell_command(to_connection_request_id(request_id), params)
.await;
@@ -3422,6 +3431,7 @@ impl CodexMessageProcessor {
cursor,
limit,
sort_key,
sort_direction,
model_providers,
source_kinds,
archived,
@@ -3444,11 +3454,12 @@ impl CodexMessageProcessor {
ThreadSortKey::CreatedAt => CoreThreadSortKey::CreatedAt,
ThreadSortKey::UpdatedAt => CoreThreadSortKey::UpdatedAt,
};
let (summaries, next_cursor) = match self
let list_result = self
.list_threads_common(
requested_page_size,
cursor,
core_sort_key,
sort_direction.unwrap_or(SortDirection::Desc),
ThreadListFilters {
model_providers,
source_kinds,
@@ -3457,8 +3468,8 @@ impl CodexMessageProcessor {
search_term,
},
)
.await
{
.await;
let (summaries, next_cursor) = match list_result {
Ok(r) => r,
Err(error) => {
self.outgoing.send_error(request_id, error).await;
@@ -3485,7 +3496,7 @@ impl CodexMessageProcessor {
.loaded_statuses_for_threads(status_ids)
.await;
let data = threads
let data: Vec<_> = threads
.into_iter()
.map(|(conversation_id, mut thread)| {
if let Some(title) = names.get(&conversation_id).cloned() {
@@ -3497,7 +3508,14 @@ impl CodexMessageProcessor {
thread
})
.collect();
let response = ThreadListResponse { data, next_cursor };
let backwards_cursor = data
.first()
.and_then(|thread| thread_backwards_cursor_for_sort_key(thread, core_sort_key));
let response = ThreadListResponse {
data,
next_cursor,
backwards_cursor,
};
self.outgoing.send_response(request_id, response).await;
}
@@ -3714,6 +3732,155 @@ impl CodexMessageProcessor {
self.outgoing.send_response(request_id, response).await;
}
async fn thread_turns_list(
&self,
request_id: ConnectionRequestId,
params: ThreadTurnsListParams,
) {
let ThreadTurnsListParams {
thread_id,
cursor,
limit,
sort_direction,
} = params;
let thread_uuid = match ThreadId::from_string(&thread_id) {
Ok(id) => id,
Err(err) => {
self.send_invalid_request_error(request_id, format!("invalid thread id: {err}"))
.await;
return;
}
};
let state_db_ctx = get_state_db(&self.config).await;
let mut rollout_path = self
.resolve_rollout_path(thread_uuid, state_db_ctx.as_ref())
.await;
if rollout_path.is_none() {
rollout_path =
match find_thread_path_by_id_str(&self.config.codex_home, &thread_uuid.to_string())
.await
{
Ok(Some(path)) => Some(path),
Ok(None) => match find_archived_thread_path_by_id_str(
&self.config.codex_home,
&thread_uuid.to_string(),
)
.await
{
Ok(path) => path,
Err(err) => {
self.send_invalid_request_error(
request_id,
format!("failed to locate archived thread id {thread_uuid}: {err}"),
)
.await;
return;
}
},
Err(err) => {
self.send_invalid_request_error(
request_id,
format!("failed to locate thread id {thread_uuid}: {err}"),
)
.await;
return;
}
};
}
if rollout_path.is_none() {
match self.thread_manager.get_thread(thread_uuid).await {
Ok(thread) => {
rollout_path = thread.rollout_path();
if rollout_path.is_none() {
self.send_invalid_request_error(
request_id,
"ephemeral threads do not support thread/turns/list".to_string(),
)
.await;
return;
}
}
Err(_) => {
self.send_invalid_request_error(
request_id,
format!("thread not loaded: {thread_uuid}"),
)
.await;
return;
}
}
}
let Some(rollout_path) = rollout_path.as_ref() else {
self.send_internal_error(
request_id,
format!("failed to locate rollout for thread {thread_uuid}"),
)
.await;
return;
};
match read_rollout_items_from_rollout(rollout_path).await {
Ok(items) => {
// Rollback and compaction events can change earlier turns, so pagination
// has to replay the full rollout until turn metadata is indexed separately.
let mut turns = build_turns_from_rollout_items(&items);
let has_live_in_progress_turn =
match self.thread_manager.get_thread(thread_uuid).await {
Ok(thread) => matches!(thread.agent_status().await, AgentStatus::Running),
Err(_) => false,
};
normalize_thread_turns_status(
&mut turns,
self.thread_watch_manager
.loaded_status_for_thread(&thread_uuid.to_string())
.await,
has_live_in_progress_turn,
);
let page = match paginate_thread_turns(
turns,
cursor.as_deref(),
limit,
sort_direction.unwrap_or(SortDirection::Desc),
) {
Ok(page) => page,
Err(error) => {
self.outgoing.send_error(request_id, error).await;
return;
}
};
let response = ThreadTurnsListResponse {
data: page.turns,
next_cursor: page.next_cursor,
backwards_cursor: page.backwards_cursor,
};
self.outgoing.send_response(request_id, response).await;
}
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
self.send_invalid_request_error(
request_id,
format!(
"thread {thread_uuid} is not materialized yet; thread/turns/list is unavailable before first user message"
),
)
.await;
}
Err(err) => {
self.send_internal_error(
request_id,
format!(
"failed to load rollout `{}` for thread {thread_uuid}: {err}",
rollout_path.display()
),
)
.await;
}
}
}
pub(crate) fn thread_created_receiver(&self) -> broadcast::Receiver<ThreadId> {
self.thread_manager.subscribe_thread_created()
}
@@ -4679,6 +4846,7 @@ impl CodexMessageProcessor {
requested_page_size: usize,
cursor: Option<String>,
sort_key: CoreThreadSortKey,
sort_direction: SortDirection,
filters: ThreadListFilters,
) -> Result<(Vec<ConversationSummary>, Option<String>), JSONRPCErrorError> {
let ThreadListFilters {
@@ -4689,13 +4857,15 @@ impl CodexMessageProcessor {
search_term,
} = filters;
let mut cursor_obj: Option<RolloutCursor> = match cursor.as_ref() {
Some(cursor_str) => {
Some(parse_cursor(cursor_str).ok_or_else(|| JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("invalid cursor: {cursor_str}"),
data: None,
})?)
}
Some(cursor_str) => Some(
parse_thread_list_cursor(cursor_str, sort_direction).ok_or_else(|| {
JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("invalid cursor: {cursor_str}"),
data: None,
}
})?,
),
None => None,
};
let mut last_cursor = cursor_obj.clone();
@@ -4717,6 +4887,10 @@ impl CodexMessageProcessor {
let (allowed_sources_vec, source_kind_filter) = compute_source_filters(source_kinds);
let allowed_sources = allowed_sources_vec.as_slice();
let state_db_ctx = get_state_db(&self.config).await;
let core_sort_direction = match sort_direction {
SortDirection::Asc => codex_core::SortDirection::Asc,
SortDirection::Desc => codex_core::SortDirection::Desc,
};
while remaining > 0 {
let page_size = remaining.min(THREAD_LIST_MAX_LIMIT);
@@ -4726,37 +4900,35 @@ impl CodexMessageProcessor {
page_size,
cursor_obj.as_ref(),
sort_key,
core_sort_direction,
allowed_sources,
model_provider_filter.as_deref(),
fallback_provider.as_str(),
search_term.as_deref(),
)
.await
.map_err(|err| JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to list threads: {err}"),
data: None,
})?
} else {
RolloutRecorder::list_threads(
&self.config,
page_size,
cursor_obj.as_ref(),
sort_key,
core_sort_direction,
allowed_sources,
model_provider_filter.as_deref(),
fallback_provider.as_str(),
search_term.as_deref(),
)
.await
.map_err(|err| JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to list threads: {err}"),
data: None,
})?
};
}
.map_err(|err| JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to list threads: {err}"),
data: None,
})?;
let mut filtered = Vec::with_capacity(page.items.len());
let next_cursor_value = page.next_cursor.clone();
let mut candidate_summaries = Vec::with_capacity(page.items.len());
for it in page.items {
let Some(summary) = summary_from_thread_list_item(
it,
@@ -4767,6 +4939,11 @@ impl CodexMessageProcessor {
else {
continue;
};
candidate_summaries.push(summary);
}
let mut filtered = Vec::with_capacity(candidate_summaries.len());
for summary in candidate_summaries {
if source_kind_filter
.as_ref()
.is_none_or(|filter| source_kind_matches(&summary.source, filter))
@@ -4784,7 +4961,6 @@ impl CodexMessageProcessor {
remaining = requested_page_size.saturating_sub(items.len());
// Encode RolloutCursor into the JSON-RPC string form returned to clients.
let next_cursor_value = page.next_cursor.clone();
next_cursor = next_cursor_value
.as_ref()
.and_then(|cursor| serde_json::to_value(cursor).ok())
@@ -9247,6 +9423,180 @@ pub(crate) fn summary_to_thread(summary: ConversationSummary) -> Thread {
}
}
/// Builds a `backwardsCursor` for `thread`, anchored on whichever timestamp
/// `sort_key` selects. The cursor keeps the thread's real id but sets
/// `include_timestamp_bucket` so an ascending delta sync re-reads every thread
/// sharing the same second-granularity timestamp. Returns `None` when the
/// timestamp cannot be represented or serialization fails.
fn thread_backwards_cursor_for_sort_key(
    thread: &Thread,
    sort_key: CoreThreadSortKey,
) -> Option<String> {
    let anchor_secs = match sort_key {
        CoreThreadSortKey::CreatedAt => thread.created_at,
        CoreThreadSortKey::UpdatedAt => thread.updated_at,
    };
    let anchor = DateTime::<Utc>::from_timestamp(anchor_secs, /*nsecs*/ 0)?;
    let cursor = ThreadListCursor {
        timestamp: anchor.to_rfc3339_opts(SecondsFormat::Secs, true),
        id: thread.id.clone(),
        include_timestamp_bucket: true,
    };
    serde_json::to_string(&cursor).ok()
}
/// JSON string cursor returned to (and accepted from) `thread/list` clients.
#[derive(serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct ThreadListCursor {
    // RFC 3339 timestamp (second precision) of the anchor thread.
    timestamp: String,
    // Thread id used as the pagination tie-breaker within a timestamp.
    id: String,
    // When true, resume at the edge of the whole timestamp bucket rather than
    // at `id`, so delta syncs don't miss threads sharing the same second.
    include_timestamp_bucket: bool,
}
/// Parses a `thread/list` cursor. Accepts either the JSON form emitted as
/// `backwardsCursor` or the legacy `timestamp|uuid` token. When the JSON form
/// requests the whole timestamp bucket, the id tie-breaker is replaced with
/// the extreme uuid for `sort_direction` so the scan restarts at the bucket
/// boundary instead of at the stored id.
fn parse_thread_list_cursor(cursor: &str, sort_direction: SortDirection) -> Option<RolloutCursor> {
    match serde_json::from_str::<ThreadListCursor>(cursor) {
        Ok(parsed) => {
            let tie_breaker = if parsed.include_timestamp_bucket {
                match sort_direction {
                    SortDirection::Asc => Uuid::nil().to_string(),
                    SortDirection::Desc => "ffffffff-ffff-ffff-ffff-ffffffffffff".to_string(),
                }
            } else {
                parsed.id
            };
            parse_cursor(&format!("{}|{}", parsed.timestamp, tie_breaker))
        }
        // Not the JSON form: fall back to the legacy token format.
        Err(_) => parse_cursor(cursor),
    }
}
/// One page of results for `thread/turns/list`.
struct ThreadTurnsPage {
    // Turns for this page, in the requested sort direction.
    turns: Vec<Turn>,
    // Exclusive cursor for the next page; `None` when no more turns remain.
    next_cursor: Option<String>,
    // Inclusive cursor at this page's first turn, for syncing the other way.
    backwards_cursor: Option<String>,
}
/// JSON string cursor for `thread/turns/list` pagination.
#[derive(serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct ThreadTurnsCursor {
    // Id of the turn the page is anchored on.
    turn_id: String,
    // Whether the anchor turn itself belongs to the returned page
    // (true for `backwardsCursor`, false for `nextCursor`).
    include_anchor: bool,
}
/// Paginates a thread's turns for `thread/turns/list`.
///
/// `turns` is assumed to be in chronological (oldest-first) order — the
/// `Desc` branch reverses it to serve newest-first pages. `cursor` anchors
/// the page at a specific turn id; the cursor's `include_anchor` flag decides
/// whether that turn is re-delivered (used by `backwardsCursor` for delta
/// syncs). Returns the page plus an exclusive `next_cursor` (only when more
/// turns remain) and an inclusive `backwards_cursor` at the page's first turn.
///
/// Errors with an invalid-request error for a malformed cursor, or an
/// internal error if cursor serialization fails.
fn paginate_thread_turns(
    turns: Vec<Turn>,
    cursor: Option<&str>,
    limit: Option<u32>,
    sort_direction: SortDirection,
) -> Result<ThreadTurnsPage, JSONRPCErrorError> {
    if turns.is_empty() {
        return Ok(ThreadTurnsPage {
            turns: Vec::new(),
            next_cursor: None,
            backwards_cursor: None,
        });
    }
    // Missing cursor means "start at the beginning of the chosen direction".
    let anchor = cursor.map(parse_thread_turns_cursor).transpose()?;
    let page_size = limit
        .map(|value| value as usize)
        .unwrap_or(THREAD_TURNS_DEFAULT_LIMIT)
        .clamp(1, THREAD_TURNS_MAX_LIMIT);
    let anchor_index = anchor
        .as_ref()
        .and_then(|anchor| turns.iter().position(|turn| turn.id == anchor.turn_id));
    // A well-formed cursor that no longer matches any turn yields an empty
    // page rather than an error (the anchor turn may no longer exist).
    if anchor.is_some() && anchor_index.is_none() {
        return Ok(ThreadTurnsPage {
            turns: Vec::new(),
            next_cursor: None,
            backwards_cursor: None,
        });
    }
    // Pair each turn with its original chronological index so anchor
    // comparisons stay valid after the reversal below.
    let mut keyed_turns: Vec<_> = turns.into_iter().enumerate().collect();
    match sort_direction {
        SortDirection::Asc => {
            if let (Some(anchor), Some(anchor_index)) = (anchor.as_ref(), anchor_index) {
                keyed_turns.retain(|(index, _)| {
                    if anchor.include_anchor {
                        *index >= anchor_index
                    } else {
                        *index > anchor_index
                    }
                });
            }
        }
        SortDirection::Desc => {
            // Newest-first: reverse, then keep turns at/before the anchor.
            keyed_turns.reverse();
            if let (Some(anchor), Some(anchor_index)) = (anchor.as_ref(), anchor_index) {
                keyed_turns.retain(|(index, _)| {
                    if anchor.include_anchor {
                        *index <= anchor_index
                    } else {
                        *index < anchor_index
                    }
                });
            }
        }
    }
    let more_turns_available = keyed_turns.len() > page_size;
    keyed_turns.truncate(page_size);
    // Inclusive cursor at the first returned turn: replaying it re-fetches
    // that turn, letting clients seed a delta sync in the other direction.
    let backwards_cursor = keyed_turns
        .first()
        .map(|(_, turn)| serialize_thread_turns_cursor(&turn.id, /*include_anchor*/ true))
        .transpose()?;
    // Exclusive cursor at the last returned turn, only when the page was cut.
    let next_cursor = if more_turns_available {
        keyed_turns
            .last()
            .map(|(_, turn)| serialize_thread_turns_cursor(&turn.id, /*include_anchor*/ false))
            .transpose()?
    } else {
        None
    };
    let turns = keyed_turns.into_iter().map(|(_, turn)| turn).collect();
    Ok(ThreadTurnsPage {
        turns,
        next_cursor,
        backwards_cursor,
    })
}
/// Encodes a `thread/turns/list` cursor into its JSON string form, mapping
/// serialization failure to an internal JSON-RPC error.
fn serialize_thread_turns_cursor(
    turn_id: &str,
    include_anchor: bool,
) -> Result<String, JSONRPCErrorError> {
    let cursor = ThreadTurnsCursor {
        turn_id: turn_id.to_string(),
        include_anchor,
    };
    match serde_json::to_string(&cursor) {
        Ok(encoded) => Ok(encoded),
        Err(err) => Err(JSONRPCErrorError {
            code: INTERNAL_ERROR_CODE,
            message: format!("failed to serialize cursor: {err}"),
            data: None,
        }),
    }
}
/// Decodes a `thread/turns/list` cursor, mapping any parse failure to an
/// invalid-request error that echoes the offending cursor string.
fn parse_thread_turns_cursor(cursor: &str) -> Result<ThreadTurnsCursor, JSONRPCErrorError> {
    match serde_json::from_str::<ThreadTurnsCursor>(cursor) {
        Ok(parsed) => Ok(parsed),
        Err(_) => Err(JSONRPCErrorError {
            code: INVALID_REQUEST_ERROR_CODE,
            message: format!("invalid cursor: {cursor}"),
            data: None,
        }),
    }
}
/// Rewrites stale `InProgress` turn statuses to `Interrupted` when the thread
/// itself is no longer active; an active thread keeps its in-flight turn.
fn normalize_thread_turns_status(
    turns: &mut [Turn],
    loaded_status: ThreadStatus,
    has_live_in_progress_turn: bool,
) {
    let resolved = resolve_thread_status(loaded_status, has_live_in_progress_turn);
    if matches!(resolved, ThreadStatus::Active { .. }) {
        // The live turn is genuinely in progress; leave statuses untouched.
        return;
    }
    for turn in turns.iter_mut() {
        if let TurnStatus::InProgress = turn.status {
            turn.status = TurnStatus::Interrupted;
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -76,6 +76,7 @@ use codex_app_server_protocol::ThreadRollbackParams;
use codex_app_server_protocol::ThreadSetNameParams;
use codex_app_server_protocol::ThreadShellCommandParams;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadTurnsListParams;
use codex_app_server_protocol::ThreadUnarchiveParams;
use codex_app_server_protocol::ThreadUnsubscribeParams;
use codex_app_server_protocol::TurnCompletedNotification;
@@ -451,6 +452,15 @@ impl McpProcess {
self.send_request("thread/read", params).await
}
/// Send a `thread/turns/list` JSON-RPC request and return its request id.
pub async fn send_thread_turns_list_request(
    &mut self,
    params: ThreadTurnsListParams,
) -> anyhow::Result<i64> {
    let serialized = serde_json::to_value(params)?;
    self.send_request("thread/turns/list", Some(serialized)).await
}
/// Send a `model/list` JSON-RPC request.
pub async fn send_list_models_request(
&mut self,

View File

@@ -485,6 +485,7 @@ async fn thread_fork_ephemeral_remains_pathless_and_omits_listing() -> Result<()
cursor: None,
limit: Some(10),
sort_key: None,
sort_direction: None,
model_providers: None,
source_kinds: None,
archived: None,

View File

@@ -13,6 +13,7 @@ use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::SessionSource;
use codex_app_server_protocol::SortDirection;
use codex_app_server_protocol::ThreadListResponse;
use codex_app_server_protocol::ThreadSortKey;
use codex_app_server_protocol::ThreadSourceKind;
@@ -84,6 +85,7 @@ async fn list_threads_with_sort(
cursor,
limit,
sort_key,
sort_direction: None,
model_providers: providers,
source_kinds,
archived,
@@ -357,6 +359,7 @@ async fn thread_list_pagination_next_cursor_none_on_last_page() -> Result<()> {
let ThreadListResponse {
data: data1,
next_cursor: cursor1,
..
} = list_threads(
&mut mcp,
/*cursor*/ None,
@@ -384,6 +387,7 @@ async fn thread_list_pagination_next_cursor_none_on_last_page() -> Result<()> {
let ThreadListResponse {
data: data2,
next_cursor: cursor2,
..
} = list_threads(
&mut mcp,
Some(cursor1),
@@ -498,6 +502,7 @@ async fn thread_list_respects_cwd_filter() -> Result<()> {
cursor: None,
limit: Some(10),
sort_key: None,
sort_direction: None,
model_providers: Some(vec!["mock_provider".to_string()]),
source_kinds: None,
archived: None,
@@ -579,6 +584,7 @@ sqlite = true
cursor: None,
limit: Some(10),
sort_key: None,
sort_direction: None,
model_providers: Some(vec!["mock_provider".to_string()]),
source_kinds: None,
archived: None,
@@ -1233,6 +1239,114 @@ async fn thread_list_updated_at_paginates_with_cursor() -> Result<()> {
Ok(())
}
// Verifies the delta-sync handshake: a descending `thread/list` page yields a
// `backwardsCursor` watermark, and replaying that cursor in ascending order
// returns the watermark thread plus everything updated after it.
#[tokio::test]
async fn thread_list_backwards_cursor_can_seed_forward_delta_sync() -> Result<()> {
    let codex_home = TempDir::new()?;
    create_minimal_config(codex_home.path())?;
    // Two rollouts; mtimes are set explicitly so UpdatedAt ordering is stable.
    let id_old = create_fake_rollout(
        codex_home.path(),
        "2025-02-01T10-00-00",
        "2025-02-01T10:00:00Z",
        "Hello",
        Some("mock_provider"),
        /*git_info*/ None,
    )?;
    let id_watermark = create_fake_rollout(
        codex_home.path(),
        "2025-02-01T11-00-00",
        "2025-02-01T11:00:00Z",
        "Hello",
        Some("mock_provider"),
        /*git_info*/ None,
    )?;
    set_rollout_mtime(
        rollout_path(codex_home.path(), "2025-02-01T10-00-00", &id_old).as_path(),
        "2025-02-02T00:00:00Z",
    )?;
    set_rollout_mtime(
        rollout_path(codex_home.path(), "2025-02-01T11-00-00", &id_watermark).as_path(),
        "2025-02-03T00:00:00Z",
    )?;
    let mut mcp = init_mcp(codex_home.path()).await?;
    // Page 1: newest-first, limit 1 — should surface only the watermark thread.
    let ThreadListResponse {
        data: page1,
        backwards_cursor,
        ..
    } = {
        let request_id = mcp
            .send_thread_list_request(codex_app_server_protocol::ThreadListParams {
                cursor: None,
                limit: Some(1),
                sort_key: Some(ThreadSortKey::UpdatedAt),
                sort_direction: Some(SortDirection::Desc),
                model_providers: Some(vec!["mock_provider".to_string()]),
                source_kinds: None,
                archived: None,
                cwd: None,
                search_term: None,
            })
            .await?;
        let resp: JSONRPCResponse = timeout(
            DEFAULT_READ_TIMEOUT,
            mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
        )
        .await??;
        to_response::<ThreadListResponse>(resp)?
    };
    let ids_page1: Vec<_> = page1.iter().map(|thread| thread.id.as_str()).collect();
    assert_eq!(ids_page1, vec![id_watermark.as_str()]);
    let backwards_cursor = backwards_cursor.expect("expected backwardsCursor on first page");
    assert!(
        backwards_cursor.contains(&id_watermark),
        "backwardsCursor should preserve the actual first thread id"
    );
    // A new rollout lands after the watermark was captured.
    let id_new = create_fake_rollout(
        codex_home.path(),
        "2025-02-01T12-00-00",
        "2025-02-01T12:00:00Z",
        "Hello",
        Some("mock_provider"),
        /*git_info*/ None,
    )?;
    set_rollout_mtime(
        rollout_path(codex_home.path(), "2025-02-01T12-00-00", &id_new).as_path(),
        "2025-02-04T00:00:00Z",
    )?;
    // Delta sync: ascending from the watermark cursor must include the
    // watermark itself (inclusive cursor) followed by the new thread.
    let ThreadListResponse {
        data: delta_page, ..
    } = {
        let request_id = mcp
            .send_thread_list_request(codex_app_server_protocol::ThreadListParams {
                cursor: Some(backwards_cursor),
                limit: Some(10),
                sort_key: Some(ThreadSortKey::UpdatedAt),
                sort_direction: Some(SortDirection::Asc),
                model_providers: Some(vec!["mock_provider".to_string()]),
                source_kinds: None,
                archived: None,
                cwd: None,
                search_term: None,
            })
            .await?;
        let resp: JSONRPCResponse = timeout(
            DEFAULT_READ_TIMEOUT,
            mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
        )
        .await??;
        to_response::<ThreadListResponse>(resp)?
    };
    let ids_delta: Vec<_> = delta_page.iter().map(|thread| thread.id.as_str()).collect();
    assert_eq!(ids_delta, vec![id_watermark.as_str(), id_new.as_str()]);
    Ok(())
}
#[tokio::test]
async fn thread_list_created_at_tie_breaks_by_uuid() -> Result<()> {
let codex_home = TempDir::new()?;
@@ -1449,6 +1563,7 @@ async fn thread_list_invalid_cursor_returns_error() -> Result<()> {
cursor: Some("not-a-cursor".to_string()),
limit: Some(2),
sort_key: None,
sort_direction: None,
model_providers: Some(vec!["mock_provider".to_string()]),
source_kinds: None,
archived: None,

View File

@@ -109,7 +109,7 @@ async fn thread_metadata_update_patches_git_branch_and_returns_updated_thread()
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
)
.await??;
let ThreadReadResponse { thread: read } = to_response::<ThreadReadResponse>(read_resp)?;
let ThreadReadResponse { thread: read, .. } = to_response::<ThreadReadResponse>(read_resp)?;
assert_eq!(
read.git_info,
@@ -421,7 +421,7 @@ async fn thread_metadata_update_can_clear_stored_git_fields() -> Result<()> {
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
)
.await??;
let ThreadReadResponse { thread: read } = to_response::<ThreadReadResponse>(read_resp)?;
let ThreadReadResponse { thread: read, .. } = to_response::<ThreadReadResponse>(read_resp)?;
assert_eq!(read.git_info, None);

View File

@@ -2,11 +2,13 @@ use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_fake_rollout_with_text_elements;
use app_test_support::create_mock_responses_server_repeating_assistant;
use app_test_support::rollout_path;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::SessionSource;
use codex_app_server_protocol::SortDirection;
use codex_app_server_protocol::ThreadForkParams;
use codex_app_server_protocol::ThreadForkResponse;
use codex_app_server_protocol::ThreadItem;
@@ -22,6 +24,8 @@ use codex_app_server_protocol::ThreadSetNameResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStatus;
use codex_app_server_protocol::ThreadTurnsListParams;
use codex_app_server_protocol::ThreadTurnsListResponse;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnStatus;
@@ -31,6 +35,8 @@ use codex_protocol::user_input::TextElement;
use core_test_support::responses;
use pretty_assertions::assert_eq;
use serde_json::Value;
use serde_json::json;
use std::io::Write;
use std::path::Path;
use std::path::PathBuf;
use tempfile::TempDir;
@@ -76,7 +82,7 @@ async fn thread_read_returns_summary_without_turns() -> Result<()> {
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
)
.await??;
let ThreadReadResponse { thread } = to_response::<ThreadReadResponse>(read_resp)?;
let ThreadReadResponse { thread, .. } = to_response::<ThreadReadResponse>(read_resp)?;
assert_eq!(thread.id, conversation_id);
assert_eq!(thread.preview, preview);
@@ -131,7 +137,7 @@ async fn thread_read_can_include_turns() -> Result<()> {
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
)
.await??;
let ThreadReadResponse { thread } = to_response::<ThreadReadResponse>(read_resp)?;
let ThreadReadResponse { thread, .. } = to_response::<ThreadReadResponse>(read_resp)?;
assert_eq!(thread.turns.len(), 1);
let turn = &thread.turns[0];
@@ -154,6 +160,88 @@ async fn thread_read_can_include_turns() -> Result<()> {
Ok(())
}
// Exercises `thread/turns/list` in both directions: a descending page with
// `nextCursor` walks into older turns, while the page's `backwardsCursor`
// later replays the newest-seen turn plus anything appended since (ascending).
#[tokio::test]
async fn thread_turns_list_can_page_backward_and_forward() -> Result<()> {
    let server = create_mock_responses_server_repeating_assistant("Done").await;
    let codex_home = TempDir::new()?;
    create_config_toml(codex_home.path(), &server.uri())?;
    // Seed a rollout with three user turns: "first", "second", "third".
    let filename_ts = "2025-01-05T12-00-00";
    let conversation_id = create_fake_rollout_with_text_elements(
        codex_home.path(),
        filename_ts,
        "2025-01-05T12:00:00Z",
        "first",
        vec![],
        Some("mock_provider"),
        /*git_info*/ None,
    )?;
    let rollout_path = rollout_path(codex_home.path(), filename_ts, &conversation_id);
    append_user_message(rollout_path.as_path(), "2025-01-05T12:01:00Z", "second")?;
    append_user_message(rollout_path.as_path(), "2025-01-05T12:02:00Z", "third")?;
    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
    // Page 1: newest-first, limit 2 — expect ["third", "second"] plus both cursors.
    let read_id = mcp
        .send_thread_turns_list_request(ThreadTurnsListParams {
            thread_id: conversation_id.clone(),
            cursor: None,
            limit: Some(2),
            sort_direction: Some(SortDirection::Desc),
        })
        .await?;
    let read_resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
    )
    .await??;
    let ThreadTurnsListResponse {
        data,
        next_cursor,
        backwards_cursor,
    } = to_response::<ThreadTurnsListResponse>(read_resp)?;
    assert_eq!(turn_user_texts(&data), vec!["third", "second"]);
    let next_cursor = next_cursor.expect("expected nextCursor for older turns");
    let backwards_cursor = backwards_cursor.expect("expected backwardsCursor for newest turn");
    // Page 2 via nextCursor (exclusive): only the oldest turn remains.
    let read_id = mcp
        .send_thread_turns_list_request(ThreadTurnsListParams {
            thread_id: conversation_id.clone(),
            cursor: Some(next_cursor),
            limit: Some(10),
            sort_direction: Some(SortDirection::Desc),
        })
        .await?;
    let read_resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
    )
    .await??;
    let ThreadTurnsListResponse { data, .. } = to_response::<ThreadTurnsListResponse>(read_resp)?;
    assert_eq!(turn_user_texts(&data), vec!["first"]);
    // A new turn arrives, then a forward delta sync via backwardsCursor
    // (inclusive) must return the anchor turn plus the new one.
    append_user_message(rollout_path.as_path(), "2025-01-05T12:03:00Z", "fourth")?;
    let read_id = mcp
        .send_thread_turns_list_request(ThreadTurnsListParams {
            thread_id: conversation_id,
            cursor: Some(backwards_cursor),
            limit: Some(10),
            sort_direction: Some(SortDirection::Asc),
        })
        .await?;
    let read_resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
    )
    .await??;
    let ThreadTurnsListResponse { data, .. } = to_response::<ThreadTurnsListResponse>(read_resp)?;
    assert_eq!(turn_user_texts(&data), vec!["third", "fourth"]);
    Ok(())
}
#[tokio::test]
async fn thread_read_returns_forked_from_id_for_forked_threads() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
@@ -197,7 +285,7 @@ async fn thread_read_returns_forked_from_id_for_forked_threads() -> Result<()> {
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
)
.await??;
let ThreadReadResponse { thread } = to_response::<ThreadReadResponse>(read_resp)?;
let ThreadReadResponse { thread, .. } = to_response::<ThreadReadResponse>(read_resp)?;
assert_eq!(thread.forked_from_id, Some(conversation_id));
@@ -242,7 +330,7 @@ async fn thread_read_loaded_thread_returns_precomputed_path_before_materializati
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
)
.await??;
let ThreadReadResponse { thread: read } = to_response::<ThreadReadResponse>(read_resp)?;
let ThreadReadResponse { thread: read, .. } = to_response::<ThreadReadResponse>(read_resp)?;
assert_eq!(read.id, thread.id);
assert_eq!(read.path, Some(thread_path));
@@ -310,7 +398,7 @@ async fn thread_name_set_is_reflected_in_read_list_and_resume() -> Result<()> {
)
.await??;
let read_result = read_resp.result.clone();
let ThreadReadResponse { thread } = to_response::<ThreadReadResponse>(read_resp)?;
let ThreadReadResponse { thread, .. } = to_response::<ThreadReadResponse>(read_resp)?;
assert_eq!(thread.id, conversation_id);
assert_eq!(thread.name.as_deref(), Some(new_name));
let thread_json = read_result
@@ -334,6 +422,7 @@ async fn thread_name_set_is_reflected_in_read_list_and_resume() -> Result<()> {
cursor: None,
limit: Some(50),
sort_key: None,
sort_direction: None,
model_providers: Some(vec!["mock_provider".to_string()]),
source_kinds: None,
archived: None,
@@ -519,13 +608,47 @@ async fn thread_read_reports_system_error_idle_flag_after_failed_turn() -> Resul
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
)
.await??;
let ThreadReadResponse { thread } = to_response::<ThreadReadResponse>(read_resp)?;
let ThreadReadResponse { thread, .. } = to_response::<ThreadReadResponse>(read_resp)?;
assert_eq!(thread.status, ThreadStatus::SystemError,);
Ok(())
}
/// Appends one `user_message` event line (JSONL) with the given timestamp and
/// text to the rollout file at `path`.
fn append_user_message(path: &Path, timestamp: &str, text: &str) -> std::io::Result<()> {
    let event = json!({
        "timestamp": timestamp,
        "type": "event_msg",
        "payload": {
            "type": "user_message",
            "message": text,
            "text_elements": [],
            "local_images": []
        }
    });
    let mut file = std::fs::OpenOptions::new().append(true).open(path)?;
    writeln!(file, "{event}")
}
/// Collects the leading user-message text of each turn, skipping turns whose
/// first item is not a user message with leading `Text` content.
fn turn_user_texts(turns: &[codex_app_server_protocol::Turn]) -> Vec<&str> {
    let mut texts = Vec::new();
    for turn in turns {
        let Some(ThreadItem::UserMessage { content, .. }) = turn.items.first() else {
            continue;
        };
        let Some(UserInput::Text { text, .. }) = content.first() else {
            continue;
        };
        texts.push(text.as_str());
    }
    texts
}
// Helper to create a config.toml pointing at the mock model server.
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");

View File

@@ -571,6 +571,7 @@ async fn thread_resume_and_read_interrupt_incomplete_rollout_turn_when_thread_is
.await??;
let ThreadReadResponse {
thread: read_thread,
..
} = to_response::<ThreadReadResponse>(read_resp)?;
assert_eq!(read_thread.status, ThreadStatus::Idle);

View File

@@ -135,7 +135,7 @@ async fn thread_shell_command_runs_as_standalone_turn_and_persists_history() ->
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
)
.await??;
let ThreadReadResponse { thread } = to_response::<ThreadReadResponse>(read_resp)?;
let ThreadReadResponse { thread, .. } = to_response::<ThreadReadResponse>(read_resp)?;
assert_eq!(thread.turns.len(), 1);
let ThreadItem::CommandExecution {
source,
@@ -305,7 +305,7 @@ async fn thread_shell_command_uses_existing_active_turn() -> Result<()> {
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
)
.await??;
let ThreadReadResponse { thread } = to_response::<ThreadReadResponse>(read_resp)?;
let ThreadReadResponse { thread, .. } = to_response::<ThreadReadResponse>(read_resp)?;
assert_eq!(thread.turns.len(), 1);
assert!(
thread.turns[0].items.iter().any(|item| {

View File

@@ -276,7 +276,7 @@ async fn thread_unsubscribe_clears_cached_status_before_resume() -> Result<()> {
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
)
.await??;
let ThreadReadResponse { thread } = to_response::<ThreadReadResponse>(read_resp)?;
let ThreadReadResponse { thread, .. } = to_response::<ThreadReadResponse>(read_resp)?;
assert_eq!(thread.status, ThreadStatus::SystemError);
let unsubscribe_id = mcp

View File

@@ -160,6 +160,7 @@ pub use rollout::RolloutRecorder;
pub use rollout::RolloutRecorderParams;
pub use rollout::SESSIONS_SUBDIR;
pub use rollout::SessionMeta;
pub use rollout::SortDirection;
pub use rollout::ThreadItem;
pub use rollout::ThreadSortKey;
pub use rollout::ThreadsPage;

View File

@@ -4,7 +4,9 @@ use crate::event_mapping::is_contextual_user_message_content;
use chrono::Utc;
use codex_git_utils::resolve_root_git_project_for_trust;
use codex_protocol::models::ResponseItem;
use codex_state::SortDirection;
use codex_state::SortKey;
use codex_state::ThreadFilterOptions;
use codex_state::ThreadMetadata;
use codex_utils_output_truncation::TruncationPolicy;
use codex_utils_output_truncation::truncate_text;
@@ -124,12 +126,15 @@ async fn load_recent_threads(sess: &Session) -> Vec<ThreadMetadata> {
match state_db
.list_threads(
MAX_RECENT_THREADS,
/*anchor*/ None,
SortKey::UpdatedAt,
&[],
/*model_providers*/ None,
/*archived_only*/ false,
/*search_term*/ None,
ThreadFilterOptions {
archived_only: false,
allowed_sources: &[],
model_providers: None,
anchor: None,
sort_key: SortKey::UpdatedAt,
sort_direction: SortDirection::Desc,
search_term: None,
},
)
.await
{

View File

@@ -7,6 +7,7 @@ pub use codex_rollout::RolloutRecorder;
pub use codex_rollout::RolloutRecorderParams;
pub use codex_rollout::SESSIONS_SUBDIR;
pub use codex_rollout::SessionMeta;
pub use codex_rollout::SortDirection;
pub use codex_rollout::ThreadItem;
pub use codex_rollout::ThreadSortKey;
pub use codex_rollout::ThreadsPage;

View File

@@ -178,6 +178,7 @@ impl AppServerClient {
cursor,
limit: None,
sort_key: None,
sort_direction: None,
model_providers: None,
source_kinds: None,
archived: None,

View File

@@ -1226,6 +1226,7 @@ async fn resolve_resume_thread_id(
cursor,
limit: Some(100),
sort_key: Some(ThreadSortKey::UpdatedAt),
sort_direction: None,
model_providers: model_providers.clone(),
source_kinds: Some(all_thread_source_kinds()),
archived: Some(false),
@@ -1287,6 +1288,7 @@ async fn resolve_resume_thread_id(
cursor,
limit: Some(100),
sort_key: Some(ThreadSortKey::UpdatedAt),
sort_direction: None,
model_providers: model_providers.clone(),
source_kinds: Some(all_thread_source_kinds()),
archived: Some(false),

View File

@@ -34,6 +34,7 @@ pub use config::Config;
pub use config::RolloutConfig;
pub use config::RolloutConfigView;
pub use list::Cursor;
pub use list::SortDirection;
pub use list::ThreadItem;
pub use list::ThreadListConfig;
pub use list::ThreadListLayout;

View File

@@ -112,6 +112,12 @@ pub enum ThreadSortKey {
UpdatedAt,
}
/// Ordering applied to thread listings: oldest-first (`Asc`) for delta syncs,
/// newest-first (`Desc`) for the default browsing view.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SortDirection {
    Asc,
    Desc,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ThreadListLayout {
NestedByDate,
@@ -136,6 +142,10 @@ impl Cursor {
fn new(ts: OffsetDateTime, id: Uuid) -> Self {
Self { ts, id }
}
/// Returns the (timestamp, uuid) pair this cursor anchors pagination on.
pub(crate) fn parts(&self) -> (OffsetDateTime, Uuid) {
    (self.ts, self.id)
}
}
/// Keeps track of where a paginated listing left off. As the file scan goes newest -> oldest,

View File

@@ -32,6 +32,7 @@ use tracing::warn;
use super::ARCHIVED_SESSIONS_SUBDIR;
use super::SESSIONS_SUBDIR;
use super::list::Cursor;
use super::list::SortDirection;
use super::list::ThreadItem;
use super::list::ThreadListConfig;
use super::list::ThreadListLayout;
@@ -219,6 +220,7 @@ impl RolloutRecorder {
page_size: usize,
cursor: Option<&Cursor>,
sort_key: ThreadSortKey,
sort_direction: SortDirection,
allowed_sources: &[SessionSource],
model_providers: Option<&[String]>,
default_provider: &str,
@@ -229,6 +231,7 @@ impl RolloutRecorder {
page_size,
cursor,
sort_key,
sort_direction,
allowed_sources,
model_providers,
default_provider,
@@ -245,6 +248,7 @@ impl RolloutRecorder {
page_size: usize,
cursor: Option<&Cursor>,
sort_key: ThreadSortKey,
sort_direction: SortDirection,
allowed_sources: &[SessionSource],
model_providers: Option<&[String]>,
default_provider: &str,
@@ -255,6 +259,7 @@ impl RolloutRecorder {
page_size,
cursor,
sort_key,
sort_direction,
allowed_sources,
model_providers,
default_provider,
@@ -270,6 +275,7 @@ impl RolloutRecorder {
page_size: usize,
cursor: Option<&Cursor>,
sort_key: ThreadSortKey,
sort_direction: SortDirection,
allowed_sources: &[SessionSource],
model_providers: Option<&[String]>,
default_provider: &str,
@@ -286,6 +292,7 @@ impl RolloutRecorder {
page_size,
cursor,
sort_key,
sort_direction,
allowed_sources,
model_providers,
archived,
@@ -300,38 +307,43 @@ impl RolloutRecorder {
// Filesystem-first listing intentionally overfetches so we can repair stale/missing
// SQLite rollout paths before the final DB-backed page is returned.
let fs_page_size = page_size.saturating_mul(2).max(page_size);
let fs_page = if archived {
let root = codex_home.join(ARCHIVED_SESSIONS_SUBDIR);
get_threads_in_root(
root,
fs_page_size,
cursor,
sort_key,
ThreadListConfig {
let fs_page = match sort_direction {
SortDirection::Asc => {
list_threads_from_files_asc(
codex_home,
page_size,
cursor,
sort_key,
allowed_sources,
model_providers,
default_provider,
layout: ThreadListLayout::Flat,
},
)
.await?
} else {
get_threads(
codex_home,
fs_page_size,
cursor,
sort_key,
allowed_sources,
model_providers,
default_provider,
)
.await?
archived,
search_term,
)
.await?
}
SortDirection::Desc => {
list_threads_from_files_desc(
codex_home,
fs_page_size,
cursor,
sort_key,
allowed_sources,
model_providers,
default_provider,
archived,
)
.await?
}
};
if state_db_ctx.is_none() {
// Keep legacy behavior when SQLite is unavailable: return filesystem results
// at the requested page size.
return Ok(truncate_fs_page(fs_page, page_size, sort_key));
return Ok(match sort_direction {
SortDirection::Asc => fs_page,
SortDirection::Desc => truncate_fs_page(fs_page, page_size, sort_key),
});
}
// Warm the DB by repairing every filesystem hit before querying SQLite.
@@ -351,6 +363,7 @@ impl RolloutRecorder {
page_size,
cursor,
sort_key,
sort_direction,
allowed_sources,
model_providers,
archived,
@@ -363,7 +376,10 @@ impl RolloutRecorder {
// If SQLite listing still fails, return the filesystem page rather than failing the list.
tracing::error!("Falling back on rollout system");
tracing::warn!("state db discrepancy during list_threads_with_db_fallback: falling_back");
Ok(truncate_fs_page(fs_page, page_size, sort_key))
Ok(match sort_direction {
SortDirection::Asc => fs_page,
SortDirection::Desc => truncate_fs_page(fs_page, page_size, sort_key),
})
}
/// Find the newest recorded thread path, optionally filtering to a matching cwd.
@@ -389,6 +405,7 @@ impl RolloutRecorder {
page_size,
db_cursor.as_ref(),
sort_key,
SortDirection::Desc,
allowed_sources,
model_providers,
/*archived*/ false,
@@ -772,6 +789,131 @@ fn truncate_fs_page(
page
}
/// Filesystem-backed, newest-first thread listing. Archived listings scan the
/// flat archive directory; live listings use the standard session scan.
#[allow(clippy::too_many_arguments)]
async fn list_threads_from_files_desc(
    codex_home: &Path,
    page_size: usize,
    cursor: Option<&Cursor>,
    sort_key: ThreadSortKey,
    allowed_sources: &[SessionSource],
    model_providers: Option<&[String]>,
    default_provider: &str,
    archived: bool,
) -> std::io::Result<ThreadsPage> {
    if !archived {
        return get_threads(
            codex_home,
            page_size,
            cursor,
            sort_key,
            allowed_sources,
            model_providers,
            default_provider,
        )
        .await;
    }
    // Archived rollouts live in a flat directory rather than the nested
    // date-based layout.
    let archive_root = codex_home.join(ARCHIVED_SESSIONS_SUBDIR);
    let config = ThreadListConfig {
        allowed_sources,
        model_providers,
        default_provider,
        layout: ThreadListLayout::Flat,
    };
    get_threads_in_root(archive_root, page_size, cursor, sort_key, config).await
}
/// Filesystem-backed, oldest-first thread listing for ascending delta syncs.
///
/// The filesystem scan is inherently newest-first, so this drains every
/// descending page into memory, re-sorts ascending, then applies the cursor
/// as an exclusive lower bound. Expensive for large histories, but this path
/// is only a fallback when the SQLite index is unavailable.
#[allow(clippy::too_many_arguments)]
async fn list_threads_from_files_asc(
    codex_home: &Path,
    page_size: usize,
    cursor: Option<&Cursor>,
    sort_key: ThreadSortKey,
    allowed_sources: &[SessionSource],
    model_providers: Option<&[String]>,
    default_provider: &str,
    archived: bool,
    // Accepted for signature parity with the DB path; filesystem ascending
    // listing does not implement search filtering.
    _search_term: Option<&str>,
) -> std::io::Result<ThreadsPage> {
    let mut all_items = Vec::new();
    let mut scanned_files = 0usize;
    let mut reached_scan_cap = false;
    let mut page_cursor = None;
    // Oversized inner pages keep the number of descending scans small.
    let scan_page_size = page_size.saturating_mul(8).clamp(256, 2048);
    loop {
        let page = list_threads_from_files_desc(
            codex_home,
            scan_page_size,
            page_cursor.as_ref(),
            sort_key,
            allowed_sources,
            model_providers,
            default_provider,
            archived,
        )
        .await?;
        scanned_files = scanned_files.saturating_add(page.num_scanned_files);
        reached_scan_cap |= page.reached_scan_cap;
        all_items.extend(page.items);
        page_cursor = page.next_cursor;
        if page_cursor.is_none() {
            break;
        }
    }
    // Re-sort ascending by (timestamp, uuid); items without a derivable key
    // (None) sort first and are dropped by the cursor filter below.
    all_items.sort_by_key(|item| thread_item_sort_key(item, sort_key));
    if let Some(cursor) = cursor {
        let anchor = cursor.parts();
        // Exclusive anchor: keep only items strictly after the cursor.
        all_items
            .retain(|item| thread_item_sort_key(item, sort_key).is_some_and(|key| key > anchor));
    }
    // A capped scan may have hidden additional matches, so also advertise a
    // next page in that case.
    let more_matches_available = all_items.len() > page_size || reached_scan_cap;
    all_items.truncate(page_size);
    let next_cursor = if more_matches_available {
        all_items
            .last()
            .and_then(|item| cursor_from_thread_item(item, sort_key))
    } else {
        None
    };
    Ok(ThreadsPage {
        items: all_items,
        next_cursor,
        num_scanned_files: scanned_files,
        reached_scan_cap,
    })
}
/// Derives the (timestamp, uuid) ordering key for a filesystem thread item.
/// The uuid and created-at come from the rollout filename; `UpdatedAt`
/// prefers the item's recorded update time, falling back to its created time.
/// Returns `None` when the filename or timestamps cannot be parsed.
fn thread_item_sort_key(
    item: &ThreadItem,
    sort_key: ThreadSortKey,
) -> Option<(OffsetDateTime, uuid::Uuid)> {
    let name = item.path.file_name().and_then(|n| n.to_str())?;
    let (created_at, id) = parse_timestamp_uuid_from_filename(name)?;
    match sort_key {
        ThreadSortKey::CreatedAt => Some((created_at, id)),
        ThreadSortKey::UpdatedAt => {
            let raw = item.updated_at.as_deref().or(item.created_at.as_deref())?;
            let updated_at = OffsetDateTime::parse(raw, &Rfc3339).ok()?;
            Some((updated_at, id))
        }
    }
}
/// Builds a pagination cursor (`timestamp|uuid` token) from a thread item's
/// sort key; `None` when the key or timestamp formatting is unavailable.
fn cursor_from_thread_item(item: &ThreadItem, sort_key: ThreadSortKey) -> Option<Cursor> {
    let (timestamp, id) = thread_item_sort_key(item, sort_key)?;
    let formatted = timestamp.format(&Rfc3339).ok()?;
    parse_cursor(format!("{formatted}|{id}").as_str())
}
struct LogFileInfo {
/// Full path to the rollout file.
path: PathBuf,

View File

@@ -372,6 +372,7 @@ async fn list_threads_db_disabled_does_not_skip_paginated_items() -> std::io::Re
/*page_size*/ 1,
/*cursor*/ None,
ThreadSortKey::CreatedAt,
SortDirection::Desc,
&[],
/*model_providers*/ None,
default_provider.as_str(),
@@ -387,6 +388,7 @@ async fn list_threads_db_disabled_does_not_skip_paginated_items() -> std::io::Re
/*page_size*/ 1,
Some(&cursor),
ThreadSortKey::CreatedAt,
SortDirection::Desc,
&[],
/*model_providers*/ None,
default_provider.as_str(),
@@ -444,6 +446,7 @@ async fn list_threads_db_enabled_drops_missing_rollout_paths() -> std::io::Resul
/*page_size*/ 10,
/*cursor*/ None,
ThreadSortKey::CreatedAt,
SortDirection::Desc,
&[],
/*model_providers*/ None,
default_provider.as_str(),
@@ -506,6 +509,7 @@ async fn list_threads_db_enabled_repairs_stale_rollout_paths() -> std::io::Resul
/*page_size*/ 1,
/*cursor*/ None,
ThreadSortKey::CreatedAt,
SortDirection::Desc,
&[],
/*model_providers*/ None,
default_provider.as_str(),

View File

@@ -1,6 +1,7 @@
use crate::config::RolloutConfig;
use crate::config::RolloutConfigView;
use crate::list::Cursor;
use crate::list::SortDirection;
use crate::list::ThreadSortKey;
use crate::metadata;
use chrono::DateTime;
@@ -202,6 +203,7 @@ pub async fn list_threads_db(
page_size: usize,
cursor: Option<&Cursor>,
sort_key: ThreadSortKey,
sort_direction: SortDirection,
allowed_sources: &[SessionSource],
model_providers: Option<&[String]>,
archived: bool,
@@ -229,15 +231,21 @@ pub async fn list_threads_db(
match ctx
.list_threads(
page_size,
anchor.as_ref(),
match sort_key {
ThreadSortKey::CreatedAt => codex_state::SortKey::CreatedAt,
ThreadSortKey::UpdatedAt => codex_state::SortKey::UpdatedAt,
codex_state::ThreadFilterOptions {
archived_only: archived,
allowed_sources: allowed_sources.as_slice(),
model_providers: model_providers.as_deref(),
anchor: anchor.as_ref(),
sort_key: match sort_key {
ThreadSortKey::CreatedAt => codex_state::SortKey::CreatedAt,
ThreadSortKey::UpdatedAt => codex_state::SortKey::UpdatedAt,
},
sort_direction: match sort_direction {
SortDirection::Asc => codex_state::SortDirection::Asc,
SortDirection::Desc => codex_state::SortDirection::Desc,
},
search_term,
},
allowed_sources.as_slice(),
model_providers.as_deref(),
archived,
search_term,
)
.await
{

View File

@@ -37,6 +37,7 @@ pub use model::BackfillStats;
pub use model::BackfillStatus;
pub use model::DirectionalThreadSpawnEdgeStatus;
pub use model::ExtractionOutcome;
pub use model::SortDirection;
pub use model::SortKey;
pub use model::Stage1JobClaim;
pub use model::Stage1JobClaimOutcome;
@@ -47,6 +48,7 @@ pub use model::ThreadMetadata;
pub use model::ThreadMetadataBuilder;
pub use model::ThreadsPage;
pub use runtime::RemoteControlEnrollmentRecord;
pub use runtime::ThreadFilterOptions;
pub use runtime::logs_db_filename;
pub use runtime::logs_db_path;
pub use runtime::state_db_filename;

View File

@@ -28,6 +28,7 @@ pub use memories::Stage1StartupClaimParams;
pub use thread_metadata::Anchor;
pub use thread_metadata::BackfillStats;
pub use thread_metadata::ExtractionOutcome;
pub use thread_metadata::SortDirection;
pub use thread_metadata::SortKey;
pub use thread_metadata::ThreadMetadata;
pub use thread_metadata::ThreadMetadataBuilder;

View File

@@ -21,6 +21,13 @@ pub enum SortKey {
UpdatedAt,
}
/// Sort direction to use when listing threads.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SortDirection {
Asc,
Desc,
}
/// A pagination anchor used for keyset pagination.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Anchor {

View File

@@ -60,6 +60,7 @@ mod test_support;
mod threads;
pub use remote_control::RemoteControlEnrollmentRecord;
pub use threads::ThreadFilterOptions;
// "Partition" is the retained-log-content bucket we cap at 10 MiB:
// - one bucket per non-null thread_id

View File

@@ -1,6 +1,8 @@
use super::threads::ThreadFilterOptions;
use super::threads::push_thread_filters;
use super::threads::push_thread_order_and_limit;
use super::*;
use crate::SortDirection;
use crate::model::Phase2InputSelection;
use crate::model::Phase2JobClaimOutcome;
use crate::model::Stage1JobClaim;
@@ -198,12 +200,15 @@ LEFT JOIN jobs
);
push_thread_filters(
&mut builder,
/*archived_only*/ false,
allowed_sources,
/*model_providers*/ None,
/*anchor*/ None,
SortKey::UpdatedAt,
/*search_term*/ None,
ThreadFilterOptions {
archived_only: false,
allowed_sources,
model_providers: None,
anchor: None,
sort_key: SortKey::UpdatedAt,
sort_direction: SortDirection::Desc,
search_term: None,
},
);
builder.push(" AND threads.memory_mode = 'enabled'");
builder
@@ -215,7 +220,12 @@ LEFT JOIN jobs
builder.push(" AND updated_at <= ").push_bind(idle_cutoff);
builder.push(" AND COALESCE(stage1_outputs.source_updated_at, -1) < updated_at");
builder.push(" AND COALESCE(jobs.last_success_watermark, -1) < updated_at");
push_thread_order_and_limit(&mut builder, SortKey::UpdatedAt, scan_limit);
push_thread_order_and_limit(
&mut builder,
SortKey::UpdatedAt,
SortDirection::Desc,
scan_limit,
);
let items = builder
.build()

View File

@@ -1,4 +1,5 @@
use super::*;
use crate::SortDirection;
use codex_protocol::protocol::SessionSource;
impl StateRuntime {
@@ -366,12 +367,15 @@ FROM threads
);
push_thread_filters(
&mut builder,
archived_only,
allowed_sources,
model_providers,
/*anchor*/ None,
crate::SortKey::UpdatedAt,
/*search_term*/ None,
ThreadFilterOptions {
archived_only,
allowed_sources,
model_providers,
anchor: None,
sort_key: crate::SortKey::UpdatedAt,
sort_direction: crate::SortDirection::Desc,
search_term: None,
},
);
builder.push(" AND title = ");
builder.push_bind(title);
@@ -379,7 +383,12 @@ FROM threads
builder.push(" AND cwd = ");
builder.push_bind(cwd.display().to_string());
}
push_thread_order_and_limit(&mut builder, crate::SortKey::UpdatedAt, /*limit*/ 1);
push_thread_order_and_limit(
&mut builder,
crate::SortKey::UpdatedAt,
crate::SortDirection::Desc,
/*limit*/ 1,
);
let row = builder.build().fetch_optional(self.pool.as_ref()).await?;
row.map(|row| ThreadRow::try_from_row(&row).and_then(crate::ThreadMetadata::try_from))
@@ -387,18 +396,14 @@ FROM threads
}
/// List threads using the underlying database.
#[allow(clippy::too_many_arguments)]
pub async fn list_threads(
&self,
page_size: usize,
anchor: Option<&crate::Anchor>,
sort_key: crate::SortKey,
allowed_sources: &[String],
model_providers: Option<&[String]>,
archived_only: bool,
search_term: Option<&str>,
filters: ThreadFilterOptions<'_>,
) -> anyhow::Result<crate::ThreadsPage> {
let limit = page_size.saturating_add(1);
let sort_key = filters.sort_key;
let sort_direction = filters.sort_direction;
let mut builder = QueryBuilder::<Sqlite>::new(
r#"
@@ -428,16 +433,8 @@ SELECT
FROM threads
"#,
);
push_thread_filters(
&mut builder,
archived_only,
allowed_sources,
model_providers,
anchor,
sort_key,
search_term,
);
push_thread_order_and_limit(&mut builder, sort_key, limit);
push_thread_filters(&mut builder, filters);
push_thread_order_and_limit(&mut builder, sort_key, sort_direction, limit);
let rows = builder.build().fetch_all(self.pool.as_ref()).await?;
let mut items = rows
@@ -473,14 +470,17 @@ FROM threads
let mut builder = QueryBuilder::<Sqlite>::new("SELECT id FROM threads");
push_thread_filters(
&mut builder,
archived_only,
allowed_sources,
model_providers,
anchor,
sort_key,
/*search_term*/ None,
ThreadFilterOptions {
archived_only,
allowed_sources,
model_providers,
anchor,
sort_key,
sort_direction: SortDirection::Desc,
search_term: None,
},
);
push_thread_order_and_limit(&mut builder, sort_key, limit);
push_thread_order_and_limit(&mut builder, sort_key, SortDirection::Desc, limit);
let rows = builder.build().fetch_all(self.pool.as_ref()).await?;
rows.into_iter()
@@ -941,31 +941,37 @@ fn thread_spawn_parent_thread_id_from_source_str(source: &str) -> Option<ThreadI
}
}
#[derive(Clone, Copy)]
pub struct ThreadFilterOptions<'a> {
pub archived_only: bool,
pub allowed_sources: &'a [String],
pub model_providers: Option<&'a [String]>,
pub anchor: Option<&'a crate::Anchor>,
pub sort_key: SortKey,
pub sort_direction: SortDirection,
pub search_term: Option<&'a str>,
}
pub(super) fn push_thread_filters<'a>(
builder: &mut QueryBuilder<'a, Sqlite>,
archived_only: bool,
allowed_sources: &'a [String],
model_providers: Option<&'a [String]>,
anchor: Option<&crate::Anchor>,
sort_key: SortKey,
search_term: Option<&'a str>,
filters: ThreadFilterOptions<'a>,
) {
builder.push(" WHERE 1 = 1");
if archived_only {
if filters.archived_only {
builder.push(" AND archived = 1");
} else {
builder.push(" AND archived = 0");
}
builder.push(" AND first_user_message <> ''");
if !allowed_sources.is_empty() {
if !filters.allowed_sources.is_empty() {
builder.push(" AND source IN (");
let mut separated = builder.separated(", ");
for source in allowed_sources {
for source in filters.allowed_sources {
separated.push_bind(source);
}
separated.push_unseparated(")");
}
if let Some(model_providers) = model_providers
if let Some(model_providers) = filters.model_providers
&& !model_providers.is_empty()
{
builder.push(" AND model_provider IN (");
@@ -975,26 +981,34 @@ pub(super) fn push_thread_filters<'a>(
}
separated.push_unseparated(")");
}
if let Some(search_term) = search_term {
if let Some(search_term) = filters.search_term {
builder.push(" AND instr(title, ");
builder.push_bind(search_term);
builder.push(") > 0");
}
if let Some(anchor) = anchor {
if let Some(anchor) = filters.anchor {
let anchor_ts = datetime_to_epoch_seconds(anchor.ts);
let column = match sort_key {
let column = match filters.sort_key {
SortKey::CreatedAt => "created_at",
SortKey::UpdatedAt => "updated_at",
};
let operator = match filters.sort_direction {
SortDirection::Asc => ">",
SortDirection::Desc => "<",
};
builder.push(" AND (");
builder.push(column);
builder.push(" < ");
builder.push(" ");
builder.push(operator);
builder.push(" ");
builder.push_bind(anchor_ts);
builder.push(" OR (");
builder.push(column);
builder.push(" = ");
builder.push_bind(anchor_ts);
builder.push(" AND id < ");
builder.push(" AND id ");
builder.push(operator);
builder.push(" ");
builder.push_bind(anchor.id.to_string());
builder.push("))");
}
@@ -1003,15 +1017,23 @@ pub(super) fn push_thread_filters<'a>(
pub(super) fn push_thread_order_and_limit(
builder: &mut QueryBuilder<'_, Sqlite>,
sort_key: SortKey,
sort_direction: SortDirection,
limit: usize,
) {
let order_column = match sort_key {
SortKey::CreatedAt => "created_at",
SortKey::UpdatedAt => "updated_at",
};
let order_direction = match sort_direction {
SortDirection::Asc => "ASC",
SortDirection::Desc => "DESC",
};
builder.push(" ORDER BY ");
builder.push(order_column);
builder.push(" DESC, id DESC");
builder.push(" ");
builder.push(order_direction);
builder.push(", id ");
builder.push(order_direction);
builder.push(" LIMIT ");
builder.push_bind(limit as i64);
}
@@ -1019,6 +1041,7 @@ pub(super) fn push_thread_order_and_limit(
#[cfg(test)]
mod tests {
use super::*;
use crate::Anchor;
use crate::DirectionalThreadSpawnEdgeStatus;
use crate::runtime::test_support::test_thread_metadata;
use crate::runtime::test_support::unique_temp_dir;
@@ -1029,6 +1052,7 @@ mod tests {
use codex_protocol::protocol::SessionSource;
use pretty_assertions::assert_eq;
use std::path::PathBuf;
use uuid::Uuid;
#[tokio::test]
async fn upsert_thread_keeps_creation_memory_mode_for_existing_rows() {
@@ -1068,6 +1092,89 @@ mod tests {
assert_eq!(memory_mode, "disabled");
}
#[tokio::test]
async fn list_threads_updated_after_returns_oldest_changes_first() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string())
.await
.expect("state db should initialize");
let older_id =
ThreadId::from_string("00000000-0000-0000-0000-000000000001").expect("valid thread id");
let same_second_lower_id =
ThreadId::from_string("00000000-0000-0000-0000-000000000002").expect("valid thread id");
let same_second_higher_id =
ThreadId::from_string("00000000-0000-0000-0000-000000000003").expect("valid thread id");
let older_updated_at =
DateTime::<Utc>::from_timestamp(1_700_000_100, 0).expect("valid older timestamp");
let newer_updated_at =
DateTime::<Utc>::from_timestamp(1_700_000_200, 0).expect("valid newer timestamp");
for (thread_id, updated_at) in [
(older_id, older_updated_at),
(same_second_higher_id, newer_updated_at),
(same_second_lower_id, newer_updated_at),
] {
let mut metadata = test_thread_metadata(&codex_home, thread_id, codex_home.clone());
metadata.updated_at = updated_at;
metadata.first_user_message = Some("hello".to_string());
runtime
.upsert_thread(&metadata)
.await
.expect("thread insert should succeed");
}
let anchor = Anchor {
ts: older_updated_at,
id: Uuid::parse_str(&older_id.to_string()).expect("valid uuid"),
};
let model_providers = ["test-provider".to_string()];
let page = runtime
.list_threads(
/*page_size*/ 1,
ThreadFilterOptions {
archived_only: false,
allowed_sources: &[],
model_providers: Some(&model_providers),
anchor: Some(&anchor),
sort_key: SortKey::UpdatedAt,
sort_direction: SortDirection::Asc,
search_term: None,
},
)
.await
.expect("list should succeed");
let ids = page.items.iter().map(|item| item.id).collect::<Vec<_>>();
assert_eq!(ids, vec![same_second_lower_id]);
assert_eq!(
page.next_anchor,
Some(Anchor {
ts: newer_updated_at,
id: Uuid::parse_str(&same_second_lower_id.to_string()).expect("valid uuid"),
})
);
let page = runtime
.list_threads(
/*page_size*/ 1,
ThreadFilterOptions {
archived_only: false,
allowed_sources: &[],
model_providers: Some(&model_providers),
anchor: page.next_anchor.as_ref(),
sort_key: SortKey::UpdatedAt,
sort_direction: SortDirection::Asc,
search_term: None,
},
)
.await
.expect("second page should succeed");
let ids = page.items.iter().map(|item| item.id).collect::<Vec<_>>();
assert_eq!(ids, vec![same_second_higher_id]);
assert_eq!(page.next_anchor, None);
}
#[tokio::test]
async fn apply_rollout_items_restores_memory_mode_from_session_meta() {
let codex_home = unique_temp_dir();

View File

@@ -506,6 +506,7 @@ async fn lookup_session_target_by_name_with_app_server(
cursor: cursor.clone(),
limit: Some(100),
sort_key: Some(AppServerThreadSortKey::UpdatedAt),
sort_direction: None,
model_providers: None,
source_kinds: Some(vec![ThreadSourceKind::Cli, ThreadSourceKind::VsCode]),
archived: Some(false),
@@ -601,6 +602,7 @@ fn latest_session_lookup_params(
cursor: None,
limit: Some(1),
sort_key: Some(AppServerThreadSortKey::UpdatedAt),
sort_direction: None,
model_providers: if is_remote {
None
} else {

View File

@@ -335,6 +335,7 @@ fn spawn_rollout_page_loader(
PAGE_SIZE,
cursor,
request.sort_key,
codex_rollout::SortDirection::Desc,
INTERACTIVE_SESSION_SOURCES.as_slice(),
default_provider.as_ref().map(std::slice::from_ref),
default_provider.as_deref().unwrap_or_default(),
@@ -1135,6 +1136,7 @@ fn thread_list_params(
ThreadSortKey::CreatedAt => AppServerThreadSortKey::CreatedAt,
ThreadSortKey::UpdatedAt => AppServerThreadSortKey::UpdatedAt,
}),
sort_direction: None,
model_providers: match provider_filter {
ProviderFilter::Any => None,
ProviderFilter::MatchDefault(default_provider) => Some(vec![default_provider]),