mirror of
https://github.com/openai/codex.git
synced 2026-02-02 06:57:03 +00:00
Compare commits
1 Commits
dev/cc/new
...
pr10192
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
14e4341c1c |
1
codex-rs/Cargo.lock
generated
1
codex-rs/Cargo.lock
generated
@@ -1910,6 +1910,7 @@ dependencies = [
|
||||
"chrono",
|
||||
"clap",
|
||||
"codex-ansi-escape",
|
||||
"codex-app-server",
|
||||
"codex-app-server-protocol",
|
||||
"codex-arg0",
|
||||
"codex-backend-client",
|
||||
|
||||
@@ -71,6 +71,14 @@ macro_rules! client_request_definitions {
|
||||
)*
|
||||
}
|
||||
|
||||
impl ClientRequest {
|
||||
pub fn request_id(&self) -> &RequestId {
|
||||
match self {
|
||||
$(Self::$variant { request_id, .. } => request_id,)*
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn export_client_responses(
|
||||
out_dir: &::std::path::Path,
|
||||
) -> ::std::result::Result<(), ::ts_rs::ExportError> {
|
||||
@@ -140,6 +148,14 @@ client_request_definitions! {
|
||||
params: v2::ThreadRollbackParams,
|
||||
response: v2::ThreadRollbackResponse,
|
||||
},
|
||||
ThreadCompact => "thread/compact" {
|
||||
params: v2::ThreadCompactParams,
|
||||
response: v2::ThreadCompactResponse,
|
||||
},
|
||||
ThreadShutdown => "thread/shutdown" {
|
||||
params: v2::ThreadShutdownParams,
|
||||
response: v2::ThreadShutdownResponse,
|
||||
},
|
||||
ThreadList => "thread/list" {
|
||||
params: v2::ThreadListParams,
|
||||
response: v2::ThreadListResponse,
|
||||
@@ -201,6 +217,10 @@ client_request_definitions! {
|
||||
params: v2::ListMcpServerStatusParams,
|
||||
response: v2::ListMcpServerStatusResponse,
|
||||
},
|
||||
McpElicitationResolve => "mcp/elicitation/resolve" {
|
||||
params: v2::McpElicitationResolveParams,
|
||||
response: v2::McpElicitationResolveResponse,
|
||||
},
|
||||
|
||||
LoginAccount => "account/login/start" {
|
||||
params: v2::LoginAccountParams,
|
||||
|
||||
@@ -3,6 +3,7 @@ use std::path::PathBuf;
|
||||
|
||||
use crate::protocol::common::AuthMode;
|
||||
use codex_protocol::account::PlanType;
|
||||
use codex_protocol::approvals::ElicitationAction as CoreElicitationAction;
|
||||
use codex_protocol::approvals::ExecPolicyAmendment as CoreExecPolicyAmendment;
|
||||
use codex_protocol::config_types::CollaborationMode;
|
||||
use codex_protocol::config_types::CollaborationModeMask;
|
||||
@@ -12,6 +13,7 @@ use codex_protocol::config_types::ReasoningSummary;
|
||||
use codex_protocol::config_types::SandboxMode as CoreSandboxMode;
|
||||
use codex_protocol::config_types::Verbosity;
|
||||
use codex_protocol::config_types::WebSearchMode;
|
||||
use codex_protocol::config_types::WindowsSandboxLevel;
|
||||
use codex_protocol::items::AgentMessageContent as CoreAgentMessageContent;
|
||||
use codex_protocol::items::TurnItem as CoreTurnItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
@@ -41,6 +43,7 @@ use codex_protocol::user_input::TextElement as CoreTextElement;
|
||||
use codex_protocol::user_input::UserInput as CoreUserInput;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use mcp_types::ContentBlock as McpContentBlock;
|
||||
use mcp_types::RequestId as McpRequestId;
|
||||
use mcp_types::Resource as McpResource;
|
||||
use mcp_types::ResourceTemplate as McpResourceTemplate;
|
||||
use mcp_types::Tool as McpTool;
|
||||
@@ -233,6 +236,14 @@ v2_enum_from_core!(
|
||||
}
|
||||
);
|
||||
|
||||
v2_enum_from_core!(
|
||||
pub enum ElicitationAction from CoreElicitationAction {
|
||||
Accept,
|
||||
Decline,
|
||||
Cancel
|
||||
}
|
||||
);
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)]
|
||||
#[serde(tag = "type", rename_all = "camelCase")]
|
||||
#[ts(tag = "type")]
|
||||
@@ -1061,6 +1072,21 @@ pub struct ListMcpServerStatusResponse {
|
||||
pub next_cursor: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct McpElicitationResolveParams {
|
||||
pub thread_id: String,
|
||||
pub server_name: String,
|
||||
pub request_id: McpRequestId,
|
||||
pub decision: ElicitationAction,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct McpElicitationResolveResponse {}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
@@ -1354,6 +1380,32 @@ pub struct ThreadRollbackResponse {
|
||||
pub thread: Thread,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct ThreadCompactParams {
|
||||
pub thread_id: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct ThreadCompactResponse {
|
||||
pub turn: Turn,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct ThreadShutdownParams {
|
||||
pub thread_id: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct ThreadShutdownResponse {}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
@@ -1810,6 +1862,8 @@ pub struct TurnStartParams {
|
||||
pub approval_policy: Option<AskForApproval>,
|
||||
/// Override the sandbox policy for this turn and subsequent turns.
|
||||
pub sandbox_policy: Option<SandboxPolicy>,
|
||||
/// Override the Windows sandbox level for this turn and subsequent turns.
|
||||
pub windows_sandbox_level: Option<WindowsSandboxLevel>,
|
||||
/// Override the model for this turn and subsequent turns.
|
||||
pub model: Option<String>,
|
||||
/// Override the reasoning effort for this turn and subsequent turns.
|
||||
|
||||
@@ -64,6 +64,8 @@ use codex_app_server_protocol::LoginChatGptCompleteNotification;
|
||||
use codex_app_server_protocol::LoginChatGptResponse;
|
||||
use codex_app_server_protocol::LogoutAccountResponse;
|
||||
use codex_app_server_protocol::LogoutChatGptResponse;
|
||||
use codex_app_server_protocol::McpElicitationResolveParams;
|
||||
use codex_app_server_protocol::McpElicitationResolveResponse;
|
||||
use codex_app_server_protocol::McpServerOauthLoginCompletedNotification;
|
||||
use codex_app_server_protocol::McpServerOauthLoginParams;
|
||||
use codex_app_server_protocol::McpServerOauthLoginResponse;
|
||||
@@ -98,6 +100,8 @@ use codex_app_server_protocol::SkillsListResponse;
|
||||
use codex_app_server_protocol::Thread;
|
||||
use codex_app_server_protocol::ThreadArchiveParams;
|
||||
use codex_app_server_protocol::ThreadArchiveResponse;
|
||||
use codex_app_server_protocol::ThreadCompactParams;
|
||||
use codex_app_server_protocol::ThreadCompactResponse;
|
||||
use codex_app_server_protocol::ThreadForkParams;
|
||||
use codex_app_server_protocol::ThreadForkResponse;
|
||||
use codex_app_server_protocol::ThreadItem;
|
||||
@@ -112,6 +116,8 @@ use codex_app_server_protocol::ThreadResumeResponse;
|
||||
use codex_app_server_protocol::ThreadRollbackParams;
|
||||
use codex_app_server_protocol::ThreadSetNameParams;
|
||||
use codex_app_server_protocol::ThreadSetNameResponse;
|
||||
use codex_app_server_protocol::ThreadShutdownParams;
|
||||
use codex_app_server_protocol::ThreadShutdownResponse;
|
||||
use codex_app_server_protocol::ThreadSortKey;
|
||||
use codex_app_server_protocol::ThreadSourceKind;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
@@ -447,6 +453,12 @@ impl CodexMessageProcessor {
|
||||
ClientRequest::ThreadRollback { request_id, params } => {
|
||||
self.thread_rollback(request_id, params).await;
|
||||
}
|
||||
ClientRequest::ThreadCompact { request_id, params } => {
|
||||
self.thread_compact(request_id, params).await;
|
||||
}
|
||||
ClientRequest::ThreadShutdown { request_id, params } => {
|
||||
self.thread_shutdown(request_id, params).await;
|
||||
}
|
||||
ClientRequest::ThreadList { request_id, params } => {
|
||||
self.thread_list(request_id, params).await;
|
||||
}
|
||||
@@ -513,6 +525,9 @@ impl CodexMessageProcessor {
|
||||
ClientRequest::McpServerStatusList { request_id, params } => {
|
||||
self.list_mcp_server_status(request_id, params).await;
|
||||
}
|
||||
ClientRequest::McpElicitationResolve { request_id, params } => {
|
||||
self.mcp_elicitation_resolve(request_id, params).await;
|
||||
}
|
||||
ClientRequest::LoginAccount { request_id, params } => {
|
||||
self.login_v2(request_id, params).await;
|
||||
}
|
||||
@@ -2051,6 +2066,63 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
async fn thread_compact(&mut self, request_id: RequestId, params: ThreadCompactParams) {
|
||||
let ThreadCompactParams { thread_id } = params;
|
||||
let (thread_uuid, thread) = match self.load_thread(&thread_id).await {
|
||||
Ok(v) => v,
|
||||
Err(error) => {
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
match thread.submit(Op::Compact).await {
|
||||
Ok(turn_id) => {
|
||||
let turn = Turn {
|
||||
id: turn_id.clone(),
|
||||
items: Vec::new(),
|
||||
error: None,
|
||||
status: TurnStatus::InProgress,
|
||||
};
|
||||
let response = ThreadCompactResponse { turn: turn.clone() };
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
|
||||
let notif = TurnStartedNotification {
|
||||
thread_id: thread_uuid.to_string(),
|
||||
turn,
|
||||
};
|
||||
self.outgoing
|
||||
.send_server_notification(ServerNotification::TurnStarted(notif))
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: format!("failed to start compact turn: {err}"),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn thread_shutdown(&mut self, request_id: RequestId, params: ThreadShutdownParams) {
|
||||
let ThreadShutdownParams { thread_id } = params;
|
||||
let (thread_uuid, thread) = match self.load_thread(&thread_id).await {
|
||||
Ok(v) => v,
|
||||
Err(error) => {
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let _ = thread.submit(Op::Shutdown).await;
|
||||
self.thread_manager.remove_thread(&thread_uuid).await;
|
||||
|
||||
let response = ThreadShutdownResponse {};
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
}
|
||||
|
||||
async fn thread_list(&self, request_id: RequestId, params: ThreadListParams) {
|
||||
let ThreadListParams {
|
||||
cursor,
|
||||
@@ -3143,6 +3215,34 @@ impl CodexMessageProcessor {
|
||||
});
|
||||
}
|
||||
|
||||
async fn mcp_elicitation_resolve(
|
||||
&self,
|
||||
request_id: RequestId,
|
||||
params: McpElicitationResolveParams,
|
||||
) {
|
||||
let McpElicitationResolveParams {
|
||||
thread_id,
|
||||
server_name,
|
||||
request_id: elicitation_id,
|
||||
decision,
|
||||
} = params;
|
||||
let (_, thread) = match self.load_thread(&thread_id).await {
|
||||
Ok(v) => v,
|
||||
Err(error) => {
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
let op = Op::ResolveElicitation {
|
||||
server_name,
|
||||
request_id: elicitation_id,
|
||||
decision: decision.to_core(),
|
||||
};
|
||||
let _ = thread.submit(op).await;
|
||||
let response = McpElicitationResolveResponse {};
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
}
|
||||
|
||||
async fn list_mcp_server_status_task(
|
||||
outgoing: Arc<OutgoingMessageSender>,
|
||||
request_id: RequestId,
|
||||
@@ -4090,6 +4190,7 @@ impl CodexMessageProcessor {
|
||||
let has_any_overrides = params.cwd.is_some()
|
||||
|| params.approval_policy.is_some()
|
||||
|| params.sandbox_policy.is_some()
|
||||
|| params.windows_sandbox_level.is_some()
|
||||
|| params.model.is_some()
|
||||
|| params.effort.is_some()
|
||||
|| params.summary.is_some()
|
||||
@@ -4103,7 +4204,7 @@ impl CodexMessageProcessor {
|
||||
cwd: params.cwd,
|
||||
approval_policy: params.approval_policy.map(AskForApproval::to_core),
|
||||
sandbox_policy: params.sandbox_policy.map(|p| p.to_core()),
|
||||
windows_sandbox_level: None,
|
||||
windows_sandbox_level: params.windows_sandbox_level,
|
||||
model: params.model,
|
||||
effort: params.effort.map(Some),
|
||||
summary: params.summary,
|
||||
|
||||
@@ -11,14 +11,22 @@ use codex_core::config_loader::LoaderOverrides;
|
||||
use std::io::ErrorKind;
|
||||
use std::io::Result as IoResult;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::message_processor::MessageProcessor;
|
||||
use crate::message_processor::MessageProcessorArgs;
|
||||
use crate::outgoing_message::OutgoingMessage;
|
||||
use crate::outgoing_message::OutgoingMessageSender;
|
||||
use codex_app_server_protocol::ClientNotification;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::ConfigLayerSource;
|
||||
use codex_app_server_protocol::ConfigWarningNotification;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
use codex_app_server_protocol::JSONRPCMessage;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::Result;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use codex_app_server_protocol::TextPosition as AppTextPosition;
|
||||
use codex_app_server_protocol::TextRange as AppTextRange;
|
||||
use codex_core::ExecPolicyError;
|
||||
@@ -324,6 +332,7 @@ pub async fn run_main(
|
||||
cloud_requirements: cloud_requirements.clone(),
|
||||
feedback: feedback.clone(),
|
||||
config_warnings,
|
||||
session_source: codex_protocol::protocol::SessionSource::VSCode,
|
||||
});
|
||||
let mut thread_created_rx = processor.thread_created_receiver();
|
||||
async move {
|
||||
@@ -395,3 +404,259 @@ pub async fn run_main(
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum AppServerClientMessage {
|
||||
Request(ClientRequest),
|
||||
Response {
|
||||
id: RequestId,
|
||||
result: Result,
|
||||
},
|
||||
Error {
|
||||
id: RequestId,
|
||||
error: JSONRPCErrorError,
|
||||
},
|
||||
Notification(ClientNotification),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct AppServerEventNotification {
|
||||
pub method: String,
|
||||
pub params: Option<serde_json::Value>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum AppServerMessage {
|
||||
Request(ServerRequest),
|
||||
Notification(ServerNotification),
|
||||
EventNotification(AppServerEventNotification),
|
||||
Response {
|
||||
id: RequestId,
|
||||
result: Result,
|
||||
},
|
||||
Error {
|
||||
id: RequestId,
|
||||
error: JSONRPCErrorError,
|
||||
},
|
||||
}
|
||||
|
||||
pub struct InMemoryAppServer {
|
||||
pub incoming: mpsc::Sender<JSONRPCMessage>,
|
||||
pub outgoing: mpsc::Receiver<JSONRPCMessage>,
|
||||
}
|
||||
|
||||
pub struct InProcessAppServer {
|
||||
pub incoming: mpsc::Sender<AppServerClientMessage>,
|
||||
pub outgoing: mpsc::Receiver<AppServerMessage>,
|
||||
}
|
||||
|
||||
pub fn spawn_in_memory(
|
||||
codex_linux_sandbox_exe: Option<PathBuf>,
|
||||
config: Arc<Config>,
|
||||
cli_overrides: Vec<(String, TomlValue)>,
|
||||
loader_overrides: LoaderOverrides,
|
||||
feedback: CodexFeedback,
|
||||
config_warnings: Vec<ConfigWarningNotification>,
|
||||
session_source: codex_protocol::protocol::SessionSource,
|
||||
) -> InMemoryAppServer {
|
||||
let (incoming_tx, mut incoming_rx) = mpsc::channel::<JSONRPCMessage>(CHANNEL_CAPACITY);
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingMessage>(CHANNEL_CAPACITY);
|
||||
let (client_outgoing_tx, client_outgoing_rx) =
|
||||
mpsc::channel::<JSONRPCMessage>(CHANNEL_CAPACITY);
|
||||
|
||||
let auth_manager = AuthManager::shared(
|
||||
config.codex_home.clone(),
|
||||
false,
|
||||
config.cli_auth_credentials_store_mode,
|
||||
);
|
||||
let cloud_requirements =
|
||||
cloud_requirements_loader(auth_manager, config.chatgpt_base_url.clone());
|
||||
|
||||
tokio::spawn({
|
||||
let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx);
|
||||
let mut processor = MessageProcessor::new(MessageProcessorArgs {
|
||||
outgoing: outgoing_message_sender,
|
||||
codex_linux_sandbox_exe,
|
||||
config: Arc::clone(&config),
|
||||
cli_overrides,
|
||||
loader_overrides,
|
||||
cloud_requirements,
|
||||
feedback,
|
||||
config_warnings,
|
||||
session_source,
|
||||
});
|
||||
let mut thread_created_rx = processor.thread_created_receiver();
|
||||
async move {
|
||||
let mut listen_for_threads = true;
|
||||
loop {
|
||||
tokio::select! {
|
||||
msg = incoming_rx.recv() => {
|
||||
let Some(msg) = msg else {
|
||||
break;
|
||||
};
|
||||
match msg {
|
||||
JSONRPCMessage::Request(r) => processor.process_request(r).await,
|
||||
JSONRPCMessage::Response(r) => processor.process_response(r).await,
|
||||
JSONRPCMessage::Notification(n) => processor.process_notification(n).await,
|
||||
JSONRPCMessage::Error(e) => processor.process_error(e).await,
|
||||
}
|
||||
}
|
||||
created = thread_created_rx.recv(), if listen_for_threads => {
|
||||
match created {
|
||||
Ok(thread_id) => {
|
||||
processor.try_attach_thread_listener(thread_id).await;
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
|
||||
warn!("thread_created receiver lagged; skipping resync");
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
|
||||
listen_for_threads = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("in-memory processor task exited (channel closed)");
|
||||
}
|
||||
});
|
||||
|
||||
tokio::spawn(async move {
|
||||
while let Some(outgoing_message) = outgoing_rx.recv().await {
|
||||
let Ok(value) = serde_json::to_value(outgoing_message) else {
|
||||
error!("Failed to convert OutgoingMessage to JSON value");
|
||||
continue;
|
||||
};
|
||||
let jsonrpc = match serde_json::from_value::<JSONRPCMessage>(value) {
|
||||
Ok(message) => message,
|
||||
Err(err) => {
|
||||
error!("Failed to deserialize OutgoingMessage as JSONRPCMessage: {err}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
if client_outgoing_tx.send(jsonrpc).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
info!("in-memory outgoing task exited (channel closed)");
|
||||
});
|
||||
|
||||
InMemoryAppServer {
|
||||
incoming: incoming_tx,
|
||||
outgoing: client_outgoing_rx,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn spawn_in_memory_typed(
|
||||
codex_linux_sandbox_exe: Option<PathBuf>,
|
||||
config: Arc<Config>,
|
||||
cli_overrides: Vec<(String, TomlValue)>,
|
||||
loader_overrides: LoaderOverrides,
|
||||
feedback: CodexFeedback,
|
||||
config_warnings: Vec<ConfigWarningNotification>,
|
||||
session_source: codex_protocol::protocol::SessionSource,
|
||||
) -> InProcessAppServer {
|
||||
let (incoming_tx, mut incoming_rx) = mpsc::channel::<AppServerClientMessage>(CHANNEL_CAPACITY);
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingMessage>(CHANNEL_CAPACITY);
|
||||
let (client_outgoing_tx, client_outgoing_rx) =
|
||||
mpsc::channel::<AppServerMessage>(CHANNEL_CAPACITY);
|
||||
|
||||
let auth_manager = AuthManager::shared(
|
||||
config.codex_home.clone(),
|
||||
false,
|
||||
config.cli_auth_credentials_store_mode,
|
||||
);
|
||||
let cloud_requirements =
|
||||
cloud_requirements_loader(auth_manager, config.chatgpt_base_url.clone());
|
||||
|
||||
tokio::spawn({
|
||||
let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx);
|
||||
let mut processor = MessageProcessor::new(MessageProcessorArgs {
|
||||
outgoing: outgoing_message_sender,
|
||||
codex_linux_sandbox_exe,
|
||||
config: Arc::clone(&config),
|
||||
cli_overrides,
|
||||
loader_overrides,
|
||||
cloud_requirements,
|
||||
feedback,
|
||||
config_warnings,
|
||||
session_source,
|
||||
});
|
||||
let mut thread_created_rx = processor.thread_created_receiver();
|
||||
async move {
|
||||
let mut listen_for_threads = true;
|
||||
loop {
|
||||
tokio::select! {
|
||||
msg = incoming_rx.recv() => {
|
||||
let Some(msg) = msg else {
|
||||
break;
|
||||
};
|
||||
match msg {
|
||||
AppServerClientMessage::Request(request) => {
|
||||
processor.process_client_request(request).await;
|
||||
}
|
||||
AppServerClientMessage::Response { id, result } => {
|
||||
processor.process_client_response(id, result).await;
|
||||
}
|
||||
AppServerClientMessage::Error { id, error } => {
|
||||
processor.process_client_error(id, error);
|
||||
}
|
||||
AppServerClientMessage::Notification(notification) => {
|
||||
processor.process_client_notification(notification).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
created = thread_created_rx.recv(), if listen_for_threads => {
|
||||
match created {
|
||||
Ok(thread_id) => {
|
||||
processor.try_attach_thread_listener(thread_id).await;
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
|
||||
warn!("thread_created receiver lagged; skipping resync");
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
|
||||
listen_for_threads = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("in-process processor task exited (channel closed)");
|
||||
}
|
||||
});
|
||||
|
||||
tokio::spawn(async move {
|
||||
while let Some(outgoing_message) = outgoing_rx.recv().await {
|
||||
let mapped = match outgoing_message {
|
||||
OutgoingMessage::Request(request) => AppServerMessage::Request(request),
|
||||
OutgoingMessage::AppServerNotification(notification) => {
|
||||
AppServerMessage::Notification(notification)
|
||||
}
|
||||
OutgoingMessage::Notification(notification) => {
|
||||
AppServerMessage::EventNotification(AppServerEventNotification {
|
||||
method: notification.method,
|
||||
params: notification.params,
|
||||
})
|
||||
}
|
||||
OutgoingMessage::Response(response) => AppServerMessage::Response {
|
||||
id: response.id,
|
||||
result: response.result,
|
||||
},
|
||||
OutgoingMessage::Error(error) => AppServerMessage::Error {
|
||||
id: error.id,
|
||||
error: error.error,
|
||||
},
|
||||
};
|
||||
if client_outgoing_tx.send(mapped).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
info!("in-process outgoing task exited (channel closed)");
|
||||
});
|
||||
|
||||
InProcessAppServer {
|
||||
incoming: incoming_tx,
|
||||
outgoing: client_outgoing_rx,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ use codex_app_server_protocol::ChatgptAuthTokensRefreshParams;
|
||||
use codex_app_server_protocol::ChatgptAuthTokensRefreshReason;
|
||||
use codex_app_server_protocol::ChatgptAuthTokensRefreshResponse;
|
||||
use codex_app_server_protocol::ClientInfo;
|
||||
use codex_app_server_protocol::ClientNotification;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::ConfigBatchWriteParams;
|
||||
use codex_app_server_protocol::ConfigReadParams;
|
||||
@@ -23,6 +24,7 @@ use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::JSONRPCRequest;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::Result as JsonResult;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ServerRequestPayload;
|
||||
use codex_core::AuthManager;
|
||||
@@ -119,6 +121,7 @@ pub(crate) struct MessageProcessorArgs {
|
||||
pub(crate) cloud_requirements: CloudRequirementsLoader,
|
||||
pub(crate) feedback: CodexFeedback,
|
||||
pub(crate) config_warnings: Vec<ConfigWarningNotification>,
|
||||
pub(crate) session_source: SessionSource,
|
||||
}
|
||||
|
||||
impl MessageProcessor {
|
||||
@@ -134,7 +137,9 @@ impl MessageProcessor {
|
||||
cloud_requirements,
|
||||
feedback,
|
||||
config_warnings,
|
||||
session_source,
|
||||
} = args;
|
||||
|
||||
let outgoing = Arc::new(outgoing);
|
||||
let auth_manager = AuthManager::shared(
|
||||
config.codex_home.clone(),
|
||||
@@ -148,7 +153,7 @@ impl MessageProcessor {
|
||||
let thread_manager = Arc::new(ThreadManager::new(
|
||||
config.codex_home.clone(),
|
||||
auth_manager.clone(),
|
||||
SessionSource::VSCode,
|
||||
session_source,
|
||||
));
|
||||
let codex_message_processor = CodexMessageProcessor::new(CodexMessageProcessorArgs {
|
||||
auth_manager,
|
||||
@@ -205,24 +210,33 @@ impl MessageProcessor {
|
||||
}
|
||||
};
|
||||
|
||||
match codex_request {
|
||||
self.process_client_request(codex_request).await;
|
||||
}
|
||||
|
||||
pub(crate) async fn process_client_request(&mut self, codex_request: ClientRequest) {
|
||||
let request_id = codex_request.request_id().clone();
|
||||
match &codex_request {
|
||||
// Handle Initialize internally so CodexMessageProcessor does not have to concern
|
||||
// itself with the `initialized` bool.
|
||||
ClientRequest::Initialize { request_id, params } => {
|
||||
ClientRequest::Initialize {
|
||||
request_id: _,
|
||||
params,
|
||||
} => {
|
||||
if self.initialized {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: "Already initialized".to_string(),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
self.outgoing.send_error(request_id.clone(), error).await;
|
||||
return;
|
||||
} else {
|
||||
let client_info = params.client_info.clone();
|
||||
let ClientInfo {
|
||||
name,
|
||||
title: _title,
|
||||
version,
|
||||
} = params.client_info;
|
||||
} = client_info;
|
||||
if let Err(error) = set_default_originator(name.clone()) {
|
||||
match error {
|
||||
SetOriginatorError::InvalidHeaderValue => {
|
||||
@@ -233,7 +247,7 @@ impl MessageProcessor {
|
||||
),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
self.outgoing.send_error(request_id.clone(), error).await;
|
||||
return;
|
||||
}
|
||||
SetOriginatorError::AlreadyInitialized => {
|
||||
@@ -252,7 +266,9 @@ impl MessageProcessor {
|
||||
|
||||
let user_agent = get_codex_user_agent();
|
||||
let response = InitializeResponse { user_agent };
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
self.outgoing
|
||||
.send_response(request_id.clone(), response)
|
||||
.await;
|
||||
|
||||
self.initialized = true;
|
||||
if !self.config_warnings.is_empty() {
|
||||
@@ -275,7 +291,7 @@ impl MessageProcessor {
|
||||
message: "Not initialized".to_string(),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
self.outgoing.send_error(request_id.clone(), error).await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -309,6 +325,10 @@ impl MessageProcessor {
|
||||
tracing::info!("<- notification: {:?}", notification);
|
||||
}
|
||||
|
||||
pub(crate) async fn process_client_notification(&self, notification: ClientNotification) {
|
||||
tracing::info!("<- notification: {:?}", notification);
|
||||
}
|
||||
|
||||
pub(crate) fn thread_created_receiver(&self) -> broadcast::Receiver<ThreadId> {
|
||||
self.codex_message_processor.thread_created_receiver()
|
||||
}
|
||||
@@ -326,7 +346,11 @@ impl MessageProcessor {
|
||||
pub(crate) async fn process_response(&mut self, response: JSONRPCResponse) {
|
||||
tracing::info!("<- response: {:?}", response);
|
||||
let JSONRPCResponse { id, result, .. } = response;
|
||||
self.outgoing.notify_client_response(id, result).await
|
||||
self.process_client_response(id, result).await;
|
||||
}
|
||||
|
||||
pub(crate) async fn process_client_response(&mut self, id: RequestId, result: JsonResult) {
|
||||
self.outgoing.notify_client_response(id, result).await;
|
||||
}
|
||||
|
||||
/// Handle an error object received from the peer.
|
||||
@@ -335,6 +359,10 @@ impl MessageProcessor {
|
||||
self.outgoing.notify_client_error(err.id, err.error).await;
|
||||
}
|
||||
|
||||
pub(crate) fn process_client_error(&mut self, id: RequestId, error: JSONRPCErrorError) {
|
||||
tracing::error!("<- error: {:?}", JSONRPCError { id, error });
|
||||
}
|
||||
|
||||
async fn handle_config_read(&self, request_id: RequestId, params: ConfigReadParams) {
|
||||
match self.config_api.read(params).await {
|
||||
Ok(response) => self.outgoing.send_response(request_id, response).await,
|
||||
|
||||
@@ -656,6 +656,7 @@ async fn turn_start_exec_approval_toggle_v2() -> Result<()> {
|
||||
}],
|
||||
approval_policy: Some(codex_app_server_protocol::AskForApproval::Never),
|
||||
sandbox_policy: Some(codex_app_server_protocol::SandboxPolicy::DangerFullAccess),
|
||||
windows_sandbox_level: None,
|
||||
model: Some("mock-model".to_string()),
|
||||
effort: Some(ReasoningEffort::Medium),
|
||||
summary: Some(ReasoningSummary::Auto),
|
||||
@@ -898,6 +899,7 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
|
||||
exclude_tmpdir_env_var: false,
|
||||
exclude_slash_tmp: false,
|
||||
}),
|
||||
windows_sandbox_level: None,
|
||||
model: Some("mock-model".to_string()),
|
||||
effort: Some(ReasoningEffort::Medium),
|
||||
summary: Some(ReasoningSummary::Auto),
|
||||
@@ -929,6 +931,7 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
|
||||
cwd: Some(second_cwd.clone()),
|
||||
approval_policy: Some(codex_app_server_protocol::AskForApproval::Never),
|
||||
sandbox_policy: Some(codex_app_server_protocol::SandboxPolicy::DangerFullAccess),
|
||||
windows_sandbox_level: None,
|
||||
model: Some("mock-model".to_string()),
|
||||
effort: Some(ReasoningEffort::Medium),
|
||||
summary: Some(ReasoningSummary::Auto),
|
||||
|
||||
@@ -44,7 +44,7 @@ pub use mcp_connection_manager::MCP_SANDBOX_STATE_METHOD;
|
||||
pub use mcp_connection_manager::SandboxState;
|
||||
mod mcp_tool_call;
|
||||
mod mentions;
|
||||
mod message_history;
|
||||
pub mod message_history;
|
||||
mod model_provider_info;
|
||||
pub mod parse_command;
|
||||
pub mod path_utils;
|
||||
@@ -98,6 +98,7 @@ pub mod spawn;
|
||||
pub mod state_db;
|
||||
pub mod terminal;
|
||||
mod tools;
|
||||
pub use tools::format_exec_output_str;
|
||||
pub mod turn_diff_tracker;
|
||||
pub use rollout::ARCHIVED_SESSIONS_SUBDIR;
|
||||
pub use rollout::INTERACTIVE_SESSION_SOURCES;
|
||||
@@ -119,6 +120,7 @@ pub use rollout::list::read_head_for_summary;
|
||||
pub use rollout::list::read_session_meta_line;
|
||||
pub use rollout::rollout_date_parts;
|
||||
pub use transport_manager::TransportManager;
|
||||
pub use truncate::TruncationPolicy;
|
||||
mod function_tool;
|
||||
mod state;
|
||||
mod tasks;
|
||||
|
||||
@@ -69,11 +69,7 @@ fn history_filepath(config: &Config) -> PathBuf {
|
||||
/// Append a `text` entry associated with `conversation_id` to the history file. Uses
|
||||
/// advisory file locking to ensure that concurrent writes do not interleave,
|
||||
/// which entails a small amount of blocking I/O internally.
|
||||
pub(crate) async fn append_entry(
|
||||
text: &str,
|
||||
conversation_id: &ThreadId,
|
||||
config: &Config,
|
||||
) -> Result<()> {
|
||||
pub async fn append_entry(text: &str, conversation_id: &ThreadId, config: &Config) -> Result<()> {
|
||||
match config.history.persistence {
|
||||
HistoryPersistence::SaveAll => {
|
||||
// Save everything: proceed.
|
||||
@@ -245,7 +241,7 @@ fn trim_target_bytes(max_bytes: u64, newest_entry_len: u64) -> u64 {
|
||||
|
||||
/// Asynchronously fetch the history file's *identifier* (inode on Unix) and
|
||||
/// the current number of entries by counting newline characters.
|
||||
pub(crate) async fn history_metadata(config: &Config) -> (u64, usize) {
|
||||
pub async fn history_metadata(config: &Config) -> (u64, usize) {
|
||||
let path = history_filepath(config);
|
||||
history_metadata_for_file(&path).await
|
||||
}
|
||||
@@ -258,7 +254,7 @@ pub(crate) async fn history_metadata(config: &Config) -> (u64, usize) {
|
||||
///
|
||||
/// Note this function is not async because it uses a sync advisory file
|
||||
/// locking API.
|
||||
pub(crate) fn lookup(log_id: u64, offset: usize, config: &Config) -> Option<HistoryEntry> {
|
||||
pub fn lookup(log_id: u64, offset: usize, config: &Config) -> Option<HistoryEntry> {
|
||||
let path = history_filepath(config);
|
||||
lookup_history_entry(&path, log_id, offset)
|
||||
}
|
||||
|
||||
@@ -27,6 +27,7 @@ base64 = { workspace = true }
|
||||
chrono = { workspace = true, features = ["serde"] }
|
||||
clap = { workspace = true, features = ["derive"] }
|
||||
codex-ansi-escape = { workspace = true }
|
||||
codex-app-server = { workspace = true }
|
||||
codex-app-server-protocol = { workspace = true }
|
||||
codex-arg0 = { workspace = true }
|
||||
codex-backend-client = { workspace = true }
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -28,6 +28,7 @@ use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::app::App;
|
||||
use crate::app_event::AppServerAction;
|
||||
use crate::history_cell::SessionInfoCell;
|
||||
use crate::history_cell::UserHistoryCell;
|
||||
use crate::pager_overlay::Overlay;
|
||||
@@ -36,7 +37,6 @@ use crate::tui::TuiEvent;
|
||||
use codex_core::protocol::CodexErrorInfo;
|
||||
use codex_core::protocol::ErrorEvent;
|
||||
use codex_core::protocol::EventMsg;
|
||||
use codex_core::protocol::Op;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::user_input::TextElement;
|
||||
use color_eyre::eyre::Result;
|
||||
@@ -210,7 +210,8 @@ impl App {
|
||||
selection,
|
||||
thread_id: self.chat_widget.thread_id(),
|
||||
});
|
||||
self.chat_widget.submit_op(Op::ThreadRollback { num_turns });
|
||||
self.chat_widget
|
||||
.submit_action(AppServerAction::ThreadRollback { num_turns });
|
||||
if !prefill.is_empty() || !text_elements.is_empty() || !local_image_paths.is_empty() {
|
||||
self.chat_widget
|
||||
.set_composer_text(prefill, text_elements, local_image_paths);
|
||||
|
||||
@@ -14,19 +14,27 @@ use codex_chatgpt::connectors::AppInfo;
|
||||
use codex_common::approval_presets::ApprovalPreset;
|
||||
use codex_core::protocol::Event;
|
||||
use codex_core::protocol::RateLimitSnapshot;
|
||||
use codex_core::protocol::ReviewDecision;
|
||||
use codex_file_search::FileMatch;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::openai_models::ModelPreset;
|
||||
use codex_protocol::request_user_input::RequestUserInputResponse;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
|
||||
use crate::bottom_pane::ApprovalRequest;
|
||||
use crate::history_cell::HistoryCell;
|
||||
|
||||
use codex_core::features::Feature;
|
||||
use codex_core::protocol::AskForApproval;
|
||||
use codex_core::protocol::ElicitationAction;
|
||||
use codex_core::protocol::SandboxPolicy;
|
||||
use codex_protocol::config_types::CollaborationMode;
|
||||
use codex_protocol::config_types::CollaborationModeMask;
|
||||
use codex_protocol::config_types::Personality;
|
||||
use codex_protocol::config_types::ReasoningSummary;
|
||||
use codex_protocol::config_types::WindowsSandboxLevel;
|
||||
use codex_protocol::openai_models::ReasoningEffort;
|
||||
use serde::Serialize;
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
#[cfg_attr(not(target_os = "windows"), allow(dead_code))]
|
||||
@@ -46,10 +54,83 @@ pub(crate) struct ConnectorsSnapshot {
|
||||
pub(crate) connectors: Vec<AppInfo>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, PartialEq)]
|
||||
pub(crate) struct TurnStartRequest {
|
||||
pub(crate) items: Vec<UserInput>,
|
||||
pub(crate) cwd: PathBuf,
|
||||
pub(crate) approval_policy: AskForApproval,
|
||||
pub(crate) sandbox_policy: SandboxPolicy,
|
||||
pub(crate) windows_sandbox_level: WindowsSandboxLevel,
|
||||
pub(crate) model: String,
|
||||
pub(crate) effort: Option<ReasoningEffort>,
|
||||
pub(crate) summary: Option<ReasoningSummary>,
|
||||
pub(crate) collaboration_mode: Option<CollaborationMode>,
|
||||
pub(crate) personality: Option<Personality>,
|
||||
pub(crate) output_schema: Option<serde_json::Value>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, PartialEq)]
|
||||
pub(crate) enum AppServerAction {
|
||||
TurnStart(TurnStartRequest),
|
||||
ThreadSetName {
|
||||
name: String,
|
||||
},
|
||||
ReviewStart {
|
||||
review_request: codex_core::protocol::ReviewRequest,
|
||||
},
|
||||
Interrupt,
|
||||
Shutdown,
|
||||
Compact,
|
||||
ThreadRollback {
|
||||
num_turns: u32,
|
||||
},
|
||||
ListSkills {
|
||||
cwds: Vec<PathBuf>,
|
||||
force_reload: bool,
|
||||
},
|
||||
#[allow(dead_code)]
|
||||
RefreshMcpServers {
|
||||
config: codex_protocol::protocol::McpServerRefreshConfig,
|
||||
},
|
||||
ListMcpTools,
|
||||
ListCustomPrompts,
|
||||
RunUserShellCommand {
|
||||
command: String,
|
||||
},
|
||||
AddToHistory {
|
||||
text: String,
|
||||
},
|
||||
GetHistoryEntry {
|
||||
log_id: u64,
|
||||
offset: usize,
|
||||
},
|
||||
ExecApproval {
|
||||
call_id: String,
|
||||
decision: ReviewDecision,
|
||||
},
|
||||
PatchApproval {
|
||||
call_id: String,
|
||||
decision: ReviewDecision,
|
||||
},
|
||||
UserInputAnswer {
|
||||
call_id: String,
|
||||
response: RequestUserInputResponse,
|
||||
},
|
||||
ResolveElicitation {
|
||||
server_name: String,
|
||||
request_id: mcp_types::RequestId,
|
||||
decision: ElicitationAction,
|
||||
},
|
||||
}
|
||||
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum AppEvent {
|
||||
CodexEvent(Event),
|
||||
CodexThreadEvent {
|
||||
thread_id: ThreadId,
|
||||
event: Event,
|
||||
},
|
||||
/// Open the agent picker for switching active threads.
|
||||
OpenAgentPicker,
|
||||
/// Switch the active thread to the selected agent.
|
||||
@@ -73,11 +154,12 @@ pub(crate) enum AppEvent {
|
||||
Exit(ExitMode),
|
||||
|
||||
/// Request to exit the application due to a fatal error.
|
||||
#[allow(dead_code)]
|
||||
FatalExitRequest(String),
|
||||
|
||||
/// Forward an `Op` to the Agent. Using an `AppEvent` for this avoids
|
||||
/// Forward a command to the app server. Using an `AppEvent` for this avoids
|
||||
/// bubbling channels through layers of widgets.
|
||||
CodexOp(codex_core::protocol::Op),
|
||||
AppServerAction(AppServerAction),
|
||||
|
||||
/// Kick off an asynchronous file search for the given query (text after
|
||||
/// the `@`). Previous searches may be cancelled by the app layer so there
|
||||
@@ -305,8 +387,9 @@ pub(crate) enum ExitMode {
|
||||
ShutdownFirst,
|
||||
/// Exit the UI loop immediately without waiting for shutdown.
|
||||
///
|
||||
/// This skips `Op::Shutdown`, so any in-flight work may be dropped and
|
||||
/// cleanup that normally runs before `ShutdownComplete` can be missed.
|
||||
/// This skips the app-server shutdown request, so any in-flight work may be
|
||||
/// dropped and cleanup that normally runs before `ShutdownComplete` can be
|
||||
/// missed.
|
||||
Immediate,
|
||||
}
|
||||
|
||||
|
||||
@@ -17,8 +17,10 @@ impl AppEventSender {
|
||||
/// error and log it.
|
||||
pub(crate) fn send(&self, event: AppEvent) {
|
||||
// Record inbound events for high-fidelity session replay.
|
||||
// Avoid double-logging Ops; those are logged at the point of submission.
|
||||
if !matches!(event, AppEvent::CodexOp(_)) {
|
||||
// Avoid double-logging app-server actions; those are logged at the point of submission.
|
||||
if let AppEvent::AppServerAction(action) = &event {
|
||||
session_log::log_outbound_app_action(action);
|
||||
} else {
|
||||
session_log::log_inbound_app_event(&event);
|
||||
}
|
||||
if let Err(e) = self.app_event_tx.send(event) {
|
||||
|
||||
613
codex-rs/tui/src/app_server_client.rs
Normal file
613
codex-rs/tui/src/app_server_client.rs
Normal file
@@ -0,0 +1,613 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicI64;
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
use codex_app_server::AppServerClientMessage;
|
||||
use codex_app_server::AppServerEventNotification;
|
||||
use codex_app_server::AppServerMessage;
|
||||
use codex_app_server::spawn_in_memory_typed;
|
||||
use codex_app_server_protocol::ClientNotification;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::CommandExecutionApprovalDecision;
|
||||
use codex_app_server_protocol::CommandExecutionRequestApprovalResponse;
|
||||
use codex_app_server_protocol::DynamicToolCallResponse;
|
||||
use codex_app_server_protocol::ElicitationAction as V2ElicitationAction;
|
||||
use codex_app_server_protocol::FileChangeApprovalDecision;
|
||||
use codex_app_server_protocol::FileChangeRequestApprovalResponse;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::Result as JsonResult;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use codex_app_server_protocol::ToolRequestUserInputParams;
|
||||
use codex_app_server_protocol::ToolRequestUserInputResponse;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::config_loader::LoaderOverrides;
|
||||
use codex_core::protocol::Event;
|
||||
use codex_core::protocol::EventMsg;
|
||||
use codex_core::protocol::ReviewDecision;
|
||||
use codex_feedback::CodexFeedback;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::request_user_input::RequestUserInputResponse as CoreRequestUserInputResponse;
|
||||
use mcp_types::RequestId as McpRequestId;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
use crate::app_event::AppEvent;
|
||||
use crate::app_event_sender::AppEventSender;
|
||||
|
||||
type PendingResponseMap =
|
||||
HashMap<RequestId, oneshot::Sender<std::result::Result<JsonResult, JSONRPCErrorError>>>;
|
||||
|
||||
pub(crate) struct PendingResponse {
|
||||
receiver: oneshot::Receiver<std::result::Result<JsonResult, JSONRPCErrorError>>,
|
||||
}
|
||||
|
||||
impl PendingResponse {
|
||||
pub async fn into_typed<T: serde::de::DeserializeOwned>(
|
||||
self,
|
||||
) -> std::result::Result<T, JSONRPCErrorError> {
|
||||
let value = self
|
||||
.receiver
|
||||
.await
|
||||
.map_err(|_| internal_error("response channel closed"))??;
|
||||
serde_json::from_value(value).map_err(|err| internal_error(err.to_string()))
|
||||
}
|
||||
|
||||
pub async fn discard(self) -> std::result::Result<(), JSONRPCErrorError> {
|
||||
let _ = self
|
||||
.receiver
|
||||
.await
|
||||
.map_err(|_| internal_error("response channel closed"))??;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct PendingServerRequests {
|
||||
exec: HashMap<String, RequestId>,
|
||||
patch: HashMap<String, RequestId>,
|
||||
user_input: HashMap<String, RequestId>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct QueuedResponses {
|
||||
exec: HashMap<String, ReviewDecision>,
|
||||
patch: HashMap<String, ReviewDecision>,
|
||||
user_input: HashMap<String, CoreRequestUserInputResponse>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct ElicitationState {
|
||||
thread_by_request: HashMap<McpRequestId, ThreadId>,
|
||||
queued: HashMap<McpRequestId, (String, codex_core::protocol::ElicitationAction)>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct TurnState {
|
||||
current_turn_by_thread: HashMap<ThreadId, String>,
|
||||
}
|
||||
|
||||
pub(crate) struct AppServerClient {
|
||||
sender: mpsc::Sender<AppServerClientMessage>,
|
||||
pending: Arc<Mutex<PendingResponseMap>>,
|
||||
pending_server_requests: Arc<Mutex<PendingServerRequests>>,
|
||||
queued_responses: Arc<Mutex<QueuedResponses>>,
|
||||
elicitation_state: Arc<Mutex<ElicitationState>>,
|
||||
turn_state: Arc<Mutex<TurnState>>,
|
||||
next_request_id: Arc<AtomicI64>,
|
||||
}
|
||||
|
||||
impl AppServerClient {
|
||||
pub(crate) fn spawn(
|
||||
app_event_tx: AppEventSender,
|
||||
config: Arc<Config>,
|
||||
cli_overrides: Vec<(String, toml::Value)>,
|
||||
loader_overrides: LoaderOverrides,
|
||||
feedback: CodexFeedback,
|
||||
config_warnings: Vec<codex_app_server_protocol::ConfigWarningNotification>,
|
||||
session_source: codex_protocol::protocol::SessionSource,
|
||||
) -> Self {
|
||||
let in_process = spawn_in_memory_typed(
|
||||
config.codex_linux_sandbox_exe.clone(),
|
||||
config,
|
||||
cli_overrides,
|
||||
loader_overrides,
|
||||
feedback,
|
||||
config_warnings,
|
||||
session_source,
|
||||
);
|
||||
|
||||
let client = Self {
|
||||
sender: in_process.incoming,
|
||||
pending: Arc::new(Mutex::new(HashMap::new())),
|
||||
pending_server_requests: Arc::new(Mutex::new(PendingServerRequests::default())),
|
||||
queued_responses: Arc::new(Mutex::new(QueuedResponses::default())),
|
||||
elicitation_state: Arc::new(Mutex::new(ElicitationState::default())),
|
||||
turn_state: Arc::new(Mutex::new(TurnState::default())),
|
||||
next_request_id: Arc::new(AtomicI64::new(1)),
|
||||
};
|
||||
|
||||
client.spawn_outgoing_handler(app_event_tx, in_process.outgoing);
|
||||
client
|
||||
}
|
||||
|
||||
pub(crate) async fn request(
|
||||
&self,
|
||||
build: impl FnOnce(RequestId) -> ClientRequest,
|
||||
) -> std::result::Result<PendingResponse, JSONRPCErrorError> {
|
||||
let request_id = RequestId::Integer(self.next_request_id.fetch_add(1, Ordering::Relaxed));
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.pending.lock().await.insert(request_id.clone(), tx);
|
||||
if self
|
||||
.sender
|
||||
.send(AppServerClientMessage::Request(build(request_id.clone())))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
self.pending.lock().await.remove(&request_id);
|
||||
return Err(internal_error("app-server request channel closed"));
|
||||
}
|
||||
Ok(PendingResponse { receiver: rx })
|
||||
}
|
||||
|
||||
pub(crate) async fn send_notification(
|
||||
&self,
|
||||
notification: ClientNotification,
|
||||
) -> std::result::Result<(), JSONRPCErrorError> {
|
||||
self.sender
|
||||
.send(AppServerClientMessage::Notification(notification))
|
||||
.await
|
||||
.map_err(|_| internal_error("app-server notification channel closed"))
|
||||
}
|
||||
|
||||
pub(crate) async fn respond_exec_approval(
|
||||
&self,
|
||||
call_id: String,
|
||||
decision: ReviewDecision,
|
||||
) -> std::result::Result<(), JSONRPCErrorError> {
|
||||
let request_id = {
|
||||
let mut pending = self.pending_server_requests.lock().await;
|
||||
pending.exec.remove(&call_id)
|
||||
};
|
||||
|
||||
let Some(request_id) = request_id else {
|
||||
self.queued_responses
|
||||
.lock()
|
||||
.await
|
||||
.exec
|
||||
.insert(call_id, decision);
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let response = CommandExecutionRequestApprovalResponse {
|
||||
decision: map_exec_decision(decision),
|
||||
};
|
||||
self.send_response(request_id, response).await
|
||||
}
|
||||
|
||||
pub(crate) async fn respond_patch_approval(
|
||||
&self,
|
||||
call_id: String,
|
||||
decision: ReviewDecision,
|
||||
) -> std::result::Result<(), JSONRPCErrorError> {
|
||||
let request_id = {
|
||||
let mut pending = self.pending_server_requests.lock().await;
|
||||
pending.patch.remove(&call_id)
|
||||
};
|
||||
|
||||
let Some(request_id) = request_id else {
|
||||
self.queued_responses
|
||||
.lock()
|
||||
.await
|
||||
.patch
|
||||
.insert(call_id, decision);
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let response = FileChangeRequestApprovalResponse {
|
||||
decision: map_patch_decision(decision),
|
||||
};
|
||||
self.send_response(request_id, response).await
|
||||
}
|
||||
|
||||
pub(crate) async fn respond_user_input(
|
||||
&self,
|
||||
call_id: String,
|
||||
response: CoreRequestUserInputResponse,
|
||||
) -> std::result::Result<(), JSONRPCErrorError> {
|
||||
let request_id = {
|
||||
let mut pending = self.pending_server_requests.lock().await;
|
||||
pending.user_input.remove(&call_id)
|
||||
};
|
||||
|
||||
let Some(request_id) = request_id else {
|
||||
self.queued_responses
|
||||
.lock()
|
||||
.await
|
||||
.user_input
|
||||
.insert(call_id, response);
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let response = ToolRequestUserInputResponse {
|
||||
answers: response
|
||||
.answers
|
||||
.into_iter()
|
||||
.map(|(id, answer)| {
|
||||
(
|
||||
id,
|
||||
codex_app_server_protocol::ToolRequestUserInputAnswer {
|
||||
answers: answer.answers,
|
||||
},
|
||||
)
|
||||
})
|
||||
.collect(),
|
||||
};
|
||||
|
||||
self.send_response(request_id, response).await
|
||||
}
|
||||
|
||||
pub(crate) async fn respond_elicitation(
|
||||
&self,
|
||||
server_name: String,
|
||||
request_id: McpRequestId,
|
||||
decision: codex_core::protocol::ElicitationAction,
|
||||
) -> std::result::Result<(), JSONRPCErrorError> {
|
||||
let thread_id = {
|
||||
let mut state = self.elicitation_state.lock().await;
|
||||
state.thread_by_request.remove(&request_id)
|
||||
};
|
||||
|
||||
let Some(thread_id) = thread_id else {
|
||||
self.elicitation_state
|
||||
.lock()
|
||||
.await
|
||||
.queued
|
||||
.insert(request_id, (server_name, decision));
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let params = codex_app_server_protocol::McpElicitationResolveParams {
|
||||
thread_id: thread_id.to_string(),
|
||||
server_name,
|
||||
request_id: request_id.clone(),
|
||||
decision: V2ElicitationAction::from(decision),
|
||||
};
|
||||
|
||||
self.request(|id| ClientRequest::McpElicitationResolve {
|
||||
request_id: id,
|
||||
params,
|
||||
})
|
||||
.await?
|
||||
.discard()
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn interrupt_current_turn(
|
||||
&self,
|
||||
thread_id: ThreadId,
|
||||
) -> std::result::Result<(), JSONRPCErrorError> {
|
||||
let turn_id = {
|
||||
let state = self.turn_state.lock().await;
|
||||
state.current_turn_by_thread.get(&thread_id).cloned()
|
||||
};
|
||||
let Some(turn_id) = turn_id else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let params = codex_app_server_protocol::TurnInterruptParams {
|
||||
thread_id: thread_id.to_string(),
|
||||
turn_id,
|
||||
};
|
||||
self.request(|id| ClientRequest::TurnInterrupt {
|
||||
request_id: id,
|
||||
params,
|
||||
})
|
||||
.await?
|
||||
.discard()
|
||||
.await
|
||||
}
|
||||
|
||||
async fn send_response<T: serde::Serialize>(
|
||||
&self,
|
||||
request_id: RequestId,
|
||||
response: T,
|
||||
) -> std::result::Result<(), JSONRPCErrorError> {
|
||||
let result =
|
||||
serde_json::to_value(response).map_err(|err| internal_error(err.to_string()))?;
|
||||
self.sender
|
||||
.send(AppServerClientMessage::Response {
|
||||
id: request_id,
|
||||
result,
|
||||
})
|
||||
.await
|
||||
.map_err(|_| internal_error("app-server response channel closed"))
|
||||
}
|
||||
|
||||
fn spawn_outgoing_handler(
|
||||
&self,
|
||||
app_event_tx: AppEventSender,
|
||||
mut outgoing: mpsc::Receiver<AppServerMessage>,
|
||||
) {
|
||||
let pending = Arc::clone(&self.pending);
|
||||
let pending_server_requests = Arc::clone(&self.pending_server_requests);
|
||||
let queued_responses = Arc::clone(&self.queued_responses);
|
||||
let elicitation_state = Arc::clone(&self.elicitation_state);
|
||||
let turn_state = Arc::clone(&self.turn_state);
|
||||
let sender = self.sender.clone();
|
||||
let next_request_id = Arc::clone(&self.next_request_id);
|
||||
|
||||
tokio::spawn(async move {
|
||||
while let Some(message) = outgoing.recv().await {
|
||||
match message {
|
||||
AppServerMessage::EventNotification(notification) => {
|
||||
handle_event_notification(
|
||||
notification,
|
||||
&app_event_tx,
|
||||
&turn_state,
|
||||
&elicitation_state,
|
||||
&sender,
|
||||
next_request_id.as_ref(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
AppServerMessage::Request(request) => {
|
||||
handle_server_request(
|
||||
request,
|
||||
&pending_server_requests,
|
||||
&queued_responses,
|
||||
&sender,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
AppServerMessage::Response { id, result } => {
|
||||
if let Some(tx) = pending.lock().await.remove(&id) {
|
||||
let _ = tx.send(Ok(result));
|
||||
}
|
||||
}
|
||||
AppServerMessage::Error { id, error } => {
|
||||
if let Some(tx) = pending.lock().await.remove(&id) {
|
||||
let _ = tx.send(Err(error));
|
||||
}
|
||||
}
|
||||
AppServerMessage::Notification(_notification) => {
|
||||
// v2 notifications are currently surfaced through codex/event
|
||||
// for the TUI, so ignore explicit server notifications here.
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_event_notification(
|
||||
notification: AppServerEventNotification,
|
||||
app_event_tx: &AppEventSender,
|
||||
turn_state: &Mutex<TurnState>,
|
||||
elicitation_state: &Mutex<ElicitationState>,
|
||||
sender: &mpsc::Sender<AppServerClientMessage>,
|
||||
next_request_id: &AtomicI64,
|
||||
) {
|
||||
if !notification.method.starts_with("codex/event/") {
|
||||
return;
|
||||
}
|
||||
let Some(params) = notification.params else {
|
||||
return;
|
||||
};
|
||||
let serde_json::Value::Object(mut map) = params else {
|
||||
return;
|
||||
};
|
||||
let Some(conversation_id) = map.remove("conversationId") else {
|
||||
return;
|
||||
};
|
||||
let thread_id = match conversation_id.as_str() {
|
||||
Some(value) => match ThreadId::from_string(value) {
|
||||
Ok(thread_id) => thread_id,
|
||||
Err(err) => {
|
||||
tracing::warn!("invalid thread id in event: {err}");
|
||||
return;
|
||||
}
|
||||
},
|
||||
None => return,
|
||||
};
|
||||
|
||||
let event_value = serde_json::Value::Object(map);
|
||||
let event: Event = match serde_json::from_value(event_value) {
|
||||
Ok(event) => event,
|
||||
Err(err) => {
|
||||
tracing::warn!("failed to parse codex event: {err}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
match &event.msg {
|
||||
EventMsg::TurnStarted(_) => {
|
||||
if !event.id.is_empty() {
|
||||
turn_state
|
||||
.lock()
|
||||
.await
|
||||
.current_turn_by_thread
|
||||
.insert(thread_id, event.id.clone());
|
||||
}
|
||||
}
|
||||
EventMsg::TurnComplete(_) | EventMsg::TurnAborted(_) => {
|
||||
if !event.id.is_empty() {
|
||||
let mut state = turn_state.lock().await;
|
||||
if state
|
||||
.current_turn_by_thread
|
||||
.get(&thread_id)
|
||||
.is_some_and(|id| id == &event.id)
|
||||
{
|
||||
state.current_turn_by_thread.remove(&thread_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
EventMsg::ElicitationRequest(ev) => {
|
||||
let mut state = elicitation_state.lock().await;
|
||||
state.thread_by_request.insert(ev.id.clone(), thread_id);
|
||||
if let Some((server_name, decision)) = state.queued.remove(&ev.id) {
|
||||
let params = codex_app_server_protocol::McpElicitationResolveParams {
|
||||
thread_id: thread_id.to_string(),
|
||||
server_name,
|
||||
request_id: ev.id.clone(),
|
||||
decision: V2ElicitationAction::from(decision),
|
||||
};
|
||||
let request_id =
|
||||
RequestId::Integer(next_request_id.fetch_add(1, Ordering::Relaxed));
|
||||
let request = ClientRequest::McpElicitationResolve { request_id, params };
|
||||
let _ = sender.send(AppServerClientMessage::Request(request)).await;
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
app_event_tx.send(AppEvent::CodexThreadEvent { thread_id, event });
|
||||
}
|
||||
|
||||
async fn handle_server_request(
|
||||
request: ServerRequest,
|
||||
pending: &Mutex<PendingServerRequests>,
|
||||
queued: &Mutex<QueuedResponses>,
|
||||
sender: &mpsc::Sender<AppServerClientMessage>,
|
||||
) {
|
||||
match request {
|
||||
ServerRequest::CommandExecutionRequestApproval { request_id, params } => {
|
||||
record_server_request(
|
||||
request_id,
|
||||
params.item_id,
|
||||
&mut pending.lock().await.exec,
|
||||
&mut queued.lock().await.exec,
|
||||
|decision| CommandExecutionRequestApprovalResponse {
|
||||
decision: map_exec_decision(decision),
|
||||
},
|
||||
sender,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
ServerRequest::FileChangeRequestApproval { request_id, params } => {
|
||||
record_server_request(
|
||||
request_id,
|
||||
params.item_id,
|
||||
&mut pending.lock().await.patch,
|
||||
&mut queued.lock().await.patch,
|
||||
|decision| FileChangeRequestApprovalResponse {
|
||||
decision: map_patch_decision(decision),
|
||||
},
|
||||
sender,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
ServerRequest::ToolRequestUserInput { request_id, params } => {
|
||||
record_user_input_request(request_id, params, pending, queued, sender).await;
|
||||
}
|
||||
ServerRequest::DynamicToolCall { request_id, params } => {
|
||||
let response = DynamicToolCallResponse {
|
||||
output: "Dynamic tools are not supported in the TUI yet.".to_string(),
|
||||
success: false,
|
||||
};
|
||||
let _ = send_response(sender, request_id, response).await;
|
||||
tracing::warn!(
|
||||
"dynamic tool call {} for tool {} ignored",
|
||||
params.call_id,
|
||||
params.tool
|
||||
);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
async fn record_user_input_request(
|
||||
request_id: RequestId,
|
||||
params: ToolRequestUserInputParams,
|
||||
pending: &Mutex<PendingServerRequests>,
|
||||
queued: &Mutex<QueuedResponses>,
|
||||
sender: &mpsc::Sender<AppServerClientMessage>,
|
||||
) {
|
||||
let call_id = params.item_id;
|
||||
let mut pending_guard = pending.lock().await;
|
||||
if let Some(response) = queued.lock().await.user_input.remove(&call_id) {
|
||||
drop(pending_guard);
|
||||
let response = ToolRequestUserInputResponse {
|
||||
answers: response
|
||||
.answers
|
||||
.into_iter()
|
||||
.map(|(id, answer)| {
|
||||
(
|
||||
id,
|
||||
codex_app_server_protocol::ToolRequestUserInputAnswer {
|
||||
answers: answer.answers,
|
||||
},
|
||||
)
|
||||
})
|
||||
.collect(),
|
||||
};
|
||||
let _ = send_response(sender, request_id, response).await;
|
||||
} else {
|
||||
pending_guard.user_input.insert(call_id, request_id);
|
||||
}
|
||||
}
|
||||
|
||||
async fn record_server_request<T: serde::Serialize>(
|
||||
request_id: RequestId,
|
||||
call_id: String,
|
||||
pending: &mut HashMap<String, RequestId>,
|
||||
queued: &mut HashMap<String, ReviewDecision>,
|
||||
build_response: impl FnOnce(ReviewDecision) -> T,
|
||||
sender: &mpsc::Sender<AppServerClientMessage>,
|
||||
) {
|
||||
if let Some(decision) = queued.remove(&call_id) {
|
||||
let response = build_response(decision);
|
||||
let _ = send_response(sender, request_id, response).await;
|
||||
} else {
|
||||
pending.insert(call_id, request_id);
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_response<T: serde::Serialize>(
|
||||
sender: &mpsc::Sender<AppServerClientMessage>,
|
||||
request_id: RequestId,
|
||||
response: T,
|
||||
) -> std::result::Result<(), JSONRPCErrorError> {
|
||||
let result = serde_json::to_value(response).map_err(|err| internal_error(err.to_string()))?;
|
||||
sender
|
||||
.send(AppServerClientMessage::Response {
|
||||
id: request_id,
|
||||
result,
|
||||
})
|
||||
.await
|
||||
.map_err(|_| internal_error("app-server response channel closed"))
|
||||
}
|
||||
|
||||
fn internal_error(message: impl Into<String>) -> JSONRPCErrorError {
|
||||
JSONRPCErrorError {
|
||||
code: -32603,
|
||||
message: message.into(),
|
||||
data: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn map_exec_decision(decision: ReviewDecision) -> CommandExecutionApprovalDecision {
|
||||
match decision {
|
||||
ReviewDecision::Approved => CommandExecutionApprovalDecision::Accept,
|
||||
ReviewDecision::ApprovedForSession => CommandExecutionApprovalDecision::AcceptForSession,
|
||||
ReviewDecision::ApprovedExecpolicyAmendment {
|
||||
proposed_execpolicy_amendment,
|
||||
} => CommandExecutionApprovalDecision::AcceptWithExecpolicyAmendment {
|
||||
execpolicy_amendment: proposed_execpolicy_amendment.into(),
|
||||
},
|
||||
ReviewDecision::Denied => CommandExecutionApprovalDecision::Decline,
|
||||
ReviewDecision::Abort => CommandExecutionApprovalDecision::Cancel,
|
||||
}
|
||||
}
|
||||
|
||||
fn map_patch_decision(decision: ReviewDecision) -> FileChangeApprovalDecision {
|
||||
match decision {
|
||||
ReviewDecision::Approved => FileChangeApprovalDecision::Accept,
|
||||
ReviewDecision::ApprovedForSession => FileChangeApprovalDecision::AcceptForSession,
|
||||
ReviewDecision::ApprovedExecpolicyAmendment { .. } => FileChangeApprovalDecision::Accept,
|
||||
ReviewDecision::Denied => FileChangeApprovalDecision::Decline,
|
||||
ReviewDecision::Abort => FileChangeApprovalDecision::Cancel,
|
||||
}
|
||||
}
|
||||
@@ -2,6 +2,7 @@ use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use crate::app_event::AppEvent;
|
||||
use crate::app_event::AppServerAction;
|
||||
use crate::app_event_sender::AppEventSender;
|
||||
use crate::bottom_pane::BottomPaneView;
|
||||
use crate::bottom_pane::CancellationEvent;
|
||||
@@ -21,7 +22,6 @@ use codex_core::features::Features;
|
||||
use codex_core::protocol::ElicitationAction;
|
||||
use codex_core::protocol::ExecPolicyAmendment;
|
||||
use codex_core::protocol::FileChange;
|
||||
use codex_core::protocol::Op;
|
||||
use codex_core::protocol::ReviewDecision;
|
||||
use crossterm::event::KeyCode;
|
||||
use crossterm::event::KeyEvent;
|
||||
@@ -194,17 +194,19 @@ impl ApprovalOverlay {
|
||||
fn handle_exec_decision(&self, id: &str, command: &[String], decision: ReviewDecision) {
|
||||
let cell = history_cell::new_approval_decision_cell(command.to_vec(), decision.clone());
|
||||
self.app_event_tx.send(AppEvent::InsertHistoryCell(cell));
|
||||
self.app_event_tx.send(AppEvent::CodexOp(Op::ExecApproval {
|
||||
id: id.to_string(),
|
||||
decision,
|
||||
}));
|
||||
self.app_event_tx
|
||||
.send(AppEvent::AppServerAction(AppServerAction::ExecApproval {
|
||||
call_id: id.to_string(),
|
||||
decision,
|
||||
}));
|
||||
}
|
||||
|
||||
fn handle_patch_decision(&self, id: &str, decision: ReviewDecision) {
|
||||
self.app_event_tx.send(AppEvent::CodexOp(Op::PatchApproval {
|
||||
id: id.to_string(),
|
||||
decision,
|
||||
}));
|
||||
self.app_event_tx
|
||||
.send(AppEvent::AppServerAction(AppServerAction::PatchApproval {
|
||||
call_id: id.to_string(),
|
||||
decision,
|
||||
}));
|
||||
}
|
||||
|
||||
fn handle_elicitation_decision(
|
||||
@@ -213,12 +215,13 @@ impl ApprovalOverlay {
|
||||
request_id: &RequestId,
|
||||
decision: ElicitationAction,
|
||||
) {
|
||||
self.app_event_tx
|
||||
.send(AppEvent::CodexOp(Op::ResolveElicitation {
|
||||
self.app_event_tx.send(AppEvent::AppServerAction(
|
||||
AppServerAction::ResolveElicitation {
|
||||
server_name: server_name.to_string(),
|
||||
request_id: request_id.clone(),
|
||||
decision,
|
||||
}));
|
||||
},
|
||||
));
|
||||
}
|
||||
|
||||
fn advance_queue(&mut self) {
|
||||
@@ -540,6 +543,7 @@ fn elicitation_options() -> Vec<ApprovalOption> {
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::app_event::AppEvent;
|
||||
use crate::app_event::AppServerAction;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tokio::sync::mpsc::unbounded_channel;
|
||||
|
||||
@@ -570,15 +574,15 @@ mod tests {
|
||||
let mut view = ApprovalOverlay::new(make_exec_request(), tx, Features::with_defaults());
|
||||
assert!(!view.is_complete());
|
||||
view.handle_key_event(KeyEvent::new(KeyCode::Char('y'), KeyModifiers::NONE));
|
||||
// We expect at least one CodexOp message in the queue.
|
||||
let mut saw_op = false;
|
||||
// We expect at least one app-server action message in the queue.
|
||||
let mut saw_action = false;
|
||||
while let Ok(ev) = rx.try_recv() {
|
||||
if matches!(ev, AppEvent::CodexOp(_)) {
|
||||
saw_op = true;
|
||||
if matches!(ev, AppEvent::AppServerAction(_)) {
|
||||
saw_action = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
assert!(saw_op, "expected approval decision to emit an op");
|
||||
assert!(saw_action, "expected approval decision to emit an action");
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -598,9 +602,9 @@ mod tests {
|
||||
Features::with_defaults(),
|
||||
);
|
||||
view.handle_key_event(KeyEvent::new(KeyCode::Char('p'), KeyModifiers::NONE));
|
||||
let mut saw_op = false;
|
||||
let mut saw_action = false;
|
||||
while let Ok(ev) = rx.try_recv() {
|
||||
if let AppEvent::CodexOp(Op::ExecApproval { decision, .. }) = ev {
|
||||
if let AppEvent::AppServerAction(AppServerAction::ExecApproval { decision, .. }) = ev {
|
||||
assert_eq!(
|
||||
decision,
|
||||
ReviewDecision::ApprovedExecpolicyAmendment {
|
||||
@@ -609,13 +613,13 @@ mod tests {
|
||||
])
|
||||
}
|
||||
);
|
||||
saw_op = true;
|
||||
saw_action = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
assert!(
|
||||
saw_op,
|
||||
"expected approval decision to emit an op with command prefix"
|
||||
saw_action,
|
||||
"expected approval decision to emit an action with command prefix"
|
||||
);
|
||||
}
|
||||
|
||||
@@ -717,7 +721,10 @@ mod tests {
|
||||
|
||||
let mut decision = None;
|
||||
while let Ok(ev) = rx.try_recv() {
|
||||
if let AppEvent::CodexOp(Op::ExecApproval { decision: d, .. }) = ev {
|
||||
if let AppEvent::AppServerAction(AppServerAction::ExecApproval {
|
||||
decision: d, ..
|
||||
}) = ev
|
||||
{
|
||||
decision = Some(d);
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -2,8 +2,8 @@ use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use crate::app_event::AppEvent;
|
||||
use crate::app_event::AppServerAction;
|
||||
use crate::app_event_sender::AppEventSender;
|
||||
use codex_core::protocol::Op;
|
||||
use codex_protocol::user_input::TextElement;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
@@ -213,11 +213,12 @@ impl ChatComposerHistory {
|
||||
self.last_history_text = Some(entry.text.clone());
|
||||
return Some(entry);
|
||||
} else if let Some(log_id) = self.history_log_id {
|
||||
let op = Op::GetHistoryEntryRequest {
|
||||
offset: global_idx,
|
||||
log_id,
|
||||
};
|
||||
app_event_tx.send(AppEvent::CodexOp(op));
|
||||
app_event_tx.send(AppEvent::AppServerAction(
|
||||
AppServerAction::GetHistoryEntry {
|
||||
log_id,
|
||||
offset: global_idx,
|
||||
},
|
||||
));
|
||||
}
|
||||
None
|
||||
}
|
||||
@@ -227,7 +228,7 @@ impl ChatComposerHistory {
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::app_event::AppEvent;
|
||||
use codex_core::protocol::Op;
|
||||
use crate::app_event::AppServerAction;
|
||||
use tokio::sync::mpsc::unbounded_channel;
|
||||
|
||||
#[test]
|
||||
@@ -272,15 +273,15 @@ mod tests {
|
||||
assert!(history.should_handle_navigation("", 0));
|
||||
assert!(history.navigate_up(&tx).is_none()); // don't replace the text yet
|
||||
|
||||
// Verify that an AppEvent::CodexOp with the correct GetHistoryEntryRequest was sent.
|
||||
// Verify that an AppServerAction with the correct GetHistoryEntry request was sent.
|
||||
let event = rx.try_recv().expect("expected AppEvent to be sent");
|
||||
let AppEvent::CodexOp(history_request1) = event else {
|
||||
let AppEvent::AppServerAction(history_request1) = event else {
|
||||
panic!("unexpected event variant");
|
||||
};
|
||||
assert_eq!(
|
||||
Op::GetHistoryEntryRequest {
|
||||
AppServerAction::GetHistoryEntry {
|
||||
log_id: 1,
|
||||
offset: 2
|
||||
offset: 2,
|
||||
},
|
||||
history_request1
|
||||
);
|
||||
@@ -294,15 +295,15 @@ mod tests {
|
||||
// Next Up should move to offset 1.
|
||||
assert!(history.navigate_up(&tx).is_none()); // don't replace the text yet
|
||||
|
||||
// Verify second CodexOp event for offset 1.
|
||||
// Verify second AppServerAction event for offset 1.
|
||||
let event2 = rx.try_recv().expect("expected second event");
|
||||
let AppEvent::CodexOp(history_request_2) = event2 else {
|
||||
let AppEvent::AppServerAction(history_request_2) = event2 else {
|
||||
panic!("unexpected event variant");
|
||||
};
|
||||
assert_eq!(
|
||||
Op::GetHistoryEntryRequest {
|
||||
AppServerAction::GetHistoryEntry {
|
||||
log_id: 1,
|
||||
offset: 1
|
||||
offset: 1,
|
||||
},
|
||||
history_request_2
|
||||
);
|
||||
|
||||
@@ -326,7 +326,7 @@ impl BottomPane {
|
||||
&& !self.composer.popup_active()
|
||||
&& let Some(status) = &self.status
|
||||
{
|
||||
// Send Op::Interrupt
|
||||
// Send interrupt request
|
||||
status.interrupt();
|
||||
self.request_redraw();
|
||||
return InputResult::None;
|
||||
@@ -832,7 +832,7 @@ impl Renderable for BottomPane {
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::app_event::AppEvent;
|
||||
use codex_core::protocol::Op;
|
||||
use crate::app_event::AppServerAction;
|
||||
use codex_protocol::protocol::SkillScope;
|
||||
use crossterm::event::KeyModifiers;
|
||||
use insta::assert_snapshot;
|
||||
@@ -1198,8 +1198,8 @@ mod tests {
|
||||
|
||||
while let Ok(ev) = rx.try_recv() {
|
||||
assert!(
|
||||
!matches!(ev, AppEvent::CodexOp(Op::Interrupt)),
|
||||
"expected Esc to not send Op::Interrupt when dismissing skill popup"
|
||||
!matches!(ev, AppEvent::AppServerAction(AppServerAction::Interrupt)),
|
||||
"expected Esc to not send interrupt when dismissing skill popup"
|
||||
);
|
||||
}
|
||||
assert!(
|
||||
@@ -1236,8 +1236,8 @@ mod tests {
|
||||
|
||||
while let Ok(ev) = rx.try_recv() {
|
||||
assert!(
|
||||
!matches!(ev, AppEvent::CodexOp(Op::Interrupt)),
|
||||
"expected Esc to not send Op::Interrupt while command popup is active"
|
||||
!matches!(ev, AppEvent::AppServerAction(AppServerAction::Interrupt)),
|
||||
"expected Esc to not send interrupt while command popup is active"
|
||||
);
|
||||
}
|
||||
assert_eq!(pane.composer_text(), "/");
|
||||
@@ -1263,8 +1263,11 @@ mod tests {
|
||||
pane.handle_key_event(KeyEvent::new(KeyCode::Esc, KeyModifiers::NONE));
|
||||
|
||||
assert!(
|
||||
matches!(rx.try_recv(), Ok(AppEvent::CodexOp(Op::Interrupt))),
|
||||
"expected Esc to send Op::Interrupt while a task is running"
|
||||
matches!(
|
||||
rx.try_recv(),
|
||||
Ok(AppEvent::AppServerAction(AppServerAction::Interrupt))
|
||||
),
|
||||
"expected Esc to send interrupt while a task is running"
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ mod layout;
|
||||
mod render;
|
||||
|
||||
use crate::app_event::AppEvent;
|
||||
use crate::app_event::AppServerAction;
|
||||
use crate::app_event_sender::AppEventSender;
|
||||
use crate::bottom_pane::CancellationEvent;
|
||||
use crate::bottom_pane::ChatComposer;
|
||||
@@ -29,7 +30,6 @@ use crate::bottom_pane::selection_popup_common::GenericDisplayRow;
|
||||
use crate::bottom_pane::selection_popup_common::measure_rows_height;
|
||||
use crate::render::renderable::Renderable;
|
||||
|
||||
use codex_core::protocol::Op;
|
||||
use codex_protocol::request_user_input::RequestUserInputAnswer;
|
||||
use codex_protocol::request_user_input::RequestUserInputEvent;
|
||||
use codex_protocol::request_user_input::RequestUserInputResponse;
|
||||
@@ -719,11 +719,12 @@ impl RequestUserInputOverlay {
|
||||
},
|
||||
);
|
||||
}
|
||||
self.app_event_tx
|
||||
.send(AppEvent::CodexOp(Op::UserInputAnswer {
|
||||
id: self.request.turn_id.clone(),
|
||||
self.app_event_tx.send(AppEvent::AppServerAction(
|
||||
AppServerAction::UserInputAnswer {
|
||||
call_id: self.request.call_id.clone(),
|
||||
response: RequestUserInputResponse { answers },
|
||||
}));
|
||||
},
|
||||
));
|
||||
if let Some(next) = self.queue.pop_front() {
|
||||
self.request = next;
|
||||
self.reset_for_request();
|
||||
@@ -966,7 +967,8 @@ impl BottomPaneView for RequestUserInputOverlay {
|
||||
}
|
||||
|
||||
if matches!(key_event.code, KeyCode::Esc) {
|
||||
self.app_event_tx.send(AppEvent::CodexOp(Op::Interrupt));
|
||||
self.app_event_tx
|
||||
.send(AppEvent::AppServerAction(AppServerAction::Interrupt));
|
||||
self.done = true;
|
||||
return;
|
||||
}
|
||||
@@ -1173,7 +1175,8 @@ impl BottomPaneView for RequestUserInputOverlay {
|
||||
fn on_ctrl_c(&mut self) -> CancellationEvent {
|
||||
if self.confirm_unanswered_active() {
|
||||
self.close_unanswered_confirmation();
|
||||
self.app_event_tx.send(AppEvent::CodexOp(Op::Interrupt));
|
||||
self.app_event_tx
|
||||
.send(AppEvent::AppServerAction(AppServerAction::Interrupt));
|
||||
self.done = true;
|
||||
return CancellationEvent::Handled;
|
||||
}
|
||||
@@ -1182,7 +1185,8 @@ impl BottomPaneView for RequestUserInputOverlay {
|
||||
return CancellationEvent::Handled;
|
||||
}
|
||||
|
||||
self.app_event_tx.send(AppEvent::CodexOp(Op::Interrupt));
|
||||
self.app_event_tx
|
||||
.send(AppEvent::AppServerAction(AppServerAction::Interrupt));
|
||||
self.done = true;
|
||||
CancellationEvent::Handled
|
||||
}
|
||||
@@ -1403,10 +1407,12 @@ mod tests {
|
||||
overlay.submit_answers();
|
||||
|
||||
let event = rx.try_recv().expect("expected AppEvent");
|
||||
let AppEvent::CodexOp(Op::UserInputAnswer { id, response }) = event else {
|
||||
let AppEvent::AppServerAction(AppServerAction::UserInputAnswer { call_id, response }) =
|
||||
event
|
||||
else {
|
||||
panic!("expected UserInputAnswer");
|
||||
};
|
||||
assert_eq!(id, "turn-1");
|
||||
assert_eq!(call_id, "call-1");
|
||||
let answer = response.answers.get("q1").expect("answer missing");
|
||||
assert_eq!(answer.answers, Vec::<String>::new());
|
||||
}
|
||||
@@ -1425,7 +1431,8 @@ mod tests {
|
||||
overlay.handle_key_event(KeyEvent::from(KeyCode::Enter));
|
||||
|
||||
let event = rx.try_recv().expect("expected AppEvent");
|
||||
let AppEvent::CodexOp(Op::UserInputAnswer { response, .. }) = event else {
|
||||
let AppEvent::AppServerAction(AppServerAction::UserInputAnswer { response, .. }) = event
|
||||
else {
|
||||
panic!("expected UserInputAnswer");
|
||||
};
|
||||
let answer = response.answers.get("q1").expect("answer missing");
|
||||
@@ -1458,7 +1465,8 @@ mod tests {
|
||||
|
||||
overlay.handle_key_event(KeyEvent::from(KeyCode::Enter));
|
||||
let event = rx.try_recv().expect("expected AppEvent");
|
||||
let AppEvent::CodexOp(Op::UserInputAnswer { response, .. }) = event else {
|
||||
let AppEvent::AppServerAction(AppServerAction::UserInputAnswer { response, .. }) = event
|
||||
else {
|
||||
panic!("expected UserInputAnswer");
|
||||
};
|
||||
let answer = response.answers.get("q1").expect("answer missing");
|
||||
@@ -1479,7 +1487,8 @@ mod tests {
|
||||
overlay.handle_key_event(KeyEvent::from(KeyCode::Char('2')));
|
||||
|
||||
let event = rx.try_recv().expect("expected AppEvent");
|
||||
let AppEvent::CodexOp(Op::UserInputAnswer { response, .. }) = event else {
|
||||
let AppEvent::AppServerAction(AppServerAction::UserInputAnswer { response, .. }) = event
|
||||
else {
|
||||
panic!("expected UserInputAnswer");
|
||||
};
|
||||
let answer = response.answers.get("q1").expect("answer missing");
|
||||
@@ -1634,7 +1643,8 @@ mod tests {
|
||||
overlay.handle_key_event(KeyEvent::from(KeyCode::Enter));
|
||||
|
||||
let event = rx.try_recv().expect("expected AppEvent");
|
||||
let AppEvent::CodexOp(Op::UserInputAnswer { response, .. }) = event else {
|
||||
let AppEvent::AppServerAction(AppServerAction::UserInputAnswer { response, .. }) = event
|
||||
else {
|
||||
panic!("expected UserInputAnswer");
|
||||
};
|
||||
let answer = response.answers.get("q1").expect("answer missing");
|
||||
@@ -1658,10 +1668,10 @@ mod tests {
|
||||
|
||||
assert_eq!(overlay.done, true);
|
||||
let event = rx.try_recv().expect("expected AppEvent");
|
||||
let AppEvent::CodexOp(op) = event else {
|
||||
panic!("expected CodexOp");
|
||||
let AppEvent::AppServerAction(action) = event else {
|
||||
panic!("expected AppServerAction");
|
||||
};
|
||||
assert_eq!(op, Op::Interrupt);
|
||||
assert_eq!(action, AppServerAction::Interrupt);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -1679,10 +1689,10 @@ mod tests {
|
||||
|
||||
assert_eq!(overlay.done, true);
|
||||
let event = rx.try_recv().expect("expected AppEvent");
|
||||
let AppEvent::CodexOp(op) = event else {
|
||||
panic!("expected CodexOp");
|
||||
let AppEvent::AppServerAction(action) = event else {
|
||||
panic!("expected AppServerAction");
|
||||
};
|
||||
assert_eq!(op, Op::Interrupt);
|
||||
assert_eq!(action, AppServerAction::Interrupt);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -1703,10 +1713,10 @@ mod tests {
|
||||
|
||||
assert_eq!(overlay.done, true);
|
||||
let event = rx.try_recv().expect("expected AppEvent");
|
||||
let AppEvent::CodexOp(op) = event else {
|
||||
panic!("expected CodexOp");
|
||||
let AppEvent::AppServerAction(action) = event else {
|
||||
panic!("expected AppServerAction");
|
||||
};
|
||||
assert_eq!(op, Op::Interrupt);
|
||||
assert_eq!(action, AppServerAction::Interrupt);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -1728,10 +1738,10 @@ mod tests {
|
||||
|
||||
assert_eq!(overlay.done, true);
|
||||
let event = rx.try_recv().expect("expected AppEvent");
|
||||
let AppEvent::CodexOp(op) = event else {
|
||||
panic!("expected CodexOp");
|
||||
let AppEvent::AppServerAction(action) = event else {
|
||||
panic!("expected AppServerAction");
|
||||
};
|
||||
assert_eq!(op, Op::Interrupt);
|
||||
assert_eq!(action, AppServerAction::Interrupt);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -1906,7 +1916,8 @@ mod tests {
|
||||
overlay.submit_answers();
|
||||
|
||||
let event = rx.try_recv().expect("expected AppEvent");
|
||||
let AppEvent::CodexOp(Op::UserInputAnswer { response, .. }) = event else {
|
||||
let AppEvent::AppServerAction(AppServerAction::UserInputAnswer { response, .. }) = event
|
||||
else {
|
||||
panic!("expected UserInputAnswer");
|
||||
};
|
||||
let answer = response.answers.get("q1").expect("answer missing");
|
||||
@@ -1931,7 +1942,8 @@ mod tests {
|
||||
overlay.submit_answers();
|
||||
|
||||
let event = rx.try_recv().expect("expected AppEvent");
|
||||
let AppEvent::CodexOp(Op::UserInputAnswer { response, .. }) = event else {
|
||||
let AppEvent::AppServerAction(AppServerAction::UserInputAnswer { response, .. }) = event
|
||||
else {
|
||||
panic!("expected UserInputAnswer");
|
||||
};
|
||||
let answer = response.answers.get("q1").expect("answer missing");
|
||||
@@ -1973,7 +1985,8 @@ mod tests {
|
||||
overlay.submit_answers();
|
||||
|
||||
let event = rx.try_recv().expect("expected AppEvent");
|
||||
let AppEvent::CodexOp(Op::UserInputAnswer { response, .. }) = event else {
|
||||
let AppEvent::AppServerAction(AppServerAction::UserInputAnswer { response, .. }) = event
|
||||
else {
|
||||
panic!("expected UserInputAnswer");
|
||||
};
|
||||
let answer = response.answers.get("q1").expect("answer missing");
|
||||
@@ -2009,7 +2022,8 @@ mod tests {
|
||||
overlay.submit_answers();
|
||||
|
||||
let event = rx.try_recv().expect("expected AppEvent");
|
||||
let AppEvent::CodexOp(Op::UserInputAnswer { response, .. }) = event else {
|
||||
let AppEvent::AppServerAction(AppServerAction::UserInputAnswer { response, .. }) = event
|
||||
else {
|
||||
panic!("expected UserInputAnswer");
|
||||
};
|
||||
let answer = response.answers.get("q1").expect("answer missing");
|
||||
@@ -2094,7 +2108,8 @@ mod tests {
|
||||
overlay.submit_answers();
|
||||
|
||||
let event = rx.try_recv().expect("expected AppEvent");
|
||||
let AppEvent::CodexOp(Op::UserInputAnswer { response, .. }) = event else {
|
||||
let AppEvent::AppServerAction(AppServerAction::UserInputAnswer { response, .. }) = event
|
||||
else {
|
||||
panic!("expected UserInputAnswer");
|
||||
};
|
||||
let answer = response.answers.get("q1").expect("answer missing");
|
||||
|
||||
@@ -13,6 +13,7 @@ use ratatui::widgets::Block;
|
||||
use ratatui::widgets::Widget;
|
||||
|
||||
use crate::app_event::AppEvent;
|
||||
use crate::app_event::AppServerAction;
|
||||
use crate::app_event_sender::AppEventSender;
|
||||
use crate::key_hint;
|
||||
use crate::render::Insets;
|
||||
@@ -22,7 +23,6 @@ use crate::render::renderable::Renderable;
|
||||
use crate::skills_helpers::match_skill;
|
||||
use crate::skills_helpers::truncate_skill_name;
|
||||
use crate::style::user_message_style;
|
||||
use codex_core::protocol::Op;
|
||||
|
||||
use super::CancellationEvent;
|
||||
use super::bottom_pane_view::BottomPaneView;
|
||||
@@ -187,10 +187,11 @@ impl SkillsToggleView {
|
||||
}
|
||||
self.complete = true;
|
||||
self.app_event_tx.send(AppEvent::ManageSkillsClosed);
|
||||
self.app_event_tx.send(AppEvent::CodexOp(Op::ListSkills {
|
||||
cwds: Vec::new(),
|
||||
force_reload: true,
|
||||
}));
|
||||
self.app_event_tx
|
||||
.send(AppEvent::AppServerAction(AppServerAction::ListSkills {
|
||||
cwds: Vec::new(),
|
||||
force_reload: true,
|
||||
}));
|
||||
}
|
||||
|
||||
fn rows_width(total_width: u16) -> u16 {
|
||||
|
||||
@@ -69,7 +69,6 @@ use codex_core::protocol::McpStartupStatus;
|
||||
use codex_core::protocol::McpStartupUpdateEvent;
|
||||
use codex_core::protocol::McpToolCallBeginEvent;
|
||||
use codex_core::protocol::McpToolCallEndEvent;
|
||||
use codex_core::protocol::Op;
|
||||
use codex_core::protocol::PatchApplyBeginEvent;
|
||||
use codex_core::protocol::RateLimitSnapshot;
|
||||
use codex_core::protocol::ReviewRequest;
|
||||
@@ -90,7 +89,6 @@ use codex_core::protocol::WarningEvent;
|
||||
use codex_core::protocol::WebSearchBeginEvent;
|
||||
use codex_core::protocol::WebSearchEndEvent;
|
||||
use codex_core::skills::model::SkillMetadata;
|
||||
#[cfg(target_os = "windows")]
|
||||
use codex_core::windows_sandbox::WindowsSandboxLevelExt;
|
||||
use codex_otel::OtelManager;
|
||||
use codex_protocol::ThreadId;
|
||||
@@ -101,7 +99,6 @@ use codex_protocol::config_types::CollaborationModeMask;
|
||||
use codex_protocol::config_types::ModeKind;
|
||||
use codex_protocol::config_types::Personality;
|
||||
use codex_protocol::config_types::Settings;
|
||||
#[cfg(target_os = "windows")]
|
||||
use codex_protocol::config_types::WindowsSandboxLevel;
|
||||
use codex_protocol::models::local_image_label_text;
|
||||
use codex_protocol::parse_command::ParsedCommand;
|
||||
@@ -122,7 +119,6 @@ use ratatui::style::Stylize;
|
||||
use ratatui::text::Line;
|
||||
use ratatui::widgets::Paragraph;
|
||||
use ratatui::widgets::Wrap;
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
use tokio::task::JoinHandle;
|
||||
use tracing::debug;
|
||||
|
||||
@@ -133,8 +129,10 @@ const PLAN_IMPLEMENTATION_NO: &str = "No, stay in Plan mode";
|
||||
const PLAN_IMPLEMENTATION_CODING_MESSAGE: &str = "Implement the plan.";
|
||||
|
||||
use crate::app_event::AppEvent;
|
||||
use crate::app_event::AppServerAction;
|
||||
use crate::app_event::ConnectorsSnapshot;
|
||||
use crate::app_event::ExitMode;
|
||||
use crate::app_event::TurnStartRequest;
|
||||
#[cfg(target_os = "windows")]
|
||||
use crate::app_event::WindowsSandboxEnableMode;
|
||||
use crate::app_event::WindowsSandboxFallbackReason;
|
||||
@@ -186,10 +184,7 @@ use crate::text_formatting::truncate_text;
|
||||
use crate::tui::FrameRequester;
|
||||
mod interrupts;
|
||||
use self::interrupts::InterruptManager;
|
||||
mod agent;
|
||||
use self::agent::spawn_agent;
|
||||
use self::agent::spawn_agent_from_existing;
|
||||
pub(crate) use self::agent::spawn_op_forwarder;
|
||||
// Agent execution now lives behind the app-server client.
|
||||
mod session_header;
|
||||
use self::session_header::SessionHeader;
|
||||
mod skills;
|
||||
@@ -204,7 +199,6 @@ use codex_common::approval_presets::ApprovalPreset;
|
||||
use codex_common::approval_presets::builtin_approval_presets;
|
||||
use codex_core::AuthManager;
|
||||
use codex_core::CodexAuth;
|
||||
use codex_core::ThreadManager;
|
||||
use codex_core::protocol::AskForApproval;
|
||||
use codex_core::protocol::SandboxPolicy;
|
||||
use codex_file_search::FileMatch;
|
||||
@@ -443,7 +437,7 @@ pub(crate) enum ExternalEditorState {
|
||||
///
|
||||
/// `ChatWidget` owns the state derived from the protocol event stream (history cells, streaming
|
||||
/// buffers, bottom-pane overlays, and transient status text) and turns key presses into user
|
||||
/// intent (`Op` submissions and `AppEvent` requests).
|
||||
/// intent (app-server requests and `AppEvent` requests).
|
||||
///
|
||||
/// It is not responsible for running the agent itself; it reflects progress by updating UI state
|
||||
/// and by sending requests back to codex-core.
|
||||
@@ -453,7 +447,6 @@ pub(crate) enum ExternalEditorState {
|
||||
/// active work, arming the double-press quit shortcut, and requesting shutdown-first exit.
|
||||
pub(crate) struct ChatWidget {
|
||||
app_event_tx: AppEventSender,
|
||||
codex_op_tx: UnboundedSender<Op>,
|
||||
bottom_pane: BottomPane,
|
||||
active_cell: Option<Box<dyn HistoryCell>>,
|
||||
/// Monotonic-ish counter used to invalidate transcript overlay caching.
|
||||
@@ -821,9 +814,9 @@ impl ChatWidget {
|
||||
if let Some(messages) = initial_messages {
|
||||
self.replay_initial_messages(messages);
|
||||
}
|
||||
// Ask codex-core to enumerate custom prompts for this session.
|
||||
self.submit_op(Op::ListCustomPrompts);
|
||||
self.submit_op(Op::ListSkills {
|
||||
// Ask the app server to enumerate custom prompts and skills for this session.
|
||||
self.submit_action(AppServerAction::ListCustomPrompts);
|
||||
self.submit_action(AppServerAction::ListSkills {
|
||||
cwds: Vec::new(),
|
||||
force_reload: true,
|
||||
});
|
||||
@@ -2162,7 +2155,7 @@ impl ChatWidget {
|
||||
self.had_work_activity = true;
|
||||
}
|
||||
|
||||
pub(crate) fn new(common: ChatWidgetInit, thread_manager: Arc<ThreadManager>) -> Self {
|
||||
pub(crate) fn new(common: ChatWidgetInit) -> Self {
|
||||
let ChatWidgetInit {
|
||||
config,
|
||||
frame_requester,
|
||||
@@ -2182,8 +2175,6 @@ impl ChatWidget {
|
||||
config.model = model.clone();
|
||||
let mut rng = rand::rng();
|
||||
let placeholder = PLACEHOLDERS[rng.random_range(0..PLACEHOLDERS.len())].to_string();
|
||||
let codex_op_tx = spawn_agent(config.clone(), app_event_tx.clone(), thread_manager);
|
||||
|
||||
let model_override = model.as_deref();
|
||||
let model_for_header = model
|
||||
.clone()
|
||||
@@ -2210,7 +2201,6 @@ impl ChatWidget {
|
||||
let mut widget = Self {
|
||||
app_event_tx: app_event_tx.clone(),
|
||||
frame_requester: frame_requester.clone(),
|
||||
codex_op_tx,
|
||||
bottom_pane: BottomPane::new(BottomPaneParams {
|
||||
frame_requester,
|
||||
app_event_tx,
|
||||
@@ -2305,281 +2295,6 @@ impl ChatWidget {
|
||||
widget
|
||||
}
|
||||
|
||||
pub(crate) fn new_with_op_sender(
|
||||
common: ChatWidgetInit,
|
||||
codex_op_tx: UnboundedSender<Op>,
|
||||
) -> Self {
|
||||
let ChatWidgetInit {
|
||||
config,
|
||||
frame_requester,
|
||||
app_event_tx,
|
||||
initial_user_message,
|
||||
enhanced_keys_supported,
|
||||
auth_manager,
|
||||
models_manager,
|
||||
feedback,
|
||||
is_first_run,
|
||||
feedback_audience,
|
||||
model,
|
||||
otel_manager,
|
||||
} = common;
|
||||
let model = model.filter(|m| !m.trim().is_empty());
|
||||
let mut config = config;
|
||||
config.model = model.clone();
|
||||
let mut rng = rand::rng();
|
||||
let placeholder = PLACEHOLDERS[rng.random_range(0..PLACEHOLDERS.len())].to_string();
|
||||
|
||||
let model_override = model.as_deref();
|
||||
let model_for_header = model
|
||||
.clone()
|
||||
.unwrap_or_else(|| DEFAULT_MODEL_DISPLAY_NAME.to_string());
|
||||
let active_collaboration_mask =
|
||||
Self::initial_collaboration_mask(&config, models_manager.as_ref(), model_override);
|
||||
let header_model = active_collaboration_mask
|
||||
.as_ref()
|
||||
.and_then(|mask| mask.model.clone())
|
||||
.unwrap_or_else(|| model_for_header.clone());
|
||||
let fallback_custom = Settings {
|
||||
model: header_model.clone(),
|
||||
reasoning_effort: None,
|
||||
developer_instructions: None,
|
||||
};
|
||||
// Collaboration modes start in Custom mode (not activated).
|
||||
let current_collaboration_mode = CollaborationMode {
|
||||
mode: ModeKind::Custom,
|
||||
settings: fallback_custom,
|
||||
};
|
||||
|
||||
let active_cell = Some(Self::placeholder_session_header_cell(&config));
|
||||
|
||||
let mut widget = Self {
|
||||
app_event_tx: app_event_tx.clone(),
|
||||
frame_requester: frame_requester.clone(),
|
||||
codex_op_tx,
|
||||
bottom_pane: BottomPane::new(BottomPaneParams {
|
||||
frame_requester,
|
||||
app_event_tx,
|
||||
has_input_focus: true,
|
||||
enhanced_keys_supported,
|
||||
placeholder_text: placeholder,
|
||||
disable_paste_burst: config.disable_paste_burst,
|
||||
animations_enabled: config.animations,
|
||||
skills: None,
|
||||
}),
|
||||
active_cell,
|
||||
active_cell_revision: 0,
|
||||
config,
|
||||
skills_all: Vec::new(),
|
||||
skills_initial_state: None,
|
||||
current_collaboration_mode,
|
||||
active_collaboration_mask,
|
||||
auth_manager,
|
||||
models_manager,
|
||||
otel_manager,
|
||||
session_header: SessionHeader::new(header_model),
|
||||
initial_user_message,
|
||||
token_info: None,
|
||||
rate_limit_snapshot: None,
|
||||
plan_type: None,
|
||||
rate_limit_warnings: RateLimitWarningState::default(),
|
||||
rate_limit_switch_prompt: RateLimitSwitchPromptState::default(),
|
||||
rate_limit_poller: None,
|
||||
stream_controller: None,
|
||||
plan_stream_controller: None,
|
||||
running_commands: HashMap::new(),
|
||||
suppressed_exec_calls: HashSet::new(),
|
||||
last_unified_wait: None,
|
||||
unified_exec_wait_streak: None,
|
||||
task_complete_pending: false,
|
||||
unified_exec_processes: Vec::new(),
|
||||
agent_turn_running: false,
|
||||
mcp_startup_status: None,
|
||||
connectors_cache: ConnectorsCacheState::default(),
|
||||
interrupts: InterruptManager::new(),
|
||||
reasoning_buffer: String::new(),
|
||||
full_reasoning_buffer: String::new(),
|
||||
current_status_header: String::from("Working"),
|
||||
retry_status_header: None,
|
||||
thread_id: None,
|
||||
thread_name: None,
|
||||
forked_from: None,
|
||||
saw_plan_update_this_turn: false,
|
||||
saw_plan_item_this_turn: false,
|
||||
plan_delta_buffer: String::new(),
|
||||
plan_item_active: false,
|
||||
queued_user_messages: VecDeque::new(),
|
||||
show_welcome_banner: is_first_run,
|
||||
suppress_session_configured_redraw: false,
|
||||
pending_notification: None,
|
||||
quit_shortcut_expires_at: None,
|
||||
quit_shortcut_key: None,
|
||||
is_review_mode: false,
|
||||
pre_review_token_info: None,
|
||||
needs_final_message_separator: false,
|
||||
had_work_activity: false,
|
||||
last_separator_elapsed_secs: None,
|
||||
last_rendered_width: std::cell::Cell::new(None),
|
||||
feedback,
|
||||
feedback_audience,
|
||||
current_rollout_path: None,
|
||||
external_editor_state: ExternalEditorState::Closed,
|
||||
};
|
||||
|
||||
widget.prefetch_rate_limits();
|
||||
widget
|
||||
.bottom_pane
|
||||
.set_steer_enabled(widget.config.features.enabled(Feature::Steer));
|
||||
widget.bottom_pane.set_collaboration_modes_enabled(
|
||||
widget.config.features.enabled(Feature::CollaborationModes),
|
||||
);
|
||||
widget.sync_personality_command_enabled();
|
||||
|
||||
widget
|
||||
}
|
||||
|
||||
/// Create a ChatWidget attached to an existing conversation (e.g., a fork).
|
||||
pub(crate) fn new_from_existing(
|
||||
common: ChatWidgetInit,
|
||||
conversation: std::sync::Arc<codex_core::CodexThread>,
|
||||
session_configured: codex_core::protocol::SessionConfiguredEvent,
|
||||
) -> Self {
|
||||
let ChatWidgetInit {
|
||||
config,
|
||||
frame_requester,
|
||||
app_event_tx,
|
||||
initial_user_message,
|
||||
enhanced_keys_supported,
|
||||
auth_manager,
|
||||
models_manager,
|
||||
feedback,
|
||||
feedback_audience,
|
||||
model,
|
||||
otel_manager,
|
||||
..
|
||||
} = common;
|
||||
let model = model.filter(|m| !m.trim().is_empty());
|
||||
let mut rng = rand::rng();
|
||||
let placeholder = PLACEHOLDERS[rng.random_range(0..PLACEHOLDERS.len())].to_string();
|
||||
|
||||
let model_override = model.as_deref();
|
||||
let header_model = model
|
||||
.clone()
|
||||
.unwrap_or_else(|| session_configured.model.clone());
|
||||
let active_collaboration_mask =
|
||||
Self::initial_collaboration_mask(&config, models_manager.as_ref(), model_override);
|
||||
let header_model = active_collaboration_mask
|
||||
.as_ref()
|
||||
.and_then(|mask| mask.model.clone())
|
||||
.unwrap_or(header_model);
|
||||
|
||||
let codex_op_tx =
|
||||
spawn_agent_from_existing(conversation, session_configured, app_event_tx.clone());
|
||||
|
||||
let fallback_custom = Settings {
|
||||
model: header_model.clone(),
|
||||
reasoning_effort: None,
|
||||
developer_instructions: None,
|
||||
};
|
||||
// Collaboration modes start in Custom mode (not activated).
|
||||
let current_collaboration_mode = CollaborationMode {
|
||||
mode: ModeKind::Custom,
|
||||
settings: fallback_custom,
|
||||
};
|
||||
|
||||
let mut widget = Self {
|
||||
app_event_tx: app_event_tx.clone(),
|
||||
frame_requester: frame_requester.clone(),
|
||||
codex_op_tx,
|
||||
bottom_pane: BottomPane::new(BottomPaneParams {
|
||||
frame_requester,
|
||||
app_event_tx,
|
||||
has_input_focus: true,
|
||||
enhanced_keys_supported,
|
||||
placeholder_text: placeholder,
|
||||
disable_paste_burst: config.disable_paste_burst,
|
||||
animations_enabled: config.animations,
|
||||
skills: None,
|
||||
}),
|
||||
active_cell: None,
|
||||
active_cell_revision: 0,
|
||||
config,
|
||||
skills_all: Vec::new(),
|
||||
skills_initial_state: None,
|
||||
current_collaboration_mode,
|
||||
active_collaboration_mask,
|
||||
auth_manager,
|
||||
models_manager,
|
||||
otel_manager,
|
||||
session_header: SessionHeader::new(header_model),
|
||||
initial_user_message,
|
||||
token_info: None,
|
||||
rate_limit_snapshot: None,
|
||||
plan_type: None,
|
||||
rate_limit_warnings: RateLimitWarningState::default(),
|
||||
rate_limit_switch_prompt: RateLimitSwitchPromptState::default(),
|
||||
rate_limit_poller: None,
|
||||
stream_controller: None,
|
||||
plan_stream_controller: None,
|
||||
running_commands: HashMap::new(),
|
||||
suppressed_exec_calls: HashSet::new(),
|
||||
last_unified_wait: None,
|
||||
unified_exec_wait_streak: None,
|
||||
task_complete_pending: false,
|
||||
unified_exec_processes: Vec::new(),
|
||||
agent_turn_running: false,
|
||||
mcp_startup_status: None,
|
||||
connectors_cache: ConnectorsCacheState::default(),
|
||||
interrupts: InterruptManager::new(),
|
||||
reasoning_buffer: String::new(),
|
||||
full_reasoning_buffer: String::new(),
|
||||
current_status_header: String::from("Working"),
|
||||
retry_status_header: None,
|
||||
thread_id: None,
|
||||
thread_name: None,
|
||||
forked_from: None,
|
||||
queued_user_messages: VecDeque::new(),
|
||||
show_welcome_banner: false,
|
||||
suppress_session_configured_redraw: true,
|
||||
pending_notification: None,
|
||||
quit_shortcut_expires_at: None,
|
||||
quit_shortcut_key: None,
|
||||
is_review_mode: false,
|
||||
pre_review_token_info: None,
|
||||
needs_final_message_separator: false,
|
||||
had_work_activity: false,
|
||||
saw_plan_update_this_turn: false,
|
||||
saw_plan_item_this_turn: false,
|
||||
plan_delta_buffer: String::new(),
|
||||
plan_item_active: false,
|
||||
last_separator_elapsed_secs: None,
|
||||
last_rendered_width: std::cell::Cell::new(None),
|
||||
feedback,
|
||||
feedback_audience,
|
||||
current_rollout_path: None,
|
||||
external_editor_state: ExternalEditorState::Closed,
|
||||
};
|
||||
|
||||
widget.prefetch_rate_limits();
|
||||
widget
|
||||
.bottom_pane
|
||||
.set_steer_enabled(widget.config.features.enabled(Feature::Steer));
|
||||
widget.bottom_pane.set_collaboration_modes_enabled(
|
||||
widget.config.features.enabled(Feature::CollaborationModes),
|
||||
);
|
||||
widget.sync_personality_command_enabled();
|
||||
#[cfg(target_os = "windows")]
|
||||
widget.bottom_pane.set_windows_degraded_sandbox_active(
|
||||
codex_core::windows_sandbox::ELEVATED_SANDBOX_NUX_ENABLED
|
||||
&& matches!(
|
||||
WindowsSandboxLevel::from_config(&widget.config),
|
||||
WindowsSandboxLevel::RestrictedToken
|
||||
),
|
||||
);
|
||||
widget.update_collaboration_mode_indicator();
|
||||
|
||||
widget
|
||||
}
|
||||
|
||||
pub(crate) fn handle_key_event(&mut self, key_event: KeyEvent) {
|
||||
match key_event {
|
||||
KeyEvent {
|
||||
@@ -2604,6 +2319,19 @@ impl ChatWidget {
|
||||
self.quit_shortcut_expires_at = None;
|
||||
self.quit_shortcut_key = None;
|
||||
}
|
||||
KeyEvent {
|
||||
code: KeyCode::Char(c),
|
||||
modifiers,
|
||||
kind: KeyEventKind::Press,
|
||||
..
|
||||
} if modifiers.contains(KeyModifiers::SUPER) && c.eq_ignore_ascii_case(&'d') => {
|
||||
if self.on_ctrl_d() {
|
||||
return;
|
||||
}
|
||||
self.bottom_pane.clear_quit_shortcut_hint();
|
||||
self.quit_shortcut_expires_at = None;
|
||||
self.quit_shortcut_key = None;
|
||||
}
|
||||
KeyEvent {
|
||||
code: KeyCode::Char(c),
|
||||
modifiers,
|
||||
@@ -2804,7 +2532,8 @@ impl ChatWidget {
|
||||
}
|
||||
SlashCommand::Compact => {
|
||||
self.clear_token_usage();
|
||||
self.app_event_tx.send(AppEvent::CodexOp(Op::Compact));
|
||||
self.app_event_tx
|
||||
.send(AppEvent::AppServerAction(AppServerAction::Compact));
|
||||
}
|
||||
SlashCommand::Review => {
|
||||
self.open_review_popup();
|
||||
@@ -2912,7 +2641,7 @@ impl ChatWidget {
|
||||
self.request_quit_without_confirmation();
|
||||
}
|
||||
// SlashCommand::Undo => {
|
||||
// self.app_event_tx.send(AppEvent::CodexOp(Op::Undo));
|
||||
// Undo shortcut pending: would dispatch an app-server action.
|
||||
// }
|
||||
SlashCommand::Diff => {
|
||||
self.add_diff_in_progress();
|
||||
@@ -3021,15 +2750,14 @@ impl ChatWidget {
|
||||
let cell = Self::rename_confirmation_cell(&name, self.thread_id);
|
||||
self.add_boxed_history(Box::new(cell));
|
||||
self.request_redraw();
|
||||
self.app_event_tx
|
||||
.send(AppEvent::CodexOp(Op::SetThreadName { name }));
|
||||
self.submit_action(AppServerAction::ThreadSetName { name });
|
||||
}
|
||||
SlashCommand::Collab | SlashCommand::Plan => {
|
||||
let _ = trimmed;
|
||||
self.dispatch_command(cmd);
|
||||
}
|
||||
SlashCommand::Review if !trimmed.is_empty() => {
|
||||
self.submit_op(Op::Review {
|
||||
self.submit_action(AppServerAction::ReviewStart {
|
||||
review_request: ReviewRequest {
|
||||
target: ReviewTarget::Custom {
|
||||
instructions: trimmed.to_string(),
|
||||
@@ -3067,7 +2795,9 @@ impl ChatWidget {
|
||||
};
|
||||
let cell = Self::rename_confirmation_cell(&name, thread_id);
|
||||
tx.send(AppEvent::InsertHistoryCell(Box::new(cell)));
|
||||
tx.send(AppEvent::CodexOp(Op::SetThreadName { name }));
|
||||
tx.send(AppEvent::AppServerAction(AppServerAction::ThreadSetName {
|
||||
name,
|
||||
}));
|
||||
}),
|
||||
);
|
||||
|
||||
@@ -3168,7 +2898,7 @@ impl ChatWidget {
|
||||
)));
|
||||
return;
|
||||
}
|
||||
self.submit_op(Op::RunUserShellCommand {
|
||||
self.submit_action(AppServerAction::RunUserShellCommand {
|
||||
command: cmd.to_string(),
|
||||
});
|
||||
return;
|
||||
@@ -3228,30 +2958,24 @@ impl ChatWidget {
|
||||
.model_personality
|
||||
.filter(|_| self.config.features.enabled(Feature::Personality))
|
||||
.filter(|_| self.current_model_supports_personality());
|
||||
let op = Op::UserTurn {
|
||||
let request = TurnStartRequest {
|
||||
items,
|
||||
cwd: self.config.cwd.clone(),
|
||||
approval_policy: self.config.approval_policy.value(),
|
||||
sandbox_policy: self.config.sandbox_policy.get().clone(),
|
||||
windows_sandbox_level: WindowsSandboxLevel::from_config(&self.config),
|
||||
model: effective_mode.model().to_string(),
|
||||
effort: effective_mode.reasoning_effort(),
|
||||
summary: self.config.model_reasoning_summary,
|
||||
final_output_json_schema: None,
|
||||
summary: Some(self.config.model_reasoning_summary),
|
||||
collaboration_mode,
|
||||
personality,
|
||||
output_schema: None,
|
||||
};
|
||||
|
||||
self.codex_op_tx.send(op).unwrap_or_else(|e| {
|
||||
tracing::error!("failed to send message: {e}");
|
||||
});
|
||||
self.submit_action(AppServerAction::TurnStart(request));
|
||||
|
||||
// Persist the text to cross-session message history.
|
||||
if !text.is_empty() {
|
||||
self.codex_op_tx
|
||||
.send(Op::AddToHistory { text: text.clone() })
|
||||
.unwrap_or_else(|e| {
|
||||
tracing::error!("failed to send AddHistory op: {e}");
|
||||
});
|
||||
self.submit_action(AppServerAction::AddToHistory { text: text.clone() });
|
||||
}
|
||||
|
||||
// Only show the text portion in conversation history.
|
||||
@@ -3410,7 +3134,7 @@ impl ChatWidget {
|
||||
EventMsg::ListCustomPromptsResponse(ev) => self.on_list_custom_prompts(ev),
|
||||
EventMsg::ListSkillsResponse(ev) => self.on_list_skills(ev),
|
||||
EventMsg::SkillsUpdateAvailable => {
|
||||
self.submit_op(Op::ListSkills {
|
||||
self.submit_action(AppServerAction::ListSkills {
|
||||
cwds: Vec::new(),
|
||||
force_reload: true,
|
||||
});
|
||||
@@ -3751,17 +3475,6 @@ impl ChatWidget {
|
||||
let default_effort: ReasoningEffortConfig = preset.default_reasoning_effort;
|
||||
|
||||
let switch_actions: Vec<SelectionAction> = vec![Box::new(move |tx| {
|
||||
tx.send(AppEvent::CodexOp(Op::OverrideTurnContext {
|
||||
cwd: None,
|
||||
approval_policy: None,
|
||||
sandbox_policy: None,
|
||||
windows_sandbox_level: None,
|
||||
model: Some(switch_model.clone()),
|
||||
effort: Some(Some(default_effort)),
|
||||
summary: None,
|
||||
collaboration_mode: None,
|
||||
personality: None,
|
||||
}));
|
||||
tx.send(AppEvent::UpdateModel(switch_model.clone()));
|
||||
tx.send(AppEvent::UpdateReasoningEffort(Some(default_effort)));
|
||||
})];
|
||||
@@ -3874,17 +3587,6 @@ impl ChatWidget {
|
||||
let name = Self::personality_label(personality).to_string();
|
||||
let description = Some(Self::personality_description(personality).to_string());
|
||||
let actions: Vec<SelectionAction> = vec![Box::new(move |tx| {
|
||||
tx.send(AppEvent::CodexOp(Op::OverrideTurnContext {
|
||||
cwd: None,
|
||||
approval_policy: None,
|
||||
sandbox_policy: None,
|
||||
model: None,
|
||||
effort: None,
|
||||
summary: None,
|
||||
collaboration_mode: None,
|
||||
windows_sandbox_level: None,
|
||||
personality: Some(personality),
|
||||
}));
|
||||
tx.send(AppEvent::UpdatePersonality(personality));
|
||||
tx.send(AppEvent::PersistPersonalitySelection { personality });
|
||||
})];
|
||||
@@ -4145,17 +3847,6 @@ impl ChatWidget {
|
||||
let effort_label = effort_for_action
|
||||
.map(|effort| effort.to_string())
|
||||
.unwrap_or_else(|| "default".to_string());
|
||||
tx.send(AppEvent::CodexOp(Op::OverrideTurnContext {
|
||||
cwd: None,
|
||||
approval_policy: None,
|
||||
sandbox_policy: None,
|
||||
windows_sandbox_level: None,
|
||||
model: Some(model_for_action.clone()),
|
||||
effort: Some(effort_for_action),
|
||||
summary: None,
|
||||
collaboration_mode: None,
|
||||
personality: None,
|
||||
}));
|
||||
tx.send(AppEvent::UpdateModel(model_for_action.clone()));
|
||||
tx.send(AppEvent::UpdateReasoningEffort(effort_for_action));
|
||||
tx.send(AppEvent::PersistModelSelection {
|
||||
@@ -4318,18 +4009,6 @@ impl ChatWidget {
|
||||
}
|
||||
|
||||
fn apply_model_and_effort(&self, model: String, effort: Option<ReasoningEffortConfig>) {
|
||||
self.app_event_tx
|
||||
.send(AppEvent::CodexOp(Op::OverrideTurnContext {
|
||||
cwd: None,
|
||||
approval_policy: None,
|
||||
sandbox_policy: None,
|
||||
windows_sandbox_level: None,
|
||||
model: Some(model.clone()),
|
||||
effort: Some(effort),
|
||||
summary: None,
|
||||
collaboration_mode: None,
|
||||
personality: None,
|
||||
}));
|
||||
self.app_event_tx.send(AppEvent::UpdateModel(model.clone()));
|
||||
self.app_event_tx
|
||||
.send(AppEvent::UpdateReasoningEffort(effort));
|
||||
@@ -4508,17 +4187,6 @@ impl ChatWidget {
|
||||
) -> Vec<SelectionAction> {
|
||||
vec![Box::new(move |tx| {
|
||||
let sandbox_clone = sandbox.clone();
|
||||
tx.send(AppEvent::CodexOp(Op::OverrideTurnContext {
|
||||
cwd: None,
|
||||
approval_policy: Some(approval),
|
||||
sandbox_policy: Some(sandbox_clone.clone()),
|
||||
windows_sandbox_level: None,
|
||||
model: None,
|
||||
effort: None,
|
||||
summary: None,
|
||||
collaboration_mode: None,
|
||||
personality: None,
|
||||
}));
|
||||
tx.send(AppEvent::UpdateAskForApprovalPolicy(approval));
|
||||
tx.send(AppEvent::UpdateSandboxPolicy(sandbox_clone));
|
||||
})]
|
||||
@@ -5356,7 +5024,8 @@ impl ChatWidget {
|
||||
/// Update the active collaboration mask.
|
||||
///
|
||||
/// When collaboration modes are enabled and a preset is selected (not Custom),
|
||||
/// the current mode is attached to submissions as `Op::UserTurn { collaboration_mode: Some(...) }`.
|
||||
/// the current mode is attached to submissions as `turn/start` with
|
||||
/// `collaboration_mode` set.
|
||||
pub(crate) fn set_collaboration_mask(&mut self, mask: CollaborationModeMask) {
|
||||
if !self.collaboration_modes_enabled() {
|
||||
return;
|
||||
@@ -5455,7 +5124,7 @@ impl ChatWidget {
|
||||
if self.config.mcp_servers.is_empty() {
|
||||
self.add_to_history(history_cell::empty_mcp_output());
|
||||
} else {
|
||||
self.submit_op(Op::ListMcpTools);
|
||||
self.submit_action(AppServerAction::ListMcpTools);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5613,7 +5282,7 @@ impl ChatWidget {
|
||||
/// Handles a Ctrl+C press at the chat-widget layer.
|
||||
///
|
||||
/// The first press arms a time-bounded quit shortcut and shows a footer hint via the bottom
|
||||
/// pane. If cancellable work is active, Ctrl+C also submits `Op::Interrupt` after the shortcut
|
||||
/// pane. If cancellable work is active, Ctrl+C also submits an interrupt after the shortcut
|
||||
/// is armed.
|
||||
///
|
||||
/// If the same quit shortcut is pressed again before expiry, this requests a shutdown-first
|
||||
@@ -5636,7 +5305,7 @@ impl ChatWidget {
|
||||
|
||||
if !DOUBLE_PRESS_QUIT_SHORTCUT_ENABLED {
|
||||
if self.is_cancellable_work_active() {
|
||||
self.submit_op(Op::Interrupt);
|
||||
self.submit_action(AppServerAction::Interrupt);
|
||||
} else {
|
||||
self.request_quit_without_confirmation();
|
||||
}
|
||||
@@ -5653,7 +5322,7 @@ impl ChatWidget {
|
||||
self.arm_quit_shortcut(key);
|
||||
|
||||
if self.is_cancellable_work_active() {
|
||||
self.submit_op(Op::Interrupt);
|
||||
self.submit_action(AppServerAction::Interrupt);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5756,16 +5425,14 @@ impl ChatWidget {
|
||||
pub(crate) fn clear_esc_backtrack_hint(&mut self) {
|
||||
self.bottom_pane.clear_esc_backtrack_hint();
|
||||
}
|
||||
/// Forward an `Op` directly to codex.
|
||||
pub(crate) fn submit_op(&mut self, op: Op) {
|
||||
// Record outbound operation for session replay fidelity.
|
||||
crate::session_log::log_outbound_op(&op);
|
||||
if matches!(&op, Op::Review { .. }) && !self.bottom_pane.is_task_running() {
|
||||
/// Forward an app-server action via the app event loop.
|
||||
pub(crate) fn submit_action(&mut self, action: AppServerAction) {
|
||||
if matches!(&action, AppServerAction::ReviewStart { .. })
|
||||
&& !self.bottom_pane.is_task_running()
|
||||
{
|
||||
self.bottom_pane.set_task_running(true);
|
||||
}
|
||||
if let Err(e) = self.codex_op_tx.send(op) {
|
||||
tracing::error!("failed to submit op: {e}");
|
||||
}
|
||||
self.app_event_tx.send(AppEvent::AppServerAction(action));
|
||||
}
|
||||
|
||||
fn on_list_mcp_tools(&mut self, ev: McpListToolsResponseEvent) {
|
||||
@@ -5821,7 +5488,7 @@ impl ChatWidget {
|
||||
items.push(SelectionItem {
|
||||
name: "Review uncommitted changes".to_string(),
|
||||
actions: vec![Box::new(move |tx: &AppEventSender| {
|
||||
tx.send(AppEvent::CodexOp(Op::Review {
|
||||
tx.send(AppEvent::AppServerAction(AppServerAction::ReviewStart {
|
||||
review_request: ReviewRequest {
|
||||
target: ReviewTarget::UncommittedChanges,
|
||||
user_facing_hint: None,
|
||||
@@ -5874,7 +5541,7 @@ impl ChatWidget {
|
||||
items.push(SelectionItem {
|
||||
name: format!("{current_branch} -> {branch}"),
|
||||
actions: vec![Box::new(move |tx3: &AppEventSender| {
|
||||
tx3.send(AppEvent::CodexOp(Op::Review {
|
||||
tx3.send(AppEvent::AppServerAction(AppServerAction::ReviewStart {
|
||||
review_request: ReviewRequest {
|
||||
target: ReviewTarget::BaseBranch {
|
||||
branch: branch.clone(),
|
||||
@@ -5911,7 +5578,7 @@ impl ChatWidget {
|
||||
items.push(SelectionItem {
|
||||
name: subject.clone(),
|
||||
actions: vec![Box::new(move |tx3: &AppEventSender| {
|
||||
tx3.send(AppEvent::CodexOp(Op::Review {
|
||||
tx3.send(AppEvent::AppServerAction(AppServerAction::ReviewStart {
|
||||
review_request: ReviewRequest {
|
||||
target: ReviewTarget::Commit {
|
||||
sha: sha.clone(),
|
||||
@@ -5948,7 +5615,7 @@ impl ChatWidget {
|
||||
if trimmed.is_empty() {
|
||||
return;
|
||||
}
|
||||
tx.send(AppEvent::CodexOp(Op::Review {
|
||||
tx.send(AppEvent::AppServerAction(AppServerAction::ReviewStart {
|
||||
review_request: ReviewRequest {
|
||||
target: ReviewTarget::Custom {
|
||||
instructions: trimmed,
|
||||
@@ -6196,7 +5863,7 @@ pub(crate) fn show_review_commit_picker_with_entries(
|
||||
items.push(SelectionItem {
|
||||
name: subject.clone(),
|
||||
actions: vec![Box::new(move |tx3: &AppEventSender| {
|
||||
tx3.send(AppEvent::CodexOp(Op::Review {
|
||||
tx3.send(AppEvent::AppServerAction(AppServerAction::ReviewStart {
|
||||
review_request: ReviewRequest {
|
||||
target: ReviewTarget::Commit {
|
||||
sha: sha.clone(),
|
||||
|
||||
@@ -1,122 +0,0 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use codex_core::CodexThread;
|
||||
use codex_core::NewThread;
|
||||
use codex_core::ThreadManager;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::protocol::Event;
|
||||
use codex_core::protocol::EventMsg;
|
||||
use codex_core::protocol::Op;
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
use tokio::sync::mpsc::unbounded_channel;
|
||||
|
||||
use crate::app_event::AppEvent;
|
||||
use crate::app_event_sender::AppEventSender;
|
||||
|
||||
/// Spawn the agent bootstrapper and op forwarding loop, returning the
|
||||
/// `UnboundedSender<Op>` used by the UI to submit operations.
|
||||
pub(crate) fn spawn_agent(
|
||||
config: Config,
|
||||
app_event_tx: AppEventSender,
|
||||
server: Arc<ThreadManager>,
|
||||
) -> UnboundedSender<Op> {
|
||||
let (codex_op_tx, mut codex_op_rx) = unbounded_channel::<Op>();
|
||||
|
||||
let app_event_tx_clone = app_event_tx;
|
||||
tokio::spawn(async move {
|
||||
let NewThread {
|
||||
thread,
|
||||
session_configured,
|
||||
..
|
||||
} = match server.start_thread(config).await {
|
||||
Ok(v) => v,
|
||||
Err(err) => {
|
||||
let message = format!("Failed to initialize codex: {err}");
|
||||
tracing::error!("{message}");
|
||||
app_event_tx_clone.send(AppEvent::CodexEvent(Event {
|
||||
id: "".to_string(),
|
||||
msg: EventMsg::Error(err.to_error_event(None)),
|
||||
}));
|
||||
app_event_tx_clone.send(AppEvent::FatalExitRequest(message));
|
||||
tracing::error!("failed to initialize codex: {err}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Forward the captured `SessionConfigured` event so it can be rendered in the UI.
|
||||
let ev = codex_core::protocol::Event {
|
||||
// The `id` does not matter for rendering, so we can use a fake value.
|
||||
id: "".to_string(),
|
||||
msg: codex_core::protocol::EventMsg::SessionConfigured(session_configured),
|
||||
};
|
||||
app_event_tx_clone.send(AppEvent::CodexEvent(ev));
|
||||
|
||||
let thread_clone = thread.clone();
|
||||
tokio::spawn(async move {
|
||||
while let Some(op) = codex_op_rx.recv().await {
|
||||
let id = thread_clone.submit(op).await;
|
||||
if let Err(e) = id {
|
||||
tracing::error!("failed to submit op: {e}");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
while let Ok(event) = thread.next_event().await {
|
||||
app_event_tx_clone.send(AppEvent::CodexEvent(event));
|
||||
}
|
||||
});
|
||||
|
||||
codex_op_tx
|
||||
}
|
||||
|
||||
/// Spawn agent loops for an existing thread (e.g., a forked thread).
|
||||
/// Sends the provided `SessionConfiguredEvent` immediately, then forwards subsequent
|
||||
/// events and accepts Ops for submission.
|
||||
pub(crate) fn spawn_agent_from_existing(
|
||||
thread: std::sync::Arc<CodexThread>,
|
||||
session_configured: codex_core::protocol::SessionConfiguredEvent,
|
||||
app_event_tx: AppEventSender,
|
||||
) -> UnboundedSender<Op> {
|
||||
let (codex_op_tx, mut codex_op_rx) = unbounded_channel::<Op>();
|
||||
|
||||
let app_event_tx_clone = app_event_tx;
|
||||
tokio::spawn(async move {
|
||||
// Forward the captured `SessionConfigured` event so it can be rendered in the UI.
|
||||
let ev = codex_core::protocol::Event {
|
||||
id: "".to_string(),
|
||||
msg: codex_core::protocol::EventMsg::SessionConfigured(session_configured),
|
||||
};
|
||||
app_event_tx_clone.send(AppEvent::CodexEvent(ev));
|
||||
|
||||
let thread_clone = thread.clone();
|
||||
tokio::spawn(async move {
|
||||
while let Some(op) = codex_op_rx.recv().await {
|
||||
let id = thread_clone.submit(op).await;
|
||||
if let Err(e) = id {
|
||||
tracing::error!("failed to submit op: {e}");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
while let Ok(event) = thread.next_event().await {
|
||||
app_event_tx_clone.send(AppEvent::CodexEvent(event));
|
||||
}
|
||||
});
|
||||
|
||||
codex_op_tx
|
||||
}
|
||||
|
||||
/// Spawn an op-forwarding loop for an existing thread without subscribing to events.
|
||||
pub(crate) fn spawn_op_forwarder(thread: std::sync::Arc<CodexThread>) -> UnboundedSender<Op> {
|
||||
let (codex_op_tx, mut codex_op_rx) = unbounded_channel::<Op>();
|
||||
|
||||
tokio::spawn(async move {
|
||||
while let Some(op) = codex_op_rx.recv().await {
|
||||
if let Err(e) = thread.submit(op).await {
|
||||
tracing::error!("failed to submit op: {e}");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
codex_op_tx
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
@@ -58,6 +58,7 @@ mod app;
|
||||
mod app_backtrack;
|
||||
mod app_event;
|
||||
mod app_event_sender;
|
||||
mod app_server_client;
|
||||
mod ascii_animation;
|
||||
mod bottom_pane;
|
||||
mod chatwidget;
|
||||
|
||||
@@ -7,11 +7,11 @@ use std::sync::Mutex;
|
||||
use std::sync::OnceLock;
|
||||
|
||||
use codex_core::config::Config;
|
||||
use codex_core::protocol::Op;
|
||||
use serde::Serialize;
|
||||
use serde_json::json;
|
||||
|
||||
use crate::app_event::AppEvent;
|
||||
use crate::app_event::AppServerAction;
|
||||
|
||||
static LOGGER: LazyLock<SessionLogger> = LazyLock::new(SessionLogger::new);
|
||||
|
||||
@@ -128,6 +128,16 @@ pub(crate) fn log_inbound_app_event(event: &AppEvent) {
|
||||
AppEvent::CodexEvent(ev) => {
|
||||
write_record("to_tui", "codex_event", ev);
|
||||
}
|
||||
AppEvent::CodexThreadEvent { thread_id, event } => {
|
||||
let value = json!({
|
||||
"ts": now_ts(),
|
||||
"dir": "to_tui",
|
||||
"kind": "codex_event",
|
||||
"thread_id": thread_id.to_string(),
|
||||
"payload": event,
|
||||
});
|
||||
LOGGER.write_json_line(value);
|
||||
}
|
||||
AppEvent::NewSession => {
|
||||
let value = json!({
|
||||
"ts": now_ts(),
|
||||
@@ -177,11 +187,11 @@ pub(crate) fn log_inbound_app_event(event: &AppEvent) {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn log_outbound_op(op: &Op) {
|
||||
pub(crate) fn log_outbound_app_action(action: &AppServerAction) {
|
||||
if !LOGGER.is_enabled() {
|
||||
return;
|
||||
}
|
||||
write_record("from_tui", "op", op);
|
||||
write_record("from_tui", "app_server_action", action);
|
||||
}
|
||||
|
||||
pub(crate) fn log_session_end() {
|
||||
|
||||
@@ -4,7 +4,6 @@
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
use codex_core::protocol::Op;
|
||||
use crossterm::event::KeyCode;
|
||||
use ratatui::buffer::Buffer;
|
||||
use ratatui::layout::Rect;
|
||||
@@ -17,6 +16,7 @@ use ratatui::widgets::WidgetRef;
|
||||
use unicode_width::UnicodeWidthStr;
|
||||
|
||||
use crate::app_event::AppEvent;
|
||||
use crate::app_event::AppServerAction;
|
||||
use crate::app_event_sender::AppEventSender;
|
||||
use crate::exec_cell::spinner;
|
||||
use crate::key_hint;
|
||||
@@ -82,7 +82,8 @@ impl StatusIndicatorWidget {
|
||||
}
|
||||
|
||||
pub(crate) fn interrupt(&self) {
|
||||
self.app_event_tx.send(AppEvent::CodexOp(Op::Interrupt));
|
||||
self.app_event_tx
|
||||
.send(AppEvent::AppServerAction(AppServerAction::Interrupt));
|
||||
}
|
||||
|
||||
/// Update the animated header label (left of the brackets).
|
||||
|
||||
@@ -37,12 +37,16 @@ model_provider = "ollama"
|
||||
let CodexCliOutput { exit_code, output } = run_codex_cli(codex_home, cwd).await?;
|
||||
assert_ne!(0, exit_code, "Codex CLI should exit nonzero.");
|
||||
assert!(
|
||||
output.contains("ERROR: Failed to initialize codex:"),
|
||||
output.contains("Error: app server error -32603"),
|
||||
"expected startup error in output, got: {output}"
|
||||
);
|
||||
let expected_error = format!(
|
||||
"Fatal error: failed to load rules: failed to read rules files from {}",
|
||||
codex_home.join("rules").canonicalize()?.display()
|
||||
);
|
||||
assert!(
|
||||
output.contains("failed to read rules files"),
|
||||
"expected rules read error in output, got: {output}"
|
||||
output.contains(&expected_error),
|
||||
"expected:\n{expected_error}\nin output, got:\n{output}"
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user