Compare commits

...

1 Commits

Author SHA1 Message Date
David de Regt
bd45013a04 wip branch for items migration and pagination 2026-04-27 13:18:59 -07:00
31 changed files with 3581 additions and 2 deletions

1
codex-rs/Cargo.lock generated
View File

@@ -3184,6 +3184,7 @@ dependencies = [
"anyhow",
"async-trait",
"chrono",
"codex-app-server-protocol",
"codex-file-search",
"codex-git-utils",
"codex-login",

View File

@@ -3393,6 +3393,44 @@
],
"type": "object"
},
"ThreadItemsListParams": {
"properties": {
"cursor": {
"description": "Opaque cursor to pass to the next call to continue after the last item.",
"type": [
"string",
"null"
]
},
"limit": {
"description": "Optional item page size.",
"format": "uint32",
"minimum": 0.0,
"type": [
"integer",
"null"
]
},
"sortDirection": {
"anyOf": [
{
"$ref": "#/definitions/SortDirection"
},
{
"type": "null"
}
],
"description": "Optional item pagination direction; defaults to descending."
},
"threadId": {
"type": "string"
}
},
"required": [
"threadId"
],
"type": "object"
},
"ThreadListCwdFilter": {
"anyOf": [
{
@@ -4807,6 +4845,30 @@
"title": "Thread/turns/listRequest",
"type": "object"
},
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
},
"method": {
"enum": [
"thread/items/list"
],
"title": "Thread/items/listRequestMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/ThreadItemsListParams"
}
},
"required": [
"id",
"method",
"params"
],
"title": "Thread/items/listRequest",
"type": "object"
},
{
"description": "Append raw Responses API items to the thread history without starting a user turn.",
"properties": {

View File

@@ -592,6 +592,30 @@
"title": "Thread/turns/listRequest",
"type": "object"
},
{
"properties": {
"id": {
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
"thread/items/list"
],
"title": "Thread/items/listRequestMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/v2/ThreadItemsListParams"
}
},
"required": [
"id",
"method",
"params"
],
"title": "Thread/items/listRequest",
"type": "object"
},
{
"description": "Append raw Responses API items to the thread history without starting a user turn.",
"properties": {
@@ -14508,6 +14532,59 @@
"title": "ThreadForkResponse",
"type": "object"
},
"ThreadHistoryItem": {
"properties": {
"item": {
"$ref": "#/definitions/v2/ThreadItem"
},
"turnCompletedAt": {
"description": "Unix timestamp (in seconds) when the turn completed.",
"format": "int64",
"type": [
"integer",
"null"
]
},
"turnDurationMs": {
"description": "Duration between turn start and completion in milliseconds, if known.",
"format": "int64",
"type": [
"integer",
"null"
]
},
"turnError": {
"anyOf": [
{
"$ref": "#/definitions/v2/TurnError"
},
{
"type": "null"
}
]
},
"turnId": {
"type": "string"
},
"turnStartedAt": {
"description": "Unix timestamp (in seconds) when the turn started.",
"format": "int64",
"type": [
"integer",
"null"
]
},
"turnStatus": {
"$ref": "#/definitions/v2/TurnStatus"
}
},
"required": [
"item",
"turnId",
"turnStatus"
],
"type": "object"
},
"ThreadId": {
"type": "string"
},
@@ -15195,6 +15272,76 @@
}
]
},
"ThreadItemsListParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"cursor": {
"description": "Opaque cursor to pass to the next call to continue after the last item.",
"type": [
"string",
"null"
]
},
"limit": {
"description": "Optional item page size.",
"format": "uint32",
"minimum": 0.0,
"type": [
"integer",
"null"
]
},
"sortDirection": {
"anyOf": [
{
"$ref": "#/definitions/v2/SortDirection"
},
{
"type": "null"
}
],
"description": "Optional item pagination direction; defaults to descending."
},
"threadId": {
"type": "string"
}
},
"required": [
"threadId"
],
"title": "ThreadItemsListParams",
"type": "object"
},
"ThreadItemsListResponse": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"backwardsCursor": {
"description": "Opaque cursor to pass as `cursor` when reversing `sortDirection`. This is only populated when the page contains at least one item. Use it with the opposite `sortDirection` to include the anchor item again and catch updates to that item.",
"type": [
"string",
"null"
]
},
"data": {
"items": {
"$ref": "#/definitions/v2/ThreadHistoryItem"
},
"type": "array"
},
"nextCursor": {
"description": "Opaque cursor to pass to the next call to continue after the last item. if None, there are no more items to return.",
"type": [
"string",
"null"
]
}
},
"required": [
"data"
],
"title": "ThreadItemsListResponse",
"type": "object"
},
"ThreadListCwdFilter": {
"anyOf": [
{

View File

@@ -1283,6 +1283,30 @@
"title": "Thread/turns/listRequest",
"type": "object"
},
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
},
"method": {
"enum": [
"thread/items/list"
],
"title": "Thread/items/listRequestMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/ThreadItemsListParams"
}
},
"required": [
"id",
"method",
"params"
],
"title": "Thread/items/listRequest",
"type": "object"
},
{
"description": "Append raw Responses API items to the thread history without starting a user turn.",
"properties": {
@@ -12395,6 +12419,59 @@
"title": "ThreadForkResponse",
"type": "object"
},
"ThreadHistoryItem": {
"properties": {
"item": {
"$ref": "#/definitions/ThreadItem"
},
"turnCompletedAt": {
"description": "Unix timestamp (in seconds) when the turn completed.",
"format": "int64",
"type": [
"integer",
"null"
]
},
"turnDurationMs": {
"description": "Duration between turn start and completion in milliseconds, if known.",
"format": "int64",
"type": [
"integer",
"null"
]
},
"turnError": {
"anyOf": [
{
"$ref": "#/definitions/TurnError"
},
{
"type": "null"
}
]
},
"turnId": {
"type": "string"
},
"turnStartedAt": {
"description": "Unix timestamp (in seconds) when the turn started.",
"format": "int64",
"type": [
"integer",
"null"
]
},
"turnStatus": {
"$ref": "#/definitions/TurnStatus"
}
},
"required": [
"item",
"turnId",
"turnStatus"
],
"type": "object"
},
"ThreadId": {
"type": "string"
},
@@ -13082,6 +13159,76 @@
}
]
},
"ThreadItemsListParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"cursor": {
"description": "Opaque cursor to pass to the next call to continue after the last item.",
"type": [
"string",
"null"
]
},
"limit": {
"description": "Optional item page size.",
"format": "uint32",
"minimum": 0.0,
"type": [
"integer",
"null"
]
},
"sortDirection": {
"anyOf": [
{
"$ref": "#/definitions/SortDirection"
},
{
"type": "null"
}
],
"description": "Optional item pagination direction; defaults to descending."
},
"threadId": {
"type": "string"
}
},
"required": [
"threadId"
],
"title": "ThreadItemsListParams",
"type": "object"
},
"ThreadItemsListResponse": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"backwardsCursor": {
"description": "Opaque cursor to pass as `cursor` when reversing `sortDirection`. This is only populated when the page contains at least one item. Use it with the opposite `sortDirection` to include the anchor item again and catch updates to that item.",
"type": [
"string",
"null"
]
},
"data": {
"items": {
"$ref": "#/definitions/ThreadHistoryItem"
},
"type": "array"
},
"nextCursor": {
"description": "Opaque cursor to pass to the next call to continue after the last item. if None, there are no more items to return.",
"type": [
"string",
"null"
]
}
},
"required": [
"data"
],
"title": "ThreadItemsListResponse",
"type": "object"
},
"ThreadListCwdFilter": {
"anyOf": [
{

View File

@@ -0,0 +1,49 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"definitions": {
"SortDirection": {
"enum": [
"asc",
"desc"
],
"type": "string"
}
},
"properties": {
"cursor": {
"description": "Opaque cursor to pass to the next call to continue after the last item.",
"type": [
"string",
"null"
]
},
"limit": {
"description": "Optional item page size.",
"format": "uint32",
"minimum": 0.0,
"type": [
"integer",
"null"
]
},
"sortDirection": {
"anyOf": [
{
"$ref": "#/definitions/SortDirection"
},
{
"type": "null"
}
],
"description": "Optional item pagination direction; defaults to descending."
},
"threadId": {
"type": "string"
}
},
"required": [
"threadId"
],
"title": "ThreadItemsListParams",
"type": "object"
}

File diff suppressed because it is too large Load Diff

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,20 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { ThreadItem } from "./ThreadItem";
import type { TurnError } from "./TurnError";
import type { TurnStatus } from "./TurnStatus";
export type ThreadHistoryItem = { turnId: string, item: ThreadItem, turnStatus: TurnStatus, turnError: TurnError | null,
/**
* Unix timestamp (in seconds) when the turn started.
*/
turnStartedAt: number | null,
/**
* Unix timestamp (in seconds) when the turn completed.
*/
turnCompletedAt: number | null,
/**
* Duration between turn start and completion in milliseconds, if known.
*/
turnDurationMs: number | null, };

View File

@@ -0,0 +1,18 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { SortDirection } from "./SortDirection";
export type ThreadItemsListParams = { threadId: string,
/**
* Opaque cursor to pass to the next call to continue after the last item.
*/
cursor?: string | null,
/**
* Optional item page size.
*/
limit?: number | null,
/**
* Optional item pagination direction; defaults to descending.
*/
sortDirection?: SortDirection | null, };

View File

@@ -0,0 +1,18 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { ThreadHistoryItem } from "./ThreadHistoryItem";
export type ThreadItemsListResponse = { data: Array<ThreadHistoryItem>,
/**
* Opaque cursor to pass to the next call to continue after the last item.
* if None, there are no more items to return.
*/
nextCursor: string | null,
/**
* Opaque cursor to pass as `cursor` when reversing `sortDirection`.
* This is only populated when the page contains at least one item.
* Use it with the opposite `sortDirection` to include the anchor item again
* and catch updates to that item.
*/
backwardsCursor: string | null, };

View File

@@ -324,9 +324,12 @@ export type { ThreadCompactStartParams } from "./ThreadCompactStartParams";
export type { ThreadCompactStartResponse } from "./ThreadCompactStartResponse";
export type { ThreadForkParams } from "./ThreadForkParams";
export type { ThreadForkResponse } from "./ThreadForkResponse";
export type { ThreadHistoryItem } from "./ThreadHistoryItem";
export type { ThreadInjectItemsParams } from "./ThreadInjectItemsParams";
export type { ThreadInjectItemsResponse } from "./ThreadInjectItemsResponse";
export type { ThreadItem } from "./ThreadItem";
export type { ThreadItemsListParams } from "./ThreadItemsListParams";
export type { ThreadItemsListResponse } from "./ThreadItemsListResponse";
export type { ThreadListParams } from "./ThreadListParams";
export type { ThreadListResponse } from "./ThreadListResponse";
export type { ThreadLoadedListParams } from "./ThreadLoadedListParams";

View File

@@ -340,6 +340,10 @@ client_request_definitions! {
params: v2::ThreadTurnsListParams,
response: v2::ThreadTurnsListResponse,
},
ThreadItemsList => "thread/items/list" {
params: v2::ThreadItemsListParams,
response: v2::ThreadItemsListResponse,
},
/// Append raw Responses API items to the thread history without starting a user turn.
ThreadInjectItems => "thread/inject_items" {
params: v2::ThreadInjectItemsParams,

View File

@@ -4036,6 +4036,56 @@ pub struct ThreadTurnsListResponse {
pub backwards_cursor: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadItemsListParams {
pub thread_id: String,
/// Opaque cursor to pass to the next call to continue after the last item.
#[ts(optional = nullable)]
pub cursor: Option<String>,
/// Optional item page size.
#[ts(optional = nullable)]
pub limit: Option<u32>,
/// Optional item pagination direction; defaults to descending.
#[ts(optional = nullable)]
pub sort_direction: Option<SortDirection>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadHistoryItem {
pub turn_id: String,
pub item: ThreadItem,
pub turn_status: TurnStatus,
pub turn_error: Option<TurnError>,
/// Unix timestamp (in seconds) when the turn started.
#[ts(type = "number | null")]
pub turn_started_at: Option<i64>,
/// Unix timestamp (in seconds) when the turn completed.
#[ts(type = "number | null")]
pub turn_completed_at: Option<i64>,
/// Duration between turn start and completion in milliseconds, if known.
#[ts(type = "number | null")]
pub turn_duration_ms: Option<i64>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadItemsListResponse {
pub data: Vec<ThreadHistoryItem>,
/// Opaque cursor to pass to the next call to continue after the last item.
/// if None, there are no more items to return.
pub next_cursor: Option<String>,
/// Opaque cursor to pass as `cursor` when reversing `sortDirection`.
/// This is only populated when the page contains at least one item.
/// Use it with the opposite `sortDirection` to include the anchor item again
/// and catch updates to that item.
pub backwards_cursor: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]

View File

@@ -143,6 +143,7 @@ Example with notification opt-out:
- `thread/loaded/list` — list the thread ids currently loaded in memory.
- `thread/read` — read a stored thread by id without resuming it; optionally include turns via `includeTurns`. The returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded.
- `thread/turns/list` — page through a stored threads turn history without resuming it; supports cursor-based pagination with `sortDirection`, `nextCursor`, and `backwardsCursor`.
- `thread/items/list` — page through sqlite-backed stored thread items without resuming it; returns only the lightweight renderable/searchable item subset used for scrolling UIs, along with per-item turn metadata and cursor-based pagination.
- `thread/metadata/update` — patch stored thread metadata in sqlite; currently supports updating persisted `gitInfo` fields and returns the refreshed `thread`.
- `thread/memoryMode/set` — experimental; set a threads persisted memory eligibility to `"enabled"` or `"disabled"` for either a loaded thread or a stored rollout; returns `{}` on success.
- `memory/reset` — experimental; clear the current `CODEX_HOME/memories` directory and reset persisted memory stage data in sqlite while preserving existing thread memory modes; returns `{}` on success.
@@ -418,6 +419,23 @@ Use `thread/turns/list` to page a stored threads turn history without resumin
} }
```
### Example: List thread items
Use `thread/items/list` to page a stored threads lightweight item history from sqlite. This excludes heavy tool execution payloads and is intended for scrolling/search-oriented UIs.
```json
{ "method": "thread/items/list", "id": 25, "params": {
"threadId": "thr_123",
"limit": 100,
"sortDirection": "desc"
} }
{ "id": 25, "result": {
"data": [ ... ],
"nextCursor": "older-items-cursor-or-null",
"backwardsCursor": "newer-items-cursor-or-null"
} }
```
### Example: Update stored thread metadata
Use `thread/metadata/update` to patch sqlite-backed metadata for a thread without resuming it. Today this supports persisted `gitInfo`; omitted fields are left unchanged, while explicit `null` clears a stored value.

View File

@@ -148,11 +148,14 @@ use codex_app_server_protocol::ThreadDecrementElicitationParams;
use codex_app_server_protocol::ThreadDecrementElicitationResponse;
use codex_app_server_protocol::ThreadForkParams;
use codex_app_server_protocol::ThreadForkResponse;
use codex_app_server_protocol::ThreadHistoryItem;
use codex_app_server_protocol::ThreadIncrementElicitationParams;
use codex_app_server_protocol::ThreadIncrementElicitationResponse;
use codex_app_server_protocol::ThreadInjectItemsParams;
use codex_app_server_protocol::ThreadInjectItemsResponse;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadItemsListParams;
use codex_app_server_protocol::ThreadItemsListResponse;
use codex_app_server_protocol::ThreadListCwdFilter;
use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadListResponse;
@@ -340,6 +343,7 @@ use codex_rmcp_client::perform_oauth_login_return_url;
use codex_rollout::state_db::StateDbHandle;
use codex_rollout::state_db::get_state_db;
use codex_rollout::state_db::reconcile_rollout;
use codex_rollout::state_db::sync_renderable_thread_items;
use codex_state::StateRuntime;
use codex_state::ThreadMetadata;
use codex_state::ThreadMetadataBuilder;
@@ -956,6 +960,10 @@ impl CodexMessageProcessor {
self.thread_turns_list(to_connection_request_id(request_id), params)
.await;
}
ClientRequest::ThreadItemsList { request_id, params } => {
self.thread_items_list(to_connection_request_id(request_id), params)
.await;
}
ClientRequest::ThreadShellCommand { request_id, params } => {
self.thread_shell_command(to_connection_request_id(request_id), params)
.await;
@@ -4354,6 +4362,178 @@ impl CodexMessageProcessor {
}
}
async fn thread_items_list(
&self,
request_id: ConnectionRequestId,
params: ThreadItemsListParams,
) {
let ThreadItemsListParams {
thread_id,
cursor,
limit,
sort_direction,
} = params;
let thread_uuid = match ThreadId::from_string(&thread_id) {
Ok(id) => id,
Err(err) => {
self.send_invalid_request_error(request_id, format!("invalid thread id: {err}"))
.await;
return;
}
};
let Some(state_db_ctx) = open_state_db_for_direct_thread_lookup(&self.config).await else {
self.send_internal_error(
request_id,
format!("sqlite state db unavailable for thread {thread_uuid}"),
)
.await;
return;
};
let page_size = limit
.map(|value| value as usize)
.unwrap_or(THREAD_TURNS_DEFAULT_LIMIT)
.clamp(1, THREAD_TURNS_MAX_LIMIT);
let sort_direction = sort_direction.unwrap_or(SortDirection::Desc);
let anchor = match cursor.as_deref() {
Some(cursor) => match parse_thread_items_anchor(cursor) {
Ok(anchor) => Some(anchor),
Err(error) => {
self.outgoing.send_error(request_id, error).await;
return;
}
},
None => None,
};
let mut page = match state_db_ctx
.list_thread_items(
&thread_uuid.to_string(),
page_size,
anchor.as_ref(),
match sort_direction {
SortDirection::Asc => codex_state::SortDirection::Asc,
SortDirection::Desc => codex_state::SortDirection::Desc,
},
)
.await
{
Ok(page) => page,
Err(err) => {
self.send_internal_error(
request_id,
format!("failed to query sqlite thread items for {thread_uuid}: {err}"),
)
.await;
return;
}
};
let mut rollout_path = None;
if page.items.is_empty() {
rollout_path = self
.resolve_rollout_path(thread_uuid, Some(&state_db_ctx))
.await;
if rollout_path.is_none() {
rollout_path = match find_thread_path_by_id_str(
&self.config.codex_home,
&thread_uuid.to_string(),
)
.await
{
Ok(Some(path)) => Some(path),
Ok(None) => match find_archived_thread_path_by_id_str(
&self.config.codex_home,
&thread_uuid.to_string(),
)
.await
{
Ok(path) => path,
Err(err) => {
self.send_invalid_request_error(
request_id,
format!("failed to locate archived thread id {thread_uuid}: {err}"),
)
.await;
return;
}
},
Err(err) => {
self.send_invalid_request_error(
request_id,
format!("failed to locate thread id {thread_uuid}: {err}"),
)
.await;
return;
}
};
}
}
if page.items.is_empty()
&& let Some(rollout_path) = rollout_path
{
sync_renderable_thread_items(
Some(state_db_ctx.as_ref()),
thread_uuid,
rollout_path.as_path(),
"thread_items_list",
)
.await;
page = match state_db_ctx
.list_thread_items(
&thread_uuid.to_string(),
page_size,
anchor.as_ref(),
match sort_direction {
SortDirection::Asc => codex_state::SortDirection::Asc,
SortDirection::Desc => codex_state::SortDirection::Desc,
},
)
.await
{
Ok(page) => page,
Err(err) => {
self.send_internal_error(
request_id,
format!(
"failed to query repaired sqlite thread items for {thread_uuid}: {err}"
),
)
.await;
return;
}
};
}
let backwards_cursor = page
.items
.first()
.and_then(|item| thread_items_backwards_cursor(item.item_at, sort_direction));
let data = match page
.items
.into_iter()
.map(thread_item_record_to_history_item)
.collect::<Result<Vec<_>, JSONRPCErrorError>>()
{
Ok(data) => data,
Err(error) => {
self.outgoing.send_error(request_id, error).await;
return;
}
};
let response = ThreadItemsListResponse {
data,
next_cursor: page
.next_anchor
.map(|anchor| anchor.ts.to_rfc3339_opts(SecondsFormat::Millis, true)),
backwards_cursor,
};
self.outgoing.send_response(request_id, response).await;
}
pub(crate) fn thread_created_receiver(&self) -> broadcast::Receiver<ThreadId> {
self.thread_manager.subscribe_thread_created()
}
@@ -10027,6 +10207,75 @@ fn thread_backwards_cursor_for_sort_key(
Some(timestamp.to_rfc3339_opts(SecondsFormat::Millis, true))
}
fn parse_thread_items_anchor(cursor: &str) -> Result<codex_state::Anchor, JSONRPCErrorError> {
let Some(ts) = parse_datetime(Some(cursor)) else {
return Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("invalid cursor: {cursor}"),
data: None,
});
};
Ok(codex_state::Anchor { ts })
}
fn thread_items_backwards_cursor(
item_at: DateTime<Utc>,
sort_direction: SortDirection,
) -> Option<String> {
let timestamp = match sort_direction {
SortDirection::Asc => item_at.checked_add_signed(ChronoDuration::milliseconds(1))?,
SortDirection::Desc => item_at.checked_sub_signed(ChronoDuration::milliseconds(1))?,
};
Some(timestamp.to_rfc3339_opts(SecondsFormat::Millis, true))
}
fn thread_item_record_to_history_item(
record: codex_state::ThreadItemRecord,
) -> Result<ThreadHistoryItem, JSONRPCErrorError> {
let item = serde_json::from_str::<ThreadItem>(&record.payload_json).map_err(|err| {
JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!(
"failed to deserialize persisted thread item {}: {err}",
record.item_id
),
data: None,
}
})?;
let turn_status =
serde_json::from_value::<TurnStatus>(serde_json::Value::String(record.turn_status.clone()))
.map_err(|err| JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!(
"failed to deserialize persisted turn status {}: {err}",
record.turn_status
),
data: None,
})?;
let turn_error = record
.turn_error_json
.as_deref()
.map(serde_json::from_str::<TurnError>)
.transpose()
.map_err(|err| JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!(
"failed to deserialize persisted turn error for {}: {err}",
record.item_id
),
data: None,
})?;
Ok(ThreadHistoryItem {
turn_id: record.turn_id,
item,
turn_status,
turn_error,
turn_started_at: record.turn_started_at,
turn_completed_at: record.turn_completed_at,
turn_duration_ms: record.turn_duration_ms,
})
}
struct ThreadTurnsPage {
turns: Vec<Turn>,
next_cursor: Option<String>,

View File

@@ -66,6 +66,7 @@ use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadCompactStartParams;
use codex_app_server_protocol::ThreadForkParams;
use codex_app_server_protocol::ThreadInjectItemsParams;
use codex_app_server_protocol::ThreadItemsListParams;
use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadLoadedListParams;
use codex_app_server_protocol::ThreadMemoryModeSetParams;
@@ -486,6 +487,15 @@ impl McpProcess {
self.send_request("thread/turns/list", params).await
}
/// Send a `thread/items/list` JSON-RPC request.
pub async fn send_thread_items_list_request(
&mut self,
params: ThreadItemsListParams,
) -> anyhow::Result<i64> {
let params = Some(serde_json::to_value(params)?);
self.send_request("thread/items/list", params).await
}
/// Send a `model/list` JSON-RPC request.
pub async fn send_list_models_request(
&mut self,

View File

@@ -41,6 +41,7 @@ mod skills_list;
mod thread_archive;
mod thread_fork;
mod thread_inject_items;
mod thread_items_list;
mod thread_list;
mod thread_loaded_list;
mod thread_memory_mode_set;

View File

@@ -0,0 +1,394 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::test_absolute_path;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::SortDirection;
use codex_app_server_protocol::ThreadHistoryItem;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadItemsListParams;
use codex_app_server_protocol::ThreadItemsListResponse;
use codex_protocol::protocol::AgentMessageEvent;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::ExecCommandEndEvent;
use codex_protocol::protocol::ExecCommandSource;
use codex_protocol::protocol::ExecCommandStatus;
use codex_protocol::protocol::ImageGenerationEndEvent;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::protocol::SessionMeta;
use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::TurnCompleteEvent;
use codex_protocol::protocol::TurnStartedEvent;
use codex_protocol::protocol::UserMessageEvent;
use codex_rollout::state_db::sync_renderable_thread_items;
use std::io::Write;
use std::path::Path;
use std::time::Duration;
use tempfile::TempDir;
use tokio::time::timeout;
use uuid::Uuid;
#[cfg(windows)]
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(25);
#[cfg(not(windows))]
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[tokio::test]
async fn thread_items_list_pages_renderable_items_and_skips_command_execution() -> Result<()> {
let codex_home = TempDir::new()?;
let thread_id = write_thread_rollout(
codex_home.path(),
&[
base_turn_events("turn-1", "first", "assistant one")?,
user_turn("turn-2", "second")?,
]
.concat(),
)?;
create_config_toml(codex_home.path())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let read_id = mcp
.send_thread_items_list_request(ThreadItemsListParams {
thread_id: thread_id.clone(),
cursor: None,
limit: Some(2),
sort_direction: Some(SortDirection::Desc),
})
.await?;
let read_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
)
.await??;
let ThreadItemsListResponse {
data, next_cursor, ..
} = to_response::<ThreadItemsListResponse>(read_resp)?;
assert_eq!(history_kinds(&data), vec!["userMessage", "imageGeneration"]);
let next_cursor = next_cursor.expect("expected next cursor");
let read_id = mcp
.send_thread_items_list_request(ThreadItemsListParams {
thread_id: thread_id.clone(),
cursor: Some(next_cursor),
limit: Some(10),
sort_direction: Some(SortDirection::Desc),
})
.await?;
let read_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
)
.await??;
let ThreadItemsListResponse { data, .. } = to_response::<ThreadItemsListResponse>(read_resp)?;
assert_eq!(history_kinds(&data), vec!["agentMessage", "userMessage"]);
let read_id = mcp
.send_thread_items_list_request(ThreadItemsListParams {
thread_id,
cursor: None,
limit: Some(10),
sort_direction: Some(SortDirection::Desc),
})
.await?;
let read_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
)
.await??;
let ThreadItemsListResponse { data, .. } = to_response::<ThreadItemsListResponse>(read_resp)?;
assert_eq!(
history_kinds(&data),
vec![
"userMessage",
"imageGeneration",
"agentMessage",
"userMessage",
]
);
Ok(())
}
#[tokio::test]
async fn thread_items_list_backwards_cursor_includes_anchor_for_newer_items() -> Result<()> {
let codex_home = TempDir::new()?;
let thread_id = write_thread_rollout(
codex_home.path(),
&[
user_turn("turn-1", "first")?,
user_turn("turn-2", "second")?,
]
.concat(),
)?;
create_config_toml(codex_home.path())?;
let rollout_path = rollout_path(codex_home.path(), &thread_id);
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let state_db =
codex_state::StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into())
.await?;
let read_id = mcp
.send_thread_items_list_request(ThreadItemsListParams {
thread_id: thread_id.clone(),
cursor: None,
limit: Some(1),
sort_direction: Some(SortDirection::Desc),
})
.await?;
let read_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
)
.await??;
let ThreadItemsListResponse {
backwards_cursor, ..
} = to_response::<ThreadItemsListResponse>(read_resp)?;
let backwards_cursor = backwards_cursor.expect("expected backwards cursor");
append_rollout_events(rollout_path.as_path(), &user_turn("turn-3", "third")?)?;
sync_renderable_thread_items(
Some(state_db.as_ref()),
codex_protocol::ThreadId::from_string(&thread_id)?,
rollout_path.as_path(),
"thread_items_list_test",
)
.await;
let read_id = mcp
.send_thread_items_list_request(ThreadItemsListParams {
thread_id,
cursor: Some(backwards_cursor),
limit: Some(10),
sort_direction: Some(SortDirection::Asc),
})
.await?;
let read_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
)
.await??;
let ThreadItemsListResponse { data, .. } = to_response::<ThreadItemsListResponse>(read_resp)?;
assert_eq!(user_texts(&data), vec!["second", "third"]);
Ok(())
}
fn base_turn_events(turn_id: &str, user_text: &str, agent_text: &str) -> Result<Vec<RolloutLine>> {
let mut events = user_turn(turn_id, user_text)?;
events.insert(
2,
rollout_line(
"2025-01-05T12:00:02Z",
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
message: agent_text.to_string(),
phase: None,
memory_citation: None,
})),
),
);
events.insert(
3,
rollout_line(
"2025-01-05T12:00:03Z",
RolloutItem::EventMsg(EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id: "cmd-1".to_string(),
process_id: None,
turn_id: turn_id.to_string(),
command: vec!["echo".to_string(), "secret".to_string()],
cwd: test_absolute_path("/"),
parsed_cmd: Vec::new(),
source: ExecCommandSource::Agent,
interaction_input: None,
stdout: "secret".to_string(),
stderr: String::new(),
aggregated_output: "secret".to_string(),
exit_code: 0,
duration: Duration::from_millis(10),
formatted_output: "secret".to_string(),
status: ExecCommandStatus::Completed,
})),
),
);
events.insert(
4,
rollout_line(
"2025-01-05T12:00:04Z",
RolloutItem::EventMsg(EventMsg::ImageGenerationEnd(ImageGenerationEndEvent {
call_id: "img-1".to_string(),
status: "completed".to_string(),
revised_prompt: Some("draw cat".to_string()),
result: "https://example.com/generated.png".to_string(),
saved_path: None,
})),
),
);
Ok(events)
}
fn user_turn(turn_id: &str, text: &str) -> Result<Vec<RolloutLine>> {
Ok(vec![
rollout_line(
"2025-01-05T12:00:00Z",
RolloutItem::EventMsg(EventMsg::TurnStarted(TurnStartedEvent {
turn_id: turn_id.to_string(),
started_at: Some(1_736_078_400),
model_context_window: None,
collaboration_mode_kind: Default::default(),
})),
),
rollout_line(
"2025-01-05T12:00:01Z",
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
message: text.to_string(),
images: Some(Vec::new()),
local_images: Vec::new(),
text_elements: Vec::new(),
})),
),
rollout_line(
"2025-01-05T12:00:05Z",
RolloutItem::EventMsg(EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: turn_id.to_string(),
last_agent_message: None,
completed_at: Some(1_736_078_405),
duration_ms: Some(5_000),
time_to_first_token_ms: None,
})),
),
])
}
fn write_thread_rollout(codex_home: &Path, events: &[RolloutLine]) -> Result<String> {
let thread_id = Uuid::now_v7().to_string();
let rollout_path = rollout_path(codex_home, &thread_id);
let Some(parent) = rollout_path.parent() else {
anyhow::bail!(
"rollout path should have parent: {}",
rollout_path.display()
);
};
std::fs::create_dir_all(parent)?;
let session_meta = RolloutLine {
timestamp: "2025-01-05T12:00:00Z".to_string(),
item: RolloutItem::SessionMeta(SessionMetaLine {
meta: SessionMeta {
id: codex_protocol::ThreadId::from_string(&thread_id)?,
forked_from_id: None,
timestamp: "2025-01-05T12:00:00Z".to_string(),
cwd: test_absolute_path("/").into(),
originator: "test".to_string(),
cli_version: "0.0.0".to_string(),
source: SessionSource::Cli,
agent_path: None,
agent_nickname: None,
agent_role: None,
model_provider: Some("mock_provider".to_string()),
base_instructions: None,
dynamic_tools: None,
memory_mode: None,
},
git: None,
}),
};
let mut lines = vec![session_meta];
lines.extend_from_slice(events);
let jsonl = lines
.iter()
.map(serde_json::to_string)
.collect::<std::result::Result<Vec<_>, _>>()?
.join("\n");
std::fs::write(rollout_path, format!("{jsonl}\n"))?;
Ok(thread_id)
}
fn append_rollout_events(path: &Path, events: &[RolloutLine]) -> Result<()> {
let mut file = std::fs::OpenOptions::new().append(true).open(path)?;
for line in events {
writeln!(file, "{}", serde_json::to_string(line)?)?;
}
Ok(())
}
fn rollout_line(timestamp: &str, item: RolloutItem) -> RolloutLine {
RolloutLine {
timestamp: timestamp.to_string(),
item,
}
}
fn rollout_path(codex_home: &Path, thread_id: &str) -> std::path::PathBuf {
codex_home.join(format!(
"sessions/2025/01/05/rollout-2025-01-05T12-00-00-{thread_id}.jsonl"
))
}
fn history_kinds(items: &[ThreadHistoryItem]) -> Vec<&'static str> {
items
.iter()
.map(|item| match item.item {
ThreadItem::UserMessage { .. } => "userMessage",
ThreadItem::AgentMessage { .. } => "agentMessage",
ThreadItem::ImageGeneration { .. } => "imageGeneration",
ThreadItem::HookPrompt { .. } => "hookPrompt",
ThreadItem::Plan { .. } => "plan",
ThreadItem::Reasoning { .. } => "reasoning",
ThreadItem::CommandExecution { .. } => "commandExecution",
ThreadItem::FileChange { .. } => "fileChange",
ThreadItem::McpToolCall { .. } => "mcpToolCall",
ThreadItem::DynamicToolCall { .. } => "dynamicToolCall",
ThreadItem::CollabAgentToolCall { .. } => "collabAgentToolCall",
ThreadItem::WebSearch { .. } => "webSearch",
ThreadItem::ImageView { .. } => "imageView",
ThreadItem::EnteredReviewMode { .. } => "enteredReviewMode",
ThreadItem::ExitedReviewMode { .. } => "exitedReviewMode",
ThreadItem::ContextCompaction { .. } => "contextCompaction",
})
.collect()
}
fn user_texts(items: &[ThreadHistoryItem]) -> Vec<&str> {
items
.iter()
.filter_map(|item| match &item.item {
ThreadItem::UserMessage { content, .. } => match content.first()? {
codex_app_server_protocol::UserInput::Text { text, .. } => Some(text.as_str()),
codex_app_server_protocol::UserInput::Image { .. }
| codex_app_server_protocol::UserInput::LocalImage { .. }
| codex_app_server_protocol::UserInput::Skill { .. }
| codex_app_server_protocol::UserInput::Mention { .. } => None,
},
_ => None,
})
.collect()
}
fn create_config_toml(codex_home: &Path) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(
config_toml,
r#"
model = "mock-model"
model_provider = "mock_provider"
approval_policy = "never"
suppress_unstable_features_warning = true
[features]
sqlite = true
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "http://127.0.0.1:1/v1"
wire_api = "responses"
request_max_retries = 0
stream_max_retries = 0
"#,
)
}

View File

@@ -16,6 +16,7 @@ workspace = true
anyhow = { workspace = true }
async-trait = { workspace = true }
chrono = { workspace = true, features = ["serde"] }
codex-app-server-protocol = { workspace = true }
codex-file-search = { workspace = true }
codex-git-utils = { workspace = true }
codex-login = { workspace = true }

View File

@@ -11,6 +11,7 @@ pub(crate) mod policy;
pub(crate) mod recorder;
pub(crate) mod session_index;
pub mod state_db;
pub(crate) mod thread_items;
pub(crate) mod default_client {
pub use codex_login::default_client::*;

View File

@@ -5,6 +5,7 @@ use crate::list;
use crate::list::parse_timestamp_uuid_from_filename;
use crate::recorder::RolloutRecorder;
use crate::state_db::normalize_cwd_for_state_db;
use crate::thread_items::build_persisted_thread_items;
use chrono::DateTime;
use chrono::NaiveDateTime;
use chrono::Timelike;
@@ -290,6 +291,38 @@ pub(crate) async fn backfill_sessions(
rollout.path.display()
);
}
match RolloutRecorder::load_rollout_items(&rollout.path).await {
Ok((items, _, _)) => {
match build_persisted_thread_items(items.as_slice()) {
Ok(persisted_items) => {
if let Err(err) = runtime
.replace_thread_items(
&metadata.id.to_string(),
persisted_items.as_slice(),
)
.await
{
warn!(
"failed to backfill thread items {}: {err}",
rollout.path.display()
);
}
}
Err(err) => {
warn!(
"failed to build thread items during backfill {}: {err}",
rollout.path.display()
);
}
}
}
Err(err) => {
warn!(
"failed to reload rollout items during backfill {}: {err}",
rollout.path.display()
);
}
}
}
}
Err(err) => {

View File

@@ -1696,6 +1696,15 @@ async fn sync_thread_state_after_write(
if state_db::touch_thread_updated_at(state_db_ctx, thread_id, updated_at, "rollout_writer")
.await
{
if let Some(thread_id) = thread_id {
state_db::sync_renderable_thread_items(
state_db_ctx,
thread_id,
rollout_path,
"rollout_writer",
)
.await;
}
return;
}
state_db::apply_rollout_items(

View File

@@ -4,6 +4,7 @@ use crate::list::Cursor;
use crate::list::SortDirection;
use crate::list::ThreadSortKey;
use crate::metadata;
use crate::thread_items::build_persisted_thread_items;
use chrono::DateTime;
use chrono::Utc;
use codex_protocol::ThreadId;
@@ -414,6 +415,7 @@ pub async fn reconcile_rollout(
rollout_path.display()
);
}
sync_renderable_thread_items(Some(ctx), metadata.id, rollout_path, "reconcile_rollout").await;
}
/// Repair a thread's rollout path after filesystem fallback succeeds.
@@ -521,7 +523,9 @@ pub async fn apply_rollout_items(
"state db apply_rollout_items failed during {stage} for {}: {err}",
rollout_path.display()
);
return;
}
sync_renderable_thread_items(Some(ctx), builder.id, rollout_path, stage).await;
}
pub async fn touch_thread_updated_at(
@@ -544,6 +548,47 @@ pub async fn touch_thread_updated_at(
})
}
pub async fn sync_renderable_thread_items(
context: Option<&codex_state::StateRuntime>,
thread_id: ThreadId,
rollout_path: &Path,
stage: &str,
) {
let Some(ctx) = context else {
return;
};
let rollout_items =
match crate::recorder::RolloutRecorder::load_rollout_items(rollout_path).await {
Ok((items, _, _)) => items,
Err(err) => {
warn!(
"state db renderable-item sync failed during {stage} for {}: {err}",
rollout_path.display()
);
return;
}
};
let persisted_items = match build_persisted_thread_items(rollout_items.as_slice()) {
Ok(items) => items,
Err(err) => {
warn!(
"state db renderable-item build failed during {stage} for {}: {err}",
rollout_path.display()
);
return;
}
};
if let Err(err) = ctx
.replace_thread_items(&thread_id.to_string(), persisted_items.as_slice())
.await
{
warn!(
"state db renderable-item replace failed during {stage} for {}: {err}",
rollout_path.display()
);
}
}
#[cfg(test)]
#[path = "state_db_tests.rs"]
mod tests;

View File

@@ -0,0 +1,168 @@
use chrono::DateTime;
use chrono::Utc;
use codex_app_server_protocol::ThreadItem as ApiThreadItem;
use codex_app_server_protocol::Turn;
use codex_app_server_protocol::TurnError;
use codex_app_server_protocol::build_turns_from_rollout_items;
use codex_protocol::protocol::RolloutItem;
use codex_state::ThreadItemRecordInsert;
/// Build the lightweight, renderable thread-item subset we persist in SQLite.
pub(crate) fn build_persisted_thread_items(
items: &[RolloutItem],
) -> anyhow::Result<Vec<ThreadItemRecordInsert>> {
let turns = build_turns_from_rollout_items(items);
let mut persisted = Vec::new();
let mut last_item_at_ms = None;
for turn in turns {
for (index, item) in turn.items.iter().enumerate() {
if !should_persist_item(item) {
continue;
}
let item_at = item_timestamp_for_turn(&turn, index, last_item_at_ms)?;
last_item_at_ms = Some(item_at.timestamp_millis());
persisted.push(ThreadItemRecordInsert {
turn_id: turn.id.clone(),
item_id: item.id().to_string(),
item_kind: item_kind(item).to_string(),
item_at,
turn_status: serde_json::to_value(turn.status.clone())?
.as_str()
.unwrap_or_default()
.to_string(),
turn_error_json: serialize_turn_error(turn.error.as_ref())?,
turn_started_at: turn.started_at,
turn_completed_at: turn.completed_at,
turn_duration_ms: turn.duration_ms,
search_text: search_text(item),
payload_json: serde_json::to_string(item)?,
});
}
}
Ok(persisted)
}
fn should_persist_item(item: &ApiThreadItem) -> bool {
matches!(
item,
ApiThreadItem::UserMessage { .. }
| ApiThreadItem::HookPrompt { .. }
| ApiThreadItem::AgentMessage { .. }
| ApiThreadItem::Plan { .. }
| ApiThreadItem::Reasoning { .. }
| ApiThreadItem::WebSearch { .. }
| ApiThreadItem::ImageView { .. }
| ApiThreadItem::ImageGeneration { .. }
| ApiThreadItem::EnteredReviewMode { .. }
| ApiThreadItem::ExitedReviewMode { .. }
| ApiThreadItem::ContextCompaction { .. }
)
}
fn item_kind(item: &ApiThreadItem) -> &'static str {
match item {
ApiThreadItem::UserMessage { .. } => "userMessage",
ApiThreadItem::HookPrompt { .. } => "hookPrompt",
ApiThreadItem::AgentMessage { .. } => "agentMessage",
ApiThreadItem::Plan { .. } => "plan",
ApiThreadItem::Reasoning { .. } => "reasoning",
ApiThreadItem::CommandExecution { .. } => "commandExecution",
ApiThreadItem::FileChange { .. } => "fileChange",
ApiThreadItem::McpToolCall { .. } => "mcpToolCall",
ApiThreadItem::DynamicToolCall { .. } => "dynamicToolCall",
ApiThreadItem::CollabAgentToolCall { .. } => "collabAgentToolCall",
ApiThreadItem::WebSearch { .. } => "webSearch",
ApiThreadItem::ImageView { .. } => "imageView",
ApiThreadItem::ImageGeneration { .. } => "imageGeneration",
ApiThreadItem::EnteredReviewMode { .. } => "enteredReviewMode",
ApiThreadItem::ExitedReviewMode { .. } => "exitedReviewMode",
ApiThreadItem::ContextCompaction { .. } => "contextCompaction",
}
}
fn serialize_turn_error(turn_error: Option<&TurnError>) -> anyhow::Result<Option<String>> {
turn_error
.map(serde_json::to_string)
.transpose()
.map_err(Into::into)
}
fn item_timestamp_for_turn(
turn: &Turn,
index: usize,
last_item_at_ms: Option<i64>,
) -> anyhow::Result<DateTime<Utc>> {
let base_ms = turn
.started_at
.or(turn.completed_at)
.map(|seconds| seconds.saturating_mul(1000))
.or(last_item_at_ms.map(|value| value.saturating_add(1)))
.unwrap_or_else(|| i64::try_from(index).unwrap_or(i64::MAX));
let candidate_ms = base_ms.saturating_add(i64::try_from(index).unwrap_or(i64::MAX));
DateTime::<Utc>::from_timestamp_millis(candidate_ms)
.ok_or_else(|| anyhow::anyhow!("invalid thread item timestamp millis: {candidate_ms}"))
}
fn search_text(item: &ApiThreadItem) -> String {
match item {
ApiThreadItem::UserMessage { content, .. } => content
.iter()
.map(|entry| match entry {
codex_app_server_protocol::UserInput::Text { text, .. } => text.clone(),
codex_app_server_protocol::UserInput::Image { url } => url.clone(),
codex_app_server_protocol::UserInput::LocalImage { path } => {
path.display().to_string()
}
codex_app_server_protocol::UserInput::Skill { name, path } => {
format!("{name} {}", path.display())
}
codex_app_server_protocol::UserInput::Mention { name, path } => {
format!("{name} {path}")
}
})
.collect::<Vec<_>>()
.join("\n"),
ApiThreadItem::HookPrompt { fragments, .. } => fragments
.iter()
.map(|fragment| fragment.text.clone())
.collect::<Vec<_>>()
.join("\n"),
ApiThreadItem::AgentMessage { text, .. } => text.clone(),
ApiThreadItem::Plan { text, .. } => text.clone(),
ApiThreadItem::Reasoning {
summary, content, ..
} => summary
.iter()
.chain(content.iter())
.cloned()
.collect::<Vec<_>>()
.join("\n"),
ApiThreadItem::WebSearch { query, .. } => query.clone(),
ApiThreadItem::ImageView { path, .. } => path.display().to_string(),
ApiThreadItem::ImageGeneration {
revised_prompt,
saved_path,
..
} => [
revised_prompt.clone().unwrap_or_default(),
saved_path
.as_ref()
.map(|path| path.display().to_string())
.unwrap_or_default(),
]
.into_iter()
.filter(|value| !value.is_empty())
.collect::<Vec<_>>()
.join("\n"),
ApiThreadItem::EnteredReviewMode { review, .. }
| ApiThreadItem::ExitedReviewMode { review, .. } => review.clone(),
ApiThreadItem::ContextCompaction { .. }
| ApiThreadItem::CommandExecution { .. }
| ApiThreadItem::FileChange { .. }
| ApiThreadItem::McpToolCall { .. }
| ApiThreadItem::DynamicToolCall { .. }
| ApiThreadItem::CollabAgentToolCall { .. } => String::new(),
}
}

View File

@@ -0,0 +1,18 @@
CREATE TABLE thread_items (
thread_id TEXT NOT NULL,
turn_id TEXT NOT NULL,
item_id TEXT NOT NULL,
item_kind TEXT NOT NULL,
item_at_ms INTEGER NOT NULL,
turn_status TEXT NOT NULL,
turn_error_json TEXT,
turn_started_at INTEGER,
turn_completed_at INTEGER,
turn_duration_ms INTEGER,
search_text TEXT NOT NULL,
payload_json TEXT NOT NULL,
PRIMARY KEY (thread_id, item_id)
);
CREATE INDEX idx_thread_items_thread_item_at_ms
ON thread_items(thread_id, item_at_ms DESC);

View File

@@ -44,6 +44,9 @@ pub use model::Stage1JobClaimOutcome;
pub use model::Stage1Output;
pub use model::Stage1OutputRef;
pub use model::Stage1StartupClaimParams;
pub use model::ThreadItemRecord;
pub use model::ThreadItemRecordInsert;
pub use model::ThreadItemsPage;
pub use model::ThreadMetadata;
pub use model::ThreadMetadataBuilder;
pub use model::ThreadsPage;
@@ -60,7 +63,7 @@ pub const SQLITE_HOME_ENV: &str = "CODEX_SQLITE_HOME";
pub const LOGS_DB_FILENAME: &str = "logs";
pub const LOGS_DB_VERSION: u32 = 2;
pub const STATE_DB_FILENAME: &str = "state";
pub const STATE_DB_VERSION: u32 = 5;
pub const STATE_DB_VERSION: u32 = 6;
/// Errors encountered during DB operations. Tags: [stage]
pub const DB_ERROR_METRIC: &str = "codex.db.error";

View File

@@ -3,6 +3,7 @@ mod backfill_state;
mod graph;
mod log;
mod memories;
mod thread_item;
mod thread_metadata;
pub use agent_job::AgentJob;
@@ -25,6 +26,9 @@ pub use memories::Stage1JobClaimOutcome;
pub use memories::Stage1Output;
pub use memories::Stage1OutputRef;
pub use memories::Stage1StartupClaimParams;
pub use thread_item::ThreadItemRecord;
pub use thread_item::ThreadItemRecordInsert;
pub use thread_item::ThreadItemsPage;
pub use thread_metadata::Anchor;
pub use thread_metadata::BackfillStats;
pub use thread_metadata::ExtractionOutcome;
@@ -38,6 +42,8 @@ pub(crate) use agent_job::AgentJobItemRow;
pub(crate) use agent_job::AgentJobRow;
pub(crate) use memories::Stage1OutputRow;
pub(crate) use memories::stage1_output_ref_from_parts;
pub(crate) use thread_item::ThreadItemRow;
pub(crate) use thread_item::anchor_from_thread_item;
pub(crate) use thread_metadata::ThreadRow;
pub(crate) use thread_metadata::anchor_from_item;
pub(crate) use thread_metadata::datetime_to_epoch_millis;

View File

@@ -0,0 +1,109 @@
use anyhow::Result;
use chrono::DateTime;
use chrono::Utc;
use sqlx::Row;
use sqlx::sqlite::SqliteRow;
use super::Anchor;
use super::epoch_millis_to_datetime;
/// A lightweight, renderable item persisted for thread history pagination.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ThreadItemRecord {
pub thread_id: String,
pub turn_id: String,
pub item_id: String,
pub item_kind: String,
pub item_at: DateTime<Utc>,
pub turn_status: String,
pub turn_error_json: Option<String>,
pub turn_started_at: Option<i64>,
pub turn_completed_at: Option<i64>,
pub turn_duration_ms: Option<i64>,
pub search_text: String,
pub payload_json: String,
}
/// Insert payload for a lightweight persisted thread item.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ThreadItemRecordInsert {
pub turn_id: String,
pub item_id: String,
pub item_kind: String,
pub item_at: DateTime<Utc>,
pub turn_status: String,
pub turn_error_json: Option<String>,
pub turn_started_at: Option<i64>,
pub turn_completed_at: Option<i64>,
pub turn_duration_ms: Option<i64>,
pub search_text: String,
pub payload_json: String,
}
/// A single page of persisted thread-item results.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ThreadItemsPage {
pub items: Vec<ThreadItemRecord>,
pub next_anchor: Option<Anchor>,
pub num_scanned_rows: usize,
}
#[derive(Debug)]
pub(crate) struct ThreadItemRow {
thread_id: String,
turn_id: String,
item_id: String,
item_kind: String,
item_at: i64,
turn_status: String,
turn_error_json: Option<String>,
turn_started_at: Option<i64>,
turn_completed_at: Option<i64>,
turn_duration_ms: Option<i64>,
search_text: String,
payload_json: String,
}
impl ThreadItemRow {
pub(crate) fn try_from_row(row: &SqliteRow) -> Result<Self> {
Ok(Self {
thread_id: row.try_get("thread_id")?,
turn_id: row.try_get("turn_id")?,
item_id: row.try_get("item_id")?,
item_kind: row.try_get("item_kind")?,
item_at: row.try_get("item_at")?,
turn_status: row.try_get("turn_status")?,
turn_error_json: row.try_get("turn_error_json")?,
turn_started_at: row.try_get("turn_started_at")?,
turn_completed_at: row.try_get("turn_completed_at")?,
turn_duration_ms: row.try_get("turn_duration_ms")?,
search_text: row.try_get("search_text")?,
payload_json: row.try_get("payload_json")?,
})
}
}
impl TryFrom<ThreadItemRow> for ThreadItemRecord {
type Error = anyhow::Error;
fn try_from(value: ThreadItemRow) -> Result<Self> {
Ok(Self {
thread_id: value.thread_id,
turn_id: value.turn_id,
item_id: value.item_id,
item_kind: value.item_kind,
item_at: epoch_millis_to_datetime(value.item_at)?,
turn_status: value.turn_status,
turn_error_json: value.turn_error_json,
turn_started_at: value.turn_started_at,
turn_completed_at: value.turn_completed_at,
turn_duration_ms: value.turn_duration_ms,
search_text: value.search_text,
payload_json: value.payload_json,
})
}
}
pub(crate) fn anchor_from_thread_item(item: &ThreadItemRecord) -> Anchor {
Anchor { ts: item.item_at }
}

View File

@@ -60,6 +60,7 @@ mod memories;
mod remote_control;
#[cfg(test)]
mod test_support;
mod thread_items;
mod threads;
pub use remote_control::RemoteControlEnrollmentRecord;
@@ -81,6 +82,7 @@ pub struct StateRuntime {
pool: Arc<sqlx::SqlitePool>,
logs_pool: Arc<sqlx::SqlitePool>,
thread_updated_at_millis: Arc<AtomicI64>,
thread_item_at_millis: Arc<AtomicI64>,
}
impl StateRuntime {
@@ -130,12 +132,18 @@ impl StateRuntime {
.fetch_one(pool.as_ref())
.await?;
let thread_updated_at_millis = thread_updated_at_millis.unwrap_or(0);
let thread_item_at_millis: Option<i64> =
sqlx::query_scalar("SELECT MAX(thread_items.item_at_ms) FROM thread_items")
.fetch_one(pool.as_ref())
.await?;
let thread_item_at_millis = thread_item_at_millis.unwrap_or(0);
let runtime = Arc::new(Self {
pool,
logs_pool,
codex_home,
default_provider,
thread_updated_at_millis: Arc::new(AtomicI64::new(thread_updated_at_millis)),
thread_item_at_millis: Arc::new(AtomicI64::new(thread_item_at_millis)),
});
if let Err(err) = runtime.run_logs_startup_maintenance().await {
warn!(

View File

@@ -0,0 +1,349 @@
use super::*;
use crate::SortDirection;
use crate::ThreadItemRecordInsert;
use crate::ThreadItemsPage;
use crate::model::ThreadItemRow;
use crate::model::anchor_from_thread_item;
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::atomic::Ordering;
impl StateRuntime {
pub async fn replace_thread_items(
&self,
thread_id: &str,
items: &[ThreadItemRecordInsert],
) -> anyhow::Result<()> {
let existing_rows =
sqlx::query("SELECT item_id, item_at_ms FROM thread_items WHERE thread_id = ?")
.bind(thread_id)
.fetch_all(self.pool.as_ref())
.await?;
let existing_item_at_millis: HashMap<String, i64> = existing_rows
.into_iter()
.filter_map(|row| {
let item_id = row.try_get::<String, _>("item_id").ok()?;
let item_at_ms = row.try_get::<i64, _>("item_at_ms").ok()?;
Some((item_id, item_at_ms))
})
.collect();
let mut assigned_item_at_millis = HashSet::new();
let mut tx = self.pool.begin().await?;
sqlx::query("DELETE FROM thread_items WHERE thread_id = ?")
.bind(thread_id)
.execute(&mut *tx)
.await?;
for item in items {
let mut item_at_ms =
if let Some(item_at_ms) = existing_item_at_millis.get(&item.item_id) {
*item_at_ms
} else {
datetime_to_epoch_millis(self.allocate_thread_item_at(item.item_at)?)
};
while !assigned_item_at_millis.insert(item_at_ms) {
item_at_ms = item_at_ms.saturating_add(1);
}
sqlx::query(
r#"
INSERT INTO thread_items (
thread_id,
turn_id,
item_id,
item_kind,
item_at_ms,
turn_status,
turn_error_json,
turn_started_at,
turn_completed_at,
turn_duration_ms,
search_text,
payload_json
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"#,
)
.bind(thread_id)
.bind(item.turn_id.as_str())
.bind(item.item_id.as_str())
.bind(item.item_kind.as_str())
.bind(item_at_ms)
.bind(item.turn_status.as_str())
.bind(item.turn_error_json.as_deref())
.bind(item.turn_started_at)
.bind(item.turn_completed_at)
.bind(item.turn_duration_ms)
.bind(item.search_text.as_str())
.bind(item.payload_json.as_str())
.execute(&mut *tx)
.await?;
}
tx.commit().await?;
Ok(())
}
pub async fn list_thread_items(
&self,
thread_id: &str,
page_size: usize,
anchor: Option<&crate::Anchor>,
sort_direction: crate::SortDirection,
) -> anyhow::Result<crate::ThreadItemsPage> {
let limit = page_size.saturating_add(1);
let mut builder = QueryBuilder::<Sqlite>::new(
r#"
SELECT
thread_id,
turn_id,
item_id,
item_kind,
item_at_ms AS item_at,
turn_status,
turn_error_json,
turn_started_at,
turn_completed_at,
turn_duration_ms,
search_text,
payload_json
FROM thread_items
WHERE thread_id =
"#,
);
builder.push_bind(thread_id);
if let Some(anchor) = anchor {
let item_at_ms = datetime_to_epoch_millis(anchor.ts);
match sort_direction {
SortDirection::Asc => builder.push(" AND item_at_ms > ").push_bind(item_at_ms),
SortDirection::Desc => builder.push(" AND item_at_ms < ").push_bind(item_at_ms),
};
}
let sort_sql = match sort_direction {
SortDirection::Asc => " ASC",
SortDirection::Desc => " DESC",
};
builder
.push(" ORDER BY item_at_ms")
.push(sort_sql)
.push(" LIMIT ")
.push_bind(i64::try_from(limit).unwrap_or(i64::MAX));
let rows = builder.build().fetch_all(self.pool.as_ref()).await?;
let mut items = rows
.into_iter()
.map(|row| {
ThreadItemRow::try_from_row(&row).and_then(crate::ThreadItemRecord::try_from)
})
.collect::<Result<Vec<_>, _>>()?;
let num_scanned_rows = items.len();
let next_anchor = if items.len() > page_size {
items.pop();
items.last().map(anchor_from_thread_item)
} else {
None
};
Ok(ThreadItemsPage {
items,
next_anchor,
num_scanned_rows,
})
}
pub async fn get_thread_item(
&self,
thread_id: &str,
item_id: &str,
) -> anyhow::Result<Option<crate::ThreadItemRecord>> {
let row = sqlx::query(
r#"
SELECT
thread_id,
turn_id,
item_id,
item_kind,
item_at_ms AS item_at,
turn_status,
turn_error_json,
turn_started_at,
turn_completed_at,
turn_duration_ms,
search_text,
payload_json
FROM thread_items
WHERE thread_id = ? AND item_id = ?
"#,
)
.bind(thread_id)
.bind(item_id)
.fetch_optional(self.pool.as_ref())
.await?;
row.map(|row| ThreadItemRow::try_from_row(&row).and_then(crate::ThreadItemRecord::try_from))
.transpose()
}
fn allocate_thread_item_at(&self, item_at: DateTime<Utc>) -> anyhow::Result<DateTime<Utc>> {
let candidate = datetime_to_epoch_millis(item_at);
let allocated = loop {
let current = self.thread_item_at_millis.load(Ordering::Relaxed);
if candidate > current {
if self
.thread_item_at_millis
.compare_exchange(current, candidate, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
break candidate;
}
continue;
}
if candidate.saturating_add(1000) <= current {
break candidate;
}
let bumped = current.saturating_add(1);
if self
.thread_item_at_millis
.compare_exchange(current, bumped, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
break bumped;
}
};
epoch_millis_to_datetime(allocated)
}
}
#[cfg(test)]
mod tests {
use super::super::test_support::unique_temp_dir;
use super::StateRuntime;
use super::anchor_from_thread_item;
use crate::SortDirection;
use crate::ThreadItemRecordInsert;
use chrono::DateTime;
use chrono::Utc;
use pretty_assertions::assert_eq;
fn item(item_id: &str, item_at_ms: i64) -> ThreadItemRecordInsert {
ThreadItemRecordInsert {
turn_id: "turn-1".to_string(),
item_id: item_id.to_string(),
item_kind: "agentMessage".to_string(),
item_at: DateTime::<Utc>::from_timestamp_millis(item_at_ms).expect("timestamp millis"),
turn_status: "completed".to_string(),
turn_error_json: None,
turn_started_at: Some(item_at_ms / 1000),
turn_completed_at: Some(item_at_ms / 1000),
turn_duration_ms: Some(1),
search_text: format!("search {item_id}"),
payload_json: format!(
r#"{{"type":"agentMessage","id":"{item_id}","text":"{item_id}"}}"#
),
}
}
#[tokio::test]
async fn replace_thread_items_pages_and_reuses_existing_timestamps() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home, "test-provider".to_string())
.await
.expect("state db should initialize");
let thread_id = "thread-1";
runtime
.replace_thread_items(
thread_id,
&[
item("item-a", 1_700_001_111_123),
item("item-b", 1_700_001_111_123),
],
)
.await
.expect("initial replace should succeed");
let page = runtime
.list_thread_items(thread_id, 1, None, SortDirection::Desc)
.await
.expect("list should succeed");
assert_eq!(page.items.len(), 1);
assert_eq!(page.items[0].item_id, "item-b");
let next_anchor = page.next_anchor.expect("expected next anchor");
let older_page = runtime
.list_thread_items(thread_id, 10, Some(&next_anchor), SortDirection::Desc)
.await
.expect("older page should succeed");
assert_eq!(
older_page
.items
.iter()
.map(|item| item.item_id.as_str())
.collect::<Vec<_>>(),
vec!["item-a"]
);
let previous_item_b = runtime
.get_thread_item(thread_id, "item-b")
.await
.expect("item should load")
.expect("item should exist");
runtime
.replace_thread_items(
thread_id,
&[
item("item-b", 1_700_001_111_123),
item("item-c", 1_700_001_111_100),
],
)
.await
.expect("replacement should succeed");
let current_item_b = runtime
.get_thread_item(thread_id, "item-b")
.await
.expect("item should load")
.expect("item should exist");
assert_eq!(current_item_b.item_at, previous_item_b.item_at);
let asc_page = runtime
.list_thread_items(thread_id, 10, None, SortDirection::Asc)
.await
.expect("asc list should succeed");
assert_eq!(
asc_page
.items
.iter()
.map(|item| item.item_id.as_str())
.collect::<Vec<_>>(),
vec!["item-b", "item-c"]
);
assert!(asc_page.items[0].item_at < asc_page.items[1].item_at);
}
#[tokio::test]
async fn replace_thread_items_next_anchor_tracks_last_row_on_page() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home, "test-provider".to_string())
.await
.expect("state db should initialize");
let thread_id = "thread-2";
runtime
.replace_thread_items(
thread_id,
&[
item("item-a", 1_700_001_111_100),
item("item-b", 1_700_001_111_200),
item("item-c", 1_700_001_111_300),
],
)
.await
.expect("replace should succeed");
let page = runtime
.list_thread_items(thread_id, 2, None, SortDirection::Desc)
.await
.expect("list should succeed");
let anchor = page.next_anchor.expect("expected next anchor");
assert_eq!(anchor, anchor_from_thread_item(&page.items[1]));
}
}

View File

@@ -909,6 +909,10 @@ ON CONFLICT(thread_id, position) DO NOTHING
/// Delete a thread metadata row by id.
pub async fn delete_thread(&self, thread_id: ThreadId) -> anyhow::Result<u64> {
sqlx::query("DELETE FROM thread_items WHERE thread_id = ?")
.bind(thread_id.to_string())
.execute(self.pool.as_ref())
.await?;
let result = sqlx::query("DELETE FROM threads WHERE id = ?")
.bind(thread_id.to_string())
.execute(self.pool.as_ref())