[codex] Add remote thread store implementation (#17826)

- Add a "remote" thread store implementation
- Implement the remote thread store as a thin wrapper that makes grpc
calls to a configurable service endpoint
- Implement only the thread/list method to start
- Encode the grpc method/param shape as protobufs in the remote
implementation

A wart: the proto generation script is an "example" binary target. This
is an example target only because Cargo lets examples use
dev-dependencies, which keeps tonic-prost-build out of the normal
codex-thread-store dependency surface. A regular bin would either need
to add proto generation deps as normal runtime deps, or use a
feature-gated optional dep, which this repo’s manifest checks explicitly
reject.
This commit is contained in:
Tom
2026-04-16 10:15:31 -07:00
committed by GitHub
parent baaf42b2e4
commit 6e72f0dbfd
13 changed files with 1307 additions and 4 deletions

View File

@@ -7,6 +7,7 @@
mod error;
mod local;
mod recorder;
mod remote;
mod store;
mod types;
@@ -14,6 +15,7 @@ pub use error::ThreadStoreError;
pub use error::ThreadStoreResult;
pub use local::LocalThreadStore;
pub use recorder::ThreadRecorder;
pub use remote::RemoteThreadStore;
pub use store::ThreadStore;
pub use types::AppendThreadItemsParams;
pub use types::ArchiveThreadParams;

View File

@@ -0,0 +1,13 @@
# Remote Thread Store
- The Rust protobuf output in `proto/codex.thread_store.v1.rs` is checked in.
- Do not add build-time protobuf generation to `codex-thread-store` unless the Bazel/Cargo story is intentionally changed.
- When `proto/codex.thread_store.v1.proto` changes, regenerate the Rust file manually and include both files in the same commit.
Run this from the repository root:
```sh
./codex-rs/thread-store/scripts/generate-proto.sh
```
The command requires `protoc` to be available on `PATH`.

View File

@@ -0,0 +1,255 @@
use std::path::PathBuf;
use std::str::FromStr;
use chrono::DateTime;
use chrono::Utc;
use codex_git_utils::GitSha;
use codex_protocol::AgentPath;
use codex_protocol::ThreadId;
use codex_protocol::openai_models::ReasoningEffort;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::GitInfo;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use super::proto;
use crate::StoredThread;
use crate::ThreadSortKey;
use crate::ThreadStoreError;
use crate::ThreadStoreResult;
pub(super) fn remote_status_to_error(status: tonic::Status) -> ThreadStoreError {
match status.code() {
tonic::Code::InvalidArgument => ThreadStoreError::InvalidRequest {
message: status.message().to_string(),
},
tonic::Code::AlreadyExists | tonic::Code::FailedPrecondition | tonic::Code::Aborted => {
ThreadStoreError::Conflict {
message: status.message().to_string(),
}
}
_ => ThreadStoreError::Internal {
message: format!("remote thread store request failed: {status}"),
},
}
}
pub(super) fn proto_sort_key(sort_key: ThreadSortKey) -> proto::ThreadSortKey {
match sort_key {
ThreadSortKey::CreatedAt => proto::ThreadSortKey::CreatedAt,
ThreadSortKey::UpdatedAt => proto::ThreadSortKey::UpdatedAt,
}
}
pub(super) fn proto_session_source(source: &SessionSource) -> proto::SessionSource {
match source {
SessionSource::Cli => proto_source(proto::SessionSourceKind::Cli),
SessionSource::VSCode => proto_source(proto::SessionSourceKind::Vscode),
SessionSource::Exec => proto_source(proto::SessionSourceKind::Exec),
SessionSource::Mcp => proto_source(proto::SessionSourceKind::AppServer),
SessionSource::Custom(custom) => proto::SessionSource {
kind: proto::SessionSourceKind::Custom.into(),
custom: Some(custom.clone()),
..Default::default()
},
SessionSource::SubAgent(SubAgentSource::Review) => {
proto_source(proto::SessionSourceKind::SubAgentReview)
}
SessionSource::SubAgent(SubAgentSource::Compact) => {
proto_source(proto::SessionSourceKind::SubAgentCompact)
}
SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth,
agent_path,
agent_nickname,
agent_role,
}) => proto::SessionSource {
kind: proto::SessionSourceKind::SubAgentThreadSpawn.into(),
sub_agent_parent_thread_id: Some(parent_thread_id.to_string()),
sub_agent_depth: Some(*depth),
sub_agent_path: agent_path.as_ref().map(|path| path.as_str().to_string()),
sub_agent_nickname: agent_nickname.clone(),
sub_agent_role: agent_role.clone(),
..Default::default()
},
SessionSource::SubAgent(SubAgentSource::MemoryConsolidation) => {
proto_source(proto::SessionSourceKind::SubAgentMemoryConsolidation)
}
SessionSource::SubAgent(SubAgentSource::Other(other)) => proto::SessionSource {
kind: proto::SessionSourceKind::SubAgentOther.into(),
sub_agent_other: Some(other.clone()),
..Default::default()
},
SessionSource::Unknown => proto_source(proto::SessionSourceKind::Unknown),
}
}
fn proto_source(kind: proto::SessionSourceKind) -> proto::SessionSource {
proto::SessionSource {
kind: kind.into(),
..Default::default()
}
}
pub(super) fn stored_thread_from_proto(
thread: proto::StoredThread,
) -> ThreadStoreResult<StoredThread> {
// Keep this mapping boring: the proto mirrors StoredThread for remote-readable
// summary fields, except for Rust domain types that cross gRPC as stable scalar
// values. Local-only fields such as rollout_path intentionally stay local.
let source = thread
.source
.as_ref()
.map(session_source_from_proto)
.transpose()?
.unwrap_or(SessionSource::Unknown);
let thread_id = ThreadId::from_string(&thread.thread_id).map_err(|err| {
ThreadStoreError::InvalidRequest {
message: format!("remote thread store returned invalid thread_id: {err}"),
}
})?;
let forked_from_id = thread
.forked_from_id
.as_deref()
.map(ThreadId::from_string)
.transpose()
.map_err(|err| ThreadStoreError::InvalidRequest {
message: format!("remote thread store returned invalid forked_from_id: {err}"),
})?;
Ok(StoredThread {
thread_id,
rollout_path: None,
forked_from_id,
preview: thread.preview,
name: thread.name,
model_provider: thread.model_provider,
model: thread.model,
reasoning_effort: thread
.reasoning_effort
.as_deref()
.map(parse_reasoning_effort)
.transpose()?,
created_at: datetime_from_unix(thread.created_at)?,
updated_at: datetime_from_unix(thread.updated_at)?,
archived_at: thread.archived_at.map(datetime_from_unix).transpose()?,
cwd: PathBuf::from(thread.cwd),
cli_version: thread.cli_version,
source,
agent_nickname: thread.agent_nickname,
agent_role: thread.agent_role,
agent_path: thread.agent_path,
git_info: thread.git_info.map(git_info_from_proto),
approval_mode: AskForApproval::OnRequest,
sandbox_policy: SandboxPolicy::new_read_only_policy(),
token_usage: None,
first_user_message: thread.first_user_message,
history: None,
})
}
#[cfg(test)]
pub(super) fn stored_thread_to_proto(thread: StoredThread) -> proto::StoredThread {
proto::StoredThread {
thread_id: thread.thread_id.to_string(),
forked_from_id: thread.forked_from_id.map(|thread_id| thread_id.to_string()),
preview: thread.preview,
name: thread.name,
model_provider: thread.model_provider,
model: thread.model,
created_at: thread.created_at.timestamp(),
updated_at: thread.updated_at.timestamp(),
archived_at: thread.archived_at.map(|timestamp| timestamp.timestamp()),
cwd: thread.cwd.to_string_lossy().into_owned(),
cli_version: thread.cli_version,
source: Some(proto_session_source(&thread.source)),
git_info: thread.git_info.map(git_info_to_proto),
agent_nickname: thread.agent_nickname,
agent_role: thread.agent_role,
agent_path: thread.agent_path,
reasoning_effort: thread.reasoning_effort.map(|effort| effort.to_string()),
first_user_message: thread.first_user_message,
}
}
fn datetime_from_unix(timestamp: i64) -> ThreadStoreResult<DateTime<Utc>> {
DateTime::from_timestamp(timestamp, 0).ok_or_else(|| ThreadStoreError::InvalidRequest {
message: format!("remote thread store returned invalid timestamp: {timestamp}"),
})
}
fn session_source_from_proto(source: &proto::SessionSource) -> ThreadStoreResult<SessionSource> {
let kind = proto::SessionSourceKind::try_from(source.kind).unwrap_or_default();
Ok(match kind {
proto::SessionSourceKind::Unknown => SessionSource::Unknown,
proto::SessionSourceKind::Cli => SessionSource::Cli,
proto::SessionSourceKind::Vscode => SessionSource::VSCode,
proto::SessionSourceKind::Exec => SessionSource::Exec,
proto::SessionSourceKind::AppServer => SessionSource::Mcp,
proto::SessionSourceKind::Custom => {
SessionSource::Custom(source.custom.clone().unwrap_or_default())
}
proto::SessionSourceKind::SubAgentReview => SessionSource::SubAgent(SubAgentSource::Review),
proto::SessionSourceKind::SubAgentCompact => {
SessionSource::SubAgent(SubAgentSource::Compact)
}
proto::SessionSourceKind::SubAgentThreadSpawn => {
let parent_thread_id = source
.sub_agent_parent_thread_id
.as_deref()
.map(ThreadId::from_string)
.transpose()
.map_err(|err| ThreadStoreError::InvalidRequest {
message: format!(
"remote thread store returned invalid sub-agent parent thread id: {err}"
),
})?
.ok_or_else(|| ThreadStoreError::InvalidRequest {
message: "remote thread store omitted sub-agent parent thread id".to_string(),
})?;
SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: source.sub_agent_depth.unwrap_or_default(),
agent_path: source
.sub_agent_path
.clone()
.map(AgentPath::from_string)
.transpose()
.map_err(|message| ThreadStoreError::InvalidRequest { message })?,
agent_nickname: source.sub_agent_nickname.clone(),
agent_role: source.sub_agent_role.clone(),
})
}
proto::SessionSourceKind::SubAgentMemoryConsolidation => {
SessionSource::SubAgent(SubAgentSource::MemoryConsolidation)
}
proto::SessionSourceKind::SubAgentOther => SessionSource::SubAgent(SubAgentSource::Other(
source.sub_agent_other.clone().unwrap_or_default(),
)),
})
}
fn git_info_from_proto(info: proto::GitInfo) -> GitInfo {
GitInfo {
commit_hash: info.sha.as_deref().map(GitSha::new),
branch: info.branch,
repository_url: info.origin_url,
}
}
#[cfg(test)]
fn git_info_to_proto(info: GitInfo) -> proto::GitInfo {
proto::GitInfo {
sha: info.commit_hash.map(|sha| sha.0),
branch: info.branch,
origin_url: info.repository_url,
}
}
fn parse_reasoning_effort(value: &str) -> ThreadStoreResult<ReasoningEffort> {
ReasoningEffort::from_str(value).map_err(|message| ThreadStoreError::InvalidRequest {
message: format!("remote thread store returned {message}"),
})
}

