Compare commits

...

1 Commits

Author SHA1 Message Date
Brent Traut
bbb721bf06 [codex] Add thread catalog subscription API 2026-06-03 01:14:09 -07:00
27 changed files with 1352 additions and 10 deletions

View File

@@ -4700,6 +4700,52 @@
"title": "Thread/listRequest",
"type": "object"
},
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
},
"method": {
"enum": [
"thread/catalog/subscribe"
],
"title": "Thread/catalog/subscribeRequestMethod",
"type": "string"
},
"params": {
"type": "null"
}
},
"required": [
"id",
"method"
],
"title": "Thread/catalog/subscribeRequest",
"type": "object"
},
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
},
"method": {
"enum": [
"thread/catalog/unsubscribe"
],
"title": "Thread/catalog/unsubscribeRequestMethod",
"type": "string"
},
"params": {
"type": "null"
}
},
"required": [
"id",
"method"
],
"title": "Thread/catalog/unsubscribeRequest",
"type": "object"
},
{
"properties": {
"id": {

View File

@@ -3498,6 +3498,17 @@
],
"type": "object"
},
"ThreadCatalogChangedNotification": {
"properties": {
"thread": {
"$ref": "#/definitions/ThreadSummary"
}
},
"required": [
"thread"
],
"type": "object"
},
"ThreadClosedNotification": {
"properties": {
"threadId": {
@@ -4677,6 +4688,118 @@
],
"type": "object"
},
"ThreadSummary": {
"properties": {
"agentNickname": {
"type": [
"string",
"null"
]
},
"agentRole": {
"type": [
"string",
"null"
]
},
"archivedAt": {
"format": "int64",
"type": [
"integer",
"null"
]
},
"cliVersion": {
"type": "string"
},
"createdAt": {
"format": "int64",
"type": "integer"
},
"cwd": {
"$ref": "#/definitions/AbsolutePathBuf"
},
"ephemeral": {
"type": "boolean"
},
"forkedFromId": {
"type": [
"string",
"null"
]
},
"gitInfo": {
"anyOf": [
{
"$ref": "#/definitions/GitInfo"
},
{
"type": "null"
}
]
},
"id": {
"type": "string"
},
"modelProvider": {
"type": "string"
},
"name": {
"type": [
"string",
"null"
]
},
"parentThreadId": {
"type": [
"string",
"null"
]
},
"path": {
"type": [
"string",
"null"
]
},
"preview": {
"type": "string"
},
"sessionId": {
"type": "string"
},
"source": {
"$ref": "#/definitions/SessionSource"
},
"threadSource": {
"anyOf": [
{
"$ref": "#/definitions/ThreadSource"
},
{
"type": "null"
}
]
},
"updatedAt": {
"format": "int64",
"type": "integer"
}
},
"required": [
"cliVersion",
"createdAt",
"cwd",
"ephemeral",
"id",
"modelProvider",
"preview",
"sessionId",
"source",
"updatedAt"
],
"type": "object"
},
"ThreadTokenUsage": {
"properties": {
"last": {
@@ -5369,6 +5492,26 @@
"title": "Thread/status/changedNotification",
"type": "object"
},
{
"properties": {
"method": {
"enum": [
"thread/catalog/changed"
],
"title": "Thread/catalog/changedNotificationMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/ThreadCatalogChangedNotification"
}
},
"required": [
"method",
"params"
],
"title": "Thread/catalog/changedNotification",
"type": "object"
},
{
"properties": {
"method": {

View File

@@ -612,6 +612,52 @@
"title": "Thread/listRequest",
"type": "object"
},
{
"properties": {
"id": {
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
"thread/catalog/subscribe"
],
"title": "Thread/catalog/subscribeRequestMethod",
"type": "string"
},
"params": {
"type": "null"
}
},
"required": [
"id",
"method"
],
"title": "Thread/catalog/subscribeRequest",
"type": "object"
},
{
"properties": {
"id": {
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
"thread/catalog/unsubscribe"
],
"title": "Thread/catalog/unsubscribeRequestMethod",
"type": "string"
},
"params": {
"type": "null"
}
},
"required": [
"id",
"method"
],
"title": "Thread/catalog/unsubscribeRequest",
"type": "object"
},
{
"properties": {
"id": {
@@ -4011,6 +4057,26 @@
"title": "Thread/status/changedNotification",
"type": "object"
},
{
"properties": {
"method": {
"enum": [
"thread/catalog/changed"
],
"title": "Thread/catalog/changedNotificationMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/v2/ThreadCatalogChangedNotification"
}
},
"required": [
"method",
"params"
],
"title": "Thread/catalog/changedNotification",
"type": "object"
},
{
"properties": {
"method": {
@@ -15687,6 +15753,29 @@
"title": "ThreadArchivedNotification",
"type": "object"
},
"ThreadCatalogChangedNotification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"thread": {
"$ref": "#/definitions/v2/ThreadSummary"
}
},
"required": [
"thread"
],
"title": "ThreadCatalogChangedNotification",
"type": "object"
},
"ThreadCatalogSubscribeResponse": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "ThreadCatalogSubscribeResponse",
"type": "object"
},
"ThreadCatalogUnsubscribeResponse": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "ThreadCatalogUnsubscribeResponse",
"type": "object"
},
"ThreadClosedNotification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
@@ -18071,6 +18160,118 @@
"title": "ThreadStatusChangedNotification",
"type": "object"
},
"ThreadSummary": {
"properties": {
"agentNickname": {
"type": [
"string",
"null"
]
},
"agentRole": {
"type": [
"string",
"null"
]
},
"archivedAt": {
"format": "int64",
"type": [
"integer",
"null"
]
},
"cliVersion": {
"type": "string"
},
"createdAt": {
"format": "int64",
"type": "integer"
},
"cwd": {
"$ref": "#/definitions/v2/AbsolutePathBuf"
},
"ephemeral": {
"type": "boolean"
},
"forkedFromId": {
"type": [
"string",
"null"
]
},
"gitInfo": {
"anyOf": [
{
"$ref": "#/definitions/v2/GitInfo"
},
{
"type": "null"
}
]
},
"id": {
"type": "string"
},
"modelProvider": {
"type": "string"
},
"name": {
"type": [
"string",
"null"
]
},
"parentThreadId": {
"type": [
"string",
"null"
]
},
"path": {
"type": [
"string",
"null"
]
},
"preview": {
"type": "string"
},
"sessionId": {
"type": "string"
},
"source": {
"$ref": "#/definitions/v2/SessionSource"
},
"threadSource": {
"anyOf": [
{
"$ref": "#/definitions/v2/ThreadSource"
},
{
"type": "null"
}
]
},
"updatedAt": {
"format": "int64",
"type": "integer"
}
},
"required": [
"cliVersion",
"createdAt",
"cwd",
"ephemeral",
"id",
"modelProvider",
"preview",
"sessionId",
"source",
"updatedAt"
],
"type": "object"
},
"ThreadTokenUsage": {
"properties": {
"last": {

View File

@@ -1361,6 +1361,52 @@
"title": "Thread/listRequest",
"type": "object"
},
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
},
"method": {
"enum": [
"thread/catalog/subscribe"
],
"title": "Thread/catalog/subscribeRequestMethod",
"type": "string"
},
"params": {
"type": "null"
}
},
"required": [
"id",
"method"
],
"title": "Thread/catalog/subscribeRequest",
"type": "object"
},
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
},
"method": {
"enum": [
"thread/catalog/unsubscribe"
],
"title": "Thread/catalog/unsubscribeRequestMethod",
"type": "string"
},
"params": {
"type": "null"
}
},
"required": [
"id",
"method"
],
"title": "Thread/catalog/unsubscribeRequest",
"type": "object"
},
{
"properties": {
"id": {
@@ -11406,6 +11452,26 @@
"title": "Thread/status/changedNotification",
"type": "object"
},
{
"properties": {
"method": {
"enum": [
"thread/catalog/changed"
],
"title": "Thread/catalog/changedNotificationMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/ThreadCatalogChangedNotification"
}
},
"required": [
"method",
"params"
],
"title": "Thread/catalog/changedNotification",
"type": "object"
},
{
"properties": {
"method": {
@@ -13511,6 +13577,29 @@
"title": "ThreadArchivedNotification",
"type": "object"
},
"ThreadCatalogChangedNotification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"thread": {
"$ref": "#/definitions/ThreadSummary"
}
},
"required": [
"thread"
],
"title": "ThreadCatalogChangedNotification",
"type": "object"
},
"ThreadCatalogSubscribeResponse": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "ThreadCatalogSubscribeResponse",
"type": "object"
},
"ThreadCatalogUnsubscribeResponse": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "ThreadCatalogUnsubscribeResponse",
"type": "object"
},
"ThreadClosedNotification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
@@ -15895,6 +15984,118 @@
"title": "ThreadStatusChangedNotification",
"type": "object"
},
"ThreadSummary": {
"properties": {
"agentNickname": {
"type": [
"string",
"null"
]
},
"agentRole": {
"type": [
"string",
"null"
]
},
"archivedAt": {
"format": "int64",
"type": [
"integer",
"null"
]
},
"cliVersion": {
"type": "string"
},
"createdAt": {
"format": "int64",
"type": "integer"
},
"cwd": {
"$ref": "#/definitions/AbsolutePathBuf"
},
"ephemeral": {
"type": "boolean"
},
"forkedFromId": {
"type": [
"string",
"null"
]
},
"gitInfo": {
"anyOf": [
{
"$ref": "#/definitions/GitInfo"
},
{
"type": "null"
}
]
},
"id": {
"type": "string"
},
"modelProvider": {
"type": "string"
},
"name": {
"type": [
"string",
"null"
]
},
"parentThreadId": {
"type": [
"string",
"null"
]
},
"path": {
"type": [
"string",
"null"
]
},
"preview": {
"type": "string"
},
"sessionId": {
"type": "string"
},
"source": {
"$ref": "#/definitions/SessionSource"
},
"threadSource": {
"anyOf": [
{
"$ref": "#/definitions/ThreadSource"
},
{
"type": "null"
}
]
},
"updatedAt": {
"format": "int64",
"type": "integer"
}
},
"required": [
"cliVersion",
"createdAt",
"cwd",
"ephemeral",
"id",
"modelProvider",
"preview",
"sessionId",
"source",
"updatedAt"
],
"type": "object"
},
"ThreadTokenUsage": {
"properties": {
"last": {

View File

@@ -0,0 +1,284 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"definitions": {
"AbsolutePathBuf": {
"description": "A path that is guaranteed to be absolute and normalized (though it is not guaranteed to be canonicalized or exist on the filesystem).\n\nIMPORTANT: When deserializing an `AbsolutePathBuf`, a base path must be set using [AbsolutePathBufGuard::new]. If no base path is set, the deserialization will fail unless the path being deserialized is already absolute.",
"type": "string"
},
"AgentPath": {
"type": "string"
},
"GitInfo": {
"properties": {
"branch": {
"type": [
"string",
"null"
]
},
"originUrl": {
"type": [
"string",
"null"
]
},
"sha": {
"type": [
"string",
"null"
]
}
},
"type": "object"
},
"SessionSource": {
"oneOf": [
{
"enum": [
"cli",
"vscode",
"exec",
"appServer",
"unknown"
],
"type": "string"
},
{
"additionalProperties": false,
"properties": {
"custom": {
"type": "string"
}
},
"required": [
"custom"
],
"title": "CustomSessionSource",
"type": "object"
},
{
"additionalProperties": false,
"properties": {
"subAgent": {
"$ref": "#/definitions/SubAgentSource"
}
},
"required": [
"subAgent"
],
"title": "SubAgentSessionSource",
"type": "object"
}
]
},
"SubAgentSource": {
"oneOf": [
{
"enum": [
"review",
"compact",
"memory_consolidation"
],
"type": "string"
},
{
"additionalProperties": false,
"properties": {
"thread_spawn": {
"properties": {
"agent_nickname": {
"default": null,
"type": [
"string",
"null"
]
},
"agent_path": {
"anyOf": [
{
"$ref": "#/definitions/AgentPath"
},
{
"type": "null"
}
],
"default": null
},
"agent_role": {
"default": null,
"type": [
"string",
"null"
]
},
"depth": {
"format": "int32",
"type": "integer"
},
"parent_thread_id": {
"$ref": "#/definitions/ThreadId"
}
},
"required": [
"depth",
"parent_thread_id"
],
"type": "object"
}
},
"required": [
"thread_spawn"
],
"title": "ThreadSpawnSubAgentSource",
"type": "object"
},
{
"additionalProperties": false,
"properties": {
"other": {
"type": "string"
}
},
"required": [
"other"
],
"title": "OtherSubAgentSource",
"type": "object"
}
]
},
"ThreadId": {
"type": "string"
},
"ThreadSource": {
"enum": [
"user",
"subagent",
"memory_consolidation"
],
"type": "string"
},
"ThreadSummary": {
"properties": {
"agentNickname": {
"type": [
"string",
"null"
]
},
"agentRole": {
"type": [
"string",
"null"
]
},
"archivedAt": {
"format": "int64",
"type": [
"integer",
"null"
]
},
"cliVersion": {
"type": "string"
},
"createdAt": {
"format": "int64",
"type": "integer"
},
"cwd": {
"$ref": "#/definitions/AbsolutePathBuf"
},
"ephemeral": {
"type": "boolean"
},
"forkedFromId": {
"type": [
"string",
"null"
]
},
"gitInfo": {
"anyOf": [
{
"$ref": "#/definitions/GitInfo"
},
{
"type": "null"
}
]
},
"id": {
"type": "string"
},
"modelProvider": {
"type": "string"
},
"name": {
"type": [
"string",
"null"
]
},
"parentThreadId": {
"type": [
"string",
"null"
]
},
"path": {
"type": [
"string",
"null"
]
},
"preview": {
"type": "string"
},
"sessionId": {
"type": "string"
},
"source": {
"$ref": "#/definitions/SessionSource"
},
"threadSource": {
"anyOf": [
{
"$ref": "#/definitions/ThreadSource"
},
{
"type": "null"
}
]
},
"updatedAt": {
"format": "int64",
"type": "integer"
}
},
"required": [
"cliVersion",
"createdAt",
"cwd",
"ephemeral",
"id",
"modelProvider",
"preview",
"sessionId",
"source",
"updatedAt"
],
"type": "object"
}
},
"properties": {
"thread": {
"$ref": "#/definitions/ThreadSummary"
}
},
"required": [
"thread"
],
"title": "ThreadCatalogChangedNotification",
"type": "object"
}

View File

@@ -0,0 +1,5 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "ThreadCatalogSubscribeResponse",
"type": "object"
}

View File

@@ -0,0 +1,5 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "ThreadCatalogUnsubscribeResponse",
"type": "object"
}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,6 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { ThreadSummary } from "./ThreadSummary";
export type ThreadCatalogChangedNotification = { thread: ThreadSummary, };

