app-server: Use shared receivers for app-server message processors (#17256)

We do not rely on the mutability here, so express it in the type system.
This commit is contained in:
Ruslan Nigmatullin
2026-04-09 12:53:50 -07:00
committed by GitHub
parent a92a5085bd
commit 545f3daba0
2 changed files with 44 additions and 60 deletions

View File

@@ -690,7 +690,7 @@ impl CodexMessageProcessor {
}
pub async fn process_request(
&mut self,
&self,
connection_id: ConnectionId,
request: ClientRequest,
app_server_client_name: Option<String>,
@@ -1000,7 +1000,7 @@ impl CodexMessageProcessor {
}
}
async fn login_v2(&mut self, request_id: ConnectionRequestId, params: LoginAccountParams) {
async fn login_v2(&self, request_id: ConnectionRequestId, params: LoginAccountParams) {
match params {
LoginAccountParams::ApiKey { api_key } => {
self.login_api_key_v2(request_id, LoginApiKeyParams { api_key })
@@ -1038,7 +1038,7 @@ impl CodexMessageProcessor {
}
async fn login_api_key_common(
&mut self,
&self,
params: &LoginApiKeyParams,
) -> std::result::Result<(), JSONRPCErrorError> {
if self.auth_manager.is_external_chatgpt_auth_active() {
@@ -1081,11 +1081,7 @@ impl CodexMessageProcessor {
}
}
async fn login_api_key_v2(
&mut self,
request_id: ConnectionRequestId,
params: LoginApiKeyParams,
) {
async fn login_api_key_v2(&self, request_id: ConnectionRequestId, params: LoginApiKeyParams) {
match self.login_api_key_common(&params).await {
Ok(()) => {
let response = codex_app_server_protocol::LoginAccountResponse::ApiKey {};
@@ -1168,7 +1164,7 @@ impl CodexMessageProcessor {
}
}
async fn login_chatgpt_v2(&mut self, request_id: ConnectionRequestId) {
async fn login_chatgpt_v2(&self, request_id: ConnectionRequestId) {
match self.login_chatgpt_common().await {
Ok(opts) => match run_login_server(opts) {
Ok(server) => {
@@ -1277,7 +1273,7 @@ impl CodexMessageProcessor {
}
}
async fn login_chatgpt_device_code_v2(&mut self, request_id: ConnectionRequestId) {
async fn login_chatgpt_device_code_v2(&self, request_id: ConnectionRequestId) {
match self.login_chatgpt_common().await {
Ok(opts) => match request_device_code(&opts).await {
Ok(device_code) => {
@@ -1380,7 +1376,7 @@ impl CodexMessageProcessor {
}
async fn cancel_login_chatgpt_common(
&mut self,
&self,
login_id: Uuid,
) -> std::result::Result<(), CancelLoginError> {
let mut guard = self.active_login.lock().await;
@@ -1395,7 +1391,7 @@ impl CodexMessageProcessor {
}
async fn cancel_login_v2(
&mut self,
&self,
request_id: ConnectionRequestId,
params: CancelLoginAccountParams,
) {
@@ -1421,7 +1417,7 @@ impl CodexMessageProcessor {
}
async fn login_chatgpt_auth_tokens(
&mut self,
&self,
request_id: ConnectionRequestId,
access_token: String,
chatgpt_account_id: String,
@@ -1510,7 +1506,7 @@ impl CodexMessageProcessor {
.await;
}
async fn logout_common(&mut self) -> std::result::Result<Option<AuthMode>, JSONRPCErrorError> {
async fn logout_common(&self) -> std::result::Result<Option<AuthMode>, JSONRPCErrorError> {
// Cancel any active login attempt.
{
let mut guard = self.active_login.lock().await;
@@ -1535,7 +1531,7 @@ impl CodexMessageProcessor {
.map(CodexAuth::api_auth_mode))
}
async fn logout_v2(&mut self, request_id: ConnectionRequestId) {
async fn logout_v2(&self, request_id: ConnectionRequestId) {
match self.logout_common().await {
Ok(current_auth_method) => {
self.outgoing
@@ -2508,11 +2504,7 @@ impl CodexMessageProcessor {
}
}
async fn thread_archive(
&mut self,
request_id: ConnectionRequestId,
params: ThreadArchiveParams,
) {
async fn thread_archive(&self, request_id: ConnectionRequestId, params: ThreadArchiveParams) {
// TODO(jif) mostly rewrite this using sqlite after phase 1
let thread_id = match ThreadId::from_string(&params.thread_id) {
Ok(id) => id,
@@ -3037,7 +3029,7 @@ impl CodexMessageProcessor {
}
async fn thread_unarchive(
&mut self,
&self,
request_id: ConnectionRequestId,
params: ThreadUnarchiveParams,
) {
@@ -3225,11 +3217,7 @@ impl CodexMessageProcessor {
}
}
async fn thread_rollback(
&mut self,
request_id: ConnectionRequestId,
params: ThreadRollbackParams,
) {
async fn thread_rollback(&self, request_id: ConnectionRequestId, params: ThreadRollbackParams) {
let ThreadRollbackParams {
thread_id,
num_turns,
@@ -3550,7 +3538,7 @@ impl CodexMessageProcessor {
self.outgoing.send_response(request_id, response).await;
}
async fn thread_read(&mut self, request_id: ConnectionRequestId, params: ThreadReadParams) {
async fn thread_read(&self, request_id: ConnectionRequestId, params: ThreadReadParams) {
let ThreadReadParams {
thread_id,
include_turns,
@@ -3714,7 +3702,7 @@ impl CodexMessageProcessor {
.await;
}
pub(crate) async fn connection_closed(&mut self, connection_id: ConnectionId) {
pub(crate) async fn connection_closed(&self, connection_id: ConnectionId) {
self.command_exec_manager
.connection_closed(connection_id)
.await;
@@ -3729,7 +3717,7 @@ impl CodexMessageProcessor {
/// Best-effort: ensure initialized connections are subscribed to this thread.
pub(crate) async fn try_attach_thread_listener(
&mut self,
&self,
thread_id: ThreadId,
connection_ids: Vec<ConnectionId>,
) {
@@ -3756,7 +3744,7 @@ impl CodexMessageProcessor {
}
}
async fn thread_resume(&mut self, request_id: ConnectionRequestId, params: ThreadResumeParams) {
async fn thread_resume(&self, request_id: ConnectionRequestId, params: ThreadResumeParams) {
if let Ok(thread_id) = ThreadId::from_string(&params.thread_id)
&& self
.pending_thread_unloads
@@ -3990,7 +3978,7 @@ impl CodexMessageProcessor {
}
async fn resume_running_thread(
&mut self,
&self,
request_id: ConnectionRequestId,
params: &ThreadResumeParams,
) -> bool {
@@ -4289,7 +4277,7 @@ impl CodexMessageProcessor {
}
}
async fn thread_fork(&mut self, request_id: ConnectionRequestId, params: ThreadForkParams) {
async fn thread_fork(&self, request_id: ConnectionRequestId, params: ThreadForkParams) {
let ThreadForkParams {
thread_id,
path,
@@ -5452,7 +5440,7 @@ impl CodexMessageProcessor {
}
}
async fn finalize_thread_teardown(&mut self, thread_id: ThreadId) {
async fn finalize_thread_teardown(&self, thread_id: ThreadId) {
self.pending_thread_unloads.lock().await.remove(&thread_id);
self.outgoing
.cancel_requests_for_thread(thread_id, /*error*/ None)
@@ -5466,7 +5454,7 @@ impl CodexMessageProcessor {
}
async fn thread_unsubscribe(
&mut self,
&self,
request_id: ConnectionRequestId,
params: ThreadUnsubscribeParams,
) {
@@ -5577,7 +5565,7 @@ impl CodexMessageProcessor {
}
async fn archive_thread_common(
&mut self,
&self,
thread_id: ThreadId,
rollout_path: &Path,
) -> Result<(), JSONRPCErrorError> {
@@ -6817,7 +6805,7 @@ impl CodexMessageProcessor {
}
async fn prepare_realtime_conversation_thread(
&mut self,
&self,
request_id: ConnectionRequestId,
thread_id: &str,
) -> Option<(ThreadId, Arc<CodexThread>)> {
@@ -6861,7 +6849,7 @@ impl CodexMessageProcessor {
}
async fn thread_realtime_start(
&mut self,
&self,
request_id: ConnectionRequestId,
params: ThreadRealtimeStartParams,
) {
@@ -6909,7 +6897,7 @@ impl CodexMessageProcessor {
}
async fn thread_realtime_append_audio(
&mut self,
&self,
request_id: ConnectionRequestId,
params: ThreadRealtimeAppendAudioParams,
) {
@@ -6947,7 +6935,7 @@ impl CodexMessageProcessor {
}
async fn thread_realtime_append_text(
&mut self,
&self,
request_id: ConnectionRequestId,
params: ThreadRealtimeAppendTextParams,
) {
@@ -6983,7 +6971,7 @@ impl CodexMessageProcessor {
}
async fn thread_realtime_stop(
&mut self,
&self,
request_id: ConnectionRequestId,
params: ThreadRealtimeStopParams,
) {
@@ -7015,7 +7003,7 @@ impl CodexMessageProcessor {
}
async fn thread_realtime_list_voices(
&mut self,
&self,
request_id: ConnectionRequestId,
_params: ThreadRealtimeListVoicesParams,
) {
@@ -7101,7 +7089,7 @@ impl CodexMessageProcessor {
}
async fn start_detached_review(
&mut self,
&self,
request_id: &ConnectionRequestId,
parent_thread_id: ThreadId,
parent_thread: Arc<CodexThread>,
@@ -7219,7 +7207,7 @@ impl CodexMessageProcessor {
Ok(())
}
async fn review_start(&mut self, request_id: ConnectionRequestId, params: ReviewStartParams) {
async fn review_start(&self, request_id: ConnectionRequestId, params: ReviewStartParams) {
let ReviewStartParams {
thread_id,
target,
@@ -7274,11 +7262,7 @@ impl CodexMessageProcessor {
}
}
async fn turn_interrupt(
&mut self,
request_id: ConnectionRequestId,
params: TurnInterruptParams,
) {
async fn turn_interrupt(&self, request_id: ConnectionRequestId, params: TurnInterruptParams) {
let TurnInterruptParams { thread_id, turn_id } = params;
self.outgoing
.record_request_turn_id(&request_id, &turn_id)
@@ -7547,7 +7531,7 @@ impl CodexMessageProcessor {
}
async fn fuzzy_file_search(
&mut self,
&self,
request_id: ConnectionRequestId,
params: FuzzyFileSearchParams,
) {
@@ -7591,7 +7575,7 @@ impl CodexMessageProcessor {
}
async fn fuzzy_file_search_session_start(
&mut self,
&self,
request_id: ConnectionRequestId,
params: FuzzyFileSearchSessionStartParams,
) {
@@ -7628,7 +7612,7 @@ impl CodexMessageProcessor {
}
async fn fuzzy_file_search_session_update(
&mut self,
&self,
request_id: ConnectionRequestId,
params: FuzzyFileSearchSessionUpdateParams,
) {
@@ -7658,7 +7642,7 @@ impl CodexMessageProcessor {
}
async fn fuzzy_file_search_session_stop(
&mut self,
&self,
request_id: ConnectionRequestId,
params: FuzzyFileSearchSessionStopParams,
) {
@@ -7858,7 +7842,7 @@ impl CodexMessageProcessor {
}
async fn windows_sandbox_setup_start(
&mut self,
&self,
request_id: ConnectionRequestId,
params: WindowsSandboxSetupStartParams,
) {