View File

@@ -0,0 +1,243 @@
use super::RemoteThreadStore;
use super::helpers::proto_session_source;
use super::helpers::proto_sort_key;
use super::helpers::remote_status_to_error;
use super::helpers::stored_thread_from_proto;
use super::proto;
use crate::ListThreadsParams;
use crate::ThreadPage;
use crate::ThreadStoreError;
use crate::ThreadStoreResult;
pub(super) async fn list_threads(
store: &RemoteThreadStore,
params: ListThreadsParams,
) -> ThreadStoreResult<ThreadPage> {
let request = proto::ListThreadsRequest {
page_size: params
.page_size
.try_into()
.map_err(|_| ThreadStoreError::InvalidRequest {
message: format!("page_size is too large: {}", params.page_size),
})?,
cursor: params.cursor,
sort_key: proto_sort_key(params.sort_key).into(),
allowed_sources: params
.allowed_sources
.iter()
.map(proto_session_source)
.collect(),
model_provider_filter: params
.model_providers
.map(|values| proto::ModelProviderFilter { values }),
archived: params.archived,
search_term: params.search_term,
};
let response = store
.client()
.await?
.list_threads(request)
.await
.map_err(remote_status_to_error)?
.into_inner();
let items = response
.threads
.into_iter()
.map(stored_thread_from_proto)
.collect::<ThreadStoreResult<Vec<_>>>()?;
Ok(ThreadPage {
items,
next_cursor: response.next_cursor,
})
}
#[cfg(test)]
mod tests {
use std::path::PathBuf;
use codex_protocol::openai_models::ReasoningEffort;
use codex_protocol::protocol::SessionSource;
use pretty_assertions::assert_eq;
use tonic::Request;
use tonic::Response;
use tonic::Status;
use tonic::transport::Server;
use super::super::helpers::stored_thread_to_proto;
use super::super::proto::thread_store_server;
use super::super::proto::thread_store_server::ThreadStoreServer;
use super::*;
use crate::ThreadSortKey;
use crate::ThreadStore;
#[derive(Default)]
struct TestServer;
#[tonic::async_trait]
impl thread_store_server::ThreadStore for TestServer {
async fn list_threads(
&self,
request: Request<proto::ListThreadsRequest>,
) -> Result<Response<proto::ListThreadsResponse>, Status> {
let request = request.into_inner();
assert_eq!(request.page_size, 2);
assert_eq!(request.cursor.as_deref(), Some("cursor-1"));
assert_eq!(
proto::ThreadSortKey::try_from(request.sort_key),
Ok(proto::ThreadSortKey::UpdatedAt)
);
assert_eq!(request.archived, true);
assert_eq!(request.search_term.as_deref(), Some("needle"));
assert_eq!(
request.model_provider_filter,
Some(proto::ModelProviderFilter {
values: vec!["openai".to_string()],
})
);
assert_eq!(request.allowed_sources.len(), 1);
assert_eq!(
proto::SessionSourceKind::try_from(request.allowed_sources[0].kind),
Ok(proto::SessionSourceKind::Cli)
);
Ok(Response::new(proto::ListThreadsResponse {
threads: vec![proto::StoredThread {
thread_id: "11111111-1111-1111-1111-111111111111".to_string(),
forked_from_id: None,
preview: "hello".to_string(),
name: Some("named thread".to_string()),
model_provider: "openai".to_string(),
model: Some("gpt-5".to_string()),
created_at: 100,
updated_at: 200,
archived_at: Some(300),
cwd: "/workspace".to_string(),
cli_version: "1.2.3".to_string(),
source: Some(proto::SessionSource {
kind: proto::SessionSourceKind::Cli.into(),
..Default::default()
}),
git_info: Some(proto::GitInfo {
sha: Some("abc123".to_string()),
branch: Some("main".to_string()),
origin_url: Some("https://example.test/repo.git".to_string()),
}),
agent_nickname: None,
agent_role: None,
agent_path: None,
reasoning_effort: Some("medium".to_string()),
first_user_message: Some("hello".to_string()),
}],
next_cursor: Some("cursor-2".to_string()),
}))
}
}
#[tokio::test]
async fn list_threads_calls_remote_service() {
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
.await
.expect("bind test server");
let addr = listener.local_addr().expect("test server addr");
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
let server = tokio::spawn(async move {
Server::builder()
.add_service(ThreadStoreServer::new(TestServer))
.serve_with_incoming_shutdown(
tokio_stream::wrappers::TcpListenerStream::new(listener),
async {
let _ = shutdown_rx.await;
},
)
.await
});
let store = RemoteThreadStore::new(format!("http://{addr}"));
let page = store
.list_threads(ListThreadsParams {
page_size: 2,
cursor: Some("cursor-1".to_string()),
sort_key: ThreadSortKey::UpdatedAt,
allowed_sources: vec![SessionSource::Cli],
model_providers: Some(vec!["openai".to_string()]),
archived: true,
search_term: Some("needle".to_string()),
})
.await
.expect("list threads");
assert_eq!(page.next_cursor.as_deref(), Some("cursor-2"));
assert_eq!(page.items.len(), 1);
let item = &page.items[0];
assert_eq!(
item.thread_id.to_string(),
"11111111-1111-1111-1111-111111111111"
);
assert_eq!(item.name.as_deref(), Some("named thread"));
assert_eq!(item.preview, "hello");
assert_eq!(item.first_user_message.as_deref(), Some("hello"));
assert_eq!(item.model_provider, "openai");
assert_eq!(item.model.as_deref(), Some("gpt-5"));
assert_eq!(item.created_at.timestamp(), 100);
assert_eq!(item.updated_at.timestamp(), 200);
assert_eq!(item.archived_at.map(|ts| ts.timestamp()), Some(300));
assert_eq!(item.cwd, PathBuf::from("/workspace"));
assert_eq!(item.cli_version, "1.2.3");
assert_eq!(item.source, SessionSource::Cli);
assert_eq!(item.reasoning_effort, Some(ReasoningEffort::Medium));
assert_eq!(
item.git_info.as_ref().and_then(|git| git.branch.as_deref()),
Some("main")
);
let _ = shutdown_tx.send(());
server.await.expect("join server").expect("server");
}
#[test]
fn stored_thread_proto_roundtrips_through_domain_type() {
let thread = proto::StoredThread {
thread_id: "11111111-1111-1111-1111-111111111111".to_string(),
forked_from_id: Some("22222222-2222-2222-2222-222222222222".to_string()),
preview: "preview text".to_string(),
name: Some("named thread".to_string()),
model_provider: "openai".to_string(),
model: Some("gpt-5".to_string()),
created_at: 100,
updated_at: 200,
archived_at: Some(300),
cwd: "/workspace/project".to_string(),
cli_version: "1.2.3".to_string(),
source: Some(proto::SessionSource {
kind: proto::SessionSourceKind::SubAgentThreadSpawn.into(),
sub_agent_parent_thread_id: Some(
"33333333-3333-3333-3333-333333333333".to_string(),
),
sub_agent_depth: Some(2),
sub_agent_path: Some("/root/review/backend".to_string()),
sub_agent_nickname: Some("Navigator".to_string()),
sub_agent_role: Some("explorer".to_string()),
..Default::default()
}),
git_info: Some(proto::GitInfo {
sha: Some("abc123".to_string()),
branch: Some("main".to_string()),
origin_url: Some("https://example.test/repo.git".to_string()),
}),
agent_nickname: Some("Navigator".to_string()),
agent_role: Some("explorer".to_string()),
agent_path: Some("/root/review/backend".to_string()),
reasoning_effort: Some("high".to_string()),
first_user_message: Some("first message".to_string()),
};
let stored = stored_thread_from_proto(thread.clone()).expect("proto to stored thread");
assert_eq!(stored.rollout_path, None);
assert!(stored.history.is_none());
assert_eq!(stored_thread_to_proto(stored), thread);
}
}

