Files
codex/codex-rs/app-server/src/in_process.rs
Ruslan Nigmatullin 0700f979ba app-server: run initialized rpcs with keyed serialization (#17373)
## Why

Initialized app-server RPCs no longer need to bottleneck behind one
request processor path. Running them concurrently improves
responsiveness, but several request families still mutate shared state
or depend on ordered side effects. Those stateful families need an
auditable serialization contract so concurrency does not reorder thread,
config, auth, command, watcher, MCP, or similar state transitions.

This PR keeps that boundary explicit: stateful work is serialized by the
smallest useful key, while intentionally read-only or externally
concurrent work remains unkeyed. In particular, `thread/list` and
`thread/turns/list` explicitly have no serialization because they
primarily read append-only rollout storage and should continue to be
served concurrently.

## What changed

- Adds `ClientRequest::serialization_scope()` in `app-server-protocol`
and requires every client request definition to declare its
serialization behavior.
- Introduces keyed request scopes for thread, thread path, command exec
process, fuzzy search session, fs watch, MCP OAuth, and global state
buckets such as config, account auth, memory, and device keys.
- Routes initialized app-server RPCs through per-key FIFO serialization
while allowing unkeyed initialized requests to run concurrently.
- Cancels in-flight initialized RPC work when the connection disconnects
or the app-server exits so spawned request tasks do not outlive their
session.
- Adds focused coverage for representative keyed and unkeyed
serialization scopes, including explicitly concurrent
`thread/turns/list` behavior.

## Validation

- Added protocol tests for representative keyed serialization scopes and
intentionally unkeyed request families.
- Added app-server request serialization tests covering per-key FIFO
behavior, concurrent unkeyed execution, disconnect shutdown, and config
read-after-write ordering.
- Local focused protocol validation after the latest rebase is currently
blocked by packageproxy failing to resolve locked `rustls-webpki
0.103.13`; CI is expected to provide the full validation signal.
2026-04-28 12:23:34 -07:00

953 lines
42 KiB
Rust

//! In-process app-server runtime host for local embedders.
//!
//! This module runs the existing [`MessageProcessor`] and outbound routing logic
//! on Tokio tasks, but replaces socket/stdio transports with bounded in-memory
//! channels. The intent is to preserve app-server semantics while avoiding a
//! process boundary for CLI surfaces that run in the same process.
//!
//! # Lifecycle
//!
//! 1. Construct runtime state with [`InProcessStartArgs`].
//! 2. Call [`start`], which performs the `initialize` / `initialized` handshake
//! internally and returns a ready-to-use [`InProcessClientHandle`].
//! 3. Send requests via [`InProcessClientHandle::request`], notifications via
//! [`InProcessClientHandle::notify`], and consume events via
//! [`InProcessClientHandle::next_event`].
//! 4. Terminate with [`InProcessClientHandle::shutdown`].
//!
//! # Transport model
//!
//! The runtime is transport-local but not protocol-free. Incoming requests are
//! typed [`ClientRequest`] values, yet responses still come back through the
//! same JSON-RPC result envelope that `MessageProcessor` uses for stdio and
//! websocket transports. This keeps in-process behavior aligned with
//! app-server rather than creating a second execution contract.
//!
//! # Backpressure
//!
//! Command submission uses `try_send` and can return `WouldBlock`, while event
//! fanout may drop notifications under saturation. Server requests are never
//! silently abandoned: if they cannot be queued they are failed back into
//! `MessageProcessor` with overload or internal errors so approval flows do
//! not hang indefinitely.
//!
//! # Relationship to `codex-app-server-client`
//!
//! This module provides the low-level runtime handle ([`InProcessClientHandle`]).
//! Higher-level callers (TUI, exec) should go through `codex-app-server-client`,
//! which wraps this module behind a worker task with async request/response
//! helpers, surface-specific startup policy, and bounded shutdown.
use std::collections::HashMap;
use std::collections::HashSet;
use std::collections::hash_map::Entry;
use std::io::Error as IoError;
use std::io::ErrorKind;
use std::io::Result as IoResult;
use std::sync::Arc;
use std::sync::RwLock;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use crate::config_manager::ConfigManager;
use crate::error_code::INTERNAL_ERROR_CODE;
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
use crate::error_code::OVERLOADED_ERROR_CODE;
use crate::message_processor::ConnectionSessionState;
use crate::message_processor::MessageProcessor;
use crate::message_processor::MessageProcessorArgs;
use crate::outgoing_message::ConnectionId;
use crate::outgoing_message::OutgoingEnvelope;
use crate::outgoing_message::OutgoingMessage;
use crate::outgoing_message::OutgoingMessageSender;
use crate::outgoing_message::QueuedOutgoingMessage;
use crate::transport::CHANNEL_CAPACITY;
use crate::transport::ConnectionOrigin;
use crate::transport::OutboundConnectionState;
use crate::transport::route_outgoing_envelope;
use codex_analytics::AppServerRpcTransport;
use codex_app_server_protocol::ClientNotification;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ConfigWarningNotification;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::Result;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_arg0::Arg0DispatchPaths;
use codex_config::CloudRequirementsLoader;
use codex_config::LoaderOverrides;
use codex_config::ThreadConfigLoader;
use codex_core::config::Config;
use codex_exec_server::EnvironmentManager;
use codex_feedback::CodexFeedback;
use codex_login::AuthManager;
use codex_protocol::protocol::SessionSource;
pub use codex_state::log_db::LogDbLayer;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::time::timeout;
use toml::Value as TomlValue;
use tracing::warn;
const IN_PROCESS_CONNECTION_ID: ConnectionId = ConnectionId(0);
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
/// Default bounded channel capacity for in-process runtime queues.
pub const DEFAULT_IN_PROCESS_CHANNEL_CAPACITY: usize = CHANNEL_CAPACITY;
type PendingClientRequestResponse = std::result::Result<Result, JSONRPCErrorError>;
fn server_notification_requires_delivery(notification: &ServerNotification) -> bool {
matches!(notification, ServerNotification::TurnCompleted(_))
}
/// Input needed to start an in-process app-server runtime.
///
/// These fields mirror the pieces of ambient process state that stdio and
/// websocket transports normally assemble before `MessageProcessor` starts.
#[derive(Clone)]
pub struct InProcessStartArgs {
/// Resolved argv0 dispatch paths used by command execution internals.
pub arg0_paths: Arg0DispatchPaths,
/// Shared base config used to initialize core components.
pub config: Arc<Config>,
/// CLI config overrides that are already parsed into TOML values.
pub cli_overrides: Vec<(String, TomlValue)>,
/// Loader override knobs used by config API paths.
pub loader_overrides: LoaderOverrides,
/// Preloaded cloud requirements provider.
pub cloud_requirements: CloudRequirementsLoader,
/// Loader used to fetch typed thread config sources before a thread starts.
pub thread_config_loader: Arc<dyn ThreadConfigLoader>,
/// Feedback sink used by app-server/core telemetry and logs.
pub feedback: CodexFeedback,
/// SQLite tracing layer used to flush recently emitted logs before feedback upload.
pub log_db: Option<LogDbLayer>,
/// Environment manager used by core execution and filesystem operations.
pub environment_manager: Arc<EnvironmentManager>,
/// Startup warnings emitted after initialize succeeds.
pub config_warnings: Vec<ConfigWarningNotification>,
/// Session source stamped into thread/session metadata.
pub session_source: SessionSource,
/// Whether auth loading should honor the `CODEX_API_KEY` environment variable.
pub enable_codex_api_key_env: bool,
/// Initialize params used for initial handshake.
pub initialize: InitializeParams,
/// Capacity used for all runtime queues (clamped to at least 1).
pub channel_capacity: usize,
}
/// Event emitted from the app-server to the in-process client.
///
/// [`Lagged`](Self::Lagged) is a transport health marker, not an application
/// event — it signals that the consumer fell behind and some events were dropped.
#[derive(Debug, Clone)]
pub enum InProcessServerEvent {
/// Server request that requires client response/rejection.
ServerRequest(ServerRequest),
/// App-server notification directed to the embedded client.
ServerNotification(ServerNotification),
/// Indicates one or more events were dropped due to backpressure.
Lagged { skipped: usize },
}
/// Internal message sent from [`InProcessClientHandle`] methods to the runtime task.
///
/// Requests carry a oneshot sender for the response; notifications and server-request
/// replies are fire-and-forget from the caller's perspective (transport errors are
/// caught by `try_send` on the outer channel).
enum InProcessClientMessage {
Request {
request: Box<ClientRequest>,
response_tx: oneshot::Sender<PendingClientRequestResponse>,
},
Notification {
notification: ClientNotification,
},
ServerRequestResponse {
request_id: RequestId,
result: Result,
},
ServerRequestError {
request_id: RequestId,
error: JSONRPCErrorError,
},
Shutdown {
done_tx: oneshot::Sender<()>,
},
}
enum ProcessorCommand {
Request(Box<ClientRequest>),
Notification(ClientNotification),
}
#[derive(Clone)]
pub struct InProcessClientSender {
client_tx: mpsc::Sender<InProcessClientMessage>,
}
impl InProcessClientSender {
pub async fn request(&self, request: ClientRequest) -> IoResult<PendingClientRequestResponse> {
let (response_tx, response_rx) = oneshot::channel();
self.try_send_client_message(InProcessClientMessage::Request {
request: Box::new(request),
response_tx,
})?;
response_rx.await.map_err(|err| {
IoError::new(
ErrorKind::BrokenPipe,
format!("in-process request response channel closed: {err}"),
)
})
}
pub fn notify(&self, notification: ClientNotification) -> IoResult<()> {
self.try_send_client_message(InProcessClientMessage::Notification { notification })
}
pub fn respond_to_server_request(&self, request_id: RequestId, result: Result) -> IoResult<()> {
self.try_send_client_message(InProcessClientMessage::ServerRequestResponse {
request_id,
result,
})
}
pub fn fail_server_request(
&self,
request_id: RequestId,
error: JSONRPCErrorError,
) -> IoResult<()> {
self.try_send_client_message(InProcessClientMessage::ServerRequestError {
request_id,
error,
})
}
fn try_send_client_message(&self, message: InProcessClientMessage) -> IoResult<()> {
match self.client_tx.try_send(message) {
Ok(()) => Ok(()),
Err(mpsc::error::TrySendError::Full(_)) => Err(IoError::new(
ErrorKind::WouldBlock,
"in-process app-server client queue is full",
)),
Err(mpsc::error::TrySendError::Closed(_)) => Err(IoError::new(
ErrorKind::BrokenPipe,
"in-process app-server runtime is closed",
)),
}
}
}
/// Handle used by an in-process client to call app-server and consume events.
///
/// This is the low-level runtime handle. Higher-level callers should usually go
/// through `codex-app-server-client`, which adds worker-task buffering,
/// request/response helpers, and surface-specific startup policy.
pub struct InProcessClientHandle {
client: InProcessClientSender,
event_rx: mpsc::Receiver<InProcessServerEvent>,
runtime_handle: tokio::task::JoinHandle<()>,
}
impl InProcessClientHandle {
/// Sends a typed client request into the in-process runtime.
///
/// The returned value is a transport-level `IoResult` containing either a
/// JSON-RPC success payload or JSON-RPC error payload. Callers must keep
/// request IDs unique among concurrent requests; reusing an in-flight ID
/// produces an `INVALID_REQUEST` response and can make request routing
/// ambiguous in the caller.
pub async fn request(&self, request: ClientRequest) -> IoResult<PendingClientRequestResponse> {
self.client.request(request).await
}
/// Sends a typed client notification into the in-process runtime.
///
/// Notifications do not have an application-level response. Transport
/// errors indicate queue saturation or closed runtime.
pub fn notify(&self, notification: ClientNotification) -> IoResult<()> {
self.client.notify(notification)
}
/// Resolves a pending [`ServerRequest`](InProcessServerEvent::ServerRequest).
///
/// This should be used only with request IDs received from the current
/// runtime event stream; sending arbitrary IDs has no effect on app-server
/// state and can mask a stuck approval flow in the caller.
pub fn respond_to_server_request(&self, request_id: RequestId, result: Result) -> IoResult<()> {
self.client.respond_to_server_request(request_id, result)
}
/// Rejects a pending [`ServerRequest`](InProcessServerEvent::ServerRequest).
///
/// Use this when the embedder cannot satisfy a server request; leaving
/// requests unanswered can stall turn progress.
pub fn fail_server_request(
&self,
request_id: RequestId,
error: JSONRPCErrorError,
) -> IoResult<()> {
self.client.fail_server_request(request_id, error)
}
/// Receives the next server event from the in-process runtime.
///
/// Returns `None` when the runtime task exits and no more events are
/// available.
pub async fn next_event(&mut self) -> Option<InProcessServerEvent> {
self.event_rx.recv().await
}
/// Requests runtime shutdown and waits for worker termination.
///
/// Shutdown is bounded by internal timeouts and may abort background tasks
/// if graceful drain does not complete in time.
pub async fn shutdown(self) -> IoResult<()> {
let mut runtime_handle = self.runtime_handle;
let (done_tx, done_rx) = oneshot::channel();
if self
.client
.client_tx
.send(InProcessClientMessage::Shutdown { done_tx })
.await
.is_ok()
{
let _ = timeout(SHUTDOWN_TIMEOUT, done_rx).await;
}
if let Err(_elapsed) = timeout(SHUTDOWN_TIMEOUT, &mut runtime_handle).await {
runtime_handle.abort();
let _ = runtime_handle.await;
}
Ok(())
}
pub fn sender(&self) -> InProcessClientSender {
self.client.clone()
}
}
/// Starts an in-process app-server runtime and performs initialize handshake.
///
/// This function sends `initialize` followed by `initialized` before returning
/// the handle, so callers receive a ready-to-use runtime. If initialize fails,
/// the runtime is shut down and an `InvalidData` error is returned.
pub async fn start(args: InProcessStartArgs) -> IoResult<InProcessClientHandle> {
let initialize = args.initialize.clone();
let client = start_uninitialized(args);
let initialize_response = client
.request(ClientRequest::Initialize {
request_id: RequestId::Integer(0),
params: initialize,
})
.await?;
if let Err(error) = initialize_response {
let _ = client.shutdown().await;
return Err(IoError::new(
ErrorKind::InvalidData,
format!("in-process initialize failed: {}", error.message),
));
}
client.notify(ClientNotification::Initialized)?;
Ok(client)
}
fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
let channel_capacity = args.channel_capacity.max(1);
let (client_tx, mut client_rx) = mpsc::channel::<InProcessClientMessage>(channel_capacity);
let (event_tx, event_rx) = mpsc::channel::<InProcessServerEvent>(channel_capacity);
let runtime_handle = tokio::spawn(async move {
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingEnvelope>(channel_capacity);
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx));
let (writer_tx, mut writer_rx) = mpsc::channel::<QueuedOutgoingMessage>(channel_capacity);
let outbound_initialized = Arc::new(AtomicBool::new(false));
let outbound_experimental_api_enabled = Arc::new(AtomicBool::new(false));
let outbound_opted_out_notification_methods = Arc::new(RwLock::new(HashSet::new()));
let mut outbound_connections = HashMap::<ConnectionId, OutboundConnectionState>::new();
outbound_connections.insert(
IN_PROCESS_CONNECTION_ID,
OutboundConnectionState::new(
writer_tx,
Arc::clone(&outbound_initialized),
Arc::clone(&outbound_experimental_api_enabled),
Arc::clone(&outbound_opted_out_notification_methods),
/*disconnect_sender*/ None,
),
);
let mut outbound_handle = tokio::spawn(async move {
while let Some(envelope) = outgoing_rx.recv().await {
route_outgoing_envelope(&mut outbound_connections, envelope).await;
}
});
let processor_outgoing = Arc::clone(&outgoing_message_sender);
let auth_manager =
AuthManager::shared_from_config(args.config.as_ref(), args.enable_codex_api_key_env)
.await;
let config_manager = ConfigManager::new(
args.config.codex_home.to_path_buf(),
args.cli_overrides,
args.loader_overrides,
args.cloud_requirements,
args.arg0_paths.clone(),
args.thread_config_loader,
);
let (processor_tx, mut processor_rx) = mpsc::channel::<ProcessorCommand>(channel_capacity);
let mut processor_handle = tokio::spawn(async move {
let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs {
outgoing: Arc::clone(&processor_outgoing),
arg0_paths: args.arg0_paths,
config: args.config,
config_manager,
environment_manager: args.environment_manager,
feedback: args.feedback,
log_db: args.log_db,
config_warnings: args.config_warnings,
session_source: args.session_source,
auth_manager,
rpc_transport: AppServerRpcTransport::InProcess,
remote_control_handle: None,
plugin_startup_tasks: crate::PluginStartupTasks::Start,
}));
let mut thread_created_rx = processor.thread_created_receiver();
let session = Arc::new(ConnectionSessionState::new(ConnectionOrigin::InProcess));
let mut listen_for_threads = true;
loop {
tokio::select! {
command = processor_rx.recv() => {
match command {
Some(ProcessorCommand::Request(request)) => {
let was_initialized = session.initialized();
processor
.process_client_request(
IN_PROCESS_CONNECTION_ID,
*request,
Arc::clone(&session),
&outbound_initialized,
)
.await;
let opted_out_notification_methods_snapshot =
session.opted_out_notification_methods();
let experimental_api_enabled =
session.experimental_api_enabled();
let is_initialized = session.initialized();
if let Ok(mut opted_out_notification_methods) =
outbound_opted_out_notification_methods.write()
{
*opted_out_notification_methods =
opted_out_notification_methods_snapshot;
} else {
warn!("failed to update outbound opted-out notifications");
}
outbound_experimental_api_enabled.store(
experimental_api_enabled,
Ordering::Release,
);
if !was_initialized && is_initialized {
processor.send_initialize_notifications().await;
}
}
Some(ProcessorCommand::Notification(notification)) => {
processor.process_client_notification(notification).await;
}
None => {
break;
}
}
}
created = thread_created_rx.recv(), if listen_for_threads => {
match created {
Ok(thread_id) => {
let connection_ids = if session.initialized() {
vec![IN_PROCESS_CONNECTION_ID]
} else {
Vec::<ConnectionId>::new()
};
processor
.try_attach_thread_listener(thread_id, connection_ids)
.await;
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
warn!("thread_created receiver lagged; skipping resync");
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
listen_for_threads = false;
}
}
}
}
}
processor.clear_runtime_references();
processor.cancel_active_login().await;
processor
.connection_closed(IN_PROCESS_CONNECTION_ID, &session)
.await;
processor.clear_all_thread_listeners().await;
processor.drain_background_tasks().await;
processor.shutdown_threads().await;
});
let mut pending_request_responses =
HashMap::<RequestId, oneshot::Sender<PendingClientRequestResponse>>::new();
let mut shutdown_ack = None;
loop {
tokio::select! {
message = client_rx.recv() => {
match message {
Some(InProcessClientMessage::Request { request, response_tx }) => {
let request = *request;
let request_id = request.id().clone();
match pending_request_responses.entry(request_id.clone()) {
Entry::Vacant(entry) => {
entry.insert(response_tx);
}
Entry::Occupied(_) => {
let _ = response_tx.send(Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("duplicate request id: {request_id:?}"),
data: None,
}));
continue;
}
}
match processor_tx.try_send(ProcessorCommand::Request(Box::new(request))) {
Ok(()) => {}
Err(mpsc::error::TrySendError::Full(_)) => {
if let Some(response_tx) =
pending_request_responses.remove(&request_id)
{
let _ = response_tx.send(Err(JSONRPCErrorError {
code: OVERLOADED_ERROR_CODE,
message: "in-process app-server request queue is full"
.to_string(),
data: None,
}));
}
}
Err(mpsc::error::TrySendError::Closed(_)) => {
if let Some(response_tx) =
pending_request_responses.remove(&request_id)
{
let _ = response_tx.send(Err(JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message:
"in-process app-server request processor is closed"
.to_string(),
data: None,
}));
}
break;
}
}
}
Some(InProcessClientMessage::Notification { notification }) => {
match processor_tx.try_send(ProcessorCommand::Notification(notification)) {
Ok(()) => {}
Err(mpsc::error::TrySendError::Full(_)) => {
warn!("dropping in-process client notification (queue full)");
}
Err(mpsc::error::TrySendError::Closed(_)) => {
break;
}
}
}
Some(InProcessClientMessage::ServerRequestResponse { request_id, result }) => {
outgoing_message_sender
.notify_client_response(request_id, result)
.await;
}
Some(InProcessClientMessage::ServerRequestError { request_id, error }) => {
outgoing_message_sender
.notify_client_error(request_id, error)
.await;
}
Some(InProcessClientMessage::Shutdown { done_tx }) => {
shutdown_ack = Some(done_tx);
break;
}
None => {
break;
}
}
}
queued_message = writer_rx.recv() => {
let Some(queued_message) = queued_message else {
break;
};
let outgoing_message = queued_message.message;
match outgoing_message {
OutgoingMessage::Response(response) => {
if let Some(response_tx) = pending_request_responses.remove(&response.id) {
let _ = response_tx.send(Ok(response.result));
} else {
warn!(
request_id = ?response.id,
"dropping unmatched in-process response"
);
}
}
OutgoingMessage::Error(error) => {
if let Some(response_tx) = pending_request_responses.remove(&error.id) {
let _ = response_tx.send(Err(error.error));
} else {
warn!(
request_id = ?error.id,
"dropping unmatched in-process error response"
);
}
}
OutgoingMessage::Request(request) => {
// Send directly to avoid cloning; on failure the
// original value is returned inside the error.
if let Err(send_error) = event_tx
.try_send(InProcessServerEvent::ServerRequest(request))
{
let (code, message, inner) = match send_error {
mpsc::error::TrySendError::Full(inner) => (
OVERLOADED_ERROR_CODE,
"in-process server request queue is full",
inner,
),
mpsc::error::TrySendError::Closed(inner) => (
INTERNAL_ERROR_CODE,
"in-process server request consumer is closed",
inner,
),
};
let request_id = match inner {
InProcessServerEvent::ServerRequest(req) => req.id().clone(),
_ => unreachable!("we just sent a ServerRequest variant"),
};
outgoing_message_sender
.notify_client_error(
request_id,
JSONRPCErrorError {
code,
message: message.to_string(),
data: None,
},
)
.await;
}
}
OutgoingMessage::AppServerNotification(notification) => {
if server_notification_requires_delivery(&notification) {
if event_tx
.send(InProcessServerEvent::ServerNotification(notification))
.await
.is_err()
{
break;
}
} else if let Err(send_error) =
event_tx.try_send(InProcessServerEvent::ServerNotification(notification))
{
match send_error {
mpsc::error::TrySendError::Full(_) => {
warn!("dropping in-process server notification (queue full)");
}
mpsc::error::TrySendError::Closed(_) => {
break;
}
}
}
}
}
if let Some(write_complete_tx) = queued_message.write_complete_tx {
let _ = write_complete_tx.send(());
}
}
}
}
drop(writer_rx);
drop(processor_tx);
outgoing_message_sender
.cancel_all_requests(Some(JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: "in-process app-server runtime is shutting down".to_string(),
data: None,
}))
.await;
// Drop the runtime's last sender before awaiting the router task so
// `outgoing_rx.recv()` can observe channel closure and exit cleanly.
drop(outgoing_message_sender);
for (_, response_tx) in pending_request_responses {
let _ = response_tx.send(Err(JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: "in-process app-server runtime is shutting down".to_string(),
data: None,
}));
}
if let Err(_elapsed) = timeout(SHUTDOWN_TIMEOUT, &mut processor_handle).await {
processor_handle.abort();
let _ = processor_handle.await;
}
if let Err(_elapsed) = timeout(SHUTDOWN_TIMEOUT, &mut outbound_handle).await {
outbound_handle.abort();
let _ = outbound_handle.await;
}
if let Some(done_tx) = shutdown_ack {
let _ = done_tx.send(());
}
});
InProcessClientHandle {
client: InProcessClientSender { client_tx },
event_rx,
runtime_handle,
}
}
#[cfg(test)]
mod tests {
use super::*;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::ConfigRequirementsReadResponse;
use codex_app_server_protocol::DeviceKeyPublicParams;
use codex_app_server_protocol::DeviceKeySignParams;
use codex_app_server_protocol::DeviceKeySignPayload;
use codex_app_server_protocol::RemoteControlClientConnectionAudience;
use codex_app_server_protocol::RemoteControlClientEnrollmentAudience;
use codex_app_server_protocol::SessionSource as ApiSessionSource;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::Turn;
use codex_app_server_protocol::TurnCompletedNotification;
use codex_app_server_protocol::TurnStatus;
use codex_core::config::ConfigBuilder;
use pretty_assertions::assert_eq;
async fn build_test_config() -> Config {
match ConfigBuilder::default().build().await {
Ok(config) => config,
Err(_) => Config::load_default_with_cli_overrides(Vec::new())
.await
.expect("default config should load"),
}
}
async fn start_test_client_with_capacity(
session_source: SessionSource,
channel_capacity: usize,
) -> InProcessClientHandle {
let args = InProcessStartArgs {
arg0_paths: Arg0DispatchPaths::default(),
config: Arc::new(build_test_config().await),
cli_overrides: Vec::new(),
loader_overrides: LoaderOverrides::default(),
cloud_requirements: CloudRequirementsLoader::default(),
thread_config_loader: Arc::new(codex_config::NoopThreadConfigLoader),
feedback: CodexFeedback::new(),
log_db: None,
environment_manager: Arc::new(EnvironmentManager::default_for_tests()),
config_warnings: Vec::new(),
session_source,
enable_codex_api_key_env: false,
initialize: InitializeParams {
client_info: ClientInfo {
name: "codex-in-process-test".to_string(),
title: None,
version: "0.0.0".to_string(),
},
capabilities: None,
},
channel_capacity,
};
start(args).await.expect("in-process runtime should start")
}
async fn start_test_client(session_source: SessionSource) -> InProcessClientHandle {
start_test_client_with_capacity(session_source, DEFAULT_IN_PROCESS_CHANNEL_CAPACITY).await
}
#[tokio::test]
async fn in_process_start_initializes_and_handles_typed_v2_request() {
let client = start_test_client(SessionSource::Cli).await;
let response = client
.request(ClientRequest::ConfigRequirementsRead {
request_id: RequestId::Integer(1),
params: None,
})
.await
.expect("request transport should work")
.expect("request should succeed");
assert!(response.is_object());
let _parsed: ConfigRequirementsReadResponse =
serde_json::from_value(response).expect("response should match v2 schema");
client
.shutdown()
.await
.expect("in-process runtime should shutdown cleanly");
}
#[tokio::test]
async fn in_process_allows_device_key_requests_to_reach_device_key_api() {
let client = start_test_client(SessionSource::Cli).await;
const MALFORMED_KEY_ID_MESSAGE: &str = concat!(
"invalid device key payload: keyId must be dk_hse_, dk_tpm_, or dk_osn_ ",
"followed by unpadded base64url-encoded 32 bytes"
);
let requests = [
(
ClientRequest::DeviceKeyPublic {
request_id: RequestId::Integer(11),
params: DeviceKeyPublicParams {
key_id: String::new(),
},
},
MALFORMED_KEY_ID_MESSAGE,
),
(
ClientRequest::DeviceKeySign {
request_id: RequestId::Integer(12),
params: DeviceKeySignParams {
key_id: String::new(),
payload: DeviceKeySignPayload::RemoteControlClientConnection {
nonce: "nonce-123".to_string(),
audience:
RemoteControlClientConnectionAudience::RemoteControlClientWebsocket,
session_id: "wssess_123".to_string(),
target_origin: "https://chatgpt.com".to_string(),
target_path: "/api/codex/remote/control/client".to_string(),
account_user_id: "acct_123".to_string(),
client_id: "cli_123".to_string(),
token_expires_at: 4_102_444_800,
token_sha256_base64url: "47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU"
.to_string(),
scopes: vec!["remote_control_controller_websocket".to_string()],
},
},
},
MALFORMED_KEY_ID_MESSAGE,
),
(
ClientRequest::DeviceKeySign {
request_id: RequestId::Integer(13),
params: DeviceKeySignParams {
key_id: String::new(),
payload: DeviceKeySignPayload::RemoteControlClientEnrollment {
nonce: "nonce-123".to_string(),
audience:
RemoteControlClientEnrollmentAudience::RemoteControlClientEnrollment,
challenge_id: "rch_123".to_string(),
target_origin: "https://chatgpt.com".to_string(),
target_path: "/wham/remote/control/client/enroll".to_string(),
account_user_id: "acct_123".to_string(),
client_id: "cli_123".to_string(),
device_identity_sha256_base64url:
"47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU".to_string(),
challenge_expires_at: 4_102_444_800,
},
},
},
MALFORMED_KEY_ID_MESSAGE,
),
];
for (request, expected_message) in requests {
let error = client
.request(request)
.await
.expect("request transport should work")
.expect_err("request should be rejected");
assert_eq!(error.code, INVALID_REQUEST_ERROR_CODE);
assert_eq!(error.message, expected_message);
}
client
.shutdown()
.await
.expect("in-process runtime should shutdown cleanly");
}
#[tokio::test]
async fn in_process_start_uses_requested_session_source_for_thread_start() {
for (requested_source, expected_source) in [
(SessionSource::Cli, ApiSessionSource::Cli),
(SessionSource::Exec, ApiSessionSource::Exec),
] {
let client = start_test_client(requested_source).await;
let response = client
.request(ClientRequest::ThreadStart {
request_id: RequestId::Integer(2),
params: ThreadStartParams {
ephemeral: Some(true),
..ThreadStartParams::default()
},
})
.await
.expect("request transport should work")
.expect("thread/start should succeed");
let parsed: ThreadStartResponse =
serde_json::from_value(response).expect("thread/start response should parse");
assert_eq!(parsed.thread.source, expected_source);
client
.shutdown()
.await
.expect("in-process runtime should shutdown cleanly");
}
}
#[tokio::test]
async fn in_process_start_clamps_zero_channel_capacity() {
let client =
start_test_client_with_capacity(SessionSource::Cli, /*channel_capacity*/ 0).await;
let response = loop {
match client
.request(ClientRequest::ConfigRequirementsRead {
request_id: RequestId::Integer(4),
params: None,
})
.await
{
Ok(response) => break response.expect("request should succeed"),
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
tokio::task::yield_now().await;
}
Err(err) => panic!("request transport should work: {err}"),
}
};
let _parsed: ConfigRequirementsReadResponse =
serde_json::from_value(response).expect("response should match v2 schema");
client
.shutdown()
.await
.expect("in-process runtime should shutdown cleanly");
}
#[test]
fn guaranteed_delivery_helpers_cover_terminal_server_notifications() {
assert!(server_notification_requires_delivery(
&ServerNotification::TurnCompleted(TurnCompletedNotification {
thread_id: "thread-1".to_string(),
turn: Turn {
id: "turn-1".to_string(),
items: Vec::new(),
status: TurnStatus::Completed,
error: None,
started_at: None,
completed_at: Some(0),
duration_ms: None,
},
})
));
}
}