Files
codex/codex-rs/app-server/src/message_processor.rs
2026-05-12 16:50:48 -07:00

1323 lines
52 KiB
Rust

use std::collections::HashSet;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::OnceLock;
use std::sync::atomic::AtomicBool;
use crate::attestation::app_server_attestation_provider;
use crate::config_manager::ConfigManager;
use crate::connection_rpc_gate::ConnectionRpcGate;
use crate::error_code::invalid_request;
use crate::extensions::guardian_agent_spawner;
use crate::extensions::thread_extensions;
use crate::fs_watch::FsWatchManager;
use crate::outgoing_message::ConnectionId;
use crate::outgoing_message::ConnectionRequestId;
use crate::outgoing_message::OutgoingMessageSender;
use crate::outgoing_message::RequestContext;
use crate::request_processors::AccountRequestProcessor;
use crate::request_processors::AppsRequestProcessor;
use crate::request_processors::CatalogRequestProcessor;
use crate::request_processors::CommandExecRequestProcessor;
use crate::request_processors::ConfigRequestProcessor;
use crate::request_processors::EnvironmentRequestProcessor;
use crate::request_processors::ExternalAgentConfigRequestProcessor;
use crate::request_processors::FeedbackRequestProcessor;
use crate::request_processors::FsRequestProcessor;
use crate::request_processors::GitRequestProcessor;
use crate::request_processors::InitializeRequestProcessor;
use crate::request_processors::MarketplaceRequestProcessor;
use crate::request_processors::McpRequestProcessor;
use crate::request_processors::PluginRequestProcessor;
use crate::request_processors::ProcessExecRequestProcessor;
use crate::request_processors::SearchRequestProcessor;
use crate::request_processors::ThreadGoalRequestProcessor;
use crate::request_processors::ThreadRequestProcessor;
use crate::request_processors::TurnRequestProcessor;
use crate::request_processors::WindowsSandboxRequestProcessor;
use crate::request_serialization::QueuedInitializedRequest;
use crate::request_serialization::RequestSerializationQueueKey;
use crate::request_serialization::RequestSerializationQueues;
use crate::skills_watcher::SkillsWatcher;
use crate::thread_state::ConnectionCapabilities;
use crate::thread_state::ThreadStateManager;
use crate::transport::AppServerTransport;
use crate::transport::RemoteControlHandle;
use async_trait::async_trait;
use codex_analytics::AnalyticsEventsClient;
use codex_analytics::AppServerRpcTransport;
use codex_app_server_protocol::AuthMode as LoginAuthMode;
use codex_app_server_protocol::ChatgptAuthTokensRefreshParams;
use codex_app_server_protocol::ChatgptAuthTokensRefreshReason;
use codex_app_server_protocol::ChatgptAuthTokensRefreshResponse;
use codex_app_server_protocol::ClientNotification;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ClientResponsePayload;
use codex_app_server_protocol::ConfigWarningNotification;
use codex_app_server_protocol::ExperimentalApi;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::ServerRequestPayload;
use codex_app_server_protocol::experimental_required_message;
use codex_arg0::Arg0DispatchPaths;
use codex_chatgpt::workspace_settings;
use codex_core::ThreadManager;
use codex_core::config::Config;
use codex_exec_server::EnvironmentManager;
use codex_feedback::CodexFeedback;
use codex_login::AuthManager;
use codex_login::auth::ExternalAuth;
use codex_login::auth::ExternalAuthRefreshContext;
use codex_login::auth::ExternalAuthRefreshReason;
use codex_login::auth::ExternalAuthTokens;
use codex_protocol::ThreadId;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::W3cTraceContext;
use codex_rollout::StateDbHandle;
use codex_state::log_db::LogDbLayer;
use tokio::sync::Mutex;
use tokio::sync::Semaphore;
use tokio::sync::broadcast;
use tokio::sync::watch;
use tokio::time::Duration;
use tokio::time::timeout;
use tracing::Instrument;
/// Maximum time `ExternalAuthRefreshBridge::refresh` waits for the client to
/// answer a `ChatgptAuthTokensRefresh` request before cancelling it and
/// reporting a timeout error.
const EXTERNAL_AUTH_REFRESH_TIMEOUT: Duration = Duration::from_secs(10);
/// `ExternalAuth` implementation that obtains refreshed ChatGPT tokens by
/// sending a `ChatgptAuthTokensRefresh` server request through `outgoing`
/// and waiting for the reply.
#[derive(Clone)]
struct ExternalAuthRefreshBridge {
/// Sender used to issue (and, on timeout, cancel) the refresh request.
outgoing: Arc<OutgoingMessageSender>,
}
impl ExternalAuthRefreshBridge {
    /// Translates the login-layer refresh reason into the equivalent
    /// app-server protocol reason.
    fn map_reason(reason: ExternalAuthRefreshReason) -> ChatgptAuthTokensRefreshReason {
        // Irrefutable destructuring of the only variant: this stops compiling
        // as soon as another reason is added, forcing the mapping to be
        // revisited.
        let ExternalAuthRefreshReason::Unauthorized = reason;
        ChatgptAuthTokensRefreshReason::Unauthorized
    }
}
#[async_trait]
impl ExternalAuth for ExternalAuthRefreshBridge {
fn auth_mode(&self) -> LoginAuthMode {
LoginAuthMode::Chatgpt
}
async fn refresh(
&self,
context: ExternalAuthRefreshContext,
) -> std::io::Result<ExternalAuthTokens> {
let params = ChatgptAuthTokensRefreshParams {
reason: Self::map_reason(context.reason),
previous_account_id: context.previous_account_id,
};
let (request_id, rx) = self
.outgoing
.send_request(ServerRequestPayload::ChatgptAuthTokensRefresh(params))
.await;
let result = match timeout(EXTERNAL_AUTH_REFRESH_TIMEOUT, rx).await {
Ok(result) => {
// Two failure scenarios:
// 1) `oneshot::Receiver` failed (sender dropped) => request canceled/channel closed.
// 2) client answered with JSON-RPC error payload => propagate code/message.
let result = result.map_err(|err| {
std::io::Error::other(format!("auth refresh request canceled: {err}"))
})?;
result.map_err(|err| {
std::io::Error::other(format!(
"auth refresh request failed: code={} message={}",
err.code, err.message
))
})?
}
Err(_) => {
let _canceled = self.outgoing.cancel_request(&request_id).await;
return Err(std::io::Error::other(format!(
"auth refresh request timed out after {}s",
EXTERNAL_AUTH_REFRESH_TIMEOUT.as_secs()
)));
}
};
let response: ChatgptAuthTokensRefreshResponse =
serde_json::from_value(result).map_err(std::io::Error::other)?;
Ok(ExternalAuthTokens::chatgpt(
response.access_token,
response.chatgpt_account_id,
response.chatgpt_plan_type,
))
}
}
/// Entry point for client JSON-RPC traffic in the app server.
///
/// Owns one request processor per functional area and dispatches each
/// decoded `ClientRequest` to the matching processor.
pub(crate) struct MessageProcessor {
/// Sender used to emit errors and to relay client responses/errors for
/// server-initiated requests.
outgoing: Arc<OutgoingMessageSender>,
account_processor: AccountRequestProcessor,
apps_processor: AppsRequestProcessor,
catalog_processor: CatalogRequestProcessor,
command_exec_processor: CommandExecRequestProcessor,
process_exec_processor: ProcessExecRequestProcessor,
config_processor: ConfigRequestProcessor,
environment_processor: EnvironmentRequestProcessor,
external_agent_config_processor: ExternalAgentConfigRequestProcessor,
feedback_processor: FeedbackRequestProcessor,
fs_processor: FsRequestProcessor,
git_processor: GitRequestProcessor,
/// Handles `Initialize` and tracks requests made after initialization.
initialize_processor: InitializeRequestProcessor,
marketplace_processor: MarketplaceRequestProcessor,
mcp_processor: McpRequestProcessor,
plugin_processor: PluginRequestProcessor,
search_processor: SearchRequestProcessor,
thread_goal_processor: ThreadGoalRequestProcessor,
thread_processor: ThreadRequestProcessor,
turn_processor: TurnRequestProcessor,
windows_sandbox_processor: WindowsSandboxRequestProcessor,
/// Queues for requests that report a serialization scope; requests without
/// one are spawned directly.
request_serialization_queues: RequestSerializationQueues,
}
/// Per-connection session state: an RPC gate plus the data recorded when
/// the connection is initialized.
#[derive(Debug)]
pub(crate) struct ConnectionSessionState {
/// Gate handed to queued requests for this connection and shut down when
/// the connection closes.
pub(crate) rpc_gate: Arc<ConnectionRpcGate>,
/// Write-once slot; empty until `initialize` succeeds.
initialized: OnceLock<InitializedConnectionSessionState>,
}
/// Values stored for a connection once it has been initialized.
#[derive(Debug)]
pub(crate) struct InitializedConnectionSessionState {
/// When false, requests that report an experimental reason are rejected.
pub(crate) experimental_api_enabled: bool,
/// Notification method names the client opted out of (presumably these
/// are suppressed for this connection; confirm against the sender).
pub(crate) opted_out_notification_methods: HashSet<String>,
/// Client name; forwarded to thread and turn request handlers.
pub(crate) app_server_client_name: String,
/// Client version; forwarded to thread and turn request handlers.
pub(crate) client_version: String,
/// Copied into `ConnectionCapabilities` when the connection is reported
/// as initialized.
pub(crate) request_attestation: bool,
}
impl Default for ConnectionSessionState {
fn default() -> Self {
Self::new()
}
}
impl ConnectionSessionState {
    /// Creates session state with a fresh RPC gate and no initialization data.
    pub(crate) fn new() -> Self {
        let rpc_gate = Arc::new(ConnectionRpcGate::new());
        let initialized = OnceLock::new();
        Self {
            rpc_gate,
            initialized,
        }
    }