View File

@@ -0,0 +1,112 @@
mod helpers;
mod list_threads;
use async_trait::async_trait;
use crate::AppendThreadItemsParams;
use crate::ArchiveThreadParams;
use crate::CreateThreadParams;
use crate::ListThreadsParams;
use crate::LoadThreadHistoryParams;
use crate::ReadThreadParams;
use crate::ResumeThreadRecorderParams;
use crate::SetThreadNameParams;
use crate::StoredThread;
use crate::StoredThreadHistory;
use crate::ThreadPage;
use crate::ThreadRecorder;
use crate::ThreadStore;
use crate::ThreadStoreError;
use crate::ThreadStoreResult;
use crate::UpdateThreadMetadataParams;
use proto::thread_store_client::ThreadStoreClient;
#[path = "proto/codex.thread_store.v1.rs"]
mod proto;
/// gRPC-backed [`ThreadStore`] implementation for deployments whose durable thread data lives
/// outside the app-server process.
#[derive(Clone, Debug)]
pub struct RemoteThreadStore {
endpoint: String,
}
impl RemoteThreadStore {
pub fn new(endpoint: impl Into<String>) -> Self {
Self {
endpoint: endpoint.into(),
}
}
async fn client(&self) -> ThreadStoreResult<ThreadStoreClient<tonic::transport::Channel>> {
ThreadStoreClient::connect(self.endpoint.clone())
.await
.map_err(|err| ThreadStoreError::Internal {
message: format!("failed to connect to remote thread store: {err}"),
})
}
}
#[async_trait]
impl ThreadStore for RemoteThreadStore {
async fn create_thread(
&self,
_params: CreateThreadParams,
) -> ThreadStoreResult<Box<dyn ThreadRecorder>> {
Err(not_implemented("create_thread"))
}
async fn resume_thread_recorder(
&self,
_params: ResumeThreadRecorderParams,
) -> ThreadStoreResult<Box<dyn ThreadRecorder>> {
Err(not_implemented("resume_thread_recorder"))
}
async fn append_items(&self, _params: AppendThreadItemsParams) -> ThreadStoreResult<()> {
Err(not_implemented("append_items"))
}
async fn load_history(
&self,
_params: LoadThreadHistoryParams,
) -> ThreadStoreResult<StoredThreadHistory> {
Err(not_implemented("load_history"))
}
async fn read_thread(&self, _params: ReadThreadParams) -> ThreadStoreResult<StoredThread> {
Err(not_implemented("read_thread"))
}
async fn list_threads(&self, params: ListThreadsParams) -> ThreadStoreResult<ThreadPage> {
list_threads::list_threads(self, params).await
}
async fn set_thread_name(&self, _params: SetThreadNameParams) -> ThreadStoreResult<()> {
Err(not_implemented("set_thread_name"))
}
async fn update_thread_metadata(
&self,
_params: UpdateThreadMetadataParams,
) -> ThreadStoreResult<StoredThread> {
Err(not_implemented("update_thread_metadata"))
}
async fn archive_thread(&self, _params: ArchiveThreadParams) -> ThreadStoreResult<()> {
Err(not_implemented("archive_thread"))
}
async fn unarchive_thread(
&self,
_params: ArchiveThreadParams,
) -> ThreadStoreResult<StoredThread> {
Err(not_implemented("unarchive_thread"))
}
}
fn not_implemented(method: &str) -> ThreadStoreError {
ThreadStoreError::Internal {
message: format!("remote thread store does not implement {method} yet"),
}
}

