codex: fix CI failure on PR #14988

This commit is contained in:
Eric Traut
2026-03-18 11:58:09 -06:00
parent 37d04c649c
commit e9b1a7b2da
2 changed files with 303 additions and 152 deletions

View File

@@ -18,9 +18,11 @@ use crate::app_server_session::app_server_rate_limit_snapshot_to_core;
use crate::app_server_session::status_account_display_from_auth_mode;
use crate::local_chatgpt_auth::load_local_chatgpt_auth;
use codex_app_server_client::AppServerEvent;
use codex_app_server_protocol::AuthMode;
use codex_app_server_protocol::ChatgptAuthTokensRefreshParams;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::Thread;
@@ -86,69 +88,13 @@ impl App {
"app-server event consumer lagged; dropping ignored events"
);
}
AppServerEvent::ServerNotification(notification) => match notification {
ServerNotification::ServerRequestResolved(notification) => {
self.pending_app_server_requests
.resolve_notification(&notification.request_id);
}
ServerNotification::AccountRateLimitsUpdated(notification) => {
self.chat_widget.on_rate_limit_snapshot(Some(
app_server_rate_limit_snapshot_to_core(notification.rate_limits),
));
}
ServerNotification::AccountUpdated(notification) => {
self.chat_widget.update_account_state(
status_account_display_from_auth_mode(
notification.auth_mode,
notification.plan_type,
),
notification.plan_type,
matches!(
notification.auth_mode,
Some(codex_app_server_protocol::AuthMode::Chatgpt)
| Some(codex_app_server_protocol::AuthMode::ChatgptAuthTokens)
),
);
}
notification => {
if !app_server_client.is_remote()
&& matches!(
notification,
ServerNotification::TurnCompleted(_)
| ServerNotification::ThreadRealtimeItemAdded(_)
| ServerNotification::ThreadRealtimeOutputAudioDelta(_)
| ServerNotification::ThreadRealtimeError(_)
)
{
return;
}
if let Some((thread_id, events)) =
server_notification_thread_events(notification)
{
for event in events {
if self.primary_thread_id.is_none()
|| matches!(event.msg, EventMsg::SessionConfigured(_))
&& self.primary_thread_id == Some(thread_id)
{
if let Err(err) = self.enqueue_primary_event(event).await {
tracing::warn!(
"failed to enqueue primary app-server server notification: {err}"
);
}
} else if let Err(err) =
self.enqueue_thread_event(thread_id, event).await
{
tracing::warn!(
"failed to enqueue app-server server notification for {thread_id}: {err}"
);
}
}
}
}
},
AppServerEvent::ServerNotification(notification) => {
self.handle_server_notification_event(app_server_client, notification)
.await;
}
AppServerEvent::LegacyNotification(notification) => {
if let Some((thread_id, legacy_notification)) =
legacy_thread_notification(notification.clone())
legacy_thread_notification(notification)
{
let result = match legacy_notification {
LegacyThreadNotification::Warning(message) => {
@@ -174,23 +120,8 @@ impl App {
if let Err(err) = result {
tracing::warn!("failed to enqueue app-server legacy notification: {err}");
}
} else if let Some((thread_id, event)) = legacy_thread_event(notification.params) {
self.pending_app_server_requests.note_legacy_event(&event);
if legacy_event_is_shadowed_by_server_notification(&event.msg) {
return;
}
if self.primary_thread_id.is_none()
|| matches!(event.msg, EventMsg::SessionConfigured(_))
&& self.primary_thread_id == Some(thread_id)
{
if let Err(err) = self.enqueue_primary_event(event).await {
tracing::warn!("failed to enqueue primary app-server event: {err}");
}
} else if let Err(err) = self.enqueue_thread_event(thread_id, event).await {
tracing::warn!(
"failed to enqueue app-server thread event for {thread_id}: {err}"
);
}
} else {
tracing::debug!("ignoring legacy app-server notification in tui_app_server");
}
}
AppServerEvent::ServerRequest(request) => {
@@ -203,28 +134,8 @@ impl App {
.await;
return;
}
if let Some(unsupported) = self
.pending_app_server_requests
.note_server_request(&request)
{
tracing::warn!(
request_id = ?unsupported.request_id,
message = unsupported.message,
"rejecting unsupported app-server request"
);
self.chat_widget
.add_error_message(unsupported.message.clone());
if let Err(err) = self
.reject_app_server_request(
app_server_client,
unsupported.request_id,
unsupported.message,
)
.await
{
tracing::warn!("{err}");
}
}
self.handle_server_request_event(app_server_client, request)
.await;
}
AppServerEvent::Disconnected { message } => {
tracing::warn!("app-server event stream disconnected: {message}");
@@ -234,10 +145,118 @@ impl App {
}
}
async fn handle_server_notification_event(
&mut self,
_app_server_client: &AppServerSession,
notification: ServerNotification,
) {
match &notification {
ServerNotification::ServerRequestResolved(notification) => {
self.pending_app_server_requests
.resolve_notification(&notification.request_id);
}
ServerNotification::AccountRateLimitsUpdated(notification) => {
self.chat_widget.on_rate_limit_snapshot(Some(
app_server_rate_limit_snapshot_to_core(notification.rate_limits.clone()),
));
return;
}
ServerNotification::AccountUpdated(notification) => {
self.chat_widget.update_account_state(
status_account_display_from_auth_mode(
notification.auth_mode,
notification.plan_type,
),
notification.plan_type,
matches!(
notification.auth_mode,
Some(AuthMode::Chatgpt) | Some(AuthMode::ChatgptAuthTokens)
),
);
return;
}
_ => {}
}
match server_notification_thread_target(&notification) {
ServerNotificationThreadTarget::Thread(thread_id) => {
let result = if self.primary_thread_id == Some(thread_id)
|| self.primary_thread_id.is_none()
{
self.enqueue_primary_thread_notification(notification).await
} else {
self.enqueue_thread_notification(thread_id, notification)
.await
};
if let Err(err) = result {
tracing::warn!("failed to enqueue app-server notification: {err}");
}
return;
}
ServerNotificationThreadTarget::InvalidThreadId(thread_id) => {
tracing::warn!(
thread_id,
"ignoring app-server notification with invalid thread_id"
);
return;
}
ServerNotificationThreadTarget::Global => {}
}
self.chat_widget
.handle_server_notification(notification, /*replay_kind*/ None);
}
async fn handle_server_request_event(
&mut self,
app_server_client: &AppServerSession,
request: ServerRequest,
) {
if let Some(unsupported) = self
.pending_app_server_requests
.note_server_request(&request)
{
tracing::warn!(
request_id = ?unsupported.request_id,
message = unsupported.message,
"rejecting unsupported app-server request"
);
self.chat_widget
.add_error_message(unsupported.message.clone());
if let Err(err) = self
.reject_app_server_request(
app_server_client,
unsupported.request_id,
unsupported.message,
)
.await
{
tracing::warn!("{err}");
}
return;
}
let Some(thread_id) = server_request_thread_id(&request) else {
tracing::warn!("ignoring threadless app-server request");
return;
};
let result =
if self.primary_thread_id == Some(thread_id) || self.primary_thread_id.is_none() {
self.enqueue_primary_thread_request(request).await
} else {
self.enqueue_thread_request(thread_id, request).await
};
if let Err(err) = result {
tracing::warn!("failed to enqueue app-server request: {err}");
}
}
async fn handle_chatgpt_auth_tokens_refresh_request(
&mut self,
app_server_client: &AppServerSession,
request_id: codex_app_server_protocol::RequestId,
request_id: RequestId,
params: ChatgptAuthTokensRefreshParams,
) {
let config = self.config.clone();
@@ -318,6 +337,143 @@ impl App {
}
}
fn server_request_thread_id(request: &ServerRequest) -> Option<ThreadId> {
match request {
ServerRequest::CommandExecutionRequestApproval { params, .. } => {
ThreadId::from_string(&params.thread_id).ok()
}
ServerRequest::FileChangeRequestApproval { params, .. } => {
ThreadId::from_string(&params.thread_id).ok()
}
ServerRequest::ToolRequestUserInput { params, .. } => {
ThreadId::from_string(&params.thread_id).ok()
}
ServerRequest::McpServerElicitationRequest { params, .. } => {
ThreadId::from_string(&params.thread_id).ok()
}
ServerRequest::PermissionsRequestApproval { params, .. } => {
ThreadId::from_string(&params.thread_id).ok()
}
ServerRequest::DynamicToolCall { params, .. } => {
ThreadId::from_string(&params.thread_id).ok()
}
ServerRequest::ChatgptAuthTokensRefresh { .. }
| ServerRequest::ApplyPatchApproval { .. }
| ServerRequest::ExecCommandApproval { .. } => None,
}
}
#[derive(Debug, PartialEq, Eq)]
enum ServerNotificationThreadTarget {
Thread(ThreadId),
InvalidThreadId(String),
Global,
}
fn server_notification_thread_target(
notification: &ServerNotification,
) -> ServerNotificationThreadTarget {
let thread_id = match notification {
ServerNotification::Error(notification) => Some(notification.thread_id.as_str()),
ServerNotification::ThreadStarted(notification) => Some(notification.thread.id.as_str()),
ServerNotification::ThreadStatusChanged(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::ThreadArchived(notification) => Some(notification.thread_id.as_str()),
ServerNotification::ThreadUnarchived(notification) => Some(notification.thread_id.as_str()),
ServerNotification::ThreadClosed(notification) => Some(notification.thread_id.as_str()),
ServerNotification::ThreadNameUpdated(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::ThreadTokenUsageUpdated(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::TurnStarted(notification) => Some(notification.thread_id.as_str()),
ServerNotification::HookStarted(notification) => Some(notification.thread_id.as_str()),
ServerNotification::TurnCompleted(notification) => Some(notification.thread_id.as_str()),
ServerNotification::HookCompleted(notification) => Some(notification.thread_id.as_str()),
ServerNotification::TurnDiffUpdated(notification) => Some(notification.thread_id.as_str()),
ServerNotification::TurnPlanUpdated(notification) => Some(notification.thread_id.as_str()),
ServerNotification::ItemStarted(notification) => Some(notification.thread_id.as_str()),
ServerNotification::ItemGuardianApprovalReviewStarted(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::ItemGuardianApprovalReviewCompleted(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::ItemCompleted(notification) => Some(notification.thread_id.as_str()),
ServerNotification::RawResponseItemCompleted(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::AgentMessageDelta(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::PlanDelta(notification) => Some(notification.thread_id.as_str()),
ServerNotification::CommandExecutionOutputDelta(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::TerminalInteraction(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::FileChangeOutputDelta(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::ServerRequestResolved(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::McpToolCallProgress(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::ReasoningSummaryTextDelta(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::ReasoningSummaryPartAdded(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::ReasoningTextDelta(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::ContextCompacted(notification) => Some(notification.thread_id.as_str()),
ServerNotification::ModelRerouted(notification) => Some(notification.thread_id.as_str()),
ServerNotification::ThreadRealtimeStarted(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::ThreadRealtimeItemAdded(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::ThreadRealtimeOutputAudioDelta(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::ThreadRealtimeError(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::ThreadRealtimeClosed(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::SkillsChanged(_)
| ServerNotification::McpServerOauthLoginCompleted(_)
| ServerNotification::AccountUpdated(_)
| ServerNotification::AccountRateLimitsUpdated(_)
| ServerNotification::AppListUpdated(_)
| ServerNotification::DeprecationNotice(_)
| ServerNotification::ConfigWarning(_)
| ServerNotification::FuzzyFileSearchSessionUpdated(_)
| ServerNotification::FuzzyFileSearchSessionCompleted(_)
| ServerNotification::CommandExecOutputDelta(_)
| ServerNotification::WindowsWorldWritableWarning(_)
| ServerNotification::WindowsSandboxSetupCompleted(_)
| ServerNotification::AccountLoginCompleted(_) => None,
};
match thread_id {
Some(thread_id) => match ThreadId::from_string(thread_id) {
Ok(thread_id) => ServerNotificationThreadTarget::Thread(thread_id),
Err(_) => ServerNotificationThreadTarget::InvalidThreadId(thread_id.to_string()),
},
None => ServerNotificationThreadTarget::Global,
}
}
fn resolve_chatgpt_auth_tokens_refresh_response(
codex_home: &std::path::Path,
auth_credentials_store_mode: codex_core::auth::AuthCredentialsStoreMode,
@@ -365,22 +521,6 @@ pub(super) fn thread_snapshot_events(
.collect()
}
fn legacy_thread_event(params: Option<Value>) -> Option<(ThreadId, Event)> {
let Value::Object(mut params) = params? else {
return None;
};
let thread_id = params
.remove("conversationId")
.and_then(|value| serde_json::from_value::<String>(value).ok())
.and_then(|value| ThreadId::from_string(&value).ok());
let event = serde_json::from_value::<Event>(Value::Object(params)).ok()?;
let thread_id = thread_id.or(match &event.msg {
EventMsg::SessionConfigured(session) => Some(session.session_id),
_ => None,
})?;
Some((thread_id, event))
}
fn legacy_thread_notification(
notification: JSONRPCNotification,
) -> Option<(ThreadId, LegacyThreadNotification)> {
@@ -423,27 +563,6 @@ fn legacy_thread_notification(
}
}
fn legacy_event_is_shadowed_by_server_notification(msg: &EventMsg) -> bool {
matches!(
msg,
EventMsg::TokenCount(_)
| EventMsg::Error(_)
| EventMsg::ThreadNameUpdated(_)
| EventMsg::TurnStarted(_)
| EventMsg::ItemStarted(_)
| EventMsg::ItemCompleted(_)
| EventMsg::ExecCommandBegin(_)
| EventMsg::ExecCommandOutputDelta(_)
| EventMsg::ExecCommandEnd(_)
| EventMsg::AgentMessageDelta(_)
| EventMsg::PlanDelta(_)
| EventMsg::AgentReasoningDelta(_)
| EventMsg::AgentReasoningRawContentDelta(_)
| EventMsg::RealtimeConversationStarted(_)
| EventMsg::RealtimeConversationClosed(_)
)
}
fn server_notification_thread_events(
notification: ServerNotification,
) -> Option<(ThreadId, Vec<Event>)> {
@@ -777,13 +896,33 @@ fn thread_item_to_core(item: &ThreadItem) -> Option<TurnItem> {
.map(codex_app_server_protocol::UserInput::into_core)
.collect(),
})),
ThreadItem::AgentMessage { id, text, phase } => {
Some(TurnItem::AgentMessage(AgentMessageItem {
id: id.clone(),
content: vec![AgentMessageContent::Text { text: text.clone() }],
phase: phase.clone(),
}))
}
ThreadItem::AgentMessage {
id,
text,
phase,
memory_citation,
} => Some(TurnItem::AgentMessage(AgentMessageItem {
id: id.clone(),
content: vec![AgentMessageContent::Text { text: text.clone() }],
phase: phase.clone(),
memory_citation: memory_citation.clone().map(|citation| {
codex_protocol::memory_citation::MemoryCitation {
entries: citation
.entries
.into_iter()
.map(
|entry| codex_protocol::memory_citation::MemoryCitationEntry {
path: entry.path,
line_start: entry.line_start,
line_end: entry.line_end,
note: entry.note,
},
)
.collect(),
rollout_ids: citation.thread_ids,
}
}),
})),
ThreadItem::Plan { id, text } => Some(TurnItem::Plan(PlanItem {
id: id.clone(),
text: text.clone(),
@@ -1193,6 +1332,7 @@ mod tests {
id: item_id,
text: "Hello from your coding assistant.".to_string(),
phase: Some(MessagePhase::FinalAnswer),
memory_citation: None,
},
thread_id: thread_id.clone(),
turn_id: turn_id.clone(),
@@ -1217,7 +1357,9 @@ mod tests {
);
assert_eq!(completed.turn_id, turn_id);
match &completed.item {
TurnItem::AgentMessage(AgentMessageItem { id, content, phase }) => {
TurnItem::AgentMessage(AgentMessageItem {
id, content, phase, ..
}) => {
assert_eq!(id, "msg_123");
let [AgentMessageContent::Text { text }] = content.as_slice() else {
panic!("expected a single text content item");
@@ -1589,6 +1731,7 @@ mod tests {
id: "assistant-1".to_string(),
text: "hi".to_string(),
phase: Some(MessagePhase::FinalAnswer),
memory_citation: None,
},
],
status: TurnStatus::Completed,

View File

@@ -5556,9 +5556,11 @@ impl ChatWidget {
command,
cwd,
process_id,
source,
status,
command_actions,
aggregated_output,
formatted_output,
exit_code,
duration_ms,
} => {
@@ -5576,10 +5578,11 @@ impl ChatWidget {
.into_iter()
.map(codex_app_server_protocol::CommandAction::into_core)
.collect(),
source: ExecCommandSource::Agent,
source: source.to_core(),
interaction_input: None,
});
} else {
let aggregated_output = aggregated_output.unwrap_or_default();
self.on_exec_command_end(ExecCommandEndEvent {
call_id: id,
process_id,
@@ -5590,16 +5593,20 @@ impl ChatWidget {
.into_iter()
.map(codex_app_server_protocol::CommandAction::into_core)
.collect(),
source: ExecCommandSource::Agent,
source: source.to_core(),
interaction_input: None,
stdout: String::new(),
stderr: String::new(),
aggregated_output: aggregated_output.unwrap_or_default(),
aggregated_output: aggregated_output.clone(),
exit_code: exit_code.unwrap_or_default(),
duration: Duration::from_millis(
duration_ms.unwrap_or_default().max(0) as u64
),
formatted_output: String::new(),
formatted_output: if formatted_output.is_empty() {
aggregated_output
} else {
formatted_output
},
status: match status {
codex_app_server_protocol::CommandExecutionStatus::Completed => {
codex_protocol::protocol::ExecCommandStatus::Completed
@@ -5879,7 +5886,7 @@ impl ChatWidget {
self.on_exec_command_output_delta(ExecCommandOutputDeltaEvent {
call_id: notification.item_id,
stream: codex_protocol::protocol::ExecOutputStream::Stdout,
chunk: notification.delta.into_bytes(),
chunk: notification.delta,
});
}
ServerNotification::FileChangeOutputDelta(notification) => {
@@ -6138,6 +6145,7 @@ impl ChatWidget {
command,
cwd,
process_id,
source,
command_actions,
..
} => {
@@ -6151,7 +6159,7 @@ impl ChatWidget {
.into_iter()
.map(codex_app_server_protocol::CommandAction::into_core)
.collect(),
source: ExecCommandSource::Agent,
source: source.to_core(),
interaction_input: None,
});
}