mirror of
https://github.com/openai/codex.git
synced 2026-05-21 19:45:26 +00:00
Compare commits
2 Commits
dev/winsto
...
bryanashle
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4943f09dd2 | ||
|
|
5d40ee8cab |
@@ -433,6 +433,7 @@ async fn start_uninitialized(args: InProcessStartArgs) -> IoResult<InProcessClie
|
||||
rpc_transport: AppServerRpcTransport::InProcess,
|
||||
remote_control_handle: None,
|
||||
plugin_startup_tasks: crate::PluginStartupTasks::Start,
|
||||
core_api_options: codex_core::CoreApiOptions::default(),
|
||||
}));
|
||||
let mut thread_created_rx = processor.thread_created_receiver();
|
||||
let session = Arc::new(ConnectionSessionState::new(ConnectionOrigin::InProcess));
|
||||
|
||||
@@ -4,10 +4,10 @@ use codex_arg0::Arg0DispatchPaths;
|
||||
use codex_config::ConfigLayerStackOrdering;
|
||||
use codex_config::LoaderOverrides;
|
||||
use codex_config::NoopThreadConfigLoader;
|
||||
use codex_config::RemoteThreadConfigLoader;
|
||||
use codex_config::ThreadConfigLoader;
|
||||
use codex_core::CoreApiOptions;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::resolve_installation_id;
|
||||
use codex_core::thread_config_loader_from_config_with_options;
|
||||
use codex_exec_server::EnvironmentManagerArgs;
|
||||
use codex_features::Feature;
|
||||
use codex_login::AuthManager;
|
||||
@@ -115,13 +115,6 @@ enum LogFormat {
|
||||
|
||||
type StderrLogLayer = Box<dyn Layer<Registry> + Send + Sync + 'static>;
|
||||
|
||||
fn configured_thread_config_loader(config: &Config) -> Arc<dyn ThreadConfigLoader> {
|
||||
match config.experimental_thread_config_endpoint.as_deref() {
|
||||
Some(endpoint) => Arc::new(RemoteThreadConfigLoader::new(endpoint)),
|
||||
None => Arc::new(NoopThreadConfigLoader),
|
||||
}
|
||||
}
|
||||
|
||||
/// Control-plane messages from the processor/transport side to the outbound router task.
|
||||
///
|
||||
/// `run_main_with_transport` now uses two loops/tasks:
|
||||
@@ -373,15 +366,17 @@ pub enum PluginStartupTasks {
|
||||
Skip,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct AppServerRuntimeOptions {
|
||||
pub plugin_startup_tasks: PluginStartupTasks,
|
||||
pub core_api_options: CoreApiOptions,
|
||||
}
|
||||
|
||||
impl Default for AppServerRuntimeOptions {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
plugin_startup_tasks: PluginStartupTasks::Start,
|
||||
core_api_options: CoreApiOptions::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -456,7 +451,10 @@ pub async fn run_main_with_transport_options(
|
||||
.await
|
||||
{
|
||||
Ok(config) => {
|
||||
let discovered_thread_config_loader = configured_thread_config_loader(&config);
|
||||
let discovered_thread_config_loader = thread_config_loader_from_config_with_options(
|
||||
&config,
|
||||
&runtime_options.core_api_options,
|
||||
);
|
||||
config_manager
|
||||
.replace_thread_config_loader(Arc::clone(&discovered_thread_config_loader));
|
||||
let auth_manager =
|
||||
@@ -780,6 +778,7 @@ pub async fn run_main_with_transport_options(
|
||||
rpc_transport: analytics_rpc_transport(&transport),
|
||||
remote_control_handle: Some(remote_control_handle.clone()),
|
||||
plugin_startup_tasks: runtime_options.plugin_startup_tasks,
|
||||
core_api_options: runtime_options.core_api_options.clone(),
|
||||
}));
|
||||
let mut thread_created_rx = processor.thread_created_receiver();
|
||||
let mut running_turn_count_rx = processor.subscribe_running_assistant_turn_count();
|
||||
|
||||
@@ -60,9 +60,10 @@ use codex_app_server_protocol::ServerRequestPayload;
|
||||
use codex_app_server_protocol::experimental_required_message;
|
||||
use codex_arg0::Arg0DispatchPaths;
|
||||
use codex_chatgpt::workspace_settings;
|
||||
use codex_core::CoreApiOptions;
|
||||
use codex_core::ThreadManager;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::thread_store_from_config;
|
||||
use codex_core::thread_store_from_config_with_options;
|
||||
use codex_exec_server::EnvironmentManager;
|
||||
use codex_feedback::CodexFeedback;
|
||||
use codex_login::AuthManager;
|
||||
@@ -262,6 +263,7 @@ pub(crate) struct MessageProcessorArgs {
|
||||
pub(crate) rpc_transport: AppServerRpcTransport,
|
||||
pub(crate) remote_control_handle: Option<RemoteControlHandle>,
|
||||
pub(crate) plugin_startup_tasks: crate::PluginStartupTasks,
|
||||
pub(crate) core_api_options: CoreApiOptions,
|
||||
}
|
||||
|
||||
impl MessageProcessor {
|
||||
@@ -285,6 +287,7 @@ impl MessageProcessor {
|
||||
rpc_transport,
|
||||
remote_control_handle,
|
||||
plugin_startup_tasks,
|
||||
core_api_options,
|
||||
} = args;
|
||||
auth_manager.set_external_auth(Arc::new(ExternalAuthRefreshBridge {
|
||||
outgoing: outgoing.clone(),
|
||||
@@ -292,7 +295,11 @@ impl MessageProcessor {
|
||||
// The thread store is intentionally process-scoped. Config reloads can
|
||||
// affect per-thread behavior, but they must not move newly started,
|
||||
// resumed, or forked threads to a different persistence backend/root.
|
||||
let thread_store = thread_store_from_config(config.as_ref(), state_db.clone());
|
||||
let thread_store = thread_store_from_config_with_options(
|
||||
config.as_ref(),
|
||||
state_db.clone(),
|
||||
&core_api_options,
|
||||
);
|
||||
let thread_manager = Arc::new(ThreadManager::new(
|
||||
config.as_ref(),
|
||||
auth_manager.clone(),
|
||||
|
||||
@@ -298,6 +298,7 @@ async fn build_test_processor(
|
||||
rpc_transport: AppServerRpcTransport::Stdio,
|
||||
remote_control_handle: None,
|
||||
plugin_startup_tasks: crate::PluginStartupTasks::Start,
|
||||
core_api_options: codex_core::CoreApiOptions::default(),
|
||||
}));
|
||||
(processor, outgoing_rx)
|
||||
}
|
||||
|
||||
@@ -31,6 +31,8 @@ pub use cloud_requirements::CloudRequirementsLoadError;
|
||||
pub use cloud_requirements::CloudRequirementsLoadErrorCode;
|
||||
pub use cloud_requirements::CloudRequirementsLoader;
|
||||
pub use codex_app_server_protocol::ConfigLayerSource;
|
||||
pub use codex_protocol::CODEX_CORE_IDENTITY_HEADER;
|
||||
pub use codex_protocol::OpaqueIdentity;
|
||||
pub use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
pub use config_requirements::AppRequirementToml;
|
||||
pub use config_requirements::AppsRequirementsToml;
|
||||
|
||||
@@ -6,6 +6,8 @@ use std::time::Duration;
|
||||
use async_trait::async_trait;
|
||||
use codex_model_provider_info::ModelProviderInfo;
|
||||
use codex_model_provider_info::WireApi;
|
||||
use codex_protocol::CODEX_CORE_IDENTITY_HEADER;
|
||||
use codex_protocol::OpaqueIdentity;
|
||||
use codex_protocol::config_types::ModelProviderAuthInfo;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
|
||||
@@ -17,29 +19,75 @@ use super::ThreadConfigLoader;
|
||||
use super::ThreadConfigSource;
|
||||
use super::UserThreadConfig;
|
||||
use proto::thread_config_loader_client::ThreadConfigLoaderClient;
|
||||
use tonic::codegen::InterceptedService;
|
||||
use tonic::metadata::BinaryMetadataValue;
|
||||
use tonic::service::Interceptor;
|
||||
use tonic::transport::Channel;
|
||||
use tonic::transport::Endpoint;
|
||||
|
||||
#[path = "proto/codex.thread_config.v1.rs"]
|
||||
mod proto;
|
||||
|
||||
const REMOTE_THREAD_CONFIG_LOAD_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct IdentityInterceptor {
|
||||
identity: Option<OpaqueIdentity>,
|
||||
}
|
||||
|
||||
impl Interceptor for IdentityInterceptor {
|
||||
fn call(
|
||||
&mut self,
|
||||
mut request: tonic::Request<()>,
|
||||
) -> Result<tonic::Request<()>, tonic::Status> {
|
||||
if let Some(identity) = &self.identity {
|
||||
request.metadata_mut().insert_bin(
|
||||
CODEX_CORE_IDENTITY_HEADER,
|
||||
BinaryMetadataValue::from_bytes(identity.as_bytes()),
|
||||
);
|
||||
}
|
||||
Ok(request)
|
||||
}
|
||||
}
|
||||
|
||||
type RemoteThreadConfigLoaderClient =
|
||||
ThreadConfigLoaderClient<InterceptedService<Channel, IdentityInterceptor>>;
|
||||
|
||||
/// gRPC-backed [`ThreadConfigLoader`] implementation.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct RemoteThreadConfigLoader {
|
||||
endpoint: String,
|
||||
identity: Option<OpaqueIdentity>,
|
||||
}
|
||||
|
||||
impl RemoteThreadConfigLoader {
|
||||
pub fn new(endpoint: impl Into<String>) -> Self {
|
||||
Self {
|
||||
endpoint: endpoint.into(),
|
||||
identity: None,
|
||||
}
|
||||
}
|
||||
|
||||
async fn client(
|
||||
&self,
|
||||
) -> Result<ThreadConfigLoaderClient<tonic::transport::Channel>, ThreadConfigLoadError> {
|
||||
ThreadConfigLoaderClient::connect(self.endpoint.clone())
|
||||
pub fn new_with_identity(
|
||||
endpoint: impl Into<String>,
|
||||
identity: Option<OpaqueIdentity>,
|
||||
) -> Self {
|
||||
Self {
|
||||
endpoint: endpoint.into(),
|
||||
identity,
|
||||
}
|
||||
}
|
||||
|
||||
async fn client(&self) -> Result<RemoteThreadConfigLoaderClient, ThreadConfigLoadError> {
|
||||
let channel = Endpoint::new(self.endpoint.clone())
|
||||
.map_err(|err| {
|
||||
ThreadConfigLoadError::new(
|
||||
ThreadConfigLoadErrorCode::RequestFailed,
|
||||
/*status_code*/ None,
|
||||
format!("invalid remote thread config loader endpoint: {err}"),
|
||||
)
|
||||
})?
|
||||
.connect()
|
||||
.await
|
||||
.map_err(|err| {
|
||||
ThreadConfigLoadError::new(
|
||||
@@ -47,7 +95,13 @@ impl RemoteThreadConfigLoader {
|
||||
/*status_code*/ None,
|
||||
format!("failed to connect to remote thread config loader: {err}"),
|
||||
)
|
||||
})
|
||||
})?;
|
||||
Ok(ThreadConfigLoaderClient::with_interceptor(
|
||||
channel,
|
||||
IdentityInterceptor {
|
||||
identity: self.identity.clone(),
|
||||
},
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -299,9 +353,13 @@ mod tests {
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::HashMap;
|
||||
use std::num::NonZeroU64;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
|
||||
use codex_model_provider_info::ModelProviderInfo;
|
||||
use codex_model_provider_info::WireApi;
|
||||
use codex_protocol::CODEX_CORE_IDENTITY_HEADER;
|
||||
use codex_protocol::OpaqueIdentity;
|
||||
use codex_protocol::config_types::ModelProviderAuthInfo;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use pretty_assertions::assert_eq;
|
||||
@@ -319,6 +377,7 @@ mod tests {
|
||||
struct TestServer {
|
||||
sources: Vec<proto::ThreadConfigSource>,
|
||||
expected_cwd: String,
|
||||
captured_identity: Option<Arc<Mutex<Option<Vec<u8>>>>>,
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
@@ -327,6 +386,16 @@ mod tests {
|
||||
&self,
|
||||
request: Request<proto::LoadThreadConfigRequest>,
|
||||
) -> Result<Response<proto::LoadThreadConfigResponse>, Status> {
|
||||
if let Some(captured_identity) = &self.captured_identity {
|
||||
let identity = request
|
||||
.metadata()
|
||||
.get_bin(CODEX_CORE_IDENTITY_HEADER)
|
||||
.and_then(|value| value.to_bytes().ok())
|
||||
.map(|value| value.to_vec());
|
||||
*captured_identity
|
||||
.lock()
|
||||
.expect("captured identity mutex poisoned") = identity;
|
||||
}
|
||||
assert_eq!(
|
||||
request.into_inner(),
|
||||
proto::LoadThreadConfigRequest {
|
||||
@@ -355,6 +424,7 @@ mod tests {
|
||||
.add_service(ThreadConfigLoaderServer::new(TestServer {
|
||||
sources: proto_sources(),
|
||||
expected_cwd,
|
||||
captured_identity: None,
|
||||
}))
|
||||
.serve_with_incoming_shutdown(
|
||||
tokio_stream::wrappers::TcpListenerStream::new(listener),
|
||||
@@ -379,6 +449,57 @@ mod tests {
|
||||
assert_eq!(loaded.expect("load thread config"), expected_sources());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn load_thread_config_forwards_identity_as_metadata() {
|
||||
let cwd = workspace_dir().join("project");
|
||||
let expected_cwd = cwd.to_string_lossy().into_owned();
|
||||
let captured_identity = Arc::new(Mutex::new(None));
|
||||
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
|
||||
.await
|
||||
.expect("bind test server");
|
||||
let addr = listener.local_addr().expect("test server addr");
|
||||
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
|
||||
let server_identity = captured_identity.clone();
|
||||
let server = tokio::spawn(async move {
|
||||
Server::builder()
|
||||
.add_service(ThreadConfigLoaderServer::new(TestServer {
|
||||
sources: proto_sources(),
|
||||
expected_cwd,
|
||||
captured_identity: Some(server_identity),
|
||||
}))
|
||||
.serve_with_incoming_shutdown(
|
||||
tokio_stream::wrappers::TcpListenerStream::new(listener),
|
||||
async {
|
||||
let _ = shutdown_rx.await;
|
||||
},
|
||||
)
|
||||
.await
|
||||
});
|
||||
|
||||
let loader = RemoteThreadConfigLoader::new_with_identity(
|
||||
format!("http://{addr}"),
|
||||
Some(OpaqueIdentity::from_bytes(b"tenant-key-\x00\xff".to_vec())),
|
||||
);
|
||||
let loaded = loader
|
||||
.load(ThreadConfigContext {
|
||||
thread_id: Some("thread-1".to_string()),
|
||||
cwd: Some(cwd),
|
||||
})
|
||||
.await;
|
||||
|
||||
let _ = shutdown_tx.send(());
|
||||
server.await.expect("join server").expect("server");
|
||||
|
||||
assert_eq!(loaded.expect("load thread config"), expected_sources());
|
||||
assert_eq!(
|
||||
captured_identity
|
||||
.lock()
|
||||
.expect("captured identity mutex poisoned")
|
||||
.as_deref(),
|
||||
Some(&b"tenant-key-\x00\xff"[..])
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn load_thread_config_request_sets_timeout() {
|
||||
let request = load_thread_config_request(ThreadConfigContext::default());
|
||||
|
||||
@@ -8,6 +8,9 @@ pub use codex_app_server_protocol::item_event_to_server_notification;
|
||||
pub use codex_arg0::Arg0DispatchPaths;
|
||||
pub use codex_arg0::arg0_dispatch_or_else;
|
||||
pub use codex_config::ConfigLayerStack;
|
||||
pub use codex_config::NoopThreadConfigLoader;
|
||||
pub use codex_config::RemoteThreadConfigLoader;
|
||||
pub use codex_config::ThreadConfigLoader;
|
||||
pub use codex_config::config_toml::ProjectConfig;
|
||||
pub use codex_config::config_toml::RealtimeAudioConfig;
|
||||
pub use codex_config::config_toml::RealtimeConfig;
|
||||
@@ -24,6 +27,7 @@ pub use codex_config::types::TuiKeymap;
|
||||
pub use codex_config::types::TuiNotificationSettings;
|
||||
pub use codex_config::types::UriBasedFileOpener;
|
||||
pub use codex_core::CodexThread;
|
||||
pub use codex_core::CoreApiOptions;
|
||||
pub use codex_core::ForkSnapshot;
|
||||
pub use codex_core::McpManager;
|
||||
pub use codex_core::NewThread;
|
||||
@@ -42,7 +46,10 @@ pub use codex_core::config::find_codex_home;
|
||||
pub use codex_core::init_state_db;
|
||||
pub use codex_core::resolve_installation_id;
|
||||
pub use codex_core::skills::SkillsManager;
|
||||
pub use codex_core::thread_config_loader_from_config;
|
||||
pub use codex_core::thread_config_loader_from_config_with_options;
|
||||
pub use codex_core::thread_store_from_config;
|
||||
pub use codex_core::thread_store_from_config_with_options;
|
||||
pub use codex_exec_server::EnvironmentManager;
|
||||
pub use codex_exec_server::EnvironmentManagerArgs;
|
||||
pub use codex_exec_server::ExecServerRuntimePaths;
|
||||
@@ -54,6 +61,8 @@ pub use codex_model_provider_info::OPENAI_PROVIDER_ID;
|
||||
pub use codex_model_provider_info::built_in_model_providers;
|
||||
pub use codex_models_manager::manager::RefreshStrategy;
|
||||
pub use codex_models_manager::manager::SharedModelsManager;
|
||||
pub use codex_protocol::CODEX_CORE_IDENTITY_HEADER;
|
||||
pub use codex_protocol::OpaqueIdentity;
|
||||
pub use codex_protocol::ThreadId;
|
||||
pub use codex_protocol::config_types::AltScreenMode;
|
||||
pub use codex_protocol::config_types::ApprovalsReviewer;
|
||||
|
||||
@@ -112,13 +112,17 @@ pub mod review_prompts;
|
||||
mod thread_manager;
|
||||
pub(crate) mod web_search;
|
||||
pub(crate) mod windows_sandbox_read_grants;
|
||||
pub use thread_manager::CoreApiOptions;
|
||||
pub use thread_manager::ForkSnapshot;
|
||||
pub use thread_manager::NewThread;
|
||||
pub use thread_manager::StartThreadOptions;
|
||||
pub use thread_manager::ThreadManager;
|
||||
pub use thread_manager::ThreadShutdownReport;
|
||||
pub use thread_manager::build_models_manager;
|
||||
pub use thread_manager::thread_config_loader_from_config;
|
||||
pub use thread_manager::thread_config_loader_from_config_with_options;
|
||||
pub use thread_manager::thread_store_from_config;
|
||||
pub use thread_manager::thread_store_from_config_with_options;
|
||||
pub use web_search::web_search_action_detail;
|
||||
pub use web_search::web_search_detail;
|
||||
pub use windows_sandbox_read_grants::grant_read_root_non_elevated;
|
||||
|
||||
@@ -21,6 +21,9 @@ use crate::tasks::interrupted_turn_history_marker;
|
||||
use codex_analytics::AnalyticsEventsClient;
|
||||
use codex_app_server_protocol::ThreadHistoryBuilder;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use codex_config::NoopThreadConfigLoader;
|
||||
use codex_config::RemoteThreadConfigLoader;
|
||||
use codex_config::ThreadConfigLoader;
|
||||
use codex_core_plugins::PluginsManager;
|
||||
use codex_exec_server::EnvironmentManager;
|
||||
use codex_login::AuthManager;
|
||||
@@ -30,6 +33,7 @@ use codex_model_provider_info::ModelProviderInfo;
|
||||
use codex_model_provider_info::OPENAI_PROVIDER_ID;
|
||||
use codex_models_manager::manager::RefreshStrategy;
|
||||
use codex_models_manager::manager::SharedModelsManager;
|
||||
use codex_protocol::OpaqueIdentity;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::config_types::CollaborationModeMask;
|
||||
use codex_protocol::error::CodexErr;
|
||||
@@ -225,6 +229,12 @@ pub struct StartThreadOptions {
|
||||
pub environments: Vec<TurnEnvironmentSelection>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default, PartialEq, Eq)]
|
||||
pub struct CoreApiOptions {
|
||||
/// Opaque caller identity forwarded to remote core contract implementations.
|
||||
pub opaque_identity: Option<OpaqueIdentity>,
|
||||
}
|
||||
|
||||
pub(crate) struct ResumeThreadWithHistoryOptions {
|
||||
pub(crate) config: Config,
|
||||
pub(crate) initial_history: InitialHistory,
|
||||
@@ -270,17 +280,45 @@ pub fn build_models_manager(
|
||||
pub fn thread_store_from_config(
|
||||
config: &Config,
|
||||
state_db: Option<StateDbHandle>,
|
||||
) -> Arc<dyn ThreadStore> {
|
||||
thread_store_from_config_with_options(config, state_db, &CoreApiOptions::default())
|
||||
}
|
||||
|
||||
pub fn thread_store_from_config_with_options(
|
||||
config: &Config,
|
||||
state_db: Option<StateDbHandle>,
|
||||
options: &CoreApiOptions,
|
||||
) -> Arc<dyn ThreadStore> {
|
||||
match &config.experimental_thread_store {
|
||||
ThreadStoreConfig::Local => Arc::new(LocalThreadStore::new(
|
||||
LocalThreadStoreConfig::from_config(config),
|
||||
state_db,
|
||||
)),
|
||||
ThreadStoreConfig::Remote { endpoint } => Arc::new(RemoteThreadStore::new(endpoint)),
|
||||
ThreadStoreConfig::Remote { endpoint } => Arc::new(RemoteThreadStore::new_with_identity(
|
||||
endpoint,
|
||||
options.opaque_identity.clone(),
|
||||
)),
|
||||
ThreadStoreConfig::InMemory { id } => InMemoryThreadStore::for_id(id),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn thread_config_loader_from_config(config: &Config) -> Arc<dyn ThreadConfigLoader> {
|
||||
thread_config_loader_from_config_with_options(config, &CoreApiOptions::default())
|
||||
}
|
||||
|
||||
pub fn thread_config_loader_from_config_with_options(
|
||||
config: &Config,
|
||||
options: &CoreApiOptions,
|
||||
) -> Arc<dyn ThreadConfigLoader> {
|
||||
match config.experimental_thread_config_endpoint.as_deref() {
|
||||
Some(endpoint) => Arc::new(RemoteThreadConfigLoader::new_with_identity(
|
||||
endpoint,
|
||||
options.opaque_identity.clone(),
|
||||
)),
|
||||
None => Arc::new(NoopThreadConfigLoader),
|
||||
}
|
||||
}
|
||||
|
||||
impl ThreadManager {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
|
||||
@@ -1,10 +1,13 @@
|
||||
pub mod account;
|
||||
mod agent_path;
|
||||
pub mod auth;
|
||||
mod opaque_identity;
|
||||
mod session_id;
|
||||
mod thread_id;
|
||||
mod tool_name;
|
||||
pub use agent_path::AgentPath;
|
||||
pub use opaque_identity::CODEX_CORE_IDENTITY_HEADER;
|
||||
pub use opaque_identity::OpaqueIdentity;
|
||||
pub use session_id::SessionId;
|
||||
pub use thread_id::ThreadId;
|
||||
pub use tool_name::ToolName;
|
||||
|
||||
71
codex-rs/protocol/src/opaque_identity.rs
Normal file
71
codex-rs/protocol/src/opaque_identity.rs
Normal file
@@ -0,0 +1,71 @@
|
||||
use std::ffi::OsString;
|
||||
|
||||
/// Binary gRPC metadata key used to forward the caller's opaque identity to
|
||||
/// remote core contract implementations.
|
||||
pub const CODEX_CORE_IDENTITY_HEADER: &str = "x-codex-core-identity-bin";
|
||||
|
||||
/// Opaque identity supplied by a Codex core API caller.
|
||||
///
|
||||
/// Codex core treats this as uninterpreted bytes and only forwards it to remote
|
||||
/// contract implementations that need to perform their own authorization.
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
pub struct OpaqueIdentity {
|
||||
bytes: Vec<u8>,
|
||||
}
|
||||
|
||||
impl OpaqueIdentity {
|
||||
pub fn from_bytes(bytes: impl Into<Vec<u8>>) -> Self {
|
||||
Self {
|
||||
bytes: bytes.into(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_os_string(value: OsString) -> Self {
|
||||
Self::from_bytes(os_string_to_bytes(value))
|
||||
}
|
||||
|
||||
pub fn as_bytes(&self) -> &[u8] {
|
||||
&self.bytes
|
||||
}
|
||||
|
||||
pub fn into_bytes(self) -> Vec<u8> {
|
||||
self.bytes
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
fn os_string_to_bytes(value: OsString) -> Vec<u8> {
|
||||
use std::os::unix::ffi::OsStrExt;
|
||||
|
||||
value.as_os_str().as_bytes().to_vec()
|
||||
}
|
||||
|
||||
#[cfg(not(unix))]
|
||||
fn os_string_to_bytes(value: OsString) -> Vec<u8> {
|
||||
value.to_string_lossy().into_owned().into_bytes()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::OpaqueIdentity;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[test]
|
||||
fn opaque_identity_preserves_bytes() {
|
||||
let identity = OpaqueIdentity::from_bytes(b"tenant-key-\x00\xff".to_vec());
|
||||
|
||||
assert_eq!(identity.as_bytes(), &b"tenant-key-\x00\xff"[..]);
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[test]
|
||||
fn opaque_identity_preserves_unix_argv_bytes() {
|
||||
use std::ffi::OsString;
|
||||
use std::os::unix::ffi::OsStringExt;
|
||||
|
||||
let identity =
|
||||
OpaqueIdentity::from_os_string(OsString::from_vec(b"tenant-key-\xff".to_vec()));
|
||||
|
||||
assert_eq!(identity.as_bytes(), &b"tenant-key-\xff"[..]);
|
||||
}
|
||||
}
|
||||
@@ -12,6 +12,8 @@ mod remote;
|
||||
mod store;
|
||||
mod types;
|
||||
|
||||
pub use codex_protocol::CODEX_CORE_IDENTITY_HEADER;
|
||||
pub use codex_protocol::OpaqueIdentity;
|
||||
pub use error::ThreadStoreError;
|
||||
pub use error::ThreadStoreResult;
|
||||
pub use in_memory::InMemoryThreadStore;
|
||||
|
||||
@@ -66,7 +66,11 @@ pub(super) async fn list_threads(
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
|
||||
use codex_protocol::CODEX_CORE_IDENTITY_HEADER;
|
||||
use codex_protocol::OpaqueIdentity;
|
||||
use codex_protocol::openai_models::ReasoningEffort;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use pretty_assertions::assert_eq;
|
||||
@@ -83,7 +87,9 @@ mod tests {
|
||||
use crate::ThreadStore;
|
||||
|
||||
#[derive(Default)]
|
||||
struct TestServer;
|
||||
struct TestServer {
|
||||
captured_identity: Option<Arc<Mutex<Option<Vec<u8>>>>>,
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl thread_store_server::ThreadStore for TestServer {
|
||||
@@ -91,6 +97,16 @@ mod tests {
|
||||
&self,
|
||||
request: Request<proto::ListThreadsRequest>,
|
||||
) -> Result<Response<proto::ListThreadsResponse>, Status> {
|
||||
if let Some(captured_identity) = &self.captured_identity {
|
||||
let identity = request
|
||||
.metadata()
|
||||
.get_bin(CODEX_CORE_IDENTITY_HEADER)
|
||||
.and_then(|value| value.to_bytes().ok())
|
||||
.map(|value| value.to_vec());
|
||||
*captured_identity
|
||||
.lock()
|
||||
.expect("captured identity mutex poisoned") = identity;
|
||||
}
|
||||
let request = request.into_inner();
|
||||
assert_eq!(request.page_size, 2);
|
||||
assert_eq!(request.cursor.as_deref(), Some("cursor-1"));
|
||||
@@ -171,7 +187,7 @@ mod tests {
|
||||
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
|
||||
let server = tokio::spawn(async move {
|
||||
Server::builder()
|
||||
.add_service(ThreadStoreServer::new(TestServer))
|
||||
.add_service(ThreadStoreServer::new(TestServer::default()))
|
||||
.serve_with_incoming_shutdown(
|
||||
tokio_stream::wrappers::TcpListenerStream::new(listener),
|
||||
async {
|
||||
@@ -226,6 +242,61 @@ mod tests {
|
||||
server.await.expect("join server").expect("server");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_threads_forwards_identity_as_metadata() {
|
||||
let captured_identity = Arc::new(Mutex::new(None));
|
||||
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
|
||||
.await
|
||||
.expect("bind test server");
|
||||
let addr = listener.local_addr().expect("test server addr");
|
||||
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
|
||||
let server_identity = captured_identity.clone();
|
||||
let server = tokio::spawn(async move {
|
||||
Server::builder()
|
||||
.add_service(ThreadStoreServer::new(TestServer {
|
||||
captured_identity: Some(server_identity),
|
||||
}))
|
||||
.serve_with_incoming_shutdown(
|
||||
tokio_stream::wrappers::TcpListenerStream::new(listener),
|
||||
async {
|
||||
let _ = shutdown_rx.await;
|
||||
},
|
||||
)
|
||||
.await
|
||||
});
|
||||
|
||||
let store = RemoteThreadStore::new_with_identity(
|
||||
format!("http://{addr}"),
|
||||
Some(OpaqueIdentity::from_bytes(b"tenant-key-\x00\xff".to_vec())),
|
||||
);
|
||||
store
|
||||
.list_threads(ListThreadsParams {
|
||||
page_size: 2,
|
||||
cursor: Some("cursor-1".to_string()),
|
||||
sort_key: ThreadSortKey::UpdatedAt,
|
||||
sort_direction: crate::SortDirection::Desc,
|
||||
allowed_sources: vec![SessionSource::Cli],
|
||||
model_providers: Some(vec!["openai".to_string()]),
|
||||
cwd_filters: Some(vec![PathBuf::from("/workspace")]),
|
||||
archived: true,
|
||||
search_term: Some("needle".to_string()),
|
||||
use_state_db_only: true,
|
||||
})
|
||||
.await
|
||||
.expect("list threads");
|
||||
|
||||
assert_eq!(
|
||||
captured_identity
|
||||
.lock()
|
||||
.expect("captured identity mutex poisoned")
|
||||
.as_deref(),
|
||||
Some(&b"tenant-key-\x00\xff"[..])
|
||||
);
|
||||
|
||||
let _ = shutdown_tx.send(());
|
||||
server.await.expect("join server").expect("server");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stored_thread_proto_roundtrips_through_domain_type() {
|
||||
let thread = proto::StoredThread {
|
||||
|
||||
@@ -2,6 +2,8 @@ mod helpers;
|
||||
mod list_threads;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use codex_protocol::CODEX_CORE_IDENTITY_HEADER;
|
||||
use codex_protocol::OpaqueIdentity;
|
||||
use codex_protocol::ThreadId;
|
||||
|
||||
use crate::AppendThreadItemsParams;
|
||||
@@ -20,10 +22,37 @@ use crate::ThreadStoreError;
|
||||
use crate::ThreadStoreResult;
|
||||
use crate::UpdateThreadMetadataParams;
|
||||
use proto::thread_store_client::ThreadStoreClient;
|
||||
use tonic::codegen::InterceptedService;
|
||||
use tonic::metadata::BinaryMetadataValue;
|
||||
use tonic::service::Interceptor;
|
||||
use tonic::transport::Channel;
|
||||
use tonic::transport::Endpoint;
|
||||
|
||||
#[path = "proto/codex.thread_store.v1.rs"]
|
||||
mod proto;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct IdentityInterceptor {
|
||||
identity: Option<OpaqueIdentity>,
|
||||
}
|
||||
|
||||
impl Interceptor for IdentityInterceptor {
|
||||
fn call(
|
||||
&mut self,
|
||||
mut request: tonic::Request<()>,
|
||||
) -> Result<tonic::Request<()>, tonic::Status> {
|
||||
if let Some(identity) = &self.identity {
|
||||
request.metadata_mut().insert_bin(
|
||||
CODEX_CORE_IDENTITY_HEADER,
|
||||
BinaryMetadataValue::from_bytes(identity.as_bytes()),
|
||||
);
|
||||
}
|
||||
Ok(request)
|
||||
}
|
||||
}
|
||||
|
||||
type RemoteThreadStoreClient = ThreadStoreClient<InterceptedService<Channel, IdentityInterceptor>>;
|
||||
|
||||
/// gRPC-backed [`ThreadStore`] implementation for deployments whose durable thread data lives
|
||||
/// outside the app-server process.
|
||||
///
|
||||
@@ -33,21 +62,43 @@ mod proto;
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct RemoteThreadStore {
|
||||
endpoint: String,
|
||||
identity: Option<OpaqueIdentity>,
|
||||
}
|
||||
|
||||
impl RemoteThreadStore {
|
||||
pub fn new(endpoint: impl Into<String>) -> Self {
|
||||
Self {
|
||||
endpoint: endpoint.into(),
|
||||
identity: None,
|
||||
}
|
||||
}
|
||||
|
||||
async fn client(&self) -> ThreadStoreResult<ThreadStoreClient<tonic::transport::Channel>> {
|
||||
ThreadStoreClient::connect(self.endpoint.clone())
|
||||
pub fn new_with_identity(
|
||||
endpoint: impl Into<String>,
|
||||
identity: Option<OpaqueIdentity>,
|
||||
) -> Self {
|
||||
Self {
|
||||
endpoint: endpoint.into(),
|
||||
identity,
|
||||
}
|
||||
}
|
||||
|
||||
async fn client(&self) -> ThreadStoreResult<RemoteThreadStoreClient> {
|
||||
let channel = Endpoint::new(self.endpoint.clone())
|
||||
.map_err(|err| ThreadStoreError::InvalidRequest {
|
||||
message: format!("invalid remote thread store endpoint: {err}"),
|
||||
})?
|
||||
.connect()
|
||||
.await
|
||||
.map_err(|err| ThreadStoreError::Internal {
|
||||
message: format!("failed to connect to remote thread store: {err}"),
|
||||
})
|
||||
})?;
|
||||
Ok(ThreadStoreClient::with_interceptor(
|
||||
channel,
|
||||
IdentityInterceptor {
|
||||
identity: self.identity.clone(),
|
||||
},
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user