mirror of
https://github.com/openai/codex.git
synced 2026-06-02 19:31:59 +00:00
Compare commits
1 Commits
pr22327
...
ddr/items-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bd45013a04 |
1
codex-rs/Cargo.lock
generated
1
codex-rs/Cargo.lock
generated
@@ -3184,6 +3184,7 @@ dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"chrono",
|
||||
"codex-app-server-protocol",
|
||||
"codex-file-search",
|
||||
"codex-git-utils",
|
||||
"codex-login",
|
||||
|
||||
@@ -3393,6 +3393,44 @@
|
||||
],
|
||||
"type": "object"
|
||||
},
|
||||
"ThreadItemsListParams": {
|
||||
"properties": {
|
||||
"cursor": {
|
||||
"description": "Opaque cursor to pass to the next call to continue after the last item.",
|
||||
"type": [
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"limit": {
|
||||
"description": "Optional item page size.",
|
||||
"format": "uint32",
|
||||
"minimum": 0.0,
|
||||
"type": [
|
||||
"integer",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"sortDirection": {
|
||||
"anyOf": [
|
||||
{
|
||||
"$ref": "#/definitions/SortDirection"
|
||||
},
|
||||
{
|
||||
"type": "null"
|
||||
}
|
||||
],
|
||||
"description": "Optional item pagination direction; defaults to descending."
|
||||
},
|
||||
"threadId": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"threadId"
|
||||
],
|
||||
"type": "object"
|
||||
},
|
||||
"ThreadListCwdFilter": {
|
||||
"anyOf": [
|
||||
{
|
||||
@@ -4807,6 +4845,30 @@
|
||||
"title": "Thread/turns/listRequest",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"id": {
|
||||
"$ref": "#/definitions/RequestId"
|
||||
},
|
||||
"method": {
|
||||
"enum": [
|
||||
"thread/items/list"
|
||||
],
|
||||
"title": "Thread/items/listRequestMethod",
|
||||
"type": "string"
|
||||
},
|
||||
"params": {
|
||||
"$ref": "#/definitions/ThreadItemsListParams"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"id",
|
||||
"method",
|
||||
"params"
|
||||
],
|
||||
"title": "Thread/items/listRequest",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"description": "Append raw Responses API items to the thread history without starting a user turn.",
|
||||
"properties": {
|
||||
|
||||
@@ -592,6 +592,30 @@
|
||||
"title": "Thread/turns/listRequest",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"id": {
|
||||
"$ref": "#/definitions/v2/RequestId"
|
||||
},
|
||||
"method": {
|
||||
"enum": [
|
||||
"thread/items/list"
|
||||
],
|
||||
"title": "Thread/items/listRequestMethod",
|
||||
"type": "string"
|
||||
},
|
||||
"params": {
|
||||
"$ref": "#/definitions/v2/ThreadItemsListParams"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"id",
|
||||
"method",
|
||||
"params"
|
||||
],
|
||||
"title": "Thread/items/listRequest",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"description": "Append raw Responses API items to the thread history without starting a user turn.",
|
||||
"properties": {
|
||||
@@ -14508,6 +14532,59 @@
|
||||
"title": "ThreadForkResponse",
|
||||
"type": "object"
|
||||
},
|
||||
"ThreadHistoryItem": {
|
||||
"properties": {
|
||||
"item": {
|
||||
"$ref": "#/definitions/v2/ThreadItem"
|
||||
},
|
||||
"turnCompletedAt": {
|
||||
"description": "Unix timestamp (in seconds) when the turn completed.",
|
||||
"format": "int64",
|
||||
"type": [
|
||||
"integer",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"turnDurationMs": {
|
||||
"description": "Duration between turn start and completion in milliseconds, if known.",
|
||||
"format": "int64",
|
||||
"type": [
|
||||
"integer",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"turnError": {
|
||||
"anyOf": [
|
||||
{
|
||||
"$ref": "#/definitions/v2/TurnError"
|
||||
},
|
||||
{
|
||||
"type": "null"
|
||||
}
|
||||
]
|
||||
},
|
||||
"turnId": {
|
||||
"type": "string"
|
||||
},
|
||||
"turnStartedAt": {
|
||||
"description": "Unix timestamp (in seconds) when the turn started.",
|
||||
"format": "int64",
|
||||
"type": [
|
||||
"integer",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"turnStatus": {
|
||||
"$ref": "#/definitions/v2/TurnStatus"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"item",
|
||||
"turnId",
|
||||
"turnStatus"
|
||||
],
|
||||
"type": "object"
|
||||
},
|
||||
"ThreadId": {
|
||||
"type": "string"
|
||||
},
|
||||
@@ -15195,6 +15272,76 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
"ThreadItemsListParams": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"properties": {
|
||||
"cursor": {
|
||||
"description": "Opaque cursor to pass to the next call to continue after the last item.",
|
||||
"type": [
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"limit": {
|
||||
"description": "Optional item page size.",
|
||||
"format": "uint32",
|
||||
"minimum": 0.0,
|
||||
"type": [
|
||||
"integer",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"sortDirection": {
|
||||
"anyOf": [
|
||||
{
|
||||
"$ref": "#/definitions/v2/SortDirection"
|
||||
},
|
||||
{
|
||||
"type": "null"
|
||||
}
|
||||
],
|
||||
"description": "Optional item pagination direction; defaults to descending."
|
||||
},
|
||||
"threadId": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"threadId"
|
||||
],
|
||||
"title": "ThreadItemsListParams",
|
||||
"type": "object"
|
||||
},
|
||||
"ThreadItemsListResponse": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"properties": {
|
||||
"backwardsCursor": {
|
||||
"description": "Opaque cursor to pass as `cursor` when reversing `sortDirection`. This is only populated when the page contains at least one item. Use it with the opposite `sortDirection` to include the anchor item again and catch updates to that item.",
|
||||
"type": [
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"data": {
|
||||
"items": {
|
||||
"$ref": "#/definitions/v2/ThreadHistoryItem"
|
||||
},
|
||||
"type": "array"
|
||||
},
|
||||
"nextCursor": {
|
||||
"description": "Opaque cursor to pass to the next call to continue after the last item. if None, there are no more items to return.",
|
||||
"type": [
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"data"
|
||||
],
|
||||
"title": "ThreadItemsListResponse",
|
||||
"type": "object"
|
||||
},
|
||||
"ThreadListCwdFilter": {
|
||||
"anyOf": [
|
||||
{
|
||||
|
||||
@@ -1283,6 +1283,30 @@
|
||||
"title": "Thread/turns/listRequest",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"id": {
|
||||
"$ref": "#/definitions/RequestId"
|
||||
},
|
||||
"method": {
|
||||
"enum": [
|
||||
"thread/items/list"
|
||||
],
|
||||
"title": "Thread/items/listRequestMethod",
|
||||
"type": "string"
|
||||
},
|
||||
"params": {
|
||||
"$ref": "#/definitions/ThreadItemsListParams"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"id",
|
||||
"method",
|
||||
"params"
|
||||
],
|
||||
"title": "Thread/items/listRequest",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"description": "Append raw Responses API items to the thread history without starting a user turn.",
|
||||
"properties": {
|
||||
@@ -12395,6 +12419,59 @@
|
||||
"title": "ThreadForkResponse",
|
||||
"type": "object"
|
||||
},
|
||||
"ThreadHistoryItem": {
|
||||
"properties": {
|
||||
"item": {
|
||||
"$ref": "#/definitions/ThreadItem"
|
||||
},
|
||||
"turnCompletedAt": {
|
||||
"description": "Unix timestamp (in seconds) when the turn completed.",
|
||||
"format": "int64",
|
||||
"type": [
|
||||
"integer",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"turnDurationMs": {
|
||||
"description": "Duration between turn start and completion in milliseconds, if known.",
|
||||
"format": "int64",
|
||||
"type": [
|
||||
"integer",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"turnError": {
|
||||
"anyOf": [
|
||||
{
|
||||
"$ref": "#/definitions/TurnError"
|
||||
},
|
||||
{
|
||||
"type": "null"
|
||||
}
|
||||
]
|
||||
},
|
||||
"turnId": {
|
||||
"type": "string"
|
||||
},
|
||||
"turnStartedAt": {
|
||||
"description": "Unix timestamp (in seconds) when the turn started.",
|
||||
"format": "int64",
|
||||
"type": [
|
||||
"integer",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"turnStatus": {
|
||||
"$ref": "#/definitions/TurnStatus"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"item",
|
||||
"turnId",
|
||||
"turnStatus"
|
||||
],
|
||||
"type": "object"
|
||||
},
|
||||
"ThreadId": {
|
||||
"type": "string"
|
||||
},
|
||||
@@ -13082,6 +13159,76 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
"ThreadItemsListParams": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"properties": {
|
||||
"cursor": {
|
||||
"description": "Opaque cursor to pass to the next call to continue after the last item.",
|
||||
"type": [
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"limit": {
|
||||
"description": "Optional item page size.",
|
||||
"format": "uint32",
|
||||
"minimum": 0.0,
|
||||
"type": [
|
||||
"integer",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"sortDirection": {
|
||||
"anyOf": [
|
||||
{
|
||||
"$ref": "#/definitions/SortDirection"
|
||||
},
|
||||
{
|
||||
"type": "null"
|
||||
}
|
||||
],
|
||||
"description": "Optional item pagination direction; defaults to descending."
|
||||
},
|
||||
"threadId": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"threadId"
|
||||
],
|
||||
"title": "ThreadItemsListParams",
|
||||
"type": "object"
|
||||
},
|
||||
"ThreadItemsListResponse": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"properties": {
|
||||
"backwardsCursor": {
|
||||
"description": "Opaque cursor to pass as `cursor` when reversing `sortDirection`. This is only populated when the page contains at least one item. Use it with the opposite `sortDirection` to include the anchor item again and catch updates to that item.",
|
||||
"type": [
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"data": {
|
||||
"items": {
|
||||
"$ref": "#/definitions/ThreadHistoryItem"
|
||||
},
|
||||
"type": "array"
|
||||
},
|
||||
"nextCursor": {
|
||||
"description": "Opaque cursor to pass to the next call to continue after the last item. if None, there are no more items to return.",
|
||||
"type": [
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"data"
|
||||
],
|
||||
"title": "ThreadItemsListResponse",
|
||||
"type": "object"
|
||||
},
|
||||
"ThreadListCwdFilter": {
|
||||
"anyOf": [
|
||||
{
|
||||
|
||||
49
codex-rs/app-server-protocol/schema/json/v2/ThreadItemsListParams.json
generated
Normal file
49
codex-rs/app-server-protocol/schema/json/v2/ThreadItemsListParams.json
generated
Normal file
@@ -0,0 +1,49 @@
|
||||
{
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"definitions": {
|
||||
"SortDirection": {
|
||||
"enum": [
|
||||
"asc",
|
||||
"desc"
|
||||
],
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"properties": {
|
||||
"cursor": {
|
||||
"description": "Opaque cursor to pass to the next call to continue after the last item.",
|
||||
"type": [
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"limit": {
|
||||
"description": "Optional item page size.",
|
||||
"format": "uint32",
|
||||
"minimum": 0.0,
|
||||
"type": [
|
||||
"integer",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"sortDirection": {
|
||||
"anyOf": [
|
||||
{
|
||||
"$ref": "#/definitions/SortDirection"
|
||||
},
|
||||
{
|
||||
"type": "null"
|
||||
}
|
||||
],
|
||||
"description": "Optional item pagination direction; defaults to descending."
|
||||
},
|
||||
"threadId": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"threadId"
|
||||
],
|
||||
"title": "ThreadItemsListParams",
|
||||
"type": "object"
|
||||
}
|
||||
1633
codex-rs/app-server-protocol/schema/json/v2/ThreadItemsListResponse.json
generated
Normal file
1633
codex-rs/app-server-protocol/schema/json/v2/ThreadItemsListResponse.json
generated
Normal file
File diff suppressed because it is too large
Load Diff
File diff suppressed because one or more lines are too long
20
codex-rs/app-server-protocol/schema/typescript/v2/ThreadHistoryItem.ts
generated
Normal file
20
codex-rs/app-server-protocol/schema/typescript/v2/ThreadHistoryItem.ts
generated
Normal file
@@ -0,0 +1,20 @@
|
||||
// GENERATED CODE! DO NOT MODIFY BY HAND!
|
||||
|
||||
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
|
||||
import type { ThreadItem } from "./ThreadItem";
|
||||
import type { TurnError } from "./TurnError";
|
||||
import type { TurnStatus } from "./TurnStatus";
|
||||
|
||||
export type ThreadHistoryItem = { turnId: string, item: ThreadItem, turnStatus: TurnStatus, turnError: TurnError | null,
|
||||
/**
|
||||
* Unix timestamp (in seconds) when the turn started.
|
||||
*/
|
||||
turnStartedAt: number | null,
|
||||
/**
|
||||
* Unix timestamp (in seconds) when the turn completed.
|
||||
*/
|
||||
turnCompletedAt: number | null,
|
||||
/**
|
||||
* Duration between turn start and completion in milliseconds, if known.
|
||||
*/
|
||||
turnDurationMs: number | null, };
|
||||
18
codex-rs/app-server-protocol/schema/typescript/v2/ThreadItemsListParams.ts
generated
Normal file
18
codex-rs/app-server-protocol/schema/typescript/v2/ThreadItemsListParams.ts
generated
Normal file
@@ -0,0 +1,18 @@
|
||||
// GENERATED CODE! DO NOT MODIFY BY HAND!
|
||||
|
||||
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
|
||||
import type { SortDirection } from "./SortDirection";
|
||||
|
||||
export type ThreadItemsListParams = { threadId: string,
|
||||
/**
|
||||
* Opaque cursor to pass to the next call to continue after the last item.
|
||||
*/
|
||||
cursor?: string | null,
|
||||
/**
|
||||
* Optional item page size.
|
||||
*/
|
||||
limit?: number | null,
|
||||
/**
|
||||
* Optional item pagination direction; defaults to descending.
|
||||
*/
|
||||
sortDirection?: SortDirection | null, };
|
||||
18
codex-rs/app-server-protocol/schema/typescript/v2/ThreadItemsListResponse.ts
generated
Normal file
18
codex-rs/app-server-protocol/schema/typescript/v2/ThreadItemsListResponse.ts
generated
Normal file
@@ -0,0 +1,18 @@
|
||||
// GENERATED CODE! DO NOT MODIFY BY HAND!
|
||||
|
||||
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
|
||||
import type { ThreadHistoryItem } from "./ThreadHistoryItem";
|
||||
|
||||
export type ThreadItemsListResponse = { data: Array<ThreadHistoryItem>,
|
||||
/**
|
||||
* Opaque cursor to pass to the next call to continue after the last item.
|
||||
* if None, there are no more items to return.
|
||||
*/
|
||||
nextCursor: string | null,
|
||||
/**
|
||||
* Opaque cursor to pass as `cursor` when reversing `sortDirection`.
|
||||
* This is only populated when the page contains at least one item.
|
||||
* Use it with the opposite `sortDirection` to include the anchor item again
|
||||
* and catch updates to that item.
|
||||
*/
|
||||
backwardsCursor: string | null, };
|
||||
@@ -324,9 +324,12 @@ export type { ThreadCompactStartParams } from "./ThreadCompactStartParams";
|
||||
export type { ThreadCompactStartResponse } from "./ThreadCompactStartResponse";
|
||||
export type { ThreadForkParams } from "./ThreadForkParams";
|
||||
export type { ThreadForkResponse } from "./ThreadForkResponse";
|
||||
export type { ThreadHistoryItem } from "./ThreadHistoryItem";
|
||||
export type { ThreadInjectItemsParams } from "./ThreadInjectItemsParams";
|
||||
export type { ThreadInjectItemsResponse } from "./ThreadInjectItemsResponse";
|
||||
export type { ThreadItem } from "./ThreadItem";
|
||||
export type { ThreadItemsListParams } from "./ThreadItemsListParams";
|
||||
export type { ThreadItemsListResponse } from "./ThreadItemsListResponse";
|
||||
export type { ThreadListParams } from "./ThreadListParams";
|
||||
export type { ThreadListResponse } from "./ThreadListResponse";
|
||||
export type { ThreadLoadedListParams } from "./ThreadLoadedListParams";
|
||||
|
||||
@@ -340,6 +340,10 @@ client_request_definitions! {
|
||||
params: v2::ThreadTurnsListParams,
|
||||
response: v2::ThreadTurnsListResponse,
|
||||
},
|
||||
ThreadItemsList => "thread/items/list" {
|
||||
params: v2::ThreadItemsListParams,
|
||||
response: v2::ThreadItemsListResponse,
|
||||
},
|
||||
/// Append raw Responses API items to the thread history without starting a user turn.
|
||||
ThreadInjectItems => "thread/inject_items" {
|
||||
params: v2::ThreadInjectItemsParams,
|
||||
|
||||
@@ -4036,6 +4036,56 @@ pub struct ThreadTurnsListResponse {
|
||||
pub backwards_cursor: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct ThreadItemsListParams {
|
||||
pub thread_id: String,
|
||||
/// Opaque cursor to pass to the next call to continue after the last item.
|
||||
#[ts(optional = nullable)]
|
||||
pub cursor: Option<String>,
|
||||
/// Optional item page size.
|
||||
#[ts(optional = nullable)]
|
||||
pub limit: Option<u32>,
|
||||
/// Optional item pagination direction; defaults to descending.
|
||||
#[ts(optional = nullable)]
|
||||
pub sort_direction: Option<SortDirection>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct ThreadHistoryItem {
|
||||
pub turn_id: String,
|
||||
pub item: ThreadItem,
|
||||
pub turn_status: TurnStatus,
|
||||
pub turn_error: Option<TurnError>,
|
||||
/// Unix timestamp (in seconds) when the turn started.
|
||||
#[ts(type = "number | null")]
|
||||
pub turn_started_at: Option<i64>,
|
||||
/// Unix timestamp (in seconds) when the turn completed.
|
||||
#[ts(type = "number | null")]
|
||||
pub turn_completed_at: Option<i64>,
|
||||
/// Duration between turn start and completion in milliseconds, if known.
|
||||
#[ts(type = "number | null")]
|
||||
pub turn_duration_ms: Option<i64>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct ThreadItemsListResponse {
|
||||
pub data: Vec<ThreadHistoryItem>,
|
||||
/// Opaque cursor to pass to the next call to continue after the last item.
|
||||
/// if None, there are no more items to return.
|
||||
pub next_cursor: Option<String>,
|
||||
/// Opaque cursor to pass as `cursor` when reversing `sortDirection`.
|
||||
/// This is only populated when the page contains at least one item.
|
||||
/// Use it with the opposite `sortDirection` to include the anchor item again
|
||||
/// and catch updates to that item.
|
||||
pub backwards_cursor: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
|
||||
@@ -143,6 +143,7 @@ Example with notification opt-out:
|
||||
- `thread/loaded/list` — list the thread ids currently loaded in memory.
|
||||
- `thread/read` — read a stored thread by id without resuming it; optionally include turns via `includeTurns`. The returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded.
|
||||
- `thread/turns/list` — page through a stored thread’s turn history without resuming it; supports cursor-based pagination with `sortDirection`, `nextCursor`, and `backwardsCursor`.
|
||||
- `thread/items/list` — page through sqlite-backed stored thread items without resuming it; returns only the lightweight renderable/searchable item subset used for scrolling UIs, along with per-item turn metadata and cursor-based pagination.
|
||||
- `thread/metadata/update` — patch stored thread metadata in sqlite; currently supports updating persisted `gitInfo` fields and returns the refreshed `thread`.
|
||||
- `thread/memoryMode/set` — experimental; set a thread’s persisted memory eligibility to `"enabled"` or `"disabled"` for either a loaded thread or a stored rollout; returns `{}` on success.
|
||||
- `memory/reset` — experimental; clear the current `CODEX_HOME/memories` directory and reset persisted memory stage data in sqlite while preserving existing thread memory modes; returns `{}` on success.
|
||||
@@ -418,6 +419,23 @@ Use `thread/turns/list` to page a stored thread’s turn history without resumin
|
||||
} }
|
||||
```
|
||||
|
||||
### Example: List thread items
|
||||
|
||||
Use `thread/items/list` to page a stored thread’s lightweight item history from sqlite. This excludes heavy tool execution payloads and is intended for scrolling/search-oriented UIs.
|
||||
|
||||
```json
|
||||
{ "method": "thread/items/list", "id": 25, "params": {
|
||||
"threadId": "thr_123",
|
||||
"limit": 100,
|
||||
"sortDirection": "desc"
|
||||
} }
|
||||
{ "id": 25, "result": {
|
||||
"data": [ ... ],
|
||||
"nextCursor": "older-items-cursor-or-null",
|
||||
"backwardsCursor": "newer-items-cursor-or-null"
|
||||
} }
|
||||
```
|
||||
|
||||
### Example: Update stored thread metadata
|
||||
|
||||
Use `thread/metadata/update` to patch sqlite-backed metadata for a thread without resuming it. Today this supports persisted `gitInfo`; omitted fields are left unchanged, while explicit `null` clears a stored value.
|
||||
|
||||
@@ -148,11 +148,14 @@ use codex_app_server_protocol::ThreadDecrementElicitationParams;
|
||||
use codex_app_server_protocol::ThreadDecrementElicitationResponse;
|
||||
use codex_app_server_protocol::ThreadForkParams;
|
||||
use codex_app_server_protocol::ThreadForkResponse;
|
||||
use codex_app_server_protocol::ThreadHistoryItem;
|
||||
use codex_app_server_protocol::ThreadIncrementElicitationParams;
|
||||
use codex_app_server_protocol::ThreadIncrementElicitationResponse;
|
||||
use codex_app_server_protocol::ThreadInjectItemsParams;
|
||||
use codex_app_server_protocol::ThreadInjectItemsResponse;
|
||||
use codex_app_server_protocol::ThreadItem;
|
||||
use codex_app_server_protocol::ThreadItemsListParams;
|
||||
use codex_app_server_protocol::ThreadItemsListResponse;
|
||||
use codex_app_server_protocol::ThreadListCwdFilter;
|
||||
use codex_app_server_protocol::ThreadListParams;
|
||||
use codex_app_server_protocol::ThreadListResponse;
|
||||
@@ -340,6 +343,7 @@ use codex_rmcp_client::perform_oauth_login_return_url;
|
||||
use codex_rollout::state_db::StateDbHandle;
|
||||
use codex_rollout::state_db::get_state_db;
|
||||
use codex_rollout::state_db::reconcile_rollout;
|
||||
use codex_rollout::state_db::sync_renderable_thread_items;
|
||||
use codex_state::StateRuntime;
|
||||
use codex_state::ThreadMetadata;
|
||||
use codex_state::ThreadMetadataBuilder;
|
||||
@@ -956,6 +960,10 @@ impl CodexMessageProcessor {
|
||||
self.thread_turns_list(to_connection_request_id(request_id), params)
|
||||
.await;
|
||||
}
|
||||
ClientRequest::ThreadItemsList { request_id, params } => {
|
||||
self.thread_items_list(to_connection_request_id(request_id), params)
|
||||
.await;
|
||||
}
|
||||
ClientRequest::ThreadShellCommand { request_id, params } => {
|
||||
self.thread_shell_command(to_connection_request_id(request_id), params)
|
||||
.await;
|
||||
@@ -4354,6 +4362,178 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
async fn thread_items_list(
|
||||
&self,
|
||||
request_id: ConnectionRequestId,
|
||||
params: ThreadItemsListParams,
|
||||
) {
|
||||
let ThreadItemsListParams {
|
||||
thread_id,
|
||||
cursor,
|
||||
limit,
|
||||
sort_direction,
|
||||
} = params;
|
||||
|
||||
let thread_uuid = match ThreadId::from_string(&thread_id) {
|
||||
Ok(id) => id,
|
||||
Err(err) => {
|
||||
self.send_invalid_request_error(request_id, format!("invalid thread id: {err}"))
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let Some(state_db_ctx) = open_state_db_for_direct_thread_lookup(&self.config).await else {
|
||||
self.send_internal_error(
|
||||
request_id,
|
||||
format!("sqlite state db unavailable for thread {thread_uuid}"),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
};
|
||||
|
||||
let page_size = limit
|
||||
.map(|value| value as usize)
|
||||
.unwrap_or(THREAD_TURNS_DEFAULT_LIMIT)
|
||||
.clamp(1, THREAD_TURNS_MAX_LIMIT);
|
||||
let sort_direction = sort_direction.unwrap_or(SortDirection::Desc);
|
||||
let anchor = match cursor.as_deref() {
|
||||
Some(cursor) => match parse_thread_items_anchor(cursor) {
|
||||
Ok(anchor) => Some(anchor),
|
||||
Err(error) => {
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
},
|
||||
None => None,
|
||||
};
|
||||
|
||||
let mut page = match state_db_ctx
|
||||
.list_thread_items(
|
||||
&thread_uuid.to_string(),
|
||||
page_size,
|
||||
anchor.as_ref(),
|
||||
match sort_direction {
|
||||
SortDirection::Asc => codex_state::SortDirection::Asc,
|
||||
SortDirection::Desc => codex_state::SortDirection::Desc,
|
||||
},
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(page) => page,
|
||||
Err(err) => {
|
||||
self.send_internal_error(
|
||||
request_id,
|
||||
format!("failed to query sqlite thread items for {thread_uuid}: {err}"),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let mut rollout_path = None;
|
||||
if page.items.is_empty() {
|
||||
rollout_path = self
|
||||
.resolve_rollout_path(thread_uuid, Some(&state_db_ctx))
|
||||
.await;
|
||||
if rollout_path.is_none() {
|
||||
rollout_path = match find_thread_path_by_id_str(
|
||||
&self.config.codex_home,
|
||||
&thread_uuid.to_string(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Some(path)) => Some(path),
|
||||
Ok(None) => match find_archived_thread_path_by_id_str(
|
||||
&self.config.codex_home,
|
||||
&thread_uuid.to_string(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(path) => path,
|
||||
Err(err) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("failed to locate archived thread id {thread_uuid}: {err}"),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
},
|
||||
Err(err) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("failed to locate thread id {thread_uuid}: {err}"),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
if page.items.is_empty()
|
||||
&& let Some(rollout_path) = rollout_path
|
||||
{
|
||||
sync_renderable_thread_items(
|
||||
Some(state_db_ctx.as_ref()),
|
||||
thread_uuid,
|
||||
rollout_path.as_path(),
|
||||
"thread_items_list",
|
||||
)
|
||||
.await;
|
||||
page = match state_db_ctx
|
||||
.list_thread_items(
|
||||
&thread_uuid.to_string(),
|
||||
page_size,
|
||||
anchor.as_ref(),
|
||||
match sort_direction {
|
||||
SortDirection::Asc => codex_state::SortDirection::Asc,
|
||||
SortDirection::Desc => codex_state::SortDirection::Desc,
|
||||
},
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(page) => page,
|
||||
Err(err) => {
|
||||
self.send_internal_error(
|
||||
request_id,
|
||||
format!(
|
||||
"failed to query repaired sqlite thread items for {thread_uuid}: {err}"
|
||||
),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
let backwards_cursor = page
|
||||
.items
|
||||
.first()
|
||||
.and_then(|item| thread_items_backwards_cursor(item.item_at, sort_direction));
|
||||
let data = match page
|
||||
.items
|
||||
.into_iter()
|
||||
.map(thread_item_record_to_history_item)
|
||||
.collect::<Result<Vec<_>, JSONRPCErrorError>>()
|
||||
{
|
||||
Ok(data) => data,
|
||||
Err(error) => {
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
let response = ThreadItemsListResponse {
|
||||
data,
|
||||
next_cursor: page
|
||||
.next_anchor
|
||||
.map(|anchor| anchor.ts.to_rfc3339_opts(SecondsFormat::Millis, true)),
|
||||
backwards_cursor,
|
||||
};
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
}
|
||||
|
||||
pub(crate) fn thread_created_receiver(&self) -> broadcast::Receiver<ThreadId> {
|
||||
self.thread_manager.subscribe_thread_created()
|
||||
}
|
||||
@@ -10027,6 +10207,75 @@ fn thread_backwards_cursor_for_sort_key(
|
||||
Some(timestamp.to_rfc3339_opts(SecondsFormat::Millis, true))
|
||||
}
|
||||
|
||||
fn parse_thread_items_anchor(cursor: &str) -> Result<codex_state::Anchor, JSONRPCErrorError> {
|
||||
let Some(ts) = parse_datetime(Some(cursor)) else {
|
||||
return Err(JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("invalid cursor: {cursor}"),
|
||||
data: None,
|
||||
});
|
||||
};
|
||||
Ok(codex_state::Anchor { ts })
|
||||
}
|
||||
|
||||
fn thread_items_backwards_cursor(
|
||||
item_at: DateTime<Utc>,
|
||||
sort_direction: SortDirection,
|
||||
) -> Option<String> {
|
||||
let timestamp = match sort_direction {
|
||||
SortDirection::Asc => item_at.checked_add_signed(ChronoDuration::milliseconds(1))?,
|
||||
SortDirection::Desc => item_at.checked_sub_signed(ChronoDuration::milliseconds(1))?,
|
||||
};
|
||||
Some(timestamp.to_rfc3339_opts(SecondsFormat::Millis, true))
|
||||
}
|
||||
|
||||
fn thread_item_record_to_history_item(
|
||||
record: codex_state::ThreadItemRecord,
|
||||
) -> Result<ThreadHistoryItem, JSONRPCErrorError> {
|
||||
let item = serde_json::from_str::<ThreadItem>(&record.payload_json).map_err(|err| {
|
||||
JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: format!(
|
||||
"failed to deserialize persisted thread item {}: {err}",
|
||||
record.item_id
|
||||
),
|
||||
data: None,
|
||||
}
|
||||
})?;
|
||||
let turn_status =
|
||||
serde_json::from_value::<TurnStatus>(serde_json::Value::String(record.turn_status.clone()))
|
||||
.map_err(|err| JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: format!(
|
||||
"failed to deserialize persisted turn status {}: {err}",
|
||||
record.turn_status
|
||||
),
|
||||
data: None,
|
||||
})?;
|
||||
let turn_error = record
|
||||
.turn_error_json
|
||||
.as_deref()
|
||||
.map(serde_json::from_str::<TurnError>)
|
||||
.transpose()
|
||||
.map_err(|err| JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: format!(
|
||||
"failed to deserialize persisted turn error for {}: {err}",
|
||||
record.item_id
|
||||
),
|
||||
data: None,
|
||||
})?;
|
||||
Ok(ThreadHistoryItem {
|
||||
turn_id: record.turn_id,
|
||||
item,
|
||||
turn_status,
|
||||
turn_error,
|
||||
turn_started_at: record.turn_started_at,
|
||||
turn_completed_at: record.turn_completed_at,
|
||||
turn_duration_ms: record.turn_duration_ms,
|
||||
})
|
||||
}
|
||||
|
||||
struct ThreadTurnsPage {
|
||||
turns: Vec<Turn>,
|
||||
next_cursor: Option<String>,
|
||||
|
||||
@@ -66,6 +66,7 @@ use codex_app_server_protocol::ThreadArchiveParams;
|
||||
use codex_app_server_protocol::ThreadCompactStartParams;
|
||||
use codex_app_server_protocol::ThreadForkParams;
|
||||
use codex_app_server_protocol::ThreadInjectItemsParams;
|
||||
use codex_app_server_protocol::ThreadItemsListParams;
|
||||
use codex_app_server_protocol::ThreadListParams;
|
||||
use codex_app_server_protocol::ThreadLoadedListParams;
|
||||
use codex_app_server_protocol::ThreadMemoryModeSetParams;
|
||||
@@ -486,6 +487,15 @@ impl McpProcess {
|
||||
self.send_request("thread/turns/list", params).await
|
||||
}
|
||||
|
||||
/// Send a `thread/items/list` JSON-RPC request.
|
||||
pub async fn send_thread_items_list_request(
|
||||
&mut self,
|
||||
params: ThreadItemsListParams,
|
||||
) -> anyhow::Result<i64> {
|
||||
let params = Some(serde_json::to_value(params)?);
|
||||
self.send_request("thread/items/list", params).await
|
||||
}
|
||||
|
||||
/// Send a `model/list` JSON-RPC request.
|
||||
pub async fn send_list_models_request(
|
||||
&mut self,
|
||||
|
||||
@@ -41,6 +41,7 @@ mod skills_list;
|
||||
mod thread_archive;
|
||||
mod thread_fork;
|
||||
mod thread_inject_items;
|
||||
mod thread_items_list;
|
||||
mod thread_list;
|
||||
mod thread_loaded_list;
|
||||
mod thread_memory_mode_set;
|
||||
|
||||
394
codex-rs/app-server/tests/suite/v2/thread_items_list.rs
Normal file
394
codex-rs/app-server/tests/suite/v2/thread_items_list.rs
Normal file
@@ -0,0 +1,394 @@
|
||||
use anyhow::Result;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::test_absolute_path;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::SortDirection;
|
||||
use codex_app_server_protocol::ThreadHistoryItem;
|
||||
use codex_app_server_protocol::ThreadItem;
|
||||
use codex_app_server_protocol::ThreadItemsListParams;
|
||||
use codex_app_server_protocol::ThreadItemsListResponse;
|
||||
use codex_protocol::protocol::AgentMessageEvent;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::ExecCommandEndEvent;
|
||||
use codex_protocol::protocol::ExecCommandSource;
|
||||
use codex_protocol::protocol::ExecCommandStatus;
|
||||
use codex_protocol::protocol::ImageGenerationEndEvent;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::RolloutLine;
|
||||
use codex_protocol::protocol::SessionMeta;
|
||||
use codex_protocol::protocol::SessionMetaLine;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::TurnCompleteEvent;
|
||||
use codex_protocol::protocol::TurnStartedEvent;
|
||||
use codex_protocol::protocol::UserMessageEvent;
|
||||
use codex_rollout::state_db::sync_renderable_thread_items;
|
||||
use std::io::Write;
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[cfg(windows)]
|
||||
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(25);
|
||||
#[cfg(not(windows))]
|
||||
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_items_list_pages_renderable_items_and_skips_command_execution() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
let thread_id = write_thread_rollout(
|
||||
codex_home.path(),
|
||||
&[
|
||||
base_turn_events("turn-1", "first", "assistant one")?,
|
||||
user_turn("turn-2", "second")?,
|
||||
]
|
||||
.concat(),
|
||||
)?;
|
||||
create_config_toml(codex_home.path())?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let read_id = mcp
|
||||
.send_thread_items_list_request(ThreadItemsListParams {
|
||||
thread_id: thread_id.clone(),
|
||||
cursor: None,
|
||||
limit: Some(2),
|
||||
sort_direction: Some(SortDirection::Desc),
|
||||
})
|
||||
.await?;
|
||||
let read_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadItemsListResponse {
|
||||
data, next_cursor, ..
|
||||
} = to_response::<ThreadItemsListResponse>(read_resp)?;
|
||||
assert_eq!(history_kinds(&data), vec!["userMessage", "imageGeneration"]);
|
||||
let next_cursor = next_cursor.expect("expected next cursor");
|
||||
|
||||
let read_id = mcp
|
||||
.send_thread_items_list_request(ThreadItemsListParams {
|
||||
thread_id: thread_id.clone(),
|
||||
cursor: Some(next_cursor),
|
||||
limit: Some(10),
|
||||
sort_direction: Some(SortDirection::Desc),
|
||||
})
|
||||
.await?;
|
||||
let read_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadItemsListResponse { data, .. } = to_response::<ThreadItemsListResponse>(read_resp)?;
|
||||
assert_eq!(history_kinds(&data), vec!["agentMessage", "userMessage"]);
|
||||
|
||||
let read_id = mcp
|
||||
.send_thread_items_list_request(ThreadItemsListParams {
|
||||
thread_id,
|
||||
cursor: None,
|
||||
limit: Some(10),
|
||||
sort_direction: Some(SortDirection::Desc),
|
||||
})
|
||||
.await?;
|
||||
let read_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadItemsListResponse { data, .. } = to_response::<ThreadItemsListResponse>(read_resp)?;
|
||||
assert_eq!(
|
||||
history_kinds(&data),
|
||||
vec![
|
||||
"userMessage",
|
||||
"imageGeneration",
|
||||
"agentMessage",
|
||||
"userMessage",
|
||||
]
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_items_list_backwards_cursor_includes_anchor_for_newer_items() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
let thread_id = write_thread_rollout(
|
||||
codex_home.path(),
|
||||
&[
|
||||
user_turn("turn-1", "first")?,
|
||||
user_turn("turn-2", "second")?,
|
||||
]
|
||||
.concat(),
|
||||
)?;
|
||||
create_config_toml(codex_home.path())?;
|
||||
|
||||
let rollout_path = rollout_path(codex_home.path(), &thread_id);
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
let state_db =
|
||||
codex_state::StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into())
|
||||
.await?;
|
||||
|
||||
let read_id = mcp
|
||||
.send_thread_items_list_request(ThreadItemsListParams {
|
||||
thread_id: thread_id.clone(),
|
||||
cursor: None,
|
||||
limit: Some(1),
|
||||
sort_direction: Some(SortDirection::Desc),
|
||||
})
|
||||
.await?;
|
||||
let read_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadItemsListResponse {
|
||||
backwards_cursor, ..
|
||||
} = to_response::<ThreadItemsListResponse>(read_resp)?;
|
||||
let backwards_cursor = backwards_cursor.expect("expected backwards cursor");
|
||||
|
||||
append_rollout_events(rollout_path.as_path(), &user_turn("turn-3", "third")?)?;
|
||||
sync_renderable_thread_items(
|
||||
Some(state_db.as_ref()),
|
||||
codex_protocol::ThreadId::from_string(&thread_id)?,
|
||||
rollout_path.as_path(),
|
||||
"thread_items_list_test",
|
||||
)
|
||||
.await;
|
||||
|
||||
let read_id = mcp
|
||||
.send_thread_items_list_request(ThreadItemsListParams {
|
||||
thread_id,
|
||||
cursor: Some(backwards_cursor),
|
||||
limit: Some(10),
|
||||
sort_direction: Some(SortDirection::Asc),
|
||||
})
|
||||
.await?;
|
||||
let read_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadItemsListResponse { data, .. } = to_response::<ThreadItemsListResponse>(read_resp)?;
|
||||
assert_eq!(user_texts(&data), vec!["second", "third"]);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn base_turn_events(turn_id: &str, user_text: &str, agent_text: &str) -> Result<Vec<RolloutLine>> {
|
||||
let mut events = user_turn(turn_id, user_text)?;
|
||||
events.insert(
|
||||
2,
|
||||
rollout_line(
|
||||
"2025-01-05T12:00:02Z",
|
||||
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
|
||||
message: agent_text.to_string(),
|
||||
phase: None,
|
||||
memory_citation: None,
|
||||
})),
|
||||
),
|
||||
);
|
||||
events.insert(
|
||||
3,
|
||||
rollout_line(
|
||||
"2025-01-05T12:00:03Z",
|
||||
RolloutItem::EventMsg(EventMsg::ExecCommandEnd(ExecCommandEndEvent {
|
||||
call_id: "cmd-1".to_string(),
|
||||
process_id: None,
|
||||
turn_id: turn_id.to_string(),
|
||||
command: vec!["echo".to_string(), "secret".to_string()],
|
||||
cwd: test_absolute_path("/"),
|
||||
parsed_cmd: Vec::new(),
|
||||
source: ExecCommandSource::Agent,
|
||||
interaction_input: None,
|
||||
stdout: "secret".to_string(),
|
||||
stderr: String::new(),
|
||||
aggregated_output: "secret".to_string(),
|
||||
exit_code: 0,
|
||||
duration: Duration::from_millis(10),
|
||||
formatted_output: "secret".to_string(),
|
||||
status: ExecCommandStatus::Completed,
|
||||
})),
|
||||
),
|
||||
);
|
||||
events.insert(
|
||||
4,
|
||||
rollout_line(
|
||||
"2025-01-05T12:00:04Z",
|
||||
RolloutItem::EventMsg(EventMsg::ImageGenerationEnd(ImageGenerationEndEvent {
|
||||
call_id: "img-1".to_string(),
|
||||
status: "completed".to_string(),
|
||||
revised_prompt: Some("draw cat".to_string()),
|
||||
result: "https://example.com/generated.png".to_string(),
|
||||
saved_path: None,
|
||||
})),
|
||||
),
|
||||
);
|
||||
Ok(events)
|
||||
}
|
||||
|
||||
fn user_turn(turn_id: &str, text: &str) -> Result<Vec<RolloutLine>> {
|
||||
Ok(vec![
|
||||
rollout_line(
|
||||
"2025-01-05T12:00:00Z",
|
||||
RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent {
|
||||
turn_id: turn_id.to_string(),
|
||||
started_at: Some(1_736_078_400),
|
||||
model_context_window: None,
|
||||
collaboration_mode_kind: Default::default(),
|
||||
})),
|
||||
),
|
||||
rollout_line(
|
||||
"2025-01-05T12:00:01Z",
|
||||
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
|
||||
message: text.to_string(),
|
||||
images: Some(Vec::new()),
|
||||
local_images: Vec::new(),
|
||||
text_elements: Vec::new(),
|
||||
})),
|
||||
),
|
||||
rollout_line(
|
||||
"2025-01-05T12:00:05Z",
|
||||
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
|
||||
turn_id: turn_id.to_string(),
|
||||
last_agent_message: None,
|
||||
completed_at: Some(1_736_078_405),
|
||||
duration_ms: Some(5_000),
|
||||
time_to_first_token_ms: None,
|
||||
})),
|
||||
),
|
||||
])
|
||||
}
|
||||
|
||||
fn write_thread_rollout(codex_home: &Path, events: &[RolloutLine]) -> Result<String> {
|
||||
let thread_id = Uuid::now_v7().to_string();
|
||||
let rollout_path = rollout_path(codex_home, &thread_id);
|
||||
let Some(parent) = rollout_path.parent() else {
|
||||
anyhow::bail!(
|
||||
"rollout path should have parent: {}",
|
||||
rollout_path.display()
|
||||
);
|
||||
};
|
||||
std::fs::create_dir_all(parent)?;
|
||||
|
||||
let session_meta = RolloutLine {
|
||||
timestamp: "2025-01-05T12:00:00Z".to_string(),
|
||||
item: RolloutItem::SessionMeta(SessionMetaLine {
|
||||
meta: SessionMeta {
|
||||
id: codex_protocol::ThreadId::from_string(&thread_id)?,
|
||||
forked_from_id: None,
|
||||
timestamp: "2025-01-05T12:00:00Z".to_string(),
|
||||
cwd: test_absolute_path("/").into(),
|
||||
originator: "test".to_string(),
|
||||
cli_version: "0.0.0".to_string(),
|
||||
source: SessionSource::Cli,
|
||||
agent_path: None,
|
||||
agent_nickname: None,
|
||||
agent_role: None,
|
||||
model_provider: Some("mock_provider".to_string()),
|
||||
base_instructions: None,
|
||||
dynamic_tools: None,
|
||||
memory_mode: None,
|
||||
},
|
||||
git: None,
|
||||
}),
|
||||
};
|
||||
let mut lines = vec![session_meta];
|
||||
lines.extend_from_slice(events);
|
||||
let jsonl = lines
|
||||
.iter()
|
||||
.map(serde_json::to_string)
|
||||
.collect::<std::result::Result<Vec<_>, _>>()?
|
||||
.join("\n");
|
||||
std::fs::write(rollout_path, format!("{jsonl}\n"))?;
|
||||
Ok(thread_id)
|
||||
}
|
||||
|
||||
fn append_rollout_events(path: &Path, events: &[RolloutLine]) -> Result<()> {
|
||||
let mut file = std::fs::OpenOptions::new().append(true).open(path)?;
|
||||
for line in events {
|
||||
writeln!(file, "{}", serde_json::to_string(line)?)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn rollout_line(timestamp: &str, item: RolloutItem) -> RolloutLine {
|
||||
RolloutLine {
|
||||
timestamp: timestamp.to_string(),
|
||||
item,
|
||||
}
|
||||
}
|
||||
|
||||
fn rollout_path(codex_home: &Path, thread_id: &str) -> std::path::PathBuf {
|
||||
codex_home.join(format!(
|
||||
"sessions/2025/01/05/rollout-2025-01-05T12-00-00-{thread_id}.jsonl"
|
||||
))
|
||||
}
|
||||
|
||||
fn history_kinds(items: &[ThreadHistoryItem]) -> Vec<&'static str> {
|
||||
items
|
||||
.iter()
|
||||
.map(|item| match item.item {
|
||||
ThreadItem::UserMessage { .. } => "userMessage",
|
||||
ThreadItem::AgentMessage { .. } => "agentMessage",
|
||||
ThreadItem::ImageGeneration { .. } => "imageGeneration",
|
||||
ThreadItem::HookPrompt { .. } => "hookPrompt",
|
||||
ThreadItem::Plan { .. } => "plan",
|
||||
ThreadItem::Reasoning { .. } => "reasoning",
|
||||
ThreadItem::CommandExecution { .. } => "commandExecution",
|
||||
ThreadItem::FileChange { .. } => "fileChange",
|
||||
ThreadItem::McpToolCall { .. } => "mcpToolCall",
|
||||
ThreadItem::DynamicToolCall { .. } => "dynamicToolCall",
|
||||
ThreadItem::CollabAgentToolCall { .. } => "collabAgentToolCall",
|
||||
ThreadItem::WebSearch { .. } => "webSearch",
|
||||
ThreadItem::ImageView { .. } => "imageView",
|
||||
ThreadItem::EnteredReviewMode { .. } => "enteredReviewMode",
|
||||
ThreadItem::ExitedReviewMode { .. } => "exitedReviewMode",
|
||||
ThreadItem::ContextCompaction { .. } => "contextCompaction",
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn user_texts(items: &[ThreadHistoryItem]) -> Vec<&str> {
|
||||
items
|
||||
.iter()
|
||||
.filter_map(|item| match &item.item {
|
||||
ThreadItem::UserMessage { content, .. } => match content.first()? {
|
||||
codex_app_server_protocol::UserInput::Text { text, .. } => Some(text.as_str()),
|
||||
codex_app_server_protocol::UserInput::Image { .. }
|
||||
| codex_app_server_protocol::UserInput::LocalImage { .. }
|
||||
| codex_app_server_protocol::UserInput::Skill { .. }
|
||||
| codex_app_server_protocol::UserInput::Mention { .. } => None,
|
||||
},
|
||||
_ => None,
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn create_config_toml(codex_home: &Path) -> std::io::Result<()> {
|
||||
let config_toml = codex_home.join("config.toml");
|
||||
std::fs::write(
|
||||
config_toml,
|
||||
r#"
|
||||
model = "mock-model"
|
||||
model_provider = "mock_provider"
|
||||
approval_policy = "never"
|
||||
suppress_unstable_features_warning = true
|
||||
|
||||
[features]
|
||||
sqlite = true
|
||||
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider for test"
|
||||
base_url = "http://127.0.0.1:1/v1"
|
||||
wire_api = "responses"
|
||||
request_max_retries = 0
|
||||
stream_max_retries = 0
|
||||
"#,
|
||||
)
|
||||
}
|
||||
@@ -16,6 +16,7 @@ workspace = true
|
||||
anyhow = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
chrono = { workspace = true, features = ["serde"] }
|
||||
codex-app-server-protocol = { workspace = true }
|
||||
codex-file-search = { workspace = true }
|
||||
codex-git-utils = { workspace = true }
|
||||
codex-login = { workspace = true }
|
||||
|
||||
@@ -11,6 +11,7 @@ pub(crate) mod policy;
|
||||
pub(crate) mod recorder;
|
||||
pub(crate) mod session_index;
|
||||
pub mod state_db;
|
||||
pub(crate) mod thread_items;
|
||||
|
||||
pub(crate) mod default_client {
|
||||
pub use codex_login::default_client::*;
|
||||
|
||||
@@ -5,6 +5,7 @@ use crate::list;
|
||||
use crate::list::parse_timestamp_uuid_from_filename;
|
||||
use crate::recorder::RolloutRecorder;
|
||||
use crate::state_db::normalize_cwd_for_state_db;
|
||||
use crate::thread_items::build_persisted_thread_items;
|
||||
use chrono::DateTime;
|
||||
use chrono::NaiveDateTime;
|
||||
use chrono::Timelike;
|
||||
@@ -290,6 +291,38 @@ pub(crate) async fn backfill_sessions(
|
||||
rollout.path.display()
|
||||
);
|
||||
}
|
||||
match RolloutRecorder::load_rollout_items(&rollout.path).await {
|
||||
Ok((items, _, _)) => {
|
||||
match build_persisted_thread_items(items.as_slice()) {
|
||||
Ok(persisted_items) => {
|
||||
if let Err(err) = runtime
|
||||
.replace_thread_items(
|
||||
&metadata.id.to_string(),
|
||||
persisted_items.as_slice(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!(
|
||||
"failed to backfill thread items {}: {err}",
|
||||
rollout.path.display()
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"failed to build thread items during backfill {}: {err}",
|
||||
rollout.path.display()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"failed to reload rollout items during backfill {}: {err}",
|
||||
rollout.path.display()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
|
||||
@@ -1696,6 +1696,15 @@ async fn sync_thread_state_after_write(
|
||||
if state_db::touch_thread_updated_at(state_db_ctx, thread_id, updated_at, "rollout_writer")
|
||||
.await
|
||||
{
|
||||
if let Some(thread_id) = thread_id {
|
||||
state_db::sync_renderable_thread_items(
|
||||
state_db_ctx,
|
||||
thread_id,
|
||||
rollout_path,
|
||||
"rollout_writer",
|
||||
)
|
||||
.await;
|
||||
}
|
||||
return;
|
||||
}
|
||||
state_db::apply_rollout_items(
|
||||
|
||||
@@ -4,6 +4,7 @@ use crate::list::Cursor;
|
||||
use crate::list::SortDirection;
|
||||
use crate::list::ThreadSortKey;
|
||||
use crate::metadata;
|
||||
use crate::thread_items::build_persisted_thread_items;
|
||||
use chrono::DateTime;
|
||||
use chrono::Utc;
|
||||
use codex_protocol::ThreadId;
|
||||
@@ -414,6 +415,7 @@ pub async fn reconcile_rollout(
|
||||
rollout_path.display()
|
||||
);
|
||||
}
|
||||
sync_renderable_thread_items(Some(ctx), metadata.id, rollout_path, "reconcile_rollout").await;
|
||||
}
|
||||
|
||||
/// Repair a thread's rollout path after filesystem fallback succeeds.
|
||||
@@ -521,7 +523,9 @@ pub async fn apply_rollout_items(
|
||||
"state db apply_rollout_items failed during {stage} for {}: {err}",
|
||||
rollout_path.display()
|
||||
);
|
||||
return;
|
||||
}
|
||||
sync_renderable_thread_items(Some(ctx), builder.id, rollout_path, stage).await;
|
||||
}
|
||||
|
||||
pub async fn touch_thread_updated_at(
|
||||
@@ -544,6 +548,47 @@ pub async fn touch_thread_updated_at(
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn sync_renderable_thread_items(
|
||||
context: Option<&codex_state::StateRuntime>,
|
||||
thread_id: ThreadId,
|
||||
rollout_path: &Path,
|
||||
stage: &str,
|
||||
) {
|
||||
let Some(ctx) = context else {
|
||||
return;
|
||||
};
|
||||
let rollout_items =
|
||||
match crate::recorder::RolloutRecorder::load_rollout_items(rollout_path).await {
|
||||
Ok((items, _, _)) => items,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"state db renderable-item sync failed during {stage} for {}: {err}",
|
||||
rollout_path.display()
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
let persisted_items = match build_persisted_thread_items(rollout_items.as_slice()) {
|
||||
Ok(items) => items,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"state db renderable-item build failed during {stage} for {}: {err}",
|
||||
rollout_path.display()
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
if let Err(err) = ctx
|
||||
.replace_thread_items(&thread_id.to_string(), persisted_items.as_slice())
|
||||
.await
|
||||
{
|
||||
warn!(
|
||||
"state db renderable-item replace failed during {stage} for {}: {err}",
|
||||
rollout_path.display()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[path = "state_db_tests.rs"]
|
||||
mod tests;
|
||||
|
||||
168
codex-rs/rollout/src/thread_items.rs
Normal file
168
codex-rs/rollout/src/thread_items.rs
Normal file
@@ -0,0 +1,168 @@
|
||||
use chrono::DateTime;
|
||||
use chrono::Utc;
|
||||
use codex_app_server_protocol::ThreadItem as ApiThreadItem;
|
||||
use codex_app_server_protocol::Turn;
|
||||
use codex_app_server_protocol::TurnError;
|
||||
use codex_app_server_protocol::build_turns_from_rollout_items;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_state::ThreadItemRecordInsert;
|
||||
|
||||
/// Build the lightweight, renderable thread-item subset we persist in SQLite.
|
||||
pub(crate) fn build_persisted_thread_items(
|
||||
items: &[RolloutItem],
|
||||
) -> anyhow::Result<Vec<ThreadItemRecordInsert>> {
|
||||
let turns = build_turns_from_rollout_items(items);
|
||||
let mut persisted = Vec::new();
|
||||
let mut last_item_at_ms = None;
|
||||
|
||||
for turn in turns {
|
||||
for (index, item) in turn.items.iter().enumerate() {
|
||||
if !should_persist_item(item) {
|
||||
continue;
|
||||
}
|
||||
let item_at = item_timestamp_for_turn(&turn, index, last_item_at_ms)?;
|
||||
last_item_at_ms = Some(item_at.timestamp_millis());
|
||||
persisted.push(ThreadItemRecordInsert {
|
||||
turn_id: turn.id.clone(),
|
||||
item_id: item.id().to_string(),
|
||||
item_kind: item_kind(item).to_string(),
|
||||
item_at,
|
||||
turn_status: serde_json::to_value(turn.status.clone())?
|
||||
.as_str()
|
||||
.unwrap_or_default()
|
||||
.to_string(),
|
||||
turn_error_json: serialize_turn_error(turn.error.as_ref())?,
|
||||
turn_started_at: turn.started_at,
|
||||
turn_completed_at: turn.completed_at,
|
||||
turn_duration_ms: turn.duration_ms,
|
||||
search_text: search_text(item),
|
||||
payload_json: serde_json::to_string(item)?,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Ok(persisted)
|
||||
}
|
||||
|
||||
fn should_persist_item(item: &ApiThreadItem) -> bool {
|
||||
matches!(
|
||||
item,
|
||||
ApiThreadItem::UserMessage { .. }
|
||||
| ApiThreadItem::HookPrompt { .. }
|
||||
| ApiThreadItem::AgentMessage { .. }
|
||||
| ApiThreadItem::Plan { .. }
|
||||
| ApiThreadItem::Reasoning { .. }
|
||||
| ApiThreadItem::WebSearch { .. }
|
||||
| ApiThreadItem::ImageView { .. }
|
||||
| ApiThreadItem::ImageGeneration { .. }
|
||||
| ApiThreadItem::EnteredReviewMode { .. }
|
||||
| ApiThreadItem::ExitedReviewMode { .. }
|
||||
| ApiThreadItem::ContextCompaction { .. }
|
||||
)
|
||||
}
|
||||
|
||||
fn item_kind(item: &ApiThreadItem) -> &'static str {
|
||||
match item {
|
||||
ApiThreadItem::UserMessage { .. } => "userMessage",
|
||||
ApiThreadItem::HookPrompt { .. } => "hookPrompt",
|
||||
ApiThreadItem::AgentMessage { .. } => "agentMessage",
|
||||
ApiThreadItem::Plan { .. } => "plan",
|
||||
ApiThreadItem::Reasoning { .. } => "reasoning",
|
||||
ApiThreadItem::CommandExecution { .. } => "commandExecution",
|
||||
ApiThreadItem::FileChange { .. } => "fileChange",
|
||||
ApiThreadItem::McpToolCall { .. } => "mcpToolCall",
|
||||
ApiThreadItem::DynamicToolCall { .. } => "dynamicToolCall",
|
||||
ApiThreadItem::CollabAgentToolCall { .. } => "collabAgentToolCall",
|
||||
ApiThreadItem::WebSearch { .. } => "webSearch",
|
||||
ApiThreadItem::ImageView { .. } => "imageView",
|
||||
ApiThreadItem::ImageGeneration { .. } => "imageGeneration",
|
||||
ApiThreadItem::EnteredReviewMode { .. } => "enteredReviewMode",
|
||||
ApiThreadItem::ExitedReviewMode { .. } => "exitedReviewMode",
|
||||
ApiThreadItem::ContextCompaction { .. } => "contextCompaction",
|
||||
}
|
||||
}
|
||||
|
||||
fn serialize_turn_error(turn_error: Option<&TurnError>) -> anyhow::Result<Option<String>> {
|
||||
turn_error
|
||||
.map(serde_json::to_string)
|
||||
.transpose()
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
||||
fn item_timestamp_for_turn(
|
||||
turn: &Turn,
|
||||
index: usize,
|
||||
last_item_at_ms: Option<i64>,
|
||||
) -> anyhow::Result<DateTime<Utc>> {
|
||||
let base_ms = turn
|
||||
.started_at
|
||||
.or(turn.completed_at)
|
||||
.map(|seconds| seconds.saturating_mul(1000))
|
||||
.or(last_item_at_ms.map(|value| value.saturating_add(1)))
|
||||
.unwrap_or_else(|| i64::try_from(index).unwrap_or(i64::MAX));
|
||||
let candidate_ms = base_ms.saturating_add(i64::try_from(index).unwrap_or(i64::MAX));
|
||||
DateTime::<Utc>::from_timestamp_millis(candidate_ms)
|
||||
.ok_or_else(|| anyhow::anyhow!("invalid thread item timestamp millis: {candidate_ms}"))
|
||||
}
|
||||
|
||||
fn search_text(item: &ApiThreadItem) -> String {
|
||||
match item {
|
||||
ApiThreadItem::UserMessage { content, .. } => content
|
||||
.iter()
|
||||
.map(|entry| match entry {
|
||||
codex_app_server_protocol::UserInput::Text { text, .. } => text.clone(),
|
||||
codex_app_server_protocol::UserInput::Image { url } => url.clone(),
|
||||
codex_app_server_protocol::UserInput::LocalImage { path } => {
|
||||
path.display().to_string()
|
||||
}
|
||||
codex_app_server_protocol::UserInput::Skill { name, path } => {
|
||||
format!("{name} {}", path.display())
|
||||
}
|
||||
codex_app_server_protocol::UserInput::Mention { name, path } => {
|
||||
format!("{name} {path}")
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n"),
|
||||
ApiThreadItem::HookPrompt { fragments, .. } => fragments
|
||||
.iter()
|
||||
.map(|fragment| fragment.text.clone())
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n"),
|
||||
ApiThreadItem::AgentMessage { text, .. } => text.clone(),
|
||||
ApiThreadItem::Plan { text, .. } => text.clone(),
|
||||
ApiThreadItem::Reasoning {
|
||||
summary, content, ..
|
||||
} => summary
|
||||
.iter()
|
||||
.chain(content.iter())
|
||||
.cloned()
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n"),
|
||||
ApiThreadItem::WebSearch { query, .. } => query.clone(),
|
||||
ApiThreadItem::ImageView { path, .. } => path.display().to_string(),
|
||||
ApiThreadItem::ImageGeneration {
|
||||
revised_prompt,
|
||||
saved_path,
|
||||
..
|
||||
} => [
|
||||
revised_prompt.clone().unwrap_or_default(),
|
||||
saved_path
|
||||
.as_ref()
|
||||
.map(|path| path.display().to_string())
|
||||
.unwrap_or_default(),
|
||||
]
|
||||
.into_iter()
|
||||
.filter(|value| !value.is_empty())
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n"),
|
||||
ApiThreadItem::EnteredReviewMode { review, .. }
|
||||
| ApiThreadItem::ExitedReviewMode { review, .. } => review.clone(),
|
||||
ApiThreadItem::ContextCompaction { .. }
|
||||
| ApiThreadItem::CommandExecution { .. }
|
||||
| ApiThreadItem::FileChange { .. }
|
||||
| ApiThreadItem::McpToolCall { .. }
|
||||
| ApiThreadItem::DynamicToolCall { .. }
|
||||
| ApiThreadItem::CollabAgentToolCall { .. } => String::new(),
|
||||
}
|
||||
}
|
||||
18
codex-rs/state/migrations/0028_thread_items.sql
Normal file
18
codex-rs/state/migrations/0028_thread_items.sql
Normal file
@@ -0,0 +1,18 @@
|
||||
CREATE TABLE thread_items (
|
||||
thread_id TEXT NOT NULL,
|
||||
turn_id TEXT NOT NULL,
|
||||
item_id TEXT NOT NULL,
|
||||
item_kind TEXT NOT NULL,
|
||||
item_at_ms INTEGER NOT NULL,
|
||||
turn_status TEXT NOT NULL,
|
||||
turn_error_json TEXT,
|
||||
turn_started_at INTEGER,
|
||||
turn_completed_at INTEGER,
|
||||
turn_duration_ms INTEGER,
|
||||
search_text TEXT NOT NULL,
|
||||
payload_json TEXT NOT NULL,
|
||||
PRIMARY KEY (thread_id, item_id)
|
||||
);
|
||||
|
||||
CREATE INDEX idx_thread_items_thread_item_at_ms
|
||||
ON thread_items(thread_id, item_at_ms DESC);
|
||||
@@ -44,6 +44,9 @@ pub use model::Stage1JobClaimOutcome;
|
||||
pub use model::Stage1Output;
|
||||
pub use model::Stage1OutputRef;
|
||||
pub use model::Stage1StartupClaimParams;
|
||||
pub use model::ThreadItemRecord;
|
||||
pub use model::ThreadItemRecordInsert;
|
||||
pub use model::ThreadItemsPage;
|
||||
pub use model::ThreadMetadata;
|
||||
pub use model::ThreadMetadataBuilder;
|
||||
pub use model::ThreadsPage;
|
||||
@@ -60,7 +63,7 @@ pub const SQLITE_HOME_ENV: &str = "CODEX_SQLITE_HOME";
|
||||
pub const LOGS_DB_FILENAME: &str = "logs";
|
||||
pub const LOGS_DB_VERSION: u32 = 2;
|
||||
pub const STATE_DB_FILENAME: &str = "state";
|
||||
pub const STATE_DB_VERSION: u32 = 5;
|
||||
pub const STATE_DB_VERSION: u32 = 6;
|
||||
|
||||
/// Errors encountered during DB operations. Tags: [stage]
|
||||
pub const DB_ERROR_METRIC: &str = "codex.db.error";
|
||||
|
||||
@@ -3,6 +3,7 @@ mod backfill_state;
|
||||
mod graph;
|
||||
mod log;
|
||||
mod memories;
|
||||
mod thread_item;
|
||||
mod thread_metadata;
|
||||
|
||||
pub use agent_job::AgentJob;
|
||||
@@ -25,6 +26,9 @@ pub use memories::Stage1JobClaimOutcome;
|
||||
pub use memories::Stage1Output;
|
||||
pub use memories::Stage1OutputRef;
|
||||
pub use memories::Stage1StartupClaimParams;
|
||||
pub use thread_item::ThreadItemRecord;
|
||||
pub use thread_item::ThreadItemRecordInsert;
|
||||
pub use thread_item::ThreadItemsPage;
|
||||
pub use thread_metadata::Anchor;
|
||||
pub use thread_metadata::BackfillStats;
|
||||
pub use thread_metadata::ExtractionOutcome;
|
||||
@@ -38,6 +42,8 @@ pub(crate) use agent_job::AgentJobItemRow;
|
||||
pub(crate) use agent_job::AgentJobRow;
|
||||
pub(crate) use memories::Stage1OutputRow;
|
||||
pub(crate) use memories::stage1_output_ref_from_parts;
|
||||
pub(crate) use thread_item::ThreadItemRow;
|
||||
pub(crate) use thread_item::anchor_from_thread_item;
|
||||
pub(crate) use thread_metadata::ThreadRow;
|
||||
pub(crate) use thread_metadata::anchor_from_item;
|
||||
pub(crate) use thread_metadata::datetime_to_epoch_millis;
|
||||
|
||||
109
codex-rs/state/src/model/thread_item.rs
Normal file
109
codex-rs/state/src/model/thread_item.rs
Normal file
@@ -0,0 +1,109 @@
|
||||
use anyhow::Result;
|
||||
use chrono::DateTime;
|
||||
use chrono::Utc;
|
||||
use sqlx::Row;
|
||||
use sqlx::sqlite::SqliteRow;
|
||||
|
||||
use super::Anchor;
|
||||
use super::epoch_millis_to_datetime;
|
||||
|
||||
/// A lightweight, renderable item persisted for thread history pagination.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct ThreadItemRecord {
|
||||
pub thread_id: String,
|
||||
pub turn_id: String,
|
||||
pub item_id: String,
|
||||
pub item_kind: String,
|
||||
pub item_at: DateTime<Utc>,
|
||||
pub turn_status: String,
|
||||
pub turn_error_json: Option<String>,
|
||||
pub turn_started_at: Option<i64>,
|
||||
pub turn_completed_at: Option<i64>,
|
||||
pub turn_duration_ms: Option<i64>,
|
||||
pub search_text: String,
|
||||
pub payload_json: String,
|
||||
}
|
||||
|
||||
/// Insert payload for a lightweight persisted thread item.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct ThreadItemRecordInsert {
|
||||
pub turn_id: String,
|
||||
pub item_id: String,
|
||||
pub item_kind: String,
|
||||
pub item_at: DateTime<Utc>,
|
||||
pub turn_status: String,
|
||||
pub turn_error_json: Option<String>,
|
||||
pub turn_started_at: Option<i64>,
|
||||
pub turn_completed_at: Option<i64>,
|
||||
pub turn_duration_ms: Option<i64>,
|
||||
pub search_text: String,
|
||||
pub payload_json: String,
|
||||
}
|
||||
|
||||
/// A single page of persisted thread-item results.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct ThreadItemsPage {
|
||||
pub items: Vec<ThreadItemRecord>,
|
||||
pub next_anchor: Option<Anchor>,
|
||||
pub num_scanned_rows: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct ThreadItemRow {
|
||||
thread_id: String,
|
||||
turn_id: String,
|
||||
item_id: String,
|
||||
item_kind: String,
|
||||
item_at: i64,
|
||||
turn_status: String,
|
||||
turn_error_json: Option<String>,
|
||||
turn_started_at: Option<i64>,
|
||||
turn_completed_at: Option<i64>,
|
||||
turn_duration_ms: Option<i64>,
|
||||
search_text: String,
|
||||
payload_json: String,
|
||||
}
|
||||
|
||||
impl ThreadItemRow {
|
||||
pub(crate) fn try_from_row(row: &SqliteRow) -> Result<Self> {
|
||||
Ok(Self {
|
||||
thread_id: row.try_get("thread_id")?,
|
||||
turn_id: row.try_get("turn_id")?,
|
||||
item_id: row.try_get("item_id")?,
|
||||
item_kind: row.try_get("item_kind")?,
|
||||
item_at: row.try_get("item_at")?,
|
||||
turn_status: row.try_get("turn_status")?,
|
||||
turn_error_json: row.try_get("turn_error_json")?,
|
||||
turn_started_at: row.try_get("turn_started_at")?,
|
||||
turn_completed_at: row.try_get("turn_completed_at")?,
|
||||
turn_duration_ms: row.try_get("turn_duration_ms")?,
|
||||
search_text: row.try_get("search_text")?,
|
||||
payload_json: row.try_get("payload_json")?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<ThreadItemRow> for ThreadItemRecord {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(value: ThreadItemRow) -> Result<Self> {
|
||||
Ok(Self {
|
||||
thread_id: value.thread_id,
|
||||
turn_id: value.turn_id,
|
||||
item_id: value.item_id,
|
||||
item_kind: value.item_kind,
|
||||
item_at: epoch_millis_to_datetime(value.item_at)?,
|
||||
turn_status: value.turn_status,
|
||||
turn_error_json: value.turn_error_json,
|
||||
turn_started_at: value.turn_started_at,
|
||||
turn_completed_at: value.turn_completed_at,
|
||||
turn_duration_ms: value.turn_duration_ms,
|
||||
search_text: value.search_text,
|
||||
payload_json: value.payload_json,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn anchor_from_thread_item(item: &ThreadItemRecord) -> Anchor {
|
||||
Anchor { ts: item.item_at }
|
||||
}
|
||||
@@ -60,6 +60,7 @@ mod memories;
|
||||
mod remote_control;
|
||||
#[cfg(test)]
|
||||
mod test_support;
|
||||
mod thread_items;
|
||||
mod threads;
|
||||
|
||||
pub use remote_control::RemoteControlEnrollmentRecord;
|
||||
@@ -81,6 +82,7 @@ pub struct StateRuntime {
|
||||
pool: Arc<sqlx::SqlitePool>,
|
||||
logs_pool: Arc<sqlx::SqlitePool>,
|
||||
thread_updated_at_millis: Arc<AtomicI64>,
|
||||
thread_item_at_millis: Arc<AtomicI64>,
|
||||
}
|
||||
|
||||
impl StateRuntime {
|
||||
@@ -130,12 +132,18 @@ impl StateRuntime {
|
||||
.fetch_one(pool.as_ref())
|
||||
.await?;
|
||||
let thread_updated_at_millis = thread_updated_at_millis.unwrap_or(0);
|
||||
let thread_item_at_millis: Option<i64> =
|
||||
sqlx::query_scalar("SELECT MAX(thread_items.item_at_ms) FROM thread_items")
|
||||
.fetch_one(pool.as_ref())
|
||||
.await?;
|
||||
let thread_item_at_millis = thread_item_at_millis.unwrap_or(0);
|
||||
let runtime = Arc::new(Self {
|
||||
pool,
|
||||
logs_pool,
|
||||
codex_home,
|
||||
default_provider,
|
||||
thread_updated_at_millis: Arc::new(AtomicI64::new(thread_updated_at_millis)),
|
||||
thread_item_at_millis: Arc::new(AtomicI64::new(thread_item_at_millis)),
|
||||
});
|
||||
if let Err(err) = runtime.run_logs_startup_maintenance().await {
|
||||
warn!(
|
||||
|
||||
349
codex-rs/state/src/runtime/thread_items.rs
Normal file
349
codex-rs/state/src/runtime/thread_items.rs
Normal file
@@ -0,0 +1,349 @@
|
||||
use super::*;
|
||||
use crate::SortDirection;
|
||||
use crate::ThreadItemRecordInsert;
|
||||
use crate::ThreadItemsPage;
|
||||
use crate::model::ThreadItemRow;
|
||||
use crate::model::anchor_from_thread_item;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
impl StateRuntime {
|
||||
pub async fn replace_thread_items(
|
||||
&self,
|
||||
thread_id: &str,
|
||||
items: &[ThreadItemRecordInsert],
|
||||
) -> anyhow::Result<()> {
|
||||
let existing_rows =
|
||||
sqlx::query("SELECT item_id, item_at_ms FROM thread_items WHERE thread_id = ?")
|
||||
.bind(thread_id)
|
||||
.fetch_all(self.pool.as_ref())
|
||||
.await?;
|
||||
let existing_item_at_millis: HashMap<String, i64> = existing_rows
|
||||
.into_iter()
|
||||
.filter_map(|row| {
|
||||
let item_id = row.try_get::<String, _>("item_id").ok()?;
|
||||
let item_at_ms = row.try_get::<i64, _>("item_at_ms").ok()?;
|
||||
Some((item_id, item_at_ms))
|
||||
})
|
||||
.collect();
|
||||
let mut assigned_item_at_millis = HashSet::new();
|
||||
|
||||
let mut tx = self.pool.begin().await?;
|
||||
sqlx::query("DELETE FROM thread_items WHERE thread_id = ?")
|
||||
.bind(thread_id)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
for item in items {
|
||||
let mut item_at_ms =
|
||||
if let Some(item_at_ms) = existing_item_at_millis.get(&item.item_id) {
|
||||
*item_at_ms
|
||||
} else {
|
||||
datetime_to_epoch_millis(self.allocate_thread_item_at(item.item_at)?)
|
||||
};
|
||||
while !assigned_item_at_millis.insert(item_at_ms) {
|
||||
item_at_ms = item_at_ms.saturating_add(1);
|
||||
}
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO thread_items (
|
||||
thread_id,
|
||||
turn_id,
|
||||
item_id,
|
||||
item_kind,
|
||||
item_at_ms,
|
||||
turn_status,
|
||||
turn_error_json,
|
||||
turn_started_at,
|
||||
turn_completed_at,
|
||||
turn_duration_ms,
|
||||
search_text,
|
||||
payload_json
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
"#,
|
||||
)
|
||||
.bind(thread_id)
|
||||
.bind(item.turn_id.as_str())
|
||||
.bind(item.item_id.as_str())
|
||||
.bind(item.item_kind.as_str())
|
||||
.bind(item_at_ms)
|
||||
.bind(item.turn_status.as_str())
|
||||
.bind(item.turn_error_json.as_deref())
|
||||
.bind(item.turn_started_at)
|
||||
.bind(item.turn_completed_at)
|
||||
.bind(item.turn_duration_ms)
|
||||
.bind(item.search_text.as_str())
|
||||
.bind(item.payload_json.as_str())
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
}
|
||||
|
||||
tx.commit().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn list_thread_items(
|
||||
&self,
|
||||
thread_id: &str,
|
||||
page_size: usize,
|
||||
anchor: Option<&crate::Anchor>,
|
||||
sort_direction: crate::SortDirection,
|
||||
) -> anyhow::Result<crate::ThreadItemsPage> {
|
||||
let limit = page_size.saturating_add(1);
|
||||
let mut builder = QueryBuilder::<Sqlite>::new(
|
||||
r#"
|
||||
SELECT
|
||||
thread_id,
|
||||
turn_id,
|
||||
item_id,
|
||||
item_kind,
|
||||
item_at_ms AS item_at,
|
||||
turn_status,
|
||||
turn_error_json,
|
||||
turn_started_at,
|
||||
turn_completed_at,
|
||||
turn_duration_ms,
|
||||
search_text,
|
||||
payload_json
|
||||
FROM thread_items
|
||||
WHERE thread_id =
|
||||
"#,
|
||||
);
|
||||
builder.push_bind(thread_id);
|
||||
if let Some(anchor) = anchor {
|
||||
let item_at_ms = datetime_to_epoch_millis(anchor.ts);
|
||||
match sort_direction {
|
||||
SortDirection::Asc => builder.push(" AND item_at_ms > ").push_bind(item_at_ms),
|
||||
SortDirection::Desc => builder.push(" AND item_at_ms < ").push_bind(item_at_ms),
|
||||
};
|
||||
}
|
||||
let sort_sql = match sort_direction {
|
||||
SortDirection::Asc => " ASC",
|
||||
SortDirection::Desc => " DESC",
|
||||
};
|
||||
builder
|
||||
.push(" ORDER BY item_at_ms")
|
||||
.push(sort_sql)
|
||||
.push(" LIMIT ")
|
||||
.push_bind(i64::try_from(limit).unwrap_or(i64::MAX));
|
||||
|
||||
let rows = builder.build().fetch_all(self.pool.as_ref()).await?;
|
||||
let mut items = rows
|
||||
.into_iter()
|
||||
.map(|row| {
|
||||
ThreadItemRow::try_from_row(&row).and_then(crate::ThreadItemRecord::try_from)
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
let num_scanned_rows = items.len();
|
||||
let next_anchor = if items.len() > page_size {
|
||||
items.pop();
|
||||
items.last().map(anchor_from_thread_item)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
Ok(ThreadItemsPage {
|
||||
items,
|
||||
next_anchor,
|
||||
num_scanned_rows,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn get_thread_item(
|
||||
&self,
|
||||
thread_id: &str,
|
||||
item_id: &str,
|
||||
) -> anyhow::Result<Option<crate::ThreadItemRecord>> {
|
||||
let row = sqlx::query(
|
||||
r#"
|
||||
SELECT
|
||||
thread_id,
|
||||
turn_id,
|
||||
item_id,
|
||||
item_kind,
|
||||
item_at_ms AS item_at,
|
||||
turn_status,
|
||||
turn_error_json,
|
||||
turn_started_at,
|
||||
turn_completed_at,
|
||||
turn_duration_ms,
|
||||
search_text,
|
||||
payload_json
|
||||
FROM thread_items
|
||||
WHERE thread_id = ? AND item_id = ?
|
||||
"#,
|
||||
)
|
||||
.bind(thread_id)
|
||||
.bind(item_id)
|
||||
.fetch_optional(self.pool.as_ref())
|
||||
.await?;
|
||||
row.map(|row| ThreadItemRow::try_from_row(&row).and_then(crate::ThreadItemRecord::try_from))
|
||||
.transpose()
|
||||
}
|
||||
|
||||
fn allocate_thread_item_at(&self, item_at: DateTime<Utc>) -> anyhow::Result<DateTime<Utc>> {
|
||||
let candidate = datetime_to_epoch_millis(item_at);
|
||||
let allocated = loop {
|
||||
let current = self.thread_item_at_millis.load(Ordering::Relaxed);
|
||||
if candidate > current {
|
||||
if self
|
||||
.thread_item_at_millis
|
||||
.compare_exchange(current, candidate, Ordering::Relaxed, Ordering::Relaxed)
|
||||
.is_ok()
|
||||
{
|
||||
break candidate;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if candidate.saturating_add(1000) <= current {
|
||||
break candidate;
|
||||
}
|
||||
let bumped = current.saturating_add(1);
|
||||
if self
|
||||
.thread_item_at_millis
|
||||
.compare_exchange(current, bumped, Ordering::Relaxed, Ordering::Relaxed)
|
||||
.is_ok()
|
||||
{
|
||||
break bumped;
|
||||
}
|
||||
};
|
||||
epoch_millis_to_datetime(allocated)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::super::test_support::unique_temp_dir;
|
||||
use super::StateRuntime;
|
||||
use super::anchor_from_thread_item;
|
||||
use crate::SortDirection;
|
||||
use crate::ThreadItemRecordInsert;
|
||||
use chrono::DateTime;
|
||||
use chrono::Utc;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
fn item(item_id: &str, item_at_ms: i64) -> ThreadItemRecordInsert {
|
||||
ThreadItemRecordInsert {
|
||||
turn_id: "turn-1".to_string(),
|
||||
item_id: item_id.to_string(),
|
||||
item_kind: "agentMessage".to_string(),
|
||||
item_at: DateTime::<Utc>::from_timestamp_millis(item_at_ms).expect("timestamp millis"),
|
||||
turn_status: "completed".to_string(),
|
||||
turn_error_json: None,
|
||||
turn_started_at: Some(item_at_ms / 1000),
|
||||
turn_completed_at: Some(item_at_ms / 1000),
|
||||
turn_duration_ms: Some(1),
|
||||
search_text: format!("search {item_id}"),
|
||||
payload_json: format!(
|
||||
r#"{{"type":"agentMessage","id":"{item_id}","text":"{item_id}"}}"#
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn replace_thread_items_pages_and_reuses_existing_timestamps() {
|
||||
let codex_home = unique_temp_dir();
|
||||
let runtime = StateRuntime::init(codex_home, "test-provider".to_string())
|
||||
.await
|
||||
.expect("state db should initialize");
|
||||
let thread_id = "thread-1";
|
||||
|
||||
runtime
|
||||
.replace_thread_items(
|
||||
thread_id,
|
||||
&[
|
||||
item("item-a", 1_700_001_111_123),
|
||||
item("item-b", 1_700_001_111_123),
|
||||
],
|
||||
)
|
||||
.await
|
||||
.expect("initial replace should succeed");
|
||||
|
||||
let page = runtime
|
||||
.list_thread_items(thread_id, 1, None, SortDirection::Desc)
|
||||
.await
|
||||
.expect("list should succeed");
|
||||
assert_eq!(page.items.len(), 1);
|
||||
assert_eq!(page.items[0].item_id, "item-b");
|
||||
|
||||
let next_anchor = page.next_anchor.expect("expected next anchor");
|
||||
let older_page = runtime
|
||||
.list_thread_items(thread_id, 10, Some(&next_anchor), SortDirection::Desc)
|
||||
.await
|
||||
.expect("older page should succeed");
|
||||
assert_eq!(
|
||||
older_page
|
||||
.items
|
||||
.iter()
|
||||
.map(|item| item.item_id.as_str())
|
||||
.collect::<Vec<_>>(),
|
||||
vec!["item-a"]
|
||||
);
|
||||
|
||||
let previous_item_b = runtime
|
||||
.get_thread_item(thread_id, "item-b")
|
||||
.await
|
||||
.expect("item should load")
|
||||
.expect("item should exist");
|
||||
|
||||
runtime
|
||||
.replace_thread_items(
|
||||
thread_id,
|
||||
&[
|
||||
item("item-b", 1_700_001_111_123),
|
||||
item("item-c", 1_700_001_111_100),
|
||||
],
|
||||
)
|
||||
.await
|
||||
.expect("replacement should succeed");
|
||||
|
||||
let current_item_b = runtime
|
||||
.get_thread_item(thread_id, "item-b")
|
||||
.await
|
||||
.expect("item should load")
|
||||
.expect("item should exist");
|
||||
assert_eq!(current_item_b.item_at, previous_item_b.item_at);
|
||||
|
||||
let asc_page = runtime
|
||||
.list_thread_items(thread_id, 10, None, SortDirection::Asc)
|
||||
.await
|
||||
.expect("asc list should succeed");
|
||||
assert_eq!(
|
||||
asc_page
|
||||
.items
|
||||
.iter()
|
||||
.map(|item| item.item_id.as_str())
|
||||
.collect::<Vec<_>>(),
|
||||
vec!["item-b", "item-c"]
|
||||
);
|
||||
assert!(asc_page.items[0].item_at < asc_page.items[1].item_at);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn replace_thread_items_next_anchor_tracks_last_row_on_page() {
|
||||
let codex_home = unique_temp_dir();
|
||||
let runtime = StateRuntime::init(codex_home, "test-provider".to_string())
|
||||
.await
|
||||
.expect("state db should initialize");
|
||||
let thread_id = "thread-2";
|
||||
|
||||
runtime
|
||||
.replace_thread_items(
|
||||
thread_id,
|
||||
&[
|
||||
item("item-a", 1_700_001_111_100),
|
||||
item("item-b", 1_700_001_111_200),
|
||||
item("item-c", 1_700_001_111_300),
|
||||
],
|
||||
)
|
||||
.await
|
||||
.expect("replace should succeed");
|
||||
|
||||
let page = runtime
|
||||
.list_thread_items(thread_id, 2, None, SortDirection::Desc)
|
||||
.await
|
||||
.expect("list should succeed");
|
||||
let anchor = page.next_anchor.expect("expected next anchor");
|
||||
assert_eq!(anchor, anchor_from_thread_item(&page.items[1]));
|
||||
}
|
||||
}
|
||||
@@ -909,6 +909,10 @@ ON CONFLICT(thread_id, position) DO NOTHING
|
||||
|
||||
/// Delete a thread metadata row by id.
|
||||
pub async fn delete_thread(&self, thread_id: ThreadId) -> anyhow::Result<u64> {
|
||||
sqlx::query("DELETE FROM thread_items WHERE thread_id = ?")
|
||||
.bind(thread_id.to_string())
|
||||
.execute(self.pool.as_ref())
|
||||
.await?;
|
||||
let result = sqlx::query("DELETE FROM threads WHERE id = ?")
|
||||
.bind(thread_id.to_string())
|
||||
.execute(self.pool.as_ref())
|
||||
|
||||
Reference in New Issue
Block a user