View File

@@ -0,0 +1,86 @@
syntax = "proto3";
package codex.thread_store.v1;
service ThreadStore {
rpc ListThreads(ListThreadsRequest) returns (ListThreadsResponse);
}
message ListThreadsRequest {
uint32 page_size = 1;
optional string cursor = 2;
ThreadSortKey sort_key = 3;
repeated SessionSource allowed_sources = 4;
optional ModelProviderFilter model_provider_filter = 5;
bool archived = 6;
optional string search_term = 7;
}
message ModelProviderFilter {
repeated string values = 1;
}
enum ThreadSortKey {
THREAD_SORT_KEY_CREATED_AT = 0;
THREAD_SORT_KEY_UPDATED_AT = 1;
}
message ListThreadsResponse {
repeated StoredThread threads = 1;
optional string next_cursor = 2;
}
message StoredThread {
// Mirrors Rust's StoredThread. Domain types that are not protobuf-native,
// such as ThreadId, DateTime<Utc>, and PathBuf, are represented as their
// stable scalar forms on the wire.
string thread_id = 1;
optional string forked_from_id = 2;
string preview = 3;
optional string name = 4;
string model_provider = 5;
optional string model = 6;
int64 created_at = 7;
int64 updated_at = 8;
optional int64 archived_at = 9;
string cwd = 10;
string cli_version = 11;
SessionSource source = 12;
optional GitInfo git_info = 13;
optional string agent_nickname = 14;
optional string agent_role = 15;
optional string agent_path = 16;
optional string reasoning_effort = 17;
optional string first_user_message = 18;
}
message SessionSource {
SessionSourceKind kind = 1;
optional string custom = 2;
optional string sub_agent_parent_thread_id = 3;
optional int32 sub_agent_depth = 4;
optional string sub_agent_other = 5;
optional string sub_agent_path = 6;
optional string sub_agent_nickname = 7;
optional string sub_agent_role = 8;
}
enum SessionSourceKind {
SESSION_SOURCE_KIND_UNKNOWN = 0;
SESSION_SOURCE_KIND_CLI = 1;
SESSION_SOURCE_KIND_VSCODE = 2;
SESSION_SOURCE_KIND_EXEC = 3;
SESSION_SOURCE_KIND_APP_SERVER = 4;
SESSION_SOURCE_KIND_CUSTOM = 5;
SESSION_SOURCE_KIND_SUB_AGENT_REVIEW = 6;
SESSION_SOURCE_KIND_SUB_AGENT_COMPACT = 7;
SESSION_SOURCE_KIND_SUB_AGENT_THREAD_SPAWN = 8;
SESSION_SOURCE_KIND_SUB_AGENT_MEMORY_CONSOLIDATION = 9;
SESSION_SOURCE_KIND_SUB_AGENT_OTHER = 10;
}
message GitInfo {
optional string sha = 1;
optional string branch = 2;
optional string origin_url = 3;
}

