Compare commits

...

3 Commits

Author SHA1 Message Date
bryanashley
01b7621d37 Fix app-server runtime options initialization 2026-04-26 20:51:12 -07:00
bryanashley
131ee501e0 move identity 2026-04-26 20:37:19 -07:00
bryanashley
babfce7d9c Add opaque identity and make available to thread through to remote contracts 2026-04-26 20:37:19 -07:00
14 changed files with 259 additions and 12 deletions

View File

@@ -405,6 +405,7 @@ impl InProcessClientStartArgs {
session_source: self.session_source,
enable_codex_api_key_env: self.enable_codex_api_key_env,
initialize,
identity_key: None,
channel_capacity: self.channel_capacity,
}
}

View File

@@ -412,6 +412,7 @@ mod plugin_mcp_oauth;
mod plugins;
mod token_usage_replay;
use crate::IdentityKey;
use crate::filters::compute_source_filters;
use crate::filters::source_kind_matches;
use crate::thread_state::ThreadListenerCommand;
@@ -674,6 +675,7 @@ pub(crate) struct CodexMessageProcessorArgs {
pub(crate) config_manager: ConfigManager,
pub(crate) feedback: CodexFeedback,
pub(crate) log_db: Option<LogDbLayer>,
pub(crate) identity_key: Option<IdentityKey>,
}
fn configured_thread_store(config: &Config) -> Arc<dyn ThreadStore> {
@@ -770,6 +772,7 @@ impl CodexMessageProcessor {
config_manager,
feedback,
log_db,
identity_key: _identity_key,
} = args;
Self {
auth_manager,

View File

@@ -0,0 +1,61 @@
use std::ffi::OsString;
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct IdentityKey {
bytes: Vec<u8>,
}
impl IdentityKey {
pub fn from_bytes(bytes: impl Into<Vec<u8>>) -> Self {
Self {
bytes: bytes.into(),
}
}
pub fn from_os_string(value: OsString) -> Self {
Self::from_bytes(os_string_to_bytes(value))
}
pub fn as_bytes(&self) -> &[u8] {
&self.bytes
}
pub fn into_bytes(self) -> Vec<u8> {
self.bytes
}
}
#[cfg(unix)]
fn os_string_to_bytes(value: OsString) -> Vec<u8> {
use std::os::unix::ffi::OsStrExt;
value.as_os_str().as_bytes().to_vec()
}
#[cfg(not(unix))]
fn os_string_to_bytes(value: OsString) -> Vec<u8> {
value.to_string_lossy().into_owned().into_bytes()
}
#[cfg(test)]
mod tests {
use super::IdentityKey;
use pretty_assertions::assert_eq;
#[test]
fn identity_key_preserves_opaque_bytes() {
let key = IdentityKey::from_bytes(b"tenant-key-\x00\xff".to_vec());
assert_eq!(key.as_bytes(), &b"tenant-key-\x00\xff"[..]);
}
#[cfg(unix)]
#[test]
fn identity_key_preserves_unix_argv_bytes() {
use std::ffi::OsString;
use std::os::unix::ffi::OsStringExt;
let key = IdentityKey::from_os_string(OsString::from_vec(b"tenant-key-\xff".to_vec()));
assert_eq!(key.as_bytes(), &b"tenant-key-\xff"[..]);
}
}

View File

@@ -50,6 +50,7 @@ use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use crate::IdentityKey;
use crate::config_manager::ConfigManager;
use crate::error_code::INTERNAL_ERROR_CODE;
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
@@ -135,6 +136,8 @@ pub struct InProcessStartArgs {
pub enable_codex_api_key_env: bool,
/// Initialize params used for initial handshake.
pub initialize: InitializeParams,
/// Opaque identity key forwarded to remote contract implementations.
pub identity_key: Option<IdentityKey>,
/// Capacity used for all runtime queues (clamped to at least 1).
pub channel_capacity: usize,
}
@@ -416,6 +419,7 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
rpc_transport: AppServerRpcTransport::InProcess,
remote_control_handle: None,
plugin_startup_tasks: crate::PluginStartupTasks::Start,
identity_key: args.identity_key,
}));
let mut thread_created_rx = processor.thread_created_receiver();
let session = Arc::new(ConnectionSessionState::new(ConnectionOrigin::InProcess));
@@ -763,6 +767,7 @@ mod tests {
},
capabilities: None,
},
identity_key: None,
channel_capacity,
};
start(args).await.expect("in-process runtime should start")