View File

@@ -0,0 +1,5 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type ThreadCatalogSubscribeResponse = Record<string, never>;

View File

@@ -0,0 +1,5 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type ThreadCatalogUnsubscribeResponse = Record<string, never>;

View File

@@ -0,0 +1,9 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { AbsolutePathBuf } from "../AbsolutePathBuf";
import type { GitInfo } from "./GitInfo";
import type { SessionSource } from "./SessionSource";
import type { ThreadSource } from "./ThreadSource";
export type ThreadSummary = { id: string, sessionId: string, forkedFromId: string | null, parentThreadId: string | null, preview: string, ephemeral: boolean, modelProvider: string, createdAt: number, updatedAt: number, archivedAt: number | null, path: string | null, cwd: AbsolutePathBuf, cliVersion: string, source: SessionSource, threadSource: ThreadSource | null, agentNickname: string | null, agentRole: string | null, gitInfo: GitInfo | null, name: string | null, };

View File

@@ -363,6 +363,9 @@ export type { ThreadApproveGuardianDeniedActionResponse } from "./ThreadApproveG
export type { ThreadArchiveParams } from "./ThreadArchiveParams";
export type { ThreadArchiveResponse } from "./ThreadArchiveResponse";
export type { ThreadArchivedNotification } from "./ThreadArchivedNotification";
export type { ThreadCatalogChangedNotification } from "./ThreadCatalogChangedNotification";
export type { ThreadCatalogSubscribeResponse } from "./ThreadCatalogSubscribeResponse";
export type { ThreadCatalogUnsubscribeResponse } from "./ThreadCatalogUnsubscribeResponse";
export type { ThreadClosedNotification } from "./ThreadClosedNotification";
export type { ThreadCompactStartParams } from "./ThreadCompactStartParams";
export type { ThreadCompactStartResponse } from "./ThreadCompactStartResponse";
@@ -422,6 +425,7 @@ export type { ThreadStartSource } from "./ThreadStartSource";
export type { ThreadStartedNotification } from "./ThreadStartedNotification";
export type { ThreadStatus } from "./ThreadStatus";
export type { ThreadStatusChangedNotification } from "./ThreadStatusChangedNotification";
export type { ThreadSummary } from "./ThreadSummary";
export type { ThreadTokenUsage } from "./ThreadTokenUsage";
export type { ThreadTokenUsageUpdatedNotification } from "./ThreadTokenUsageUpdatedNotification";
export type { ThreadUnarchiveParams } from "./ThreadUnarchiveParams";

