Compare commits

...

1 Commits

Author SHA1 Message Date
Michael Bolin
ccd0e29421 app-server: add v2 APIs for TUI 2026-01-30 19:15:59 -08:00
6 changed files with 481 additions and 10 deletions

View File

@@ -71,6 +71,14 @@ macro_rules! client_request_definitions {
)*
}
impl ClientRequest {
pub fn request_id(&self) -> &RequestId {
match self {
$(Self::$variant { request_id, .. } => request_id,)*
}
}
}
pub fn export_client_responses(
out_dir: &::std::path::Path,
) -> ::std::result::Result<(), ::ts_rs::ExportError> {
@@ -140,6 +148,14 @@ client_request_definitions! {
params: v2::ThreadRollbackParams,
response: v2::ThreadRollbackResponse,
},
ThreadCompact => "thread/compact" {
params: v2::ThreadCompactParams,
response: v2::ThreadCompactResponse,
},
ThreadShutdown => "thread/shutdown" {
params: v2::ThreadShutdownParams,
response: v2::ThreadShutdownResponse,
},
ThreadList => "thread/list" {
params: v2::ThreadListParams,
response: v2::ThreadListResponse,
@@ -201,6 +217,10 @@ client_request_definitions! {
params: v2::ListMcpServerStatusParams,
response: v2::ListMcpServerStatusResponse,
},
McpElicitationResolve => "mcp/elicitation/resolve" {
params: v2::McpElicitationResolveParams,
response: v2::McpElicitationResolveResponse,
},
LoginAccount => "account/login/start" {
params: v2::LoginAccountParams,

View File

@@ -3,6 +3,7 @@ use std::path::PathBuf;
use crate::protocol::common::AuthMode;
use codex_protocol::account::PlanType;
use codex_protocol::approvals::ElicitationAction as CoreElicitationAction;
use codex_protocol::approvals::ExecPolicyAmendment as CoreExecPolicyAmendment;
use codex_protocol::config_types::CollaborationMode;
use codex_protocol::config_types::CollaborationModeMask;
@@ -12,6 +13,7 @@ use codex_protocol::config_types::ReasoningSummary;
use codex_protocol::config_types::SandboxMode as CoreSandboxMode;
use codex_protocol::config_types::Verbosity;
use codex_protocol::config_types::WebSearchMode;
use codex_protocol::config_types::WindowsSandboxLevel;
use codex_protocol::items::AgentMessageContent as CoreAgentMessageContent;
use codex_protocol::items::TurnItem as CoreTurnItem;
use codex_protocol::models::ResponseItem;
@@ -41,6 +43,7 @@ use codex_protocol::user_input::TextElement as CoreTextElement;
use codex_protocol::user_input::UserInput as CoreUserInput;
use codex_utils_absolute_path::AbsolutePathBuf;
use mcp_types::ContentBlock as McpContentBlock;
use mcp_types::RequestId as McpRequestId;
use mcp_types::Resource as McpResource;
use mcp_types::ResourceTemplate as McpResourceTemplate;
use mcp_types::Tool as McpTool;
@@ -233,6 +236,14 @@ v2_enum_from_core!(
}
);
v2_enum_from_core!(
pub enum ElicitationAction from CoreElicitationAction {
Accept,
Decline,
Cancel
}
);
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)]
#[serde(tag = "type", rename_all = "camelCase")]
#[ts(tag = "type")]
@@ -1061,6 +1072,21 @@ pub struct ListMcpServerStatusResponse {
pub next_cursor: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct McpElicitationResolveParams {
pub thread_id: String,
pub server_name: String,
pub request_id: McpRequestId,
pub decision: ElicitationAction,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct McpElicitationResolveResponse {}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
@@ -1354,6 +1380,32 @@ pub struct ThreadRollbackResponse {
pub thread: Thread,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadCompactParams {
pub thread_id: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadCompactResponse {
pub turn: Turn,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadShutdownParams {
pub thread_id: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadShutdownResponse {}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
@@ -1810,6 +1862,8 @@ pub struct TurnStartParams {
pub approval_policy: Option<AskForApproval>,
/// Override the sandbox policy for this turn and subsequent turns.
pub sandbox_policy: Option<SandboxPolicy>,
/// Override the Windows sandbox level for this turn and subsequent turns.
pub windows_sandbox_level: Option<WindowsSandboxLevel>,
/// Override the model for this turn and subsequent turns.
pub model: Option<String>,
/// Override the reasoning effort for this turn and subsequent turns.

View File

@@ -64,6 +64,8 @@ use codex_app_server_protocol::LoginChatGptCompleteNotification;
use codex_app_server_protocol::LoginChatGptResponse;
use codex_app_server_protocol::LogoutAccountResponse;
use codex_app_server_protocol::LogoutChatGptResponse;
use codex_app_server_protocol::McpElicitationResolveParams;
use codex_app_server_protocol::McpElicitationResolveResponse;
use codex_app_server_protocol::McpServerOauthLoginCompletedNotification;
use codex_app_server_protocol::McpServerOauthLoginParams;
use codex_app_server_protocol::McpServerOauthLoginResponse;
@@ -98,6 +100,8 @@ use codex_app_server_protocol::SkillsListResponse;
use codex_app_server_protocol::Thread;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadArchiveResponse;
use codex_app_server_protocol::ThreadCompactParams;
use codex_app_server_protocol::ThreadCompactResponse;
use codex_app_server_protocol::ThreadForkParams;
use codex_app_server_protocol::ThreadForkResponse;
use codex_app_server_protocol::ThreadItem;
@@ -112,6 +116,8 @@ use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadRollbackParams;
use codex_app_server_protocol::ThreadSetNameParams;
use codex_app_server_protocol::ThreadSetNameResponse;
use codex_app_server_protocol::ThreadShutdownParams;
use codex_app_server_protocol::ThreadShutdownResponse;
use codex_app_server_protocol::ThreadSortKey;
use codex_app_server_protocol::ThreadSourceKind;
use codex_app_server_protocol::ThreadStartParams;
@@ -447,6 +453,12 @@ impl CodexMessageProcessor {
ClientRequest::ThreadRollback { request_id, params } => {
self.thread_rollback(request_id, params).await;
}
ClientRequest::ThreadCompact { request_id, params } => {
self.thread_compact(request_id, params).await;
}
ClientRequest::ThreadShutdown { request_id, params } => {
self.thread_shutdown(request_id, params).await;
}
ClientRequest::ThreadList { request_id, params } => {
self.thread_list(request_id, params).await;
}
@@ -513,6 +525,9 @@ impl CodexMessageProcessor {
ClientRequest::McpServerStatusList { request_id, params } => {
self.list_mcp_server_status(request_id, params).await;
}
ClientRequest::McpElicitationResolve { request_id, params } => {
self.mcp_elicitation_resolve(request_id, params).await;
}
ClientRequest::LoginAccount { request_id, params } => {
self.login_v2(request_id, params).await;
}
@@ -2051,6 +2066,63 @@ impl CodexMessageProcessor {
}
}
async fn thread_compact(&mut self, request_id: RequestId, params: ThreadCompactParams) {
let ThreadCompactParams { thread_id } = params;
let (thread_uuid, thread) = match self.load_thread(&thread_id).await {
Ok(v) => v,
Err(error) => {
self.outgoing.send_error(request_id, error).await;
return;
}
};
match thread.submit(Op::Compact).await {
Ok(turn_id) => {
let turn = Turn {
id: turn_id.clone(),
items: Vec::new(),
error: None,
status: TurnStatus::InProgress,
};
let response = ThreadCompactResponse { turn: turn.clone() };
self.outgoing.send_response(request_id, response).await;
let notif = TurnStartedNotification {
thread_id: thread_uuid.to_string(),
turn,
};
self.outgoing
.send_server_notification(ServerNotification::TurnStarted(notif))
.await;
}
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to start compact turn: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
}
}
}
async fn thread_shutdown(&mut self, request_id: RequestId, params: ThreadShutdownParams) {
let ThreadShutdownParams { thread_id } = params;
let (thread_uuid, thread) = match self.load_thread(&thread_id).await {
Ok(v) => v,
Err(error) => {
self.outgoing.send_error(request_id, error).await;
return;
}
};
let _ = thread.submit(Op::Shutdown).await;
self.thread_manager.remove_thread(&thread_uuid).await;
let response = ThreadShutdownResponse {};
self.outgoing.send_response(request_id, response).await;
}
async fn thread_list(&self, request_id: RequestId, params: ThreadListParams) {
let ThreadListParams {
cursor,
@@ -3143,6 +3215,34 @@ impl CodexMessageProcessor {
});
}
async fn mcp_elicitation_resolve(
&self,
request_id: RequestId,
params: McpElicitationResolveParams,
) {
let McpElicitationResolveParams {
thread_id,
server_name,
request_id: elicitation_id,
decision,
} = params;
let (_, thread) = match self.load_thread(&thread_id).await {
Ok(v) => v,
Err(error) => {
self.outgoing.send_error(request_id, error).await;
return;
}
};
let op = Op::ResolveElicitation {
server_name,
request_id: elicitation_id,
decision: decision.to_core(),
};
let _ = thread.submit(op).await;
let response = McpElicitationResolveResponse {};
self.outgoing.send_response(request_id, response).await;
}
async fn list_mcp_server_status_task(
outgoing: Arc<OutgoingMessageSender>,
request_id: RequestId,
@@ -4090,6 +4190,7 @@ impl CodexMessageProcessor {
let has_any_overrides = params.cwd.is_some()
|| params.approval_policy.is_some()
|| params.sandbox_policy.is_some()
|| params.windows_sandbox_level.is_some()
|| params.model.is_some()
|| params.effort.is_some()
|| params.summary.is_some()
@@ -4103,7 +4204,7 @@ impl CodexMessageProcessor {
cwd: params.cwd,
approval_policy: params.approval_policy.map(AskForApproval::to_core),
sandbox_policy: params.sandbox_policy.map(|p| p.to_core()),
windows_sandbox_level: None,
windows_sandbox_level: params.windows_sandbox_level,
model: params.model,
effort: params.effort.map(Some),
summary: params.summary,

View File

@@ -11,14 +11,22 @@ use codex_core::config_loader::LoaderOverrides;
use std::io::ErrorKind;
use std::io::Result as IoResult;
use std::path::PathBuf;
use std::sync::Arc;
use crate::message_processor::MessageProcessor;
use crate::message_processor::MessageProcessorArgs;
use crate::outgoing_message::OutgoingMessage;
use crate::outgoing_message::OutgoingMessageSender;
use codex_app_server_protocol::ClientNotification;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ConfigLayerSource;
use codex_app_server_protocol::ConfigWarningNotification;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::Result;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::TextPosition as AppTextPosition;
use codex_app_server_protocol::TextRange as AppTextRange;
use codex_core::ExecPolicyError;
@@ -324,6 +332,7 @@ pub async fn run_main(
cloud_requirements: cloud_requirements.clone(),
feedback: feedback.clone(),
config_warnings,
session_source: codex_protocol::protocol::SessionSource::VSCode,
});
let mut thread_created_rx = processor.thread_created_receiver();
async move {
@@ -395,3 +404,259 @@ pub async fn run_main(
Ok(())
}
#[derive(Debug, Clone)]
pub enum AppServerClientMessage {
Request(ClientRequest),
Response {
id: RequestId,
result: Result,
},
Error {
id: RequestId,
error: JSONRPCErrorError,
},
Notification(ClientNotification),
}
#[derive(Debug, Clone, PartialEq)]
pub struct AppServerEventNotification {
pub method: String,
pub params: Option<serde_json::Value>,
}
#[derive(Debug, Clone)]
pub enum AppServerMessage {
Request(ServerRequest),
Notification(ServerNotification),
EventNotification(AppServerEventNotification),
Response {
id: RequestId,
result: Result,
},
Error {
id: RequestId,
error: JSONRPCErrorError,
},
}
pub struct InMemoryAppServer {
pub incoming: mpsc::Sender<JSONRPCMessage>,
pub outgoing: mpsc::Receiver<JSONRPCMessage>,
}
pub struct InProcessAppServer {
pub incoming: mpsc::Sender<AppServerClientMessage>,
pub outgoing: mpsc::Receiver<AppServerMessage>,
}
pub fn spawn_in_memory(
codex_linux_sandbox_exe: Option<PathBuf>,
config: Arc<Config>,
cli_overrides: Vec<(String, TomlValue)>,
loader_overrides: LoaderOverrides,
feedback: CodexFeedback,
config_warnings: Vec<ConfigWarningNotification>,
session_source: codex_protocol::protocol::SessionSource,
) -> InMemoryAppServer {
let (incoming_tx, mut incoming_rx) = mpsc::channel::<JSONRPCMessage>(CHANNEL_CAPACITY);
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingMessage>(CHANNEL_CAPACITY);
let (client_outgoing_tx, client_outgoing_rx) =
mpsc::channel::<JSONRPCMessage>(CHANNEL_CAPACITY);
let auth_manager = AuthManager::shared(
config.codex_home.clone(),
false,
config.cli_auth_credentials_store_mode,
);
let cloud_requirements =
cloud_requirements_loader(auth_manager, config.chatgpt_base_url.clone());
tokio::spawn({
let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx);
let mut processor = MessageProcessor::new(MessageProcessorArgs {
outgoing: outgoing_message_sender,
codex_linux_sandbox_exe,
config: Arc::clone(&config),
cli_overrides,
loader_overrides,
cloud_requirements,
feedback,
config_warnings,
session_source,
});
let mut thread_created_rx = processor.thread_created_receiver();
async move {
let mut listen_for_threads = true;
loop {
tokio::select! {
msg = incoming_rx.recv() => {
let Some(msg) = msg else {
break;
};
match msg {
JSONRPCMessage::Request(r) => processor.process_request(r).await,
JSONRPCMessage::Response(r) => processor.process_response(r).await,
JSONRPCMessage::Notification(n) => processor.process_notification(n).await,
JSONRPCMessage::Error(e) => processor.process_error(e).await,
}
}
created = thread_created_rx.recv(), if listen_for_threads => {
match created {
Ok(thread_id) => {
processor.try_attach_thread_listener(thread_id).await;
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
warn!("thread_created receiver lagged; skipping resync");
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
listen_for_threads = false;
}
}
}
}
}
info!("in-memory processor task exited (channel closed)");
}
});
tokio::spawn(async move {
while let Some(outgoing_message) = outgoing_rx.recv().await {
let Ok(value) = serde_json::to_value(outgoing_message) else {
error!("Failed to convert OutgoingMessage to JSON value");
continue;
};
let jsonrpc = match serde_json::from_value::<JSONRPCMessage>(value) {
Ok(message) => message,
Err(err) => {
error!("Failed to deserialize OutgoingMessage as JSONRPCMessage: {err}");
continue;
}
};
if client_outgoing_tx.send(jsonrpc).await.is_err() {
break;
}
}
info!("in-memory outgoing task exited (channel closed)");
});
InMemoryAppServer {
incoming: incoming_tx,
outgoing: client_outgoing_rx,
}
}
pub fn spawn_in_memory_typed(
codex_linux_sandbox_exe: Option<PathBuf>,
config: Arc<Config>,
cli_overrides: Vec<(String, TomlValue)>,
loader_overrides: LoaderOverrides,
feedback: CodexFeedback,
config_warnings: Vec<ConfigWarningNotification>,
session_source: codex_protocol::protocol::SessionSource,
) -> InProcessAppServer {
let (incoming_tx, mut incoming_rx) = mpsc::channel::<AppServerClientMessage>(CHANNEL_CAPACITY);
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingMessage>(CHANNEL_CAPACITY);
let (client_outgoing_tx, client_outgoing_rx) =
mpsc::channel::<AppServerMessage>(CHANNEL_CAPACITY);
let auth_manager = AuthManager::shared(
config.codex_home.clone(),
false,
config.cli_auth_credentials_store_mode,
);
let cloud_requirements =
cloud_requirements_loader(auth_manager, config.chatgpt_base_url.clone());
tokio::spawn({
let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx);
let mut processor = MessageProcessor::new(MessageProcessorArgs {
outgoing: outgoing_message_sender,
codex_linux_sandbox_exe,
config: Arc::clone(&config),
cli_overrides,
loader_overrides,
cloud_requirements,
feedback,
config_warnings,
session_source,
});
let mut thread_created_rx = processor.thread_created_receiver();
async move {
let mut listen_for_threads = true;
loop {
tokio::select! {
msg = incoming_rx.recv() => {
let Some(msg) = msg else {
break;
};
match msg {
AppServerClientMessage::Request(request) => {
processor.process_client_request(request).await;
}
AppServerClientMessage::Response { id, result } => {
processor.process_client_response(id, result).await;
}
AppServerClientMessage::Error { id, error } => {
processor.process_client_error(id, error);
}
AppServerClientMessage::Notification(notification) => {
processor.process_client_notification(notification).await;
}
}
}
created = thread_created_rx.recv(), if listen_for_threads => {
match created {
Ok(thread_id) => {
processor.try_attach_thread_listener(thread_id).await;
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
warn!("thread_created receiver lagged; skipping resync");
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
listen_for_threads = false;
}
}
}
}
}
info!("in-process processor task exited (channel closed)");
}
});
tokio::spawn(async move {
while let Some(outgoing_message) = outgoing_rx.recv().await {
let mapped = match outgoing_message {
OutgoingMessage::Request(request) => AppServerMessage::Request(request),
OutgoingMessage::AppServerNotification(notification) => {
AppServerMessage::Notification(notification)
}
OutgoingMessage::Notification(notification) => {
AppServerMessage::EventNotification(AppServerEventNotification {
method: notification.method,
params: notification.params,
})
}
OutgoingMessage::Response(response) => AppServerMessage::Response {
id: response.id,
result: response.result,
},
OutgoingMessage::Error(error) => AppServerMessage::Error {
id: error.id,
error: error.error,
},
};
if client_outgoing_tx.send(mapped).await.is_err() {
break;
}
}
info!("in-process outgoing task exited (channel closed)");
});
InProcessAppServer {
incoming: incoming_tx,
outgoing: client_outgoing_rx,
}
}

View File

@@ -11,6 +11,7 @@ use codex_app_server_protocol::ChatgptAuthTokensRefreshParams;
use codex_app_server_protocol::ChatgptAuthTokensRefreshReason;
use codex_app_server_protocol::ChatgptAuthTokensRefreshResponse;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::ClientNotification;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ConfigBatchWriteParams;
use codex_app_server_protocol::ConfigReadParams;
@@ -23,6 +24,7 @@ use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::Result as JsonResult;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequestPayload;
use codex_core::AuthManager;
@@ -119,6 +121,7 @@ pub(crate) struct MessageProcessorArgs {
pub(crate) cloud_requirements: CloudRequirementsLoader,
pub(crate) feedback: CodexFeedback,
pub(crate) config_warnings: Vec<ConfigWarningNotification>,
pub(crate) session_source: SessionSource,
}
impl MessageProcessor {
@@ -134,7 +137,9 @@ impl MessageProcessor {
cloud_requirements,
feedback,
config_warnings,
session_source,
} = args;
let outgoing = Arc::new(outgoing);
let auth_manager = AuthManager::shared(
config.codex_home.clone(),
@@ -148,7 +153,7 @@ impl MessageProcessor {
let thread_manager = Arc::new(ThreadManager::new(
config.codex_home.clone(),
auth_manager.clone(),
SessionSource::VSCode,
session_source,
));
let codex_message_processor = CodexMessageProcessor::new(CodexMessageProcessorArgs {
auth_manager,
@@ -205,24 +210,33 @@ impl MessageProcessor {
}
};
match codex_request {
self.process_client_request(codex_request).await;
}
pub(crate) async fn process_client_request(&mut self, codex_request: ClientRequest) {
let request_id = codex_request.request_id().clone();
match &codex_request {
// Handle Initialize internally so CodexMessageProcessor does not have to concern
// itself with the `initialized` bool.
ClientRequest::Initialize { request_id, params } => {
ClientRequest::Initialize {
request_id: _,
params,
} => {
if self.initialized {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "Already initialized".to_string(),
data: None,
};
self.outgoing.send_error(request_id, error).await;
self.outgoing.send_error(request_id.clone(), error).await;
return;
} else {
let client_info = params.client_info.clone();
let ClientInfo {
name,
title: _title,
version,
} = params.client_info;
} = client_info;
if let Err(error) = set_default_originator(name.clone()) {
match error {
SetOriginatorError::InvalidHeaderValue => {
@@ -233,7 +247,7 @@ impl MessageProcessor {
),
data: None,
};
self.outgoing.send_error(request_id, error).await;
self.outgoing.send_error(request_id.clone(), error).await;
return;
}
SetOriginatorError::AlreadyInitialized => {
@@ -252,7 +266,9 @@ impl MessageProcessor {
let user_agent = get_codex_user_agent();
let response = InitializeResponse { user_agent };
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response(request_id.clone(), response)
.await;
self.initialized = true;
if !self.config_warnings.is_empty() {
@@ -275,7 +291,7 @@ impl MessageProcessor {
message: "Not initialized".to_string(),
data: None,
};
self.outgoing.send_error(request_id, error).await;
self.outgoing.send_error(request_id.clone(), error).await;
return;
}
}
@@ -309,6 +325,10 @@ impl MessageProcessor {
tracing::info!("<- notification: {:?}", notification);
}
pub(crate) async fn process_client_notification(&self, notification: ClientNotification) {
tracing::info!("<- notification: {:?}", notification);
}
pub(crate) fn thread_created_receiver(&self) -> broadcast::Receiver<ThreadId> {
self.codex_message_processor.thread_created_receiver()
}
@@ -326,7 +346,11 @@ impl MessageProcessor {
pub(crate) async fn process_response(&mut self, response: JSONRPCResponse) {
tracing::info!("<- response: {:?}", response);
let JSONRPCResponse { id, result, .. } = response;
self.outgoing.notify_client_response(id, result).await
self.process_client_response(id, result).await;
}
pub(crate) async fn process_client_response(&mut self, id: RequestId, result: JsonResult) {
self.outgoing.notify_client_response(id, result).await;
}
/// Handle an error object received from the peer.
@@ -335,6 +359,10 @@ impl MessageProcessor {
self.outgoing.notify_client_error(err.id, err.error).await;
}
pub(crate) fn process_client_error(&mut self, id: RequestId, error: JSONRPCErrorError) {
tracing::error!("<- error: {:?}", JSONRPCError { id, error });
}
async fn handle_config_read(&self, request_id: RequestId, params: ConfigReadParams) {
match self.config_api.read(params).await {
Ok(response) => self.outgoing.send_response(request_id, response).await,

View File

@@ -656,6 +656,7 @@ async fn turn_start_exec_approval_toggle_v2() -> Result<()> {
}],
approval_policy: Some(codex_app_server_protocol::AskForApproval::Never),
sandbox_policy: Some(codex_app_server_protocol::SandboxPolicy::DangerFullAccess),
windows_sandbox_level: None,
model: Some("mock-model".to_string()),
effort: Some(ReasoningEffort::Medium),
summary: Some(ReasoningSummary::Auto),
@@ -898,6 +899,7 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
exclude_tmpdir_env_var: false,
exclude_slash_tmp: false,
}),
windows_sandbox_level: None,
model: Some("mock-model".to_string()),
effort: Some(ReasoningEffort::Medium),
summary: Some(ReasoningSummary::Auto),
@@ -929,6 +931,7 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
cwd: Some(second_cwd.clone()),
approval_policy: Some(codex_app_server_protocol::AskForApproval::Never),
sandbox_policy: Some(codex_app_server_protocol::SandboxPolicy::DangerFullAccess),
windows_sandbox_level: None,
model: Some("mock-model".to_string()),
effort: Some(ReasoningEffort::Medium),
summary: Some(ReasoningSummary::Auto),