View File

@@ -83,6 +83,7 @@ mod filters;
mod fs_api;
mod fs_watch;
mod fuzzy_file_search;
mod identity_key;
pub mod in_process;
mod message_processor;
mod models;
@@ -94,6 +95,7 @@ mod transport;
pub use crate::error_code::INPUT_TOO_LARGE_ERROR_CODE;
pub use crate::error_code::INVALID_PARAMS_ERROR_CODE;
pub use crate::identity_key::IdentityKey;
pub use crate::transport::AppServerTransport;
pub use crate::transport::app_server_control_socket_path;
pub use crate::transport::auth::AppServerWebsocketAuthArgs;
@@ -368,15 +370,17 @@ pub enum PluginStartupTasks {
Skip,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AppServerRuntimeOptions {
pub plugin_startup_tasks: PluginStartupTasks,
pub identity_key: Option<IdentityKey>,
}
impl Default for AppServerRuntimeOptions {
fn default() -> Self {
Self {
plugin_startup_tasks: PluginStartupTasks::Start,
identity_key: None,
}
}
}
@@ -414,6 +418,10 @@ pub async fn run_main_with_transport_options(
auth: AppServerWebsocketAuthSettings,
runtime_options: AppServerRuntimeOptions,
) -> IoResult<()> {
let AppServerRuntimeOptions {
plugin_startup_tasks,
identity_key,
} = runtime_options;
let environment_manager = Arc::new(EnvironmentManager::new(EnvironmentManagerArgs::from_env(
ExecServerRuntimePaths::from_optional_paths(
arg0_paths.codex_self_exe.clone(),
@@ -726,7 +734,8 @@ pub async fn run_main_with_transport_options(
auth_manager,
rpc_transport: analytics_rpc_transport(&transport),
remote_control_handle: Some(remote_control_handle),
plugin_startup_tasks: runtime_options.plugin_startup_tasks,
plugin_startup_tasks,
identity_key,
}));
let mut thread_created_rx = processor.thread_created_receiver();
let mut running_turn_count_rx = processor.subscribe_running_assistant_turn_count();

View File

@@ -2,6 +2,7 @@ use clap::Parser;
use codex_app_server::AppServerRuntimeOptions;
use codex_app_server::AppServerTransport;
use codex_app_server::AppServerWebsocketAuthArgs;
use codex_app_server::IdentityKey;
use codex_app_server::PluginStartupTasks;
use codex_app_server::run_main_with_transport_options;
use codex_arg0::Arg0DispatchPaths;
@@ -9,6 +10,7 @@ use codex_arg0::arg0_dispatch_or_else;
use codex_config::LoaderOverrides;
use codex_protocol::protocol::SessionSource;
use codex_utils_cli::CliConfigOverrides;
use std::ffi::OsString;
use std::path::PathBuf;
// Debug-only test hook: lets integration tests point the server at a temporary
@@ -39,6 +41,10 @@ struct AppServerArgs {
#[command(flatten)]
auth: AppServerWebsocketAuthArgs,
/// Opaque identity key forwarded to remote contract implementations.
#[arg(long = "identity-key", value_name = "KEY")]
identity_key: Option<OsString>,
/// Hidden debug-only test hook used by integration tests that spawn the
/// production app-server binary.
#[cfg(debug_assertions)]
@@ -59,11 +65,18 @@ fn main() -> anyhow::Result<()> {
let transport = args.listen;
let session_source = args.session_source;
let auth = args.auth.try_into_settings()?;
let mut runtime_options = AppServerRuntimeOptions::default();
#[cfg(debug_assertions)]
if args.disable_plugin_startup_tasks_for_tests {
runtime_options.plugin_startup_tasks = PluginStartupTasks::Skip;
}
let plugin_startup_tasks = if args.disable_plugin_startup_tasks_for_tests {
PluginStartupTasks::Skip
} else {
PluginStartupTasks::Start
};
#[cfg(not(debug_assertions))]
let plugin_startup_tasks = PluginStartupTasks::Start;
let runtime_options = AppServerRuntimeOptions {
plugin_startup_tasks,
identity_key: args.identity_key.map(IdentityKey::from_os_string),
};
run_main_with_transport_options(
arg0_paths,

View File

@@ -5,6 +5,7 @@ use std::sync::OnceLock;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use crate::IdentityKey;
use crate::codex_message_processor::CodexMessageProcessor;
use crate::codex_message_processor::CodexMessageProcessorArgs;
use crate::config_api::ConfigApi;
@@ -249,6 +250,7 @@ pub(crate) struct MessageProcessorArgs {
pub(crate) rpc_transport: AppServerRpcTransport,
pub(crate) remote_control_handle: Option<RemoteControlHandle>,
pub(crate) plugin_startup_tasks: crate::PluginStartupTasks,
pub(crate) identity_key: Option<IdentityKey>,
}
impl MessageProcessor {
@@ -269,6 +271,7 @@ impl MessageProcessor {
rpc_transport,
remote_control_handle,
plugin_startup_tasks,
identity_key,
} = args;
auth_manager.set_external_auth(Arc::new(ExternalAuthRefreshBridge {
outgoing: outgoing.clone(),
@@ -304,6 +307,7 @@ impl MessageProcessor {
config_manager: config_manager.clone(),
feedback,
log_db,
identity_key,
});
if matches!(plugin_startup_tasks, crate::PluginStartupTasks::Start) {
// Keep plugin startup warmups aligned at app-server startup.

View File

@@ -289,6 +289,7 @@ fn build_test_processor(
rpc_transport: AppServerRpcTransport::Stdio,
remote_control_handle: None,
plugin_startup_tasks: crate::PluginStartupTasks::Start,
identity_key: None,
}));
(processor, outgoing_rx)
}

View File

@@ -216,6 +216,7 @@ async fn mcp_resource_read_returns_error_for_unknown_thread() -> Result<()> {
},
capabilities: None,
},
identity_key: None,
channel_capacity: in_process::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY,
})
.await?;

View File

@@ -90,6 +90,7 @@ async fn thread_start_with_non_local_thread_store_does_not_create_local_persiste
},
capabilities: None,
},
identity_key: None,
channel_capacity: in_process::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY,
})
.await?;

View File

@@ -35,6 +35,7 @@ use codex_tui::UpdateAction;
use codex_utils_absolute_path::AbsolutePathBuf;
use codex_utils_cli::CliConfigOverrides;
use owo_colors::OwoColorize;
use std::ffi::OsString;
use std::io::IsTerminal;
use std::path::PathBuf;
use supports_color::Stream;
@@ -440,6 +441,10 @@ struct AppServerCommand {
#[command(flatten)]
auth: codex_app_server::AppServerWebsocketAuthArgs,
/// Opaque identity key forwarded to remote contract implementations.
#[arg(long = "identity-key", value_name = "KEY")]
identity_key: Option<OsString>,
}
#[derive(Debug, Parser)]
@@ -817,6 +822,7 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
listen,
analytics_default_enabled,
auth,
identity_key,
} = app_server_cli;
reject_remote_mode_for_app_server_subcommand(
root_remote.as_deref(),
@@ -827,7 +833,13 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
None => {
let transport = listen;
let auth = auth.try_into_settings()?;
codex_app_server::run_main_with_transport(
let identity_key =
identity_key.map(codex_app_server::IdentityKey::from_os_string);
let runtime_options = codex_app_server::AppServerRuntimeOptions {
identity_key,
..Default::default()
};
codex_app_server::run_main_with_transport_options(
arg0_paths.clone(),
root_config_overrides,
codex_config::LoaderOverrides::default(),
@@ -835,6 +847,7 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
transport,
codex_protocol::protocol::SessionSource::VSCode,
auth,
runtime_options,
)
.await?;
}
@@ -2167,6 +2180,17 @@ mod tests {
assert!(app_server.analytics_default_enabled);
}
#[test]
fn app_server_identity_key_flag_parses() {
let app_server = app_server_from_args(
["codex", "app-server", "--identity-key", "tenant-key-123"].as_ref(),
);
assert_eq!(
app_server.identity_key.as_deref(),
Some(std::ffi::OsStr::new("tenant-key-123"))
);
}
#[test]
fn remote_flag_parses_for_interactive_root() {
let cli = MultitoolCli::try_parse_from(["codex", "--remote", "ws://127.0.0.1:4500"])

View File

@@ -22,6 +22,7 @@ pub use in_memory::InMemoryThreadStoreCalls;
pub use live_thread::LiveThread;
pub use live_thread::LiveThreadInitGuard;
pub use local::LocalThreadStore;
pub use remote::IDENTITY_KEY_HEADER;
pub use remote::RemoteThreadStore;
pub use store::ThreadStore;
pub use types::AppendThreadItemsParams;

View File

@@ -66,6 +66,8 @@ pub(super) async fn list_threads(
#[cfg(test)]
mod tests {
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use codex_protocol::openai_models::ReasoningEffort;
use codex_protocol::protocol::SessionSource;
@@ -79,11 +81,14 @@ mod tests {
use super::super::proto::thread_store_server;
use super::super::proto::thread_store_server::ThreadStoreServer;
use super::*;
use crate::IDENTITY_KEY_HEADER;
use crate::ThreadSortKey;
use crate::ThreadStore;
#[derive(Default)]
struct TestServer;
struct TestServer {
captured_identity_key: Option<Arc<Mutex<Option<Vec<u8>>>>>,
}
#[tonic::async_trait]
impl thread_store_server::ThreadStore for TestServer {
@@ -91,6 +96,16 @@ mod tests {
&self,
request: Request<proto::ListThreadsRequest>,
) -> Result<Response<proto::ListThreadsResponse>, Status> {
if let Some(captured_identity_key) = &self.captured_identity_key {
let identity_key = request
.metadata()
.get_bin(IDENTITY_KEY_HEADER)
.and_then(|value| value.to_bytes().ok())
.map(|value| value.to_vec());
*captured_identity_key
.lock()
.expect("captured identity key mutex poisoned") = identity_key;
}
let request = request.into_inner();
assert_eq!(request.page_size, 2);
assert_eq!(request.cursor.as_deref(), Some("cursor-1"));
@@ -170,7 +185,7 @@ mod tests {
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
let server = tokio::spawn(async move {
Server::builder()
.add_service(ThreadStoreServer::new(TestServer))
.add_service(ThreadStoreServer::new(TestServer::default()))
.serve_with_incoming_shutdown(
tokio_stream::wrappers::TcpListenerStream::new(listener),
async {
@@ -225,6 +240,61 @@ mod tests {
server.await.expect("join server").expect("server");
}
#[tokio::test]
async fn list_threads_forwards_identity_key_as_metadata() {
let captured_identity_key = Arc::new(Mutex::new(None));
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
.await
.expect("bind test server");
let addr = listener.local_addr().expect("test server addr");
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
let server_identity_key = captured_identity_key.clone();
let server = tokio::spawn(async move {
Server::builder()
.add_service(ThreadStoreServer::new(TestServer {
captured_identity_key: Some(server_identity_key),
}))
.serve_with_incoming_shutdown(
tokio_stream::wrappers::TcpListenerStream::new(listener),
async {
let _ = shutdown_rx.await;
},
)
.await
});
let store = RemoteThreadStore::new_with_identity_key(
format!("http://{addr}"),
Some(b"tenant-key-\x00\xff".to_vec()),
);
store
.list_threads(ListThreadsParams {
page_size: 2,
cursor: Some("cursor-1".to_string()),
sort_key: ThreadSortKey::UpdatedAt,
sort_direction: crate::SortDirection::Desc,
allowed_sources: vec![SessionSource::Cli],
model_providers: Some(vec!["openai".to_string()]),
cwd_filters: Some(vec![PathBuf::from("/workspace")]),
archived: true,
search_term: Some("needle".to_string()),
use_state_db_only: true,
})
.await
.expect("list threads");
assert_eq!(
captured_identity_key
.lock()
.expect("captured identity key mutex poisoned")
.as_deref(),
Some(&b"tenant-key-\x00\xff"[..])
);
let _ = shutdown_tx.send(());
server.await.expect("join server").expect("server");
}
#[test]
fn stored_thread_proto_roundtrips_through_domain_type() {
let thread = proto::StoredThread {

View File

@@ -20,10 +20,41 @@ use crate::ThreadStoreError;
use crate::ThreadStoreResult;
use crate::UpdateThreadMetadataParams;
use proto::thread_store_client::ThreadStoreClient;
use tonic::codegen::InterceptedService;
use tonic::metadata::BinaryMetadataValue;
use tonic::service::Interceptor;
use tonic::transport::Channel;
use tonic::transport::Endpoint;
#[path = "proto/codex.thread_store.v1.rs"]
mod proto;
/// Metadata key used to forward the app-server's opaque identity key to remote contracts.
pub const IDENTITY_KEY_HEADER: &str = "x-codex-app-server-identity-key-bin";
#[derive(Clone, Debug)]
struct IdentityKeyInterceptor {
identity_key: Option<Vec<u8>>,
}
impl Interceptor for IdentityKeyInterceptor {
fn call(
&mut self,
mut request: tonic::Request<()>,
) -> Result<tonic::Request<()>, tonic::Status> {
if let Some(identity_key) = &self.identity_key {
request.metadata_mut().insert_bin(
IDENTITY_KEY_HEADER,
BinaryMetadataValue::from_bytes(identity_key),
);
}
Ok(request)
}
}
type RemoteThreadStoreClient =
ThreadStoreClient<InterceptedService<Channel, IdentityKeyInterceptor>>;
/// gRPC-backed [`ThreadStore`] implementation for deployments whose durable thread data lives
/// outside the app-server process.
///
@@ -33,21 +64,43 @@ mod proto;
#[derive(Clone, Debug)]
pub struct RemoteThreadStore {
endpoint: String,
identity_key: Option<Vec<u8>>,
}
impl RemoteThreadStore {
pub fn new(endpoint: impl Into<String>) -> Self {
Self {
endpoint: endpoint.into(),
identity_key: None,
}
}
async fn client(&self) -> ThreadStoreResult<ThreadStoreClient<tonic::transport::Channel>> {
ThreadStoreClient::connect(self.endpoint.clone())
pub fn new_with_identity_key(
endpoint: impl Into<String>,
identity_key: Option<Vec<u8>>,
) -> Self {
Self {
endpoint: endpoint.into(),
identity_key,
}
}
async fn client(&self) -> ThreadStoreResult<RemoteThreadStoreClient> {
let channel = Endpoint::new(self.endpoint.clone())
.map_err(|err| ThreadStoreError::InvalidRequest {
message: format!("invalid remote thread store endpoint: {err}"),
})?
.connect()
.await
.map_err(|err| ThreadStoreError::Internal {
message: format!("failed to connect to remote thread store: {err}"),
})
})?;
Ok(ThreadStoreClient::with_interceptor(
channel,
IdentityKeyInterceptor {
identity_key: self.identity_key.clone(),
},
))
}
}