Compare commits

...

1 Commits

Author SHA1 Message Date
Michael Bolin
14e4341c1c feat: migrate TUI to use app-server v2 2026-01-30 18:33:09 -08:00
27 changed files with 2734 additions and 1224 deletions

1
codex-rs/Cargo.lock generated
View File

@@ -1910,6 +1910,7 @@ dependencies = [
"chrono",
"clap",
"codex-ansi-escape",
"codex-app-server",
"codex-app-server-protocol",
"codex-arg0",
"codex-backend-client",

View File

@@ -71,6 +71,14 @@ macro_rules! client_request_definitions {
)*
}
impl ClientRequest {
pub fn request_id(&self) -> &RequestId {
match self {
$(Self::$variant { request_id, .. } => request_id,)*
}
}
}
pub fn export_client_responses(
out_dir: &::std::path::Path,
) -> ::std::result::Result<(), ::ts_rs::ExportError> {
@@ -140,6 +148,14 @@ client_request_definitions! {
params: v2::ThreadRollbackParams,
response: v2::ThreadRollbackResponse,
},
ThreadCompact => "thread/compact" {
params: v2::ThreadCompactParams,
response: v2::ThreadCompactResponse,
},
ThreadShutdown => "thread/shutdown" {
params: v2::ThreadShutdownParams,
response: v2::ThreadShutdownResponse,
},
ThreadList => "thread/list" {
params: v2::ThreadListParams,
response: v2::ThreadListResponse,
@@ -201,6 +217,10 @@ client_request_definitions! {
params: v2::ListMcpServerStatusParams,
response: v2::ListMcpServerStatusResponse,
},
McpElicitationResolve => "mcp/elicitation/resolve" {
params: v2::McpElicitationResolveParams,
response: v2::McpElicitationResolveResponse,
},
LoginAccount => "account/login/start" {
params: v2::LoginAccountParams,

View File

@@ -3,6 +3,7 @@ use std::path::PathBuf;
use crate::protocol::common::AuthMode;
use codex_protocol::account::PlanType;
use codex_protocol::approvals::ElicitationAction as CoreElicitationAction;
use codex_protocol::approvals::ExecPolicyAmendment as CoreExecPolicyAmendment;
use codex_protocol::config_types::CollaborationMode;
use codex_protocol::config_types::CollaborationModeMask;
@@ -12,6 +13,7 @@ use codex_protocol::config_types::ReasoningSummary;
use codex_protocol::config_types::SandboxMode as CoreSandboxMode;
use codex_protocol::config_types::Verbosity;
use codex_protocol::config_types::WebSearchMode;
use codex_protocol::config_types::WindowsSandboxLevel;
use codex_protocol::items::AgentMessageContent as CoreAgentMessageContent;
use codex_protocol::items::TurnItem as CoreTurnItem;
use codex_protocol::models::ResponseItem;
@@ -41,6 +43,7 @@ use codex_protocol::user_input::TextElement as CoreTextElement;
use codex_protocol::user_input::UserInput as CoreUserInput;
use codex_utils_absolute_path::AbsolutePathBuf;
use mcp_types::ContentBlock as McpContentBlock;
use mcp_types::RequestId as McpRequestId;
use mcp_types::Resource as McpResource;
use mcp_types::ResourceTemplate as McpResourceTemplate;
use mcp_types::Tool as McpTool;
@@ -233,6 +236,14 @@ v2_enum_from_core!(
}
);
v2_enum_from_core!(
pub enum ElicitationAction from CoreElicitationAction {
Accept,
Decline,
Cancel
}
);
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)]
#[serde(tag = "type", rename_all = "camelCase")]
#[ts(tag = "type")]
@@ -1061,6 +1072,21 @@ pub struct ListMcpServerStatusResponse {
pub next_cursor: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct McpElicitationResolveParams {
pub thread_id: String,
pub server_name: String,
pub request_id: McpRequestId,
pub decision: ElicitationAction,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct McpElicitationResolveResponse {}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
@@ -1354,6 +1380,32 @@ pub struct ThreadRollbackResponse {
pub thread: Thread,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadCompactParams {
pub thread_id: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadCompactResponse {
pub turn: Turn,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadShutdownParams {
pub thread_id: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadShutdownResponse {}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
@@ -1810,6 +1862,8 @@ pub struct TurnStartParams {
pub approval_policy: Option<AskForApproval>,
/// Override the sandbox policy for this turn and subsequent turns.
pub sandbox_policy: Option<SandboxPolicy>,
/// Override the Windows sandbox level for this turn and subsequent turns.
pub windows_sandbox_level: Option<WindowsSandboxLevel>,
/// Override the model for this turn and subsequent turns.
pub model: Option<String>,
/// Override the reasoning effort for this turn and subsequent turns.

View File

@@ -64,6 +64,8 @@ use codex_app_server_protocol::LoginChatGptCompleteNotification;
use codex_app_server_protocol::LoginChatGptResponse;
use codex_app_server_protocol::LogoutAccountResponse;
use codex_app_server_protocol::LogoutChatGptResponse;
use codex_app_server_protocol::McpElicitationResolveParams;
use codex_app_server_protocol::McpElicitationResolveResponse;
use codex_app_server_protocol::McpServerOauthLoginCompletedNotification;
use codex_app_server_protocol::McpServerOauthLoginParams;
use codex_app_server_protocol::McpServerOauthLoginResponse;
@@ -98,6 +100,8 @@ use codex_app_server_protocol::SkillsListResponse;
use codex_app_server_protocol::Thread;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadArchiveResponse;
use codex_app_server_protocol::ThreadCompactParams;
use codex_app_server_protocol::ThreadCompactResponse;
use codex_app_server_protocol::ThreadForkParams;
use codex_app_server_protocol::ThreadForkResponse;
use codex_app_server_protocol::ThreadItem;
@@ -112,6 +116,8 @@ use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadRollbackParams;
use codex_app_server_protocol::ThreadSetNameParams;
use codex_app_server_protocol::ThreadSetNameResponse;
use codex_app_server_protocol::ThreadShutdownParams;
use codex_app_server_protocol::ThreadShutdownResponse;
use codex_app_server_protocol::ThreadSortKey;
use codex_app_server_protocol::ThreadSourceKind;
use codex_app_server_protocol::ThreadStartParams;
@@ -447,6 +453,12 @@ impl CodexMessageProcessor {
ClientRequest::ThreadRollback { request_id, params } => {
self.thread_rollback(request_id, params).await;
}
ClientRequest::ThreadCompact { request_id, params } => {
self.thread_compact(request_id, params).await;
}
ClientRequest::ThreadShutdown { request_id, params } => {
self.thread_shutdown(request_id, params).await;
}
ClientRequest::ThreadList { request_id, params } => {
self.thread_list(request_id, params).await;
}
@@ -513,6 +525,9 @@ impl CodexMessageProcessor {
ClientRequest::McpServerStatusList { request_id, params } => {
self.list_mcp_server_status(request_id, params).await;
}
ClientRequest::McpElicitationResolve { request_id, params } => {
self.mcp_elicitation_resolve(request_id, params).await;
}
ClientRequest::LoginAccount { request_id, params } => {
self.login_v2(request_id, params).await;
}
@@ -2051,6 +2066,63 @@ impl CodexMessageProcessor {
}
}
async fn thread_compact(&mut self, request_id: RequestId, params: ThreadCompactParams) {
let ThreadCompactParams { thread_id } = params;
let (thread_uuid, thread) = match self.load_thread(&thread_id).await {
Ok(v) => v,
Err(error) => {
self.outgoing.send_error(request_id, error).await;
return;
}
};
match thread.submit(Op::Compact).await {
Ok(turn_id) => {
let turn = Turn {
id: turn_id.clone(),
items: Vec::new(),
error: None,
status: TurnStatus::InProgress,
};
let response = ThreadCompactResponse { turn: turn.clone() };
self.outgoing.send_response(request_id, response).await;
let notif = TurnStartedNotification {
thread_id: thread_uuid.to_string(),
turn,
};
self.outgoing
.send_server_notification(ServerNotification::TurnStarted(notif))
.await;
}
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to start compact turn: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
}
}
}
async fn thread_shutdown(&mut self, request_id: RequestId, params: ThreadShutdownParams) {
let ThreadShutdownParams { thread_id } = params;
let (thread_uuid, thread) = match self.load_thread(&thread_id).await {
Ok(v) => v,
Err(error) => {
self.outgoing.send_error(request_id, error).await;
return;
}
};
let _ = thread.submit(Op::Shutdown).await;
self.thread_manager.remove_thread(&thread_uuid).await;
let response = ThreadShutdownResponse {};
self.outgoing.send_response(request_id, response).await;
}
async fn thread_list(&self, request_id: RequestId, params: ThreadListParams) {
let ThreadListParams {
cursor,
@@ -3143,6 +3215,34 @@ impl CodexMessageProcessor {
});
}
async fn mcp_elicitation_resolve(
&self,
request_id: RequestId,
params: McpElicitationResolveParams,
) {
let McpElicitationResolveParams {
thread_id,
server_name,
request_id: elicitation_id,
decision,
} = params;
let (_, thread) = match self.load_thread(&thread_id).await {
Ok(v) => v,
Err(error) => {
self.outgoing.send_error(request_id, error).await;
return;
}
};
let op = Op::ResolveElicitation {
server_name,
request_id: elicitation_id,
decision: decision.to_core(),
};
let _ = thread.submit(op).await;
let response = McpElicitationResolveResponse {};
self.outgoing.send_response(request_id, response).await;
}
async fn list_mcp_server_status_task(
outgoing: Arc<OutgoingMessageSender>,
request_id: RequestId,
@@ -4090,6 +4190,7 @@ impl CodexMessageProcessor {
let has_any_overrides = params.cwd.is_some()
|| params.approval_policy.is_some()
|| params.sandbox_policy.is_some()
|| params.windows_sandbox_level.is_some()
|| params.model.is_some()
|| params.effort.is_some()
|| params.summary.is_some()
@@ -4103,7 +4204,7 @@ impl CodexMessageProcessor {
cwd: params.cwd,
approval_policy: params.approval_policy.map(AskForApproval::to_core),
sandbox_policy: params.sandbox_policy.map(|p| p.to_core()),
windows_sandbox_level: None,
windows_sandbox_level: params.windows_sandbox_level,
model: params.model,
effort: params.effort.map(Some),
summary: params.summary,

View File

@@ -11,14 +11,22 @@ use codex_core::config_loader::LoaderOverrides;
use std::io::ErrorKind;
use std::io::Result as IoResult;
use std::path::PathBuf;
use std::sync::Arc;
use crate::message_processor::MessageProcessor;
use crate::message_processor::MessageProcessorArgs;
use crate::outgoing_message::OutgoingMessage;
use crate::outgoing_message::OutgoingMessageSender;
use codex_app_server_protocol::ClientNotification;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ConfigLayerSource;
use codex_app_server_protocol::ConfigWarningNotification;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::Result;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::TextPosition as AppTextPosition;
use codex_app_server_protocol::TextRange as AppTextRange;
use codex_core::ExecPolicyError;
@@ -324,6 +332,7 @@ pub async fn run_main(
cloud_requirements: cloud_requirements.clone(),
feedback: feedback.clone(),
config_warnings,
session_source: codex_protocol::protocol::SessionSource::VSCode,
});
let mut thread_created_rx = processor.thread_created_receiver();
async move {
@@ -395,3 +404,259 @@ pub async fn run_main(
Ok(())
}
#[derive(Debug, Clone)]
pub enum AppServerClientMessage {
Request(ClientRequest),
Response {
id: RequestId,
result: Result,
},
Error {
id: RequestId,
error: JSONRPCErrorError,
},
Notification(ClientNotification),
}
#[derive(Debug, Clone, PartialEq)]
pub struct AppServerEventNotification {
pub method: String,
pub params: Option<serde_json::Value>,
}
#[derive(Debug, Clone)]
pub enum AppServerMessage {
Request(ServerRequest),
Notification(ServerNotification),
EventNotification(AppServerEventNotification),
Response {
id: RequestId,
result: Result,
},
Error {
id: RequestId,
error: JSONRPCErrorError,
},
}
pub struct InMemoryAppServer {
pub incoming: mpsc::Sender<JSONRPCMessage>,
pub outgoing: mpsc::Receiver<JSONRPCMessage>,
}
pub struct InProcessAppServer {
pub incoming: mpsc::Sender<AppServerClientMessage>,
pub outgoing: mpsc::Receiver<AppServerMessage>,
}
pub fn spawn_in_memory(
codex_linux_sandbox_exe: Option<PathBuf>,
config: Arc<Config>,
cli_overrides: Vec<(String, TomlValue)>,
loader_overrides: LoaderOverrides,
feedback: CodexFeedback,
config_warnings: Vec<ConfigWarningNotification>,
session_source: codex_protocol::protocol::SessionSource,
) -> InMemoryAppServer {
let (incoming_tx, mut incoming_rx) = mpsc::channel::<JSONRPCMessage>(CHANNEL_CAPACITY);
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingMessage>(CHANNEL_CAPACITY);
let (client_outgoing_tx, client_outgoing_rx) =
mpsc::channel::<JSONRPCMessage>(CHANNEL_CAPACITY);
let auth_manager = AuthManager::shared(
config.codex_home.clone(),
false,
config.cli_auth_credentials_store_mode,
);
let cloud_requirements =
cloud_requirements_loader(auth_manager, config.chatgpt_base_url.clone());
tokio::spawn({
let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx);
let mut processor = MessageProcessor::new(MessageProcessorArgs {
outgoing: outgoing_message_sender,
codex_linux_sandbox_exe,
config: Arc::clone(&config),
cli_overrides,
loader_overrides,
cloud_requirements,
feedback,
config_warnings,
session_source,
});
let mut thread_created_rx = processor.thread_created_receiver();
async move {
let mut listen_for_threads = true;
loop {
tokio::select! {
msg = incoming_rx.recv() => {
let Some(msg) = msg else {
break;
};
match msg {
JSONRPCMessage::Request(r) => processor.process_request(r).await,
JSONRPCMessage::Response(r) => processor.process_response(r).await,
JSONRPCMessage::Notification(n) => processor.process_notification(n).await,
JSONRPCMessage::Error(e) => processor.process_error(e).await,
}
}
created = thread_created_rx.recv(), if listen_for_threads => {
match created {
Ok(thread_id) => {
processor.try_attach_thread_listener(thread_id).await;
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
warn!("thread_created receiver lagged; skipping resync");
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
listen_for_threads = false;
}
}
}
}
}
info!("in-memory processor task exited (channel closed)");
}
});
tokio::spawn(async move {
while let Some(outgoing_message) = outgoing_rx.recv().await {
let Ok(value) = serde_json::to_value(outgoing_message) else {
error!("Failed to convert OutgoingMessage to JSON value");
continue;
};
let jsonrpc = match serde_json::from_value::<JSONRPCMessage>(value) {
Ok(message) => message,
Err(err) => {
error!("Failed to deserialize OutgoingMessage as JSONRPCMessage: {err}");
continue;
}
};
if client_outgoing_tx.send(jsonrpc).await.is_err() {
break;
}
}
info!("in-memory outgoing task exited (channel closed)");
});
InMemoryAppServer {
incoming: incoming_tx,
outgoing: client_outgoing_rx,
}
}
pub fn spawn_in_memory_typed(
codex_linux_sandbox_exe: Option<PathBuf>,
config: Arc<Config>,
cli_overrides: Vec<(String, TomlValue)>,
loader_overrides: LoaderOverrides,
feedback: CodexFeedback,
config_warnings: Vec<ConfigWarningNotification>,
session_source: codex_protocol::protocol::SessionSource,
) -> InProcessAppServer {
let (incoming_tx, mut incoming_rx) = mpsc::channel::<AppServerClientMessage>(CHANNEL_CAPACITY);
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingMessage>(CHANNEL_CAPACITY);
let (client_outgoing_tx, client_outgoing_rx) =
mpsc::channel::<AppServerMessage>(CHANNEL_CAPACITY);
let auth_manager = AuthManager::shared(
config.codex_home.clone(),
false,
config.cli_auth_credentials_store_mode,
);
let cloud_requirements =
cloud_requirements_loader(auth_manager, config.chatgpt_base_url.clone());
tokio::spawn({
let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx);
let mut processor = MessageProcessor::new(MessageProcessorArgs {
outgoing: outgoing_message_sender,
codex_linux_sandbox_exe,
config: Arc::clone(&config),
cli_overrides,
loader_overrides,
cloud_requirements,
feedback,
config_warnings,
session_source,
});
let mut thread_created_rx = processor.thread_created_receiver();
async move {
let mut listen_for_threads = true;
loop {
tokio::select! {
msg = incoming_rx.recv() => {
let Some(msg) = msg else {
break;
};
match msg {
AppServerClientMessage::Request(request) => {
processor.process_client_request(request).await;
}
AppServerClientMessage::Response { id, result } => {
processor.process_client_response(id, result).await;
}
AppServerClientMessage::Error { id, error } => {
processor.process_client_error(id, error);
}
AppServerClientMessage::Notification(notification) => {
processor.process_client_notification(notification).await;
}
}
}
created = thread_created_rx.recv(), if listen_for_threads => {
match created {
Ok(thread_id) => {
processor.try_attach_thread_listener(thread_id).await;
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
warn!("thread_created receiver lagged; skipping resync");
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
listen_for_threads = false;
}
}
}
}
}
info!("in-process processor task exited (channel closed)");
}
});
tokio::spawn(async move {
while let Some(outgoing_message) = outgoing_rx.recv().await {
let mapped = match outgoing_message {
OutgoingMessage::Request(request) => AppServerMessage::Request(request),
OutgoingMessage::AppServerNotification(notification) => {
AppServerMessage::Notification(notification)
}
OutgoingMessage::Notification(notification) => {
AppServerMessage::EventNotification(AppServerEventNotification {
method: notification.method,
params: notification.params,
})
}
OutgoingMessage::Response(response) => AppServerMessage::Response {
id: response.id,
result: response.result,
},
OutgoingMessage::Error(error) => AppServerMessage::Error {
id: error.id,
error: error.error,
},
};
if client_outgoing_tx.send(mapped).await.is_err() {
break;
}
}
info!("in-process outgoing task exited (channel closed)");
});
InProcessAppServer {
incoming: incoming_tx,
outgoing: client_outgoing_rx,
}
}

View File

@@ -11,6 +11,7 @@ use codex_app_server_protocol::ChatgptAuthTokensRefreshParams;
use codex_app_server_protocol::ChatgptAuthTokensRefreshReason;
use codex_app_server_protocol::ChatgptAuthTokensRefreshResponse;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::ClientNotification;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ConfigBatchWriteParams;
use codex_app_server_protocol::ConfigReadParams;
@@ -23,6 +24,7 @@ use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCRequest;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::Result as JsonResult;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequestPayload;
use codex_core::AuthManager;
@@ -119,6 +121,7 @@ pub(crate) struct MessageProcessorArgs {
pub(crate) cloud_requirements: CloudRequirementsLoader,
pub(crate) feedback: CodexFeedback,
pub(crate) config_warnings: Vec<ConfigWarningNotification>,
pub(crate) session_source: SessionSource,
}
impl MessageProcessor {
@@ -134,7 +137,9 @@ impl MessageProcessor {
cloud_requirements,
feedback,
config_warnings,
session_source,
} = args;
let outgoing = Arc::new(outgoing);
let auth_manager = AuthManager::shared(
config.codex_home.clone(),
@@ -148,7 +153,7 @@ impl MessageProcessor {
let thread_manager = Arc::new(ThreadManager::new(
config.codex_home.clone(),
auth_manager.clone(),
SessionSource::VSCode,
session_source,
));
let codex_message_processor = CodexMessageProcessor::new(CodexMessageProcessorArgs {
auth_manager,
@@ -205,24 +210,33 @@ impl MessageProcessor {
}
};
match codex_request {
self.process_client_request(codex_request).await;
}
pub(crate) async fn process_client_request(&mut self, codex_request: ClientRequest) {
let request_id = codex_request.request_id().clone();
match &codex_request {
// Handle Initialize internally so CodexMessageProcessor does not have to concern
// itself with the `initialized` bool.
ClientRequest::Initialize { request_id, params } => {
ClientRequest::Initialize {
request_id: _,
params,
} => {
if self.initialized {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "Already initialized".to_string(),
data: None,
};
self.outgoing.send_error(request_id, error).await;
self.outgoing.send_error(request_id.clone(), error).await;
return;
} else {
let client_info = params.client_info.clone();
let ClientInfo {
name,
title: _title,
version,
} = params.client_info;
} = client_info;
if let Err(error) = set_default_originator(name.clone()) {
match error {
SetOriginatorError::InvalidHeaderValue => {
@@ -233,7 +247,7 @@ impl MessageProcessor {
),
data: None,
};
self.outgoing.send_error(request_id, error).await;
self.outgoing.send_error(request_id.clone(), error).await;
return;
}
SetOriginatorError::AlreadyInitialized => {
@@ -252,7 +266,9 @@ impl MessageProcessor {
let user_agent = get_codex_user_agent();
let response = InitializeResponse { user_agent };
self.outgoing.send_response(request_id, response).await;
self.outgoing
.send_response(request_id.clone(), response)
.await;
self.initialized = true;
if !self.config_warnings.is_empty() {
@@ -275,7 +291,7 @@ impl MessageProcessor {
message: "Not initialized".to_string(),
data: None,
};
self.outgoing.send_error(request_id, error).await;
self.outgoing.send_error(request_id.clone(), error).await;
return;
}
}
@@ -309,6 +325,10 @@ impl MessageProcessor {
tracing::info!("<- notification: {:?}", notification);
}
pub(crate) async fn process_client_notification(&self, notification: ClientNotification) {
tracing::info!("<- notification: {:?}", notification);
}
pub(crate) fn thread_created_receiver(&self) -> broadcast::Receiver<ThreadId> {
self.codex_message_processor.thread_created_receiver()
}
@@ -326,7 +346,11 @@ impl MessageProcessor {
pub(crate) async fn process_response(&mut self, response: JSONRPCResponse) {
tracing::info!("<- response: {:?}", response);
let JSONRPCResponse { id, result, .. } = response;
self.outgoing.notify_client_response(id, result).await
self.process_client_response(id, result).await;
}
pub(crate) async fn process_client_response(&mut self, id: RequestId, result: JsonResult) {
self.outgoing.notify_client_response(id, result).await;
}
/// Handle an error object received from the peer.
@@ -335,6 +359,10 @@ impl MessageProcessor {
self.outgoing.notify_client_error(err.id, err.error).await;
}
pub(crate) fn process_client_error(&mut self, id: RequestId, error: JSONRPCErrorError) {
tracing::error!("<- error: {:?}", JSONRPCError { id, error });
}
async fn handle_config_read(&self, request_id: RequestId, params: ConfigReadParams) {
match self.config_api.read(params).await {
Ok(response) => self.outgoing.send_response(request_id, response).await,

View File

@@ -656,6 +656,7 @@ async fn turn_start_exec_approval_toggle_v2() -> Result<()> {
}],
approval_policy: Some(codex_app_server_protocol::AskForApproval::Never),
sandbox_policy: Some(codex_app_server_protocol::SandboxPolicy::DangerFullAccess),
windows_sandbox_level: None,
model: Some("mock-model".to_string()),
effort: Some(ReasoningEffort::Medium),
summary: Some(ReasoningSummary::Auto),
@@ -898,6 +899,7 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
exclude_tmpdir_env_var: false,
exclude_slash_tmp: false,
}),
windows_sandbox_level: None,
model: Some("mock-model".to_string()),
effort: Some(ReasoningEffort::Medium),
summary: Some(ReasoningSummary::Auto),
@@ -929,6 +931,7 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
cwd: Some(second_cwd.clone()),
approval_policy: Some(codex_app_server_protocol::AskForApproval::Never),
sandbox_policy: Some(codex_app_server_protocol::SandboxPolicy::DangerFullAccess),
windows_sandbox_level: None,
model: Some("mock-model".to_string()),
effort: Some(ReasoningEffort::Medium),
summary: Some(ReasoningSummary::Auto),

View File

@@ -44,7 +44,7 @@ pub use mcp_connection_manager::MCP_SANDBOX_STATE_METHOD;
pub use mcp_connection_manager::SandboxState;
mod mcp_tool_call;
mod mentions;
mod message_history;
pub mod message_history;
mod model_provider_info;
pub mod parse_command;
pub mod path_utils;
@@ -98,6 +98,7 @@ pub mod spawn;
pub mod state_db;
pub mod terminal;
mod tools;
pub use tools::format_exec_output_str;
pub mod turn_diff_tracker;
pub use rollout::ARCHIVED_SESSIONS_SUBDIR;
pub use rollout::INTERACTIVE_SESSION_SOURCES;
@@ -119,6 +120,7 @@ pub use rollout::list::read_head_for_summary;
pub use rollout::list::read_session_meta_line;
pub use rollout::rollout_date_parts;
pub use transport_manager::TransportManager;
pub use truncate::TruncationPolicy;
mod function_tool;
mod state;
mod tasks;

View File

@@ -69,11 +69,7 @@ fn history_filepath(config: &Config) -> PathBuf {
/// Append a `text` entry associated with `conversation_id` to the history file. Uses
/// advisory file locking to ensure that concurrent writes do not interleave,
/// which entails a small amount of blocking I/O internally.
pub(crate) async fn append_entry(
text: &str,
conversation_id: &ThreadId,
config: &Config,
) -> Result<()> {
pub async fn append_entry(text: &str, conversation_id: &ThreadId, config: &Config) -> Result<()> {
match config.history.persistence {
HistoryPersistence::SaveAll => {
// Save everything: proceed.
@@ -245,7 +241,7 @@ fn trim_target_bytes(max_bytes: u64, newest_entry_len: u64) -> u64 {
/// Asynchronously fetch the history file's *identifier* (inode on Unix) and
/// the current number of entries by counting newline characters.
pub(crate) async fn history_metadata(config: &Config) -> (u64, usize) {
pub async fn history_metadata(config: &Config) -> (u64, usize) {
let path = history_filepath(config);
history_metadata_for_file(&path).await
}
@@ -258,7 +254,7 @@ pub(crate) async fn history_metadata(config: &Config) -> (u64, usize) {
///
/// Note this function is not async because it uses a sync advisory file
/// locking API.
pub(crate) fn lookup(log_id: u64, offset: usize, config: &Config) -> Option<HistoryEntry> {
pub fn lookup(log_id: u64, offset: usize, config: &Config) -> Option<HistoryEntry> {
let path = history_filepath(config);
lookup_history_entry(&path, log_id, offset)
}

View File

@@ -27,6 +27,7 @@ base64 = { workspace = true }
chrono = { workspace = true, features = ["serde"] }
clap = { workspace = true, features = ["derive"] }
codex-ansi-escape = { workspace = true }
codex-app-server = { workspace = true }
codex-app-server-protocol = { workspace = true }
codex-arg0 = { workspace = true }
codex-backend-client = { workspace = true }

File diff suppressed because it is too large Load Diff

View File

@@ -28,6 +28,7 @@ use std::path::PathBuf;
use std::sync::Arc;
use crate::app::App;
use crate::app_event::AppServerAction;
use crate::history_cell::SessionInfoCell;
use crate::history_cell::UserHistoryCell;
use crate::pager_overlay::Overlay;
@@ -36,7 +37,6 @@ use crate::tui::TuiEvent;
use codex_core::protocol::CodexErrorInfo;
use codex_core::protocol::ErrorEvent;
use codex_core::protocol::EventMsg;
use codex_core::protocol::Op;
use codex_protocol::ThreadId;
use codex_protocol::user_input::TextElement;
use color_eyre::eyre::Result;
@@ -210,7 +210,8 @@ impl App {
selection,
thread_id: self.chat_widget.thread_id(),
});
self.chat_widget.submit_op(Op::ThreadRollback { num_turns });
self.chat_widget
.submit_action(AppServerAction::ThreadRollback { num_turns });
if !prefill.is_empty() || !text_elements.is_empty() || !local_image_paths.is_empty() {
self.chat_widget
.set_composer_text(prefill, text_elements, local_image_paths);

View File

@@ -14,19 +14,27 @@ use codex_chatgpt::connectors::AppInfo;
use codex_common::approval_presets::ApprovalPreset;
use codex_core::protocol::Event;
use codex_core::protocol::RateLimitSnapshot;
use codex_core::protocol::ReviewDecision;
use codex_file_search::FileMatch;
use codex_protocol::ThreadId;
use codex_protocol::openai_models::ModelPreset;
use codex_protocol::request_user_input::RequestUserInputResponse;
use codex_protocol::user_input::UserInput;
use crate::bottom_pane::ApprovalRequest;
use crate::history_cell::HistoryCell;
use codex_core::features::Feature;
use codex_core::protocol::AskForApproval;
use codex_core::protocol::ElicitationAction;
use codex_core::protocol::SandboxPolicy;
use codex_protocol::config_types::CollaborationMode;
use codex_protocol::config_types::CollaborationModeMask;
use codex_protocol::config_types::Personality;
use codex_protocol::config_types::ReasoningSummary;
use codex_protocol::config_types::WindowsSandboxLevel;
use codex_protocol::openai_models::ReasoningEffort;
use serde::Serialize;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(not(target_os = "windows"), allow(dead_code))]
@@ -46,10 +54,83 @@ pub(crate) struct ConnectorsSnapshot {
pub(crate) connectors: Vec<AppInfo>,
}
#[derive(Debug, Clone, Serialize, PartialEq)]
pub(crate) struct TurnStartRequest {
pub(crate) items: Vec<UserInput>,
pub(crate) cwd: PathBuf,
pub(crate) approval_policy: AskForApproval,
pub(crate) sandbox_policy: SandboxPolicy,
pub(crate) windows_sandbox_level: WindowsSandboxLevel,
pub(crate) model: String,
pub(crate) effort: Option<ReasoningEffort>,
pub(crate) summary: Option<ReasoningSummary>,
pub(crate) collaboration_mode: Option<CollaborationMode>,
pub(crate) personality: Option<Personality>,
pub(crate) output_schema: Option<serde_json::Value>,
}
#[derive(Debug, Clone, Serialize, PartialEq)]
pub(crate) enum AppServerAction {
TurnStart(TurnStartRequest),
ThreadSetName {
name: String,
},
ReviewStart {
review_request: codex_core::protocol::ReviewRequest,
},
Interrupt,
Shutdown,
Compact,
ThreadRollback {
num_turns: u32,
},
ListSkills {
cwds: Vec<PathBuf>,
force_reload: bool,
},
#[allow(dead_code)]
RefreshMcpServers {
config: codex_protocol::protocol::McpServerRefreshConfig,
},
ListMcpTools,
ListCustomPrompts,
RunUserShellCommand {
command: String,
},
AddToHistory {
text: String,
},
GetHistoryEntry {
log_id: u64,
offset: usize,
},
ExecApproval {
call_id: String,
decision: ReviewDecision,
},
PatchApproval {
call_id: String,
decision: ReviewDecision,
},
UserInputAnswer {
call_id: String,
response: RequestUserInputResponse,
},
ResolveElicitation {
server_name: String,
request_id: mcp_types::RequestId,
decision: ElicitationAction,
},
}
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub(crate) enum AppEvent {
CodexEvent(Event),
CodexThreadEvent {
thread_id: ThreadId,
event: Event,
},
/// Open the agent picker for switching active threads.
OpenAgentPicker,
/// Switch the active thread to the selected agent.
@@ -73,11 +154,12 @@ pub(crate) enum AppEvent {
Exit(ExitMode),
/// Request to exit the application due to a fatal error.
#[allow(dead_code)]
FatalExitRequest(String),
/// Forward an `Op` to the Agent. Using an `AppEvent` for this avoids
/// Forward a command to the app server. Using an `AppEvent` for this avoids
/// bubbling channels through layers of widgets.
CodexOp(codex_core::protocol::Op),
AppServerAction(AppServerAction),
/// Kick off an asynchronous file search for the given query (text after
/// the `@`). Previous searches may be cancelled by the app layer so there
@@ -305,8 +387,9 @@ pub(crate) enum ExitMode {
ShutdownFirst,
/// Exit the UI loop immediately without waiting for shutdown.
///
/// This skips `Op::Shutdown`, so any in-flight work may be dropped and
/// cleanup that normally runs before `ShutdownComplete` can be missed.
/// This skips the app-server shutdown request, so any in-flight work may be
/// dropped and cleanup that normally runs before `ShutdownComplete` can be
/// missed.
Immediate,
}

View File

@@ -17,8 +17,10 @@ impl AppEventSender {
/// error and log it.
pub(crate) fn send(&self, event: AppEvent) {
// Record inbound events for high-fidelity session replay.
// Avoid double-logging Ops; those are logged at the point of submission.
if !matches!(event, AppEvent::CodexOp(_)) {
// Avoid double-logging app-server actions; those are logged at the point of submission.
if let AppEvent::AppServerAction(action) = &event {
session_log::log_outbound_app_action(action);
} else {
session_log::log_inbound_app_event(&event);
}
if let Err(e) = self.app_event_tx.send(event) {

View File

@@ -0,0 +1,613 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::Ordering;
use codex_app_server::AppServerClientMessage;
use codex_app_server::AppServerEventNotification;
use codex_app_server::AppServerMessage;
use codex_app_server::spawn_in_memory_typed;
use codex_app_server_protocol::ClientNotification;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::CommandExecutionApprovalDecision;
use codex_app_server_protocol::CommandExecutionRequestApprovalResponse;
use codex_app_server_protocol::DynamicToolCallResponse;
use codex_app_server_protocol::ElicitationAction as V2ElicitationAction;
use codex_app_server_protocol::FileChangeApprovalDecision;
use codex_app_server_protocol::FileChangeRequestApprovalResponse;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::Result as JsonResult;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ToolRequestUserInputParams;
use codex_app_server_protocol::ToolRequestUserInputResponse;
use codex_core::config::Config;
use codex_core::config_loader::LoaderOverrides;
use codex_core::protocol::Event;
use codex_core::protocol::EventMsg;
use codex_core::protocol::ReviewDecision;
use codex_feedback::CodexFeedback;
use codex_protocol::ThreadId;
use codex_protocol::request_user_input::RequestUserInputResponse as CoreRequestUserInputResponse;
use mcp_types::RequestId as McpRequestId;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use crate::app_event::AppEvent;
use crate::app_event_sender::AppEventSender;
type PendingResponseMap =
HashMap<RequestId, oneshot::Sender<std::result::Result<JsonResult, JSONRPCErrorError>>>;
pub(crate) struct PendingResponse {
receiver: oneshot::Receiver<std::result::Result<JsonResult, JSONRPCErrorError>>,
}
impl PendingResponse {
pub async fn into_typed<T: serde::de::DeserializeOwned>(
self,
) -> std::result::Result<T, JSONRPCErrorError> {
let value = self
.receiver
.await
.map_err(|_| internal_error("response channel closed"))??;
serde_json::from_value(value).map_err(|err| internal_error(err.to_string()))
}
pub async fn discard(self) -> std::result::Result<(), JSONRPCErrorError> {
let _ = self
.receiver
.await
.map_err(|_| internal_error("response channel closed"))??;
Ok(())
}
}
#[derive(Default)]
struct PendingServerRequests {
exec: HashMap<String, RequestId>,
patch: HashMap<String, RequestId>,
user_input: HashMap<String, RequestId>,
}
#[derive(Default)]
struct QueuedResponses {
exec: HashMap<String, ReviewDecision>,
patch: HashMap<String, ReviewDecision>,
user_input: HashMap<String, CoreRequestUserInputResponse>,
}
#[derive(Default)]
struct ElicitationState {
thread_by_request: HashMap<McpRequestId, ThreadId>,
queued: HashMap<McpRequestId, (String, codex_core::protocol::ElicitationAction)>,
}
#[derive(Default)]
struct TurnState {
current_turn_by_thread: HashMap<ThreadId, String>,
}
pub(crate) struct AppServerClient {
sender: mpsc::Sender<AppServerClientMessage>,
pending: Arc<Mutex<PendingResponseMap>>,
pending_server_requests: Arc<Mutex<PendingServerRequests>>,
queued_responses: Arc<Mutex<QueuedResponses>>,
elicitation_state: Arc<Mutex<ElicitationState>>,
turn_state: Arc<Mutex<TurnState>>,
next_request_id: Arc<AtomicI64>,
}
impl AppServerClient {
pub(crate) fn spawn(
app_event_tx: AppEventSender,
config: Arc<Config>,
cli_overrides: Vec<(String, toml::Value)>,
loader_overrides: LoaderOverrides,
feedback: CodexFeedback,
config_warnings: Vec<codex_app_server_protocol::ConfigWarningNotification>,
session_source: codex_protocol::protocol::SessionSource,
) -> Self {
let in_process = spawn_in_memory_typed(
config.codex_linux_sandbox_exe.clone(),
config,
cli_overrides,
loader_overrides,
feedback,
config_warnings,
session_source,
);
let client = Self {
sender: in_process.incoming,
pending: Arc::new(Mutex::new(HashMap::new())),
pending_server_requests: Arc::new(Mutex::new(PendingServerRequests::default())),
queued_responses: Arc::new(Mutex::new(QueuedResponses::default())),
elicitation_state: Arc::new(Mutex::new(ElicitationState::default())),
turn_state: Arc::new(Mutex::new(TurnState::default())),
next_request_id: Arc::new(AtomicI64::new(1)),
};
client.spawn_outgoing_handler(app_event_tx, in_process.outgoing);
client
}
pub(crate) async fn request(
&self,
build: impl FnOnce(RequestId) -> ClientRequest,
) -> std::result::Result<PendingResponse, JSONRPCErrorError> {
let request_id = RequestId::Integer(self.next_request_id.fetch_add(1, Ordering::Relaxed));
let (tx, rx) = oneshot::channel();
self.pending.lock().await.insert(request_id.clone(), tx);
if self
.sender
.send(AppServerClientMessage::Request(build(request_id.clone())))
.await
.is_err()
{
self.pending.lock().await.remove(&request_id);
return Err(internal_error("app-server request channel closed"));
}
Ok(PendingResponse { receiver: rx })
}
pub(crate) async fn send_notification(
&self,
notification: ClientNotification,
) -> std::result::Result<(), JSONRPCErrorError> {
self.sender
.send(AppServerClientMessage::Notification(notification))
.await
.map_err(|_| internal_error("app-server notification channel closed"))
}
pub(crate) async fn respond_exec_approval(
&self,
call_id: String,
decision: ReviewDecision,
) -> std::result::Result<(), JSONRPCErrorError> {
let request_id = {
let mut pending = self.pending_server_requests.lock().await;
pending.exec.remove(&call_id)
};
let Some(request_id) = request_id else {
self.queued_responses
.lock()
.await
.exec
.insert(call_id, decision);
return Ok(());
};
let response = CommandExecutionRequestApprovalResponse {
decision: map_exec_decision(decision),
};
self.send_response(request_id, response).await
}
pub(crate) async fn respond_patch_approval(
&self,
call_id: String,
decision: ReviewDecision,
) -> std::result::Result<(), JSONRPCErrorError> {
let request_id = {
let mut pending = self.pending_server_requests.lock().await;
pending.patch.remove(&call_id)
};
let Some(request_id) = request_id else {
self.queued_responses
.lock()
.await
.patch
.insert(call_id, decision);
return Ok(());
};
let response = FileChangeRequestApprovalResponse {
decision: map_patch_decision(decision),
};
self.send_response(request_id, response).await
}
pub(crate) async fn respond_user_input(
&self,
call_id: String,
response: CoreRequestUserInputResponse,
) -> std::result::Result<(), JSONRPCErrorError> {
let request_id = {
let mut pending = self.pending_server_requests.lock().await;
pending.user_input.remove(&call_id)
};
let Some(request_id) = request_id else {
self.queued_responses
.lock()
.await
.user_input
.insert(call_id, response);
return Ok(());
};
let response = ToolRequestUserInputResponse {
answers: response
.answers
.into_iter()
.map(|(id, answer)| {
(
id,
codex_app_server_protocol::ToolRequestUserInputAnswer {
answers: answer.answers,
},
)
})
.collect(),
};
self.send_response(request_id, response).await
}
pub(crate) async fn respond_elicitation(
&self,
server_name: String,
request_id: McpRequestId,
decision: codex_core::protocol::ElicitationAction,
) -> std::result::Result<(), JSONRPCErrorError> {
let thread_id = {
let mut state = self.elicitation_state.lock().await;
state.thread_by_request.remove(&request_id)
};
let Some(thread_id) = thread_id else {
self.elicitation_state
.lock()
.await
.queued
.insert(request_id, (server_name, decision));
return Ok(());
};
let params = codex_app_server_protocol::McpElicitationResolveParams {
thread_id: thread_id.to_string(),
server_name,
request_id: request_id.clone(),
decision: V2ElicitationAction::from(decision),
};
self.request(|id| ClientRequest::McpElicitationResolve {
request_id: id,
params,
})
.await?
.discard()
.await
}
pub(crate) async fn interrupt_current_turn(
&self,
thread_id: ThreadId,
) -> std::result::Result<(), JSONRPCErrorError> {
let turn_id = {
let state = self.turn_state.lock().await;
state.current_turn_by_thread.get(&thread_id).cloned()
};
let Some(turn_id) = turn_id else {
return Ok(());
};
let params = codex_app_server_protocol::TurnInterruptParams {
thread_id: thread_id.to_string(),
turn_id,
};
self.request(|id| ClientRequest::TurnInterrupt {
request_id: id,
params,
})
.await?
.discard()
.await
}
async fn send_response<T: serde::Serialize>(
&self,
request_id: RequestId,
response: T,
) -> std::result::Result<(), JSONRPCErrorError> {
let result =
serde_json::to_value(response).map_err(|err| internal_error(err.to_string()))?;
self.sender
.send(AppServerClientMessage::Response {
id: request_id,
result,
})
.await
.map_err(|_| internal_error("app-server response channel closed"))
}
fn spawn_outgoing_handler(
&self,
app_event_tx: AppEventSender,
mut outgoing: mpsc::Receiver<AppServerMessage>,
) {
let pending = Arc::clone(&self.pending);
let pending_server_requests = Arc::clone(&self.pending_server_requests);
let queued_responses = Arc::clone(&self.queued_responses);
let elicitation_state = Arc::clone(&self.elicitation_state);
let turn_state = Arc::clone(&self.turn_state);
let sender = self.sender.clone();
let next_request_id = Arc::clone(&self.next_request_id);
tokio::spawn(async move {
while let Some(message) = outgoing.recv().await {
match message {
AppServerMessage::EventNotification(notification) => {
handle_event_notification(
notification,
&app_event_tx,
&turn_state,
&elicitation_state,
&sender,
next_request_id.as_ref(),
)
.await;
}
AppServerMessage::Request(request) => {
handle_server_request(
request,
&pending_server_requests,
&queued_responses,
&sender,
)
.await;
}
AppServerMessage::Response { id, result } => {
if let Some(tx) = pending.lock().await.remove(&id) {
let _ = tx.send(Ok(result));
}
}
AppServerMessage::Error { id, error } => {
if let Some(tx) = pending.lock().await.remove(&id) {
let _ = tx.send(Err(error));
}
}
AppServerMessage::Notification(_notification) => {
// v2 notifications are currently surfaced through codex/event
// for the TUI, so ignore explicit server notifications here.
}
}
}
});
}
}
async fn handle_event_notification(
notification: AppServerEventNotification,
app_event_tx: &AppEventSender,
turn_state: &Mutex<TurnState>,
elicitation_state: &Mutex<ElicitationState>,
sender: &mpsc::Sender<AppServerClientMessage>,
next_request_id: &AtomicI64,
) {
if !notification.method.starts_with("codex/event/") {
return;
}
let Some(params) = notification.params else {
return;
};
let serde_json::Value::Object(mut map) = params else {
return;
};
let Some(conversation_id) = map.remove("conversationId") else {
return;
};
let thread_id = match conversation_id.as_str() {
Some(value) => match ThreadId::from_string(value) {
Ok(thread_id) => thread_id,
Err(err) => {
tracing::warn!("invalid thread id in event: {err}");
return;
}
},
None => return,
};
let event_value = serde_json::Value::Object(map);
let event: Event = match serde_json::from_value(event_value) {
Ok(event) => event,
Err(err) => {
tracing::warn!("failed to parse codex event: {err}");
return;
}
};
match &event.msg {
EventMsg::TurnStarted(_) => {
if !event.id.is_empty() {
turn_state
.lock()
.await
.current_turn_by_thread
.insert(thread_id, event.id.clone());
}
}
EventMsg::TurnComplete(_) | EventMsg::TurnAborted(_) => {
if !event.id.is_empty() {
let mut state = turn_state.lock().await;
if state
.current_turn_by_thread
.get(&thread_id)
.is_some_and(|id| id == &event.id)
{
state.current_turn_by_thread.remove(&thread_id);
}
}
}
EventMsg::ElicitationRequest(ev) => {
let mut state = elicitation_state.lock().await;
state.thread_by_request.insert(ev.id.clone(), thread_id);
if let Some((server_name, decision)) = state.queued.remove(&ev.id) {
let params = codex_app_server_protocol::McpElicitationResolveParams {
thread_id: thread_id.to_string(),
server_name,
request_id: ev.id.clone(),
decision: V2ElicitationAction::from(decision),
};
let request_id =
RequestId::Integer(next_request_id.fetch_add(1, Ordering::Relaxed));
let request = ClientRequest::McpElicitationResolve { request_id, params };
let _ = sender.send(AppServerClientMessage::Request(request)).await;
}
}
_ => {}
}
app_event_tx.send(AppEvent::CodexThreadEvent { thread_id, event });
}
async fn handle_server_request(
request: ServerRequest,
pending: &Mutex<PendingServerRequests>,
queued: &Mutex<QueuedResponses>,
sender: &mpsc::Sender<AppServerClientMessage>,
) {
match request {
ServerRequest::CommandExecutionRequestApproval { request_id, params } => {
record_server_request(
request_id,
params.item_id,
&mut pending.lock().await.exec,
&mut queued.lock().await.exec,
|decision| CommandExecutionRequestApprovalResponse {
decision: map_exec_decision(decision),
},
sender,
)
.await;
}
ServerRequest::FileChangeRequestApproval { request_id, params } => {
record_server_request(
request_id,
params.item_id,
&mut pending.lock().await.patch,
&mut queued.lock().await.patch,
|decision| FileChangeRequestApprovalResponse {
decision: map_patch_decision(decision),
},
sender,
)
.await;
}
ServerRequest::ToolRequestUserInput { request_id, params } => {
record_user_input_request(request_id, params, pending, queued, sender).await;
}
ServerRequest::DynamicToolCall { request_id, params } => {
let response = DynamicToolCallResponse {
output: "Dynamic tools are not supported in the TUI yet.".to_string(),
success: false,
};
let _ = send_response(sender, request_id, response).await;
tracing::warn!(
"dynamic tool call {} for tool {} ignored",
params.call_id,
params.tool
);
}
_ => {}
}
}
async fn record_user_input_request(
request_id: RequestId,
params: ToolRequestUserInputParams,
pending: &Mutex<PendingServerRequests>,
queued: &Mutex<QueuedResponses>,
sender: &mpsc::Sender<AppServerClientMessage>,
) {
let call_id = params.item_id;
let mut pending_guard = pending.lock().await;
if let Some(response) = queued.lock().await.user_input.remove(&call_id) {
drop(pending_guard);
let response = ToolRequestUserInputResponse {
answers: response
.answers
.into_iter()
.map(|(id, answer)| {
(
id,
codex_app_server_protocol::ToolRequestUserInputAnswer {
answers: answer.answers,
},
)
})
.collect(),
};
let _ = send_response(sender, request_id, response).await;
} else {
pending_guard.user_input.insert(call_id, request_id);
}
}
async fn record_server_request<T: serde::Serialize>(
request_id: RequestId,
call_id: String,
pending: &mut HashMap<String, RequestId>,
queued: &mut HashMap<String, ReviewDecision>,
build_response: impl FnOnce(ReviewDecision) -> T,
sender: &mpsc::Sender<AppServerClientMessage>,
) {
if let Some(decision) = queued.remove(&call_id) {
let response = build_response(decision);
let _ = send_response(sender, request_id, response).await;
} else {
pending.insert(call_id, request_id);
}
}
async fn send_response<T: serde::Serialize>(
sender: &mpsc::Sender<AppServerClientMessage>,
request_id: RequestId,
response: T,
) -> std::result::Result<(), JSONRPCErrorError> {
let result = serde_json::to_value(response).map_err(|err| internal_error(err.to_string()))?;
sender
.send(AppServerClientMessage::Response {
id: request_id,
result,
})
.await
.map_err(|_| internal_error("app-server response channel closed"))
}
fn internal_error(message: impl Into<String>) -> JSONRPCErrorError {
JSONRPCErrorError {
code: -32603,
message: message.into(),
data: None,
}
}
fn map_exec_decision(decision: ReviewDecision) -> CommandExecutionApprovalDecision {
match decision {
ReviewDecision::Approved => CommandExecutionApprovalDecision::Accept,
ReviewDecision::ApprovedForSession => CommandExecutionApprovalDecision::AcceptForSession,
ReviewDecision::ApprovedExecpolicyAmendment {
proposed_execpolicy_amendment,
} => CommandExecutionApprovalDecision::AcceptWithExecpolicyAmendment {
execpolicy_amendment: proposed_execpolicy_amendment.into(),
},
ReviewDecision::Denied => CommandExecutionApprovalDecision::Decline,
ReviewDecision::Abort => CommandExecutionApprovalDecision::Cancel,
}
}
fn map_patch_decision(decision: ReviewDecision) -> FileChangeApprovalDecision {
match decision {
ReviewDecision::Approved => FileChangeApprovalDecision::Accept,
ReviewDecision::ApprovedForSession => FileChangeApprovalDecision::AcceptForSession,
ReviewDecision::ApprovedExecpolicyAmendment { .. } => FileChangeApprovalDecision::Accept,
ReviewDecision::Denied => FileChangeApprovalDecision::Decline,
ReviewDecision::Abort => FileChangeApprovalDecision::Cancel,
}
}

View File

@@ -2,6 +2,7 @@ use std::collections::HashMap;
use std::path::PathBuf;
use crate::app_event::AppEvent;
use crate::app_event::AppServerAction;
use crate::app_event_sender::AppEventSender;
use crate::bottom_pane::BottomPaneView;
use crate::bottom_pane::CancellationEvent;
@@ -21,7 +22,6 @@ use codex_core::features::Features;
use codex_core::protocol::ElicitationAction;
use codex_core::protocol::ExecPolicyAmendment;
use codex_core::protocol::FileChange;
use codex_core::protocol::Op;
use codex_core::protocol::ReviewDecision;
use crossterm::event::KeyCode;
use crossterm::event::KeyEvent;
@@ -194,17 +194,19 @@ impl ApprovalOverlay {
fn handle_exec_decision(&self, id: &str, command: &[String], decision: ReviewDecision) {
let cell = history_cell::new_approval_decision_cell(command.to_vec(), decision.clone());
self.app_event_tx.send(AppEvent::InsertHistoryCell(cell));
self.app_event_tx.send(AppEvent::CodexOp(Op::ExecApproval {
id: id.to_string(),
decision,
}));
self.app_event_tx
.send(AppEvent::AppServerAction(AppServerAction::ExecApproval {
call_id: id.to_string(),
decision,
}));
}
fn handle_patch_decision(&self, id: &str, decision: ReviewDecision) {
self.app_event_tx.send(AppEvent::CodexOp(Op::PatchApproval {
id: id.to_string(),
decision,
}));
self.app_event_tx
.send(AppEvent::AppServerAction(AppServerAction::PatchApproval {
call_id: id.to_string(),
decision,
}));
}
fn handle_elicitation_decision(
@@ -213,12 +215,13 @@ impl ApprovalOverlay {
request_id: &RequestId,
decision: ElicitationAction,
) {
self.app_event_tx
.send(AppEvent::CodexOp(Op::ResolveElicitation {
self.app_event_tx.send(AppEvent::AppServerAction(
AppServerAction::ResolveElicitation {
server_name: server_name.to_string(),
request_id: request_id.clone(),
decision,
}));
},
));
}
fn advance_queue(&mut self) {
@@ -540,6 +543,7 @@ fn elicitation_options() -> Vec<ApprovalOption> {
mod tests {
use super::*;
use crate::app_event::AppEvent;
use crate::app_event::AppServerAction;
use pretty_assertions::assert_eq;
use tokio::sync::mpsc::unbounded_channel;
@@ -570,15 +574,15 @@ mod tests {
let mut view = ApprovalOverlay::new(make_exec_request(), tx, Features::with_defaults());
assert!(!view.is_complete());
view.handle_key_event(KeyEvent::new(KeyCode::Char('y'), KeyModifiers::NONE));
// We expect at least one CodexOp message in the queue.
let mut saw_op = false;
// We expect at least one app-server action message in the queue.
let mut saw_action = false;
while let Ok(ev) = rx.try_recv() {
if matches!(ev, AppEvent::CodexOp(_)) {
saw_op = true;
if matches!(ev, AppEvent::AppServerAction(_)) {
saw_action = true;
break;
}
}
assert!(saw_op, "expected approval decision to emit an op");
assert!(saw_action, "expected approval decision to emit an action");
}
#[test]
@@ -598,9 +602,9 @@ mod tests {
Features::with_defaults(),
);
view.handle_key_event(KeyEvent::new(KeyCode::Char('p'), KeyModifiers::NONE));
let mut saw_op = false;
let mut saw_action = false;
while let Ok(ev) = rx.try_recv() {
if let AppEvent::CodexOp(Op::ExecApproval { decision, .. }) = ev {
if let AppEvent::AppServerAction(AppServerAction::ExecApproval { decision, .. }) = ev {
assert_eq!(
decision,
ReviewDecision::ApprovedExecpolicyAmendment {
@@ -609,13 +613,13 @@ mod tests {
])
}
);
saw_op = true;
saw_action = true;
break;
}
}
assert!(
saw_op,
"expected approval decision to emit an op with command prefix"
saw_action,
"expected approval decision to emit an action with command prefix"
);
}
@@ -717,7 +721,10 @@ mod tests {
let mut decision = None;
while let Ok(ev) = rx.try_recv() {
if let AppEvent::CodexOp(Op::ExecApproval { decision: d, .. }) = ev {
if let AppEvent::AppServerAction(AppServerAction::ExecApproval {
decision: d, ..
}) = ev
{
decision = Some(d);
break;
}

View File

@@ -2,8 +2,8 @@ use std::collections::HashMap;
use std::path::PathBuf;
use crate::app_event::AppEvent;
use crate::app_event::AppServerAction;
use crate::app_event_sender::AppEventSender;
use codex_core::protocol::Op;
use codex_protocol::user_input::TextElement;
#[derive(Debug, Clone, PartialEq)]
@@ -213,11 +213,12 @@ impl ChatComposerHistory {
self.last_history_text = Some(entry.text.clone());
return Some(entry);
} else if let Some(log_id) = self.history_log_id {
let op = Op::GetHistoryEntryRequest {
offset: global_idx,
log_id,
};
app_event_tx.send(AppEvent::CodexOp(op));
app_event_tx.send(AppEvent::AppServerAction(
AppServerAction::GetHistoryEntry {
log_id,
offset: global_idx,
},
));
}
None
}
@@ -227,7 +228,7 @@ impl ChatComposerHistory {
mod tests {
use super::*;
use crate::app_event::AppEvent;
use codex_core::protocol::Op;
use crate::app_event::AppServerAction;
use tokio::sync::mpsc::unbounded_channel;
#[test]
@@ -272,15 +273,15 @@ mod tests {
assert!(history.should_handle_navigation("", 0));
assert!(history.navigate_up(&tx).is_none()); // don't replace the text yet
// Verify that an AppEvent::CodexOp with the correct GetHistoryEntryRequest was sent.
// Verify that an AppServerAction with the correct GetHistoryEntry request was sent.
let event = rx.try_recv().expect("expected AppEvent to be sent");
let AppEvent::CodexOp(history_request1) = event else {
let AppEvent::AppServerAction(history_request1) = event else {
panic!("unexpected event variant");
};
assert_eq!(
Op::GetHistoryEntryRequest {
AppServerAction::GetHistoryEntry {
log_id: 1,
offset: 2
offset: 2,
},
history_request1
);
@@ -294,15 +295,15 @@ mod tests {
// Next Up should move to offset 1.
assert!(history.navigate_up(&tx).is_none()); // don't replace the text yet
// Verify second CodexOp event for offset 1.
// Verify second AppServerAction event for offset 1.
let event2 = rx.try_recv().expect("expected second event");
let AppEvent::CodexOp(history_request_2) = event2 else {
let AppEvent::AppServerAction(history_request_2) = event2 else {
panic!("unexpected event variant");
};
assert_eq!(
Op::GetHistoryEntryRequest {
AppServerAction::GetHistoryEntry {
log_id: 1,
offset: 1
offset: 1,
},
history_request_2
);

View File

@@ -326,7 +326,7 @@ impl BottomPane {
&& !self.composer.popup_active()
&& let Some(status) = &self.status
{
// Send Op::Interrupt
// Send interrupt request
status.interrupt();
self.request_redraw();
return InputResult::None;
@@ -832,7 +832,7 @@ impl Renderable for BottomPane {
mod tests {
use super::*;
use crate::app_event::AppEvent;
use codex_core::protocol::Op;
use crate::app_event::AppServerAction;
use codex_protocol::protocol::SkillScope;
use crossterm::event::KeyModifiers;
use insta::assert_snapshot;
@@ -1198,8 +1198,8 @@ mod tests {
while let Ok(ev) = rx.try_recv() {
assert!(
!matches!(ev, AppEvent::CodexOp(Op::Interrupt)),
"expected Esc to not send Op::Interrupt when dismissing skill popup"
!matches!(ev, AppEvent::AppServerAction(AppServerAction::Interrupt)),
"expected Esc to not send interrupt when dismissing skill popup"
);
}
assert!(
@@ -1236,8 +1236,8 @@ mod tests {
while let Ok(ev) = rx.try_recv() {
assert!(
!matches!(ev, AppEvent::CodexOp(Op::Interrupt)),
"expected Esc to not send Op::Interrupt while command popup is active"
!matches!(ev, AppEvent::AppServerAction(AppServerAction::Interrupt)),
"expected Esc to not send interrupt while command popup is active"
);
}
assert_eq!(pane.composer_text(), "/");
@@ -1263,8 +1263,11 @@ mod tests {
pane.handle_key_event(KeyEvent::new(KeyCode::Esc, KeyModifiers::NONE));
assert!(
matches!(rx.try_recv(), Ok(AppEvent::CodexOp(Op::Interrupt))),
"expected Esc to send Op::Interrupt while a task is running"
matches!(
rx.try_recv(),
Ok(AppEvent::AppServerAction(AppServerAction::Interrupt))
),
"expected Esc to send interrupt while a task is running"
);
}

View File

@@ -18,6 +18,7 @@ mod layout;
mod render;
use crate::app_event::AppEvent;
use crate::app_event::AppServerAction;
use crate::app_event_sender::AppEventSender;
use crate::bottom_pane::CancellationEvent;
use crate::bottom_pane::ChatComposer;
@@ -29,7 +30,6 @@ use crate::bottom_pane::selection_popup_common::GenericDisplayRow;
use crate::bottom_pane::selection_popup_common::measure_rows_height;
use crate::render::renderable::Renderable;
use codex_core::protocol::Op;
use codex_protocol::request_user_input::RequestUserInputAnswer;
use codex_protocol::request_user_input::RequestUserInputEvent;
use codex_protocol::request_user_input::RequestUserInputResponse;
@@ -719,11 +719,12 @@ impl RequestUserInputOverlay {
},
);
}
self.app_event_tx
.send(AppEvent::CodexOp(Op::UserInputAnswer {
id: self.request.turn_id.clone(),
self.app_event_tx.send(AppEvent::AppServerAction(
AppServerAction::UserInputAnswer {
call_id: self.request.call_id.clone(),
response: RequestUserInputResponse { answers },
}));
},
));
if let Some(next) = self.queue.pop_front() {
self.request = next;
self.reset_for_request();
@@ -966,7 +967,8 @@ impl BottomPaneView for RequestUserInputOverlay {
}
if matches!(key_event.code, KeyCode::Esc) {
self.app_event_tx.send(AppEvent::CodexOp(Op::Interrupt));
self.app_event_tx
.send(AppEvent::AppServerAction(AppServerAction::Interrupt));
self.done = true;
return;
}
@@ -1173,7 +1175,8 @@ impl BottomPaneView for RequestUserInputOverlay {
fn on_ctrl_c(&mut self) -> CancellationEvent {
if self.confirm_unanswered_active() {
self.close_unanswered_confirmation();
self.app_event_tx.send(AppEvent::CodexOp(Op::Interrupt));
self.app_event_tx
.send(AppEvent::AppServerAction(AppServerAction::Interrupt));
self.done = true;
return CancellationEvent::Handled;
}
@@ -1182,7 +1185,8 @@ impl BottomPaneView for RequestUserInputOverlay {
return CancellationEvent::Handled;
}
self.app_event_tx.send(AppEvent::CodexOp(Op::Interrupt));
self.app_event_tx
.send(AppEvent::AppServerAction(AppServerAction::Interrupt));
self.done = true;
CancellationEvent::Handled
}
@@ -1403,10 +1407,12 @@ mod tests {
overlay.submit_answers();
let event = rx.try_recv().expect("expected AppEvent");
let AppEvent::CodexOp(Op::UserInputAnswer { id, response }) = event else {
let AppEvent::AppServerAction(AppServerAction::UserInputAnswer { call_id, response }) =
event
else {
panic!("expected UserInputAnswer");
};
assert_eq!(id, "turn-1");
assert_eq!(call_id, "call-1");
let answer = response.answers.get("q1").expect("answer missing");
assert_eq!(answer.answers, Vec::<String>::new());
}
@@ -1425,7 +1431,8 @@ mod tests {
overlay.handle_key_event(KeyEvent::from(KeyCode::Enter));
let event = rx.try_recv().expect("expected AppEvent");
let AppEvent::CodexOp(Op::UserInputAnswer { response, .. }) = event else {
let AppEvent::AppServerAction(AppServerAction::UserInputAnswer { response, .. }) = event
else {
panic!("expected UserInputAnswer");
};
let answer = response.answers.get("q1").expect("answer missing");
@@ -1458,7 +1465,8 @@ mod tests {
overlay.handle_key_event(KeyEvent::from(KeyCode::Enter));
let event = rx.try_recv().expect("expected AppEvent");
let AppEvent::CodexOp(Op::UserInputAnswer { response, .. }) = event else {
let AppEvent::AppServerAction(AppServerAction::UserInputAnswer { response, .. }) = event
else {
panic!("expected UserInputAnswer");
};
let answer = response.answers.get("q1").expect("answer missing");
@@ -1479,7 +1487,8 @@ mod tests {
overlay.handle_key_event(KeyEvent::from(KeyCode::Char('2')));
let event = rx.try_recv().expect("expected AppEvent");
let AppEvent::CodexOp(Op::UserInputAnswer { response, .. }) = event else {
let AppEvent::AppServerAction(AppServerAction::UserInputAnswer { response, .. }) = event
else {
panic!("expected UserInputAnswer");
};
let answer = response.answers.get("q1").expect("answer missing");
@@ -1634,7 +1643,8 @@ mod tests {
overlay.handle_key_event(KeyEvent::from(KeyCode::Enter));
let event = rx.try_recv().expect("expected AppEvent");
let AppEvent::CodexOp(Op::UserInputAnswer { response, .. }) = event else {
let AppEvent::AppServerAction(AppServerAction::UserInputAnswer { response, .. }) = event
else {
panic!("expected UserInputAnswer");
};
let answer = response.answers.get("q1").expect("answer missing");
@@ -1658,10 +1668,10 @@ mod tests {
assert_eq!(overlay.done, true);
let event = rx.try_recv().expect("expected AppEvent");
let AppEvent::CodexOp(op) = event else {
panic!("expected CodexOp");
let AppEvent::AppServerAction(action) = event else {
panic!("expected AppServerAction");
};
assert_eq!(op, Op::Interrupt);
assert_eq!(action, AppServerAction::Interrupt);
}
#[test]
@@ -1679,10 +1689,10 @@ mod tests {
assert_eq!(overlay.done, true);
let event = rx.try_recv().expect("expected AppEvent");
let AppEvent::CodexOp(op) = event else {
panic!("expected CodexOp");
let AppEvent::AppServerAction(action) = event else {
panic!("expected AppServerAction");
};
assert_eq!(op, Op::Interrupt);
assert_eq!(action, AppServerAction::Interrupt);
}
#[test]
@@ -1703,10 +1713,10 @@ mod tests {
assert_eq!(overlay.done, true);
let event = rx.try_recv().expect("expected AppEvent");
let AppEvent::CodexOp(op) = event else {
panic!("expected CodexOp");
let AppEvent::AppServerAction(action) = event else {
panic!("expected AppServerAction");
};
assert_eq!(op, Op::Interrupt);
assert_eq!(action, AppServerAction::Interrupt);
}
#[test]
@@ -1728,10 +1738,10 @@ mod tests {
assert_eq!(overlay.done, true);
let event = rx.try_recv().expect("expected AppEvent");
let AppEvent::CodexOp(op) = event else {
panic!("expected CodexOp");
let AppEvent::AppServerAction(action) = event else {
panic!("expected AppServerAction");
};
assert_eq!(op, Op::Interrupt);
assert_eq!(action, AppServerAction::Interrupt);
}
#[test]
@@ -1906,7 +1916,8 @@ mod tests {
overlay.submit_answers();
let event = rx.try_recv().expect("expected AppEvent");
let AppEvent::CodexOp(Op::UserInputAnswer { response, .. }) = event else {
let AppEvent::AppServerAction(AppServerAction::UserInputAnswer { response, .. }) = event
else {
panic!("expected UserInputAnswer");
};
let answer = response.answers.get("q1").expect("answer missing");
@@ -1931,7 +1942,8 @@ mod tests {
overlay.submit_answers();
let event = rx.try_recv().expect("expected AppEvent");
let AppEvent::CodexOp(Op::UserInputAnswer { response, .. }) = event else {
let AppEvent::AppServerAction(AppServerAction::UserInputAnswer { response, .. }) = event
else {
panic!("expected UserInputAnswer");
};
let answer = response.answers.get("q1").expect("answer missing");
@@ -1973,7 +1985,8 @@ mod tests {
overlay.submit_answers();
let event = rx.try_recv().expect("expected AppEvent");
let AppEvent::CodexOp(Op::UserInputAnswer { response, .. }) = event else {
let AppEvent::AppServerAction(AppServerAction::UserInputAnswer { response, .. }) = event
else {
panic!("expected UserInputAnswer");
};
let answer = response.answers.get("q1").expect("answer missing");
@@ -2009,7 +2022,8 @@ mod tests {
overlay.submit_answers();
let event = rx.try_recv().expect("expected AppEvent");
let AppEvent::CodexOp(Op::UserInputAnswer { response, .. }) = event else {
let AppEvent::AppServerAction(AppServerAction::UserInputAnswer { response, .. }) = event
else {
panic!("expected UserInputAnswer");
};
let answer = response.answers.get("q1").expect("answer missing");
@@ -2094,7 +2108,8 @@ mod tests {
overlay.submit_answers();
let event = rx.try_recv().expect("expected AppEvent");
let AppEvent::CodexOp(Op::UserInputAnswer { response, .. }) = event else {
let AppEvent::AppServerAction(AppServerAction::UserInputAnswer { response, .. }) = event
else {
panic!("expected UserInputAnswer");
};
let answer = response.answers.get("q1").expect("answer missing");

View File

@@ -13,6 +13,7 @@ use ratatui::widgets::Block;
use ratatui::widgets::Widget;
use crate::app_event::AppEvent;
use crate::app_event::AppServerAction;
use crate::app_event_sender::AppEventSender;
use crate::key_hint;
use crate::render::Insets;
@@ -22,7 +23,6 @@ use crate::render::renderable::Renderable;
use crate::skills_helpers::match_skill;
use crate::skills_helpers::truncate_skill_name;
use crate::style::user_message_style;
use codex_core::protocol::Op;
use super::CancellationEvent;
use super::bottom_pane_view::BottomPaneView;
@@ -187,10 +187,11 @@ impl SkillsToggleView {
}
self.complete = true;
self.app_event_tx.send(AppEvent::ManageSkillsClosed);
self.app_event_tx.send(AppEvent::CodexOp(Op::ListSkills {
cwds: Vec::new(),
force_reload: true,
}));
self.app_event_tx
.send(AppEvent::AppServerAction(AppServerAction::ListSkills {
cwds: Vec::new(),
force_reload: true,
}));
}
fn rows_width(total_width: u16) -> u16 {

View File

@@ -69,7 +69,6 @@ use codex_core::protocol::McpStartupStatus;
use codex_core::protocol::McpStartupUpdateEvent;
use codex_core::protocol::McpToolCallBeginEvent;
use codex_core::protocol::McpToolCallEndEvent;
use codex_core::protocol::Op;
use codex_core::protocol::PatchApplyBeginEvent;
use codex_core::protocol::RateLimitSnapshot;
use codex_core::protocol::ReviewRequest;
@@ -90,7 +89,6 @@ use codex_core::protocol::WarningEvent;
use codex_core::protocol::WebSearchBeginEvent;
use codex_core::protocol::WebSearchEndEvent;
use codex_core::skills::model::SkillMetadata;
#[cfg(target_os = "windows")]
use codex_core::windows_sandbox::WindowsSandboxLevelExt;
use codex_otel::OtelManager;
use codex_protocol::ThreadId;
@@ -101,7 +99,6 @@ use codex_protocol::config_types::CollaborationModeMask;
use codex_protocol::config_types::ModeKind;
use codex_protocol::config_types::Personality;
use codex_protocol::config_types::Settings;
#[cfg(target_os = "windows")]
use codex_protocol::config_types::WindowsSandboxLevel;
use codex_protocol::models::local_image_label_text;
use codex_protocol::parse_command::ParsedCommand;
@@ -122,7 +119,6 @@ use ratatui::style::Stylize;
use ratatui::text::Line;
use ratatui::widgets::Paragraph;
use ratatui::widgets::Wrap;
use tokio::sync::mpsc::UnboundedSender;
use tokio::task::JoinHandle;
use tracing::debug;
@@ -133,8 +129,10 @@ const PLAN_IMPLEMENTATION_NO: &str = "No, stay in Plan mode";
const PLAN_IMPLEMENTATION_CODING_MESSAGE: &str = "Implement the plan.";
use crate::app_event::AppEvent;
use crate::app_event::AppServerAction;
use crate::app_event::ConnectorsSnapshot;
use crate::app_event::ExitMode;
use crate::app_event::TurnStartRequest;
#[cfg(target_os = "windows")]
use crate::app_event::WindowsSandboxEnableMode;
use crate::app_event::WindowsSandboxFallbackReason;
@@ -186,10 +184,7 @@ use crate::text_formatting::truncate_text;
use crate::tui::FrameRequester;
mod interrupts;
use self::interrupts::InterruptManager;
mod agent;
use self::agent::spawn_agent;
use self::agent::spawn_agent_from_existing;
pub(crate) use self::agent::spawn_op_forwarder;
// Agent execution now lives behind the app-server client.
mod session_header;
use self::session_header::SessionHeader;
mod skills;
@@ -204,7 +199,6 @@ use codex_common::approval_presets::ApprovalPreset;
use codex_common::approval_presets::builtin_approval_presets;
use codex_core::AuthManager;
use codex_core::CodexAuth;
use codex_core::ThreadManager;
use codex_core::protocol::AskForApproval;
use codex_core::protocol::SandboxPolicy;
use codex_file_search::FileMatch;
@@ -443,7 +437,7 @@ pub(crate) enum ExternalEditorState {
///
/// `ChatWidget` owns the state derived from the protocol event stream (history cells, streaming
/// buffers, bottom-pane overlays, and transient status text) and turns key presses into user
/// intent (`Op` submissions and `AppEvent` requests).
/// intent (app-server requests and `AppEvent` requests).
///
/// It is not responsible for running the agent itself; it reflects progress by updating UI state
/// and by sending requests back to codex-core.
@@ -453,7 +447,6 @@ pub(crate) enum ExternalEditorState {
/// active work, arming the double-press quit shortcut, and requesting shutdown-first exit.
pub(crate) struct ChatWidget {
app_event_tx: AppEventSender,
codex_op_tx: UnboundedSender<Op>,
bottom_pane: BottomPane,
active_cell: Option<Box<dyn HistoryCell>>,
/// Monotonic-ish counter used to invalidate transcript overlay caching.
@@ -821,9 +814,9 @@ impl ChatWidget {
if let Some(messages) = initial_messages {
self.replay_initial_messages(messages);
}
// Ask codex-core to enumerate custom prompts for this session.
self.submit_op(Op::ListCustomPrompts);
self.submit_op(Op::ListSkills {
// Ask the app server to enumerate custom prompts and skills for this session.
self.submit_action(AppServerAction::ListCustomPrompts);
self.submit_action(AppServerAction::ListSkills {
cwds: Vec::new(),
force_reload: true,
});
@@ -2162,7 +2155,7 @@ impl ChatWidget {
self.had_work_activity = true;
}
pub(crate) fn new(common: ChatWidgetInit, thread_manager: Arc<ThreadManager>) -> Self {
pub(crate) fn new(common: ChatWidgetInit) -> Self {
let ChatWidgetInit {
config,
frame_requester,
@@ -2182,8 +2175,6 @@ impl ChatWidget {
config.model = model.clone();
let mut rng = rand::rng();
let placeholder = PLACEHOLDERS[rng.random_range(0..PLACEHOLDERS.len())].to_string();
let codex_op_tx = spawn_agent(config.clone(), app_event_tx.clone(), thread_manager);
let model_override = model.as_deref();
let model_for_header = model
.clone()
@@ -2210,7 +2201,6 @@ impl ChatWidget {
let mut widget = Self {
app_event_tx: app_event_tx.clone(),
frame_requester: frame_requester.clone(),
codex_op_tx,
bottom_pane: BottomPane::new(BottomPaneParams {
frame_requester,
app_event_tx,
@@ -2305,281 +2295,6 @@ impl ChatWidget {
widget
}
pub(crate) fn new_with_op_sender(
common: ChatWidgetInit,
codex_op_tx: UnboundedSender<Op>,
) -> Self {
let ChatWidgetInit {
config,
frame_requester,
app_event_tx,
initial_user_message,
enhanced_keys_supported,
auth_manager,
models_manager,
feedback,
is_first_run,
feedback_audience,
model,
otel_manager,
} = common;
let model = model.filter(|m| !m.trim().is_empty());
let mut config = config;
config.model = model.clone();
let mut rng = rand::rng();
let placeholder = PLACEHOLDERS[rng.random_range(0..PLACEHOLDERS.len())].to_string();
let model_override = model.as_deref();
let model_for_header = model
.clone()
.unwrap_or_else(|| DEFAULT_MODEL_DISPLAY_NAME.to_string());
let active_collaboration_mask =
Self::initial_collaboration_mask(&config, models_manager.as_ref(), model_override);
let header_model = active_collaboration_mask
.as_ref()
.and_then(|mask| mask.model.clone())
.unwrap_or_else(|| model_for_header.clone());
let fallback_custom = Settings {
model: header_model.clone(),
reasoning_effort: None,
developer_instructions: None,
};
// Collaboration modes start in Custom mode (not activated).
let current_collaboration_mode = CollaborationMode {
mode: ModeKind::Custom,
settings: fallback_custom,
};
let active_cell = Some(Self::placeholder_session_header_cell(&config));
let mut widget = Self {
app_event_tx: app_event_tx.clone(),
frame_requester: frame_requester.clone(),
codex_op_tx,
bottom_pane: BottomPane::new(BottomPaneParams {
frame_requester,
app_event_tx,
has_input_focus: true,
enhanced_keys_supported,
placeholder_text: placeholder,
disable_paste_burst: config.disable_paste_burst,
animations_enabled: config.animations,
skills: None,
}),
active_cell,
active_cell_revision: 0,
config,
skills_all: Vec::new(),
skills_initial_state: None,
current_collaboration_mode,
active_collaboration_mask,
auth_manager,
models_manager,
otel_manager,
session_header: SessionHeader::new(header_model),
initial_user_message,
token_info: None,
rate_limit_snapshot: None,
plan_type: None,
rate_limit_warnings: RateLimitWarningState::default(),
rate_limit_switch_prompt: RateLimitSwitchPromptState::default(),
rate_limit_poller: None,
stream_controller: None,
plan_stream_controller: None,
running_commands: HashMap::new(),
suppressed_exec_calls: HashSet::new(),
last_unified_wait: None,
unified_exec_wait_streak: None,
task_complete_pending: false,
unified_exec_processes: Vec::new(),
agent_turn_running: false,
mcp_startup_status: None,
connectors_cache: ConnectorsCacheState::default(),
interrupts: InterruptManager::new(),
reasoning_buffer: String::new(),
full_reasoning_buffer: String::new(),
current_status_header: String::from("Working"),
retry_status_header: None,
thread_id: None,
thread_name: None,
forked_from: None,
saw_plan_update_this_turn: false,
saw_plan_item_this_turn: false,
plan_delta_buffer: String::new(),
plan_item_active: false,
queued_user_messages: VecDeque::new(),
show_welcome_banner: is_first_run,
suppress_session_configured_redraw: false,
pending_notification: None,
quit_shortcut_expires_at: None,
quit_shortcut_key: None,
is_review_mode: false,
pre_review_token_info: None,
needs_final_message_separator: false,
had_work_activity: false,
last_separator_elapsed_secs: None,
last_rendered_width: std::cell::Cell::new(None),
feedback,
feedback_audience,
current_rollout_path: None,
external_editor_state: ExternalEditorState::Closed,
};
widget.prefetch_rate_limits();
widget
.bottom_pane
.set_steer_enabled(widget.config.features.enabled(Feature::Steer));
widget.bottom_pane.set_collaboration_modes_enabled(
widget.config.features.enabled(Feature::CollaborationModes),
);
widget.sync_personality_command_enabled();
widget
}
/// Create a ChatWidget attached to an existing conversation (e.g., a fork).
pub(crate) fn new_from_existing(
common: ChatWidgetInit,
conversation: std::sync::Arc<codex_core::CodexThread>,
session_configured: codex_core::protocol::SessionConfiguredEvent,
) -> Self {
let ChatWidgetInit {
config,
frame_requester,
app_event_tx,
initial_user_message,
enhanced_keys_supported,
auth_manager,
models_manager,
feedback,
feedback_audience,
model,
otel_manager,
..
} = common;
let model = model.filter(|m| !m.trim().is_empty());
let mut rng = rand::rng();
let placeholder = PLACEHOLDERS[rng.random_range(0..PLACEHOLDERS.len())].to_string();
let model_override = model.as_deref();
let header_model = model
.clone()
.unwrap_or_else(|| session_configured.model.clone());
let active_collaboration_mask =
Self::initial_collaboration_mask(&config, models_manager.as_ref(), model_override);
let header_model = active_collaboration_mask
.as_ref()
.and_then(|mask| mask.model.clone())
.unwrap_or(header_model);
let codex_op_tx =
spawn_agent_from_existing(conversation, session_configured, app_event_tx.clone());
let fallback_custom = Settings {
model: header_model.clone(),
reasoning_effort: None,
developer_instructions: None,
};
// Collaboration modes start in Custom mode (not activated).
let current_collaboration_mode = CollaborationMode {
mode: ModeKind::Custom,
settings: fallback_custom,
};
let mut widget = Self {
app_event_tx: app_event_tx.clone(),
frame_requester: frame_requester.clone(),
codex_op_tx,
bottom_pane: BottomPane::new(BottomPaneParams {
frame_requester,
app_event_tx,
has_input_focus: true,
enhanced_keys_supported,
placeholder_text: placeholder,
disable_paste_burst: config.disable_paste_burst,
animations_enabled: config.animations,
skills: None,
}),
active_cell: None,
active_cell_revision: 0,
config,
skills_all: Vec::new(),
skills_initial_state: None,
current_collaboration_mode,
active_collaboration_mask,
auth_manager,
models_manager,
otel_manager,
session_header: SessionHeader::new(header_model),
initial_user_message,
token_info: None,
rate_limit_snapshot: None,
plan_type: None,
rate_limit_warnings: RateLimitWarningState::default(),
rate_limit_switch_prompt: RateLimitSwitchPromptState::default(),
rate_limit_poller: None,
stream_controller: None,
plan_stream_controller: None,
running_commands: HashMap::new(),
suppressed_exec_calls: HashSet::new(),
last_unified_wait: None,
unified_exec_wait_streak: None,
task_complete_pending: false,
unified_exec_processes: Vec::new(),
agent_turn_running: false,
mcp_startup_status: None,
connectors_cache: ConnectorsCacheState::default(),
interrupts: InterruptManager::new(),
reasoning_buffer: String::new(),
full_reasoning_buffer: String::new(),
current_status_header: String::from("Working"),
retry_status_header: None,
thread_id: None,
thread_name: None,
forked_from: None,
queued_user_messages: VecDeque::new(),
show_welcome_banner: false,
suppress_session_configured_redraw: true,
pending_notification: None,
quit_shortcut_expires_at: None,
quit_shortcut_key: None,
is_review_mode: false,
pre_review_token_info: None,
needs_final_message_separator: false,
had_work_activity: false,
saw_plan_update_this_turn: false,
saw_plan_item_this_turn: false,
plan_delta_buffer: String::new(),
plan_item_active: false,
last_separator_elapsed_secs: None,
last_rendered_width: std::cell::Cell::new(None),
feedback,
feedback_audience,
current_rollout_path: None,
external_editor_state: ExternalEditorState::Closed,
};
widget.prefetch_rate_limits();
widget
.bottom_pane
.set_steer_enabled(widget.config.features.enabled(Feature::Steer));
widget.bottom_pane.set_collaboration_modes_enabled(
widget.config.features.enabled(Feature::CollaborationModes),
);
widget.sync_personality_command_enabled();
#[cfg(target_os = "windows")]
widget.bottom_pane.set_windows_degraded_sandbox_active(
codex_core::windows_sandbox::ELEVATED_SANDBOX_NUX_ENABLED
&& matches!(
WindowsSandboxLevel::from_config(&widget.config),
WindowsSandboxLevel::RestrictedToken
),
);
widget.update_collaboration_mode_indicator();
widget
}
pub(crate) fn handle_key_event(&mut self, key_event: KeyEvent) {
match key_event {
KeyEvent {
@@ -2604,6 +2319,19 @@ impl ChatWidget {
self.quit_shortcut_expires_at = None;
self.quit_shortcut_key = None;
}
KeyEvent {
code: KeyCode::Char(c),
modifiers,
kind: KeyEventKind::Press,
..
} if modifiers.contains(KeyModifiers::SUPER) && c.eq_ignore_ascii_case(&'d') => {
if self.on_ctrl_d() {
return;
}
self.bottom_pane.clear_quit_shortcut_hint();
self.quit_shortcut_expires_at = None;
self.quit_shortcut_key = None;
}
KeyEvent {
code: KeyCode::Char(c),
modifiers,
@@ -2804,7 +2532,8 @@ impl ChatWidget {
}
SlashCommand::Compact => {
self.clear_token_usage();
self.app_event_tx.send(AppEvent::CodexOp(Op::Compact));
self.app_event_tx
.send(AppEvent::AppServerAction(AppServerAction::Compact));
}
SlashCommand::Review => {
self.open_review_popup();
@@ -2912,7 +2641,7 @@ impl ChatWidget {
self.request_quit_without_confirmation();
}
// SlashCommand::Undo => {
// self.app_event_tx.send(AppEvent::CodexOp(Op::Undo));
// Undo shortcut pending: would dispatch an app-server action.
// }
SlashCommand::Diff => {
self.add_diff_in_progress();
@@ -3021,15 +2750,14 @@ impl ChatWidget {
let cell = Self::rename_confirmation_cell(&name, self.thread_id);
self.add_boxed_history(Box::new(cell));
self.request_redraw();
self.app_event_tx
.send(AppEvent::CodexOp(Op::SetThreadName { name }));
self.submit_action(AppServerAction::ThreadSetName { name });
}
SlashCommand::Collab | SlashCommand::Plan => {
let _ = trimmed;
self.dispatch_command(cmd);
}
SlashCommand::Review if !trimmed.is_empty() => {
self.submit_op(Op::Review {
self.submit_action(AppServerAction::ReviewStart {
review_request: ReviewRequest {
target: ReviewTarget::Custom {
instructions: trimmed.to_string(),
@@ -3067,7 +2795,9 @@ impl ChatWidget {
};
let cell = Self::rename_confirmation_cell(&name, thread_id);
tx.send(AppEvent::InsertHistoryCell(Box::new(cell)));
tx.send(AppEvent::CodexOp(Op::SetThreadName { name }));
tx.send(AppEvent::AppServerAction(AppServerAction::ThreadSetName {
name,
}));
}),
);
@@ -3168,7 +2898,7 @@ impl ChatWidget {
)));
return;
}
self.submit_op(Op::RunUserShellCommand {
self.submit_action(AppServerAction::RunUserShellCommand {
command: cmd.to_string(),
});
return;
@@ -3228,30 +2958,24 @@ impl ChatWidget {
.model_personality
.filter(|_| self.config.features.enabled(Feature::Personality))
.filter(|_| self.current_model_supports_personality());
let op = Op::UserTurn {
let request = TurnStartRequest {
items,
cwd: self.config.cwd.clone(),
approval_policy: self.config.approval_policy.value(),
sandbox_policy: self.config.sandbox_policy.get().clone(),
windows_sandbox_level: WindowsSandboxLevel::from_config(&self.config),
model: effective_mode.model().to_string(),
effort: effective_mode.reasoning_effort(),
summary: self.config.model_reasoning_summary,
final_output_json_schema: None,
summary: Some(self.config.model_reasoning_summary),
collaboration_mode,
personality,
output_schema: None,
};
self.codex_op_tx.send(op).unwrap_or_else(|e| {
tracing::error!("failed to send message: {e}");
});
self.submit_action(AppServerAction::TurnStart(request));
// Persist the text to cross-session message history.
if !text.is_empty() {
self.codex_op_tx
.send(Op::AddToHistory { text: text.clone() })
.unwrap_or_else(|e| {
tracing::error!("failed to send AddHistory op: {e}");
});
self.submit_action(AppServerAction::AddToHistory { text: text.clone() });
}
// Only show the text portion in conversation history.
@@ -3410,7 +3134,7 @@ impl ChatWidget {
EventMsg::ListCustomPromptsResponse(ev) => self.on_list_custom_prompts(ev),
EventMsg::ListSkillsResponse(ev) => self.on_list_skills(ev),
EventMsg::SkillsUpdateAvailable => {
self.submit_op(Op::ListSkills {
self.submit_action(AppServerAction::ListSkills {
cwds: Vec::new(),
force_reload: true,
});
@@ -3751,17 +3475,6 @@ impl ChatWidget {
let default_effort: ReasoningEffortConfig = preset.default_reasoning_effort;
let switch_actions: Vec<SelectionAction> = vec![Box::new(move |tx| {
tx.send(AppEvent::CodexOp(Op::OverrideTurnContext {
cwd: None,
approval_policy: None,
sandbox_policy: None,
windows_sandbox_level: None,
model: Some(switch_model.clone()),
effort: Some(Some(default_effort)),
summary: None,
collaboration_mode: None,
personality: None,
}));
tx.send(AppEvent::UpdateModel(switch_model.clone()));
tx.send(AppEvent::UpdateReasoningEffort(Some(default_effort)));
})];
@@ -3874,17 +3587,6 @@ impl ChatWidget {
let name = Self::personality_label(personality).to_string();
let description = Some(Self::personality_description(personality).to_string());
let actions: Vec<SelectionAction> = vec![Box::new(move |tx| {
tx.send(AppEvent::CodexOp(Op::OverrideTurnContext {
cwd: None,
approval_policy: None,
sandbox_policy: None,
model: None,
effort: None,
summary: None,
collaboration_mode: None,
windows_sandbox_level: None,
personality: Some(personality),
}));
tx.send(AppEvent::UpdatePersonality(personality));
tx.send(AppEvent::PersistPersonalitySelection { personality });
})];
@@ -4145,17 +3847,6 @@ impl ChatWidget {
let effort_label = effort_for_action
.map(|effort| effort.to_string())
.unwrap_or_else(|| "default".to_string());
tx.send(AppEvent::CodexOp(Op::OverrideTurnContext {
cwd: None,
approval_policy: None,
sandbox_policy: None,
windows_sandbox_level: None,
model: Some(model_for_action.clone()),
effort: Some(effort_for_action),
summary: None,
collaboration_mode: None,
personality: None,
}));
tx.send(AppEvent::UpdateModel(model_for_action.clone()));
tx.send(AppEvent::UpdateReasoningEffort(effort_for_action));
tx.send(AppEvent::PersistModelSelection {
@@ -4318,18 +4009,6 @@ impl ChatWidget {
}
fn apply_model_and_effort(&self, model: String, effort: Option<ReasoningEffortConfig>) {
self.app_event_tx
.send(AppEvent::CodexOp(Op::OverrideTurnContext {
cwd: None,
approval_policy: None,
sandbox_policy: None,
windows_sandbox_level: None,
model: Some(model.clone()),
effort: Some(effort),
summary: None,
collaboration_mode: None,
personality: None,
}));
self.app_event_tx.send(AppEvent::UpdateModel(model.clone()));
self.app_event_tx
.send(AppEvent::UpdateReasoningEffort(effort));
@@ -4508,17 +4187,6 @@ impl ChatWidget {
) -> Vec<SelectionAction> {
vec![Box::new(move |tx| {
let sandbox_clone = sandbox.clone();
tx.send(AppEvent::CodexOp(Op::OverrideTurnContext {
cwd: None,
approval_policy: Some(approval),
sandbox_policy: Some(sandbox_clone.clone()),
windows_sandbox_level: None,
model: None,
effort: None,
summary: None,
collaboration_mode: None,
personality: None,
}));
tx.send(AppEvent::UpdateAskForApprovalPolicy(approval));
tx.send(AppEvent::UpdateSandboxPolicy(sandbox_clone));
})]
@@ -5356,7 +5024,8 @@ impl ChatWidget {
/// Update the active collaboration mask.
///
/// When collaboration modes are enabled and a preset is selected (not Custom),
/// the current mode is attached to submissions as `Op::UserTurn { collaboration_mode: Some(...) }`.
/// the current mode is attached to submissions as `turn/start` with
/// `collaboration_mode` set.
pub(crate) fn set_collaboration_mask(&mut self, mask: CollaborationModeMask) {
if !self.collaboration_modes_enabled() {
return;
@@ -5455,7 +5124,7 @@ impl ChatWidget {
if self.config.mcp_servers.is_empty() {
self.add_to_history(history_cell::empty_mcp_output());
} else {
self.submit_op(Op::ListMcpTools);
self.submit_action(AppServerAction::ListMcpTools);
}
}
@@ -5613,7 +5282,7 @@ impl ChatWidget {
/// Handles a Ctrl+C press at the chat-widget layer.
///
/// The first press arms a time-bounded quit shortcut and shows a footer hint via the bottom
/// pane. If cancellable work is active, Ctrl+C also submits `Op::Interrupt` after the shortcut
/// pane. If cancellable work is active, Ctrl+C also submits an interrupt after the shortcut
/// is armed.
///
/// If the same quit shortcut is pressed again before expiry, this requests a shutdown-first
@@ -5636,7 +5305,7 @@ impl ChatWidget {
if !DOUBLE_PRESS_QUIT_SHORTCUT_ENABLED {
if self.is_cancellable_work_active() {
self.submit_op(Op::Interrupt);
self.submit_action(AppServerAction::Interrupt);
} else {
self.request_quit_without_confirmation();
}
@@ -5653,7 +5322,7 @@ impl ChatWidget {
self.arm_quit_shortcut(key);
if self.is_cancellable_work_active() {
self.submit_op(Op::Interrupt);
self.submit_action(AppServerAction::Interrupt);
}
}
@@ -5756,16 +5425,14 @@ impl ChatWidget {
pub(crate) fn clear_esc_backtrack_hint(&mut self) {
self.bottom_pane.clear_esc_backtrack_hint();
}
/// Forward an `Op` directly to codex.
pub(crate) fn submit_op(&mut self, op: Op) {
// Record outbound operation for session replay fidelity.
crate::session_log::log_outbound_op(&op);
if matches!(&op, Op::Review { .. }) && !self.bottom_pane.is_task_running() {
/// Forward an app-server action via the app event loop.
pub(crate) fn submit_action(&mut self, action: AppServerAction) {
if matches!(&action, AppServerAction::ReviewStart { .. })
&& !self.bottom_pane.is_task_running()
{
self.bottom_pane.set_task_running(true);
}
if let Err(e) = self.codex_op_tx.send(op) {
tracing::error!("failed to submit op: {e}");
}
self.app_event_tx.send(AppEvent::AppServerAction(action));
}
fn on_list_mcp_tools(&mut self, ev: McpListToolsResponseEvent) {
@@ -5821,7 +5488,7 @@ impl ChatWidget {
items.push(SelectionItem {
name: "Review uncommitted changes".to_string(),
actions: vec![Box::new(move |tx: &AppEventSender| {
tx.send(AppEvent::CodexOp(Op::Review {
tx.send(AppEvent::AppServerAction(AppServerAction::ReviewStart {
review_request: ReviewRequest {
target: ReviewTarget::UncommittedChanges,
user_facing_hint: None,
@@ -5874,7 +5541,7 @@ impl ChatWidget {
items.push(SelectionItem {
name: format!("{current_branch} -> {branch}"),
actions: vec![Box::new(move |tx3: &AppEventSender| {
tx3.send(AppEvent::CodexOp(Op::Review {
tx3.send(AppEvent::AppServerAction(AppServerAction::ReviewStart {
review_request: ReviewRequest {
target: ReviewTarget::BaseBranch {
branch: branch.clone(),
@@ -5911,7 +5578,7 @@ impl ChatWidget {
items.push(SelectionItem {
name: subject.clone(),
actions: vec![Box::new(move |tx3: &AppEventSender| {
tx3.send(AppEvent::CodexOp(Op::Review {
tx3.send(AppEvent::AppServerAction(AppServerAction::ReviewStart {
review_request: ReviewRequest {
target: ReviewTarget::Commit {
sha: sha.clone(),
@@ -5948,7 +5615,7 @@ impl ChatWidget {
if trimmed.is_empty() {
return;
}
tx.send(AppEvent::CodexOp(Op::Review {
tx.send(AppEvent::AppServerAction(AppServerAction::ReviewStart {
review_request: ReviewRequest {
target: ReviewTarget::Custom {
instructions: trimmed,
@@ -6196,7 +5863,7 @@ pub(crate) fn show_review_commit_picker_with_entries(
items.push(SelectionItem {
name: subject.clone(),
actions: vec![Box::new(move |tx3: &AppEventSender| {
tx3.send(AppEvent::CodexOp(Op::Review {
tx3.send(AppEvent::AppServerAction(AppServerAction::ReviewStart {
review_request: ReviewRequest {
target: ReviewTarget::Commit {
sha: sha.clone(),

View File

@@ -1,122 +0,0 @@
use std::sync::Arc;
use codex_core::CodexThread;
use codex_core::NewThread;
use codex_core::ThreadManager;
use codex_core::config::Config;
use codex_core::protocol::Event;
use codex_core::protocol::EventMsg;
use codex_core::protocol::Op;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::mpsc::unbounded_channel;
use crate::app_event::AppEvent;
use crate::app_event_sender::AppEventSender;
/// Spawn the agent bootstrapper and op forwarding loop, returning the
/// `UnboundedSender<Op>` used by the UI to submit operations.
pub(crate) fn spawn_agent(
config: Config,
app_event_tx: AppEventSender,
server: Arc<ThreadManager>,
) -> UnboundedSender<Op> {
let (codex_op_tx, mut codex_op_rx) = unbounded_channel::<Op>();
let app_event_tx_clone = app_event_tx;
tokio::spawn(async move {
let NewThread {
thread,
session_configured,
..
} = match server.start_thread(config).await {
Ok(v) => v,
Err(err) => {
let message = format!("Failed to initialize codex: {err}");
tracing::error!("{message}");
app_event_tx_clone.send(AppEvent::CodexEvent(Event {
id: "".to_string(),
msg: EventMsg::Error(err.to_error_event(None)),
}));
app_event_tx_clone.send(AppEvent::FatalExitRequest(message));
tracing::error!("failed to initialize codex: {err}");
return;
}
};
// Forward the captured `SessionConfigured` event so it can be rendered in the UI.
let ev = codex_core::protocol::Event {
// The `id` does not matter for rendering, so we can use a fake value.
id: "".to_string(),
msg: codex_core::protocol::EventMsg::SessionConfigured(session_configured),
};
app_event_tx_clone.send(AppEvent::CodexEvent(ev));
let thread_clone = thread.clone();
tokio::spawn(async move {
while let Some(op) = codex_op_rx.recv().await {
let id = thread_clone.submit(op).await;
if let Err(e) = id {
tracing::error!("failed to submit op: {e}");
}
}
});
while let Ok(event) = thread.next_event().await {
app_event_tx_clone.send(AppEvent::CodexEvent(event));
}
});
codex_op_tx
}
/// Spawn agent loops for an existing thread (e.g., a forked thread).
/// Sends the provided `SessionConfiguredEvent` immediately, then forwards subsequent
/// events and accepts Ops for submission.
pub(crate) fn spawn_agent_from_existing(
thread: std::sync::Arc<CodexThread>,
session_configured: codex_core::protocol::SessionConfiguredEvent,
app_event_tx: AppEventSender,
) -> UnboundedSender<Op> {
let (codex_op_tx, mut codex_op_rx) = unbounded_channel::<Op>();
let app_event_tx_clone = app_event_tx;
tokio::spawn(async move {
// Forward the captured `SessionConfigured` event so it can be rendered in the UI.
let ev = codex_core::protocol::Event {
id: "".to_string(),
msg: codex_core::protocol::EventMsg::SessionConfigured(session_configured),
};
app_event_tx_clone.send(AppEvent::CodexEvent(ev));
let thread_clone = thread.clone();
tokio::spawn(async move {
while let Some(op) = codex_op_rx.recv().await {
let id = thread_clone.submit(op).await;
if let Err(e) = id {
tracing::error!("failed to submit op: {e}");
}
}
});
while let Ok(event) = thread.next_event().await {
app_event_tx_clone.send(AppEvent::CodexEvent(event));
}
});
codex_op_tx
}
/// Spawn an op-forwarding loop for an existing thread without subscribing to events.
pub(crate) fn spawn_op_forwarder(thread: std::sync::Arc<CodexThread>) -> UnboundedSender<Op> {
let (codex_op_tx, mut codex_op_rx) = unbounded_channel::<Op>();
tokio::spawn(async move {
while let Some(op) = codex_op_rx.recv().await {
if let Err(e) = thread.submit(op).await {
tracing::error!("failed to submit op: {e}");
}
}
});
codex_op_tx
}

File diff suppressed because it is too large Load Diff

View File

@@ -58,6 +58,7 @@ mod app;
mod app_backtrack;
mod app_event;
mod app_event_sender;
mod app_server_client;
mod ascii_animation;
mod bottom_pane;
mod chatwidget;

View File

@@ -7,11 +7,11 @@ use std::sync::Mutex;
use std::sync::OnceLock;
use codex_core::config::Config;
use codex_core::protocol::Op;
use serde::Serialize;
use serde_json::json;
use crate::app_event::AppEvent;
use crate::app_event::AppServerAction;
static LOGGER: LazyLock<SessionLogger> = LazyLock::new(SessionLogger::new);
@@ -128,6 +128,16 @@ pub(crate) fn log_inbound_app_event(event: &AppEvent) {
AppEvent::CodexEvent(ev) => {
write_record("to_tui", "codex_event", ev);
}
AppEvent::CodexThreadEvent { thread_id, event } => {
let value = json!({
"ts": now_ts(),
"dir": "to_tui",
"kind": "codex_event",
"thread_id": thread_id.to_string(),
"payload": event,
});
LOGGER.write_json_line(value);
}
AppEvent::NewSession => {
let value = json!({
"ts": now_ts(),
@@ -177,11 +187,11 @@ pub(crate) fn log_inbound_app_event(event: &AppEvent) {
}
}
pub(crate) fn log_outbound_op(op: &Op) {
pub(crate) fn log_outbound_app_action(action: &AppServerAction) {
if !LOGGER.is_enabled() {
return;
}
write_record("from_tui", "op", op);
write_record("from_tui", "app_server_action", action);
}
pub(crate) fn log_session_end() {

View File

@@ -4,7 +4,6 @@
use std::time::Duration;
use std::time::Instant;
use codex_core::protocol::Op;
use crossterm::event::KeyCode;
use ratatui::buffer::Buffer;
use ratatui::layout::Rect;
@@ -17,6 +16,7 @@ use ratatui::widgets::WidgetRef;
use unicode_width::UnicodeWidthStr;
use crate::app_event::AppEvent;
use crate::app_event::AppServerAction;
use crate::app_event_sender::AppEventSender;
use crate::exec_cell::spinner;
use crate::key_hint;
@@ -82,7 +82,8 @@ impl StatusIndicatorWidget {
}
pub(crate) fn interrupt(&self) {
self.app_event_tx.send(AppEvent::CodexOp(Op::Interrupt));
self.app_event_tx
.send(AppEvent::AppServerAction(AppServerAction::Interrupt));
}
/// Update the animated header label (left of the brackets).

View File

@@ -37,12 +37,16 @@ model_provider = "ollama"
let CodexCliOutput { exit_code, output } = run_codex_cli(codex_home, cwd).await?;
assert_ne!(0, exit_code, "Codex CLI should exit nonzero.");
assert!(
output.contains("ERROR: Failed to initialize codex:"),
output.contains("Error: app server error -32603"),
"expected startup error in output, got: {output}"
);
let expected_error = format!(
"Fatal error: failed to load rules: failed to read rules files from {}",
codex_home.join("rules").canonicalize()?.display()
);
assert!(
output.contains("failed to read rules files"),
"expected rules read error in output, got: {output}"
output.contains(&expected_error),
"expected:\n{expected_error}\nin output, got:\n{output}"
);
Ok(())
}