View File

@@ -569,6 +569,16 @@ client_request_definitions! {
serialization: None,
response: v2::ThreadListResponse,
},
ThreadCatalogSubscribe => "thread/catalog/subscribe" {
params: #[ts(type = "undefined")] #[serde(skip_serializing_if = "Option::is_none")] Option<()>,
serialization: None,
response: v2::ThreadCatalogSubscribeResponse,
},
ThreadCatalogUnsubscribe => "thread/catalog/unsubscribe" {
params: #[ts(type = "undefined")] #[serde(skip_serializing_if = "Option::is_none")] Option<()>,
serialization: None,
response: v2::ThreadCatalogUnsubscribeResponse,
},
#[experimental("thread/search")]
ThreadSearch => "thread/search" {
params: v2::ThreadSearchParams,
@@ -1482,6 +1492,7 @@ server_notification_definitions! {
Error => "error" (v2::ErrorNotification),
ThreadStarted => "thread/started" (v2::ThreadStartedNotification),
ThreadStatusChanged => "thread/status/changed" (v2::ThreadStatusChangedNotification),
ThreadCatalogChanged => "thread/catalog/changed" (v2::ThreadCatalogChangedNotification),
ThreadArchived => "thread/archived" (v2::ThreadArchivedNotification),
ThreadUnarchived => "thread/unarchived" (v2::ThreadUnarchivedNotification),
ThreadClosed => "thread/closed" (v2::ThreadClosedNotification),

View File

@@ -6,6 +6,7 @@ use super::SandboxPolicy;
use super::Thread;
use super::ThreadItem;
use super::ThreadSource;
use super::ThreadSummary;
use super::Turn;
use super::TurnEnvironmentParams;
use super::TurnItemsView;
@@ -998,6 +999,16 @@ pub struct ThreadListParams {
pub search_term: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadCatalogSubscribeResponse {}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadCatalogUnsubscribeResponse {}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
@@ -1320,6 +1331,13 @@ pub struct ThreadStatusChangedNotification {
pub status: ThreadStatus,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadCatalogChangedNotification {
pub thread: ThreadSummary,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]

View File

@@ -149,6 +149,34 @@ pub struct Thread {
pub turns: Vec<Turn>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadSummary {
pub id: String,
pub session_id: String,
pub forked_from_id: Option<String>,
pub parent_thread_id: Option<String>,
pub preview: String,
pub ephemeral: bool,
pub model_provider: String,
#[ts(type = "number")]
pub created_at: i64,
#[ts(type = "number")]
pub updated_at: i64,
#[ts(type = "number | null")]
pub archived_at: Option<i64>,
pub path: Option<PathBuf>,
pub cwd: AbsolutePathBuf,
pub cli_version: String,
pub source: SessionSource,
pub thread_source: Option<ThreadSource>,
pub agent_nickname: Option<String>,
pub agent_role: Option<String>,
pub git_info: Option<GitInfo>,
pub name: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]

View File

@@ -369,6 +369,18 @@ Example:
When `nextCursor` is `null`, youve reached the final page.
`thread/catalog/subscribe` watches all future thread metadata changes without draining existing history. Install the subscription before calling `thread/list` when you need a race-free sidebar bootstrap; after the subscribe response arrives, buffer catalog notifications until the list response arrives, then apply the complete summaries idempotently and filter them client-side. The summary contains sidebar metadata such as id, preview/name, cwd, timestamps, archive state, git info, and source; it does not contain turns, items, messages, deltas, tool state, status, or runtime history.
```json
{ "method": "thread/catalog/subscribe", "id": 21 }
{ "id": 21, "result": {} }
{ "method": "thread/catalog/changed", "params": {
"thread": { "id": "thr_z", "preview": "Fix tests", "updatedAt": 1730832222, "archivedAt": null }
} }
```
The subscription is connection-local. Repeated subscribe and unsubscribe calls are harmless. Reconnecting clients should subscribe again and refetch the current page with `thread/list`; catalog subscriptions do not replay changes that happened while the connection was offline.
### Example: List loaded threads
`thread/loaded/list` returns thread ids currently loaded in memory. This is useful when you want to check which sessions are active without scanning rollouts on disk.

View File

@@ -334,6 +334,8 @@ impl MessageProcessor {
let pending_thread_unloads = Arc::new(Mutex::new(HashSet::new()));
let thread_watch_manager =
crate::thread_status::ThreadWatchManager::new_with_outgoing(outgoing.clone());
let thread_catalog_subscriptions =
crate::request_processors::ThreadCatalogSubscriptions::new(outgoing.clone());
let thread_list_state_permit = Arc::new(Semaphore::new(/*permits*/ 1));
let workspace_settings_cache =
Arc::new(workspace_settings::WorkspaceSettingsCache::default());
@@ -428,6 +430,7 @@ impl MessageProcessor {
Arc::clone(&pending_thread_unloads),
thread_state_manager.clone(),
thread_watch_manager.clone(),
thread_catalog_subscriptions.clone(),
Arc::clone(&thread_list_state_permit),
thread_goal_processor.clone(),
state_db,
@@ -442,8 +445,10 @@ impl MessageProcessor {
Arc::clone(&config),
config_manager.clone(),
pending_thread_unloads,
Arc::clone(&thread_store),
thread_state_manager,
thread_watch_manager,
thread_catalog_subscriptions,
thread_list_state_permit,
Arc::clone(&skills_watcher),
);
@@ -1084,6 +1089,16 @@ impl MessageProcessor {
ClientRequest::ThreadList { params, .. } => {
self.thread_processor.thread_list(params).await
}
ClientRequest::ThreadCatalogSubscribe { .. } => {
self.thread_processor
.thread_catalog_subscribe(connection_id)
.await
}
ClientRequest::ThreadCatalogUnsubscribe { .. } => {
self.thread_processor
.thread_catalog_unsubscribe(connection_id)
.await
}
ClientRequest::ThreadSearch { params, .. } => {
self.thread_processor.thread_search(params).await
}

View File

@@ -168,6 +168,9 @@ use codex_app_server_protocol::ThreadArchiveResponse;
use codex_app_server_protocol::ThreadArchivedNotification;
use codex_app_server_protocol::ThreadBackgroundTerminalsCleanParams;
use codex_app_server_protocol::ThreadBackgroundTerminalsCleanResponse;
use codex_app_server_protocol::ThreadCatalogChangedNotification;
use codex_app_server_protocol::ThreadCatalogSubscribeResponse;
use codex_app_server_protocol::ThreadCatalogUnsubscribeResponse;
use codex_app_server_protocol::ThreadClosedNotification;
use codex_app_server_protocol::ThreadCompactStartParams;
use codex_app_server_protocol::ThreadCompactStartResponse;
@@ -234,6 +237,7 @@ use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStartedNotification;
use codex_app_server_protocol::ThreadStatus;
use codex_app_server_protocol::ThreadSummary;
use codex_app_server_protocol::ThreadTurnsItemsListParams;
use codex_app_server_protocol::ThreadTurnsListParams;
use codex_app_server_protocol::ThreadTurnsListResponse;
@@ -470,6 +474,7 @@ mod plugins;
mod process_exec_processor;
mod remote_control_processor;
mod search;
mod thread_catalog_subscription;
mod thread_processor;
mod token_usage_replay;
mod turn_processor;
@@ -492,6 +497,7 @@ pub(crate) use plugins::PluginRequestProcessor;
pub(crate) use process_exec_processor::ProcessExecRequestProcessor;
pub(crate) use remote_control_processor::RemoteControlRequestProcessor;
pub(crate) use search::SearchRequestProcessor;
pub(crate) use thread_catalog_subscription::ThreadCatalogSubscriptions;
pub(crate) use thread_goal_processor::ThreadGoalRequestProcessor;
pub(crate) use thread_processor::ThreadRequestProcessor;
pub(crate) use turn_processor::TurnRequestProcessor;

View File

@@ -0,0 +1,84 @@
use super::*;
#[derive(Clone)]
pub(crate) struct ThreadCatalogSubscriptions {
outgoing: Arc<OutgoingMessageSender>,
connection_ids: Arc<Mutex<HashSet<ConnectionId>>>,
}
impl ThreadCatalogSubscriptions {
pub(crate) fn new(outgoing: Arc<OutgoingMessageSender>) -> Self {
Self {
outgoing,
connection_ids: Arc::new(Mutex::new(HashSet::new())),
}
}
pub(super) async fn subscribe(
&self,
connection_id: ConnectionId,
) -> ThreadCatalogSubscribeResponse {
self.connection_ids.lock().await.insert(connection_id);
ThreadCatalogSubscribeResponse {}
}
pub(super) async fn unsubscribe(
&self,
connection_id: ConnectionId,
) -> ThreadCatalogUnsubscribeResponse {
self.connection_ids.lock().await.remove(&connection_id);
ThreadCatalogUnsubscribeResponse {}
}
pub(super) async fn connection_closed(&self, connection_id: ConnectionId) {
self.connection_ids.lock().await.remove(&connection_id);
}
pub(super) async fn publish_thread_summary(&self, thread: ThreadSummary) {
let connection_ids = self
.connection_ids
.lock()
.await
.iter()
.copied()
.collect::<Vec<_>>();
if connection_ids.is_empty() {
return;
}
self.outgoing
.send_server_notification_to_connections(
&connection_ids,
ServerNotification::ThreadCatalogChanged(ThreadCatalogChangedNotification {
thread,
}),
)
.await;
}
pub(super) async fn publish_thread_change(
&self,
thread_store: &Arc<dyn ThreadStore>,
thread_id: ThreadId,
fallback_provider: &str,
fallback_cwd: &AbsolutePathBuf,
) {
let stored_thread = match thread_store
.read_thread(StoreReadThreadParams {
thread_id,
include_archived: true,
include_history: false,
})
.await
{
Ok(stored_thread) => stored_thread,
Err(ThreadStoreError::ThreadNotFound { .. }) => return,
Err(err) => {
warn!("failed to read thread {thread_id} for catalog notification: {err}");
return;
}
};
let summary =
thread_summary_from_stored_thread(stored_thread, fallback_provider, fallback_cwd);
self.publish_thread_summary(summary).await;
}
}

View File

@@ -8,9 +8,12 @@ pub(super) struct ListenerTaskContext {
pub(super) thread_state_manager: ThreadStateManager,
pub(super) outgoing: Arc<OutgoingMessageSender>,
pub(super) pending_thread_unloads: Arc<Mutex<HashSet<ThreadId>>>,
pub(super) thread_store: Arc<dyn ThreadStore>,
pub(super) thread_watch_manager: ThreadWatchManager,
pub(super) thread_catalog_subscriptions: ThreadCatalogSubscriptions,
pub(super) thread_list_state_permit: Arc<Semaphore>,
pub(super) fallback_model_provider: String,
pub(super) fallback_cwd: AbsolutePathBuf,
pub(super) codex_home: PathBuf,
pub(super) skills_watcher: Arc<SkillsWatcher>,
}
@@ -119,6 +122,17 @@ impl UnloadingState {
}
}
fn catalog_worthy_event(event: &EventMsg) -> bool {
matches!(
event,
EventMsg::TurnStarted(_)
| EventMsg::TurnComplete(_)
| EventMsg::TurnAborted(_)
| EventMsg::ThreadSettingsApplied(_)
| EventMsg::ThreadRolledBack(_)
)
}
pub(super) enum ThreadShutdownResult {
Complete,
SubmitFailed,
@@ -256,9 +270,12 @@ pub(super) async fn ensure_listener_task_running(
thread_manager,
thread_state_manager,
pending_thread_unloads,
thread_store,
thread_watch_manager,
thread_catalog_subscriptions,
thread_list_state_permit,
fallback_model_provider,
fallback_cwd,
codex_home,
..
} = listener_task_context;
@@ -339,6 +356,16 @@ pub(super) async fn ensure_listener_task_running(
fallback_model_provider.clone(),
)
.await;
if catalog_worthy_event(&event.msg) {
thread_catalog_subscriptions
.publish_thread_change(
&thread_store,
conversation_id,
fallback_model_provider.as_str(),
&fallback_cwd,
)
.await;
}
}
unloading_watchers_open = unloading_state.wait_for_unloading_trigger() => {
if !unloading_watchers_open {

View File

@@ -330,6 +330,7 @@ pub(crate) struct ThreadRequestProcessor {
pub(super) pending_thread_unloads: Arc<Mutex<HashSet<ThreadId>>>,
pub(super) thread_state_manager: ThreadStateManager,
pub(super) thread_watch_manager: ThreadWatchManager,
pub(super) thread_catalog_subscriptions: ThreadCatalogSubscriptions,
pub(super) thread_list_state_permit: Arc<Semaphore>,
pub(super) thread_goal_processor: ThreadGoalRequestProcessor,
pub(super) state_db: Option<StateDbHandle>,
@@ -350,6 +351,7 @@ impl ThreadRequestProcessor {
pending_thread_unloads: Arc<Mutex<HashSet<ThreadId>>>,
thread_state_manager: ThreadStateManager,
thread_watch_manager: ThreadWatchManager,
thread_catalog_subscriptions: ThreadCatalogSubscriptions,
thread_list_state_permit: Arc<Semaphore>,
thread_goal_processor: ThreadGoalRequestProcessor,
state_db: Option<StateDbHandle>,
@@ -366,6 +368,7 @@ impl ThreadRequestProcessor {
pending_thread_unloads,
thread_state_manager,
thread_watch_manager,
thread_catalog_subscriptions,
thread_list_state_permit,
thread_goal_processor,
state_db,
@@ -448,9 +451,12 @@ impl ThreadRequestProcessor {
.send_response(request_id.clone(), response)
.await;
for thread_id in archived_thread_ids {
self.publish_thread_catalog_change(thread_id).await;
self.outgoing
.send_server_notification(ServerNotification::ThreadArchived(
ThreadArchivedNotification { thread_id },
ThreadArchivedNotification {
thread_id: thread_id.to_string(),
},
))
.await;
}
@@ -489,6 +495,9 @@ impl ThreadRequestProcessor {
.send_response(request_id.clone(), response)
.await;
if let Some(notification) = notification {
if let Ok(thread_id) = ThreadId::from_string(&notification.thread_id) {
self.publish_thread_catalog_change(thread_id).await;
}
self.outgoing
.send_server_notification(ServerNotification::ThreadNameUpdated(
notification,
@@ -505,9 +514,12 @@ impl ThreadRequestProcessor {
&self,
params: ThreadMetadataUpdateParams,
) -> Result<Option<ClientResponsePayload>, JSONRPCErrorError> {
self.thread_metadata_update_response_inner(params)
.await
.map(|response| Some(response.into()))
let thread_id = ThreadId::from_string(&params.thread_id).ok();
let response = self.thread_metadata_update_response_inner(params).await?;
if let Some(thread_id) = thread_id {
self.publish_thread_catalog_change(thread_id).await;
}
Ok(Some(response.into()))
}
pub(crate) async fn thread_memory_mode_set(
@@ -537,6 +549,9 @@ impl ThreadRequestProcessor {
self.outgoing
.send_response(request_id.clone(), response)
.await;
if let Ok(thread_id) = ThreadId::from_string(&notification.thread_id) {
self.publish_thread_catalog_change(thread_id).await;
}
self.outgoing
.send_server_notification(ServerNotification::ThreadUnarchived(notification))
.await;
@@ -585,6 +600,30 @@ impl ThreadRequestProcessor {
.map(|response| Some(response.into()))
}
pub(crate) async fn thread_catalog_subscribe(
&self,
connection_id: ConnectionId,
) -> Result<Option<ClientResponsePayload>, JSONRPCErrorError> {
Ok(Some(
self.thread_catalog_subscriptions
.subscribe(connection_id)
.await
.into(),
))
}
pub(crate) async fn thread_catalog_unsubscribe(
&self,
connection_id: ConnectionId,
) -> Result<Option<ClientResponsePayload>, JSONRPCErrorError> {
Ok(Some(
self.thread_catalog_subscriptions
.unsubscribe(connection_id)
.await
.into(),
))
}
pub(crate) async fn thread_search(
&self,
params: ThreadSearchParams,
@@ -777,9 +816,12 @@ impl ThreadRequestProcessor {
thread_state_manager: self.thread_state_manager.clone(),
outgoing: Arc::clone(&self.outgoing),
pending_thread_unloads: Arc::clone(&self.pending_thread_unloads),
thread_store: Arc::clone(&self.thread_store),
thread_watch_manager: self.thread_watch_manager.clone(),
thread_catalog_subscriptions: self.thread_catalog_subscriptions.clone(),
thread_list_state_permit: self.thread_list_state_permit.clone(),
fallback_model_provider: self.config.model_provider_id.clone(),
fallback_cwd: self.config.cwd.clone(),
codex_home: self.config.codex_home.to_path_buf(),
skills_watcher: Arc::clone(&self.skills_watcher),
}
@@ -872,9 +914,12 @@ impl ThreadRequestProcessor {
thread_state_manager: self.thread_state_manager.clone(),
outgoing: Arc::clone(&self.outgoing),
pending_thread_unloads: Arc::clone(&self.pending_thread_unloads),
thread_store: Arc::clone(&self.thread_store),
thread_watch_manager: self.thread_watch_manager.clone(),
thread_catalog_subscriptions: self.thread_catalog_subscriptions.clone(),
thread_list_state_permit: self.thread_list_state_permit.clone(),
fallback_model_provider: self.config.model_provider_id.clone(),
fallback_cwd: self.config.cwd.clone(),
codex_home: self.config.codex_home.to_path_buf(),
skills_watcher: Arc::clone(&self.skills_watcher),
};
@@ -1275,7 +1320,7 @@ impl ThreadRequestProcessor {
async fn thread_archive_inner(
&self,
params: ThreadArchiveParams,
) -> Result<(ThreadArchiveResponse, Vec<String>), JSONRPCErrorError> {
) -> Result<(ThreadArchiveResponse, Vec<ThreadId>), JSONRPCErrorError> {
let _thread_list_state_permit = self.acquire_thread_list_state_permit().await?;
self.thread_archive_response(params).await
}
@@ -1283,7 +1328,7 @@ impl ThreadRequestProcessor {
async fn thread_archive_response(
&self,
params: ThreadArchiveParams,
) -> Result<(ThreadArchiveResponse, Vec<String>), JSONRPCErrorError> {
) -> Result<(ThreadArchiveResponse, Vec<ThreadId>), JSONRPCErrorError> {
let thread_id = ThreadId::from_string(&params.thread_id)
.map_err(|err| invalid_request(format!("invalid session id: {err}")))?;
@@ -1360,7 +1405,7 @@ impl ThreadRequestProcessor {
.await
{
Ok(()) => {
archived_thread_ids.push(parent_thread_id.to_string());
archived_thread_ids.push(*parent_thread_id);
}
Err(err) => return Err(thread_store_archive_error("archive", err)),
}
@@ -1375,7 +1420,7 @@ impl ThreadRequestProcessor {
.await
{
Ok(()) => {
archived_thread_ids.push(descendant_thread_id.to_string());
archived_thread_ids.push(descendant_thread_id);
}
Err(err) => {
warn!(
@@ -1851,6 +1896,17 @@ impl ThreadRequestProcessor {
})
}
async fn publish_thread_catalog_change(&self, thread_id: ThreadId) {
self.thread_catalog_subscriptions
.publish_thread_change(
&self.thread_store,
thread_id,
self.config.model_provider_id.as_str(),
&self.config.cwd,
)
.await;
}
async fn thread_search_response_inner(
&self,
params: ThreadSearchParams,
@@ -2333,6 +2389,9 @@ impl ThreadRequestProcessor {
}
pub(crate) async fn connection_closed(&self, connection_id: ConnectionId) {
self.thread_catalog_subscriptions
.connection_closed(connection_id)
.await;
let thread_ids = self
.thread_state_manager
.remove_connection(connection_id)
@@ -3373,6 +3432,8 @@ impl ThreadRequestProcessor {
.await;
}
self.publish_thread_catalog_change(thread_id).await;
self.outgoing
.send_server_notification(ServerNotification::ThreadStarted(notif))
.await;

View File

@@ -264,6 +264,36 @@ pub(super) fn thread_started_notification(mut thread: Thread) -> ThreadStartedNo
ThreadStartedNotification { thread }
}
pub(super) fn thread_summary_from_stored_thread(
thread: StoredThread,
fallback_provider: &str,
fallback_cwd: &AbsolutePathBuf,
) -> ThreadSummary {
let archived_at = thread.archived_at.map(|dt| dt.timestamp());
let (thread, _) = thread_from_stored_thread(thread, fallback_provider, fallback_cwd);
ThreadSummary {
id: thread.id,
session_id: thread.session_id,
forked_from_id: thread.forked_from_id,
parent_thread_id: thread.parent_thread_id,
preview: thread.preview,
ephemeral: thread.ephemeral,
model_provider: thread.model_provider,
created_at: thread.created_at,
updated_at: thread.updated_at,
archived_at,
path: thread.path,
cwd: thread.cwd,
cli_version: thread.cli_version,
source: thread.source,
thread_source: thread.thread_source,
agent_nickname: thread.agent_nickname,
agent_role: thread.agent_role,
git_info: thread.git_info,
name: thread.name,
}
}
#[cfg(test)]
pub(crate) fn summary_to_thread(
summary: ConversationSummary,

View File

@@ -12,8 +12,10 @@ pub(crate) struct TurnRequestProcessor {
config: Arc<Config>,
config_manager: ConfigManager,
pending_thread_unloads: Arc<Mutex<HashSet<ThreadId>>>,
thread_store: Arc<dyn ThreadStore>,
thread_state_manager: ThreadStateManager,
thread_watch_manager: ThreadWatchManager,
thread_catalog_subscriptions: ThreadCatalogSubscriptions,
thread_list_state_permit: Arc<Semaphore>,
skills_watcher: Arc<SkillsWatcher>,
}
@@ -82,8 +84,10 @@ impl TurnRequestProcessor {
config: Arc<Config>,
config_manager: ConfigManager,
pending_thread_unloads: Arc<Mutex<HashSet<ThreadId>>>,
thread_store: Arc<dyn ThreadStore>,
thread_state_manager: ThreadStateManager,
thread_watch_manager: ThreadWatchManager,
thread_catalog_subscriptions: ThreadCatalogSubscriptions,
thread_list_state_permit: Arc<Semaphore>,
skills_watcher: Arc<SkillsWatcher>,
) -> Self {
@@ -96,8 +100,10 @@ impl TurnRequestProcessor {
config,
config_manager,
pending_thread_unloads,
thread_store,
thread_state_manager,
thread_watch_manager,
thread_catalog_subscriptions,
thread_list_state_permit,
skills_watcher,
}
@@ -1273,9 +1279,12 @@ impl TurnRequestProcessor {
thread_state_manager: self.thread_state_manager.clone(),
outgoing: Arc::clone(&self.outgoing),
pending_thread_unloads: Arc::clone(&self.pending_thread_unloads),
thread_store: Arc::clone(&self.thread_store),
thread_watch_manager: self.thread_watch_manager.clone(),
thread_catalog_subscriptions: self.thread_catalog_subscriptions.clone(),
thread_list_state_permit: self.thread_list_state_permit.clone(),
fallback_model_provider: self.config.model_provider_id.clone(),
fallback_cwd: self.config.cwd.clone(),
codex_home: self.config.codex_home.to_path_buf(),
skills_watcher: Arc::clone(&self.skills_watcher),
}

View File

@@ -48,6 +48,7 @@ mod review;
mod safety_check_downgrade;
mod skills_list;
mod thread_archive;
mod thread_catalog_subscription;
mod thread_fork;
mod thread_inject_items;
mod thread_list;

View File

@@ -0,0 +1,125 @@
use anyhow::Result;
use app_test_support::TestAppServer;
use app_test_support::create_fake_rollout;
use app_test_support::to_response;
use app_test_support::write_mock_responses_config_toml;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadCatalogChangedNotification;
use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadListResponse;
use codex_app_server_protocol::ThreadSetNameParams;
use codex_app_server_protocol::ThreadSetNameResponse;
use codex_app_server_protocol::ThreadSortKey;
use pretty_assertions::assert_eq;
use serde::de::DeserializeOwned;
use tempfile::TempDir;
use tokio::time::Duration;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(10);
#[tokio::test]
async fn catalog_subscription_reports_thread_outside_loaded_page() -> Result<()> {
let codex_home = TempDir::new()?;
write_mock_responses_config_toml(
codex_home.path(),
"http://localhost:1",
&Default::default(),
i64::MAX,
/*requires_openai_auth*/ None,
"mock_provider",
"",
)?;
let older_thread_id = create_fake_rollout(
codex_home.path(),
"2025-01-05T12-00-00",
"2025-01-05T12:00:00Z",
"Older thread",
Some("mock_provider"),
/*git_info*/ None,
)?;
let newer_thread_id = create_fake_rollout(
codex_home.path(),
"2025-01-05T12-05-00",
"2025-01-05T12:05:00Z",
"Newer thread",
Some("mock_provider"),
/*git_info*/ None,
)?;
let mut app = TestAppServer::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, app.initialize()).await??;
let subscribe_id = app
.send_raw_request("thread/catalog/subscribe", /*params*/ None)
.await?;
let _: codex_app_server_protocol::ThreadCatalogSubscribeResponse =
read_response(&mut app, subscribe_id).await?;
let list_id = app
.send_thread_list_request(ThreadListParams {
cursor: None,
limit: Some(1),
sort_key: Some(ThreadSortKey::CreatedAt),
sort_direction: None,
model_providers: Some(vec!["mock_provider".to_string()]),
source_kinds: None,
archived: None,
cwd: None,
use_state_db_only: false,
search_term: None,
})
.await?;
let page = read_response::<ThreadListResponse>(&mut app, list_id).await?;
assert_eq!(page.data.len(), 1);
assert_eq!(page.data[0].id, newer_thread_id);
rename_thread(
&mut app,
older_thread_id.clone(),
"Renamed outside first page",
)
.await?;
let notification: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
app.read_stream_until_notification_message("thread/catalog/changed"),
)
.await??;
let raw_params = notification
.params
.expect("thread/catalog/changed should have params");
assert_eq!(raw_params["thread"].get("turns"), None);
let changed: ThreadCatalogChangedNotification = serde_json::from_value(raw_params)?;
assert_eq!(changed.thread.id, older_thread_id);
assert_eq!(
changed.thread.name.as_deref(),
Some("Renamed outside first page")
);
assert_eq!(changed.thread.archived_at, None);
Ok(())
}
async fn rename_thread(app: &mut TestAppServer, thread_id: String, name: &str) -> Result<()> {
let rename_id = app
.send_thread_set_name_request(ThreadSetNameParams {
thread_id,
name: name.to_string(),
})
.await?;
let _: ThreadSetNameResponse = read_response(app, rename_id).await?;
Ok(())
}
async fn read_response<T: DeserializeOwned>(app: &mut TestAppServer, id: i64) -> Result<T> {
let response: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
app.read_stream_until_response_message(RequestId::Integer(id)),
)
.await??;
to_response(response)
}