From ccd0e29421ab8adc3dc6638986d2af25b179cbcb Mon Sep 17 00:00:00 2001 From: Michael Bolin Date: Fri, 30 Jan 2026 19:15:59 -0800 Subject: [PATCH] app-server: add v2 APIs for TUI --- .../src/protocol/common.rs | 20 ++ .../app-server-protocol/src/protocol/v2.rs | 54 ++++ .../app-server/src/codex_message_processor.rs | 103 ++++++- codex-rs/app-server/src/lib.rs | 265 ++++++++++++++++++ codex-rs/app-server/src/message_processor.rs | 46 ++- .../app-server/tests/suite/v2/turn_start.rs | 3 + 6 files changed, 481 insertions(+), 10 deletions(-) diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index eb59480b58..f2a30a8f17 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -71,6 +71,14 @@ macro_rules! client_request_definitions { )* } + impl ClientRequest { + pub fn request_id(&self) -> &RequestId { + match self { + $(Self::$variant { request_id, .. } => request_id,)* + } + } + } + pub fn export_client_responses( out_dir: &::std::path::Path, ) -> ::std::result::Result<(), ::ts_rs::ExportError> { @@ -140,6 +148,14 @@ client_request_definitions! { params: v2::ThreadRollbackParams, response: v2::ThreadRollbackResponse, }, + ThreadCompact => "thread/compact" { + params: v2::ThreadCompactParams, + response: v2::ThreadCompactResponse, + }, + ThreadShutdown => "thread/shutdown" { + params: v2::ThreadShutdownParams, + response: v2::ThreadShutdownResponse, + }, ThreadList => "thread/list" { params: v2::ThreadListParams, response: v2::ThreadListResponse, @@ -201,6 +217,10 @@ client_request_definitions! { params: v2::ListMcpServerStatusParams, response: v2::ListMcpServerStatusResponse, }, + McpElicitationResolve => "mcp/elicitation/resolve" { + params: v2::McpElicitationResolveParams, + response: v2::McpElicitationResolveResponse, + }, LoginAccount => "account/login/start" { params: v2::LoginAccountParams, diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index f99659cd4d..09d65a38f8 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -3,6 +3,7 @@ use std::path::PathBuf; use crate::protocol::common::AuthMode; use codex_protocol::account::PlanType; +use codex_protocol::approvals::ElicitationAction as CoreElicitationAction; use codex_protocol::approvals::ExecPolicyAmendment as CoreExecPolicyAmendment; use codex_protocol::config_types::CollaborationMode; use codex_protocol::config_types::CollaborationModeMask; @@ -12,6 +13,7 @@ use codex_protocol::config_types::ReasoningSummary; use codex_protocol::config_types::SandboxMode as CoreSandboxMode; use codex_protocol::config_types::Verbosity; use codex_protocol::config_types::WebSearchMode; +use codex_protocol::config_types::WindowsSandboxLevel; use codex_protocol::items::AgentMessageContent as CoreAgentMessageContent; use codex_protocol::items::TurnItem as CoreTurnItem; use codex_protocol::models::ResponseItem; @@ -41,6 +43,7 @@ use codex_protocol::user_input::TextElement as CoreTextElement; use codex_protocol::user_input::UserInput as CoreUserInput; use codex_utils_absolute_path::AbsolutePathBuf; use mcp_types::ContentBlock as McpContentBlock; +use mcp_types::RequestId as McpRequestId; use mcp_types::Resource as McpResource; use mcp_types::ResourceTemplate as McpResourceTemplate; use mcp_types::Tool as McpTool; @@ -233,6 +236,14 @@ v2_enum_from_core!( } ); +v2_enum_from_core!( + pub enum ElicitationAction from CoreElicitationAction { + Accept, + Decline, + Cancel + } +); + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)] #[serde(tag = "type", rename_all = "camelCase")] #[ts(tag = "type")] @@ -1061,6 +1072,21 @@ pub struct ListMcpServerStatusResponse { pub next_cursor: Option, } +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct McpElicitationResolveParams { + pub thread_id: String, + pub server_name: String, + pub request_id: McpRequestId, + pub decision: ElicitationAction, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct McpElicitationResolveResponse {} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] @@ -1354,6 +1380,32 @@ pub struct ThreadRollbackResponse { pub thread: Thread, } +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadCompactParams { + pub thread_id: String, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadCompactResponse { + pub turn: Turn, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadShutdownParams { + pub thread_id: String, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadShutdownResponse {} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] @@ -1810,6 +1862,8 @@ pub struct TurnStartParams { pub approval_policy: Option, /// Override the sandbox policy for this turn and subsequent turns. pub sandbox_policy: Option, + /// Override the Windows sandbox level for this turn and subsequent turns. + pub windows_sandbox_level: Option, /// Override the model for this turn and subsequent turns. pub model: Option, /// Override the reasoning effort for this turn and subsequent turns. diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index aae369a19d..7a623cbacc 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -64,6 +64,8 @@ use codex_app_server_protocol::LoginChatGptCompleteNotification; use codex_app_server_protocol::LoginChatGptResponse; use codex_app_server_protocol::LogoutAccountResponse; use codex_app_server_protocol::LogoutChatGptResponse; +use codex_app_server_protocol::McpElicitationResolveParams; +use codex_app_server_protocol::McpElicitationResolveResponse; use codex_app_server_protocol::McpServerOauthLoginCompletedNotification; use codex_app_server_protocol::McpServerOauthLoginParams; use codex_app_server_protocol::McpServerOauthLoginResponse; @@ -98,6 +100,8 @@ use codex_app_server_protocol::SkillsListResponse; use codex_app_server_protocol::Thread; use codex_app_server_protocol::ThreadArchiveParams; use codex_app_server_protocol::ThreadArchiveResponse; +use codex_app_server_protocol::ThreadCompactParams; +use codex_app_server_protocol::ThreadCompactResponse; use codex_app_server_protocol::ThreadForkParams; use codex_app_server_protocol::ThreadForkResponse; use codex_app_server_protocol::ThreadItem; @@ -112,6 +116,8 @@ use codex_app_server_protocol::ThreadResumeResponse; use codex_app_server_protocol::ThreadRollbackParams; use codex_app_server_protocol::ThreadSetNameParams; use codex_app_server_protocol::ThreadSetNameResponse; +use codex_app_server_protocol::ThreadShutdownParams; +use codex_app_server_protocol::ThreadShutdownResponse; use codex_app_server_protocol::ThreadSortKey; use codex_app_server_protocol::ThreadSourceKind; use codex_app_server_protocol::ThreadStartParams; @@ -447,6 +453,12 @@ impl CodexMessageProcessor { ClientRequest::ThreadRollback { request_id, params } => { self.thread_rollback(request_id, params).await; } + ClientRequest::ThreadCompact { request_id, params } => { + self.thread_compact(request_id, params).await; + } + ClientRequest::ThreadShutdown { request_id, params } => { + self.thread_shutdown(request_id, params).await; + } ClientRequest::ThreadList { request_id, params } => { self.thread_list(request_id, params).await; } @@ -513,6 +525,9 @@ impl CodexMessageProcessor { ClientRequest::McpServerStatusList { request_id, params } => { self.list_mcp_server_status(request_id, params).await; } + ClientRequest::McpElicitationResolve { request_id, params } => { + self.mcp_elicitation_resolve(request_id, params).await; + } ClientRequest::LoginAccount { request_id, params } => { self.login_v2(request_id, params).await; } @@ -2051,6 +2066,63 @@ impl CodexMessageProcessor { } } + async fn thread_compact(&mut self, request_id: RequestId, params: ThreadCompactParams) { + let ThreadCompactParams { thread_id } = params; + let (thread_uuid, thread) = match self.load_thread(&thread_id).await { + Ok(v) => v, + Err(error) => { + self.outgoing.send_error(request_id, error).await; + return; + } + }; + + match thread.submit(Op::Compact).await { + Ok(turn_id) => { + let turn = Turn { + id: turn_id.clone(), + items: Vec::new(), + error: None, + status: TurnStatus::InProgress, + }; + let response = ThreadCompactResponse { turn: turn.clone() }; + self.outgoing.send_response(request_id, response).await; + + let notif = TurnStartedNotification { + thread_id: thread_uuid.to_string(), + turn, + }; + self.outgoing + .send_server_notification(ServerNotification::TurnStarted(notif)) + .await; + } + Err(err) => { + let error = JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: format!("failed to start compact turn: {err}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + } + } + } + + async fn thread_shutdown(&mut self, request_id: RequestId, params: ThreadShutdownParams) { + let ThreadShutdownParams { thread_id } = params; + let (thread_uuid, thread) = match self.load_thread(&thread_id).await { + Ok(v) => v, + Err(error) => { + self.outgoing.send_error(request_id, error).await; + return; + } + }; + + let _ = thread.submit(Op::Shutdown).await; + self.thread_manager.remove_thread(&thread_uuid).await; + + let response = ThreadShutdownResponse {}; + self.outgoing.send_response(request_id, response).await; + } + async fn thread_list(&self, request_id: RequestId, params: ThreadListParams) { let ThreadListParams { cursor, @@ -3143,6 +3215,34 @@ impl CodexMessageProcessor { }); } + async fn mcp_elicitation_resolve( + &self, + request_id: RequestId, + params: McpElicitationResolveParams, + ) { + let McpElicitationResolveParams { + thread_id, + server_name, + request_id: elicitation_id, + decision, + } = params; + let (_, thread) = match self.load_thread(&thread_id).await { + Ok(v) => v, + Err(error) => { + self.outgoing.send_error(request_id, error).await; + return; + } + }; + let op = Op::ResolveElicitation { + server_name, + request_id: elicitation_id, + decision: decision.to_core(), + }; + let _ = thread.submit(op).await; + let response = McpElicitationResolveResponse {}; + self.outgoing.send_response(request_id, response).await; + } + async fn list_mcp_server_status_task( outgoing: Arc, request_id: RequestId, @@ -4090,6 +4190,7 @@ impl CodexMessageProcessor { let has_any_overrides = params.cwd.is_some() || params.approval_policy.is_some() || params.sandbox_policy.is_some() + || params.windows_sandbox_level.is_some() || params.model.is_some() || params.effort.is_some() || params.summary.is_some() @@ -4103,7 +4204,7 @@ impl CodexMessageProcessor { cwd: params.cwd, approval_policy: params.approval_policy.map(AskForApproval::to_core), sandbox_policy: params.sandbox_policy.map(|p| p.to_core()), - windows_sandbox_level: None, + windows_sandbox_level: params.windows_sandbox_level, model: params.model, effort: params.effort.map(Some), summary: params.summary, diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index 52ea9325f1..bf9350aea0 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -11,14 +11,22 @@ use codex_core::config_loader::LoaderOverrides; use std::io::ErrorKind; use std::io::Result as IoResult; use std::path::PathBuf; +use std::sync::Arc; use crate::message_processor::MessageProcessor; use crate::message_processor::MessageProcessorArgs; use crate::outgoing_message::OutgoingMessage; use crate::outgoing_message::OutgoingMessageSender; +use codex_app_server_protocol::ClientNotification; +use codex_app_server_protocol::ClientRequest; use codex_app_server_protocol::ConfigLayerSource; use codex_app_server_protocol::ConfigWarningNotification; +use codex_app_server_protocol::JSONRPCErrorError; use codex_app_server_protocol::JSONRPCMessage; +use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::Result; +use codex_app_server_protocol::ServerNotification; +use codex_app_server_protocol::ServerRequest; use codex_app_server_protocol::TextPosition as AppTextPosition; use codex_app_server_protocol::TextRange as AppTextRange; use codex_core::ExecPolicyError; @@ -324,6 +332,7 @@ pub async fn run_main( cloud_requirements: cloud_requirements.clone(), feedback: feedback.clone(), config_warnings, + session_source: codex_protocol::protocol::SessionSource::VSCode, }); let mut thread_created_rx = processor.thread_created_receiver(); async move { @@ -395,3 +404,259 @@ pub async fn run_main( Ok(()) } + +#[derive(Debug, Clone)] +pub enum AppServerClientMessage { + Request(ClientRequest), + Response { + id: RequestId, + result: Result, + }, + Error { + id: RequestId, + error: JSONRPCErrorError, + }, + Notification(ClientNotification), +} + +#[derive(Debug, Clone, PartialEq)] +pub struct AppServerEventNotification { + pub method: String, + pub params: Option, +} + +#[derive(Debug, Clone)] +pub enum AppServerMessage { + Request(ServerRequest), + Notification(ServerNotification), + EventNotification(AppServerEventNotification), + Response { + id: RequestId, + result: Result, + }, + Error { + id: RequestId, + error: JSONRPCErrorError, + }, +} + +pub struct InMemoryAppServer { + pub incoming: mpsc::Sender, + pub outgoing: mpsc::Receiver, +} + +pub struct InProcessAppServer { + pub incoming: mpsc::Sender, + pub outgoing: mpsc::Receiver, +} + +pub fn spawn_in_memory( + codex_linux_sandbox_exe: Option, + config: Arc, + cli_overrides: Vec<(String, TomlValue)>, + loader_overrides: LoaderOverrides, + feedback: CodexFeedback, + config_warnings: Vec, + session_source: codex_protocol::protocol::SessionSource, +) -> InMemoryAppServer { + let (incoming_tx, mut incoming_rx) = mpsc::channel::(CHANNEL_CAPACITY); + let (outgoing_tx, mut outgoing_rx) = mpsc::channel::(CHANNEL_CAPACITY); + let (client_outgoing_tx, client_outgoing_rx) = + mpsc::channel::(CHANNEL_CAPACITY); + + let auth_manager = AuthManager::shared( + config.codex_home.clone(), + false, + config.cli_auth_credentials_store_mode, + ); + let cloud_requirements = + cloud_requirements_loader(auth_manager, config.chatgpt_base_url.clone()); + + tokio::spawn({ + let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx); + let mut processor = MessageProcessor::new(MessageProcessorArgs { + outgoing: outgoing_message_sender, + codex_linux_sandbox_exe, + config: Arc::clone(&config), + cli_overrides, + loader_overrides, + cloud_requirements, + feedback, + config_warnings, + session_source, + }); + let mut thread_created_rx = processor.thread_created_receiver(); + async move { + let mut listen_for_threads = true; + loop { + tokio::select! { + msg = incoming_rx.recv() => { + let Some(msg) = msg else { + break; + }; + match msg { + JSONRPCMessage::Request(r) => processor.process_request(r).await, + JSONRPCMessage::Response(r) => processor.process_response(r).await, + JSONRPCMessage::Notification(n) => processor.process_notification(n).await, + JSONRPCMessage::Error(e) => processor.process_error(e).await, + } + } + created = thread_created_rx.recv(), if listen_for_threads => { + match created { + Ok(thread_id) => { + processor.try_attach_thread_listener(thread_id).await; + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => { + warn!("thread_created receiver lagged; skipping resync"); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + listen_for_threads = false; + } + } + } + } + } + + info!("in-memory processor task exited (channel closed)"); + } + }); + + tokio::spawn(async move { + while let Some(outgoing_message) = outgoing_rx.recv().await { + let Ok(value) = serde_json::to_value(outgoing_message) else { + error!("Failed to convert OutgoingMessage to JSON value"); + continue; + }; + let jsonrpc = match serde_json::from_value::(value) { + Ok(message) => message, + Err(err) => { + error!("Failed to deserialize OutgoingMessage as JSONRPCMessage: {err}"); + continue; + } + }; + if client_outgoing_tx.send(jsonrpc).await.is_err() { + break; + } + } + info!("in-memory outgoing task exited (channel closed)"); + }); + + InMemoryAppServer { + incoming: incoming_tx, + outgoing: client_outgoing_rx, + } +} + +pub fn spawn_in_memory_typed( + codex_linux_sandbox_exe: Option, + config: Arc, + cli_overrides: Vec<(String, TomlValue)>, + loader_overrides: LoaderOverrides, + feedback: CodexFeedback, + config_warnings: Vec, + session_source: codex_protocol::protocol::SessionSource, +) -> InProcessAppServer { + let (incoming_tx, mut incoming_rx) = mpsc::channel::(CHANNEL_CAPACITY); + let (outgoing_tx, mut outgoing_rx) = mpsc::channel::(CHANNEL_CAPACITY); + let (client_outgoing_tx, client_outgoing_rx) = + mpsc::channel::(CHANNEL_CAPACITY); + + let auth_manager = AuthManager::shared( + config.codex_home.clone(), + false, + config.cli_auth_credentials_store_mode, + ); + let cloud_requirements = + cloud_requirements_loader(auth_manager, config.chatgpt_base_url.clone()); + + tokio::spawn({ + let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx); + let mut processor = MessageProcessor::new(MessageProcessorArgs { + outgoing: outgoing_message_sender, + codex_linux_sandbox_exe, + config: Arc::clone(&config), + cli_overrides, + loader_overrides, + cloud_requirements, + feedback, + config_warnings, + session_source, + }); + let mut thread_created_rx = processor.thread_created_receiver(); + async move { + let mut listen_for_threads = true; + loop { + tokio::select! { + msg = incoming_rx.recv() => { + let Some(msg) = msg else { + break; + }; + match msg { + AppServerClientMessage::Request(request) => { + processor.process_client_request(request).await; + } + AppServerClientMessage::Response { id, result } => { + processor.process_client_response(id, result).await; + } + AppServerClientMessage::Error { id, error } => { + processor.process_client_error(id, error); + } + AppServerClientMessage::Notification(notification) => { + processor.process_client_notification(notification).await; + } + } + } + created = thread_created_rx.recv(), if listen_for_threads => { + match created { + Ok(thread_id) => { + processor.try_attach_thread_listener(thread_id).await; + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => { + warn!("thread_created receiver lagged; skipping resync"); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + listen_for_threads = false; + } + } + } + } + } + + info!("in-process processor task exited (channel closed)"); + } + }); + + tokio::spawn(async move { + while let Some(outgoing_message) = outgoing_rx.recv().await { + let mapped = match outgoing_message { + OutgoingMessage::Request(request) => AppServerMessage::Request(request), + OutgoingMessage::AppServerNotification(notification) => { + AppServerMessage::Notification(notification) + } + OutgoingMessage::Notification(notification) => { + AppServerMessage::EventNotification(AppServerEventNotification { + method: notification.method, + params: notification.params, + }) + } + OutgoingMessage::Response(response) => AppServerMessage::Response { + id: response.id, + result: response.result, + }, + OutgoingMessage::Error(error) => AppServerMessage::Error { + id: error.id, + error: error.error, + }, + }; + if client_outgoing_tx.send(mapped).await.is_err() { + break; + } + } + info!("in-process outgoing task exited (channel closed)"); + }); + + InProcessAppServer { + incoming: incoming_tx, + outgoing: client_outgoing_rx, + } +} diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index b1c899bf1d..7d72c68191 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -11,6 +11,7 @@ use codex_app_server_protocol::ChatgptAuthTokensRefreshParams; use codex_app_server_protocol::ChatgptAuthTokensRefreshReason; use codex_app_server_protocol::ChatgptAuthTokensRefreshResponse; use codex_app_server_protocol::ClientInfo; +use codex_app_server_protocol::ClientNotification; use codex_app_server_protocol::ClientRequest; use codex_app_server_protocol::ConfigBatchWriteParams; use codex_app_server_protocol::ConfigReadParams; @@ -23,6 +24,7 @@ use codex_app_server_protocol::JSONRPCNotification; use codex_app_server_protocol::JSONRPCRequest; use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::Result as JsonResult; use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::ServerRequestPayload; use codex_core::AuthManager; @@ -119,6 +121,7 @@ pub(crate) struct MessageProcessorArgs { pub(crate) cloud_requirements: CloudRequirementsLoader, pub(crate) feedback: CodexFeedback, pub(crate) config_warnings: Vec, + pub(crate) session_source: SessionSource, } impl MessageProcessor { @@ -134,7 +137,9 @@ impl MessageProcessor { cloud_requirements, feedback, config_warnings, + session_source, } = args; + let outgoing = Arc::new(outgoing); let auth_manager = AuthManager::shared( config.codex_home.clone(), @@ -148,7 +153,7 @@ impl MessageProcessor { let thread_manager = Arc::new(ThreadManager::new( config.codex_home.clone(), auth_manager.clone(), - SessionSource::VSCode, + session_source, )); let codex_message_processor = CodexMessageProcessor::new(CodexMessageProcessorArgs { auth_manager, @@ -205,24 +210,33 @@ impl MessageProcessor { } }; - match codex_request { + self.process_client_request(codex_request).await; + } + + pub(crate) async fn process_client_request(&mut self, codex_request: ClientRequest) { + let request_id = codex_request.request_id().clone(); + match &codex_request { // Handle Initialize internally so CodexMessageProcessor does not have to concern // itself with the `initialized` bool. - ClientRequest::Initialize { request_id, params } => { + ClientRequest::Initialize { + request_id: _, + params, + } => { if self.initialized { let error = JSONRPCErrorError { code: INVALID_REQUEST_ERROR_CODE, message: "Already initialized".to_string(), data: None, }; - self.outgoing.send_error(request_id, error).await; + self.outgoing.send_error(request_id.clone(), error).await; return; } else { + let client_info = params.client_info.clone(); let ClientInfo { name, title: _title, version, - } = params.client_info; + } = client_info; if let Err(error) = set_default_originator(name.clone()) { match error { SetOriginatorError::InvalidHeaderValue => { @@ -233,7 +247,7 @@ impl MessageProcessor { ), data: None, }; - self.outgoing.send_error(request_id, error).await; + self.outgoing.send_error(request_id.clone(), error).await; return; } SetOriginatorError::AlreadyInitialized => { @@ -252,7 +266,9 @@ impl MessageProcessor { let user_agent = get_codex_user_agent(); let response = InitializeResponse { user_agent }; - self.outgoing.send_response(request_id, response).await; + self.outgoing + .send_response(request_id.clone(), response) + .await; self.initialized = true; if !self.config_warnings.is_empty() { @@ -275,7 +291,7 @@ impl MessageProcessor { message: "Not initialized".to_string(), data: None, }; - self.outgoing.send_error(request_id, error).await; + self.outgoing.send_error(request_id.clone(), error).await; return; } } @@ -309,6 +325,10 @@ impl MessageProcessor { tracing::info!("<- notification: {:?}", notification); } + pub(crate) async fn process_client_notification(&self, notification: ClientNotification) { + tracing::info!("<- notification: {:?}", notification); + } + pub(crate) fn thread_created_receiver(&self) -> broadcast::Receiver { self.codex_message_processor.thread_created_receiver() } @@ -326,7 +346,11 @@ impl MessageProcessor { pub(crate) async fn process_response(&mut self, response: JSONRPCResponse) { tracing::info!("<- response: {:?}", response); let JSONRPCResponse { id, result, .. } = response; - self.outgoing.notify_client_response(id, result).await + self.process_client_response(id, result).await; + } + + pub(crate) async fn process_client_response(&mut self, id: RequestId, result: JsonResult) { + self.outgoing.notify_client_response(id, result).await; } /// Handle an error object received from the peer. @@ -335,6 +359,10 @@ impl MessageProcessor { self.outgoing.notify_client_error(err.id, err.error).await; } + pub(crate) fn process_client_error(&mut self, id: RequestId, error: JSONRPCErrorError) { + tracing::error!("<- error: {:?}", JSONRPCError { id, error }); + } + async fn handle_config_read(&self, request_id: RequestId, params: ConfigReadParams) { match self.config_api.read(params).await { Ok(response) => self.outgoing.send_response(request_id, response).await, diff --git a/codex-rs/app-server/tests/suite/v2/turn_start.rs b/codex-rs/app-server/tests/suite/v2/turn_start.rs index 99eff9ecb2..b950d9c344 100644 --- a/codex-rs/app-server/tests/suite/v2/turn_start.rs +++ b/codex-rs/app-server/tests/suite/v2/turn_start.rs @@ -656,6 +656,7 @@ async fn turn_start_exec_approval_toggle_v2() -> Result<()> { }], approval_policy: Some(codex_app_server_protocol::AskForApproval::Never), sandbox_policy: Some(codex_app_server_protocol::SandboxPolicy::DangerFullAccess), + windows_sandbox_level: None, model: Some("mock-model".to_string()), effort: Some(ReasoningEffort::Medium), summary: Some(ReasoningSummary::Auto), @@ -898,6 +899,7 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> { exclude_tmpdir_env_var: false, exclude_slash_tmp: false, }), + windows_sandbox_level: None, model: Some("mock-model".to_string()), effort: Some(ReasoningEffort::Medium), summary: Some(ReasoningSummary::Auto), @@ -929,6 +931,7 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> { cwd: Some(second_cwd.clone()), approval_policy: Some(codex_app_server_protocol::AskForApproval::Never), sandbox_policy: Some(codex_app_server_protocol::SandboxPolicy::DangerFullAccess), + windows_sandbox_level: None, model: Some("mock-model".to_string()), effort: Some(ReasoningEffort::Medium), summary: Some(ReasoningSummary::Auto),