mirror of
https://github.com/openai/codex.git
synced 2026-04-14 01:35:00 +00:00
Compare commits
7 Commits
dev/shaqay
...
ddr/thread
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0140e3bba7 | ||
|
|
a7337fe18f | ||
|
|
2b5f3d3baf | ||
|
|
b505ee8069 | ||
|
|
6d7a94e78b | ||
|
|
b2f1ff81e8 | ||
|
|
ca7a98c5ea |
@@ -2569,6 +2569,13 @@
|
||||
},
|
||||
"type": "object"
|
||||
},
|
||||
"SortDirection": {
|
||||
"enum": [
|
||||
"asc",
|
||||
"desc"
|
||||
],
|
||||
"type": "string"
|
||||
},
|
||||
"TextElement": {
|
||||
"properties": {
|
||||
"byteRange": {
|
||||
@@ -2764,6 +2771,17 @@
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"sortDirection": {
|
||||
"anyOf": [
|
||||
{
|
||||
"$ref": "#/definitions/SortDirection"
|
||||
},
|
||||
{
|
||||
"type": "null"
|
||||
}
|
||||
],
|
||||
"description": "Optional sort direction; defaults to descending (newest first)."
|
||||
},
|
||||
"sortKey": {
|
||||
"anyOf": [
|
||||
{
|
||||
@@ -3261,6 +3279,44 @@
|
||||
],
|
||||
"type": "string"
|
||||
},
|
||||
"ThreadTurnsListParams": {
|
||||
"properties": {
|
||||
"cursor": {
|
||||
"description": "Opaque cursor to pass to the next call to continue after the last turn.",
|
||||
"type": [
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"limit": {
|
||||
"description": "Optional turn page size.",
|
||||
"format": "uint32",
|
||||
"minimum": 0.0,
|
||||
"type": [
|
||||
"integer",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"sortDirection": {
|
||||
"anyOf": [
|
||||
{
|
||||
"$ref": "#/definitions/SortDirection"
|
||||
},
|
||||
{
|
||||
"type": "null"
|
||||
}
|
||||
],
|
||||
"description": "Optional turn pagination direction; defaults to descending."
|
||||
},
|
||||
"threadId": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"threadId"
|
||||
],
|
||||
"type": "object"
|
||||
},
|
||||
"ThreadUnarchiveParams": {
|
||||
"properties": {
|
||||
"threadId": {
|
||||
@@ -3952,6 +4008,30 @@
|
||||
"title": "Thread/readRequest",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"id": {
|
||||
"$ref": "#/definitions/RequestId"
|
||||
},
|
||||
"method": {
|
||||
"enum": [
|
||||
"thread/turns/list"
|
||||
],
|
||||
"title": "Thread/turns/listRequestMethod",
|
||||
"type": "string"
|
||||
},
|
||||
"params": {
|
||||
"$ref": "#/definitions/ThreadTurnsListParams"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"id",
|
||||
"method",
|
||||
"params"
|
||||
],
|
||||
"title": "Thread/turns/listRequest",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"id": {
|
||||
|
||||
@@ -578,6 +578,30 @@
|
||||
"title": "Thread/readRequest",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"id": {
|
||||
"$ref": "#/definitions/v2/RequestId"
|
||||
},
|
||||
"method": {
|
||||
"enum": [
|
||||
"thread/turns/list"
|
||||
],
|
||||
"title": "Thread/turns/listRequestMethod",
|
||||
"type": "string"
|
||||
},
|
||||
"params": {
|
||||
"$ref": "#/definitions/v2/ThreadTurnsListParams"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"id",
|
||||
"method",
|
||||
"params"
|
||||
],
|
||||
"title": "Thread/turns/listRequest",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"id": {
|
||||
@@ -12332,6 +12356,13 @@
|
||||
"title": "SkillsListResponse",
|
||||
"type": "object"
|
||||
},
|
||||
"SortDirection": {
|
||||
"enum": [
|
||||
"asc",
|
||||
"desc"
|
||||
],
|
||||
"type": "string"
|
||||
},
|
||||
"SubAgentSource": {
|
||||
"oneOf": [
|
||||
{
|
||||
@@ -13543,6 +13574,17 @@
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"sortDirection": {
|
||||
"anyOf": [
|
||||
{
|
||||
"$ref": "#/definitions/v2/SortDirection"
|
||||
},
|
||||
{
|
||||
"type": "null"
|
||||
}
|
||||
],
|
||||
"description": "Optional sort direction; defaults to descending (newest first)."
|
||||
},
|
||||
"sortKey": {
|
||||
"anyOf": [
|
||||
{
|
||||
@@ -13571,6 +13613,13 @@
|
||||
"ThreadListResponse": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"properties": {
|
||||
"backwardsCursor": {
|
||||
"description": "Opaque cursor to pass as `cursor` when reversing `sortDirection`. This is only populated when the page contains at least one thread. Use it with the opposite `sortDirection`; for timestamp sorts it anchors at the start of the page timestamp so same-second updates are not skipped.",
|
||||
"type": [
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"data": {
|
||||
"items": {
|
||||
"$ref": "#/definitions/v2/Thread"
|
||||
@@ -14585,6 +14634,76 @@
|
||||
"title": "ThreadTokenUsageUpdatedNotification",
|
||||
"type": "object"
|
||||
},
|
||||
"ThreadTurnsListParams": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"properties": {
|
||||
"cursor": {
|
||||
"description": "Opaque cursor to pass to the next call to continue after the last turn.",
|
||||
"type": [
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"limit": {
|
||||
"description": "Optional turn page size.",
|
||||
"format": "uint32",
|
||||
"minimum": 0.0,
|
||||
"type": [
|
||||
"integer",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"sortDirection": {
|
||||
"anyOf": [
|
||||
{
|
||||
"$ref": "#/definitions/v2/SortDirection"
|
||||
},
|
||||
{
|
||||
"type": "null"
|
||||
}
|
||||
],
|
||||
"description": "Optional turn pagination direction; defaults to descending."
|
||||
},
|
||||
"threadId": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"threadId"
|
||||
],
|
||||
"title": "ThreadTurnsListParams",
|
||||
"type": "object"
|
||||
},
|
||||
"ThreadTurnsListResponse": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"properties": {
|
||||
"backwardsCursor": {
|
||||
"description": "Opaque cursor to pass as `cursor` when reversing `sortDirection`. This is only populated when the page contains at least one turn. Use it with the opposite `sortDirection` to include the anchor turn again and catch updates to that turn.",
|
||||
"type": [
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"data": {
|
||||
"items": {
|
||||
"$ref": "#/definitions/v2/Turn"
|
||||
},
|
||||
"type": "array"
|
||||
},
|
||||
"nextCursor": {
|
||||
"description": "Opaque cursor to pass to the next call to continue after the last turn. if None, there are no more turns to return.",
|
||||
"type": [
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"data"
|
||||
],
|
||||
"title": "ThreadTurnsListResponse",
|
||||
"type": "object"
|
||||
},
|
||||
"ThreadUnarchiveParams": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"properties": {
|
||||
|
||||
@@ -1160,6 +1160,30 @@
|
||||
"title": "Thread/readRequest",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"id": {
|
||||
"$ref": "#/definitions/RequestId"
|
||||
},
|
||||
"method": {
|
||||
"enum": [
|
||||
"thread/turns/list"
|
||||
],
|
||||
"title": "Thread/turns/listRequestMethod",
|
||||
"type": "string"
|
||||
},
|
||||
"params": {
|
||||
"$ref": "#/definitions/ThreadTurnsListParams"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"id",
|
||||
"method",
|
||||
"params"
|
||||
],
|
||||
"title": "Thread/turns/listRequest",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"id": {
|
||||
@@ -10180,6 +10204,13 @@
|
||||
"title": "SkillsListResponse",
|
||||
"type": "object"
|
||||
},
|
||||
"SortDirection": {
|
||||
"enum": [
|
||||
"asc",
|
||||
"desc"
|
||||
],
|
||||
"type": "string"
|
||||
},
|
||||
"SubAgentSource": {
|
||||
"oneOf": [
|
||||
{
|
||||
@@ -11391,6 +11422,17 @@
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"sortDirection": {
|
||||
"anyOf": [
|
||||
{
|
||||
"$ref": "#/definitions/SortDirection"
|
||||
},
|
||||
{
|
||||
"type": "null"
|
||||
}
|
||||
],
|
||||
"description": "Optional sort direction; defaults to descending (newest first)."
|
||||
},
|
||||
"sortKey": {
|
||||
"anyOf": [
|
||||
{
|
||||
@@ -11419,6 +11461,13 @@
|
||||
"ThreadListResponse": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"properties": {
|
||||
"backwardsCursor": {
|
||||
"description": "Opaque cursor to pass as `cursor` when reversing `sortDirection`. This is only populated when the page contains at least one thread. Use it with the opposite `sortDirection`; for timestamp sorts it anchors at the start of the page timestamp so same-second updates are not skipped.",
|
||||
"type": [
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"data": {
|
||||
"items": {
|
||||
"$ref": "#/definitions/Thread"
|
||||
@@ -12433,6 +12482,76 @@
|
||||
"title": "ThreadTokenUsageUpdatedNotification",
|
||||
"type": "object"
|
||||
},
|
||||
"ThreadTurnsListParams": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"properties": {
|
||||
"cursor": {
|
||||
"description": "Opaque cursor to pass to the next call to continue after the last turn.",
|
||||
"type": [
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"limit": {
|
||||
"description": "Optional turn page size.",
|
||||
"format": "uint32",
|
||||
"minimum": 0.0,
|
||||
"type": [
|
||||
"integer",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"sortDirection": {
|
||||
"anyOf": [
|
||||
{
|
||||
"$ref": "#/definitions/SortDirection"
|
||||
},
|
||||
{
|
||||
"type": "null"
|
||||
}
|
||||
],
|
||||
"description": "Optional turn pagination direction; defaults to descending."
|
||||
},
|
||||
"threadId": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"threadId"
|
||||
],
|
||||
"title": "ThreadTurnsListParams",
|
||||
"type": "object"
|
||||
},
|
||||
"ThreadTurnsListResponse": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"properties": {
|
||||
"backwardsCursor": {
|
||||
"description": "Opaque cursor to pass as `cursor` when reversing `sortDirection`. This is only populated when the page contains at least one turn. Use it with the opposite `sortDirection` to include the anchor turn again and catch updates to that turn.",
|
||||
"type": [
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"data": {
|
||||
"items": {
|
||||
"$ref": "#/definitions/Turn"
|
||||
},
|
||||
"type": "array"
|
||||
},
|
||||
"nextCursor": {
|
||||
"description": "Opaque cursor to pass to the next call to continue after the last turn. if None, there are no more turns to return.",
|
||||
"type": [
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"data"
|
||||
],
|
||||
"title": "ThreadTurnsListResponse",
|
||||
"type": "object"
|
||||
},
|
||||
"ThreadUnarchiveParams": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"properties": {
|
||||
|
||||
@@ -1,6 +1,13 @@
|
||||
{
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"definitions": {
|
||||
"SortDirection": {
|
||||
"enum": [
|
||||
"asc",
|
||||
"desc"
|
||||
],
|
||||
"type": "string"
|
||||
},
|
||||
"ThreadSortKey": {
|
||||
"enum": [
|
||||
"created_at",
|
||||
@@ -72,6 +79,17 @@
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"sortDirection": {
|
||||
"anyOf": [
|
||||
{
|
||||
"$ref": "#/definitions/SortDirection"
|
||||
},
|
||||
{
|
||||
"type": "null"
|
||||
}
|
||||
],
|
||||
"description": "Optional sort direction; defaults to descending (newest first)."
|
||||
},
|
||||
"sortKey": {
|
||||
"anyOf": [
|
||||
{
|
||||
|
||||
@@ -1931,6 +1931,13 @@
|
||||
}
|
||||
},
|
||||
"properties": {
|
||||
"backwardsCursor": {
|
||||
"description": "Opaque cursor to pass as `cursor` when reversing `sortDirection`. This is only populated when the page contains at least one thread. Use it with the opposite `sortDirection`; for timestamp sorts it anchors at the start of the page timestamp so same-second updates are not skipped.",
|
||||
"type": [
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"data": {
|
||||
"items": {
|
||||
"$ref": "#/definitions/Thread"
|
||||
|
||||
@@ -0,0 +1,49 @@
|
||||
{
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"definitions": {
|
||||
"SortDirection": {
|
||||
"enum": [
|
||||
"asc",
|
||||
"desc"
|
||||
],
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"properties": {
|
||||
"cursor": {
|
||||
"description": "Opaque cursor to pass to the next call to continue after the last turn.",
|
||||
"type": [
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"limit": {
|
||||
"description": "Optional turn page size.",
|
||||
"format": "uint32",
|
||||
"minimum": 0.0,
|
||||
"type": [
|
||||
"integer",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"sortDirection": {
|
||||
"anyOf": [
|
||||
{
|
||||
"$ref": "#/definitions/SortDirection"
|
||||
},
|
||||
{
|
||||
"type": "null"
|
||||
}
|
||||
],
|
||||
"description": "Optional turn pagination direction; defaults to descending."
|
||||
},
|
||||
"threadId": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"threadId"
|
||||
],
|
||||
"title": "ThreadTurnsListParams",
|
||||
"type": "object"
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
File diff suppressed because one or more lines are too long
@@ -0,0 +1,5 @@
|
||||
// GENERATED CODE! DO NOT MODIFY BY HAND!
|
||||
|
||||
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
|
||||
|
||||
export type SortDirection = "asc" | "desc";
|
||||
@@ -1,6 +1,7 @@
|
||||
// GENERATED CODE! DO NOT MODIFY BY HAND!
|
||||
|
||||
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
|
||||
import type { SortDirection } from "./SortDirection";
|
||||
import type { ThreadSortKey } from "./ThreadSortKey";
|
||||
import type { ThreadSourceKind } from "./ThreadSourceKind";
|
||||
|
||||
@@ -17,6 +18,10 @@ limit?: number | null,
|
||||
* Optional sort key; defaults to created_at.
|
||||
*/
|
||||
sortKey?: ThreadSortKey | null,
|
||||
/**
|
||||
* Optional sort direction; defaults to descending (newest first).
|
||||
*/
|
||||
sortDirection?: SortDirection | null,
|
||||
/**
|
||||
* Optional provider filter; when set, only sessions recorded under these
|
||||
* providers are returned. When present but empty, includes all providers.
|
||||
|
||||
@@ -8,4 +8,11 @@ export type ThreadListResponse = { data: Array<Thread>,
|
||||
* Opaque cursor to pass to the next call to continue after the last item.
|
||||
* if None, there are no more items to return.
|
||||
*/
|
||||
nextCursor: string | null, };
|
||||
nextCursor: string | null,
|
||||
/**
|
||||
* Opaque cursor to pass as `cursor` when reversing `sortDirection`.
|
||||
* This is only populated when the page contains at least one thread.
|
||||
* Use it with the opposite `sortDirection`; for timestamp sorts it anchors
|
||||
* at the start of the page timestamp so same-second updates are not skipped.
|
||||
*/
|
||||
backwardsCursor: string | null, };
|
||||
|
||||
@@ -0,0 +1,18 @@
|
||||
// GENERATED CODE! DO NOT MODIFY BY HAND!
|
||||
|
||||
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
|
||||
import type { SortDirection } from "./SortDirection";
|
||||
|
||||
export type ThreadTurnsListParams = { threadId: string,
|
||||
/**
|
||||
* Opaque cursor to pass to the next call to continue after the last turn.
|
||||
*/
|
||||
cursor?: string | null,
|
||||
/**
|
||||
* Optional turn page size.
|
||||
*/
|
||||
limit?: number | null,
|
||||
/**
|
||||
* Optional turn pagination direction; defaults to descending.
|
||||
*/
|
||||
sortDirection?: SortDirection | null, };
|
||||
@@ -0,0 +1,18 @@
|
||||
// GENERATED CODE! DO NOT MODIFY BY HAND!
|
||||
|
||||
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
|
||||
import type { Turn } from "./Turn";
|
||||
|
||||
export type ThreadTurnsListResponse = { data: Array<Turn>,
|
||||
/**
|
||||
* Opaque cursor to pass to the next call to continue after the last turn.
|
||||
* if None, there are no more turns to return.
|
||||
*/
|
||||
nextCursor: string | null,
|
||||
/**
|
||||
* Opaque cursor to pass as `cursor` when reversing `sortDirection`.
|
||||
* This is only populated when the page contains at least one turn.
|
||||
* Use it with the opposite `sortDirection` to include the anchor turn again
|
||||
* and catch updates to that turn.
|
||||
*/
|
||||
backwardsCursor: string | null, };
|
||||
@@ -268,6 +268,7 @@ export type { SkillsListEntry } from "./SkillsListEntry";
|
||||
export type { SkillsListExtraRootsForCwd } from "./SkillsListExtraRootsForCwd";
|
||||
export type { SkillsListParams } from "./SkillsListParams";
|
||||
export type { SkillsListResponse } from "./SkillsListResponse";
|
||||
export type { SortDirection } from "./SortDirection";
|
||||
export type { TerminalInteractionNotification } from "./TerminalInteractionNotification";
|
||||
export type { TextElement } from "./TextElement";
|
||||
export type { TextPosition } from "./TextPosition";
|
||||
@@ -320,6 +321,8 @@ export type { ThreadStatus } from "./ThreadStatus";
|
||||
export type { ThreadStatusChangedNotification } from "./ThreadStatusChangedNotification";
|
||||
export type { ThreadTokenUsage } from "./ThreadTokenUsage";
|
||||
export type { ThreadTokenUsageUpdatedNotification } from "./ThreadTokenUsageUpdatedNotification";
|
||||
export type { ThreadTurnsListParams } from "./ThreadTurnsListParams";
|
||||
export type { ThreadTurnsListResponse } from "./ThreadTurnsListResponse";
|
||||
export type { ThreadUnarchiveParams } from "./ThreadUnarchiveParams";
|
||||
export type { ThreadUnarchiveResponse } from "./ThreadUnarchiveResponse";
|
||||
export type { ThreadUnarchivedNotification } from "./ThreadUnarchivedNotification";
|
||||
|
||||
@@ -317,6 +317,10 @@ client_request_definitions! {
|
||||
params: v2::ThreadReadParams,
|
||||
response: v2::ThreadReadResponse,
|
||||
},
|
||||
ThreadTurnsList => "thread/turns/list" {
|
||||
params: v2::ThreadTurnsListParams,
|
||||
response: v2::ThreadTurnsListResponse,
|
||||
},
|
||||
SkillsList => "skills/list" {
|
||||
params: v2::SkillsListParams,
|
||||
response: v2::SkillsListResponse,
|
||||
|
||||
@@ -975,8 +975,15 @@ impl ThreadHistoryBuilder {
|
||||
}
|
||||
|
||||
fn new_turn(&mut self, id: Option<String>) -> PendingTurn {
|
||||
let id = id.unwrap_or_else(|| {
|
||||
if self.next_rollout_index == 0 {
|
||||
Uuid::now_v7().to_string()
|
||||
} else {
|
||||
format!("rollout-{}", self.current_rollout_index)
|
||||
}
|
||||
});
|
||||
PendingTurn {
|
||||
id: id.unwrap_or_else(|| Uuid::now_v7().to_string()),
|
||||
id,
|
||||
items: Vec::new(),
|
||||
error: None,
|
||||
status: TurnStatus::Completed,
|
||||
@@ -1622,8 +1629,8 @@ mod tests {
|
||||
.collect::<Vec<_>>();
|
||||
let turns = build_turns_from_rollout_items(&items);
|
||||
assert_eq!(turns.len(), 2);
|
||||
assert!(Uuid::parse_str(&turns[0].id).is_ok());
|
||||
assert!(Uuid::parse_str(&turns[1].id).is_ok());
|
||||
assert_eq!(turns[0].id, "rollout-0");
|
||||
assert_eq!(turns[1].id, "rollout-5");
|
||||
assert_ne!(turns[0].id, turns[1].id);
|
||||
assert_eq!(turns[0].status, TurnStatus::Completed);
|
||||
assert_eq!(turns[1].status, TurnStatus::Completed);
|
||||
|
||||
@@ -3125,6 +3125,9 @@ pub struct ThreadListParams {
|
||||
/// Optional sort key; defaults to created_at.
|
||||
#[ts(optional = nullable)]
|
||||
pub sort_key: Option<ThreadSortKey>,
|
||||
/// Optional sort direction; defaults to descending (newest first).
|
||||
#[ts(optional = nullable)]
|
||||
pub sort_direction: Option<SortDirection>,
|
||||
/// Optional provider filter; when set, only sessions recorded under these
|
||||
/// providers are returned. When present but empty, includes all providers.
|
||||
#[ts(optional = nullable)]
|
||||
@@ -3172,6 +3175,14 @@ pub enum ThreadSortKey {
|
||||
UpdatedAt,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub enum SortDirection {
|
||||
Asc,
|
||||
Desc,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
@@ -3180,6 +3191,11 @@ pub struct ThreadListResponse {
|
||||
/// Opaque cursor to pass to the next call to continue after the last item.
|
||||
/// if None, there are no more items to return.
|
||||
pub next_cursor: Option<String>,
|
||||
/// Opaque cursor to pass as `cursor` when reversing `sortDirection`.
|
||||
/// This is only populated when the page contains at least one thread.
|
||||
/// Use it with the opposite `sortDirection`; for timestamp sorts it anchors
|
||||
/// at the start of the page timestamp so same-second updates are not skipped.
|
||||
pub backwards_cursor: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema, TS)]
|
||||
@@ -3245,6 +3261,37 @@ pub struct ThreadReadResponse {
|
||||
pub thread: Thread,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct ThreadTurnsListParams {
|
||||
pub thread_id: String,
|
||||
/// Opaque cursor to pass to the next call to continue after the last turn.
|
||||
#[ts(optional = nullable)]
|
||||
pub cursor: Option<String>,
|
||||
/// Optional turn page size.
|
||||
#[ts(optional = nullable)]
|
||||
pub limit: Option<u32>,
|
||||
/// Optional turn pagination direction; defaults to descending.
|
||||
#[ts(optional = nullable)]
|
||||
pub sort_direction: Option<SortDirection>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct ThreadTurnsListResponse {
|
||||
pub data: Vec<Turn>,
|
||||
/// Opaque cursor to pass to the next call to continue after the last turn.
|
||||
/// if None, there are no more turns to return.
|
||||
pub next_cursor: Option<String>,
|
||||
/// Opaque cursor to pass as `cursor` when reversing `sortDirection`.
|
||||
/// This is only populated when the page contains at least one turn.
|
||||
/// Use it with the opposite `sortDirection` to include the anchor turn again
|
||||
/// and catch updates to that turn.
|
||||
pub backwards_cursor: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
|
||||
@@ -1124,6 +1124,7 @@ async fn thread_list(endpoint: &Endpoint, config_overrides: &[String], limit: u3
|
||||
cursor: None,
|
||||
limit: Some(limit),
|
||||
sort_key: None,
|
||||
sort_direction: None,
|
||||
model_providers: None,
|
||||
source_kinds: None,
|
||||
archived: None,
|
||||
|
||||
@@ -139,6 +139,7 @@ Example with notification opt-out:
|
||||
- `thread/list` — page through stored rollouts; supports cursor-based pagination and optional `modelProviders`, `sourceKinds`, `archived`, `cwd`, and `searchTerm` filters. Each returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded.
|
||||
- `thread/loaded/list` — list the thread ids currently loaded in memory.
|
||||
- `thread/read` — read a stored thread by id without resuming it; optionally include turns via `includeTurns`. The returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded.
|
||||
- `thread/turns/list` — page through a stored thread’s turn history without resuming it; supports cursor-based pagination with `sortDirection`, `nextCursor`, and `backwardsCursor`.
|
||||
- `thread/metadata/update` — patch stored thread metadata in sqlite; currently supports updating persisted `gitInfo` fields and returns the refreshed `thread`.
|
||||
- `thread/status/changed` — notification emitted when a loaded thread’s status changes (`threadId` + new `status`).
|
||||
- `thread/archive` — move a thread’s rollout file into the archived directory; returns `{}` on success and emits `thread/archived`.
|
||||
@@ -274,11 +275,13 @@ Experimental API: `thread/start`, `thread/resume`, and `thread/fork` accept `per
|
||||
- `cursor` — opaque string from a prior response; omit for the first page.
|
||||
- `limit` — server defaults to a reasonable page size if unset.
|
||||
- `sortKey` — `created_at` (default) or `updated_at`.
|
||||
- `sortDirection` — `desc` (default) or `asc`.
|
||||
- `modelProviders` — restrict results to specific providers; unset, null, or an empty array will include all providers.
|
||||
- `sourceKinds` — restrict results to specific sources; omit or pass `[]` for interactive sessions only (`cli`, `vscode`).
|
||||
- `archived` — when `true`, list archived threads only. When `false` or `null`, list non-archived threads (default).
|
||||
- `cwd` — restrict results to threads whose session cwd exactly matches this path. Relative paths are resolved against the app-server process cwd before matching.
|
||||
- `searchTerm` — restrict results to threads whose extracted title contains this substring (case-sensitive).
|
||||
- Responses include `nextCursor` to continue in the same direction and `backwardsCursor` to pass as `cursor` when reversing `sortDirection`.
|
||||
- Responses include `agentNickname` and `agentRole` for AgentControl-spawned thread sub-agents when available.
|
||||
|
||||
Example:
|
||||
@@ -294,7 +297,8 @@ Example:
|
||||
{ "id": "thr_a", "preview": "Create a TUI", "modelProvider": "openai", "createdAt": 1730831111, "updatedAt": 1730831111, "status": { "type": "notLoaded" }, "agentNickname": "Atlas", "agentRole": "explorer" },
|
||||
{ "id": "thr_b", "preview": "Fix tests", "modelProvider": "openai", "createdAt": 1730750000, "updatedAt": 1730750000, "status": { "type": "notLoaded" } }
|
||||
],
|
||||
"nextCursor": "opaque-token-or-null"
|
||||
"nextCursor": "opaque-token-or-null",
|
||||
"backwardsCursor": "opaque-token-or-null"
|
||||
} }
|
||||
```
|
||||
|
||||
@@ -351,7 +355,7 @@ If this was the last subscriber, the server unloads the thread and emits `thread
|
||||
|
||||
### Example: Read a thread
|
||||
|
||||
Use `thread/read` to fetch a stored thread by id without resuming it. Pass `includeTurns` when you want the rollout history loaded into `thread.turns`. The returned thread includes `agentNickname` and `agentRole` for AgentControl-spawned thread sub-agents when available.
|
||||
Use `thread/read` to fetch a stored thread by id without resuming it. Pass `includeTurns` when you want the full rollout history loaded into `thread.turns`. The returned thread includes `agentNickname` and `agentRole` for AgentControl-spawned thread sub-agents when available.
|
||||
|
||||
```json
|
||||
{ "method": "thread/read", "id": 22, "params": { "threadId": "thr_123" } }
|
||||
@@ -367,6 +371,23 @@ Use `thread/read` to fetch a stored thread by id without resuming it. Pass `incl
|
||||
} }
|
||||
```
|
||||
|
||||
### Example: List thread turns
|
||||
|
||||
Use `thread/turns/list` to page a stored thread’s turn history without resuming it. By default, results are sorted descending so clients can start at the present and fetch older turns with `nextCursor`. The response also includes `backwardsCursor`; pass it as `cursor` on a later request with `sortDirection: "asc"` to fetch turns newer than the first item from the earlier page.
|
||||
|
||||
```json
|
||||
{ "method": "thread/turns/list", "id": 24, "params": {
|
||||
"threadId": "thr_123",
|
||||
"limit": 50,
|
||||
"sortDirection": "desc"
|
||||
} }
|
||||
{ "id": 24, "result": {
|
||||
"data": [ ... ],
|
||||
"nextCursor": "older-turns-cursor-or-null",
|
||||
"backwardsCursor": "newer-turns-cursor-or-null"
|
||||
} }
|
||||
```
|
||||
|
||||
### Example: Update stored thread metadata
|
||||
|
||||
Use `thread/metadata/update` to patch sqlite-backed metadata for a thread without resuming it. Today this supports persisted `gitInfo`; omitted fields are left unchanged, while explicit `null` clears a stored value.
|
||||
|
||||
@@ -117,6 +117,7 @@ use codex_app_server_protocol::SkillsConfigWriteParams;
|
||||
use codex_app_server_protocol::SkillsConfigWriteResponse;
|
||||
use codex_app_server_protocol::SkillsListParams;
|
||||
use codex_app_server_protocol::SkillsListResponse;
|
||||
use codex_app_server_protocol::SortDirection;
|
||||
use codex_app_server_protocol::Thread;
|
||||
use codex_app_server_protocol::ThreadArchiveParams;
|
||||
use codex_app_server_protocol::ThreadArchiveResponse;
|
||||
@@ -167,6 +168,8 @@ use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::ThreadStartedNotification;
|
||||
use codex_app_server_protocol::ThreadStatus;
|
||||
use codex_app_server_protocol::ThreadTurnsListParams;
|
||||
use codex_app_server_protocol::ThreadTurnsListResponse;
|
||||
use codex_app_server_protocol::ThreadUnarchiveParams;
|
||||
use codex_app_server_protocol::ThreadUnarchiveResponse;
|
||||
use codex_app_server_protocol::ThreadUnarchivedNotification;
|
||||
@@ -354,6 +357,8 @@ use crate::thread_state::ThreadStateManager;
|
||||
|
||||
const THREAD_LIST_DEFAULT_LIMIT: usize = 25;
|
||||
const THREAD_LIST_MAX_LIMIT: usize = 100;
|
||||
const THREAD_TURNS_DEFAULT_LIMIT: usize = 25;
|
||||
const THREAD_TURNS_MAX_LIMIT: usize = 100;
|
||||
|
||||
struct ThreadListFilters {
|
||||
model_providers: Option<Vec<String>>,
|
||||
@@ -785,6 +790,10 @@ impl CodexMessageProcessor {
|
||||
self.thread_read(to_connection_request_id(request_id), params)
|
||||
.await;
|
||||
}
|
||||
ClientRequest::ThreadTurnsList { request_id, params } => {
|
||||
self.thread_turns_list(to_connection_request_id(request_id), params)
|
||||
.await;
|
||||
}
|
||||
ClientRequest::ThreadShellCommand { request_id, params } => {
|
||||
self.thread_shell_command(to_connection_request_id(request_id), params)
|
||||
.await;
|
||||
@@ -3422,6 +3431,7 @@ impl CodexMessageProcessor {
|
||||
cursor,
|
||||
limit,
|
||||
sort_key,
|
||||
sort_direction,
|
||||
model_providers,
|
||||
source_kinds,
|
||||
archived,
|
||||
@@ -3444,11 +3454,12 @@ impl CodexMessageProcessor {
|
||||
ThreadSortKey::CreatedAt => CoreThreadSortKey::CreatedAt,
|
||||
ThreadSortKey::UpdatedAt => CoreThreadSortKey::UpdatedAt,
|
||||
};
|
||||
let (summaries, next_cursor) = match self
|
||||
let list_result = self
|
||||
.list_threads_common(
|
||||
requested_page_size,
|
||||
cursor,
|
||||
core_sort_key,
|
||||
sort_direction.unwrap_or(SortDirection::Desc),
|
||||
ThreadListFilters {
|
||||
model_providers,
|
||||
source_kinds,
|
||||
@@ -3457,8 +3468,8 @@ impl CodexMessageProcessor {
|
||||
search_term,
|
||||
},
|
||||
)
|
||||
.await
|
||||
{
|
||||
.await;
|
||||
let (summaries, next_cursor) = match list_result {
|
||||
Ok(r) => r,
|
||||
Err(error) => {
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
@@ -3485,7 +3496,7 @@ impl CodexMessageProcessor {
|
||||
.loaded_statuses_for_threads(status_ids)
|
||||
.await;
|
||||
|
||||
let data = threads
|
||||
let data: Vec<_> = threads
|
||||
.into_iter()
|
||||
.map(|(conversation_id, mut thread)| {
|
||||
if let Some(title) = names.get(&conversation_id).cloned() {
|
||||
@@ -3497,7 +3508,14 @@ impl CodexMessageProcessor {
|
||||
thread
|
||||
})
|
||||
.collect();
|
||||
let response = ThreadListResponse { data, next_cursor };
|
||||
let backwards_cursor = data
|
||||
.first()
|
||||
.and_then(|thread| thread_backwards_cursor_for_sort_key(thread, core_sort_key));
|
||||
let response = ThreadListResponse {
|
||||
data,
|
||||
next_cursor,
|
||||
backwards_cursor,
|
||||
};
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
}
|
||||
|
||||
@@ -3714,6 +3732,155 @@ impl CodexMessageProcessor {
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
}
|
||||
|
||||
async fn thread_turns_list(
|
||||
&self,
|
||||
request_id: ConnectionRequestId,
|
||||
params: ThreadTurnsListParams,
|
||||
) {
|
||||
let ThreadTurnsListParams {
|
||||
thread_id,
|
||||
cursor,
|
||||
limit,
|
||||
sort_direction,
|
||||
} = params;
|
||||
|
||||
let thread_uuid = match ThreadId::from_string(&thread_id) {
|
||||
Ok(id) => id,
|
||||
Err(err) => {
|
||||
self.send_invalid_request_error(request_id, format!("invalid thread id: {err}"))
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let state_db_ctx = get_state_db(&self.config).await;
|
||||
let mut rollout_path = self
|
||||
.resolve_rollout_path(thread_uuid, state_db_ctx.as_ref())
|
||||
.await;
|
||||
if rollout_path.is_none() {
|
||||
rollout_path =
|
||||
match find_thread_path_by_id_str(&self.config.codex_home, &thread_uuid.to_string())
|
||||
.await
|
||||
{
|
||||
Ok(Some(path)) => Some(path),
|
||||
Ok(None) => match find_archived_thread_path_by_id_str(
|
||||
&self.config.codex_home,
|
||||
&thread_uuid.to_string(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(path) => path,
|
||||
Err(err) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("failed to locate archived thread id {thread_uuid}: {err}"),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
},
|
||||
Err(err) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("failed to locate thread id {thread_uuid}: {err}"),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
if rollout_path.is_none() {
|
||||
match self.thread_manager.get_thread(thread_uuid).await {
|
||||
Ok(thread) => {
|
||||
rollout_path = thread.rollout_path();
|
||||
if rollout_path.is_none() {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
"ephemeral threads do not support thread/turns/list".to_string(),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("thread not loaded: {thread_uuid}"),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let Some(rollout_path) = rollout_path.as_ref() else {
|
||||
self.send_internal_error(
|
||||
request_id,
|
||||
format!("failed to locate rollout for thread {thread_uuid}"),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
};
|
||||
|
||||
match read_rollout_items_from_rollout(rollout_path).await {
|
||||
Ok(items) => {
|
||||
// Rollback and compaction events can change earlier turns, so pagination
|
||||
// has to replay the full rollout until turn metadata is indexed separately.
|
||||
let mut turns = build_turns_from_rollout_items(&items);
|
||||
let has_live_in_progress_turn =
|
||||
match self.thread_manager.get_thread(thread_uuid).await {
|
||||
Ok(thread) => matches!(thread.agent_status().await, AgentStatus::Running),
|
||||
Err(_) => false,
|
||||
};
|
||||
normalize_thread_turns_status(
|
||||
&mut turns,
|
||||
self.thread_watch_manager
|
||||
.loaded_status_for_thread(&thread_uuid.to_string())
|
||||
.await,
|
||||
has_live_in_progress_turn,
|
||||
);
|
||||
let page = match paginate_thread_turns(
|
||||
turns,
|
||||
cursor.as_deref(),
|
||||
limit,
|
||||
sort_direction.unwrap_or(SortDirection::Desc),
|
||||
) {
|
||||
Ok(page) => page,
|
||||
Err(error) => {
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
let response = ThreadTurnsListResponse {
|
||||
data: page.turns,
|
||||
next_cursor: page.next_cursor,
|
||||
backwards_cursor: page.backwards_cursor,
|
||||
};
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
}
|
||||
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!(
|
||||
"thread {thread_uuid} is not materialized yet; thread/turns/list is unavailable before first user message"
|
||||
),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
self.send_internal_error(
|
||||
request_id,
|
||||
format!(
|
||||
"failed to load rollout `{}` for thread {thread_uuid}: {err}",
|
||||
rollout_path.display()
|
||||
),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn thread_created_receiver(&self) -> broadcast::Receiver<ThreadId> {
|
||||
self.thread_manager.subscribe_thread_created()
|
||||
}
|
||||
@@ -4679,6 +4846,7 @@ impl CodexMessageProcessor {
|
||||
requested_page_size: usize,
|
||||
cursor: Option<String>,
|
||||
sort_key: CoreThreadSortKey,
|
||||
sort_direction: SortDirection,
|
||||
filters: ThreadListFilters,
|
||||
) -> Result<(Vec<ConversationSummary>, Option<String>), JSONRPCErrorError> {
|
||||
let ThreadListFilters {
|
||||
@@ -4689,13 +4857,15 @@ impl CodexMessageProcessor {
|
||||
search_term,
|
||||
} = filters;
|
||||
let mut cursor_obj: Option<RolloutCursor> = match cursor.as_ref() {
|
||||
Some(cursor_str) => {
|
||||
Some(parse_cursor(cursor_str).ok_or_else(|| JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("invalid cursor: {cursor_str}"),
|
||||
data: None,
|
||||
})?)
|
||||
}
|
||||
Some(cursor_str) => Some(
|
||||
parse_thread_list_cursor(cursor_str, sort_direction).ok_or_else(|| {
|
||||
JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("invalid cursor: {cursor_str}"),
|
||||
data: None,
|
||||
}
|
||||
})?,
|
||||
),
|
||||
None => None,
|
||||
};
|
||||
let mut last_cursor = cursor_obj.clone();
|
||||
@@ -4717,6 +4887,10 @@ impl CodexMessageProcessor {
|
||||
let (allowed_sources_vec, source_kind_filter) = compute_source_filters(source_kinds);
|
||||
let allowed_sources = allowed_sources_vec.as_slice();
|
||||
let state_db_ctx = get_state_db(&self.config).await;
|
||||
let core_sort_direction = match sort_direction {
|
||||
SortDirection::Asc => codex_core::SortDirection::Asc,
|
||||
SortDirection::Desc => codex_core::SortDirection::Desc,
|
||||
};
|
||||
|
||||
while remaining > 0 {
|
||||
let page_size = remaining.min(THREAD_LIST_MAX_LIMIT);
|
||||
@@ -4726,37 +4900,35 @@ impl CodexMessageProcessor {
|
||||
page_size,
|
||||
cursor_obj.as_ref(),
|
||||
sort_key,
|
||||
core_sort_direction,
|
||||
allowed_sources,
|
||||
model_provider_filter.as_deref(),
|
||||
fallback_provider.as_str(),
|
||||
search_term.as_deref(),
|
||||
)
|
||||
.await
|
||||
.map_err(|err| JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: format!("failed to list threads: {err}"),
|
||||
data: None,
|
||||
})?
|
||||
} else {
|
||||
RolloutRecorder::list_threads(
|
||||
&self.config,
|
||||
page_size,
|
||||
cursor_obj.as_ref(),
|
||||
sort_key,
|
||||
core_sort_direction,
|
||||
allowed_sources,
|
||||
model_provider_filter.as_deref(),
|
||||
fallback_provider.as_str(),
|
||||
search_term.as_deref(),
|
||||
)
|
||||
.await
|
||||
.map_err(|err| JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: format!("failed to list threads: {err}"),
|
||||
data: None,
|
||||
})?
|
||||
};
|
||||
}
|
||||
.map_err(|err| JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: format!("failed to list threads: {err}"),
|
||||
data: None,
|
||||
})?;
|
||||
|
||||
let mut filtered = Vec::with_capacity(page.items.len());
|
||||
let next_cursor_value = page.next_cursor.clone();
|
||||
let mut candidate_summaries = Vec::with_capacity(page.items.len());
|
||||
for it in page.items {
|
||||
let Some(summary) = summary_from_thread_list_item(
|
||||
it,
|
||||
@@ -4767,6 +4939,11 @@ impl CodexMessageProcessor {
|
||||
else {
|
||||
continue;
|
||||
};
|
||||
candidate_summaries.push(summary);
|
||||
}
|
||||
|
||||
let mut filtered = Vec::with_capacity(candidate_summaries.len());
|
||||
for summary in candidate_summaries {
|
||||
if source_kind_filter
|
||||
.as_ref()
|
||||
.is_none_or(|filter| source_kind_matches(&summary.source, filter))
|
||||
@@ -4784,7 +4961,6 @@ impl CodexMessageProcessor {
|
||||
remaining = requested_page_size.saturating_sub(items.len());
|
||||
|
||||
// Encode RolloutCursor into the JSON-RPC string form returned to clients.
|
||||
let next_cursor_value = page.next_cursor.clone();
|
||||
next_cursor = next_cursor_value
|
||||
.as_ref()
|
||||
.and_then(|cursor| serde_json::to_value(cursor).ok())
|
||||
@@ -9247,6 +9423,180 @@ pub(crate) fn summary_to_thread(summary: ConversationSummary) -> Thread {
|
||||
}
|
||||
}
|
||||
|
||||
fn thread_backwards_cursor_for_sort_key(
|
||||
thread: &Thread,
|
||||
sort_key: CoreThreadSortKey,
|
||||
) -> Option<String> {
|
||||
let timestamp = match sort_key {
|
||||
CoreThreadSortKey::CreatedAt => thread.created_at,
|
||||
CoreThreadSortKey::UpdatedAt => thread.updated_at,
|
||||
};
|
||||
let timestamp = DateTime::<Utc>::from_timestamp(timestamp, /*nsecs*/ 0)?;
|
||||
serde_json::to_string(&ThreadListCursor {
|
||||
timestamp: timestamp.to_rfc3339_opts(SecondsFormat::Secs, true),
|
||||
id: thread.id.clone(),
|
||||
include_timestamp_bucket: true,
|
||||
})
|
||||
.ok()
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize, serde::Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
struct ThreadListCursor {
|
||||
timestamp: String,
|
||||
id: String,
|
||||
include_timestamp_bucket: bool,
|
||||
}
|
||||
|
||||
fn parse_thread_list_cursor(cursor: &str, sort_direction: SortDirection) -> Option<RolloutCursor> {
|
||||
if let Ok(cursor) = serde_json::from_str::<ThreadListCursor>(cursor) {
|
||||
let id = if cursor.include_timestamp_bucket {
|
||||
match sort_direction {
|
||||
SortDirection::Asc => Uuid::nil().to_string(),
|
||||
SortDirection::Desc => "ffffffff-ffff-ffff-ffff-ffffffffffff".to_string(),
|
||||
}
|
||||
} else {
|
||||
cursor.id
|
||||
};
|
||||
return parse_cursor(&format!("{}|{}", cursor.timestamp, id));
|
||||
}
|
||||
|
||||
parse_cursor(cursor)
|
||||
}
|
||||
|
||||
struct ThreadTurnsPage {
|
||||
turns: Vec<Turn>,
|
||||
next_cursor: Option<String>,
|
||||
backwards_cursor: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize, serde::Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
struct ThreadTurnsCursor {
|
||||
turn_id: String,
|
||||
include_anchor: bool,
|
||||
}
|
||||
|
||||
fn paginate_thread_turns(
|
||||
turns: Vec<Turn>,
|
||||
cursor: Option<&str>,
|
||||
limit: Option<u32>,
|
||||
sort_direction: SortDirection,
|
||||
) -> Result<ThreadTurnsPage, JSONRPCErrorError> {
|
||||
if turns.is_empty() {
|
||||
return Ok(ThreadTurnsPage {
|
||||
turns: Vec::new(),
|
||||
next_cursor: None,
|
||||
backwards_cursor: None,
|
||||
});
|
||||
}
|
||||
|
||||
let anchor = cursor.map(parse_thread_turns_cursor).transpose()?;
|
||||
let page_size = limit
|
||||
.map(|value| value as usize)
|
||||
.unwrap_or(THREAD_TURNS_DEFAULT_LIMIT)
|
||||
.clamp(1, THREAD_TURNS_MAX_LIMIT);
|
||||
|
||||
let anchor_index = anchor
|
||||
.as_ref()
|
||||
.and_then(|anchor| turns.iter().position(|turn| turn.id == anchor.turn_id));
|
||||
if anchor.is_some() && anchor_index.is_none() {
|
||||
return Ok(ThreadTurnsPage {
|
||||
turns: Vec::new(),
|
||||
next_cursor: None,
|
||||
backwards_cursor: None,
|
||||
});
|
||||
}
|
||||
|
||||
let mut keyed_turns: Vec<_> = turns.into_iter().enumerate().collect();
|
||||
match sort_direction {
|
||||
SortDirection::Asc => {
|
||||
if let (Some(anchor), Some(anchor_index)) = (anchor.as_ref(), anchor_index) {
|
||||
keyed_turns.retain(|(index, _)| {
|
||||
if anchor.include_anchor {
|
||||
*index >= anchor_index
|
||||
} else {
|
||||
*index > anchor_index
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
SortDirection::Desc => {
|
||||
keyed_turns.reverse();
|
||||
if let (Some(anchor), Some(anchor_index)) = (anchor.as_ref(), anchor_index) {
|
||||
keyed_turns.retain(|(index, _)| {
|
||||
if anchor.include_anchor {
|
||||
*index <= anchor_index
|
||||
} else {
|
||||
*index < anchor_index
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let more_turns_available = keyed_turns.len() > page_size;
|
||||
keyed_turns.truncate(page_size);
|
||||
let backwards_cursor = keyed_turns
|
||||
.first()
|
||||
.map(|(_, turn)| serialize_thread_turns_cursor(&turn.id, /*include_anchor*/ true))
|
||||
.transpose()?;
|
||||
let next_cursor = if more_turns_available {
|
||||
keyed_turns
|
||||
.last()
|
||||
.map(|(_, turn)| serialize_thread_turns_cursor(&turn.id, /*include_anchor*/ false))
|
||||
.transpose()?
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let turns = keyed_turns.into_iter().map(|(_, turn)| turn).collect();
|
||||
|
||||
Ok(ThreadTurnsPage {
|
||||
turns,
|
||||
next_cursor,
|
||||
backwards_cursor,
|
||||
})
|
||||
}
|
||||
|
||||
fn serialize_thread_turns_cursor(
|
||||
turn_id: &str,
|
||||
include_anchor: bool,
|
||||
) -> Result<String, JSONRPCErrorError> {
|
||||
serde_json::to_string(&ThreadTurnsCursor {
|
||||
turn_id: turn_id.to_string(),
|
||||
include_anchor,
|
||||
})
|
||||
.map_err(|err| JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: format!("failed to serialize cursor: {err}"),
|
||||
data: None,
|
||||
})
|
||||
}
|
||||
|
||||
fn parse_thread_turns_cursor(cursor: &str) -> Result<ThreadTurnsCursor, JSONRPCErrorError> {
|
||||
serde_json::from_str(cursor).map_err(|_| JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("invalid cursor: {cursor}"),
|
||||
data: None,
|
||||
})
|
||||
}
|
||||
|
||||
fn normalize_thread_turns_status(
|
||||
turns: &mut [Turn],
|
||||
loaded_status: ThreadStatus,
|
||||
has_live_in_progress_turn: bool,
|
||||
) {
|
||||
let status = resolve_thread_status(loaded_status, has_live_in_progress_turn);
|
||||
if matches!(status, ThreadStatus::Active { .. }) {
|
||||
return;
|
||||
}
|
||||
for turn in turns {
|
||||
if matches!(turn.status, TurnStatus::InProgress) {
|
||||
turn.status = TurnStatus::Interrupted;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -76,6 +76,7 @@ use codex_app_server_protocol::ThreadRollbackParams;
|
||||
use codex_app_server_protocol::ThreadSetNameParams;
|
||||
use codex_app_server_protocol::ThreadShellCommandParams;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadTurnsListParams;
|
||||
use codex_app_server_protocol::ThreadUnarchiveParams;
|
||||
use codex_app_server_protocol::ThreadUnsubscribeParams;
|
||||
use codex_app_server_protocol::TurnCompletedNotification;
|
||||
@@ -451,6 +452,15 @@ impl McpProcess {
|
||||
self.send_request("thread/read", params).await
|
||||
}
|
||||
|
||||
/// Send a `thread/turns/list` JSON-RPC request.
|
||||
pub async fn send_thread_turns_list_request(
|
||||
&mut self,
|
||||
params: ThreadTurnsListParams,
|
||||
) -> anyhow::Result<i64> {
|
||||
let params = Some(serde_json::to_value(params)?);
|
||||
self.send_request("thread/turns/list", params).await
|
||||
}
|
||||
|
||||
/// Send a `model/list` JSON-RPC request.
|
||||
pub async fn send_list_models_request(
|
||||
&mut self,
|
||||
|
||||
@@ -485,6 +485,7 @@ async fn thread_fork_ephemeral_remains_pathless_and_omits_listing() -> Result<()
|
||||
cursor: None,
|
||||
limit: Some(10),
|
||||
sort_key: None,
|
||||
sort_direction: None,
|
||||
model_providers: None,
|
||||
source_kinds: None,
|
||||
archived: None,
|
||||
|
||||
@@ -13,6 +13,7 @@ use codex_app_server_protocol::JSONRPCError;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::SessionSource;
|
||||
use codex_app_server_protocol::SortDirection;
|
||||
use codex_app_server_protocol::ThreadListResponse;
|
||||
use codex_app_server_protocol::ThreadSortKey;
|
||||
use codex_app_server_protocol::ThreadSourceKind;
|
||||
@@ -84,6 +85,7 @@ async fn list_threads_with_sort(
|
||||
cursor,
|
||||
limit,
|
||||
sort_key,
|
||||
sort_direction: None,
|
||||
model_providers: providers,
|
||||
source_kinds,
|
||||
archived,
|
||||
@@ -357,6 +359,7 @@ async fn thread_list_pagination_next_cursor_none_on_last_page() -> Result<()> {
|
||||
let ThreadListResponse {
|
||||
data: data1,
|
||||
next_cursor: cursor1,
|
||||
..
|
||||
} = list_threads(
|
||||
&mut mcp,
|
||||
/*cursor*/ None,
|
||||
@@ -384,6 +387,7 @@ async fn thread_list_pagination_next_cursor_none_on_last_page() -> Result<()> {
|
||||
let ThreadListResponse {
|
||||
data: data2,
|
||||
next_cursor: cursor2,
|
||||
..
|
||||
} = list_threads(
|
||||
&mut mcp,
|
||||
Some(cursor1),
|
||||
@@ -498,6 +502,7 @@ async fn thread_list_respects_cwd_filter() -> Result<()> {
|
||||
cursor: None,
|
||||
limit: Some(10),
|
||||
sort_key: None,
|
||||
sort_direction: None,
|
||||
model_providers: Some(vec!["mock_provider".to_string()]),
|
||||
source_kinds: None,
|
||||
archived: None,
|
||||
@@ -579,6 +584,7 @@ sqlite = true
|
||||
cursor: None,
|
||||
limit: Some(10),
|
||||
sort_key: None,
|
||||
sort_direction: None,
|
||||
model_providers: Some(vec!["mock_provider".to_string()]),
|
||||
source_kinds: None,
|
||||
archived: None,
|
||||
@@ -1233,6 +1239,114 @@ async fn thread_list_updated_at_paginates_with_cursor() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_list_backwards_cursor_can_seed_forward_delta_sync() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
create_minimal_config(codex_home.path())?;
|
||||
|
||||
let id_old = create_fake_rollout(
|
||||
codex_home.path(),
|
||||
"2025-02-01T10-00-00",
|
||||
"2025-02-01T10:00:00Z",
|
||||
"Hello",
|
||||
Some("mock_provider"),
|
||||
/*git_info*/ None,
|
||||
)?;
|
||||
let id_watermark = create_fake_rollout(
|
||||
codex_home.path(),
|
||||
"2025-02-01T11-00-00",
|
||||
"2025-02-01T11:00:00Z",
|
||||
"Hello",
|
||||
Some("mock_provider"),
|
||||
/*git_info*/ None,
|
||||
)?;
|
||||
|
||||
set_rollout_mtime(
|
||||
rollout_path(codex_home.path(), "2025-02-01T10-00-00", &id_old).as_path(),
|
||||
"2025-02-02T00:00:00Z",
|
||||
)?;
|
||||
set_rollout_mtime(
|
||||
rollout_path(codex_home.path(), "2025-02-01T11-00-00", &id_watermark).as_path(),
|
||||
"2025-02-03T00:00:00Z",
|
||||
)?;
|
||||
|
||||
let mut mcp = init_mcp(codex_home.path()).await?;
|
||||
|
||||
let ThreadListResponse {
|
||||
data: page1,
|
||||
backwards_cursor,
|
||||
..
|
||||
} = {
|
||||
let request_id = mcp
|
||||
.send_thread_list_request(codex_app_server_protocol::ThreadListParams {
|
||||
cursor: None,
|
||||
limit: Some(1),
|
||||
sort_key: Some(ThreadSortKey::UpdatedAt),
|
||||
sort_direction: Some(SortDirection::Desc),
|
||||
model_providers: Some(vec!["mock_provider".to_string()]),
|
||||
source_kinds: None,
|
||||
archived: None,
|
||||
cwd: None,
|
||||
search_term: None,
|
||||
})
|
||||
.await?;
|
||||
let resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
|
||||
)
|
||||
.await??;
|
||||
to_response::<ThreadListResponse>(resp)?
|
||||
};
|
||||
let ids_page1: Vec<_> = page1.iter().map(|thread| thread.id.as_str()).collect();
|
||||
assert_eq!(ids_page1, vec![id_watermark.as_str()]);
|
||||
let backwards_cursor = backwards_cursor.expect("expected backwardsCursor on first page");
|
||||
assert!(
|
||||
backwards_cursor.contains(&id_watermark),
|
||||
"backwardsCursor should preserve the actual first thread id"
|
||||
);
|
||||
|
||||
let id_new = create_fake_rollout(
|
||||
codex_home.path(),
|
||||
"2025-02-01T12-00-00",
|
||||
"2025-02-01T12:00:00Z",
|
||||
"Hello",
|
||||
Some("mock_provider"),
|
||||
/*git_info*/ None,
|
||||
)?;
|
||||
set_rollout_mtime(
|
||||
rollout_path(codex_home.path(), "2025-02-01T12-00-00", &id_new).as_path(),
|
||||
"2025-02-04T00:00:00Z",
|
||||
)?;
|
||||
|
||||
let ThreadListResponse {
|
||||
data: delta_page, ..
|
||||
} = {
|
||||
let request_id = mcp
|
||||
.send_thread_list_request(codex_app_server_protocol::ThreadListParams {
|
||||
cursor: Some(backwards_cursor),
|
||||
limit: Some(10),
|
||||
sort_key: Some(ThreadSortKey::UpdatedAt),
|
||||
sort_direction: Some(SortDirection::Asc),
|
||||
model_providers: Some(vec!["mock_provider".to_string()]),
|
||||
source_kinds: None,
|
||||
archived: None,
|
||||
cwd: None,
|
||||
search_term: None,
|
||||
})
|
||||
.await?;
|
||||
let resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
|
||||
)
|
||||
.await??;
|
||||
to_response::<ThreadListResponse>(resp)?
|
||||
};
|
||||
let ids_delta: Vec<_> = delta_page.iter().map(|thread| thread.id.as_str()).collect();
|
||||
assert_eq!(ids_delta, vec![id_watermark.as_str(), id_new.as_str()]);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_list_created_at_tie_breaks_by_uuid() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
@@ -1449,6 +1563,7 @@ async fn thread_list_invalid_cursor_returns_error() -> Result<()> {
|
||||
cursor: Some("not-a-cursor".to_string()),
|
||||
limit: Some(2),
|
||||
sort_key: None,
|
||||
sort_direction: None,
|
||||
model_providers: Some(vec!["mock_provider".to_string()]),
|
||||
source_kinds: None,
|
||||
archived: None,
|
||||
|
||||
@@ -109,7 +109,7 @@ async fn thread_metadata_update_patches_git_branch_and_returns_updated_thread()
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadReadResponse { thread: read } = to_response::<ThreadReadResponse>(read_resp)?;
|
||||
let ThreadReadResponse { thread: read, .. } = to_response::<ThreadReadResponse>(read_resp)?;
|
||||
|
||||
assert_eq!(
|
||||
read.git_info,
|
||||
@@ -421,7 +421,7 @@ async fn thread_metadata_update_can_clear_stored_git_fields() -> Result<()> {
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadReadResponse { thread: read } = to_response::<ThreadReadResponse>(read_resp)?;
|
||||
let ThreadReadResponse { thread: read, .. } = to_response::<ThreadReadResponse>(read_resp)?;
|
||||
|
||||
assert_eq!(read.git_info, None);
|
||||
|
||||
|
||||
@@ -2,11 +2,13 @@ use anyhow::Result;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_fake_rollout_with_text_elements;
|
||||
use app_test_support::create_mock_responses_server_repeating_assistant;
|
||||
use app_test_support::rollout_path;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::JSONRPCError;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::SessionSource;
|
||||
use codex_app_server_protocol::SortDirection;
|
||||
use codex_app_server_protocol::ThreadForkParams;
|
||||
use codex_app_server_protocol::ThreadForkResponse;
|
||||
use codex_app_server_protocol::ThreadItem;
|
||||
@@ -22,6 +24,8 @@ use codex_app_server_protocol::ThreadSetNameResponse;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::ThreadStatus;
|
||||
use codex_app_server_protocol::ThreadTurnsListParams;
|
||||
use codex_app_server_protocol::ThreadTurnsListResponse;
|
||||
use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::TurnStartResponse;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
@@ -31,6 +35,8 @@ use codex_protocol::user_input::TextElement;
|
||||
use core_test_support::responses;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::Value;
|
||||
use serde_json::json;
|
||||
use std::io::Write;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use tempfile::TempDir;
|
||||
@@ -76,7 +82,7 @@ async fn thread_read_returns_summary_without_turns() -> Result<()> {
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadReadResponse { thread } = to_response::<ThreadReadResponse>(read_resp)?;
|
||||
let ThreadReadResponse { thread, .. } = to_response::<ThreadReadResponse>(read_resp)?;
|
||||
|
||||
assert_eq!(thread.id, conversation_id);
|
||||
assert_eq!(thread.preview, preview);
|
||||
@@ -131,7 +137,7 @@ async fn thread_read_can_include_turns() -> Result<()> {
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadReadResponse { thread } = to_response::<ThreadReadResponse>(read_resp)?;
|
||||
let ThreadReadResponse { thread, .. } = to_response::<ThreadReadResponse>(read_resp)?;
|
||||
|
||||
assert_eq!(thread.turns.len(), 1);
|
||||
let turn = &thread.turns[0];
|
||||
@@ -154,6 +160,88 @@ async fn thread_read_can_include_turns() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_turns_list_can_page_backward_and_forward() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let filename_ts = "2025-01-05T12-00-00";
|
||||
let conversation_id = create_fake_rollout_with_text_elements(
|
||||
codex_home.path(),
|
||||
filename_ts,
|
||||
"2025-01-05T12:00:00Z",
|
||||
"first",
|
||||
vec![],
|
||||
Some("mock_provider"),
|
||||
/*git_info*/ None,
|
||||
)?;
|
||||
let rollout_path = rollout_path(codex_home.path(), filename_ts, &conversation_id);
|
||||
append_user_message(rollout_path.as_path(), "2025-01-05T12:01:00Z", "second")?;
|
||||
append_user_message(rollout_path.as_path(), "2025-01-05T12:02:00Z", "third")?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let read_id = mcp
|
||||
.send_thread_turns_list_request(ThreadTurnsListParams {
|
||||
thread_id: conversation_id.clone(),
|
||||
cursor: None,
|
||||
limit: Some(2),
|
||||
sort_direction: Some(SortDirection::Desc),
|
||||
})
|
||||
.await?;
|
||||
let read_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadTurnsListResponse {
|
||||
data,
|
||||
next_cursor,
|
||||
backwards_cursor,
|
||||
} = to_response::<ThreadTurnsListResponse>(read_resp)?;
|
||||
assert_eq!(turn_user_texts(&data), vec!["third", "second"]);
|
||||
let next_cursor = next_cursor.expect("expected nextCursor for older turns");
|
||||
let backwards_cursor = backwards_cursor.expect("expected backwardsCursor for newest turn");
|
||||
|
||||
let read_id = mcp
|
||||
.send_thread_turns_list_request(ThreadTurnsListParams {
|
||||
thread_id: conversation_id.clone(),
|
||||
cursor: Some(next_cursor),
|
||||
limit: Some(10),
|
||||
sort_direction: Some(SortDirection::Desc),
|
||||
})
|
||||
.await?;
|
||||
let read_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadTurnsListResponse { data, .. } = to_response::<ThreadTurnsListResponse>(read_resp)?;
|
||||
assert_eq!(turn_user_texts(&data), vec!["first"]);
|
||||
|
||||
append_user_message(rollout_path.as_path(), "2025-01-05T12:03:00Z", "fourth")?;
|
||||
|
||||
let read_id = mcp
|
||||
.send_thread_turns_list_request(ThreadTurnsListParams {
|
||||
thread_id: conversation_id,
|
||||
cursor: Some(backwards_cursor),
|
||||
limit: Some(10),
|
||||
sort_direction: Some(SortDirection::Asc),
|
||||
})
|
||||
.await?;
|
||||
let read_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadTurnsListResponse { data, .. } = to_response::<ThreadTurnsListResponse>(read_resp)?;
|
||||
assert_eq!(turn_user_texts(&data), vec!["third", "fourth"]);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_read_returns_forked_from_id_for_forked_threads() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
@@ -197,7 +285,7 @@ async fn thread_read_returns_forked_from_id_for_forked_threads() -> Result<()> {
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadReadResponse { thread } = to_response::<ThreadReadResponse>(read_resp)?;
|
||||
let ThreadReadResponse { thread, .. } = to_response::<ThreadReadResponse>(read_resp)?;
|
||||
|
||||
assert_eq!(thread.forked_from_id, Some(conversation_id));
|
||||
|
||||
@@ -242,7 +330,7 @@ async fn thread_read_loaded_thread_returns_precomputed_path_before_materializati
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadReadResponse { thread: read } = to_response::<ThreadReadResponse>(read_resp)?;
|
||||
let ThreadReadResponse { thread: read, .. } = to_response::<ThreadReadResponse>(read_resp)?;
|
||||
|
||||
assert_eq!(read.id, thread.id);
|
||||
assert_eq!(read.path, Some(thread_path));
|
||||
@@ -310,7 +398,7 @@ async fn thread_name_set_is_reflected_in_read_list_and_resume() -> Result<()> {
|
||||
)
|
||||
.await??;
|
||||
let read_result = read_resp.result.clone();
|
||||
let ThreadReadResponse { thread } = to_response::<ThreadReadResponse>(read_resp)?;
|
||||
let ThreadReadResponse { thread, .. } = to_response::<ThreadReadResponse>(read_resp)?;
|
||||
assert_eq!(thread.id, conversation_id);
|
||||
assert_eq!(thread.name.as_deref(), Some(new_name));
|
||||
let thread_json = read_result
|
||||
@@ -334,6 +422,7 @@ async fn thread_name_set_is_reflected_in_read_list_and_resume() -> Result<()> {
|
||||
cursor: None,
|
||||
limit: Some(50),
|
||||
sort_key: None,
|
||||
sort_direction: None,
|
||||
model_providers: Some(vec!["mock_provider".to_string()]),
|
||||
source_kinds: None,
|
||||
archived: None,
|
||||
@@ -519,13 +608,47 @@ async fn thread_read_reports_system_error_idle_flag_after_failed_turn() -> Resul
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadReadResponse { thread } = to_response::<ThreadReadResponse>(read_resp)?;
|
||||
let ThreadReadResponse { thread, .. } = to_response::<ThreadReadResponse>(read_resp)?;
|
||||
|
||||
assert_eq!(thread.status, ThreadStatus::SystemError,);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn append_user_message(path: &Path, timestamp: &str, text: &str) -> std::io::Result<()> {
|
||||
let mut file = std::fs::OpenOptions::new().append(true).open(path)?;
|
||||
writeln!(
|
||||
file,
|
||||
"{}",
|
||||
json!({
|
||||
"timestamp": timestamp,
|
||||
"type":"event_msg",
|
||||
"payload": {
|
||||
"type":"user_message",
|
||||
"message": text,
|
||||
"text_elements": [],
|
||||
"local_images": []
|
||||
}
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
fn turn_user_texts(turns: &[codex_app_server_protocol::Turn]) -> Vec<&str> {
|
||||
turns
|
||||
.iter()
|
||||
.filter_map(|turn| match turn.items.first()? {
|
||||
ThreadItem::UserMessage { content, .. } => match content.first()? {
|
||||
UserInput::Text { text, .. } => Some(text.as_str()),
|
||||
UserInput::Image { .. }
|
||||
| UserInput::LocalImage { .. }
|
||||
| UserInput::Skill { .. }
|
||||
| UserInput::Mention { .. } => None,
|
||||
},
|
||||
_ => None,
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
// Helper to create a config.toml pointing at the mock model server.
|
||||
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
|
||||
let config_toml = codex_home.join("config.toml");
|
||||
|
||||
@@ -571,6 +571,7 @@ async fn thread_resume_and_read_interrupt_incomplete_rollout_turn_when_thread_is
|
||||
.await??;
|
||||
let ThreadReadResponse {
|
||||
thread: read_thread,
|
||||
..
|
||||
} = to_response::<ThreadReadResponse>(read_resp)?;
|
||||
|
||||
assert_eq!(read_thread.status, ThreadStatus::Idle);
|
||||
|
||||
@@ -135,7 +135,7 @@ async fn thread_shell_command_runs_as_standalone_turn_and_persists_history() ->
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadReadResponse { thread } = to_response::<ThreadReadResponse>(read_resp)?;
|
||||
let ThreadReadResponse { thread, .. } = to_response::<ThreadReadResponse>(read_resp)?;
|
||||
assert_eq!(thread.turns.len(), 1);
|
||||
let ThreadItem::CommandExecution {
|
||||
source,
|
||||
@@ -305,7 +305,7 @@ async fn thread_shell_command_uses_existing_active_turn() -> Result<()> {
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadReadResponse { thread } = to_response::<ThreadReadResponse>(read_resp)?;
|
||||
let ThreadReadResponse { thread, .. } = to_response::<ThreadReadResponse>(read_resp)?;
|
||||
assert_eq!(thread.turns.len(), 1);
|
||||
assert!(
|
||||
thread.turns[0].items.iter().any(|item| {
|
||||
|
||||
@@ -276,7 +276,7 @@ async fn thread_unsubscribe_clears_cached_status_before_resume() -> Result<()> {
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadReadResponse { thread } = to_response::<ThreadReadResponse>(read_resp)?;
|
||||
let ThreadReadResponse { thread, .. } = to_response::<ThreadReadResponse>(read_resp)?;
|
||||
assert_eq!(thread.status, ThreadStatus::SystemError);
|
||||
|
||||
let unsubscribe_id = mcp
|
||||
|
||||
@@ -160,6 +160,7 @@ pub use rollout::RolloutRecorder;
|
||||
pub use rollout::RolloutRecorderParams;
|
||||
pub use rollout::SESSIONS_SUBDIR;
|
||||
pub use rollout::SessionMeta;
|
||||
pub use rollout::SortDirection;
|
||||
pub use rollout::ThreadItem;
|
||||
pub use rollout::ThreadSortKey;
|
||||
pub use rollout::ThreadsPage;
|
||||
|
||||
@@ -4,7 +4,9 @@ use crate::event_mapping::is_contextual_user_message_content;
|
||||
use chrono::Utc;
|
||||
use codex_git_utils::resolve_root_git_project_for_trust;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_state::SortDirection;
|
||||
use codex_state::SortKey;
|
||||
use codex_state::ThreadFilterOptions;
|
||||
use codex_state::ThreadMetadata;
|
||||
use codex_utils_output_truncation::TruncationPolicy;
|
||||
use codex_utils_output_truncation::truncate_text;
|
||||
@@ -124,12 +126,15 @@ async fn load_recent_threads(sess: &Session) -> Vec<ThreadMetadata> {
|
||||
match state_db
|
||||
.list_threads(
|
||||
MAX_RECENT_THREADS,
|
||||
/*anchor*/ None,
|
||||
SortKey::UpdatedAt,
|
||||
&[],
|
||||
/*model_providers*/ None,
|
||||
/*archived_only*/ false,
|
||||
/*search_term*/ None,
|
||||
ThreadFilterOptions {
|
||||
archived_only: false,
|
||||
allowed_sources: &[],
|
||||
model_providers: None,
|
||||
anchor: None,
|
||||
sort_key: SortKey::UpdatedAt,
|
||||
sort_direction: SortDirection::Desc,
|
||||
search_term: None,
|
||||
},
|
||||
)
|
||||
.await
|
||||
{
|
||||
|
||||
@@ -7,6 +7,7 @@ pub use codex_rollout::RolloutRecorder;
|
||||
pub use codex_rollout::RolloutRecorderParams;
|
||||
pub use codex_rollout::SESSIONS_SUBDIR;
|
||||
pub use codex_rollout::SessionMeta;
|
||||
pub use codex_rollout::SortDirection;
|
||||
pub use codex_rollout::ThreadItem;
|
||||
pub use codex_rollout::ThreadSortKey;
|
||||
pub use codex_rollout::ThreadsPage;
|
||||
|
||||
@@ -178,6 +178,7 @@ impl AppServerClient {
|
||||
cursor,
|
||||
limit: None,
|
||||
sort_key: None,
|
||||
sort_direction: None,
|
||||
model_providers: None,
|
||||
source_kinds: None,
|
||||
archived: None,
|
||||
|
||||
@@ -1226,6 +1226,7 @@ async fn resolve_resume_thread_id(
|
||||
cursor,
|
||||
limit: Some(100),
|
||||
sort_key: Some(ThreadSortKey::UpdatedAt),
|
||||
sort_direction: None,
|
||||
model_providers: model_providers.clone(),
|
||||
source_kinds: Some(all_thread_source_kinds()),
|
||||
archived: Some(false),
|
||||
@@ -1287,6 +1288,7 @@ async fn resolve_resume_thread_id(
|
||||
cursor,
|
||||
limit: Some(100),
|
||||
sort_key: Some(ThreadSortKey::UpdatedAt),
|
||||
sort_direction: None,
|
||||
model_providers: model_providers.clone(),
|
||||
source_kinds: Some(all_thread_source_kinds()),
|
||||
archived: Some(false),
|
||||
|
||||
@@ -34,6 +34,7 @@ pub use config::Config;
|
||||
pub use config::RolloutConfig;
|
||||
pub use config::RolloutConfigView;
|
||||
pub use list::Cursor;
|
||||
pub use list::SortDirection;
|
||||
pub use list::ThreadItem;
|
||||
pub use list::ThreadListConfig;
|
||||
pub use list::ThreadListLayout;
|
||||
|
||||
@@ -112,6 +112,12 @@ pub enum ThreadSortKey {
|
||||
UpdatedAt,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum SortDirection {
|
||||
Asc,
|
||||
Desc,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum ThreadListLayout {
|
||||
NestedByDate,
|
||||
@@ -136,6 +142,10 @@ impl Cursor {
|
||||
fn new(ts: OffsetDateTime, id: Uuid) -> Self {
|
||||
Self { ts, id }
|
||||
}
|
||||
|
||||
pub(crate) fn parts(&self) -> (OffsetDateTime, Uuid) {
|
||||
(self.ts, self.id)
|
||||
}
|
||||
}
|
||||
|
||||
/// Keeps track of where a paginated listing left off. As the file scan goes newest -> oldest,
|
||||
|
||||
@@ -32,6 +32,7 @@ use tracing::warn;
|
||||
use super::ARCHIVED_SESSIONS_SUBDIR;
|
||||
use super::SESSIONS_SUBDIR;
|
||||
use super::list::Cursor;
|
||||
use super::list::SortDirection;
|
||||
use super::list::ThreadItem;
|
||||
use super::list::ThreadListConfig;
|
||||
use super::list::ThreadListLayout;
|
||||
@@ -219,6 +220,7 @@ impl RolloutRecorder {
|
||||
page_size: usize,
|
||||
cursor: Option<&Cursor>,
|
||||
sort_key: ThreadSortKey,
|
||||
sort_direction: SortDirection,
|
||||
allowed_sources: &[SessionSource],
|
||||
model_providers: Option<&[String]>,
|
||||
default_provider: &str,
|
||||
@@ -229,6 +231,7 @@ impl RolloutRecorder {
|
||||
page_size,
|
||||
cursor,
|
||||
sort_key,
|
||||
sort_direction,
|
||||
allowed_sources,
|
||||
model_providers,
|
||||
default_provider,
|
||||
@@ -245,6 +248,7 @@ impl RolloutRecorder {
|
||||
page_size: usize,
|
||||
cursor: Option<&Cursor>,
|
||||
sort_key: ThreadSortKey,
|
||||
sort_direction: SortDirection,
|
||||
allowed_sources: &[SessionSource],
|
||||
model_providers: Option<&[String]>,
|
||||
default_provider: &str,
|
||||
@@ -255,6 +259,7 @@ impl RolloutRecorder {
|
||||
page_size,
|
||||
cursor,
|
||||
sort_key,
|
||||
sort_direction,
|
||||
allowed_sources,
|
||||
model_providers,
|
||||
default_provider,
|
||||
@@ -270,6 +275,7 @@ impl RolloutRecorder {
|
||||
page_size: usize,
|
||||
cursor: Option<&Cursor>,
|
||||
sort_key: ThreadSortKey,
|
||||
sort_direction: SortDirection,
|
||||
allowed_sources: &[SessionSource],
|
||||
model_providers: Option<&[String]>,
|
||||
default_provider: &str,
|
||||
@@ -286,6 +292,7 @@ impl RolloutRecorder {
|
||||
page_size,
|
||||
cursor,
|
||||
sort_key,
|
||||
sort_direction,
|
||||
allowed_sources,
|
||||
model_providers,
|
||||
archived,
|
||||
@@ -300,38 +307,43 @@ impl RolloutRecorder {
|
||||
// Filesystem-first listing intentionally overfetches so we can repair stale/missing
|
||||
// SQLite rollout paths before the final DB-backed page is returned.
|
||||
let fs_page_size = page_size.saturating_mul(2).max(page_size);
|
||||
let fs_page = if archived {
|
||||
let root = codex_home.join(ARCHIVED_SESSIONS_SUBDIR);
|
||||
get_threads_in_root(
|
||||
root,
|
||||
fs_page_size,
|
||||
cursor,
|
||||
sort_key,
|
||||
ThreadListConfig {
|
||||
let fs_page = match sort_direction {
|
||||
SortDirection::Asc => {
|
||||
list_threads_from_files_asc(
|
||||
codex_home,
|
||||
page_size,
|
||||
cursor,
|
||||
sort_key,
|
||||
allowed_sources,
|
||||
model_providers,
|
||||
default_provider,
|
||||
layout: ThreadListLayout::Flat,
|
||||
},
|
||||
)
|
||||
.await?
|
||||
} else {
|
||||
get_threads(
|
||||
codex_home,
|
||||
fs_page_size,
|
||||
cursor,
|
||||
sort_key,
|
||||
allowed_sources,
|
||||
model_providers,
|
||||
default_provider,
|
||||
)
|
||||
.await?
|
||||
archived,
|
||||
search_term,
|
||||
)
|
||||
.await?
|
||||
}
|
||||
SortDirection::Desc => {
|
||||
list_threads_from_files_desc(
|
||||
codex_home,
|
||||
fs_page_size,
|
||||
cursor,
|
||||
sort_key,
|
||||
allowed_sources,
|
||||
model_providers,
|
||||
default_provider,
|
||||
archived,
|
||||
)
|
||||
.await?
|
||||
}
|
||||
};
|
||||
|
||||
if state_db_ctx.is_none() {
|
||||
// Keep legacy behavior when SQLite is unavailable: return filesystem results
|
||||
// at the requested page size.
|
||||
return Ok(truncate_fs_page(fs_page, page_size, sort_key));
|
||||
return Ok(match sort_direction {
|
||||
SortDirection::Asc => fs_page,
|
||||
SortDirection::Desc => truncate_fs_page(fs_page, page_size, sort_key),
|
||||
});
|
||||
}
|
||||
|
||||
// Warm the DB by repairing every filesystem hit before querying SQLite.
|
||||
@@ -351,6 +363,7 @@ impl RolloutRecorder {
|
||||
page_size,
|
||||
cursor,
|
||||
sort_key,
|
||||
sort_direction,
|
||||
allowed_sources,
|
||||
model_providers,
|
||||
archived,
|
||||
@@ -363,7 +376,10 @@ impl RolloutRecorder {
|
||||
// If SQLite listing still fails, return the filesystem page rather than failing the list.
|
||||
tracing::error!("Falling back on rollout system");
|
||||
tracing::warn!("state db discrepancy during list_threads_with_db_fallback: falling_back");
|
||||
Ok(truncate_fs_page(fs_page, page_size, sort_key))
|
||||
Ok(match sort_direction {
|
||||
SortDirection::Asc => fs_page,
|
||||
SortDirection::Desc => truncate_fs_page(fs_page, page_size, sort_key),
|
||||
})
|
||||
}
|
||||
|
||||
/// Find the newest recorded thread path, optionally filtering to a matching cwd.
|
||||
@@ -389,6 +405,7 @@ impl RolloutRecorder {
|
||||
page_size,
|
||||
db_cursor.as_ref(),
|
||||
sort_key,
|
||||
SortDirection::Desc,
|
||||
allowed_sources,
|
||||
model_providers,
|
||||
/*archived*/ false,
|
||||
@@ -772,6 +789,131 @@ fn truncate_fs_page(
|
||||
page
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn list_threads_from_files_desc(
|
||||
codex_home: &Path,
|
||||
page_size: usize,
|
||||
cursor: Option<&Cursor>,
|
||||
sort_key: ThreadSortKey,
|
||||
allowed_sources: &[SessionSource],
|
||||
model_providers: Option<&[String]>,
|
||||
default_provider: &str,
|
||||
archived: bool,
|
||||
) -> std::io::Result<ThreadsPage> {
|
||||
if archived {
|
||||
let root = codex_home.join(ARCHIVED_SESSIONS_SUBDIR);
|
||||
get_threads_in_root(
|
||||
root,
|
||||
page_size,
|
||||
cursor,
|
||||
sort_key,
|
||||
ThreadListConfig {
|
||||
allowed_sources,
|
||||
model_providers,
|
||||
default_provider,
|
||||
layout: ThreadListLayout::Flat,
|
||||
},
|
||||
)
|
||||
.await
|
||||
} else {
|
||||
get_threads(
|
||||
codex_home,
|
||||
page_size,
|
||||
cursor,
|
||||
sort_key,
|
||||
allowed_sources,
|
||||
model_providers,
|
||||
default_provider,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn list_threads_from_files_asc(
|
||||
codex_home: &Path,
|
||||
page_size: usize,
|
||||
cursor: Option<&Cursor>,
|
||||
sort_key: ThreadSortKey,
|
||||
allowed_sources: &[SessionSource],
|
||||
model_providers: Option<&[String]>,
|
||||
default_provider: &str,
|
||||
archived: bool,
|
||||
_search_term: Option<&str>,
|
||||
) -> std::io::Result<ThreadsPage> {
|
||||
let mut all_items = Vec::new();
|
||||
let mut scanned_files = 0usize;
|
||||
let mut reached_scan_cap = false;
|
||||
let mut page_cursor = None;
|
||||
let scan_page_size = page_size.saturating_mul(8).clamp(256, 2048);
|
||||
loop {
|
||||
let page = list_threads_from_files_desc(
|
||||
codex_home,
|
||||
scan_page_size,
|
||||
page_cursor.as_ref(),
|
||||
sort_key,
|
||||
allowed_sources,
|
||||
model_providers,
|
||||
default_provider,
|
||||
archived,
|
||||
)
|
||||
.await?;
|
||||
scanned_files = scanned_files.saturating_add(page.num_scanned_files);
|
||||
reached_scan_cap |= page.reached_scan_cap;
|
||||
all_items.extend(page.items);
|
||||
page_cursor = page.next_cursor;
|
||||
if page_cursor.is_none() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
all_items.sort_by_key(|item| thread_item_sort_key(item, sort_key));
|
||||
if let Some(cursor) = cursor {
|
||||
let anchor = cursor.parts();
|
||||
all_items
|
||||
.retain(|item| thread_item_sort_key(item, sort_key).is_some_and(|key| key > anchor));
|
||||
}
|
||||
|
||||
let more_matches_available = all_items.len() > page_size || reached_scan_cap;
|
||||
all_items.truncate(page_size);
|
||||
let next_cursor = if more_matches_available {
|
||||
all_items
|
||||
.last()
|
||||
.and_then(|item| cursor_from_thread_item(item, sort_key))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
Ok(ThreadsPage {
|
||||
items: all_items,
|
||||
next_cursor,
|
||||
num_scanned_files: scanned_files,
|
||||
reached_scan_cap,
|
||||
})
|
||||
}
|
||||
|
||||
fn thread_item_sort_key(
|
||||
item: &ThreadItem,
|
||||
sort_key: ThreadSortKey,
|
||||
) -> Option<(OffsetDateTime, uuid::Uuid)> {
|
||||
let file_name = item.path.file_name()?.to_str()?;
|
||||
let (created_at, id) = parse_timestamp_uuid_from_filename(file_name)?;
|
||||
let timestamp = match sort_key {
|
||||
ThreadSortKey::CreatedAt => created_at,
|
||||
ThreadSortKey::UpdatedAt => {
|
||||
let updated_at = item.updated_at.as_deref().or(item.created_at.as_deref())?;
|
||||
OffsetDateTime::parse(updated_at, &Rfc3339).ok()?
|
||||
}
|
||||
};
|
||||
Some((timestamp, id))
|
||||
}
|
||||
|
||||
fn cursor_from_thread_item(item: &ThreadItem, sort_key: ThreadSortKey) -> Option<Cursor> {
|
||||
let (timestamp, id) = thread_item_sort_key(item, sort_key)?;
|
||||
let cursor_token = format!("{}|{id}", timestamp.format(&Rfc3339).ok()?);
|
||||
parse_cursor(cursor_token.as_str())
|
||||
}
|
||||
|
||||
struct LogFileInfo {
|
||||
/// Full path to the rollout file.
|
||||
path: PathBuf,
|
||||
|
||||
@@ -372,6 +372,7 @@ async fn list_threads_db_disabled_does_not_skip_paginated_items() -> std::io::Re
|
||||
/*page_size*/ 1,
|
||||
/*cursor*/ None,
|
||||
ThreadSortKey::CreatedAt,
|
||||
SortDirection::Desc,
|
||||
&[],
|
||||
/*model_providers*/ None,
|
||||
default_provider.as_str(),
|
||||
@@ -387,6 +388,7 @@ async fn list_threads_db_disabled_does_not_skip_paginated_items() -> std::io::Re
|
||||
/*page_size*/ 1,
|
||||
Some(&cursor),
|
||||
ThreadSortKey::CreatedAt,
|
||||
SortDirection::Desc,
|
||||
&[],
|
||||
/*model_providers*/ None,
|
||||
default_provider.as_str(),
|
||||
@@ -444,6 +446,7 @@ async fn list_threads_db_enabled_drops_missing_rollout_paths() -> std::io::Resul
|
||||
/*page_size*/ 10,
|
||||
/*cursor*/ None,
|
||||
ThreadSortKey::CreatedAt,
|
||||
SortDirection::Desc,
|
||||
&[],
|
||||
/*model_providers*/ None,
|
||||
default_provider.as_str(),
|
||||
@@ -506,6 +509,7 @@ async fn list_threads_db_enabled_repairs_stale_rollout_paths() -> std::io::Resul
|
||||
/*page_size*/ 1,
|
||||
/*cursor*/ None,
|
||||
ThreadSortKey::CreatedAt,
|
||||
SortDirection::Desc,
|
||||
&[],
|
||||
/*model_providers*/ None,
|
||||
default_provider.as_str(),
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use crate::config::RolloutConfig;
|
||||
use crate::config::RolloutConfigView;
|
||||
use crate::list::Cursor;
|
||||
use crate::list::SortDirection;
|
||||
use crate::list::ThreadSortKey;
|
||||
use crate::metadata;
|
||||
use chrono::DateTime;
|
||||
@@ -202,6 +203,7 @@ pub async fn list_threads_db(
|
||||
page_size: usize,
|
||||
cursor: Option<&Cursor>,
|
||||
sort_key: ThreadSortKey,
|
||||
sort_direction: SortDirection,
|
||||
allowed_sources: &[SessionSource],
|
||||
model_providers: Option<&[String]>,
|
||||
archived: bool,
|
||||
@@ -229,15 +231,21 @@ pub async fn list_threads_db(
|
||||
match ctx
|
||||
.list_threads(
|
||||
page_size,
|
||||
anchor.as_ref(),
|
||||
match sort_key {
|
||||
ThreadSortKey::CreatedAt => codex_state::SortKey::CreatedAt,
|
||||
ThreadSortKey::UpdatedAt => codex_state::SortKey::UpdatedAt,
|
||||
codex_state::ThreadFilterOptions {
|
||||
archived_only: archived,
|
||||
allowed_sources: allowed_sources.as_slice(),
|
||||
model_providers: model_providers.as_deref(),
|
||||
anchor: anchor.as_ref(),
|
||||
sort_key: match sort_key {
|
||||
ThreadSortKey::CreatedAt => codex_state::SortKey::CreatedAt,
|
||||
ThreadSortKey::UpdatedAt => codex_state::SortKey::UpdatedAt,
|
||||
},
|
||||
sort_direction: match sort_direction {
|
||||
SortDirection::Asc => codex_state::SortDirection::Asc,
|
||||
SortDirection::Desc => codex_state::SortDirection::Desc,
|
||||
},
|
||||
search_term,
|
||||
},
|
||||
allowed_sources.as_slice(),
|
||||
model_providers.as_deref(),
|
||||
archived,
|
||||
search_term,
|
||||
)
|
||||
.await
|
||||
{
|
||||
|
||||
@@ -37,6 +37,7 @@ pub use model::BackfillStats;
|
||||
pub use model::BackfillStatus;
|
||||
pub use model::DirectionalThreadSpawnEdgeStatus;
|
||||
pub use model::ExtractionOutcome;
|
||||
pub use model::SortDirection;
|
||||
pub use model::SortKey;
|
||||
pub use model::Stage1JobClaim;
|
||||
pub use model::Stage1JobClaimOutcome;
|
||||
@@ -47,6 +48,7 @@ pub use model::ThreadMetadata;
|
||||
pub use model::ThreadMetadataBuilder;
|
||||
pub use model::ThreadsPage;
|
||||
pub use runtime::RemoteControlEnrollmentRecord;
|
||||
pub use runtime::ThreadFilterOptions;
|
||||
pub use runtime::logs_db_filename;
|
||||
pub use runtime::logs_db_path;
|
||||
pub use runtime::state_db_filename;
|
||||
|
||||
@@ -28,6 +28,7 @@ pub use memories::Stage1StartupClaimParams;
|
||||
pub use thread_metadata::Anchor;
|
||||
pub use thread_metadata::BackfillStats;
|
||||
pub use thread_metadata::ExtractionOutcome;
|
||||
pub use thread_metadata::SortDirection;
|
||||
pub use thread_metadata::SortKey;
|
||||
pub use thread_metadata::ThreadMetadata;
|
||||
pub use thread_metadata::ThreadMetadataBuilder;
|
||||
|
||||
@@ -21,6 +21,13 @@ pub enum SortKey {
|
||||
UpdatedAt,
|
||||
}
|
||||
|
||||
/// Sort direction to use when listing threads.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum SortDirection {
|
||||
Asc,
|
||||
Desc,
|
||||
}
|
||||
|
||||
/// A pagination anchor used for keyset pagination.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct Anchor {
|
||||
|
||||
@@ -60,6 +60,7 @@ mod test_support;
|
||||
mod threads;
|
||||
|
||||
pub use remote_control::RemoteControlEnrollmentRecord;
|
||||
pub use threads::ThreadFilterOptions;
|
||||
|
||||
// "Partition" is the retained-log-content bucket we cap at 10 MiB:
|
||||
// - one bucket per non-null thread_id
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
use super::threads::ThreadFilterOptions;
|
||||
use super::threads::push_thread_filters;
|
||||
use super::threads::push_thread_order_and_limit;
|
||||
use super::*;
|
||||
use crate::SortDirection;
|
||||
use crate::model::Phase2InputSelection;
|
||||
use crate::model::Phase2JobClaimOutcome;
|
||||
use crate::model::Stage1JobClaim;
|
||||
@@ -198,12 +200,15 @@ LEFT JOIN jobs
|
||||
);
|
||||
push_thread_filters(
|
||||
&mut builder,
|
||||
/*archived_only*/ false,
|
||||
allowed_sources,
|
||||
/*model_providers*/ None,
|
||||
/*anchor*/ None,
|
||||
SortKey::UpdatedAt,
|
||||
/*search_term*/ None,
|
||||
ThreadFilterOptions {
|
||||
archived_only: false,
|
||||
allowed_sources,
|
||||
model_providers: None,
|
||||
anchor: None,
|
||||
sort_key: SortKey::UpdatedAt,
|
||||
sort_direction: SortDirection::Desc,
|
||||
search_term: None,
|
||||
},
|
||||
);
|
||||
builder.push(" AND threads.memory_mode = 'enabled'");
|
||||
builder
|
||||
@@ -215,7 +220,12 @@ LEFT JOIN jobs
|
||||
builder.push(" AND updated_at <= ").push_bind(idle_cutoff);
|
||||
builder.push(" AND COALESCE(stage1_outputs.source_updated_at, -1) < updated_at");
|
||||
builder.push(" AND COALESCE(jobs.last_success_watermark, -1) < updated_at");
|
||||
push_thread_order_and_limit(&mut builder, SortKey::UpdatedAt, scan_limit);
|
||||
push_thread_order_and_limit(
|
||||
&mut builder,
|
||||
SortKey::UpdatedAt,
|
||||
SortDirection::Desc,
|
||||
scan_limit,
|
||||
);
|
||||
|
||||
let items = builder
|
||||
.build()
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use super::*;
|
||||
use crate::SortDirection;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
|
||||
impl StateRuntime {
|
||||
@@ -366,12 +367,15 @@ FROM threads
|
||||
);
|
||||
push_thread_filters(
|
||||
&mut builder,
|
||||
archived_only,
|
||||
allowed_sources,
|
||||
model_providers,
|
||||
/*anchor*/ None,
|
||||
crate::SortKey::UpdatedAt,
|
||||
/*search_term*/ None,
|
||||
ThreadFilterOptions {
|
||||
archived_only,
|
||||
allowed_sources,
|
||||
model_providers,
|
||||
anchor: None,
|
||||
sort_key: crate::SortKey::UpdatedAt,
|
||||
sort_direction: crate::SortDirection::Desc,
|
||||
search_term: None,
|
||||
},
|
||||
);
|
||||
builder.push(" AND title = ");
|
||||
builder.push_bind(title);
|
||||
@@ -379,7 +383,12 @@ FROM threads
|
||||
builder.push(" AND cwd = ");
|
||||
builder.push_bind(cwd.display().to_string());
|
||||
}
|
||||
push_thread_order_and_limit(&mut builder, crate::SortKey::UpdatedAt, /*limit*/ 1);
|
||||
push_thread_order_and_limit(
|
||||
&mut builder,
|
||||
crate::SortKey::UpdatedAt,
|
||||
crate::SortDirection::Desc,
|
||||
/*limit*/ 1,
|
||||
);
|
||||
|
||||
let row = builder.build().fetch_optional(self.pool.as_ref()).await?;
|
||||
row.map(|row| ThreadRow::try_from_row(&row).and_then(crate::ThreadMetadata::try_from))
|
||||
@@ -387,18 +396,14 @@ FROM threads
|
||||
}
|
||||
|
||||
/// List threads using the underlying database.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn list_threads(
|
||||
&self,
|
||||
page_size: usize,
|
||||
anchor: Option<&crate::Anchor>,
|
||||
sort_key: crate::SortKey,
|
||||
allowed_sources: &[String],
|
||||
model_providers: Option<&[String]>,
|
||||
archived_only: bool,
|
||||
search_term: Option<&str>,
|
||||
filters: ThreadFilterOptions<'_>,
|
||||
) -> anyhow::Result<crate::ThreadsPage> {
|
||||
let limit = page_size.saturating_add(1);
|
||||
let sort_key = filters.sort_key;
|
||||
let sort_direction = filters.sort_direction;
|
||||
|
||||
let mut builder = QueryBuilder::<Sqlite>::new(
|
||||
r#"
|
||||
@@ -428,16 +433,8 @@ SELECT
|
||||
FROM threads
|
||||
"#,
|
||||
);
|
||||
push_thread_filters(
|
||||
&mut builder,
|
||||
archived_only,
|
||||
allowed_sources,
|
||||
model_providers,
|
||||
anchor,
|
||||
sort_key,
|
||||
search_term,
|
||||
);
|
||||
push_thread_order_and_limit(&mut builder, sort_key, limit);
|
||||
push_thread_filters(&mut builder, filters);
|
||||
push_thread_order_and_limit(&mut builder, sort_key, sort_direction, limit);
|
||||
|
||||
let rows = builder.build().fetch_all(self.pool.as_ref()).await?;
|
||||
let mut items = rows
|
||||
@@ -473,14 +470,17 @@ FROM threads
|
||||
let mut builder = QueryBuilder::<Sqlite>::new("SELECT id FROM threads");
|
||||
push_thread_filters(
|
||||
&mut builder,
|
||||
archived_only,
|
||||
allowed_sources,
|
||||
model_providers,
|
||||
anchor,
|
||||
sort_key,
|
||||
/*search_term*/ None,
|
||||
ThreadFilterOptions {
|
||||
archived_only,
|
||||
allowed_sources,
|
||||
model_providers,
|
||||
anchor,
|
||||
sort_key,
|
||||
sort_direction: SortDirection::Desc,
|
||||
search_term: None,
|
||||
},
|
||||
);
|
||||
push_thread_order_and_limit(&mut builder, sort_key, limit);
|
||||
push_thread_order_and_limit(&mut builder, sort_key, SortDirection::Desc, limit);
|
||||
|
||||
let rows = builder.build().fetch_all(self.pool.as_ref()).await?;
|
||||
rows.into_iter()
|
||||
@@ -941,31 +941,37 @@ fn thread_spawn_parent_thread_id_from_source_str(source: &str) -> Option<ThreadI
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
pub struct ThreadFilterOptions<'a> {
|
||||
pub archived_only: bool,
|
||||
pub allowed_sources: &'a [String],
|
||||
pub model_providers: Option<&'a [String]>,
|
||||
pub anchor: Option<&'a crate::Anchor>,
|
||||
pub sort_key: SortKey,
|
||||
pub sort_direction: SortDirection,
|
||||
pub search_term: Option<&'a str>,
|
||||
}
|
||||
|
||||
pub(super) fn push_thread_filters<'a>(
|
||||
builder: &mut QueryBuilder<'a, Sqlite>,
|
||||
archived_only: bool,
|
||||
allowed_sources: &'a [String],
|
||||
model_providers: Option<&'a [String]>,
|
||||
anchor: Option<&crate::Anchor>,
|
||||
sort_key: SortKey,
|
||||
search_term: Option<&'a str>,
|
||||
filters: ThreadFilterOptions<'a>,
|
||||
) {
|
||||
builder.push(" WHERE 1 = 1");
|
||||
if archived_only {
|
||||
if filters.archived_only {
|
||||
builder.push(" AND archived = 1");
|
||||
} else {
|
||||
builder.push(" AND archived = 0");
|
||||
}
|
||||
builder.push(" AND first_user_message <> ''");
|
||||
if !allowed_sources.is_empty() {
|
||||
if !filters.allowed_sources.is_empty() {
|
||||
builder.push(" AND source IN (");
|
||||
let mut separated = builder.separated(", ");
|
||||
for source in allowed_sources {
|
||||
for source in filters.allowed_sources {
|
||||
separated.push_bind(source);
|
||||
}
|
||||
separated.push_unseparated(")");
|
||||
}
|
||||
if let Some(model_providers) = model_providers
|
||||
if let Some(model_providers) = filters.model_providers
|
||||
&& !model_providers.is_empty()
|
||||
{
|
||||
builder.push(" AND model_provider IN (");
|
||||
@@ -975,26 +981,34 @@ pub(super) fn push_thread_filters<'a>(
|
||||
}
|
||||
separated.push_unseparated(")");
|
||||
}
|
||||
if let Some(search_term) = search_term {
|
||||
if let Some(search_term) = filters.search_term {
|
||||
builder.push(" AND instr(title, ");
|
||||
builder.push_bind(search_term);
|
||||
builder.push(") > 0");
|
||||
}
|
||||
if let Some(anchor) = anchor {
|
||||
if let Some(anchor) = filters.anchor {
|
||||
let anchor_ts = datetime_to_epoch_seconds(anchor.ts);
|
||||
let column = match sort_key {
|
||||
let column = match filters.sort_key {
|
||||
SortKey::CreatedAt => "created_at",
|
||||
SortKey::UpdatedAt => "updated_at",
|
||||
};
|
||||
let operator = match filters.sort_direction {
|
||||
SortDirection::Asc => ">",
|
||||
SortDirection::Desc => "<",
|
||||
};
|
||||
builder.push(" AND (");
|
||||
builder.push(column);
|
||||
builder.push(" < ");
|
||||
builder.push(" ");
|
||||
builder.push(operator);
|
||||
builder.push(" ");
|
||||
builder.push_bind(anchor_ts);
|
||||
builder.push(" OR (");
|
||||
builder.push(column);
|
||||
builder.push(" = ");
|
||||
builder.push_bind(anchor_ts);
|
||||
builder.push(" AND id < ");
|
||||
builder.push(" AND id ");
|
||||
builder.push(operator);
|
||||
builder.push(" ");
|
||||
builder.push_bind(anchor.id.to_string());
|
||||
builder.push("))");
|
||||
}
|
||||
@@ -1003,15 +1017,23 @@ pub(super) fn push_thread_filters<'a>(
|
||||
pub(super) fn push_thread_order_and_limit(
|
||||
builder: &mut QueryBuilder<'_, Sqlite>,
|
||||
sort_key: SortKey,
|
||||
sort_direction: SortDirection,
|
||||
limit: usize,
|
||||
) {
|
||||
let order_column = match sort_key {
|
||||
SortKey::CreatedAt => "created_at",
|
||||
SortKey::UpdatedAt => "updated_at",
|
||||
};
|
||||
let order_direction = match sort_direction {
|
||||
SortDirection::Asc => "ASC",
|
||||
SortDirection::Desc => "DESC",
|
||||
};
|
||||
builder.push(" ORDER BY ");
|
||||
builder.push(order_column);
|
||||
builder.push(" DESC, id DESC");
|
||||
builder.push(" ");
|
||||
builder.push(order_direction);
|
||||
builder.push(", id ");
|
||||
builder.push(order_direction);
|
||||
builder.push(" LIMIT ");
|
||||
builder.push_bind(limit as i64);
|
||||
}
|
||||
@@ -1019,6 +1041,7 @@ pub(super) fn push_thread_order_and_limit(
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::Anchor;
|
||||
use crate::DirectionalThreadSpawnEdgeStatus;
|
||||
use crate::runtime::test_support::test_thread_metadata;
|
||||
use crate::runtime::test_support::unique_temp_dir;
|
||||
@@ -1029,6 +1052,7 @@ mod tests {
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::path::PathBuf;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[tokio::test]
|
||||
async fn upsert_thread_keeps_creation_memory_mode_for_existing_rows() {
|
||||
@@ -1068,6 +1092,89 @@ mod tests {
|
||||
assert_eq!(memory_mode, "disabled");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_threads_updated_after_returns_oldest_changes_first() {
|
||||
let codex_home = unique_temp_dir();
|
||||
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string())
|
||||
.await
|
||||
.expect("state db should initialize");
|
||||
let older_id =
|
||||
ThreadId::from_string("00000000-0000-0000-0000-000000000001").expect("valid thread id");
|
||||
let same_second_lower_id =
|
||||
ThreadId::from_string("00000000-0000-0000-0000-000000000002").expect("valid thread id");
|
||||
let same_second_higher_id =
|
||||
ThreadId::from_string("00000000-0000-0000-0000-000000000003").expect("valid thread id");
|
||||
let older_updated_at =
|
||||
DateTime::<Utc>::from_timestamp(1_700_000_100, 0).expect("valid older timestamp");
|
||||
let newer_updated_at =
|
||||
DateTime::<Utc>::from_timestamp(1_700_000_200, 0).expect("valid newer timestamp");
|
||||
|
||||
for (thread_id, updated_at) in [
|
||||
(older_id, older_updated_at),
|
||||
(same_second_higher_id, newer_updated_at),
|
||||
(same_second_lower_id, newer_updated_at),
|
||||
] {
|
||||
let mut metadata = test_thread_metadata(&codex_home, thread_id, codex_home.clone());
|
||||
metadata.updated_at = updated_at;
|
||||
metadata.first_user_message = Some("hello".to_string());
|
||||
runtime
|
||||
.upsert_thread(&metadata)
|
||||
.await
|
||||
.expect("thread insert should succeed");
|
||||
}
|
||||
|
||||
let anchor = Anchor {
|
||||
ts: older_updated_at,
|
||||
id: Uuid::parse_str(&older_id.to_string()).expect("valid uuid"),
|
||||
};
|
||||
let model_providers = ["test-provider".to_string()];
|
||||
let page = runtime
|
||||
.list_threads(
|
||||
/*page_size*/ 1,
|
||||
ThreadFilterOptions {
|
||||
archived_only: false,
|
||||
allowed_sources: &[],
|
||||
model_providers: Some(&model_providers),
|
||||
anchor: Some(&anchor),
|
||||
sort_key: SortKey::UpdatedAt,
|
||||
sort_direction: SortDirection::Asc,
|
||||
search_term: None,
|
||||
},
|
||||
)
|
||||
.await
|
||||
.expect("list should succeed");
|
||||
|
||||
let ids = page.items.iter().map(|item| item.id).collect::<Vec<_>>();
|
||||
assert_eq!(ids, vec![same_second_lower_id]);
|
||||
assert_eq!(
|
||||
page.next_anchor,
|
||||
Some(Anchor {
|
||||
ts: newer_updated_at,
|
||||
id: Uuid::parse_str(&same_second_lower_id.to_string()).expect("valid uuid"),
|
||||
})
|
||||
);
|
||||
|
||||
let page = runtime
|
||||
.list_threads(
|
||||
/*page_size*/ 1,
|
||||
ThreadFilterOptions {
|
||||
archived_only: false,
|
||||
allowed_sources: &[],
|
||||
model_providers: Some(&model_providers),
|
||||
anchor: page.next_anchor.as_ref(),
|
||||
sort_key: SortKey::UpdatedAt,
|
||||
sort_direction: SortDirection::Asc,
|
||||
search_term: None,
|
||||
},
|
||||
)
|
||||
.await
|
||||
.expect("second page should succeed");
|
||||
|
||||
let ids = page.items.iter().map(|item| item.id).collect::<Vec<_>>();
|
||||
assert_eq!(ids, vec![same_second_higher_id]);
|
||||
assert_eq!(page.next_anchor, None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn apply_rollout_items_restores_memory_mode_from_session_meta() {
|
||||
let codex_home = unique_temp_dir();
|
||||
|
||||
@@ -506,6 +506,7 @@ async fn lookup_session_target_by_name_with_app_server(
|
||||
cursor: cursor.clone(),
|
||||
limit: Some(100),
|
||||
sort_key: Some(AppServerThreadSortKey::UpdatedAt),
|
||||
sort_direction: None,
|
||||
model_providers: None,
|
||||
source_kinds: Some(vec![ThreadSourceKind::Cli, ThreadSourceKind::VsCode]),
|
||||
archived: Some(false),
|
||||
@@ -601,6 +602,7 @@ fn latest_session_lookup_params(
|
||||
cursor: None,
|
||||
limit: Some(1),
|
||||
sort_key: Some(AppServerThreadSortKey::UpdatedAt),
|
||||
sort_direction: None,
|
||||
model_providers: if is_remote {
|
||||
None
|
||||
} else {
|
||||
|
||||
@@ -335,6 +335,7 @@ fn spawn_rollout_page_loader(
|
||||
PAGE_SIZE,
|
||||
cursor,
|
||||
request.sort_key,
|
||||
codex_rollout::SortDirection::Desc,
|
||||
INTERACTIVE_SESSION_SOURCES.as_slice(),
|
||||
default_provider.as_ref().map(std::slice::from_ref),
|
||||
default_provider.as_deref().unwrap_or_default(),
|
||||
@@ -1135,6 +1136,7 @@ fn thread_list_params(
|
||||
ThreadSortKey::CreatedAt => AppServerThreadSortKey::CreatedAt,
|
||||
ThreadSortKey::UpdatedAt => AppServerThreadSortKey::UpdatedAt,
|
||||
}),
|
||||
sort_direction: None,
|
||||
model_providers: match provider_filter {
|
||||
ProviderFilter::Any => None,
|
||||
ProviderFilter::MatchDefault(default_provider) => Some(vec![default_provider]),
|
||||
|
||||
Reference in New Issue
Block a user