    /// Reports whether `initialize` has already succeeded.
    pub(crate) fn initialized(&self) -> bool {
        self.initialized.get().is_some()
    }

    /// True only when the session is initialized with the experimental API
    /// enabled.
    pub(crate) fn experimental_api_enabled(&self) -> bool {
        matches!(
            self.initialized.get(),
            Some(session) if session.experimental_api_enabled
        )
    }

    /// Returns a copy of the opted-out notification methods, or an empty set
    /// before initialization.
    pub(crate) fn opted_out_notification_methods(&self) -> HashSet<String> {
        match self.initialized.get() {
            Some(session) => session.opted_out_notification_methods.clone(),
            None => HashSet::new(),
        }
    }

    /// Client name recorded at initialization, if any.
    pub(crate) fn app_server_client_name(&self) -> Option<&str> {
        let session = self.initialized.get()?;
        Some(session.app_server_client_name.as_str())
    }

    /// Client version recorded at initialization, if any.
    pub(crate) fn client_version(&self) -> Option<&str> {
        let session = self.initialized.get()?;
        Some(session.client_version.as_str())
    }

    /// True only when the session is initialized with attestation requested.
    pub(crate) fn request_attestation(&self) -> bool {
        matches!(
            self.initialized.get(),
            Some(session) if session.request_attestation
        )
    }

