Compare commits

...

2 Commits

Author SHA1 Message Date
bryanashley
4943f09dd2 Merge main into core API opaque identity
Resolve PR conflicts after main moved forward, preserving the current optional state DB thread-manager shape while keeping opaque identity forwarding on remote core contracts.

Co-authored-by: Codex <noreply@openai.com>
2026-05-07 15:32:04 +00:00
bryanashley
5d40ee8cab Thread opaque identity through core API
Add an opaque identity type at the protocol layer and pass it from core API options into remote thread config and thread store clients. App-server runtime options now reuse the same core API option path when constructing core stores.

Co-authored-by: Codex <noreply@openai.com>
2026-05-06 15:16:55 +00:00
14 changed files with 404 additions and 24 deletions

View File

@@ -433,6 +433,7 @@ async fn start_uninitialized(args: InProcessStartArgs) -> IoResult<InProcessClie
rpc_transport: AppServerRpcTransport::InProcess,
remote_control_handle: None,
plugin_startup_tasks: crate::PluginStartupTasks::Start,
core_api_options: codex_core::CoreApiOptions::default(),
}));
let mut thread_created_rx = processor.thread_created_receiver();
let session = Arc::new(ConnectionSessionState::new(ConnectionOrigin::InProcess));

View File

@@ -4,10 +4,10 @@ use codex_arg0::Arg0DispatchPaths;
use codex_config::ConfigLayerStackOrdering;
use codex_config::LoaderOverrides;
use codex_config::NoopThreadConfigLoader;
use codex_config::RemoteThreadConfigLoader;
use codex_config::ThreadConfigLoader;
use codex_core::CoreApiOptions;
use codex_core::config::Config;
use codex_core::resolve_installation_id;
use codex_core::thread_config_loader_from_config_with_options;
use codex_exec_server::EnvironmentManagerArgs;
use codex_features::Feature;
use codex_login::AuthManager;
@@ -115,13 +115,6 @@ enum LogFormat {
type StderrLogLayer = Box<dyn Layer<Registry> + Send + Sync + 'static>;
fn configured_thread_config_loader(config: &Config) -> Arc<dyn ThreadConfigLoader> {
match config.experimental_thread_config_endpoint.as_deref() {
Some(endpoint) => Arc::new(RemoteThreadConfigLoader::new(endpoint)),
None => Arc::new(NoopThreadConfigLoader),
}
}
/// Control-plane messages from the processor/transport side to the outbound router task.
///
/// `run_main_with_transport` now uses two loops/tasks:
@@ -373,15 +366,17 @@ pub enum PluginStartupTasks {
Skip,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AppServerRuntimeOptions {
pub plugin_startup_tasks: PluginStartupTasks,
pub core_api_options: CoreApiOptions,
}
impl Default for AppServerRuntimeOptions {
fn default() -> Self {
Self {
plugin_startup_tasks: PluginStartupTasks::Start,
core_api_options: CoreApiOptions::default(),
}
}
}
@@ -456,7 +451,10 @@ pub async fn run_main_with_transport_options(
.await
{
Ok(config) => {
let discovered_thread_config_loader = configured_thread_config_loader(&config);
let discovered_thread_config_loader = thread_config_loader_from_config_with_options(
&config,
&runtime_options.core_api_options,
);
config_manager
.replace_thread_config_loader(Arc::clone(&discovered_thread_config_loader));
let auth_manager =
@@ -780,6 +778,7 @@ pub async fn run_main_with_transport_options(
rpc_transport: analytics_rpc_transport(&transport),
remote_control_handle: Some(remote_control_handle.clone()),
plugin_startup_tasks: runtime_options.plugin_startup_tasks,
core_api_options: runtime_options.core_api_options.clone(),
}));
let mut thread_created_rx = processor.thread_created_receiver();
let mut running_turn_count_rx = processor.subscribe_running_assistant_turn_count();

View File

@@ -60,9 +60,10 @@ use codex_app_server_protocol::ServerRequestPayload;
use codex_app_server_protocol::experimental_required_message;
use codex_arg0::Arg0DispatchPaths;
use codex_chatgpt::workspace_settings;
use codex_core::CoreApiOptions;
use codex_core::ThreadManager;
use codex_core::config::Config;
use codex_core::thread_store_from_config;
use codex_core::thread_store_from_config_with_options;
use codex_exec_server::EnvironmentManager;
use codex_feedback::CodexFeedback;
use codex_login::AuthManager;
@@ -262,6 +263,7 @@ pub(crate) struct MessageProcessorArgs {
pub(crate) rpc_transport: AppServerRpcTransport,
pub(crate) remote_control_handle: Option<RemoteControlHandle>,
pub(crate) plugin_startup_tasks: crate::PluginStartupTasks,
pub(crate) core_api_options: CoreApiOptions,
}
impl MessageProcessor {
@@ -285,6 +287,7 @@ impl MessageProcessor {
rpc_transport,
remote_control_handle,
plugin_startup_tasks,
core_api_options,
} = args;
auth_manager.set_external_auth(Arc::new(ExternalAuthRefreshBridge {
outgoing: outgoing.clone(),
@@ -292,7 +295,11 @@ impl MessageProcessor {
// The thread store is intentionally process-scoped. Config reloads can
// affect per-thread behavior, but they must not move newly started,
// resumed, or forked threads to a different persistence backend/root.
let thread_store = thread_store_from_config(config.as_ref(), state_db.clone());
let thread_store = thread_store_from_config_with_options(
config.as_ref(),
state_db.clone(),
&core_api_options,
);
let thread_manager = Arc::new(ThreadManager::new(
config.as_ref(),
auth_manager.clone(),

View File

@@ -298,6 +298,7 @@ async fn build_test_processor(
rpc_transport: AppServerRpcTransport::Stdio,
remote_control_handle: None,
plugin_startup_tasks: crate::PluginStartupTasks::Start,
core_api_options: codex_core::CoreApiOptions::default(),
}));
(processor, outgoing_rx)
}

View File

@@ -31,6 +31,8 @@ pub use cloud_requirements::CloudRequirementsLoadError;
pub use cloud_requirements::CloudRequirementsLoadErrorCode;
pub use cloud_requirements::CloudRequirementsLoader;
pub use codex_app_server_protocol::ConfigLayerSource;
pub use codex_protocol::CODEX_CORE_IDENTITY_HEADER;
pub use codex_protocol::OpaqueIdentity;
pub use codex_utils_absolute_path::AbsolutePathBuf;
pub use config_requirements::AppRequirementToml;
pub use config_requirements::AppsRequirementsToml;

View File

@@ -6,6 +6,8 @@ use std::time::Duration;
use async_trait::async_trait;
use codex_model_provider_info::ModelProviderInfo;
use codex_model_provider_info::WireApi;
use codex_protocol::CODEX_CORE_IDENTITY_HEADER;
use codex_protocol::OpaqueIdentity;
use codex_protocol::config_types::ModelProviderAuthInfo;
use codex_utils_absolute_path::AbsolutePathBuf;
@@ -17,29 +19,75 @@ use super::ThreadConfigLoader;
use super::ThreadConfigSource;
use super::UserThreadConfig;
use proto::thread_config_loader_client::ThreadConfigLoaderClient;
use tonic::codegen::InterceptedService;
use tonic::metadata::BinaryMetadataValue;
use tonic::service::Interceptor;
use tonic::transport::Channel;
use tonic::transport::Endpoint;
#[path = "proto/codex.thread_config.v1.rs"]
mod proto;
const REMOTE_THREAD_CONFIG_LOAD_TIMEOUT: Duration = Duration::from_secs(5);
#[derive(Clone, Debug)]
struct IdentityInterceptor {
identity: Option<OpaqueIdentity>,
}
impl Interceptor for IdentityInterceptor {
fn call(
&mut self,
mut request: tonic::Request<()>,
) -> Result<tonic::Request<()>, tonic::Status> {
if let Some(identity) = &self.identity {
request.metadata_mut().insert_bin(
CODEX_CORE_IDENTITY_HEADER,
BinaryMetadataValue::from_bytes(identity.as_bytes()),
);
}
Ok(request)
}
}
type RemoteThreadConfigLoaderClient =
ThreadConfigLoaderClient<InterceptedService<Channel, IdentityInterceptor>>;
/// gRPC-backed [`ThreadConfigLoader`] implementation.
#[derive(Clone, Debug)]
pub struct RemoteThreadConfigLoader {
endpoint: String,
identity: Option<OpaqueIdentity>,
}
impl RemoteThreadConfigLoader {
pub fn new(endpoint: impl Into<String>) -> Self {
Self {
endpoint: endpoint.into(),
identity: None,
}
}
async fn client(
&self,
) -> Result<ThreadConfigLoaderClient<tonic::transport::Channel>, ThreadConfigLoadError> {
ThreadConfigLoaderClient::connect(self.endpoint.clone())
pub fn new_with_identity(
endpoint: impl Into<String>,
identity: Option<OpaqueIdentity>,
) -> Self {
Self {
endpoint: endpoint.into(),
identity,
}
}
async fn client(&self) -> Result<RemoteThreadConfigLoaderClient, ThreadConfigLoadError> {
let channel = Endpoint::new(self.endpoint.clone())
.map_err(|err| {
ThreadConfigLoadError::new(
ThreadConfigLoadErrorCode::RequestFailed,
/*status_code*/ None,
format!("invalid remote thread config loader endpoint: {err}"),
)
})?
.connect()
.await
.map_err(|err| {
ThreadConfigLoadError::new(
@@ -47,7 +95,13 @@ impl RemoteThreadConfigLoader {
/*status_code*/ None,
format!("failed to connect to remote thread config loader: {err}"),
)
})
})?;
Ok(ThreadConfigLoaderClient::with_interceptor(
channel,
IdentityInterceptor {
identity: self.identity.clone(),
},
))
}
}
@@ -299,9 +353,13 @@ mod tests {
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::num::NonZeroU64;
use std::sync::Arc;
use std::sync::Mutex;
use codex_model_provider_info::ModelProviderInfo;
use codex_model_provider_info::WireApi;
use codex_protocol::CODEX_CORE_IDENTITY_HEADER;
use codex_protocol::OpaqueIdentity;
use codex_protocol::config_types::ModelProviderAuthInfo;
use codex_utils_absolute_path::AbsolutePathBuf;
use pretty_assertions::assert_eq;
@@ -319,6 +377,7 @@ mod tests {
struct TestServer {
sources: Vec<proto::ThreadConfigSource>,
expected_cwd: String,
captured_identity: Option<Arc<Mutex<Option<Vec<u8>>>>>,
}
#[tonic::async_trait]
@@ -327,6 +386,16 @@ mod tests {
&self,
request: Request<proto::LoadThreadConfigRequest>,
) -> Result<Response<proto::LoadThreadConfigResponse>, Status> {
if let Some(captured_identity) = &self.captured_identity {
let identity = request
.metadata()
.get_bin(CODEX_CORE_IDENTITY_HEADER)
.and_then(|value| value.to_bytes().ok())
.map(|value| value.to_vec());
*captured_identity
.lock()
.expect("captured identity mutex poisoned") = identity;
}
assert_eq!(
request.into_inner(),
proto::LoadThreadConfigRequest {
@@ -355,6 +424,7 @@ mod tests {
.add_service(ThreadConfigLoaderServer::new(TestServer {
sources: proto_sources(),
expected_cwd,
captured_identity: None,
}))
.serve_with_incoming_shutdown(
tokio_stream::wrappers::TcpListenerStream::new(listener),
@@ -379,6 +449,57 @@ mod tests {
assert_eq!(loaded.expect("load thread config"), expected_sources());
}
#[tokio::test]
async fn load_thread_config_forwards_identity_as_metadata() {
let cwd = workspace_dir().join("project");
let expected_cwd = cwd.to_string_lossy().into_owned();
let captured_identity = Arc::new(Mutex::new(None));
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
.await
.expect("bind test server");
let addr = listener.local_addr().expect("test server addr");
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
let server_identity = captured_identity.clone();
let server = tokio::spawn(async move {
Server::builder()
.add_service(ThreadConfigLoaderServer::new(TestServer {
sources: proto_sources(),
expected_cwd,
captured_identity: Some(server_identity),
}))
.serve_with_incoming_shutdown(
tokio_stream::wrappers::TcpListenerStream::new(listener),
async {
let _ = shutdown_rx.await;
},
)
.await
});
let loader = RemoteThreadConfigLoader::new_with_identity(
format!("http://{addr}"),
Some(OpaqueIdentity::from_bytes(b"tenant-key-\x00\xff".to_vec())),
);
let loaded = loader
.load(ThreadConfigContext {
thread_id: Some("thread-1".to_string()),
cwd: Some(cwd),
})
.await;
let _ = shutdown_tx.send(());
server.await.expect("join server").expect("server");
assert_eq!(loaded.expect("load thread config"), expected_sources());
assert_eq!(
captured_identity
.lock()
.expect("captured identity mutex poisoned")
.as_deref(),
Some(&b"tenant-key-\x00\xff"[..])
);
}
#[test]
fn load_thread_config_request_sets_timeout() {
let request = load_thread_config_request(ThreadConfigContext::default());

View File

@@ -8,6 +8,9 @@ pub use codex_app_server_protocol::item_event_to_server_notification;
pub use codex_arg0::Arg0DispatchPaths;
pub use codex_arg0::arg0_dispatch_or_else;
pub use codex_config::ConfigLayerStack;
pub use codex_config::NoopThreadConfigLoader;
pub use codex_config::RemoteThreadConfigLoader;
pub use codex_config::ThreadConfigLoader;
pub use codex_config::config_toml::ProjectConfig;
pub use codex_config::config_toml::RealtimeAudioConfig;
pub use codex_config::config_toml::RealtimeConfig;
@@ -24,6 +27,7 @@ pub use codex_config::types::TuiKeymap;
pub use codex_config::types::TuiNotificationSettings;
pub use codex_config::types::UriBasedFileOpener;
pub use codex_core::CodexThread;
pub use codex_core::CoreApiOptions;
pub use codex_core::ForkSnapshot;
pub use codex_core::McpManager;
pub use codex_core::NewThread;
@@ -42,7 +46,10 @@ pub use codex_core::config::find_codex_home;
pub use codex_core::init_state_db;
pub use codex_core::resolve_installation_id;
pub use codex_core::skills::SkillsManager;
pub use codex_core::thread_config_loader_from_config;
pub use codex_core::thread_config_loader_from_config_with_options;
pub use codex_core::thread_store_from_config;
pub use codex_core::thread_store_from_config_with_options;
pub use codex_exec_server::EnvironmentManager;
pub use codex_exec_server::EnvironmentManagerArgs;
pub use codex_exec_server::ExecServerRuntimePaths;
@@ -54,6 +61,8 @@ pub use codex_model_provider_info::OPENAI_PROVIDER_ID;
pub use codex_model_provider_info::built_in_model_providers;
pub use codex_models_manager::manager::RefreshStrategy;
pub use codex_models_manager::manager::SharedModelsManager;
pub use codex_protocol::CODEX_CORE_IDENTITY_HEADER;
pub use codex_protocol::OpaqueIdentity;
pub use codex_protocol::ThreadId;
pub use codex_protocol::config_types::AltScreenMode;
pub use codex_protocol::config_types::ApprovalsReviewer;

View File

@@ -112,13 +112,17 @@ pub mod review_prompts;
mod thread_manager;
pub(crate) mod web_search;
pub(crate) mod windows_sandbox_read_grants;
pub use thread_manager::CoreApiOptions;
pub use thread_manager::ForkSnapshot;
pub use thread_manager::NewThread;
pub use thread_manager::StartThreadOptions;
pub use thread_manager::ThreadManager;
pub use thread_manager::ThreadShutdownReport;
pub use thread_manager::build_models_manager;
pub use thread_manager::thread_config_loader_from_config;
pub use thread_manager::thread_config_loader_from_config_with_options;
pub use thread_manager::thread_store_from_config;
pub use thread_manager::thread_store_from_config_with_options;
pub use web_search::web_search_action_detail;
pub use web_search::web_search_detail;
pub use windows_sandbox_read_grants::grant_read_root_non_elevated;

View File

@@ -21,6 +21,9 @@ use crate::tasks::interrupted_turn_history_marker;
use codex_analytics::AnalyticsEventsClient;
use codex_app_server_protocol::ThreadHistoryBuilder;
use codex_app_server_protocol::TurnStatus;
use codex_config::NoopThreadConfigLoader;
use codex_config::RemoteThreadConfigLoader;
use codex_config::ThreadConfigLoader;
use codex_core_plugins::PluginsManager;
use codex_exec_server::EnvironmentManager;
use codex_login::AuthManager;
@@ -30,6 +33,7 @@ use codex_model_provider_info::ModelProviderInfo;
use codex_model_provider_info::OPENAI_PROVIDER_ID;
use codex_models_manager::manager::RefreshStrategy;
use codex_models_manager::manager::SharedModelsManager;
use codex_protocol::OpaqueIdentity;
use codex_protocol::ThreadId;
use codex_protocol::config_types::CollaborationModeMask;
use codex_protocol::error::CodexErr;
@@ -225,6 +229,12 @@ pub struct StartThreadOptions {
pub environments: Vec<TurnEnvironmentSelection>,
}
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct CoreApiOptions {
/// Opaque caller identity forwarded to remote core contract implementations.
pub opaque_identity: Option<OpaqueIdentity>,
}
pub(crate) struct ResumeThreadWithHistoryOptions {
pub(crate) config: Config,
pub(crate) initial_history: InitialHistory,
@@ -270,17 +280,45 @@ pub fn build_models_manager(
pub fn thread_store_from_config(
config: &Config,
state_db: Option<StateDbHandle>,
) -> Arc<dyn ThreadStore> {
thread_store_from_config_with_options(config, state_db, &CoreApiOptions::default())
}
pub fn thread_store_from_config_with_options(
config: &Config,
state_db: Option<StateDbHandle>,
options: &CoreApiOptions,
) -> Arc<dyn ThreadStore> {
match &config.experimental_thread_store {
ThreadStoreConfig::Local => Arc::new(LocalThreadStore::new(
LocalThreadStoreConfig::from_config(config),
state_db,
)),
ThreadStoreConfig::Remote { endpoint } => Arc::new(RemoteThreadStore::new(endpoint)),
ThreadStoreConfig::Remote { endpoint } => Arc::new(RemoteThreadStore::new_with_identity(
endpoint,
options.opaque_identity.clone(),
)),
ThreadStoreConfig::InMemory { id } => InMemoryThreadStore::for_id(id),
}
}
pub fn thread_config_loader_from_config(config: &Config) -> Arc<dyn ThreadConfigLoader> {
thread_config_loader_from_config_with_options(config, &CoreApiOptions::default())
}
pub fn thread_config_loader_from_config_with_options(
config: &Config,
options: &CoreApiOptions,
) -> Arc<dyn ThreadConfigLoader> {
match config.experimental_thread_config_endpoint.as_deref() {
Some(endpoint) => Arc::new(RemoteThreadConfigLoader::new_with_identity(
endpoint,
options.opaque_identity.clone(),
)),
None => Arc::new(NoopThreadConfigLoader),
}
}
impl ThreadManager {
#[allow(clippy::too_many_arguments)]
pub fn new(

View File

@@ -1,10 +1,13 @@
pub mod account;
mod agent_path;
pub mod auth;
mod opaque_identity;
mod session_id;
mod thread_id;
mod tool_name;
pub use agent_path::AgentPath;
pub use opaque_identity::CODEX_CORE_IDENTITY_HEADER;
pub use opaque_identity::OpaqueIdentity;
pub use session_id::SessionId;
pub use thread_id::ThreadId;
pub use tool_name::ToolName;

View File

@@ -0,0 +1,71 @@
use std::ffi::OsString;
/// Binary gRPC metadata key used to forward the caller's opaque identity to
/// remote core contract implementations.
pub const CODEX_CORE_IDENTITY_HEADER: &str = "x-codex-core-identity-bin";
/// Opaque identity supplied by a Codex core API caller.
///
/// Codex core treats this as uninterpreted bytes and only forwards it to remote
/// contract implementations that need to perform their own authorization.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct OpaqueIdentity {
bytes: Vec<u8>,
}
impl OpaqueIdentity {
pub fn from_bytes(bytes: impl Into<Vec<u8>>) -> Self {
Self {
bytes: bytes.into(),
}
}
pub fn from_os_string(value: OsString) -> Self {
Self::from_bytes(os_string_to_bytes(value))
}
pub fn as_bytes(&self) -> &[u8] {
&self.bytes
}
pub fn into_bytes(self) -> Vec<u8> {
self.bytes
}
}
#[cfg(unix)]
fn os_string_to_bytes(value: OsString) -> Vec<u8> {
use std::os::unix::ffi::OsStrExt;
value.as_os_str().as_bytes().to_vec()
}
#[cfg(not(unix))]
fn os_string_to_bytes(value: OsString) -> Vec<u8> {
value.to_string_lossy().into_owned().into_bytes()
}
#[cfg(test)]
mod tests {
use super::OpaqueIdentity;
use pretty_assertions::assert_eq;
#[test]
fn opaque_identity_preserves_bytes() {
let identity = OpaqueIdentity::from_bytes(b"tenant-key-\x00\xff".to_vec());
assert_eq!(identity.as_bytes(), &b"tenant-key-\x00\xff"[..]);
}
#[cfg(unix)]
#[test]
fn opaque_identity_preserves_unix_argv_bytes() {
use std::ffi::OsString;
use std::os::unix::ffi::OsStringExt;
let identity =
OpaqueIdentity::from_os_string(OsString::from_vec(b"tenant-key-\xff".to_vec()));
assert_eq!(identity.as_bytes(), &b"tenant-key-\xff"[..]);
}
}

View File

@@ -12,6 +12,8 @@ mod remote;
mod store;
mod types;
pub use codex_protocol::CODEX_CORE_IDENTITY_HEADER;
pub use codex_protocol::OpaqueIdentity;
pub use error::ThreadStoreError;
pub use error::ThreadStoreResult;
pub use in_memory::InMemoryThreadStore;

View File

@@ -66,7 +66,11 @@ pub(super) async fn list_threads(
#[cfg(test)]
mod tests {
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use codex_protocol::CODEX_CORE_IDENTITY_HEADER;
use codex_protocol::OpaqueIdentity;
use codex_protocol::openai_models::ReasoningEffort;
use codex_protocol::protocol::SessionSource;
use pretty_assertions::assert_eq;
@@ -83,7 +87,9 @@ mod tests {
use crate::ThreadStore;
#[derive(Default)]
struct TestServer;
struct TestServer {
captured_identity: Option<Arc<Mutex<Option<Vec<u8>>>>>,
}
#[tonic::async_trait]
impl thread_store_server::ThreadStore for TestServer {
@@ -91,6 +97,16 @@ mod tests {
&self,
request: Request<proto::ListThreadsRequest>,
) -> Result<Response<proto::ListThreadsResponse>, Status> {
if let Some(captured_identity) = &self.captured_identity {
let identity = request
.metadata()
.get_bin(CODEX_CORE_IDENTITY_HEADER)
.and_then(|value| value.to_bytes().ok())
.map(|value| value.to_vec());
*captured_identity
.lock()
.expect("captured identity mutex poisoned") = identity;
}
let request = request.into_inner();
assert_eq!(request.page_size, 2);
assert_eq!(request.cursor.as_deref(), Some("cursor-1"));
@@ -171,7 +187,7 @@ mod tests {
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
let server = tokio::spawn(async move {
Server::builder()
.add_service(ThreadStoreServer::new(TestServer))
.add_service(ThreadStoreServer::new(TestServer::default()))
.serve_with_incoming_shutdown(
tokio_stream::wrappers::TcpListenerStream::new(listener),
async {
@@ -226,6 +242,61 @@ mod tests {
server.await.expect("join server").expect("server");
}
#[tokio::test]
async fn list_threads_forwards_identity_as_metadata() {
let captured_identity = Arc::new(Mutex::new(None));
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
.await
.expect("bind test server");
let addr = listener.local_addr().expect("test server addr");
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
let server_identity = captured_identity.clone();
let server = tokio::spawn(async move {
Server::builder()
.add_service(ThreadStoreServer::new(TestServer {
captured_identity: Some(server_identity),
}))
.serve_with_incoming_shutdown(
tokio_stream::wrappers::TcpListenerStream::new(listener),
async {
let _ = shutdown_rx.await;
},
)
.await
});
let store = RemoteThreadStore::new_with_identity(
format!("http://{addr}"),
Some(OpaqueIdentity::from_bytes(b"tenant-key-\x00\xff".to_vec())),
);
store
.list_threads(ListThreadsParams {
page_size: 2,
cursor: Some("cursor-1".to_string()),
sort_key: ThreadSortKey::UpdatedAt,
sort_direction: crate::SortDirection::Desc,
allowed_sources: vec![SessionSource::Cli],
model_providers: Some(vec!["openai".to_string()]),
cwd_filters: Some(vec![PathBuf::from("/workspace")]),
archived: true,
search_term: Some("needle".to_string()),
use_state_db_only: true,
})
.await
.expect("list threads");
assert_eq!(
captured_identity
.lock()
.expect("captured identity mutex poisoned")
.as_deref(),
Some(&b"tenant-key-\x00\xff"[..])
);
let _ = shutdown_tx.send(());
server.await.expect("join server").expect("server");
}
#[test]
fn stored_thread_proto_roundtrips_through_domain_type() {
let thread = proto::StoredThread {

View File

@@ -2,6 +2,8 @@ mod helpers;
mod list_threads;
use async_trait::async_trait;
use codex_protocol::CODEX_CORE_IDENTITY_HEADER;
use codex_protocol::OpaqueIdentity;
use codex_protocol::ThreadId;
use crate::AppendThreadItemsParams;
@@ -20,10 +22,37 @@ use crate::ThreadStoreError;
use crate::ThreadStoreResult;
use crate::UpdateThreadMetadataParams;
use proto::thread_store_client::ThreadStoreClient;
use tonic::codegen::InterceptedService;
use tonic::metadata::BinaryMetadataValue;
use tonic::service::Interceptor;
use tonic::transport::Channel;
use tonic::transport::Endpoint;
#[path = "proto/codex.thread_store.v1.rs"]
mod proto;
#[derive(Clone, Debug)]
struct IdentityInterceptor {
identity: Option<OpaqueIdentity>,
}
impl Interceptor for IdentityInterceptor {
fn call(
&mut self,
mut request: tonic::Request<()>,
) -> Result<tonic::Request<()>, tonic::Status> {
if let Some(identity) = &self.identity {
request.metadata_mut().insert_bin(
CODEX_CORE_IDENTITY_HEADER,
BinaryMetadataValue::from_bytes(identity.as_bytes()),
);
}
Ok(request)
}
}
type RemoteThreadStoreClient = ThreadStoreClient<InterceptedService<Channel, IdentityInterceptor>>;
/// gRPC-backed [`ThreadStore`] implementation for deployments whose durable thread data lives
/// outside the app-server process.
///
@@ -33,21 +62,43 @@ mod proto;
#[derive(Clone, Debug)]
pub struct RemoteThreadStore {
endpoint: String,
identity: Option<OpaqueIdentity>,
}
impl RemoteThreadStore {
pub fn new(endpoint: impl Into<String>) -> Self {
Self {
endpoint: endpoint.into(),
identity: None,
}
}
async fn client(&self) -> ThreadStoreResult<ThreadStoreClient<tonic::transport::Channel>> {
ThreadStoreClient::connect(self.endpoint.clone())
pub fn new_with_identity(
endpoint: impl Into<String>,
identity: Option<OpaqueIdentity>,
) -> Self {
Self {
endpoint: endpoint.into(),
identity,
}
}
async fn client(&self) -> ThreadStoreResult<RemoteThreadStoreClient> {
let channel = Endpoint::new(self.endpoint.clone())
.map_err(|err| ThreadStoreError::InvalidRequest {
message: format!("invalid remote thread store endpoint: {err}"),
})?
.connect()
.await
.map_err(|err| ThreadStoreError::Internal {
message: format!("failed to connect to remote thread store: {err}"),
})
})?;
Ok(ThreadStoreClient::with_interceptor(
channel,
IdentityInterceptor {
identity: self.identity.clone(),
},
))
}
}