mirror of
https://github.com/openai/codex.git
synced 2026-05-06 12:26:38 +00:00
Compare commits
1 Commits
pr20387
...
bryanashle
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b2d17c42db |
377
codex-rs/app-server/MULTITENANCY_AUDIT.md
Normal file
377
codex-rs/app-server/MULTITENANCY_AUDIT.md
Normal file
@@ -0,0 +1,377 @@
|
||||
# App-Server Multi-Tenancy Audit
|
||||
|
||||
Date: 2026-04-16
|
||||
Updated: 2026-04-22
|
||||
|
||||
Scope: `codex-rs/app-server`, the app-server protocol surface, and the core/login/state/thread-store components that app-server directly depends on.
|
||||
|
||||
## Executive Summary
|
||||
|
||||
The current app-server is multi-connection, but it is not multi-tenant. A single app-server process currently owns exactly one effective runtime tenant: one base `Config`, one `CODEX_HOME`, one SQLite home, one auth manager, one cloud-requirements loader, one thread manager, one plugin/skills/MCP/model manager graph, one feedback/log pipeline, one set of process-global HTTP client metadata, and one outbound broadcast space.
|
||||
|
||||
The safest first design for Codex Cloud is to make each connection or remote-control stream bind to one authenticated tenant and route all requests through a server-derived `TenantContext`. The tenant id used by the runtime registry should be the app-server `IdentityKey`: opaque bytes supplied by the trusted launcher or derived from transport/remote-control auth, compared and hashed by app-server but not interpreted. Do not let clients self-declare an identity key without binding that declaration to transport auth. The lower-risk persistence model is per-tenant root isolation: each tenant gets its own `codex_home`, `sqlite_home`, log directory, auth store, plugin store, memories, sessions, and local config files. A single shared state DB is possible, but would require tenant columns and query filters across thread metadata, spawn edges, jobs, memories, logs, remote-control enrollments, rollout indexes, and legacy filesystem session scanning.
|
||||
|
||||
## Current Tenant Boundary
|
||||
|
||||
There is no explicit tenant boundary today.
|
||||
|
||||
- `run_main_with_transport` constructs the process-wide `EnvironmentManager`, base config, cloud requirements, state DB, log DB, transport router, remote-control connection, and `MessageProcessor` once at process startup (`app-server/src/lib.rs:355-672`).
|
||||
- `MessageProcessor` owns one `CodexMessageProcessor`, one `ConfigApi`, one `ExternalAgentConfigApi`, one `FsApi`, one `AuthManager`, one `FsWatchManager`, one `Config`, and one config warning list (`app-server/src/message_processor.rs:163-176`).
|
||||
- `ConnectionSessionState` stores only initialization state, experimental API opt-in, notification opt-outs, and client name/version. It has no principal, tenant id, workspace/account id, or authorization scope (`app-server/src/message_processor.rs:178-189`).
|
||||
- Once initialized, request dispatch only checks initialized/experimental state, then forwards to handlers. There is no tenant authorization check at dispatch time (`app-server/src/message_processor.rs:721-772`).
|
||||
- Thread creation notifications auto-attach every initialized connection to every new thread in the process (`app-server/src/lib.rs:844-858`).
|
||||
|
||||
This means a shared app-server process would currently let any initialized connection list, load, resume, subscribe to, mutate, archive, or receive notifications for any thread known to the process.
|
||||
|
||||
## Required Architecture Changes
|
||||
|
||||
### 1. Add Server-Derived IdentityKey Tenant Identity
|
||||
|
||||
Use `IdentityKey` as the tenant id for app-server runtime isolation. `IdentityKey` is intentionally opaque bytes; app-server may clone, compare, hash, log a redacted digest, and encode it for storage paths, but must not parse it into account/workspace/user semantics. Authorization and storage-boundary meaning belong to the remote contract implementations and the trusted identity issuer.
|
||||
|
||||
Recommended connection model:
|
||||
|
||||
- WebSocket upgrade auth validates the caller and returns a principal containing an `IdentityKey`, not just `Ok(())`.
|
||||
- Remote-control streams derive the `IdentityKey` from the remote-control account/environment/server enrollment, not from the JSON-RPC payload.
|
||||
- Current single-process startup identity can seed the default tenant runtime for non-multiplexed deployments.
|
||||
- A connection is single-tenant for its lifetime.
|
||||
- All JSON-RPC request handling receives a `TenantContext` selected by `connection_id`.
|
||||
|
||||
Alternative model:
|
||||
|
||||
- A request envelope carries a tenant selector, but the server must verify it maps to the authenticated principal's `IdentityKey` on every request.
|
||||
- This is more flexible for multiplexed gateway connections, but expands the attack surface and requires more protocol churn.
|
||||
|
||||
The existing WebSocket auth only verifies capability tokens or signed bearer JWT validity and discards claims (`transport/auth.rs:273-304`). The listener state stores only `transport_event_tx` and `auth_policy`, then opens a raw connection id (`transport/websocket.rs:74-121`). That needs to become `authorize_upgrade -> AuthorizedPrincipal` and `TransportEvent::ConnectionOpened` should carry the principal.
|
||||
|
||||
### 2. Add a Tenant Runtime Registry
|
||||
|
||||
Add a process-wide tenant registry owned above `MessageProcessor`. The registry key is `IdentityKey`; there should not be a separate app-server-assigned tenant id unless it is just a local alias for the same opaque bytes.
|
||||
|
||||
```rust
|
||||
struct TenantRegistry {
|
||||
tenants: DashMap<IdentityKey, Arc<TenantRuntime>>,
|
||||
}
|
||||
|
||||
struct TenantRuntime {
|
||||
identity_key: IdentityKey,
|
||||
codex_home: AbsolutePathBuf,
|
||||
config_manager: TenantConfigManager,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
cloud_requirements: Arc<RwLock<CloudRequirementsLoader>>,
|
||||
thread_manager: Arc<ThreadManager>,
|
||||
state_db: Option<Arc<StateRuntime>>,
|
||||
log_db: Option<LogDbLayer>,
|
||||
feedback: CodexFeedback,
|
||||
config_api: ConfigApi,
|
||||
external_agent_config_api: ExternalAgentConfigApi,
|
||||
fs_api: TenantFsApi,
|
||||
fs_watch_manager: FsWatchManager,
|
||||
command_exec_manager: CommandExecManager,
|
||||
}
|
||||
```
|
||||
|
||||
Key rule: handlers should not read process-global `self.config`, `self.auth_manager`, `self.thread_manager`, etc. They should receive a `TenantRuntime` resolved from the connection.
|
||||
|
||||
Tenant runtime lifecycle needs explicit policies:
|
||||
|
||||
- lazy-create on first authenticated connection;
|
||||
- ref-count or idle unload tenant runtimes;
|
||||
- bounded per-tenant resources: max loaded threads, command sessions, file watches, background tasks, memory jobs, MCP servers;
|
||||
- per-tenant shutdown/drain instead of only process-wide graceful drain.
|
||||
|
||||
### 3. Pick Persistence Isolation Strategy
|
||||
|
||||
Use per-tenant roots first unless there is a strong product need for cross-tenant SQL queries.
|
||||
|
||||
Per-tenant root isolation:
|
||||
|
||||
- `IdentityKey` maps to a deterministic root such as `$APP_SERVER_STATE/tenants/{encoded_identity_key}/codex_home`.
|
||||
- Do not place raw identity bytes directly in paths. Use a canonical path-safe encoding such as base64url without padding, or a SHA-256 digest if path names should not reveal even encoded key material.
|
||||
- Each tenant runtime builds config with `.codex_home(tenant_codex_home)`.
|
||||
- Each tenant gets separate `state_*.sqlite`, `logs_*.sqlite`, sessions, archived sessions, memory directories, plugin installs, MCP credentials, auth file/keyring key, and history.
|
||||
- This avoids broad SQL migrations because `StateRuntime::init` already scopes DBs under a provided root (`state/src/runtime.rs:77-150`).
|
||||
|
||||
Shared DB isolation:
|
||||
|
||||
- Add `tenant_id` to `threads`, `thread_spawn_edges`, `stage1_outputs`, `jobs`, `agent_jobs`, logs tables, remote-control enrollments, and all unique/index definitions. This column should store the canonical serialized `IdentityKey` bytes or path-safe encoding, not a separately invented account/workspace id.
|
||||
- Convert primary keys from `id` or `(kind, job_key)` to include `tenant_id`.
|
||||
- Update every query to filter by `tenant_id`. Examples today list and mutate by bare `threads.id` (`state/src/runtime/threads.rs:367-535`).
|
||||
- Audit legacy rollout filesystem scanning, session index files, and archive/unarchive paths.
|
||||
|
||||
The current `threads` schema uses `id TEXT PRIMARY KEY` and has no tenant dimension (`state/migrations/0001_threads.sql:1-24`). Memory reset deletes every memory row in the current DB and all memory jobs (`state/src/runtime/memories.rs:26-55`), so shared DB isolation would be high-risk until every delete/update is tenant-filtered.
|
||||
|
||||
## Subsystem Findings And Required Changes
|
||||
|
||||
### Protocol And Request Routing
|
||||
|
||||
Findings:
|
||||
|
||||
- `ClientRequest` contains only `id`, `method`, and `params`; no tenant context exists in the envelope (`app-server-protocol/src/protocol/common.rs:87-101`).
|
||||
- Thread APIs use bare `thread_id` strings (`v2.rs:2687-2735`, `v2.rs:3218-3245`).
|
||||
- Filesystem APIs accept absolute host paths directly (`v2.rs:2303-2325`).
|
||||
- Feedback accepts optional `thread_id` and arbitrary `extra_log_files` (`v2.rs:2283-2293`).
|
||||
|
||||
Required changes:
|
||||
|
||||
- Add a server-side `ConnectionTenantMap` and require every initialized request to resolve to one tenant before dispatch.
|
||||
- Keep public protocol unchanged for first pass if connections are single-tenant; document that `thread_id` is tenant-scoped.
|
||||
- If gateway multiplexing is required, add tenant metadata to a higher-level envelope or `initialize`, and reject mismatches against auth claims.
|
||||
- Add a thread ownership check before every method that takes `thread_id`.
|
||||
- Replace arbitrary path params in remote/cloud contexts with tenant workspace handles, thread-scoped roots, or capability-bound file handles.
|
||||
|
||||
### Transport And Remote Control
|
||||
|
||||
Findings:
|
||||
|
||||
- WebSocket auth returns only success/failure (`transport/auth.rs:273-304`).
|
||||
- WebSocket listener opens a connection id without attaching identity (`transport/websocket.rs:100-121`).
|
||||
- Remote control is started once with one `StateRuntime` and one `AuthManager` (`transport/remote_control/mod.rs:47-74`).
|
||||
- Remote-control websocket state stores one auth manager, one enrollment, and one client tracker (`transport/remote_control/websocket.rs:116-158`).
|
||||
- Remote-control clients are tracked by `(ClientId, StreamId)` to `ConnectionId`, with no tenant field (`transport/remote_control/client_tracker.rs:28-60`).
|
||||
- Remote-control enrollment is keyed by websocket URL, account id, and app-server client name (`transport/remote_control/websocket.rs:724-790`), but it still feeds into the single process tenant today.
|
||||
|
||||
Required changes:
|
||||
|
||||
- Make WebSocket auth produce `AuthorizedPrincipal { identity_key, account_id, scopes, auth_method }`.
|
||||
- Add principal to `TransportEvent::ConnectionOpened`.
|
||||
- Make `ClientTracker` store `IdentityKey`/principal per stream.
|
||||
- Remote-control enrollment state should belong to a tenant runtime or include the `IdentityKey` in shared storage.
|
||||
- Decide whether one remote-control connection can multiplex tenants. If yes, every remote-control envelope must include authenticated tenant routing metadata from the backend.
|
||||
|
||||
### Outbound Routing And Notifications
|
||||
|
||||
Findings:
|
||||
|
||||
- Outbound envelopes are either `ToConnection` or global `Broadcast`; there is no tenant-targeted broadcast (`outgoing_message.rs:84-94`).
|
||||
- Server-request callbacks are keyed only by `RequestId`, not `(connection_id, request_id)` or tenant (`outgoing_message.rs:111-133`).
|
||||
- Server requests can broadcast to all initialized connections (`outgoing_message.rs:278-338`).
|
||||
- Server notifications with no explicit connection list broadcast to every initialized connection (`outgoing_message.rs:507-547`).
|
||||
- The router fans out a broadcast to all initialized connections, subject only to notification opt-outs (`transport/mod.rs:352-390`).
|
||||
|
||||
Required changes:
|
||||
|
||||
- Replace `Broadcast` with `ToTenant { identity_key, message }` and, only where needed, `ToAllTenants` for administrative process notices.
|
||||
- Add `IdentityKey` to outbound connection state and filter broadcasts by tenant.
|
||||
- Key callbacks by `ServerRequestHandle { identity_key, connection_id, request_id }` or allocate globally unique unguessable IDs and verify the responding connection is an intended recipient.
|
||||
- Make `process_response` and `process_error` take `connection_id` and enforce callback ownership.
|
||||
- Audit every `send_server_notification(...)` call and make it tenant-scoped or thread-subscriber-scoped.
|
||||
|
||||
### Thread Manager And Thread State
|
||||
|
||||
Findings:
|
||||
|
||||
- `ThreadManagerState` has one `HashMap<ThreadId, Arc<CodexThread>>`, one auth manager, one models manager, one environment manager, one skills manager, one plugins manager, and one MCP manager (`core/src/thread_manager.rs:204-218`).
|
||||
- `ThreadManager::new` builds managers from one `codex_home` and one restriction product (`core/src/thread_manager.rs:220-272`).
|
||||
- `finalize_thread_spawn` inserts by bare `ThreadId` (`core/src/thread_manager.rs:956-985`).
|
||||
- App-server thread subscription state is keyed by bare `ThreadId` and `ConnectionId`, with no tenant ownership check (`app-server/src/thread_state.rs:187-348`).
|
||||
- `ThreadWatchManager` tracks one global running turn count and loaded statuses (`app-server/src/thread_status.rs:19-24`).
|
||||
|
||||
Required changes:
|
||||
|
||||
- Prefer one `ThreadManager` per `TenantRuntime`.
|
||||
- If using a single global manager, all maps must key by `(IdentityKey, ThreadId)` and every manager method must require tenant.
|
||||
- `ThreadStateManager`, `ThreadWatchManager`, pending unloads, pending server requests, and listener tasks must be per-tenant or tenant-keyed.
|
||||
- `thread_created` broadcasts must include tenant or stay inside a tenant manager.
|
||||
- Auto-attach on thread creation must attach only the creating connection and explicit same-tenant subscribers, not all initialized connections.
|
||||
- Thread IDs can remain UUIDs but must be treated as tenant-scoped. Do not rely on UUID unguessability as authorization.
|
||||
|
||||
### Config, Auth, And Cloud Requirements
|
||||
|
||||
Findings:
|
||||
|
||||
- `Config` contains tenant-sensitive roots and settings: `cwd`, auth store mode, MCP servers and OAuth settings, agent roles, memories, `codex_home`, `sqlite_home`, log dir, and more (`core/src/config/mod.rs:360-430`).
|
||||
- `ConfigApi` stores one `codex_home`, one CLI override set, one runtime feature map, one cloud-requirements loader, and reloads all loaded threads through one `UserConfigReloader` (`app-server/src/config_api.rs:73-155`, `app-server/src/config_api.rs:217-300`).
|
||||
- `ExternalAgentConfigApi` owns one migration service rooted at one `codex_home` (`app-server/src/external_agent_config_api.rs:18-28`).
|
||||
- `AuthManager` is explicitly a single source of truth for one `codex_home` auth snapshot (`login/src/auth/manager.rs:1151-1238`).
|
||||
- File auth is `$CODEX_HOME/auth.json`, and keyring identity is derived from canonical `codex_home` (`login/src/auth/storage.rs:57-145`).
|
||||
- `USER_AGENT_SUFFIX`, `ORIGINATOR`, and residency headers are process-global statics; comments explicitly say this assumes one MCP server per process (`login/src/auth/default_client.rs:18-98`).
|
||||
|
||||
Required changes:
|
||||
|
||||
- Move config/auth/cloud requirements into `TenantRuntime`.
|
||||
- `ConfigApi` writes must mutate only that tenant and reload only that tenant's loaded threads.
|
||||
- Runtime feature enablement must be per tenant, not process-wide.
|
||||
- Auth login/logout must operate on tenant auth only and notify tenant connections only.
|
||||
- External auth refresh must target the tenant's active connection(s), not broadcast globally.
|
||||
- Replace process-global default client metadata with per-request/per-tenant HTTP client metadata or a request context passed through OpenAI/ChatGPT clients.
|
||||
- Cloud requirements loader must be per tenant/account/workspace and refreshed on tenant auth changes only.
|
||||
|
||||
### Local Thread Store, Rollouts, And State DB
|
||||
|
||||
Findings:
|
||||
|
||||
- `LocalThreadStore` is built once from one `RolloutConfig` (`thread-store/src/local/mod.rs:18-42`).
|
||||
- `RolloutConfig` is just `codex_home`, `sqlite_home`, `cwd`, model provider, and memory flag (`rollout/src/config.rs:6-34`).
|
||||
- Thread listing walks/scans the one rollout root and/or state DB behind that config (`thread-store/src/local/list_threads.rs:15-64`).
|
||||
- `StateRuntime` opens one state DB and one logs DB under one root (`state/src/runtime.rs:77-150`).
|
||||
- Thread rows, indexes, and lookups are keyed by bare thread IDs and paths (`state/migrations/0001_threads.sql:1-24`, `state/src/runtime/threads.rs:367-535`).
|
||||
|
||||
Required changes:
|
||||
|
||||
- For per-tenant roots, create one `LocalThreadStore` per tenant.
|
||||
- Ensure thread resume/read/archive/unarchive/set-name/metadata-update always resolve paths inside the tenant root unless an explicit tenant-authorized path capability is provided.
|
||||
- Avoid accepting raw rollout `path` from clients in multi-tenant cloud mode, or validate it is inside the tenant's allowed storage/workspace.
|
||||
- For shared DB, migrate every schema/query and legacy backfill path to include tenant.
|
||||
|
||||
### Filesystem, Watches, Fuzzy Search, And Command Exec
|
||||
|
||||
Findings:
|
||||
|
||||
- `FsApi::default()` uses the default host environment filesystem and all operations pass `sandbox None` (`app-server/src/fs_api.rs:29-90`).
|
||||
- FS params use absolute paths and no thread/workspace/tenant handle (`app-server-protocol/src/protocol/v2.rs:2303-2325`).
|
||||
- `FsWatchManager` is process-global and keyed by `(connection_id, watch_id)` (`app-server/src/fs_watch.rs:71-130`).
|
||||
- `CommandExecManager` is process-global and sessions are keyed by `(connection_id, process_id)` (`app-server/src/command_exec.rs:47-125`).
|
||||
- Fuzzy file search maps cancellation tokens and session IDs globally, while params provide arbitrary roots.
|
||||
|
||||
Required changes:
|
||||
|
||||
- In cloud mode, do not expose absolute host path operations. Require a tenant workspace id, thread id, or server-issued filesystem capability.
|
||||
- Tenant FS operations should use that tenant's environment manager and sandbox policy.
|
||||
- Watch IDs, fuzzy-search session IDs, and cancellation tokens must be tenant+connection scoped.
|
||||
- Command sessions should be tenant+connection scoped and enforce tenant resource limits.
|
||||
- Command execution must use tenant workspace/sandbox/network policy, not only client-provided params.
|
||||
|
||||
### Login And Account APIs
|
||||
|
||||
Findings:
|
||||
|
||||
- `active_login` is one mutex for the entire process (`codex_message_processor.rs:686-721`).
|
||||
- API-key and ChatGPT login write to `self.config.codex_home` and reload one `AuthManager` (`codex_message_processor.rs:1181-1730`).
|
||||
- Account update and login-completed notifications broadcast globally.
|
||||
|
||||
Required changes:
|
||||
|
||||
- `active_login` must be per tenant.
|
||||
- Browser/device-code login callback state must include tenant and must not cancel another tenant's login.
|
||||
- `account/read`, `account/login`, `account/logout`, rate limits, and external token refresh must use tenant auth.
|
||||
- Account notifications must go only to the tenant.
|
||||
|
||||
### Thread APIs And Authorization
|
||||
|
||||
Findings:
|
||||
|
||||
- `load_thread` parses a bare thread id and calls `self.thread_manager.get_thread(thread_id)` (`codex_message_processor.rs:663-682`).
|
||||
- `thread/start` derives config from one codex home, can persist project trust into that home, starts through one thread manager, auto-attaches the creating connection, then broadcasts `thread/started` globally (`codex_message_processor.rs:2249-2717`).
|
||||
- `thread/list` reads the one local store and global statuses (`codex_message_processor.rs:3631-3713`).
|
||||
- `thread/loaded/list` returns every loaded thread id from the one thread manager (`codex_message_processor.rs:3715-3772`).
|
||||
- `thread/read`, `thread/resume`, `thread/fork`, `thread/setName`, `thread/metadata/update`, archive/unarchive use the one store/config and often locate rollouts under one `codex_home`.
|
||||
- Turn APIs (`turn/start`, `turn/steer`, `turn/interrupt`, MCP resource/tool APIs) authorize only by ability to load the thread.
|
||||
|
||||
Required changes:
|
||||
|
||||
- Every thread method must resolve `TenantRuntime` first, then resolve `ThreadId` inside that tenant only.
|
||||
- Add `ensure_thread_access(tenant, connection, thread_id, operation)` before read/write/turn/MCP/feedback operations.
|
||||
- For `thread/start`, write trust/config only into tenant config.
|
||||
- For `thread/list` and `thread/loaded/list`, return only tenant threads.
|
||||
- For `thread/read includeTurns`, do not read arbitrary rollout paths outside tenant storage.
|
||||
- For `thread/resume`/`thread/fork`, reject raw `path` unless it is inside tenant-owned storage or comes from a server-issued capability.
|
||||
- For turn operations, subscription and outbound lifecycle events must be scoped to subscribed connections for that tenant/thread.
|
||||
|
||||
### MCP, Plugins, Skills, Apps
|
||||
|
||||
Findings:
|
||||
|
||||
- MCP OAuth login loads latest config from the one config root and uses one thread manager MCP manager (`codex_message_processor.rs:5305-5399`).
|
||||
- MCP status reads process/tenant-wide config and auth (`codex_message_processor.rs:5415-5528`).
|
||||
- Apps list uses one config, one auth manager, and global cached connector helpers (`codex_message_processor.rs:5838-6016`).
|
||||
- Skills and plugin operations use one `codex_home`, one plugin manager, one skills manager, and one auth manager (`codex_message_processor.rs:6018-6740`).
|
||||
- Marketplace add writes directly into `self.config.codex_home` (`codex_message_processor.rs:6363-6394`).
|
||||
- Skills config write uses `ConfigEditsBuilder::new(&self.config.codex_home)` (`codex_message_processor.rs:6464-6518`).
|
||||
|
||||
Required changes:
|
||||
|
||||
- Plugins manager, skills manager, MCP manager, app connector caches, marketplace installs, and skill config writes must be per tenant.
|
||||
- MCP OAuth credentials and callback state must include tenant. Fixed callback ports can conflict across concurrent tenants; prefer ephemeral callbacks or a broker keyed by tenant+state.
|
||||
- Plugin install/uninstall/list should enforce tenant policy and use tenant auth for remote sync.
|
||||
- App connector caches should include tenant/account/config in cache keys.
|
||||
|
||||
### Feedback, Logging, Analytics, And OTEL
|
||||
|
||||
Findings:
|
||||
|
||||
- `run_main_with_transport` creates one `CodexFeedback`, one tracing subscriber, one `StateRuntime`, and one log DB layer (`app-server/src/lib.rs:479-532`).
|
||||
- Feedback upload may gather subtree thread IDs, SQLite logs, rollout attachments, and client-provided extra log files from the process tenant (`codex_message_processor.rs:8082-8265`).
|
||||
- Analytics clients are built from one auth manager and one base URL (`message_processor.rs:263-270`).
|
||||
|
||||
Required changes:
|
||||
|
||||
- Logs and feedback snapshots must be tenant-tagged or tenant-isolated.
|
||||
- Feedback upload with `include_logs` must only include tenant-owned threads and logs.
|
||||
- `extra_log_files` should be removed in cloud mode or replaced by tenant-scoped attachment handles.
|
||||
- Analytics events must include tenant/account/workspace where appropriate and avoid leaking tenant-specific request IDs across tenants.
|
||||
- Process-level tracing subscriber can stay global, but per-tenant log DB sinks need either tenant-specific layers or a shared log schema with tenant columns.
|
||||
|
||||
## High-Risk Single-Tenant Assumptions To Remove First
|
||||
|
||||
1. Global outbound broadcasts.
|
||||
2. Global thread-created auto-attach to every initialized connection.
|
||||
3. One `AuthManager` and auth store for all connections.
|
||||
4. One `ConfigApi` and runtime feature map.
|
||||
5. Process-global default HTTP client headers (`ORIGINATOR`, `USER_AGENT_SUFFIX`, residency).
|
||||
6. Thread/store APIs that resolve bare `thread_id` without tenant.
|
||||
7. Absolute-path FS APIs and raw rollout path resume/fork.
|
||||
8. One `active_login`.
|
||||
9. One `ThreadManager` with shared plugin/skills/MCP/model managers.
|
||||
10. Shared feedback/log snapshots without tenant labels.
|
||||
|
||||
## Suggested Implementation Plan
|
||||
|
||||
### Phase 0: Safety Switches
|
||||
|
||||
- Add a cloud/multi-tenant mode feature flag.
|
||||
- In that mode, disable or restrict `fs/*`, raw `thread/resume.path`, raw `thread/fork.path`, `feedback.extraLogFiles`, and unauthenticated WebSocket listeners.
|
||||
- Add logging fields for connection id, principal, redacted identity key digest, thread id, and request id.
|
||||
|
||||
### Phase 1: Tenant Context Plumbing
|
||||
|
||||
- Use `IdentityKey` as the tenant id and add `TenantPrincipal`, `TenantRuntime`, and a `TenantRegistry` keyed by `IdentityKey`.
|
||||
- Seed the initial single-tenant runtime from the process startup `IdentityKey` where remote contracts are not yet multiplexed.
|
||||
- Make transport auth produce principals that include an `IdentityKey`.
|
||||
- Store principal/`IdentityKey` in `ConnectionSessionState` or adjacent connection metadata.
|
||||
- Route every initialized request through a tenant runtime.
|
||||
- Convert outbound broadcasts to tenant broadcasts.
|
||||
- Update `process_response`/`process_error` to include connection id and validate callback ownership.
|
||||
|
||||
### Phase 2: Per-Tenant Runtime Isolation
|
||||
|
||||
- Move config/auth/cloud requirements/thread manager/config API/external config API/feedback/log handles into `TenantRuntime`.
|
||||
- Create per-tenant `codex_home` and `sqlite_home`.
|
||||
- Make `ThreadStateManager`, `ThreadWatchManager`, `CommandExecManager`, `FsWatchManager`, fuzzy search state, pending unloads, and active login per tenant or tenant-keyed.
|
||||
- Ensure `thread_created` auto-attach only considers same-tenant connections.
|
||||
|
||||
### Phase 3: Surface Hardening
|
||||
|
||||
- Add thread ownership checks for every `thread_id` method.
|
||||
- Replace remote filesystem APIs with tenant workspace APIs.
|
||||
- Remove raw client paths for rollout/history operations in cloud mode.
|
||||
- Tenant-scope plugin/MCP/app caches and OAuth callback state.
|
||||
- Add per-tenant resource quotas.
|
||||
|
||||
### Phase 4: Optional Shared Persistence
|
||||
|
||||
Only do this if operational requirements demand one shared SQL store. Otherwise keep root isolation.
|
||||
|
||||
- Add `tenant_id` columns and composite keys to state/log schemas, with values derived from the canonical serialized `IdentityKey`.
|
||||
- Migrate all query helpers and indexes.
|
||||
- Backfill tenant ids for existing isolated roots.
|
||||
- Update rollout/session index and archive paths.
|
||||
|
||||
## Test Matrix
|
||||
|
||||
Minimum multi-tenant regression tests:
|
||||
|
||||
- Two tenants connect to one app-server process; tenant A starts a thread; tenant B never receives `thread/started`, status, approvals, raw events, or token usage notifications.
|
||||
- Tenant B cannot `thread/read`, `thread/resume`, `turn/start`, `turn/interrupt`, `mcp/*`, `feedback/upload`, archive, or rename tenant A's thread by guessed UUID.
|
||||
- Tenant A and tenant B can use the same client request id without callback collisions.
|
||||
- Tenant-specific config write reloads only tenant A threads.
|
||||
- Tenant-specific login/logout sends account notifications only to tenant A.
|
||||
- Tenant-specific `memory/reset` clears only tenant A memory data.
|
||||
- FS/read/write/watch/fuzzy search cannot escape tenant workspace.
|
||||
- Command exec process IDs collide safely across tenants/connections.
|
||||
- Plugin install/uninstall and MCP OAuth in tenant A do not change tenant B config, caches, credentials, or server status.
|
||||
- Remote-control streams for two tenants do not share client trackers, enrollments, cursors, outbound buffers, or auth recovery state.
|
||||
|
||||
## Bottom Line
|
||||
|
||||
A single deployable app-server process is feasible, but not as a narrow patch. The smallest defensible architecture is per-tenant runtime isolation inside the process, selected by authenticated `IdentityKey`, with tenant-scoped outbound routing and per-tenant `CODEX_HOME` roots. That avoids large SQL migrations while removing the dangerous cross-connection assumptions. After that is working, shared persistence can be considered as an explicit storage project rather than as a prerequisite for multi-tenancy.
|
||||
@@ -365,6 +365,7 @@ mod plugin_app_helpers;
|
||||
mod plugin_mcp_oauth;
|
||||
mod token_usage_replay;
|
||||
|
||||
use crate::IdentityKey;
|
||||
use crate::filters::compute_source_filters;
|
||||
use crate::filters::source_kind_matches;
|
||||
use crate::thread_state::ThreadListenerCommand;
|
||||
@@ -618,6 +619,7 @@ pub(crate) struct CodexMessageProcessorArgs {
|
||||
pub(crate) cloud_requirements: Arc<RwLock<CloudRequirementsLoader>>,
|
||||
pub(crate) feedback: CodexFeedback,
|
||||
pub(crate) log_db: Option<LogDbLayer>,
|
||||
pub(crate) identity_key: Option<IdentityKey>,
|
||||
}
|
||||
|
||||
impl CodexMessageProcessor {
|
||||
@@ -696,6 +698,7 @@ impl CodexMessageProcessor {
|
||||
cloud_requirements,
|
||||
feedback,
|
||||
log_db,
|
||||
identity_key: _identity_key,
|
||||
} = args;
|
||||
Self {
|
||||
auth_manager,
|
||||
|
||||
@@ -404,6 +404,7 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
|
||||
auth_manager,
|
||||
rpc_transport: AppServerRpcTransport::InProcess,
|
||||
remote_control_handle: None,
|
||||
identity_key: None,
|
||||
}));
|
||||
let mut thread_created_rx = processor.thread_created_receiver();
|
||||
let session = Arc::new(ConnectionSessionState::default());
|
||||
|
||||
@@ -12,6 +12,7 @@ use codex_login::AuthManager;
|
||||
use codex_utils_cli::CliConfigOverrides;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::ffi::OsString;
|
||||
use std::io::ErrorKind;
|
||||
use std::io::Result as IoResult;
|
||||
use std::sync::Arc;
|
||||
@@ -94,6 +95,43 @@ pub use crate::transport::auth::WebsocketAuthCliMode;
|
||||
|
||||
const LOG_FORMAT_ENV_VAR: &str = "LOG_FORMAT";
|
||||
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
pub struct IdentityKey {
|
||||
bytes: Vec<u8>,
|
||||
}
|
||||
|
||||
impl IdentityKey {
|
||||
pub fn from_bytes(bytes: impl Into<Vec<u8>>) -> Self {
|
||||
Self {
|
||||
bytes: bytes.into(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_os_string(value: OsString) -> Self {
|
||||
Self::from_bytes(os_string_to_bytes(value))
|
||||
}
|
||||
|
||||
pub fn as_bytes(&self) -> &[u8] {
|
||||
&self.bytes
|
||||
}
|
||||
|
||||
pub fn into_bytes(self) -> Vec<u8> {
|
||||
self.bytes
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
fn os_string_to_bytes(value: OsString) -> Vec<u8> {
|
||||
use std::os::unix::ffi::OsStrExt;
|
||||
|
||||
value.as_os_str().as_bytes().to_vec()
|
||||
}
|
||||
|
||||
#[cfg(not(unix))]
|
||||
fn os_string_to_bytes(value: OsString) -> Vec<u8> {
|
||||
value.to_string_lossy().into_owned().into_bytes()
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||
enum LogFormat {
|
||||
Default,
|
||||
@@ -360,6 +398,29 @@ pub async fn run_main_with_transport(
|
||||
transport: AppServerTransport,
|
||||
session_source: SessionSource,
|
||||
auth: AppServerWebsocketAuthSettings,
|
||||
) -> IoResult<()> {
|
||||
run_main_with_transport_options(
|
||||
arg0_paths,
|
||||
cli_config_overrides,
|
||||
loader_overrides,
|
||||
default_analytics_enabled,
|
||||
transport,
|
||||
session_source,
|
||||
auth,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn run_main_with_transport_options(
|
||||
arg0_paths: Arg0DispatchPaths,
|
||||
cli_config_overrides: CliConfigOverrides,
|
||||
loader_overrides: LoaderOverrides,
|
||||
default_analytics_enabled: bool,
|
||||
transport: AppServerTransport,
|
||||
session_source: SessionSource,
|
||||
auth: AppServerWebsocketAuthSettings,
|
||||
identity_key: Option<IdentityKey>,
|
||||
) -> IoResult<()> {
|
||||
let environment_manager = Arc::new(EnvironmentManager::from_env_with_runtime_paths(Some(
|
||||
ExecServerRuntimePaths::from_optional_paths(
|
||||
@@ -669,6 +730,7 @@ pub async fn run_main_with_transport(
|
||||
auth_manager,
|
||||
rpc_transport: analytics_rpc_transport(transport),
|
||||
remote_control_handle: Some(remote_control_handle),
|
||||
identity_key,
|
||||
}));
|
||||
let mut thread_created_rx = processor.thread_created_receiver();
|
||||
let mut running_turn_count_rx = processor.subscribe_running_assistant_turn_count();
|
||||
@@ -908,9 +970,27 @@ fn analytics_rpc_transport(transport: AppServerTransport) -> AppServerRpcTranspo
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::IdentityKey;
|
||||
use super::LogFormat;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[test]
|
||||
fn identity_key_preserves_opaque_bytes() {
|
||||
let key = IdentityKey::from_bytes(b"tenant-key-\x00\xff".to_vec());
|
||||
assert_eq!(key.as_bytes(), &b"tenant-key-\x00\xff"[..]);
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[test]
|
||||
fn identity_key_preserves_unix_argv_bytes() {
|
||||
use std::ffi::OsString;
|
||||
use std::os::unix::ffi::OsStringExt;
|
||||
|
||||
let key = IdentityKey::from_os_string(OsString::from_vec(b"tenant-key-\xff".to_vec()));
|
||||
|
||||
assert_eq!(key.as_bytes(), &b"tenant-key-\xff"[..]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn log_format_from_env_value_matches_json_values_case_insensitively() {
|
||||
assert_eq!(LogFormat::from_env_value(Some("json")), LogFormat::Json);
|
||||
|
||||
@@ -1,12 +1,14 @@
|
||||
use clap::Parser;
|
||||
use codex_app_server::AppServerTransport;
|
||||
use codex_app_server::AppServerWebsocketAuthArgs;
|
||||
use codex_app_server::run_main_with_transport;
|
||||
use codex_app_server::IdentityKey;
|
||||
use codex_app_server::run_main_with_transport_options;
|
||||
use codex_arg0::Arg0DispatchPaths;
|
||||
use codex_arg0::arg0_dispatch_or_else;
|
||||
use codex_core::config_loader::LoaderOverrides;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_utils_cli::CliConfigOverrides;
|
||||
use std::ffi::OsString;
|
||||
use std::path::PathBuf;
|
||||
|
||||
// Debug-only test hook: lets integration tests point the server at a temporary
|
||||
@@ -36,6 +38,10 @@ struct AppServerArgs {
|
||||
|
||||
#[command(flatten)]
|
||||
auth: AppServerWebsocketAuthArgs,
|
||||
|
||||
/// Opaque identity key forwarded to remote contract implementations.
|
||||
#[arg(long = "identity-key", value_name = "KEY")]
|
||||
identity_key: Option<OsString>,
|
||||
}
|
||||
|
||||
fn main() -> anyhow::Result<()> {
|
||||
@@ -51,8 +57,9 @@ fn main() -> anyhow::Result<()> {
|
||||
let transport = args.listen;
|
||||
let session_source = args.session_source;
|
||||
let auth = args.auth.try_into_settings()?;
|
||||
let identity_key = args.identity_key.map(IdentityKey::from_os_string);
|
||||
|
||||
run_main_with_transport(
|
||||
run_main_with_transport_options(
|
||||
arg0_paths,
|
||||
CliConfigOverrides::default(),
|
||||
loader_overrides,
|
||||
@@ -60,6 +67,7 @@ fn main() -> anyhow::Result<()> {
|
||||
transport,
|
||||
session_source,
|
||||
auth,
|
||||
identity_key,
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
|
||||
@@ -7,6 +7,7 @@ use std::sync::RwLock;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
use crate::IdentityKey;
|
||||
use crate::codex_message_processor::CodexMessageProcessor;
|
||||
use crate::codex_message_processor::CodexMessageProcessorArgs;
|
||||
use crate::config_api::ConfigApi;
|
||||
@@ -238,6 +239,7 @@ pub(crate) struct MessageProcessorArgs {
|
||||
pub(crate) auth_manager: Arc<AuthManager>,
|
||||
pub(crate) rpc_transport: AppServerRpcTransport,
|
||||
pub(crate) remote_control_handle: Option<RemoteControlHandle>,
|
||||
pub(crate) identity_key: Option<IdentityKey>,
|
||||
}
|
||||
|
||||
impl MessageProcessor {
|
||||
@@ -259,6 +261,7 @@ impl MessageProcessor {
|
||||
auth_manager,
|
||||
rpc_transport,
|
||||
remote_control_handle,
|
||||
identity_key,
|
||||
} = args;
|
||||
auth_manager.set_external_auth(Arc::new(ExternalAuthRefreshBridge {
|
||||
outgoing: outgoing.clone(),
|
||||
@@ -299,6 +302,7 @@ impl MessageProcessor {
|
||||
cloud_requirements: cloud_requirements.clone(),
|
||||
feedback,
|
||||
log_db,
|
||||
identity_key,
|
||||
});
|
||||
// Keep plugin startup warmups aligned at app-server startup.
|
||||
// TODO(xl): Move into PluginManager once this no longer depends on config feature gating.
|
||||
|
||||
@@ -252,6 +252,7 @@ fn build_test_processor(
|
||||
auth_manager,
|
||||
rpc_transport: AppServerRpcTransport::Stdio,
|
||||
remote_control_handle: None,
|
||||
identity_key: None,
|
||||
}));
|
||||
(processor, outgoing_rx)
|
||||
}
|
||||
|
||||
@@ -30,6 +30,7 @@ use codex_tui::ExitReason;
|
||||
use codex_tui::UpdateAction;
|
||||
use codex_utils_cli::CliConfigOverrides;
|
||||
use owo_colors::OwoColorize;
|
||||
use std::ffi::OsString;
|
||||
use std::io::IsTerminal;
|
||||
use std::path::PathBuf;
|
||||
use supports_color::Stream;
|
||||
@@ -390,6 +391,10 @@ struct AppServerCommand {
|
||||
|
||||
#[command(flatten)]
|
||||
auth: codex_app_server::AppServerWebsocketAuthArgs,
|
||||
|
||||
/// Opaque identity key forwarded to remote contract implementations.
|
||||
#[arg(long = "identity-key", value_name = "KEY")]
|
||||
identity_key: Option<OsString>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
@@ -744,6 +749,7 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
|
||||
listen,
|
||||
analytics_default_enabled,
|
||||
auth,
|
||||
identity_key,
|
||||
} = app_server_cli;
|
||||
reject_remote_mode_for_app_server_subcommand(
|
||||
root_remote.as_deref(),
|
||||
@@ -754,7 +760,9 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
|
||||
None => {
|
||||
let transport = listen;
|
||||
let auth = auth.try_into_settings()?;
|
||||
codex_app_server::run_main_with_transport(
|
||||
let identity_key =
|
||||
identity_key.map(codex_app_server::IdentityKey::from_os_string);
|
||||
codex_app_server::run_main_with_transport_options(
|
||||
arg0_paths.clone(),
|
||||
root_config_overrides,
|
||||
codex_core::config_loader::LoaderOverrides::default(),
|
||||
@@ -762,6 +770,7 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
|
||||
transport,
|
||||
codex_protocol::protocol::SessionSource::VSCode,
|
||||
auth,
|
||||
identity_key,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
@@ -1942,6 +1951,17 @@ mod tests {
|
||||
assert!(app_server.analytics_default_enabled);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn app_server_identity_key_flag_parses() {
|
||||
let app_server = app_server_from_args(
|
||||
["codex", "app-server", "--identity-key", "tenant-key-123"].as_ref(),
|
||||
);
|
||||
assert_eq!(
|
||||
app_server.identity_key.as_deref(),
|
||||
Some(std::ffi::OsStr::new("tenant-key-123"))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remote_flag_parses_for_interactive_root() {
|
||||
let cli = MultitoolCli::try_parse_from(["codex", "--remote", "ws://127.0.0.1:4500"])
|
||||
|
||||
@@ -15,6 +15,7 @@ pub use error::ThreadStoreError;
|
||||
pub use error::ThreadStoreResult;
|
||||
pub use local::LocalThreadStore;
|
||||
pub use recorder::ThreadRecorder;
|
||||
pub use remote::IDENTITY_KEY_HEADER;
|
||||
pub use remote::RemoteThreadStore;
|
||||
pub use store::ThreadStore;
|
||||
pub use types::AppendThreadItemsParams;
|
||||
|
||||
@@ -57,6 +57,8 @@ pub(super) async fn list_threads(
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
|
||||
use codex_protocol::openai_models::ReasoningEffort;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
@@ -70,11 +72,14 @@ mod tests {
|
||||
use super::super::proto::thread_store_server;
|
||||
use super::super::proto::thread_store_server::ThreadStoreServer;
|
||||
use super::*;
|
||||
use crate::IDENTITY_KEY_HEADER;
|
||||
use crate::ThreadSortKey;
|
||||
use crate::ThreadStore;
|
||||
|
||||
#[derive(Default)]
|
||||
struct TestServer;
|
||||
struct TestServer {
|
||||
captured_identity_key: Option<Arc<Mutex<Option<Vec<u8>>>>>,
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl thread_store_server::ThreadStore for TestServer {
|
||||
@@ -82,6 +87,16 @@ mod tests {
|
||||
&self,
|
||||
request: Request<proto::ListThreadsRequest>,
|
||||
) -> Result<Response<proto::ListThreadsResponse>, Status> {
|
||||
if let Some(captured_identity_key) = &self.captured_identity_key {
|
||||
let identity_key = request
|
||||
.metadata()
|
||||
.get_bin(IDENTITY_KEY_HEADER)
|
||||
.and_then(|value| value.to_bytes().ok())
|
||||
.map(|value| value.to_vec());
|
||||
*captured_identity_key
|
||||
.lock()
|
||||
.expect("captured identity key mutex poisoned") = identity_key;
|
||||
}
|
||||
let request = request.into_inner();
|
||||
assert_eq!(request.page_size, 2);
|
||||
assert_eq!(request.cursor.as_deref(), Some("cursor-1"));
|
||||
@@ -145,7 +160,7 @@ mod tests {
|
||||
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
|
||||
let server = tokio::spawn(async move {
|
||||
Server::builder()
|
||||
.add_service(ThreadStoreServer::new(TestServer))
|
||||
.add_service(ThreadStoreServer::new(TestServer::default()))
|
||||
.serve_with_incoming_shutdown(
|
||||
tokio_stream::wrappers::TcpListenerStream::new(listener),
|
||||
async {
|
||||
@@ -197,6 +212,58 @@ mod tests {
|
||||
server.await.expect("join server").expect("server");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_threads_forwards_identity_key_as_metadata() {
|
||||
let captured_identity_key = Arc::new(Mutex::new(None));
|
||||
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
|
||||
.await
|
||||
.expect("bind test server");
|
||||
let addr = listener.local_addr().expect("test server addr");
|
||||
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
|
||||
let server_identity_key = captured_identity_key.clone();
|
||||
let server = tokio::spawn(async move {
|
||||
Server::builder()
|
||||
.add_service(ThreadStoreServer::new(TestServer {
|
||||
captured_identity_key: Some(server_identity_key),
|
||||
}))
|
||||
.serve_with_incoming_shutdown(
|
||||
tokio_stream::wrappers::TcpListenerStream::new(listener),
|
||||
async {
|
||||
let _ = shutdown_rx.await;
|
||||
},
|
||||
)
|
||||
.await
|
||||
});
|
||||
|
||||
let store = RemoteThreadStore::new_with_identity_key(
|
||||
format!("http://{addr}"),
|
||||
Some(b"tenant-key-\x00\xff".to_vec()),
|
||||
);
|
||||
store
|
||||
.list_threads(ListThreadsParams {
|
||||
page_size: 2,
|
||||
cursor: Some("cursor-1".to_string()),
|
||||
sort_key: ThreadSortKey::UpdatedAt,
|
||||
allowed_sources: vec![SessionSource::Cli],
|
||||
model_providers: Some(vec!["openai".to_string()]),
|
||||
archived: true,
|
||||
search_term: Some("needle".to_string()),
|
||||
})
|
||||
.await
|
||||
.expect("list threads");
|
||||
|
||||
assert_eq!(
|
||||
captured_identity_key
|
||||
.lock()
|
||||
.expect("captured identity key mutex poisoned")
|
||||
.as_deref(),
|
||||
Some(&b"tenant-key-\x00\xff"[..])
|
||||
);
|
||||
|
||||
let _ = shutdown_tx.send(());
|
||||
server.await.expect("join server").expect("server");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stored_thread_proto_roundtrips_through_domain_type() {
|
||||
let thread = proto::StoredThread {
|
||||
|
||||
@@ -20,30 +20,83 @@ use crate::ThreadStoreError;
|
||||
use crate::ThreadStoreResult;
|
||||
use crate::UpdateThreadMetadataParams;
|
||||
use proto::thread_store_client::ThreadStoreClient;
|
||||
use tonic::codegen::InterceptedService;
|
||||
use tonic::metadata::BinaryMetadataValue;
|
||||
use tonic::service::Interceptor;
|
||||
use tonic::transport::Channel;
|
||||
use tonic::transport::Endpoint;
|
||||
|
||||
#[path = "proto/codex.thread_store.v1.rs"]
|
||||
mod proto;
|
||||
|
||||
/// Metadata key used to forward the app-server's opaque identity key to remote contracts.
|
||||
pub const IDENTITY_KEY_HEADER: &str = "x-codex-app-server-identity-key-bin";
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct IdentityKeyInterceptor {
|
||||
identity_key: Option<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl Interceptor for IdentityKeyInterceptor {
|
||||
fn call(
|
||||
&mut self,
|
||||
mut request: tonic::Request<()>,
|
||||
) -> Result<tonic::Request<()>, tonic::Status> {
|
||||
if let Some(identity_key) = &self.identity_key {
|
||||
request.metadata_mut().insert_bin(
|
||||
IDENTITY_KEY_HEADER,
|
||||
BinaryMetadataValue::from_bytes(identity_key),
|
||||
);
|
||||
}
|
||||
Ok(request)
|
||||
}
|
||||
}
|
||||
|
||||
type RemoteThreadStoreClient =
|
||||
ThreadStoreClient<InterceptedService<Channel, IdentityKeyInterceptor>>;
|
||||
|
||||
/// gRPC-backed [`ThreadStore`] implementation for deployments whose durable thread data lives
|
||||
/// outside the app-server process.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct RemoteThreadStore {
|
||||
endpoint: String,
|
||||
identity_key: Option<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl RemoteThreadStore {
|
||||
pub fn new(endpoint: impl Into<String>) -> Self {
|
||||
Self {
|
||||
endpoint: endpoint.into(),
|
||||
identity_key: None,
|
||||
}
|
||||
}
|
||||
|
||||
async fn client(&self) -> ThreadStoreResult<ThreadStoreClient<tonic::transport::Channel>> {
|
||||
ThreadStoreClient::connect(self.endpoint.clone())
|
||||
pub fn new_with_identity_key(
|
||||
endpoint: impl Into<String>,
|
||||
identity_key: Option<Vec<u8>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
endpoint: endpoint.into(),
|
||||
identity_key,
|
||||
}
|
||||
}
|
||||
|
||||
async fn client(&self) -> ThreadStoreResult<RemoteThreadStoreClient> {
|
||||
let channel = Endpoint::new(self.endpoint.clone())
|
||||
.map_err(|err| ThreadStoreError::InvalidRequest {
|
||||
message: format!("invalid remote thread store endpoint: {err}"),
|
||||
})?
|
||||
.connect()
|
||||
.await
|
||||
.map_err(|err| ThreadStoreError::Internal {
|
||||
message: format!("failed to connect to remote thread store: {err}"),
|
||||
})
|
||||
})?;
|
||||
Ok(ThreadStoreClient::with_interceptor(
|
||||
channel,
|
||||
IdentityKeyInterceptor {
|
||||
identity_key: self.identity_key.clone(),
|
||||
},
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user