mirror of
https://github.com/openai/codex.git
synced 2026-02-26 10:43:45 +00:00
Compare commits
1 Commits
dev/cc/new
...
pr10298
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ccd0e29421 |
@@ -71,6 +71,14 @@ macro_rules! client_request_definitions {
|
||||
)*
|
||||
}
|
||||
|
||||
impl ClientRequest {
|
||||
pub fn request_id(&self) -> &RequestId {
|
||||
match self {
|
||||
$(Self::$variant { request_id, .. } => request_id,)*
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn export_client_responses(
|
||||
out_dir: &::std::path::Path,
|
||||
) -> ::std::result::Result<(), ::ts_rs::ExportError> {
|
||||
@@ -140,6 +148,14 @@ client_request_definitions! {
|
||||
params: v2::ThreadRollbackParams,
|
||||
response: v2::ThreadRollbackResponse,
|
||||
},
|
||||
ThreadCompact => "thread/compact" {
|
||||
params: v2::ThreadCompactParams,
|
||||
response: v2::ThreadCompactResponse,
|
||||
},
|
||||
ThreadShutdown => "thread/shutdown" {
|
||||
params: v2::ThreadShutdownParams,
|
||||
response: v2::ThreadShutdownResponse,
|
||||
},
|
||||
ThreadList => "thread/list" {
|
||||
params: v2::ThreadListParams,
|
||||
response: v2::ThreadListResponse,
|
||||
@@ -201,6 +217,10 @@ client_request_definitions! {
|
||||
params: v2::ListMcpServerStatusParams,
|
||||
response: v2::ListMcpServerStatusResponse,
|
||||
},
|
||||
McpElicitationResolve => "mcp/elicitation/resolve" {
|
||||
params: v2::McpElicitationResolveParams,
|
||||
response: v2::McpElicitationResolveResponse,
|
||||
},
|
||||
|
||||
LoginAccount => "account/login/start" {
|
||||
params: v2::LoginAccountParams,
|
||||
|
||||
@@ -3,6 +3,7 @@ use std::path::PathBuf;
|
||||
|
||||
use crate::protocol::common::AuthMode;
|
||||
use codex_protocol::account::PlanType;
|
||||
use codex_protocol::approvals::ElicitationAction as CoreElicitationAction;
|
||||
use codex_protocol::approvals::ExecPolicyAmendment as CoreExecPolicyAmendment;
|
||||
use codex_protocol::config_types::CollaborationMode;
|
||||
use codex_protocol::config_types::CollaborationModeMask;
|
||||
@@ -12,6 +13,7 @@ use codex_protocol::config_types::ReasoningSummary;
|
||||
use codex_protocol::config_types::SandboxMode as CoreSandboxMode;
|
||||
use codex_protocol::config_types::Verbosity;
|
||||
use codex_protocol::config_types::WebSearchMode;
|
||||
use codex_protocol::config_types::WindowsSandboxLevel;
|
||||
use codex_protocol::items::AgentMessageContent as CoreAgentMessageContent;
|
||||
use codex_protocol::items::TurnItem as CoreTurnItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
@@ -41,6 +43,7 @@ use codex_protocol::user_input::TextElement as CoreTextElement;
|
||||
use codex_protocol::user_input::UserInput as CoreUserInput;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use mcp_types::ContentBlock as McpContentBlock;
|
||||
use mcp_types::RequestId as McpRequestId;
|
||||
use mcp_types::Resource as McpResource;
|
||||
use mcp_types::ResourceTemplate as McpResourceTemplate;
|
||||
use mcp_types::Tool as McpTool;
|
||||
@@ -233,6 +236,14 @@ v2_enum_from_core!(
|
||||
}
|
||||
);
|
||||
|
||||
v2_enum_from_core!(
|
||||
pub enum ElicitationAction from CoreElicitationAction {
|
||||
Accept,
|
||||
Decline,
|
||||
Cancel
|
||||
}
|
||||
);
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)]
|
||||
#[serde(tag = "type", rename_all = "camelCase")]
|
||||
#[ts(tag = "type")]
|
||||
@@ -1061,6 +1072,21 @@ pub struct ListMcpServerStatusResponse {
|
||||
pub next_cursor: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct McpElicitationResolveParams {
|
||||
pub thread_id: String,
|
||||
pub server_name: String,
|
||||
pub request_id: McpRequestId,
|
||||
pub decision: ElicitationAction,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct McpElicitationResolveResponse {}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
@@ -1354,6 +1380,32 @@ pub struct ThreadRollbackResponse {
|
||||
pub thread: Thread,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct ThreadCompactParams {
|
||||
pub thread_id: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct ThreadCompactResponse {
|
||||
pub turn: Turn,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct ThreadShutdownParams {
|
||||
pub thread_id: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct ThreadShutdownResponse {}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
@@ -1810,6 +1862,8 @@ pub struct TurnStartParams {
|
||||
pub approval_policy: Option<AskForApproval>,
|
||||
/// Override the sandbox policy for this turn and subsequent turns.
|
||||
pub sandbox_policy: Option<SandboxPolicy>,
|
||||
/// Override the Windows sandbox level for this turn and subsequent turns.
|
||||
pub windows_sandbox_level: Option<WindowsSandboxLevel>,
|
||||
/// Override the model for this turn and subsequent turns.
|
||||
pub model: Option<String>,
|
||||
/// Override the reasoning effort for this turn and subsequent turns.
|
||||
|
||||
@@ -64,6 +64,8 @@ use codex_app_server_protocol::LoginChatGptCompleteNotification;
|
||||
use codex_app_server_protocol::LoginChatGptResponse;
|
||||
use codex_app_server_protocol::LogoutAccountResponse;
|
||||
use codex_app_server_protocol::LogoutChatGptResponse;
|
||||
use codex_app_server_protocol::McpElicitationResolveParams;
|
||||
use codex_app_server_protocol::McpElicitationResolveResponse;
|
||||
use codex_app_server_protocol::McpServerOauthLoginCompletedNotification;
|
||||
use codex_app_server_protocol::McpServerOauthLoginParams;
|
||||
use codex_app_server_protocol::McpServerOauthLoginResponse;
|
||||
@@ -98,6 +100,8 @@ use codex_app_server_protocol::SkillsListResponse;
|
||||
use codex_app_server_protocol::Thread;
|
||||
use codex_app_server_protocol::ThreadArchiveParams;
|
||||
use codex_app_server_protocol::ThreadArchiveResponse;
|
||||
use codex_app_server_protocol::ThreadCompactParams;
|
||||
use codex_app_server_protocol::ThreadCompactResponse;
|
||||
use codex_app_server_protocol::ThreadForkParams;
|
||||
use codex_app_server_protocol::ThreadForkResponse;
|
||||
use codex_app_server_protocol::ThreadItem;
|
||||
@@ -112,6 +116,8 @@ use codex_app_server_protocol::ThreadResumeResponse;
|
||||
use codex_app_server_protocol::ThreadRollbackParams;
|
||||
use codex_app_server_protocol::ThreadSetNameParams;
|
||||
use codex_app_server_protocol::ThreadSetNameResponse;
|
||||
use codex_app_server_protocol::ThreadShutdownParams;
|
||||
use codex_app_server_protocol::ThreadShutdownResponse;
|
||||
use codex_app_server_protocol::ThreadSortKey;
|
||||
use codex_app_server_protocol::ThreadSourceKind;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
@@ -447,6 +453,12 @@ impl CodexMessageProcessor {
|
||||
ClientRequest::ThreadRollback { request_id, params } => {
|
||||
self.thread_rollback(request_id, params).await;
|
||||
}
|
||||
ClientRequest::ThreadCompact { request_id, params } => {
|
||||
self.thread_compact(request_id, params).await;
|
||||
}
|
||||
ClientRequest::ThreadShutdown { request_id, params } => {
|
||||
self.thread_shutdown(request_id, params).await;
|
||||
}
|
||||
ClientRequest::ThreadList { request_id, params } => {
|
||||
self.thread_list(request_id, params).await;
|
||||
}
|
||||
@@ -513,6 +525,9 @@ impl CodexMessageProcessor {
|
||||
ClientRequest::McpServerStatusList { request_id, params } => {
|
||||
self.list_mcp_server_status(request_id, params).await;
|
||||
}
|
||||
ClientRequest::McpElicitationResolve { request_id, params } => {
|
||||
self.mcp_elicitation_resolve(request_id, params).await;
|
||||
}
|
||||
ClientRequest::LoginAccount { request_id, params } => {
|
||||
self.login_v2(request_id, params).await;
|
||||
}
|
||||
@@ -2051,6 +2066,63 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
async fn thread_compact(&mut self, request_id: RequestId, params: ThreadCompactParams) {
|
||||
let ThreadCompactParams { thread_id } = params;
|
||||
let (thread_uuid, thread) = match self.load_thread(&thread_id).await {
|
||||
Ok(v) => v,
|
||||
Err(error) => {
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
match thread.submit(Op::Compact).await {
|
||||
Ok(turn_id) => {
|
||||
let turn = Turn {
|
||||
id: turn_id.clone(),
|
||||
items: Vec::new(),
|
||||
error: None,
|
||||
status: TurnStatus::InProgress,
|
||||
};
|
||||
let response = ThreadCompactResponse { turn: turn.clone() };
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
|
||||
let notif = TurnStartedNotification {
|
||||
thread_id: thread_uuid.to_string(),
|
||||
turn,
|
||||
};
|
||||
self.outgoing
|
||||
.send_server_notification(ServerNotification::TurnStarted(notif))
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: format!("failed to start compact turn: {err}"),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn thread_shutdown(&mut self, request_id: RequestId, params: ThreadShutdownParams) {
|
||||
let ThreadShutdownParams { thread_id } = params;
|
||||
let (thread_uuid, thread) = match self.load_thread(&thread_id).await {
|
||||
Ok(v) => v,
|
||||
Err(error) => {
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let _ = thread.submit(Op::Shutdown).await;
|
||||
self.thread_manager.remove_thread(&thread_uuid).await;
|
||||
|
||||
let response = ThreadShutdownResponse {};
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
}
|
||||
|
||||
async fn thread_list(&self, request_id: RequestId, params: ThreadListParams) {
|
||||
let ThreadListParams {
|
||||
cursor,
|
||||
@@ -3143,6 +3215,34 @@ impl CodexMessageProcessor {
|
||||
});
|
||||
}
|
||||
|
||||
async fn mcp_elicitation_resolve(
|
||||
&self,
|
||||
request_id: RequestId,
|
||||
params: McpElicitationResolveParams,
|
||||
) {
|
||||
let McpElicitationResolveParams {
|
||||
thread_id,
|
||||
server_name,
|
||||
request_id: elicitation_id,
|
||||
decision,
|
||||
} = params;
|
||||
let (_, thread) = match self.load_thread(&thread_id).await {
|
||||
Ok(v) => v,
|
||||
Err(error) => {
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
let op = Op::ResolveElicitation {
|
||||
server_name,
|
||||
request_id: elicitation_id,
|
||||
decision: decision.to_core(),
|
||||
};
|
||||
let _ = thread.submit(op).await;
|
||||
let response = McpElicitationResolveResponse {};
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
}
|
||||
|
||||
async fn list_mcp_server_status_task(
|
||||
outgoing: Arc<OutgoingMessageSender>,
|
||||
request_id: RequestId,
|
||||
@@ -4090,6 +4190,7 @@ impl CodexMessageProcessor {
|
||||
let has_any_overrides = params.cwd.is_some()
|
||||
|| params.approval_policy.is_some()
|
||||
|| params.sandbox_policy.is_some()
|
||||
|| params.windows_sandbox_level.is_some()
|
||||
|| params.model.is_some()
|
||||
|| params.effort.is_some()
|
||||
|| params.summary.is_some()
|
||||
@@ -4103,7 +4204,7 @@ impl CodexMessageProcessor {
|
||||
cwd: params.cwd,
|
||||
approval_policy: params.approval_policy.map(AskForApproval::to_core),
|
||||
sandbox_policy: params.sandbox_policy.map(|p| p.to_core()),
|
||||
windows_sandbox_level: None,
|
||||
windows_sandbox_level: params.windows_sandbox_level,
|
||||
model: params.model,
|
||||
effort: params.effort.map(Some),
|
||||
summary: params.summary,
|
||||
|
||||
@@ -11,14 +11,22 @@ use codex_core::config_loader::LoaderOverrides;
|
||||
use std::io::ErrorKind;
|
||||
use std::io::Result as IoResult;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::message_processor::MessageProcessor;
|
||||
use crate::message_processor::MessageProcessorArgs;
|
||||
use crate::outgoing_message::OutgoingMessage;
|
||||
use crate::outgoing_message::OutgoingMessageSender;
|
||||
use codex_app_server_protocol::ClientNotification;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::ConfigLayerSource;
|
||||
use codex_app_server_protocol::ConfigWarningNotification;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
use codex_app_server_protocol::JSONRPCMessage;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::Result;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use codex_app_server_protocol::TextPosition as AppTextPosition;
|
||||
use codex_app_server_protocol::TextRange as AppTextRange;
|
||||
use codex_core::ExecPolicyError;
|
||||
@@ -324,6 +332,7 @@ pub async fn run_main(
|
||||
cloud_requirements: cloud_requirements.clone(),
|
||||
feedback: feedback.clone(),
|
||||
config_warnings,
|
||||
session_source: codex_protocol::protocol::SessionSource::VSCode,
|
||||
});
|
||||
let mut thread_created_rx = processor.thread_created_receiver();
|
||||
async move {
|
||||
@@ -395,3 +404,259 @@ pub async fn run_main(
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum AppServerClientMessage {
|
||||
Request(ClientRequest),
|
||||
Response {
|
||||
id: RequestId,
|
||||
result: Result,
|
||||
},
|
||||
Error {
|
||||
id: RequestId,
|
||||
error: JSONRPCErrorError,
|
||||
},
|
||||
Notification(ClientNotification),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct AppServerEventNotification {
|
||||
pub method: String,
|
||||
pub params: Option<serde_json::Value>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum AppServerMessage {
|
||||
Request(ServerRequest),
|
||||
Notification(ServerNotification),
|
||||
EventNotification(AppServerEventNotification),
|
||||
Response {
|
||||
id: RequestId,
|
||||
result: Result,
|
||||
},
|
||||
Error {
|
||||
id: RequestId,
|
||||
error: JSONRPCErrorError,
|
||||
},
|
||||
}
|
||||
|
||||
pub struct InMemoryAppServer {
|
||||
pub incoming: mpsc::Sender<JSONRPCMessage>,
|
||||
pub outgoing: mpsc::Receiver<JSONRPCMessage>,
|
||||
}
|
||||
|
||||
pub struct InProcessAppServer {
|
||||
pub incoming: mpsc::Sender<AppServerClientMessage>,
|
||||
pub outgoing: mpsc::Receiver<AppServerMessage>,
|
||||
}
|
||||
|
||||
pub fn spawn_in_memory(
|
||||
codex_linux_sandbox_exe: Option<PathBuf>,
|
||||
config: Arc<Config>,
|
||||
cli_overrides: Vec<(String, TomlValue)>,
|
||||
loader_overrides: LoaderOverrides,
|
||||
feedback: CodexFeedback,
|
||||
config_warnings: Vec<ConfigWarningNotification>,
|
||||
session_source: codex_protocol::protocol::SessionSource,
|
||||
) -> InMemoryAppServer {
|
||||
let (incoming_tx, mut incoming_rx) = mpsc::channel::<JSONRPCMessage>(CHANNEL_CAPACITY);
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingMessage>(CHANNEL_CAPACITY);
|
||||
let (client_outgoing_tx, client_outgoing_rx) =
|
||||
mpsc::channel::<JSONRPCMessage>(CHANNEL_CAPACITY);
|
||||
|
||||
let auth_manager = AuthManager::shared(
|
||||
config.codex_home.clone(),
|
||||
false,
|
||||
config.cli_auth_credentials_store_mode,
|
||||
);
|
||||
let cloud_requirements =
|
||||
cloud_requirements_loader(auth_manager, config.chatgpt_base_url.clone());
|
||||
|
||||
tokio::spawn({
|
||||
let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx);
|
||||
let mut processor = MessageProcessor::new(MessageProcessorArgs {
|
||||
outgoing: outgoing_message_sender,
|
||||
codex_linux_sandbox_exe,
|
||||
config: Arc::clone(&config),
|
||||
cli_overrides,
|
||||
loader_overrides,
|
||||
cloud_requirements,
|
||||
feedback,
|
||||
config_warnings,
|
||||
session_source,
|
||||
});
|
||||
let mut thread_created_rx = processor.thread_created_receiver();
|
||||
async move {
|
||||
let mut listen_for_threads = true;
|
||||
loop {
|
||||
tokio::select! {
|
||||
msg = incoming_rx.recv() => {
|
||||
let Some(msg) = msg else {
|
||||
break;
|
||||
};
|
||||
match msg {
|
||||
JSONRPCMessage::Request(r) => processor.process_request(r).await,
|
||||
JSONRPCMessage::Response(r) => processor.process_response(r).await,
|
||||
JSONRPCMessage::Notification(n) => processor.process_notification(n).await,
|
||||
JSONRPCMessage::Error(e) => processor.process_error(e).await,
|
||||
}
|
||||
}
|
||||
created = thread_created_rx.recv(), if listen_for_threads => {
|
||||
match created {
|
||||
Ok(thread_id) => {
|
||||
processor.try_attach_thread_listener(thread_id).await;
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
|
||||
warn!("thread_created receiver lagged; skipping resync");
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
|
||||
listen_for_threads = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("in-memory processor task exited (channel closed)");
|
||||
}
|
||||
});
|
||||
|
||||
tokio::spawn(async move {
|
||||
while let Some(outgoing_message) = outgoing_rx.recv().await {
|
||||
let Ok(value) = serde_json::to_value(outgoing_message) else {
|
||||
error!("Failed to convert OutgoingMessage to JSON value");
|
||||
continue;
|
||||
};
|
||||
let jsonrpc = match serde_json::from_value::<JSONRPCMessage>(value) {
|
||||
Ok(message) => message,
|
||||
Err(err) => {
|
||||
error!("Failed to deserialize OutgoingMessage as JSONRPCMessage: {err}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
if client_outgoing_tx.send(jsonrpc).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
info!("in-memory outgoing task exited (channel closed)");
|
||||
});
|
||||
|
||||
InMemoryAppServer {
|
||||
incoming: incoming_tx,
|
||||
outgoing: client_outgoing_rx,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn spawn_in_memory_typed(
|
||||
codex_linux_sandbox_exe: Option<PathBuf>,
|
||||
config: Arc<Config>,
|
||||
cli_overrides: Vec<(String, TomlValue)>,
|
||||
loader_overrides: LoaderOverrides,
|
||||
feedback: CodexFeedback,
|
||||
config_warnings: Vec<ConfigWarningNotification>,
|
||||
session_source: codex_protocol::protocol::SessionSource,
|
||||
) -> InProcessAppServer {
|
||||
let (incoming_tx, mut incoming_rx) = mpsc::channel::<AppServerClientMessage>(CHANNEL_CAPACITY);
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingMessage>(CHANNEL_CAPACITY);
|
||||
let (client_outgoing_tx, client_outgoing_rx) =
|
||||
mpsc::channel::<AppServerMessage>(CHANNEL_CAPACITY);
|
||||
|
||||
let auth_manager = AuthManager::shared(
|
||||
config.codex_home.clone(),
|
||||
false,
|
||||
config.cli_auth_credentials_store_mode,
|
||||
);
|
||||
let cloud_requirements =
|
||||
cloud_requirements_loader(auth_manager, config.chatgpt_base_url.clone());
|
||||
|
||||
tokio::spawn({
|
||||
let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx);
|
||||
let mut processor = MessageProcessor::new(MessageProcessorArgs {
|
||||
outgoing: outgoing_message_sender,
|
||||
codex_linux_sandbox_exe,
|
||||
config: Arc::clone(&config),
|
||||
cli_overrides,
|
||||
loader_overrides,
|
||||
cloud_requirements,
|
||||
feedback,
|
||||
config_warnings,
|
||||
session_source,
|
||||
});
|
||||
let mut thread_created_rx = processor.thread_created_receiver();
|
||||
async move {
|
||||
let mut listen_for_threads = true;
|
||||
loop {
|
||||
tokio::select! {
|
||||
msg = incoming_rx.recv() => {
|
||||
let Some(msg) = msg else {
|
||||
break;
|
||||
};
|
||||
match msg {
|
||||
AppServerClientMessage::Request(request) => {
|
||||
processor.process_client_request(request).await;
|
||||
}
|
||||
AppServerClientMessage::Response { id, result } => {
|
||||
processor.process_client_response(id, result).await;
|
||||
}
|
||||
AppServerClientMessage::Error { id, error } => {
|
||||
processor.process_client_error(id, error);
|
||||
}
|
||||
AppServerClientMessage::Notification(notification) => {
|
||||
processor.process_client_notification(notification).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
created = thread_created_rx.recv(), if listen_for_threads => {
|
||||
match created {
|
||||
Ok(thread_id) => {
|
||||
processor.try_attach_thread_listener(thread_id).await;
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
|
||||
warn!("thread_created receiver lagged; skipping resync");
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
|
||||
listen_for_threads = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("in-process processor task exited (channel closed)");
|
||||
}
|
||||
});
|
||||
|
||||
tokio::spawn(async move {
|
||||
while let Some(outgoing_message) = outgoing_rx.recv().await {
|
||||
let mapped = match outgoing_message {
|
||||
OutgoingMessage::Request(request) => AppServerMessage::Request(request),
|
||||
OutgoingMessage::AppServerNotification(notification) => {
|
||||
AppServerMessage::Notification(notification)
|
||||
}
|
||||
OutgoingMessage::Notification(notification) => {
|
||||
AppServerMessage::EventNotification(AppServerEventNotification {
|
||||
method: notification.method,
|
||||
params: notification.params,
|
||||
})
|
||||
}
|
||||
OutgoingMessage::Response(response) => AppServerMessage::Response {
|
||||
id: response.id,
|
||||
result: response.result,
|
||||
},
|
||||
OutgoingMessage::Error(error) => AppServerMessage::Error {
|
||||
id: error.id,
|
||||
error: error.error,
|
||||
},
|
||||
};
|
||||
if client_outgoing_tx.send(mapped).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
info!("in-process outgoing task exited (channel closed)");
|
||||
});
|
||||
|
||||
InProcessAppServer {
|
||||
incoming: incoming_tx,
|
||||
outgoing: client_outgoing_rx,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ use codex_app_server_protocol::ChatgptAuthTokensRefreshParams;
|
||||
use codex_app_server_protocol::ChatgptAuthTokensRefreshReason;
|
||||
use codex_app_server_protocol::ChatgptAuthTokensRefreshResponse;
|
||||
use codex_app_server_protocol::ClientInfo;
|
||||
use codex_app_server_protocol::ClientNotification;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::ConfigBatchWriteParams;
|
||||
use codex_app_server_protocol::ConfigReadParams;
|
||||
@@ -23,6 +24,7 @@ use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::JSONRPCRequest;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::Result as JsonResult;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ServerRequestPayload;
|
||||
use codex_core::AuthManager;
|
||||
@@ -119,6 +121,7 @@ pub(crate) struct MessageProcessorArgs {
|
||||
pub(crate) cloud_requirements: CloudRequirementsLoader,
|
||||
pub(crate) feedback: CodexFeedback,
|
||||
pub(crate) config_warnings: Vec<ConfigWarningNotification>,
|
||||
pub(crate) session_source: SessionSource,
|
||||
}
|
||||
|
||||
impl MessageProcessor {
|
||||
@@ -134,7 +137,9 @@ impl MessageProcessor {
|
||||
cloud_requirements,
|
||||
feedback,
|
||||
config_warnings,
|
||||
session_source,
|
||||
} = args;
|
||||
|
||||
let outgoing = Arc::new(outgoing);
|
||||
let auth_manager = AuthManager::shared(
|
||||
config.codex_home.clone(),
|
||||
@@ -148,7 +153,7 @@ impl MessageProcessor {
|
||||
let thread_manager = Arc::new(ThreadManager::new(
|
||||
config.codex_home.clone(),
|
||||
auth_manager.clone(),
|
||||
SessionSource::VSCode,
|
||||
session_source,
|
||||
));
|
||||
let codex_message_processor = CodexMessageProcessor::new(CodexMessageProcessorArgs {
|
||||
auth_manager,
|
||||
@@ -205,24 +210,33 @@ impl MessageProcessor {
|
||||
}
|
||||
};
|
||||
|
||||
match codex_request {
|
||||
self.process_client_request(codex_request).await;
|
||||
}
|
||||
|
||||
pub(crate) async fn process_client_request(&mut self, codex_request: ClientRequest) {
|
||||
let request_id = codex_request.request_id().clone();
|
||||
match &codex_request {
|
||||
// Handle Initialize internally so CodexMessageProcessor does not have to concern
|
||||
// itself with the `initialized` bool.
|
||||
ClientRequest::Initialize { request_id, params } => {
|
||||
ClientRequest::Initialize {
|
||||
request_id: _,
|
||||
params,
|
||||
} => {
|
||||
if self.initialized {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: "Already initialized".to_string(),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
self.outgoing.send_error(request_id.clone(), error).await;
|
||||
return;
|
||||
} else {
|
||||
let client_info = params.client_info.clone();
|
||||
let ClientInfo {
|
||||
name,
|
||||
title: _title,
|
||||
version,
|
||||
} = params.client_info;
|
||||
} = client_info;
|
||||
if let Err(error) = set_default_originator(name.clone()) {
|
||||
match error {
|
||||
SetOriginatorError::InvalidHeaderValue => {
|
||||
@@ -233,7 +247,7 @@ impl MessageProcessor {
|
||||
),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
self.outgoing.send_error(request_id.clone(), error).await;
|
||||
return;
|
||||
}
|
||||
SetOriginatorError::AlreadyInitialized => {
|
||||
@@ -252,7 +266,9 @@ impl MessageProcessor {
|
||||
|
||||
let user_agent = get_codex_user_agent();
|
||||
let response = InitializeResponse { user_agent };
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
self.outgoing
|
||||
.send_response(request_id.clone(), response)
|
||||
.await;
|
||||
|
||||
self.initialized = true;
|
||||
if !self.config_warnings.is_empty() {
|
||||
@@ -275,7 +291,7 @@ impl MessageProcessor {
|
||||
message: "Not initialized".to_string(),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
self.outgoing.send_error(request_id.clone(), error).await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -309,6 +325,10 @@ impl MessageProcessor {
|
||||
tracing::info!("<- notification: {:?}", notification);
|
||||
}
|
||||
|
||||
pub(crate) async fn process_client_notification(&self, notification: ClientNotification) {
|
||||
tracing::info!("<- notification: {:?}", notification);
|
||||
}
|
||||
|
||||
pub(crate) fn thread_created_receiver(&self) -> broadcast::Receiver<ThreadId> {
|
||||
self.codex_message_processor.thread_created_receiver()
|
||||
}
|
||||
@@ -326,7 +346,11 @@ impl MessageProcessor {
|
||||
pub(crate) async fn process_response(&mut self, response: JSONRPCResponse) {
|
||||
tracing::info!("<- response: {:?}", response);
|
||||
let JSONRPCResponse { id, result, .. } = response;
|
||||
self.outgoing.notify_client_response(id, result).await
|
||||
self.process_client_response(id, result).await;
|
||||
}
|
||||
|
||||
pub(crate) async fn process_client_response(&mut self, id: RequestId, result: JsonResult) {
|
||||
self.outgoing.notify_client_response(id, result).await;
|
||||
}
|
||||
|
||||
/// Handle an error object received from the peer.
|
||||
@@ -335,6 +359,10 @@ impl MessageProcessor {
|
||||
self.outgoing.notify_client_error(err.id, err.error).await;
|
||||
}
|
||||
|
||||
pub(crate) fn process_client_error(&mut self, id: RequestId, error: JSONRPCErrorError) {
|
||||
tracing::error!("<- error: {:?}", JSONRPCError { id, error });
|
||||
}
|
||||
|
||||
async fn handle_config_read(&self, request_id: RequestId, params: ConfigReadParams) {
|
||||
match self.config_api.read(params).await {
|
||||
Ok(response) => self.outgoing.send_response(request_id, response).await,
|
||||
|
||||
@@ -656,6 +656,7 @@ async fn turn_start_exec_approval_toggle_v2() -> Result<()> {
|
||||
}],
|
||||
approval_policy: Some(codex_app_server_protocol::AskForApproval::Never),
|
||||
sandbox_policy: Some(codex_app_server_protocol::SandboxPolicy::DangerFullAccess),
|
||||
windows_sandbox_level: None,
|
||||
model: Some("mock-model".to_string()),
|
||||
effort: Some(ReasoningEffort::Medium),
|
||||
summary: Some(ReasoningSummary::Auto),
|
||||
@@ -898,6 +899,7 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
|
||||
exclude_tmpdir_env_var: false,
|
||||
exclude_slash_tmp: false,
|
||||
}),
|
||||
windows_sandbox_level: None,
|
||||
model: Some("mock-model".to_string()),
|
||||
effort: Some(ReasoningEffort::Medium),
|
||||
summary: Some(ReasoningSummary::Auto),
|
||||
@@ -929,6 +931,7 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
|
||||
cwd: Some(second_cwd.clone()),
|
||||
approval_policy: Some(codex_app_server_protocol::AskForApproval::Never),
|
||||
sandbox_policy: Some(codex_app_server_protocol::SandboxPolicy::DangerFullAccess),
|
||||
windows_sandbox_level: None,
|
||||
model: Some("mock-model".to_string()),
|
||||
effort: Some(ReasoningEffort::Medium),
|
||||
summary: Some(ReasoningSummary::Auto),
|
||||
|
||||
Reference in New Issue
Block a user