View File

@@ -0,0 +1,460 @@
// This file is @generated by prost-build.
#![allow(clippy::trivially_copy_pass_by_ref)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ListThreadsRequest {
#[prost(uint32, tag = "1")]
pub page_size: u32,
#[prost(string, optional, tag = "2")]
pub cursor: ::core::option::Option<::prost::alloc::string::String>,
#[prost(enumeration = "ThreadSortKey", tag = "3")]
pub sort_key: i32,
#[prost(message, repeated, tag = "4")]
pub allowed_sources: ::prost::alloc::vec::Vec<SessionSource>,
#[prost(message, optional, tag = "5")]
pub model_provider_filter: ::core::option::Option<ModelProviderFilter>,
#[prost(bool, tag = "6")]
pub archived: bool,
#[prost(string, optional, tag = "7")]
pub search_term: ::core::option::Option<::prost::alloc::string::String>,
}
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct ModelProviderFilter {
#[prost(string, repeated, tag = "1")]
pub values: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ListThreadsResponse {
#[prost(message, repeated, tag = "1")]
pub threads: ::prost::alloc::vec::Vec<StoredThread>,
#[prost(string, optional, tag = "2")]
pub next_cursor: ::core::option::Option<::prost::alloc::string::String>,
}
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct StoredThread {
/// Mirrors Rust's StoredThread. Domain types that are not protobuf-native,
/// such as ThreadId, DateTime<Utc>, and PathBuf, are represented as their
/// stable scalar forms on the wire.
#[prost(string, tag = "1")]
pub thread_id: ::prost::alloc::string::String,
#[prost(string, optional, tag = "2")]
pub forked_from_id: ::core::option::Option<::prost::alloc::string::String>,
#[prost(string, tag = "3")]
pub preview: ::prost::alloc::string::String,
#[prost(string, optional, tag = "4")]
pub name: ::core::option::Option<::prost::alloc::string::String>,
#[prost(string, tag = "5")]
pub model_provider: ::prost::alloc::string::String,
#[prost(string, optional, tag = "6")]
pub model: ::core::option::Option<::prost::alloc::string::String>,
#[prost(int64, tag = "7")]
pub created_at: i64,
#[prost(int64, tag = "8")]
pub updated_at: i64,
#[prost(int64, optional, tag = "9")]
pub archived_at: ::core::option::Option<i64>,
#[prost(string, tag = "10")]
pub cwd: ::prost::alloc::string::String,
#[prost(string, tag = "11")]
pub cli_version: ::prost::alloc::string::String,
#[prost(message, optional, tag = "12")]
pub source: ::core::option::Option<SessionSource>,
#[prost(message, optional, tag = "13")]
pub git_info: ::core::option::Option<GitInfo>,
#[prost(string, optional, tag = "14")]
pub agent_nickname: ::core::option::Option<::prost::alloc::string::String>,
#[prost(string, optional, tag = "15")]
pub agent_role: ::core::option::Option<::prost::alloc::string::String>,
#[prost(string, optional, tag = "16")]
pub agent_path: ::core::option::Option<::prost::alloc::string::String>,
#[prost(string, optional, tag = "17")]
pub reasoning_effort: ::core::option::Option<::prost::alloc::string::String>,
#[prost(string, optional, tag = "18")]
pub first_user_message: ::core::option::Option<::prost::alloc::string::String>,
}
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct SessionSource {
#[prost(enumeration = "SessionSourceKind", tag = "1")]
pub kind: i32,
#[prost(string, optional, tag = "2")]
pub custom: ::core::option::Option<::prost::alloc::string::String>,
#[prost(string, optional, tag = "3")]
pub sub_agent_parent_thread_id: ::core::option::Option<::prost::alloc::string::String>,
#[prost(int32, optional, tag = "4")]
pub sub_agent_depth: ::core::option::Option<i32>,
#[prost(string, optional, tag = "5")]
pub sub_agent_other: ::core::option::Option<::prost::alloc::string::String>,
#[prost(string, optional, tag = "6")]
pub sub_agent_path: ::core::option::Option<::prost::alloc::string::String>,
#[prost(string, optional, tag = "7")]
pub sub_agent_nickname: ::core::option::Option<::prost::alloc::string::String>,
#[prost(string, optional, tag = "8")]
pub sub_agent_role: ::core::option::Option<::prost::alloc::string::String>,
}
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct GitInfo {
#[prost(string, optional, tag = "1")]
pub sha: ::core::option::Option<::prost::alloc::string::String>,
#[prost(string, optional, tag = "2")]
pub branch: ::core::option::Option<::prost::alloc::string::String>,
#[prost(string, optional, tag = "3")]
pub origin_url: ::core::option::Option<::prost::alloc::string::String>,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum ThreadSortKey {
CreatedAt = 0,
UpdatedAt = 1,
}
impl ThreadSortKey {
/// String value of the enum field names used in the ProtoBuf definition.
///
/// The values are not transformed in any way and thus are considered stable
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
Self::CreatedAt => "THREAD_SORT_KEY_CREATED_AT",
Self::UpdatedAt => "THREAD_SORT_KEY_UPDATED_AT",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"THREAD_SORT_KEY_CREATED_AT" => Some(Self::CreatedAt),
"THREAD_SORT_KEY_UPDATED_AT" => Some(Self::UpdatedAt),
_ => None,
}
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum SessionSourceKind {
Unknown = 0,
Cli = 1,
Vscode = 2,
Exec = 3,
AppServer = 4,
Custom = 5,
SubAgentReview = 6,
SubAgentCompact = 7,
SubAgentThreadSpawn = 8,
SubAgentMemoryConsolidation = 9,
SubAgentOther = 10,
}
impl SessionSourceKind {
/// String value of the enum field names used in the ProtoBuf definition.
///
/// The values are not transformed in any way and thus are considered stable
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
Self::Unknown => "SESSION_SOURCE_KIND_UNKNOWN",
Self::Cli => "SESSION_SOURCE_KIND_CLI",
Self::Vscode => "SESSION_SOURCE_KIND_VSCODE",
Self::Exec => "SESSION_SOURCE_KIND_EXEC",
Self::AppServer => "SESSION_SOURCE_KIND_APP_SERVER",
Self::Custom => "SESSION_SOURCE_KIND_CUSTOM",
Self::SubAgentReview => "SESSION_SOURCE_KIND_SUB_AGENT_REVIEW",
Self::SubAgentCompact => "SESSION_SOURCE_KIND_SUB_AGENT_COMPACT",
Self::SubAgentThreadSpawn => "SESSION_SOURCE_KIND_SUB_AGENT_THREAD_SPAWN",
Self::SubAgentMemoryConsolidation => {
"SESSION_SOURCE_KIND_SUB_AGENT_MEMORY_CONSOLIDATION"
}
Self::SubAgentOther => "SESSION_SOURCE_KIND_SUB_AGENT_OTHER",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"SESSION_SOURCE_KIND_UNKNOWN" => Some(Self::Unknown),
"SESSION_SOURCE_KIND_CLI" => Some(Self::Cli),
"SESSION_SOURCE_KIND_VSCODE" => Some(Self::Vscode),
"SESSION_SOURCE_KIND_EXEC" => Some(Self::Exec),
"SESSION_SOURCE_KIND_APP_SERVER" => Some(Self::AppServer),
"SESSION_SOURCE_KIND_CUSTOM" => Some(Self::Custom),
"SESSION_SOURCE_KIND_SUB_AGENT_REVIEW" => Some(Self::SubAgentReview),
"SESSION_SOURCE_KIND_SUB_AGENT_COMPACT" => Some(Self::SubAgentCompact),
"SESSION_SOURCE_KIND_SUB_AGENT_THREAD_SPAWN" => Some(Self::SubAgentThreadSpawn),
"SESSION_SOURCE_KIND_SUB_AGENT_MEMORY_CONSOLIDATION" => {
Some(Self::SubAgentMemoryConsolidation)
}
"SESSION_SOURCE_KIND_SUB_AGENT_OTHER" => Some(Self::SubAgentOther),
_ => None,
}
}
}
/// Generated client implementations.
pub mod thread_store_client {
#![allow(
unused_variables,
dead_code,
missing_docs,
clippy::wildcard_imports,
clippy::let_unit_value
)]
use tonic::codegen::http::Uri;
use tonic::codegen::*;
#[derive(Debug, Clone)]
pub struct ThreadStoreClient<T> {
inner: tonic::client::Grpc<T>,
}
impl ThreadStoreClient<tonic::transport::Channel> {
/// Attempt to create a new client by connecting to a given endpoint.
pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
where
D: TryInto<tonic::transport::Endpoint>,
D::Error: Into<StdError>,
{
let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
Ok(Self::new(conn))
}
}
impl<T> ThreadStoreClient<T>
where
T: tonic::client::GrpcService<tonic::body::Body>,
T::Error: Into<StdError>,
T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
<T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
{
pub fn new(inner: T) -> Self {
let inner = tonic::client::Grpc::new(inner);
Self { inner }
}
pub fn with_origin(inner: T, origin: Uri) -> Self {
let inner = tonic::client::Grpc::with_origin(inner, origin);
Self { inner }
}
pub fn with_interceptor<F>(
inner: T,
interceptor: F,
) -> ThreadStoreClient<InterceptedService<T, F>>
where
F: tonic::service::Interceptor,
T::ResponseBody: Default,
T: tonic::codegen::Service<
http::Request<tonic::body::Body>,
Response = http::Response<
<T as tonic::client::GrpcService<tonic::body::Body>>::ResponseBody,
>,
>,
<T as tonic::codegen::Service<http::Request<tonic::body::Body>>>::Error:
Into<StdError> + std::marker::Send + std::marker::Sync,
{
ThreadStoreClient::new(InterceptedService::new(inner, interceptor))
}
/// Compress requests with the given encoding.
///
/// This requires the server to support it otherwise it might respond with an
/// error.
#[must_use]
pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
self.inner = self.inner.send_compressed(encoding);
self
}
/// Enable decompressing responses.
#[must_use]
pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
self.inner = self.inner.accept_compressed(encoding);
self
}
/// Limits the maximum size of a decoded message.
///
/// Default: `4MB`
#[must_use]
pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
self.inner = self.inner.max_decoding_message_size(limit);
self
}
/// Limits the maximum size of an encoded message.
///
/// Default: `usize::MAX`
#[must_use]
pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
self.inner = self.inner.max_encoding_message_size(limit);
self
}
pub async fn list_threads(
&mut self,
request: impl tonic::IntoRequest<super::ListThreadsRequest>,
) -> std::result::Result<tonic::Response<super::ListThreadsResponse>, tonic::Status>
{
self.inner.ready().await.map_err(|e| {
tonic::Status::unknown(format!("Service was not ready: {}", e.into()))
})?;
let codec = tonic_prost::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/codex.thread_store.v1.ThreadStore/ListThreads",
);
let mut req = request.into_request();
req.extensions_mut().insert(GrpcMethod::new(
"codex.thread_store.v1.ThreadStore",
"ListThreads",
));
self.inner.unary(req, path, codec).await
}
}
}
/// Generated server implementations.
pub mod thread_store_server {
#![allow(
unused_variables,
dead_code,
missing_docs,
clippy::wildcard_imports,
clippy::let_unit_value
)]
use tonic::codegen::*;
/// Generated trait containing gRPC methods that should be implemented for use with ThreadStoreServer.
#[async_trait]
pub trait ThreadStore: std::marker::Send + std::marker::Sync + 'static {
async fn list_threads(
&self,
request: tonic::Request<super::ListThreadsRequest>,
) -> std::result::Result<tonic::Response<super::ListThreadsResponse>, tonic::Status>;
}
#[derive(Debug)]
pub struct ThreadStoreServer<T> {
inner: Arc<T>,
accept_compression_encodings: EnabledCompressionEncodings,
send_compression_encodings: EnabledCompressionEncodings,
max_decoding_message_size: Option<usize>,
max_encoding_message_size: Option<usize>,
}
impl<T> ThreadStoreServer<T> {
pub fn new(inner: T) -> Self {
Self::from_arc(Arc::new(inner))
}
pub fn from_arc(inner: Arc<T>) -> Self {
Self {
inner,
accept_compression_encodings: Default::default(),
send_compression_encodings: Default::default(),
max_decoding_message_size: None,
max_encoding_message_size: None,
}
}
pub fn with_interceptor<F>(inner: T, interceptor: F) -> InterceptedService<Self, F>
where
F: tonic::service::Interceptor,
{
InterceptedService::new(Self::new(inner), interceptor)
}
/// Enable decompressing requests with the given encoding.
#[must_use]
pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
self.accept_compression_encodings.enable(encoding);
self
}
/// Compress responses with the given encoding, if the client supports it.
#[must_use]
pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
self.send_compression_encodings.enable(encoding);
self
}
/// Limits the maximum size of a decoded message.
///
/// Default: `4MB`
#[must_use]
pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
self.max_decoding_message_size = Some(limit);
self
}
/// Limits the maximum size of an encoded message.
///
/// Default: `usize::MAX`
#[must_use]
pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
self.max_encoding_message_size = Some(limit);
self
}
}
impl<T, B> tonic::codegen::Service<http::Request<B>> for ThreadStoreServer<T>
where
T: ThreadStore,
B: Body + std::marker::Send + 'static,
B::Error: Into<StdError> + std::marker::Send + 'static,
{
type Response = http::Response<tonic::body::Body>;
type Error = std::convert::Infallible;
type Future = BoxFuture<Self::Response, Self::Error>;
fn poll_ready(
&mut self,
_cx: &mut Context<'_>,
) -> Poll<std::result::Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: http::Request<B>) -> Self::Future {
match req.uri().path() {
"/codex.thread_store.v1.ThreadStore/ListThreads" => {
#[allow(non_camel_case_types)]
struct ListThreadsSvc<T: ThreadStore>(pub Arc<T>);
impl<T: ThreadStore> tonic::server::UnaryService<super::ListThreadsRequest> for ListThreadsSvc<T> {
type Response = super::ListThreadsResponse;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
fn call(
&mut self,
request: tonic::Request<super::ListThreadsRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as ThreadStore>::list_threads(&inner, request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let method = ListThreadsSvc(inner);
let codec = tonic_prost::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
_ => Box::pin(async move {
let mut response = http::Response::new(tonic::body::Body::default());
let headers = response.headers_mut();
headers.insert(
tonic::Status::GRPC_STATUS,
(tonic::Code::Unimplemented as i32).into(),
);
headers.insert(
http::header::CONTENT_TYPE,
tonic::metadata::GRPC_CONTENT_TYPE,
);
Ok(response)
}),
}
}
}
impl<T> Clone for ThreadStoreServer<T> {
fn clone(&self) -> Self {
let inner = self.inner.clone();
Self {
inner,
accept_compression_encodings: self.accept_compression_encodings,
send_compression_encodings: self.send_compression_encodings,
max_decoding_message_size: self.max_decoding_message_size,
max_encoding_message_size: self.max_encoding_message_size,
}
}
}
/// Generated gRPC service name
pub const SERVICE_NAME: &str = "codex.thread_store.v1.ThreadStore";
impl<T> tonic::server::NamedService for ThreadStoreServer<T> {
const NAME: &'static str = SERVICE_NAME;
}
}