    /// Stores the initialization data.
    ///
    /// # Errors
    ///
    /// Returns `Err(())` when the session was already initialized; the
    /// previously stored data is left untouched.
    pub(crate) fn initialize(&self, session: InitializedConnectionSessionState) -> Result<(), ()> {
        self.initialized.set(session).map_err(drop)
    }
}
/// Everything `MessageProcessor::new` needs, bundled so the constructor takes
/// a single argument.
pub(crate) struct MessageProcessorArgs {
/// Shared sender handed to the processors that emit messages.
pub(crate) outgoing: Arc<OutgoingMessageSender>,
pub(crate) analytics_events_client: AnalyticsEventsClient,
pub(crate) arg0_paths: Arg0DispatchPaths,
/// Configuration shared with the processors; also used to build the
/// process-scoped thread store.
pub(crate) config: Arc<Config>,
pub(crate) config_manager: ConfigManager,
pub(crate) environment_manager: Arc<EnvironmentManager>,
pub(crate) feedback: CodexFeedback,
/// Optional log database layer, handed to the feedback processor.
pub(crate) log_db: Option<LogDbLayer>,
/// Optional state database handle shared by the thread store, thread
/// manager, and several processors.
pub(crate) state_db: Option<StateDbHandle>,
/// Warnings handed to the initialize processor.
pub(crate) config_warnings: Vec<ConfigWarningNotification>,
pub(crate) session_source: SessionSource,
/// Auth manager; `new` installs an `ExternalAuthRefreshBridge` on it.
pub(crate) auth_manager: Arc<AuthManager>,
pub(crate) installation_id: String,
pub(crate) rpc_transport: AppServerRpcTransport,
pub(crate) remote_control_handle: Option<RemoteControlHandle>,
/// When `Start`, `new` kicks off the plugin startup tasks.
pub(crate) plugin_startup_tasks: crate::PluginStartupTasks,
}
impl MessageProcessor {
/// Create a new `MessageProcessor`, retaining a handle to the outgoing
/// `Sender` so handlers can enqueue messages to be written to stdout.
///
/// Besides constructing every request processor, this installs an
/// `ExternalAuthRefreshBridge` on `auth_manager`, builds the shared
/// `ThreadManager`, and, when `plugin_startup_tasks` is `Start`, kicks off
/// the plugin startup tasks.
pub(crate) fn new(args: MessageProcessorArgs) -> Self {
let MessageProcessorArgs {
outgoing,
analytics_events_client,
arg0_paths,
config,
config_manager,
environment_manager,
feedback,
log_db,
state_db,
config_warnings,
session_source,
auth_manager,
installation_id,
rpc_transport,
remote_control_handle,
plugin_startup_tasks,
} = args;
// Token refreshes requested by the auth manager are forwarded to the
// client through the outgoing sender.
auth_manager.set_external_auth(Arc::new(ExternalAuthRefreshBridge {
outgoing: outgoing.clone(),
}));
let thread_state_manager = ThreadStateManager::new();
// The thread store is intentionally process-scoped. Config reloads can
// affect per-thread behavior, but they must not move newly started,
// resumed, or forked threads to a different persistence backend/root.
let thread_store = codex_core::thread_store_from_config(config.as_ref(), state_db.clone());
// `new_cyclic` gives the closure a weak handle to the manager under
// construction, which is passed on to the guardian agent spawner.
let thread_manager = Arc::new_cyclic(|thread_manager| {
ThreadManager::new(
config.as_ref(),
auth_manager.clone(),
session_source,
environment_manager,
thread_extensions(guardian_agent_spawner(thread_manager.clone())),
Some(analytics_events_client.clone()),
Arc::clone(&thread_store),
state_db.clone(),
installation_id,
Some(app_server_attestation_provider(
outgoing.clone(),
thread_state_manager.clone(),
)),
)
});
thread_manager
.plugins_manager()
.set_analytics_events_client(analytics_events_client.clone());
// State shared between the thread and turn processors below.
let skills_watcher = SkillsWatcher::new(thread_manager.skills_manager(), outgoing.clone());
let pending_thread_unloads = Arc::new(Mutex::new(HashSet::new()));
let thread_watch_manager =
crate::thread_status::ThreadWatchManager::new_with_outgoing(outgoing.clone());
let thread_list_state_permit = Arc::new(Semaphore::new(/*permits*/ 1));
// One cache instance shared by the apps, catalog, and plugin processors.
let workspace_settings_cache =
Arc::new(workspace_settings::WorkspaceSettingsCache::default());
let account_processor = AccountRequestProcessor::new(
auth_manager.clone(),
Arc::clone(&thread_manager),
outgoing.clone(),
Arc::clone(&config),
config_manager.clone(),
);
let apps_processor = AppsRequestProcessor::new(
auth_manager.clone(),
Arc::clone(&thread_manager),
outgoing.clone(),
config_manager.clone(),
Arc::clone(&workspace_settings_cache),
);
let catalog_processor = CatalogRequestProcessor::new(
auth_manager.clone(),
Arc::clone(&thread_manager),
Arc::clone(&config),
config_manager.clone(),
Arc::clone(&workspace_settings_cache),
);
let command_exec_processor = CommandExecRequestProcessor::new(
arg0_paths.clone(),
Arc::clone(&config),
outgoing.clone(),
);
let process_exec_processor = ProcessExecRequestProcessor::new(outgoing.clone());
let feedback_processor = FeedbackRequestProcessor::new(
auth_manager.clone(),
Arc::clone(&thread_manager),
Arc::clone(&config),
feedback,
log_db,
state_db.clone(),
);
let git_processor = GitRequestProcessor::new();
let initialize_processor = InitializeRequestProcessor::new(
outgoing.clone(),
analytics_events_client.clone(),
Arc::clone(&config),
config_warnings,
rpc_transport,
);
let marketplace_processor = MarketplaceRequestProcessor::new(
Arc::clone(&config),
config_manager.clone(),
Arc::clone(&thread_manager),
);
let mcp_processor = McpRequestProcessor::new(
auth_manager.clone(),
Arc::clone(&thread_manager),
outgoing.clone(),
config_manager.clone(),
);
// Last user of the workspace settings cache: it takes ownership of the
// remaining handle.
let plugin_processor = PluginRequestProcessor::new(
auth_manager.clone(),
Arc::clone(&thread_manager),
outgoing.clone(),
analytics_events_client.clone(),
config_manager.clone(),
workspace_settings_cache,
);
let search_processor = SearchRequestProcessor::new(outgoing.clone());
let thread_goal_processor = ThreadGoalRequestProcessor::new(
Arc::clone(&thread_manager),
outgoing.clone(),
Arc::clone(&config),
thread_state_manager.clone(),
state_db.clone(),
);
// The thread processor receives clones of the shared state; the turn
// processor built right after takes the originals.
let thread_processor = ThreadRequestProcessor::new(
auth_manager.clone(),
Arc::clone(&thread_manager),
outgoing.clone(),
arg0_paths.clone(),
Arc::clone(&config),
config_manager.clone(),
Arc::clone(&thread_store),
Arc::clone(&pending_thread_unloads),
thread_state_manager.clone(),
thread_watch_manager.clone(),
Arc::clone(&thread_list_state_permit),
thread_goal_processor.clone(),
state_db,
Arc::clone(&skills_watcher),
);
let turn_processor = TurnRequestProcessor::new(
auth_manager.clone(),
Arc::clone(&thread_manager),
outgoing.clone(),
analytics_events_client.clone(),
arg0_paths.clone(),
Arc::clone(&config),
config_manager.clone(),
pending_thread_unloads,
thread_state_manager,
thread_watch_manager,
thread_list_state_permit,
Arc::clone(&skills_watcher),
);
if matches!(plugin_startup_tasks, crate::PluginStartupTasks::Start) {
// Keep plugin startup warmups aligned at app-server startup.
let on_effective_plugins_changed =
plugin_processor.effective_plugins_changed_callback();
thread_manager
.plugins_manager()
.maybe_start_plugin_startup_tasks_for_config(
&config.plugins_config_input(),
auth_manager.clone(),
Some(on_effective_plugins_changed),
);
}
let fs_watch_manager = FsWatchManager::new(outgoing.clone());
// From here on the remaining owned handles (`auth_manager`,
// `analytics_events_client`, `arg0_paths`, `config_manager`, ...) are moved
// into their final consumers.
let config_processor = ConfigRequestProcessor::new(
outgoing.clone(),
config_manager.clone(),
auth_manager,
thread_manager.clone(),
analytics_events_client,
remote_control_handle,
);
let external_agent_config_processor = ExternalAgentConfigRequestProcessor::new(
outgoing.clone(),
Arc::clone(&thread_manager),
config_manager.clone(),
config_processor.clone(),
arg0_paths,
config.codex_home.to_path_buf(),
);
let environment_processor =
EnvironmentRequestProcessor::new(thread_manager.environment_manager());
// Filesystem requests operate on the local environment's filesystem.
let fs_processor = FsRequestProcessor::new(
thread_manager
.environment_manager()
.local_environment()
.get_filesystem(),
fs_watch_manager,
);
let windows_sandbox_processor = WindowsSandboxRequestProcessor::new(
outgoing.clone(),
Arc::clone(&config),
config_manager,
);
Self {
outgoing,
account_processor,
apps_processor,
catalog_processor,
command_exec_processor,
process_exec_processor,
config_processor,
environment_processor,
external_agent_config_processor,
feedback_processor,
fs_processor,
git_processor,
initialize_processor,
marketplace_processor,
mcp_processor,
plugin_processor,
search_processor,
thread_goal_processor,
thread_processor,
turn_processor,
windows_sandbox_processor,
request_serialization_queues: RequestSerializationQueues::default(),
}
}
pub(crate) fn clear_runtime_references(&self) {
self.account_processor.clear_external_auth();
}
/// Processes one raw JSON-RPC request received on a transport connection.
///
/// The request is re-encoded to JSON and decoded as a typed `ClientRequest`.
/// A decode failure, or an error returned by the handler, is sent back to
/// the client as a JSON-RPC error for this request id. All work runs inside
/// the request's tracing span after its context has been registered.
pub(crate) async fn process_request(
    self: &Arc<Self>,
    connection_id: ConnectionId,
    request: JSONRPCRequest,
    transport: &AppServerTransport,
    session: Arc<ConnectionSessionState>,
) {
    let request_method = request.method.as_str();
    tracing::trace!(
        ?connection_id,
        request_id = ?request.id,
        "app-server request: {request_method}"
    );

    let request_id = ConnectionRequestId {
        connection_id,
        request_id: request.id.clone(),
    };
    let span =
        crate::app_server_tracing::request_span(&request, transport, connection_id, &session);
    // Carry over the caller's W3C trace context when the request has one.
    let parent_trace = request.trace.as_ref().map(|trace| W3cTraceContext {
        traceparent: trace.traceparent.clone(),
        tracestate: trace.tracestate.clone(),
    });
    let request_context = RequestContext::new(request_id.clone(), span, parent_trace);

    Self::run_request_with_context(
        Arc::clone(&self.outgoing),
        request_context.clone(),
        Box::pin(async {
            // Both conversion steps fail with the same error type and are
            // reported with the same message, so they share one `map_err`.
            let decoded = serde_json::to_value(&request)
                .and_then(serde_json::from_value::<ClientRequest>)
                .map_err(|err| invalid_request(format!("Invalid request: {err}")));

            let outcome = match decoded {
                Err(error) => Err(error),
                Ok(client_request) => {
                    // Websocket callers finalize outbound readiness in lib.rs
                    // after mirroring session state into outbound state and
                    // sending initialize notifications to this connection.
                    // `None` keeps the shared handler from marking the
                    // connection ready too early.
                    Box::pin(self.handle_client_request(
                        request_id.clone(),
                        client_request,
                        Arc::clone(&session),
                        /*outbound_initialized*/ None,
                        request_context.clone(),
                    ))
                    .await
                }
            };

            if let Err(error) = outcome {
                self.outgoing.send_error(request_id.clone(), error).await;
            }
        }),
    )
    .await;
}
/// Processes an already-typed request from an in-process embedder.
///
/// No JSON decoding happens here; the request goes straight to
/// `handle_client_request`, so it is handled the same way as a request that
/// arrived over a transport. Handler errors are sent back as JSON-RPC errors
/// for this request id.
pub(crate) async fn process_client_request(
    self: &Arc<Self>,
    connection_id: ConnectionId,
    request: Box<ClientRequest>,
    session: Arc<ConnectionSessionState>,
    outbound_initialized: &AtomicBool,
) {
    let request_id = ConnectionRequestId {
        connection_id,
        request_id: request.id().clone(),
    };
    let span =
        crate::app_server_tracing::typed_request_span(request.as_ref(), connection_id, &session);
    let request_context = RequestContext::new(request_id.clone(), span, /*parent_trace*/ None);
    tracing::trace!(
        ?connection_id,
        request_id = ?request_id.request_id,
        "app-server typed request"
    );

    let outgoing = Arc::clone(&self.outgoing);
    Self::run_request_with_context(
        outgoing,
        request_context.clone(),
        Box::pin(async {
            // There is no websocket transport loop doing post-initialize
            // bookkeeping for in-process clients, so outbound readiness is
            // finalized by the shared handler via `Some(outbound_initialized)`.
            let outcome = Box::pin(self.handle_client_request(
                request_id.clone(),
                *request,
                Arc::clone(&session),
                Some(outbound_initialized),
                request_context.clone(),
            ))
            .await;
            match outcome {
                Ok(()) => {}
                Err(error) => {
                    self.outgoing.send_error(request_id.clone(), error).await;
                }
            }
        }),
    )
    .await;
}
/// Handles a JSON-RPC notification from the client by logging it at info
/// level; no other action is taken.
pub(crate) async fn process_notification(&self, notification: JSONRPCNotification) {
// Currently, we do not expect to receive any notifications from the
// client, so we just log them.
tracing::info!("<- notification: {:?}", notification);
}
/// Handles typed notifications from in-process clients.
///
/// The notification is only logged at info level; no other action is taken.
pub(crate) async fn process_client_notification(&self, notification: ClientNotification) {
// Currently, we do not expect to receive any typed notifications from
// in-process clients, so we just log them.
tracing::info!("<- typed notification: {:?}", notification);
}
/// Registers `request_context` with the outgoing sender, then drives
/// `request_fut` to completion inside the request's tracing span.
async fn run_request_with_context(
    outgoing: Arc<OutgoingMessageSender>,
    request_context: RequestContext,
    request_fut: Pin<Box<dyn Future<Output = ()> + Send + '_>>,
) {
    // The sender keeps its own copy of the context; the original is still
    // needed afterwards to obtain the span.
    let registered = request_context.clone();
    outgoing.register_request_context(registered).await;

    let span = request_context.span();
    request_fut.instrument(span).await;
}
/// Returns the thread processor's broadcast receiver of created thread ids.
pub(crate) fn thread_created_receiver(&self) -> broadcast::Receiver<ThreadId> {
    let threads = &self.thread_processor;
    threads.thread_created_receiver()
}
/// Has the initialize processor send its initialize notifications to the
/// given connection only.
pub(crate) async fn send_initialize_notifications_to_connection(
    &self,
    connection_id: ConnectionId,
) {
    let initializer = &self.initialize_processor;
    initializer
        .send_initialize_notifications_to_connection(connection_id)
        .await;
}
/// Tells the thread processor that a connection finished initializing,
/// passing along whether it requested attestation.
pub(crate) async fn connection_initialized(
    &self,
    connection_id: ConnectionId,
    request_attestation: bool,
) {
    let capabilities = ConnectionCapabilities {
        request_attestation,
    };
    self.thread_processor
        .connection_initialized(connection_id, capabilities)
        .await;
}
/// Has the initialize processor send its initialize notifications
/// (the variant that does not target a specific connection).
pub(crate) async fn send_initialize_notifications(&self) {
    let initializer = &self.initialize_processor;
    initializer.send_initialize_notifications().await;
}
/// Forwards to the thread processor's `try_attach_thread_listener` for the
/// given thread and connections.
pub(crate) async fn try_attach_thread_listener(
    &self,
    thread_id: ThreadId,
    connection_ids: Vec<ConnectionId>,
) {
    let threads = &self.thread_processor;
    threads
        .try_attach_thread_listener(thread_id, connection_ids)
        .await;
}
/// Forwards to the thread processor's `drain_background_tasks`.
pub(crate) async fn drain_background_tasks(&self) {
    let threads = &self.thread_processor;
    threads.drain_background_tasks().await;
}
/// Forwards to the account processor's `cancel_active_login`.
pub(crate) async fn cancel_active_login(&self) {
    let accounts = &self.account_processor;
    accounts.cancel_active_login().await;
}
/// Forwards to the thread processor's `clear_all_thread_listeners`.
pub(crate) async fn clear_all_thread_listeners(&self) {
    let threads = &self.thread_processor;
    threads.clear_all_thread_listeners().await;
}
/// Forwards to the thread processor's `shutdown_threads`.
pub(crate) async fn shutdown_threads(&self) {
    let threads = &self.thread_processor;
    threads.shutdown_threads().await;
}
pub(crate) async fn connection_closed(
&self,
connection_id: ConnectionId,
session_state: &ConnectionSessionState,
) {
session_state.rpc_gate.shutdown().await;
self.outgoing.connection_closed(connection_id).await;
self.fs_processor.connection_closed(connection_id).await;
self.command_exec_processor
.connection_closed(connection_id)
.await;
self.process_exec_processor
.connection_closed(connection_id)
.await;
self.thread_processor.connection_closed(connection_id).await;
}
/// Returns the thread processor's watch receiver for the number of running
/// assistant turns.
pub(crate) fn subscribe_running_assistant_turn_count(&self) -> watch::Receiver<usize> {
    let threads = &self.thread_processor;
    threads.subscribe_running_assistant_turn_count()
}
/// Handle a standalone JSON-RPC response originating from the peer.
///
/// The response id and result are handed to the outgoing sender via
/// `notify_client_response`.
pub(crate) async fn process_response(&self, response: JSONRPCResponse) {
    let JSONRPCResponse { id, result, .. } = response;
    // Log the id only. A response result can contain secrets (for example the
    // `access_token` in the reply to a ChatGPT token refresh request), so the
    // payload must not be written to info-level logs.
    tracing::info!("<- response: id={:?}", id);
    self.outgoing.notify_client_response(id, result).await
}
/// Handle an error object received from the peer.
pub(crate) async fn process_error(&self, err: JSONRPCError) {
tracing::error!("<- error: {:?}", err);
self.outgoing.notify_client_error(err.id, err.error).await;
}
/// Shared entry point for typed client requests.
///
/// `Initialize` is handled inline by the initialize processor; when that
/// reports the connection as initialized, the thread processor is told as
/// well. Every other request goes to `dispatch_initialized_client_request`.
///
/// `outbound_initialized`: `Some(...)` means the caller wants initialize to
/// mark the connection outbound-ready immediately. Websocket JSON-RPC calls
/// pass `None` so lib.rs can deliver connection-scoped initialize
/// notifications first.
///
/// # Errors
///
/// Returns the JSON-RPC error produced by initialization or by dispatch.
async fn handle_client_request(
    self: &Arc<Self>,
    connection_request_id: ConnectionRequestId,
    codex_request: ClientRequest,
    session: Arc<ConnectionSessionState>,
    outbound_initialized: Option<&AtomicBool>,
    request_context: RequestContext,
) -> Result<(), JSONRPCErrorError> {
    let connection_id = connection_request_id.connection_id;

    let (request_id, params) = match codex_request {
        ClientRequest::Initialize { request_id, params } => (request_id, params),
        other => {
            return Box::pin(self.dispatch_initialized_client_request(
                connection_request_id,
                other,
                session,
                request_context,
            ))
            .await;
        }
    };

    let newly_initialized = self
        .initialize_processor
        .initialize(
            connection_id,
            request_id,
            params,
            &session,
            outbound_initialized,
        )
        .await?;
    if newly_initialized {
        let capabilities = ConnectionCapabilities {
            request_attestation: session.request_attestation(),
        };
        self.thread_processor
            .connection_initialized(connection_id, capabilities)
            .await;
    }
    Ok(())
}
/// Validates a non-`Initialize` request and schedules it for execution.
///
/// The request is rejected when the session is not initialized, or when it
/// reports an experimental reason and the session has not enabled the
/// experimental API. Otherwise it is tracked, wrapped in a
/// `QueuedInitializedRequest`, and either enqueued on the serialization
/// queue for its scope or spawned as an independent task. Errors from the
/// scheduled handler are sent to the client by the scheduled future itself.
///
/// # Errors
///
/// Returns an invalid-request error for the two rejection cases above.
async fn dispatch_initialized_client_request(
    self: &Arc<Self>,
    connection_request_id: ConnectionRequestId,
    codex_request: ClientRequest,
    session: Arc<ConnectionSessionState>,
    request_context: RequestContext,
) -> Result<(), JSONRPCErrorError> {
    if !session.initialized() {
        return Err(invalid_request("Not initialized"));
    }
    if let Some(reason) = codex_request.experimental_reason()
        && !session.experimental_api_enabled()
    {
        return Err(invalid_request(experimental_required_message(reason)));
    }

    let connection_id = connection_request_id.connection_id;
    self.initialize_processor.track_initialized_request(
        connection_id,
        connection_request_id.request_id.clone(),
        &codex_request,
    );

    // Take everything the scheduled future needs out of the session and the
    // request up front, because the future outlives this call.
    let scope = codex_request.serialization_scope();
    let client_name = session.app_server_client_name().map(str::to_string);
    let version = session.client_version().map(str::to_string);
    let failure_request_id = connection_request_id.clone();
    let gate = Arc::clone(&session.rpc_gate);
    let this = Arc::clone(self);
    let span = request_context.span();

    let work = async move {
        let handler = Arc::clone(&this);
        let outcome = Box::pin(handler.handle_initialized_client_request(
            connection_request_id,
            codex_request,
            request_context,
            client_name,
            version,
        ))
        .await;
        if let Err(error) = outcome {
            this.outgoing.send_error(failure_request_id, error).await;
        }
    }
    .instrument(span);
    let queued = QueuedInitializedRequest::new(gate, work);

    match scope {
        Some(scope) => {
            let (key, access) = RequestSerializationQueueKey::from_scope(connection_id, scope);
            self.request_serialization_queues
                .enqueue(key, access, queued)
                .await;
        }
        None => {
            tokio::spawn(async move {
                queued.run().await;
            });
        }
    }
    Ok(())
}
async fn handle_initialized_client_request(
self: Arc<Self>,
connection_request_id: ConnectionRequestId,
codex_request: ClientRequest,
request_context: RequestContext,
app_server_client_name: Option<String>,
client_version: Option<String>,
) -> Result<(), JSONRPCErrorError> {
let connection_id = connection_request_id.connection_id;
let request_id = ConnectionRequestId {
connection_id,
request_id: codex_request.id().clone(),
};
let result: Result<Option<ClientResponsePayload>, JSONRPCErrorError> = match codex_request {
ClientRequest::Initialize { .. } => {
panic!("Initialize should be handled before initialized request dispatch");
}
ClientRequest::ConfigRead { params, .. } => self
.config_processor
.read(params)
.await
.map(|response| Some(response.into())),
ClientRequest::WindowsSandboxReadiness { .. } => self
.windows_sandbox_processor
.windows_sandbox_readiness()
.await
.map(|response| Some(response.into())),
ClientRequest::ExternalAgentConfigDetect { params, .. } => self
.external_agent_config_processor
.detect(params)
.await
.map(|response| Some(response.into())),
ClientRequest::ExternalAgentConfigImport { params, .. } => self
.external_agent_config_processor
.import(request_id.clone(), params)
.await
.map(|()| None),
ClientRequest::ConfigValueWrite { params, .. } => {
self.config_processor.value_write(params).await.map(Some)
}
ClientRequest::ConfigBatchWrite { params, .. } => {
self.config_processor.batch_write(params).await.map(Some)
}
ClientRequest::ExperimentalFeatureEnablementSet { params, .. } => {
self.config_processor
.experimental_feature_enablement_set(request_id.clone(), params)
.await
}
ClientRequest::ConfigRequirementsRead { params: _, .. } => self
.config_processor
.config_requirements_read()
.await
.map(|response| Some(response.into())),
ClientRequest::EnvironmentAdd { params, .. } => {
self.environment_processor.environment_add(params).await
}
ClientRequest::FsReadFile { params, .. } => self
.fs_processor
.read_file(params)
.await
.map(|response| Some(response.into())),
ClientRequest::FsWriteFile { params, .. } => self
.fs_processor
.write_file(params)
.await
.map(|response| Some(response.into())),
ClientRequest::FsCreateDirectory { params, .. } => self
.fs_processor
.create_directory(params)
.await
.map(|response| Some(response.into())),
ClientRequest::FsGetMetadata { params, .. } => self
.fs_processor
.get_metadata(params)
.await
.map(|response| Some(response.into())),
ClientRequest::FsReadDirectory { params, .. } => self
.fs_processor
.read_directory(params)
.await
.map(|response| Some(response.into())),
ClientRequest::FsRemove { params, .. } => self
.fs_processor
.remove(params)
.await
.map(|response| Some(response.into())),
ClientRequest::FsCopy { params, .. } => self
.fs_processor
.copy(params)
.await
.map(|response| Some(response.into())),
ClientRequest::FsWatch { params, .. } => self
.fs_processor
.watch(connection_id, params)
.await
.map(|response| Some(response.into())),
ClientRequest::FsUnwatch { params, .. } => self
.fs_processor
.unwatch(connection_id, params)
.await
.map(|response| Some(response.into())),
ClientRequest::ModelProviderCapabilitiesRead { params: _, .. } => self
.config_processor
.model_provider_capabilities_read()
.await
.map(|response| Some(response.into())),
ClientRequest::ThreadStart { params, .. } => {
self.thread_processor
.thread_start(
request_id.clone(),
params,
app_server_client_name.clone(),
client_version.clone(),
request_context,
)
.await
}
ClientRequest::ThreadUnsubscribe { params, .. } => {
self.thread_processor
.thread_unsubscribe(&request_id, params)
.await
}
ClientRequest::ThreadResume { params, .. } => {
self.thread_processor
.thread_resume(
request_id.clone(),
params,
app_server_client_name.clone(),
client_version.clone(),
)
.await
}
ClientRequest::ThreadFork { params, .. } => {
self.thread_processor
.thread_fork(
request_id.clone(),
params,
app_server_client_name.clone(),
client_version.clone(),
)
.await
}
ClientRequest::ThreadArchive { params, .. } => {
self.thread_processor
.thread_archive(request_id.clone(), params)
.await
}
ClientRequest::ThreadIncrementElicitation { params, .. } => {
self.thread_processor
.thread_increment_elicitation(params)
.await
}
ClientRequest::ThreadDecrementElicitation { params, .. } => {
self.thread_processor
.thread_decrement_elicitation(params)
.await
}
ClientRequest::ThreadSetName { params, .. } => {
self.thread_processor
.thread_set_name(request_id.clone(), params)
.await
}
ClientRequest::ThreadGoalSet { params, .. } => {
self.thread_goal_processor
.thread_goal_set(request_id.clone(), params)
.await
}
ClientRequest::ThreadGoalGet { params, .. } => {
self.thread_goal_processor.thread_goal_get(params).await
}
ClientRequest::ThreadGoalClear { params, .. } => {
self.thread_goal_processor
.thread_goal_clear(request_id.clone(), params)
.await
}
ClientRequest::ThreadMetadataUpdate { params, .. } => {
self.thread_processor.thread_metadata_update(params).await
}
ClientRequest::ThreadMemoryModeSet { params, .. } => {
self.thread_processor.thread_memory_mode_set(params).await
}
ClientRequest::MemoryReset { .. } => self.thread_processor.memory_reset().await,
ClientRequest::ThreadUnarchive { params, .. } => {
self.thread_processor
.thread_unarchive(request_id.clone(), params)
.await
}
ClientRequest::ThreadCompactStart { params, .. } => {
self.thread_processor
.thread_compact_start(&request_id, params)
.await
}
ClientRequest::ThreadBackgroundTerminalsClean { params, .. } => {
self.thread_processor
.thread_background_terminals_clean(&request_id, params)
.await
}
ClientRequest::ThreadRollback { params, .. } => {
self.thread_processor
.thread_rollback(&request_id, params)
.await
}
ClientRequest::ThreadList { params, .. } => {
self.thread_processor.thread_list(params).await
}
ClientRequest::ThreadLoadedList { params, .. } => {
self.thread_processor.thread_loaded_list(params).await
}
ClientRequest::ThreadRead { params, .. } => {
self.thread_processor.thread_read(params).await
}
ClientRequest::ThreadTurnsList { params, .. } => {
self.thread_processor.thread_turns_list(params).await
}
ClientRequest::ThreadTurnsItemsList { params, .. } => {
self.thread_processor.thread_turns_items_list(params).await
}
ClientRequest::ThreadShellCommand { params, .. } => {
self.thread_processor
.thread_shell_command(&request_id, params)
.await
}
ClientRequest::ThreadApproveGuardianDeniedAction { params, .. } => {
self.thread_processor
.thread_approve_guardian_denied_action(&request_id, params)
.await
}
ClientRequest::GetConversationSummary { params, .. } => {
self.thread_processor.conversation_summary(params).await
}
ClientRequest::SkillsList { params, .. } => {
self.catalog_processor.skills_list(params).await
}
ClientRequest::HooksList { params, .. } => {
self.catalog_processor.hooks_list(params).await
}
ClientRequest::MarketplaceAdd { params, .. } => {
self.marketplace_processor.marketplace_add(params).await
}
ClientRequest::MarketplaceRemove { params, .. } => {
self.marketplace_processor.marketplace_remove(params).await
}
ClientRequest::MarketplaceUpgrade { params, .. } => {
self.marketplace_processor.marketplace_upgrade(params).await
}
ClientRequest::PluginList { params, .. } => {
self.plugin_processor.plugin_list(params).await
}
ClientRequest::PluginRead { params, .. } => {
self.plugin_processor.plugin_read(params).await
}
ClientRequest::PluginSkillRead { params, .. } => {
self.plugin_processor.plugin_skill_read(params).await
}
ClientRequest::PluginShareSave { params, .. } => {
self.plugin_processor.plugin_share_save(params).await
}
ClientRequest::PluginShareUpdateTargets { params, .. } => {
self.plugin_processor
.plugin_share_update_targets(params)
.await
}
ClientRequest::PluginShareList { params, .. } => {
self.plugin_processor.plugin_share_list(params).await
}
ClientRequest::PluginShareDelete { params, .. } => {
self.plugin_processor.plugin_share_delete(params).await
}
ClientRequest::AppsList { params, .. } => {
self.apps_processor.apps_list(&request_id, params).await
}
ClientRequest::SkillsConfigWrite { params, .. } => {
self.catalog_processor.skills_config_write(params).await
}
ClientRequest::PluginInstall { params, .. } => {
self.plugin_processor.plugin_install(params).await
}
ClientRequest::PluginUninstall { params, .. } => {
self.plugin_processor.plugin_uninstall(params).await
}
ClientRequest::ModelList { params, .. } => {
self.catalog_processor.model_list(params).await
}
ClientRequest::ExperimentalFeatureList { params, .. } => {
self.catalog_processor
.experimental_feature_list(params)
.await
}
ClientRequest::CollaborationModeList { params, .. } => {
self.catalog_processor.collaboration_mode_list(params).await
}
ClientRequest::MockExperimentalMethod { params, .. } => {
self.catalog_processor
.mock_experimental_method(params)
.await
}
ClientRequest::TurnStart { params, .. } => {
self.turn_processor
.turn_start(
request_id.clone(),
params,
app_server_client_name.clone(),
client_version.clone(),
)
.await
}
ClientRequest::ThreadInjectItems { params, .. } => {
self.turn_processor.thread_inject_items(params).await
}
ClientRequest::TurnSteer { params, .. } => {
self.turn_processor.turn_steer(&request_id, params).await
}
ClientRequest::TurnInterrupt { params, .. } => {
self.turn_processor
.turn_interrupt(&request_id, params)
.await
}
ClientRequest::ThreadRealtimeStart { params, .. } => {
self.turn_processor
.thread_realtime_start(&request_id, params)
.await
}
ClientRequest::ThreadRealtimeAppendAudio { params, .. } => {
self.turn_processor
.thread_realtime_append_audio(&request_id, params)
.await
}
ClientRequest::ThreadRealtimeAppendText { params, .. } => {
self.turn_processor
.thread_realtime_append_text(&request_id, params)
.await
}
ClientRequest::ThreadRealtimeStop { params, .. } => {
self.turn_processor
.thread_realtime_stop(&request_id, params)
.await
}
ClientRequest::ThreadRealtimeListVoices { params: _, .. } => {
self.turn_processor.thread_realtime_list_voices().await
}
ClientRequest::ReviewStart { params, .. } => {
self.turn_processor.review_start(&request_id, params).await
}
ClientRequest::McpServerOauthLogin { params, .. } => {
self.mcp_processor.mcp_server_oauth_login(params).await
}
ClientRequest::McpServerRefresh { params, .. } => {
self.mcp_processor.mcp_server_refresh(params).await
}
ClientRequest::McpServerStatusList { params, .. } => {
self.mcp_processor
.mcp_server_status_list(&request_id, params)
.await
}
ClientRequest::McpResourceRead { params, .. } => {
self.mcp_processor
.mcp_resource_read(&request_id, params)
.await
}
ClientRequest::McpServerToolCall { params, .. } => {
self.mcp_processor
.mcp_server_tool_call(&request_id, params)
.await
}
ClientRequest::WindowsSandboxSetupStart { params, .. } => {
self.windows_sandbox_processor
.windows_sandbox_setup_start(&request_id, params)
.await
}
ClientRequest::LoginAccount { params, .. } => {
self.account_processor
.login_account(request_id.clone(), params)
.await
}
ClientRequest::LogoutAccount { .. } => {
self.account_processor
.logout_account(request_id.clone())
.await
}
ClientRequest::CancelLoginAccount { params, .. } => {
self.account_processor.cancel_login_account(params).await
}
ClientRequest::GetAccount { params, .. } => {
self.account_processor.get_account(params).await
}
ClientRequest::GetAuthStatus { params, .. } => {
self.account_processor.get_auth_status(params).await
}
ClientRequest::GetAccountRateLimits { .. } => {
self.account_processor.get_account_rate_limits().await
}
ClientRequest::SendAddCreditsNudgeEmail { params, .. } => {
self.account_processor
.send_add_credits_nudge_email(params)
.await
}
ClientRequest::GitDiffToRemote { params, .. } => {
self.git_processor.git_diff_to_remote(params).await
}
ClientRequest::FuzzyFileSearch { params, .. } => self
.search_processor
.fuzzy_file_search(params)
.await
.map(|response| Some(response.into())),
ClientRequest::FuzzyFileSearchSessionStart { params, .. } => self
.search_processor
.fuzzy_file_search_session_start_response(params)
.await
.map(|response| Some(response.into())),
ClientRequest::FuzzyFileSearchSessionUpdate { params, .. } => self
.search_processor
.fuzzy_file_search_session_update_response(params)
.await
.map(|response| Some(response.into())),
ClientRequest::FuzzyFileSearchSessionStop { params, .. } => self
.search_processor
.fuzzy_file_search_session_stop(params)
.await
.map(|response| Some(response.into())),
ClientRequest::OneOffCommandExec { params, .. } => {
self.command_exec_processor
.one_off_command_exec(&request_id, params)
.await
}
ClientRequest::CommandExecWrite { params, .. } => {
self.command_exec_processor
.command_exec_write(request_id.clone(), params)
.await
}
ClientRequest::CommandExecResize { params, .. } => {
self.command_exec_processor
.command_exec_resize(request_id.clone(), params)
.await
}
ClientRequest::CommandExecTerminate { params, .. } => {
self.command_exec_processor
.command_exec_terminate(request_id.clone(), params)
.await
}
ClientRequest::ProcessSpawn { params, .. } => self
.process_exec_processor
.process_spawn(request_id.clone(), params)
.await
.map(|()| None),
ClientRequest::ProcessWriteStdin { params, .. } => {
self.process_exec_processor
.process_write_stdin(request_id.clone(), params)
.await
}
ClientRequest::ProcessKill { params, .. } => {
self.process_exec_processor
.process_kill(request_id.clone(), params)
.await
}
ClientRequest::ProcessResizePty { params, .. } => {
self.process_exec_processor
.process_resize_pty(request_id.clone(), params)
.await
}
ClientRequest::FeedbackUpload { params, .. } => {
self.feedback_processor.feedback_upload(params).await
}
};
match result {
Ok(Some(response)) => {
self.outgoing
.send_response_as(request_id.clone(), response)
.await;
}
Ok(None) => {}
Err(error) => {
self.outgoing.send_error(request_id.clone(), error).await;
}
}
Ok(())
}
}
// Test-only module: compiled only when `cfg(test)` is active. The `#[path]`
// attribute loads its body from `message_processor_tracing_tests.rs` instead
// of the default location derived from the module name.
#[cfg(test)]
#[path = "message_processor_tracing_tests.rs"]
mod message_processor_tracing_tests;