Files
codex/codex-rs/app-server/src/bespoke_event_handling.rs
Curtis 'Fjord' Hawthorne 8543e39885 Preserve image detail in app-server inputs (#20693)
## Summary

- Add optional image detail to user image inputs across core, app-server
v2, thread history/event mapping, and the generated app-server
schemas/types.
- Preserve requested detail when serializing Responses image inputs:
omitted detail stays on the existing `high` default, while explicit
`original` keeps local images on the original-resolution path.
- Support `high`/`original` consistently for tool image outputs,
including MCP `codex/imageDetail`, code-mode image helpers, and
`view_image`.
2026-05-15 15:04:04 -07:00

3815 lines
145 KiB
Rust

use crate::error_code::internal_error;
use crate::error_code::invalid_request;
use crate::outgoing_message::ClientRequestResult;
use crate::outgoing_message::ThreadScopedOutgoingMessageSender;
use crate::request_processors::populate_thread_turns_from_history;
use crate::request_processors::thread_from_stored_thread;
use crate::server_request_error::is_turn_transition_server_request_error;
use crate::thread_state::ThreadState;
use crate::thread_state::TurnSummary;
use crate::thread_state::resolve_server_request_on_thread_listener;
use crate::thread_status::ThreadWatchActiveGuard;
use crate::thread_status::ThreadWatchManager;
use codex_app_server_protocol::AccountRateLimitsUpdatedNotification;
use codex_app_server_protocol::AdditionalPermissionProfile as V2AdditionalPermissionProfile;
use codex_app_server_protocol::CodexErrorInfo as V2CodexErrorInfo;
use codex_app_server_protocol::CommandAction as V2ParsedCommand;
use codex_app_server_protocol::CommandExecutionApprovalDecision;
use codex_app_server_protocol::CommandExecutionRequestApprovalParams;
use codex_app_server_protocol::CommandExecutionRequestApprovalResponse;
use codex_app_server_protocol::CommandExecutionSource;
use codex_app_server_protocol::CommandExecutionStatus;
use codex_app_server_protocol::DeprecationNoticeNotification;
use codex_app_server_protocol::DynamicToolCallParams;
use codex_app_server_protocol::DynamicToolCallStatus;
use codex_app_server_protocol::ErrorNotification;
use codex_app_server_protocol::ExecPolicyAmendment as V2ExecPolicyAmendment;
use codex_app_server_protocol::FileChangeApprovalDecision;
use codex_app_server_protocol::FileChangeRequestApprovalParams;
use codex_app_server_protocol::FileChangeRequestApprovalResponse;
use codex_app_server_protocol::GrantedPermissionProfile as V2GrantedPermissionProfile;
use codex_app_server_protocol::GuardianWarningNotification;
use codex_app_server_protocol::HookCompletedNotification;
use codex_app_server_protocol::HookStartedNotification;
use codex_app_server_protocol::ItemCompletedNotification;
use codex_app_server_protocol::ItemStartedNotification;
use codex_app_server_protocol::McpServerElicitationAction;
use codex_app_server_protocol::McpServerElicitationRequestParams;
use codex_app_server_protocol::McpServerElicitationRequestResponse;
use codex_app_server_protocol::McpServerStartupState;
use codex_app_server_protocol::McpServerStatusUpdatedNotification;
use codex_app_server_protocol::ModelReroutedNotification;
use codex_app_server_protocol::ModelVerificationNotification;
use codex_app_server_protocol::NetworkApprovalContext as V2NetworkApprovalContext;
use codex_app_server_protocol::NetworkPolicyAmendment as V2NetworkPolicyAmendment;
use codex_app_server_protocol::NetworkPolicyRuleAction as V2NetworkPolicyRuleAction;
use codex_app_server_protocol::PermissionsRequestApprovalParams;
use codex_app_server_protocol::PermissionsRequestApprovalResponse;
use codex_app_server_protocol::RawResponseItemCompletedNotification;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequestPayload;
use codex_app_server_protocol::ThreadGoalUpdatedNotification;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadRealtimeClosedNotification;
use codex_app_server_protocol::ThreadRealtimeErrorNotification;
use codex_app_server_protocol::ThreadRealtimeItemAddedNotification;
use codex_app_server_protocol::ThreadRealtimeOutputAudioDeltaNotification;
use codex_app_server_protocol::ThreadRealtimeSdpNotification;
use codex_app_server_protocol::ThreadRealtimeStartedNotification;
use codex_app_server_protocol::ThreadRealtimeTranscriptDeltaNotification;
use codex_app_server_protocol::ThreadRealtimeTranscriptDoneNotification;
use codex_app_server_protocol::ThreadRollbackResponse;
use codex_app_server_protocol::ThreadStatus;
use codex_app_server_protocol::ThreadTokenUsage;
use codex_app_server_protocol::ThreadTokenUsageUpdatedNotification;
use codex_app_server_protocol::ToolRequestUserInputOption;
use codex_app_server_protocol::ToolRequestUserInputParams;
use codex_app_server_protocol::ToolRequestUserInputQuestion;
use codex_app_server_protocol::ToolRequestUserInputResponse;
use codex_app_server_protocol::Turn;
use codex_app_server_protocol::TurnCompletedNotification;
use codex_app_server_protocol::TurnDiffUpdatedNotification;
use codex_app_server_protocol::TurnError;
use codex_app_server_protocol::TurnInterruptResponse;
use codex_app_server_protocol::TurnItemsView;
use codex_app_server_protocol::TurnPlanStep;
use codex_app_server_protocol::TurnPlanUpdatedNotification;
use codex_app_server_protocol::TurnStartedNotification;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::WarningNotification;
use codex_app_server_protocol::build_item_from_guardian_event;
use codex_app_server_protocol::guardian_auto_approval_review_notification;
use codex_app_server_protocol::item_event_to_server_notification;
use codex_core::CodexThread;
use codex_core::ThreadManager;
use codex_core::review_format::format_review_findings_block;
use codex_core::review_prompts;
use codex_protocol::ThreadId;
use codex_protocol::items::parse_hook_prompt_message;
use codex_protocol::models::AdditionalPermissionProfile as CoreAdditionalPermissionProfile;
use codex_protocol::plan_tool::UpdatePlanArgs;
use codex_protocol::protocol::CodexErrorInfo as CoreCodexErrorInfo;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::ExecApprovalRequestEvent;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::RealtimeEvent;
use codex_protocol::protocol::ReviewDecision;
use codex_protocol::protocol::ReviewOutputEvent;
use codex_protocol::protocol::TokenCountEvent;
use codex_protocol::protocol::TurnAbortedEvent;
use codex_protocol::protocol::TurnCompleteEvent;
use codex_protocol::protocol::TurnDiffEvent;
use codex_protocol::request_permissions::PermissionGrantScope as CorePermissionGrantScope;
use codex_protocol::request_permissions::RequestPermissionProfile as CoreRequestPermissionProfile;
use codex_protocol::request_permissions::RequestPermissionsResponse as CoreRequestPermissionsResponse;
use codex_protocol::request_user_input::RequestUserInputAnswer as CoreRequestUserInputAnswer;
use codex_protocol::request_user_input::RequestUserInputResponse as CoreRequestUserInputResponse;
use codex_sandboxing::policy_transforms::intersect_permission_profiles;
use codex_shell_command::parse_command::shlex_join;
use codex_utils_absolute_path::AbsolutePathBuf;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use tokio::sync::Mutex;
use tokio::sync::oneshot;
use tracing::error;
enum CommandExecutionApprovalPresentation {
Network(V2NetworkApprovalContext),
Command(CommandExecutionCompletionItem),
}
#[derive(Debug, PartialEq)]
struct CommandExecutionCompletionItem {
command: String,
cwd: AbsolutePathBuf,
command_actions: Vec<V2ParsedCommand>,
}
#[allow(clippy::too_many_arguments)]
pub(crate) async fn apply_bespoke_event_handling(
event: Event,
conversation_id: ThreadId,
conversation: Arc<CodexThread>,
thread_manager: Arc<ThreadManager>,
outgoing: ThreadScopedOutgoingMessageSender,
thread_state: Arc<tokio::sync::Mutex<ThreadState>>,
thread_watch_manager: ThreadWatchManager,
thread_list_state_permit: Arc<tokio::sync::Semaphore>,
fallback_model_provider: String,
) {
let Event {
id: event_turn_id,
msg,
} = event;
match msg {
EventMsg::TurnStarted(payload) => {
// While not technically necessary as it was already done on TurnComplete, be extra cautios and abort any pending server requests.
outgoing.abort_pending_server_requests().await;
thread_watch_manager
.note_turn_started(&conversation_id.to_string())
.await;
let turn = {
let state = thread_state.lock().await;
let mut turn = state.active_turn_snapshot().unwrap_or_else(|| Turn {
id: payload.turn_id.clone(),
items: Vec::new(),
items_view: TurnItemsView::NotLoaded,
error: None,
status: TurnStatus::InProgress,
started_at: payload.started_at,
completed_at: None,
duration_ms: None,
});
turn.items.clear();
turn.items_view = TurnItemsView::NotLoaded;
turn
};
let notification = TurnStartedNotification {
thread_id: conversation_id.to_string(),
turn,
};
outgoing
.send_server_notification(ServerNotification::TurnStarted(notification))
.await;
}
EventMsg::TurnComplete(turn_complete_event) => {
// All per-thread requests are bound to a turn, so abort them.
outgoing.abort_pending_server_requests().await;
respond_to_pending_interrupts(&thread_state, &outgoing).await;
let turn_failed = thread_state.lock().await.turn_summary.last_error.is_some();
thread_watch_manager
.note_turn_completed(&conversation_id.to_string(), turn_failed)
.await;
handle_turn_complete(
conversation_id,
event_turn_id,
turn_complete_event,
&outgoing,
&thread_state,
)
.await;
}
EventMsg::McpStartupUpdate(update) => {
let (status, error) = match update.status {
codex_protocol::protocol::McpStartupStatus::Starting => {
(McpServerStartupState::Starting, None)
}
codex_protocol::protocol::McpStartupStatus::Ready => {
(McpServerStartupState::Ready, None)
}
codex_protocol::protocol::McpStartupStatus::Failed { error } => {
(McpServerStartupState::Failed, Some(error))
}
codex_protocol::protocol::McpStartupStatus::Cancelled => {
(McpServerStartupState::Cancelled, None)
}
};
let notification = McpServerStatusUpdatedNotification {
name: update.server,
status,
error,
};
outgoing
.send_server_notification(ServerNotification::McpServerStatusUpdated(notification))
.await;
}
EventMsg::Warning(warning_event) => {
let notification = WarningNotification {
thread_id: Some(conversation_id.to_string()),
message: warning_event.message,
};
outgoing
.send_server_notification(ServerNotification::Warning(notification))
.await;
}
EventMsg::GuardianWarning(warning_event) => {
let notification = GuardianWarningNotification {
thread_id: conversation_id.to_string(),
message: warning_event.message,
};
outgoing
.send_server_notification(ServerNotification::GuardianWarning(notification))
.await;
}
EventMsg::GuardianAssessment(assessment) => {
let pending_command_execution = match build_item_from_guardian_event(
&assessment,
CommandExecutionStatus::InProgress,
) {
Some(ThreadItem::CommandExecution {
id,
command,
cwd,
command_actions,
..
}) => Some((
id,
CommandExecutionCompletionItem {
command,
cwd,
command_actions,
},
)),
Some(_) | None => None,
};
let assessment_turn_id = if assessment.turn_id.is_empty() {
event_turn_id.clone()
} else {
assessment.turn_id.clone()
};
if assessment.status == codex_protocol::protocol::GuardianAssessmentStatus::InProgress
&& let Some((target_item_id, completion_item)) = pending_command_execution.as_ref()
{
start_command_execution_item(
&conversation_id,
assessment_turn_id.clone(),
target_item_id.clone(),
completion_item.command.clone(),
completion_item.cwd.clone(),
completion_item.command_actions.clone(),
CommandExecutionSource::Agent,
&outgoing,
&thread_state,
)
.await;
}
let notification = guardian_auto_approval_review_notification(
&conversation_id,
&event_turn_id,
&assessment,
);
outgoing.send_server_notification(notification).await;
let completion_status = match assessment.status {
codex_protocol::protocol::GuardianAssessmentStatus::Denied
| codex_protocol::protocol::GuardianAssessmentStatus::Aborted => {
Some(CommandExecutionStatus::Declined)
}
codex_protocol::protocol::GuardianAssessmentStatus::TimedOut => {
Some(CommandExecutionStatus::Failed)
}
codex_protocol::protocol::GuardianAssessmentStatus::InProgress
| codex_protocol::protocol::GuardianAssessmentStatus::Approved => None,
};
if let Some(completion_status) = completion_status
&& let Some((target_item_id, completion_item)) = pending_command_execution
{
complete_command_execution_item(
&conversation_id,
assessment_turn_id,
target_item_id,
completion_item.command,
completion_item.cwd,
/*process_id*/ None,
CommandExecutionSource::Agent,
completion_item.command_actions,
completion_status,
&outgoing,
&thread_state,
)
.await;
}
}
EventMsg::ModelReroute(event) => {
let notification = ModelReroutedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_turn_id.clone(),
from_model: event.from_model,
to_model: event.to_model,
reason: event.reason.into(),
};
outgoing
.send_server_notification(ServerNotification::ModelRerouted(notification))
.await;
}
EventMsg::ModelVerification(event) => {
let notification = ModelVerificationNotification {
thread_id: conversation_id.to_string(),
turn_id: event_turn_id.clone(),
verifications: event.verifications.into_iter().map(Into::into).collect(),
};
outgoing
.send_server_notification(ServerNotification::ModelVerification(notification))
.await;
}
EventMsg::RealtimeConversationStarted(event) => {
let notification = ThreadRealtimeStartedNotification {
thread_id: conversation_id.to_string(),
realtime_session_id: event.realtime_session_id,
version: event.version,
};
outgoing
.send_server_notification(ServerNotification::ThreadRealtimeStarted(notification))
.await;
}
EventMsg::RealtimeConversationSdp(event) => {
let notification = ThreadRealtimeSdpNotification {
thread_id: conversation_id.to_string(),
sdp: event.sdp,
};
outgoing
.send_server_notification(ServerNotification::ThreadRealtimeSdp(notification))
.await;
}
EventMsg::RealtimeConversationRealtime(event) => match event.payload {
RealtimeEvent::SessionUpdated { .. } => {}
RealtimeEvent::InputAudioSpeechStarted(event) => {
let notification = ThreadRealtimeItemAddedNotification {
thread_id: conversation_id.to_string(),
item: serde_json::json!({
"type": "input_audio_buffer.speech_started",
"item_id": event.item_id,
}),
};
outgoing
.send_server_notification(ServerNotification::ThreadRealtimeItemAdded(
notification,
))
.await;
}
RealtimeEvent::InputTranscriptDelta(event) => {
let notification = ThreadRealtimeTranscriptDeltaNotification {
thread_id: conversation_id.to_string(),
role: "user".to_string(),
delta: event.delta,
};
outgoing
.send_server_notification(ServerNotification::ThreadRealtimeTranscriptDelta(
notification,
))
.await;
}
RealtimeEvent::InputTranscriptDone(event) => {
let notification = ThreadRealtimeTranscriptDoneNotification {
thread_id: conversation_id.to_string(),
role: "user".to_string(),
text: event.text,
};
outgoing
.send_server_notification(ServerNotification::ThreadRealtimeTranscriptDone(
notification,
))
.await;
}
RealtimeEvent::OutputTranscriptDelta(event) => {
let notification = ThreadRealtimeTranscriptDeltaNotification {
thread_id: conversation_id.to_string(),
role: "assistant".to_string(),
delta: event.delta,
};
outgoing
.send_server_notification(ServerNotification::ThreadRealtimeTranscriptDelta(
notification,
))
.await;
}
RealtimeEvent::OutputTranscriptDone(event) => {
let notification = ThreadRealtimeTranscriptDoneNotification {
thread_id: conversation_id.to_string(),
role: "assistant".to_string(),
text: event.text,
};
outgoing
.send_server_notification(ServerNotification::ThreadRealtimeTranscriptDone(
notification,
))
.await;
}
RealtimeEvent::AudioOut(audio) => {
let notification = ThreadRealtimeOutputAudioDeltaNotification {
thread_id: conversation_id.to_string(),
audio: audio.into(),
};
outgoing
.send_server_notification(ServerNotification::ThreadRealtimeOutputAudioDelta(
notification,
))
.await;
}
RealtimeEvent::ResponseCreated(_) => {}
RealtimeEvent::ResponseCancelled(event) => {
let notification = ThreadRealtimeItemAddedNotification {
thread_id: conversation_id.to_string(),
item: serde_json::json!({
"type": "response.cancelled",
"response_id": event.response_id,
}),
};
outgoing
.send_server_notification(ServerNotification::ThreadRealtimeItemAdded(
notification,
))
.await;
}
RealtimeEvent::ResponseDone(_) => {}
RealtimeEvent::ConversationItemAdded(item) => {
let notification = ThreadRealtimeItemAddedNotification {
thread_id: conversation_id.to_string(),
item,
};
outgoing
.send_server_notification(ServerNotification::ThreadRealtimeItemAdded(
notification,
))
.await;
}
RealtimeEvent::ConversationItemDone { .. } | RealtimeEvent::NoopRequested(_) => {}
RealtimeEvent::HandoffRequested(handoff) => {
let notification = ThreadRealtimeItemAddedNotification {
thread_id: conversation_id.to_string(),
item: serde_json::json!({
"type": "handoff_request",
"handoff_id": handoff.handoff_id,
"item_id": handoff.item_id,
"input_transcript": handoff.input_transcript,
"active_transcript": handoff.active_transcript,
}),
};
outgoing
.send_server_notification(ServerNotification::ThreadRealtimeItemAdded(
notification,
))
.await;
}
RealtimeEvent::Error(message) => {
let notification = ThreadRealtimeErrorNotification {
thread_id: conversation_id.to_string(),
message,
};
outgoing
.send_server_notification(ServerNotification::ThreadRealtimeError(notification))
.await;
}
},
EventMsg::RealtimeConversationClosed(event) => {
let notification = ThreadRealtimeClosedNotification {
thread_id: conversation_id.to_string(),
reason: event.reason,
};
outgoing
.send_server_notification(ServerNotification::ThreadRealtimeClosed(notification))
.await;
}
EventMsg::ApplyPatchApprovalRequest(event) => {
let permission_guard = thread_watch_manager
.note_permission_requested(&conversation_id.to_string())
.await;
let item_id = event.call_id.clone();
let params = FileChangeRequestApprovalParams {
thread_id: conversation_id.to_string(),
turn_id: event.turn_id.clone(),
item_id: item_id.clone(),
started_at_ms: event.started_at_ms,
reason: event.reason.clone(),
grant_root: event.grant_root.clone(),
};
let (pending_request_id, rx) = outgoing
.send_request(ServerRequestPayload::FileChangeRequestApproval(params))
.await;
tokio::spawn(async move {
on_file_change_request_approval_response(
item_id,
pending_request_id,
rx,
conversation,
thread_state.clone(),
permission_guard,
)
.await;
});
}
EventMsg::ExecApprovalRequest(ev) => {
let permission_guard = thread_watch_manager
.note_permission_requested(&conversation_id.to_string())
.await;
let available_decisions = ev
.effective_available_decisions()
.into_iter()
.map(CommandExecutionApprovalDecision::from)
.collect::<Vec<_>>();
let ExecApprovalRequestEvent {
call_id,
approval_id,
turn_id,
started_at_ms,
command,
cwd,
reason,
network_approval_context,
proposed_execpolicy_amendment,
proposed_network_policy_amendments,
additional_permissions,
parsed_cmd,
..
} = ev;
let command_actions = parsed_cmd
.iter()
.cloned()
.map(|parsed| V2ParsedCommand::from_core_with_cwd(parsed, &cwd))
.collect::<Vec<_>>();
let presentation = if let Some(network_approval_context) =
network_approval_context.map(V2NetworkApprovalContext::from)
{
CommandExecutionApprovalPresentation::Network(network_approval_context)
} else {
let command_string = shlex_join(&command);
let completion_item = CommandExecutionCompletionItem {
command: command_string,
cwd: cwd.clone(),
command_actions: command_actions.clone(),
};
CommandExecutionApprovalPresentation::Command(completion_item)
};
let (network_approval_context, command, cwd, command_actions, completion_item) =
match presentation {
CommandExecutionApprovalPresentation::Network(network_approval_context) => {
(Some(network_approval_context), None, None, None, None)
}
CommandExecutionApprovalPresentation::Command(completion_item) => (
None,
Some(completion_item.command.clone()),
Some(completion_item.cwd.clone()),
Some(completion_item.command_actions.clone()),
Some(completion_item),
),
};
if approval_id.is_none()
&& let Some(completion_item) = completion_item.as_ref()
{
start_command_execution_item(
&conversation_id,
event_turn_id.clone(),
call_id.clone(),
completion_item.command.clone(),
completion_item.cwd.clone(),
completion_item.command_actions.clone(),
CommandExecutionSource::Agent,
&outgoing,
&thread_state,
)
.await;
}
let proposed_execpolicy_amendment_v2 =
proposed_execpolicy_amendment.map(V2ExecPolicyAmendment::from);
let proposed_network_policy_amendments_v2 =
proposed_network_policy_amendments.map(|amendments| {
amendments
.into_iter()
.map(V2NetworkPolicyAmendment::from)
.collect()
});
let additional_permissions =
additional_permissions.map(V2AdditionalPermissionProfile::from);
let params = CommandExecutionRequestApprovalParams {
thread_id: conversation_id.to_string(),
turn_id: turn_id.clone(),
item_id: call_id.clone(),
started_at_ms,
approval_id: approval_id.clone(),
reason,
network_approval_context,
command,
cwd,
command_actions,
additional_permissions,
proposed_execpolicy_amendment: proposed_execpolicy_amendment_v2,
proposed_network_policy_amendments: proposed_network_policy_amendments_v2,
available_decisions: Some(available_decisions),
};
let (pending_request_id, rx) = outgoing
.send_request(ServerRequestPayload::CommandExecutionRequestApproval(
params,
))
.await;
tokio::spawn(async move {
on_command_execution_request_approval_response(
event_turn_id,
conversation_id,
approval_id,
call_id,
completion_item,
pending_request_id,
rx,
conversation,
outgoing,
thread_state.clone(),
permission_guard,
)
.await;
});
}
EventMsg::RequestUserInput(request) => {
let user_input_guard = thread_watch_manager
.note_user_input_requested(&conversation_id.to_string())
.await;
let questions = request
.questions
.into_iter()
.map(|question| ToolRequestUserInputQuestion {
id: question.id,
header: question.header,
question: question.question,
is_other: question.is_other,
is_secret: question.is_secret,
options: question.options.map(|options| {
options
.into_iter()
.map(|option| ToolRequestUserInputOption {
label: option.label,
description: option.description,
})
.collect()
}),
})
.collect();
let params = ToolRequestUserInputParams {
thread_id: conversation_id.to_string(),
turn_id: request.turn_id,
item_id: request.call_id,
questions,
};
let (pending_request_id, rx) = outgoing
.send_request(ServerRequestPayload::ToolRequestUserInput(params))
.await;
tokio::spawn(async move {
on_request_user_input_response(
event_turn_id,
pending_request_id,
rx,
conversation,
thread_state,
user_input_guard,
)
.await;
});
}
EventMsg::ElicitationRequest(request) => {
let permission_guard = thread_watch_manager
.note_permission_requested(&conversation_id.to_string())
.await;
let turn_id = match request.turn_id.clone() {
Some(turn_id) => Some(turn_id),
None => {
let state = thread_state.lock().await;
state.active_turn_snapshot().map(|turn| turn.id)
}
};
let server_name = request.server_name.clone();
let request_body = match request.request.try_into() {
Ok(request_body) => request_body,
Err(err) => {
error!(
error = %err,
server_name,
request_id = ?request.id,
"failed to parse typed MCP elicitation schema"
);
if let Err(err) = conversation
.submit(Op::ResolveElicitation {
server_name: request.server_name,
request_id: request.id,
decision: codex_protocol::approvals::ElicitationAction::Cancel,
content: None,
meta: None,
})
.await
{
error!("failed to submit ResolveElicitation: {err}");
}
return;
}
};
let params = McpServerElicitationRequestParams {
thread_id: conversation_id.to_string(),
turn_id,
server_name: request.server_name.clone(),
request: request_body,
};
let (pending_request_id, rx) = outgoing
.send_request(ServerRequestPayload::McpServerElicitationRequest(params))
.await;
tokio::spawn(async move {
on_mcp_server_elicitation_response(
request.server_name,
request.id,
pending_request_id,
rx,
conversation,
thread_state,
permission_guard,
)
.await;
});
}
EventMsg::RequestPermissions(request) => {
let permission_guard = thread_watch_manager
.note_permission_requested(&conversation_id.to_string())
.await;
let requested_permissions = request.permissions.clone();
let request_cwd = match request.cwd.clone() {
Some(cwd) => cwd,
None => conversation.config_snapshot().await.cwd,
};
let params = PermissionsRequestApprovalParams {
thread_id: conversation_id.to_string(),
turn_id: request.turn_id.clone(),
item_id: request.call_id.clone(),
started_at_ms: request.started_at_ms,
cwd: request_cwd.clone(),
reason: request.reason,
permissions: request.permissions.into(),
};
let (pending_request_id, rx) = outgoing
.send_request(ServerRequestPayload::PermissionsRequestApproval(params))
.await;
let pending_response = PendingRequestPermissionsResponse {
call_id: request.call_id,
requested_permissions,
request_cwd,
pending_request_id,
outgoing,
receiver: rx,
request_permissions_guard: permission_guard,
};
tokio::spawn(async move {
on_request_permissions_response(pending_response, conversation, thread_state).await;
});
}
EventMsg::DynamicToolCallRequest(request) => {
let call_id = request.call_id;
let turn_id = request.turn_id;
let namespace = request.namespace;
let tool = request.tool;
let arguments = request.arguments;
let item = ThreadItem::DynamicToolCall {
id: call_id.clone(),
namespace: namespace.clone(),
tool: tool.clone(),
arguments: arguments.clone(),
status: DynamicToolCallStatus::InProgress,
content_items: None,
success: None,
duration_ms: None,
};
let notification = ItemStartedNotification {
thread_id: conversation_id.to_string(),
turn_id: turn_id.clone(),
started_at_ms: request.started_at_ms,
item,
};
outgoing
.send_server_notification(ServerNotification::ItemStarted(notification))
.await;
let params = DynamicToolCallParams {
thread_id: conversation_id.to_string(),
turn_id: turn_id.clone(),
call_id: call_id.clone(),
namespace,
tool: tool.clone(),
arguments: arguments.clone(),
};
let (_pending_request_id, rx) = outgoing
.send_request(ServerRequestPayload::DynamicToolCall(params))
.await;
tokio::spawn(async move {
crate::dynamic_tools::on_call_response(call_id, rx, conversation).await;
});
}
EventMsg::McpToolCallBegin(_) | EventMsg::McpToolCallEnd(_) => {
// Deprecated MCP tool-call events are still fanned out for legacy clients.
// App-server v2 receives the canonical TurnItem::McpToolCall lifecycle instead.
}
msg @ (EventMsg::DynamicToolCallResponse(_)
| EventMsg::CollabAgentSpawnBegin(_)
| EventMsg::CollabAgentSpawnEnd(_)
| EventMsg::CollabAgentInteractionBegin(_)
| EventMsg::CollabAgentInteractionEnd(_)
| EventMsg::CollabWaitingBegin(_)
| EventMsg::CollabWaitingEnd(_)
| EventMsg::CollabCloseBegin(_)
| EventMsg::CollabResumeBegin(_)
| EventMsg::CollabResumeEnd(_)
| EventMsg::AgentMessageContentDelta(_)
| EventMsg::PlanDelta(_)
| EventMsg::ReasoningContentDelta(_)
| EventMsg::ReasoningRawContentDelta(_)
| EventMsg::AgentReasoningSectionBreak(_)) => {
let notification = item_event_to_server_notification(
msg,
&conversation_id.to_string(),
&event_turn_id,
);
outgoing.send_server_notification(notification).await;
}
EventMsg::CollabCloseEnd(end_event) => {
if thread_manager
.get_thread(end_event.receiver_thread_id)
.await
.is_err()
{
thread_watch_manager
.remove_thread(&end_event.receiver_thread_id.to_string())
.await;
}
let notification = item_event_to_server_notification(
EventMsg::CollabCloseEnd(end_event),
&conversation_id.to_string(),
&event_turn_id,
);
outgoing.send_server_notification(notification).await;
}
EventMsg::ContextCompacted(..) => {
// Core still fans out this deprecated event for legacy clients;
// v2 clients receive the canonical ContextCompaction item instead.
}
EventMsg::DeprecationNotice(event) => {
let notification = DeprecationNoticeNotification {
summary: event.summary,
details: event.details,
};
outgoing
.send_server_notification(ServerNotification::DeprecationNotice(notification))
.await;
}
EventMsg::TokenCount(token_count_event) => {
handle_token_count_event(conversation_id, event_turn_id, token_count_event, &outgoing)
.await;
}
EventMsg::Error(ev) => {
thread_watch_manager
.note_system_error(&conversation_id.to_string())
.await;
let message = ev.message.clone();
let codex_error_info = ev.codex_error_info.clone();
// If this error belongs to an in-flight `thread/rollback` request, fail that request
// (and clear pending state) so subsequent rollbacks are unblocked.
//
// Don't send a notification for this error.
if matches!(
codex_error_info,
Some(CoreCodexErrorInfo::ThreadRollbackFailed)
) {
return handle_thread_rollback_failed(
conversation_id,
message,
&thread_state,
&outgoing,
)
.await;
};
if !ev.affects_turn_status() {
return;
}
let turn_error = TurnError {
message: ev.message,
codex_error_info: ev.codex_error_info.map(V2CodexErrorInfo::from),
additional_details: None,
};
handle_error(conversation_id, turn_error.clone(), &thread_state).await;
outgoing
.send_server_notification(ServerNotification::Error(ErrorNotification {
error: turn_error.clone(),
will_retry: false,
thread_id: conversation_id.to_string(),
turn_id: event_turn_id.clone(),
}))
.await;
}
EventMsg::StreamError(ev) => {
// We don't need to update the turn summary store for stream errors as they are intermediate error states for retries,
// but we notify the client.
let turn_error = TurnError {
message: ev.message,
codex_error_info: ev.codex_error_info.map(V2CodexErrorInfo::from),
additional_details: ev.additional_details,
};
outgoing
.send_server_notification(ServerNotification::Error(ErrorNotification {
error: turn_error,
will_retry: true,
thread_id: conversation_id.to_string(),
turn_id: event_turn_id.clone(),
}))
.await;
}
EventMsg::ViewImageToolCall(_) => {}
EventMsg::EnteredReviewMode(review_request) => {
let review = review_request
.user_facing_hint
.unwrap_or_else(|| review_prompts::user_facing_hint(&review_request.target));
let item = ThreadItem::EnteredReviewMode {
id: event_turn_id.clone(),
review,
};
let started = ItemStartedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_turn_id.clone(),
started_at_ms: now_unix_timestamp_ms(),
item: item.clone(),
};
outgoing
.send_server_notification(ServerNotification::ItemStarted(started))
.await;
let completed = ItemCompletedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_turn_id.clone(),
completed_at_ms: now_unix_timestamp_ms(),
item,
};
outgoing
.send_server_notification(ServerNotification::ItemCompleted(completed))
.await;
}
msg @ (EventMsg::ItemStarted(_)
| EventMsg::ItemCompleted(_)
| EventMsg::PatchApplyUpdated(_)
| EventMsg::TerminalInteraction(_)) => {
let notification = item_event_to_server_notification(
msg,
&conversation_id.to_string(),
&event_turn_id,
);
outgoing.send_server_notification(notification).await;
}
EventMsg::HookStarted(event) => {
let notification = HookStartedNotification {
thread_id: conversation_id.to_string(),
turn_id: event.turn_id,
run: event.run.into(),
};
outgoing
.send_server_notification(ServerNotification::HookStarted(notification))
.await;
}
EventMsg::HookCompleted(event) => {
let notification = HookCompletedNotification {
thread_id: conversation_id.to_string(),
turn_id: event.turn_id,
run: event.run.into(),
};
outgoing
.send_server_notification(ServerNotification::HookCompleted(notification))
.await;
}
EventMsg::ExitedReviewMode(review_event) => {
let review = match review_event.review_output {
Some(output) => render_review_output_text(&output),
None => REVIEW_FALLBACK_MESSAGE.to_string(),
};
let item = ThreadItem::ExitedReviewMode {
id: event_turn_id.clone(),
review,
};
let started = ItemStartedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_turn_id.clone(),
started_at_ms: now_unix_timestamp_ms(),
item: item.clone(),
};
outgoing
.send_server_notification(ServerNotification::ItemStarted(started))
.await;
let completed = ItemCompletedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_turn_id.clone(),
completed_at_ms: now_unix_timestamp_ms(),
item,
};
outgoing
.send_server_notification(ServerNotification::ItemCompleted(completed))
.await;
}
EventMsg::RawResponseItem(raw_response_item_event) => {
maybe_emit_hook_prompt_item_completed(
conversation_id,
&event_turn_id,
&raw_response_item_event.item,
&outgoing,
)
.await;
maybe_emit_raw_response_item_completed(
conversation_id,
&event_turn_id,
raw_response_item_event.item,
&outgoing,
)
.await;
}
EventMsg::PatchApplyBegin(_) | EventMsg::PatchApplyEnd(_) => {
// Core still fans out these deprecated events for legacy clients;
// v2 clients receive the canonical FileChange item instead.
}
EventMsg::ExecCommandBegin(exec_command_begin_event) => {
if matches!(
exec_command_begin_event.source,
codex_protocol::protocol::ExecCommandSource::UnifiedExecInteraction
) {
// TerminalInteraction is the v2 surface for unified exec
// stdin/poll events. Suppress the legacy CommandExecution
// item so clients do not render the same wait twice.
return;
}
let item_id = exec_command_begin_event.call_id.clone();
let first_start = {
let mut state = thread_state.lock().await;
state
.turn_summary
.command_execution_started
.insert(item_id.clone())
};
if first_start {
let notification = item_event_to_server_notification(
EventMsg::ExecCommandBegin(exec_command_begin_event),
&conversation_id.to_string(),
&event_turn_id,
);
outgoing.send_server_notification(notification).await;
}
}
EventMsg::ExecCommandOutputDelta(exec_command_output_delta_event) => {
let notification = item_event_to_server_notification(
EventMsg::ExecCommandOutputDelta(exec_command_output_delta_event),
&conversation_id.to_string(),
&event_turn_id,
);
outgoing.send_server_notification(notification).await;
}
EventMsg::ExecCommandEnd(exec_command_end_event) => {
let call_id = exec_command_end_event.call_id.clone();
{
let mut state = thread_state.lock().await;
state
.turn_summary
.command_execution_started
.remove(&call_id);
}
if matches!(
exec_command_end_event.source,
codex_protocol::protocol::ExecCommandSource::UnifiedExecInteraction
) {
// The paired begin event is suppressed above; keep the
// completion out of v2 as well so no orphan legacy item is
// emitted for unified exec interactions.
return;
}
let notification = item_event_to_server_notification(
EventMsg::ExecCommandEnd(exec_command_end_event),
&conversation_id.to_string(),
&event_turn_id,
);
outgoing.send_server_notification(notification).await;
}
// If this is a TurnAborted, reply to any pending interrupt requests.
EventMsg::TurnAborted(turn_aborted_event) => {
// All per-thread requests are bound to a turn, so abort them.
outgoing.abort_pending_server_requests().await;
respond_to_pending_interrupts(&thread_state, &outgoing).await;
thread_watch_manager
.note_turn_interrupted(&conversation_id.to_string())
.await;
handle_turn_interrupted(
conversation_id,
event_turn_id,
turn_aborted_event,
&outgoing,
&thread_state,
)
.await;
}
EventMsg::ThreadRolledBack(_rollback_event) => {
let pending = {
let mut state = thread_state.lock().await;
state.pending_rollbacks.take()
};
if let Some(request_id) = pending {
let _thread_list_state_permit = match thread_list_state_permit.acquire().await {
Ok(permit) => permit,
Err(err) => {
outgoing
.send_error(
request_id,
internal_error(format!(
"failed to acquire thread list state permit: {err}"
)),
)
.await;
return;
}
};
let fallback_cwd = conversation.config_snapshot().await.cwd;
let stored_thread = match conversation
.read_thread(
/*include_archived*/ true, /*include_history*/ true,
)
.await
{
Ok(stored_thread) => stored_thread,
Err(err) => {
outgoing
.send_error(
request_id.clone(),
internal_error(format!(
"failed to read thread {conversation_id} after rollback: {err}"
)),
)
.await;
return;
}
};
let loaded_status = thread_watch_manager
.loaded_status_for_thread(&conversation_id.to_string())
.await;
let response = match thread_rollback_response_from_stored_thread(
stored_thread,
conversation.session_configured().session_id.to_string(),
fallback_model_provider.as_str(),
&fallback_cwd,
loaded_status,
) {
Ok(response) => response,
Err(err) => {
outgoing
.send_error(request_id.clone(), internal_error(err))
.await;
return;
}
};
outgoing.send_response(request_id, response).await;
}
}
EventMsg::ThreadGoalUpdated(thread_goal_event) => {
let notification = ThreadGoalUpdatedNotification {
thread_id: thread_goal_event.thread_id.to_string(),
turn_id: thread_goal_event.turn_id,
goal: thread_goal_event.goal.clone().into(),
};
outgoing
.send_global_server_notification(ServerNotification::ThreadGoalUpdated(
notification,
))
.await;
}
EventMsg::TurnDiff(turn_diff_event) => {
handle_turn_diff(conversation_id, &event_turn_id, turn_diff_event, &outgoing).await;
}
EventMsg::PlanUpdate(plan_update_event) => {
handle_turn_plan_update(
conversation_id,
&event_turn_id,
plan_update_event,
&outgoing,
)
.await;
}
EventMsg::ShutdownComplete => {
thread_watch_manager
.note_thread_shutdown(&conversation_id.to_string())
.await;
}
_ => {}
}
}
async fn handle_turn_diff(
conversation_id: ThreadId,
event_turn_id: &str,
turn_diff_event: TurnDiffEvent,
outgoing: &ThreadScopedOutgoingMessageSender,
) {
let notification = TurnDiffUpdatedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_turn_id.to_string(),
diff: turn_diff_event.unified_diff,
};
outgoing
.send_server_notification(ServerNotification::TurnDiffUpdated(notification))
.await;
}
async fn handle_turn_plan_update(
conversation_id: ThreadId,
event_turn_id: &str,
plan_update_event: UpdatePlanArgs,
outgoing: &ThreadScopedOutgoingMessageSender,
) {
// `update_plan` is a todo/checklist tool; it is not related to plan-mode updates
let notification = TurnPlanUpdatedNotification {
thread_id: conversation_id.to_string(),
turn_id: event_turn_id.to_string(),
explanation: plan_update_event.explanation,
plan: plan_update_event
.plan
.into_iter()
.map(TurnPlanStep::from)
.collect(),
};
outgoing
.send_server_notification(ServerNotification::TurnPlanUpdated(notification))
.await;
}
struct TurnCompletionMetadata {
status: TurnStatus,
error: Option<TurnError>,
started_at: Option<i64>,
completed_at: Option<i64>,
duration_ms: Option<i64>,
}
async fn emit_turn_completed_with_status(
conversation_id: ThreadId,
event_turn_id: String,
turn_completion_metadata: TurnCompletionMetadata,
outgoing: &ThreadScopedOutgoingMessageSender,
) {
let notification = TurnCompletedNotification {
thread_id: conversation_id.to_string(),
turn: Turn {
id: event_turn_id,
items: vec![],
items_view: TurnItemsView::NotLoaded,
error: turn_completion_metadata.error,
status: turn_completion_metadata.status,
started_at: turn_completion_metadata.started_at,
completed_at: turn_completion_metadata.completed_at,
duration_ms: turn_completion_metadata.duration_ms,
},
};
outgoing
.send_server_notification(ServerNotification::TurnCompleted(notification))
.await;
}
#[allow(clippy::too_many_arguments)]
async fn start_command_execution_item(
conversation_id: &ThreadId,
turn_id: String,
item_id: String,
command: String,
cwd: AbsolutePathBuf,
command_actions: Vec<V2ParsedCommand>,
source: CommandExecutionSource,
outgoing: &ThreadScopedOutgoingMessageSender,
thread_state: &Arc<Mutex<ThreadState>>,
) -> bool {
let first_start = {
let mut state = thread_state.lock().await;
state
.turn_summary
.command_execution_started
.insert(item_id.clone())
};
if first_start {
let notification = ItemStartedNotification {
thread_id: conversation_id.to_string(),
turn_id,
started_at_ms: now_unix_timestamp_ms(),
item: ThreadItem::CommandExecution {
id: item_id,
command,
cwd,
process_id: None,
source,
status: CommandExecutionStatus::InProgress,
command_actions,
aggregated_output: None,
exit_code: None,
duration_ms: None,
},
};
outgoing
.send_server_notification(ServerNotification::ItemStarted(notification))
.await;
}
first_start
}
#[allow(clippy::too_many_arguments)]
async fn complete_command_execution_item(
conversation_id: &ThreadId,
turn_id: String,
item_id: String,
command: String,
cwd: AbsolutePathBuf,
process_id: Option<String>,
source: CommandExecutionSource,
command_actions: Vec<V2ParsedCommand>,
status: CommandExecutionStatus,
outgoing: &ThreadScopedOutgoingMessageSender,
thread_state: &Arc<Mutex<ThreadState>>,
) {
let should_emit = thread_state
.lock()
.await
.turn_summary
.command_execution_started
.remove(&item_id);
if !should_emit {
return;
}
let item = ThreadItem::CommandExecution {
id: item_id,
command,
cwd,
process_id,
source,
status,
command_actions,
aggregated_output: None,
exit_code: None,
duration_ms: None,
};
let notification = ItemCompletedNotification {
thread_id: conversation_id.to_string(),
turn_id,
completed_at_ms: now_unix_timestamp_ms(),
item,
};
outgoing
.send_server_notification(ServerNotification::ItemCompleted(notification))
.await;
}
async fn maybe_emit_raw_response_item_completed(
conversation_id: ThreadId,
turn_id: &str,
item: codex_protocol::models::ResponseItem,
outgoing: &ThreadScopedOutgoingMessageSender,
) {
let notification = RawResponseItemCompletedNotification {
thread_id: conversation_id.to_string(),
turn_id: turn_id.to_string(),
item,
};
outgoing
.send_server_notification(ServerNotification::RawResponseItemCompleted(notification))
.await;
}
pub(crate) async fn maybe_emit_hook_prompt_item_completed(
conversation_id: ThreadId,
turn_id: &str,
item: &codex_protocol::models::ResponseItem,
outgoing: &ThreadScopedOutgoingMessageSender,
) {
let codex_protocol::models::ResponseItem::Message {
role, content, id, ..
} = item
else {
return;
};
if role != "user" {
return;
}
let Some(hook_prompt) = parse_hook_prompt_message(id.as_ref(), content) else {
return;
};
let notification = ItemCompletedNotification {
thread_id: conversation_id.to_string(),
turn_id: turn_id.to_string(),
completed_at_ms: now_unix_timestamp_ms(),
item: ThreadItem::HookPrompt {
id: hook_prompt.id,
fragments: hook_prompt
.fragments
.into_iter()
.map(codex_app_server_protocol::HookPromptFragment::from)
.collect(),
},
};
outgoing
.send_server_notification(ServerNotification::ItemCompleted(notification))
.await;
}
async fn find_and_remove_turn_summary(
_conversation_id: ThreadId,
thread_state: &Arc<Mutex<ThreadState>>,
) -> TurnSummary {
let mut state = thread_state.lock().await;
std::mem::take(&mut state.turn_summary)
}
async fn handle_turn_complete(
conversation_id: ThreadId,
event_turn_id: String,
turn_complete_event: TurnCompleteEvent,
outgoing: &ThreadScopedOutgoingMessageSender,
thread_state: &Arc<Mutex<ThreadState>>,
) {
let turn_summary = find_and_remove_turn_summary(conversation_id, thread_state).await;
let (status, error) = match turn_summary.last_error {
Some(error) => (TurnStatus::Failed, Some(error)),
None => (TurnStatus::Completed, None),
};
emit_turn_completed_with_status(
conversation_id,
event_turn_id,
TurnCompletionMetadata {
status,
error,
started_at: turn_summary.started_at,
completed_at: turn_complete_event.completed_at,
duration_ms: turn_complete_event.duration_ms,
},
outgoing,
)
.await;
}
async fn handle_turn_interrupted(
conversation_id: ThreadId,
event_turn_id: String,
turn_aborted_event: TurnAbortedEvent,
outgoing: &ThreadScopedOutgoingMessageSender,
thread_state: &Arc<Mutex<ThreadState>>,
) {
let turn_summary = find_and_remove_turn_summary(conversation_id, thread_state).await;
emit_turn_completed_with_status(
conversation_id,
event_turn_id,
TurnCompletionMetadata {
status: TurnStatus::Interrupted,
error: None,
started_at: turn_summary.started_at,
completed_at: turn_aborted_event.completed_at,
duration_ms: turn_aborted_event.duration_ms,
},
outgoing,
)
.await;
}
async fn handle_thread_rollback_failed(
_conversation_id: ThreadId,
message: String,
thread_state: &Arc<Mutex<ThreadState>>,
outgoing: &ThreadScopedOutgoingMessageSender,
) {
let pending_rollback = thread_state.lock().await.pending_rollbacks.take();
if let Some(request_id) = pending_rollback {
outgoing
.send_error(request_id, invalid_request(message))
.await;
}
}
fn thread_rollback_response_from_stored_thread(
stored_thread: codex_thread_store::StoredThread,
session_id: String,
fallback_model_provider: &str,
fallback_cwd: &AbsolutePathBuf,
loaded_status: ThreadStatus,
) -> std::result::Result<ThreadRollbackResponse, String> {
let thread_id = stored_thread.thread_id;
let (mut thread, history) =
thread_from_stored_thread(stored_thread, fallback_model_provider, fallback_cwd);
thread.session_id = session_id;
let Some(history) = history else {
return Err(format!(
"thread {thread_id} did not include persisted history after rollback"
));
};
populate_thread_turns_from_history(&mut thread, &history.items, /*active_turn*/ None);
thread.status = loaded_status;
Ok(ThreadRollbackResponse { thread })
}
async fn respond_to_pending_interrupts(
thread_state: &Arc<Mutex<ThreadState>>,
outgoing: &ThreadScopedOutgoingMessageSender,
) {
let pending = {
let mut state = thread_state.lock().await;
std::mem::take(&mut state.pending_interrupts)
};
for request_id in pending {
outgoing
.send_response(request_id, TurnInterruptResponse {})
.await;
}
}
async fn handle_token_count_event(
conversation_id: ThreadId,
turn_id: String,
token_count_event: TokenCountEvent,
outgoing: &ThreadScopedOutgoingMessageSender,
) {
let TokenCountEvent { info, rate_limits } = token_count_event;
if let Some(token_usage) = info.map(ThreadTokenUsage::from) {
let notification = ThreadTokenUsageUpdatedNotification {
thread_id: conversation_id.to_string(),
turn_id,
token_usage,
};
outgoing
.send_server_notification(ServerNotification::ThreadTokenUsageUpdated(notification))
.await;
}
if let Some(rate_limits) = rate_limits {
outgoing
.send_server_notification(ServerNotification::AccountRateLimitsUpdated(
AccountRateLimitsUpdatedNotification {
rate_limits: rate_limits.into(),
},
))
.await;
}
}
async fn handle_error(
_conversation_id: ThreadId,
error: TurnError,
thread_state: &Arc<Mutex<ThreadState>>,
) {
let mut state = thread_state.lock().await;
state.turn_summary.last_error = Some(error);
}
async fn on_request_user_input_response(
event_turn_id: String,
pending_request_id: RequestId,
receiver: oneshot::Receiver<ClientRequestResult>,
conversation: Arc<CodexThread>,
thread_state: Arc<Mutex<ThreadState>>,
user_input_guard: ThreadWatchActiveGuard,
) {
let response = receiver.await;
resolve_server_request_on_thread_listener(&thread_state, pending_request_id).await;
drop(user_input_guard);
let value = match response {
Ok(Ok(value)) => value,
Ok(Err(err)) if is_turn_transition_server_request_error(&err) => return,
Ok(Err(err)) => {
error!("request failed with client error: {err:?}");
let empty = CoreRequestUserInputResponse {
answers: HashMap::new(),
};
if let Err(err) = conversation
.submit(Op::UserInputAnswer {
id: event_turn_id,
response: empty,
})
.await
{
error!("failed to submit UserInputAnswer: {err}");
}
return;
}
Err(err) => {
error!("request failed: {err:?}");
let empty = CoreRequestUserInputResponse {
answers: HashMap::new(),
};
if let Err(err) = conversation
.submit(Op::UserInputAnswer {
id: event_turn_id,
response: empty,
})
.await
{
error!("failed to submit UserInputAnswer: {err}");
}
return;
}
};
let response =
serde_json::from_value::<ToolRequestUserInputResponse>(value).unwrap_or_else(|err| {
error!("failed to deserialize ToolRequestUserInputResponse: {err}");
ToolRequestUserInputResponse {
answers: HashMap::new(),
}
});
let response = CoreRequestUserInputResponse {
answers: response
.answers
.into_iter()
.map(|(id, answer)| {
(
id,
CoreRequestUserInputAnswer {
answers: answer.answers,
},
)
})
.collect(),
};
if let Err(err) = conversation
.submit(Op::UserInputAnswer {
id: event_turn_id,
response,
})
.await
{
error!("failed to submit UserInputAnswer: {err}");
}
}
async fn on_mcp_server_elicitation_response(
server_name: String,
request_id: codex_protocol::mcp::RequestId,
pending_request_id: RequestId,
receiver: oneshot::Receiver<ClientRequestResult>,
conversation: Arc<CodexThread>,
thread_state: Arc<Mutex<ThreadState>>,
permission_guard: ThreadWatchActiveGuard,
) {
let response = receiver.await;
resolve_server_request_on_thread_listener(&thread_state, pending_request_id).await;
drop(permission_guard);
let response = mcp_server_elicitation_response_from_client_result(response);
if let Err(err) = conversation
.submit(Op::ResolveElicitation {
server_name,
request_id,
decision: response.action.to_core(),
content: response.content,
meta: response.meta,
})
.await
{
error!("failed to submit ResolveElicitation: {err}");
}
}
fn mcp_server_elicitation_response_from_client_result(
response: std::result::Result<ClientRequestResult, oneshot::error::RecvError>,
) -> McpServerElicitationRequestResponse {
match response {
Ok(Ok(value)) => serde_json::from_value::<McpServerElicitationRequestResponse>(value)
.unwrap_or_else(|err| {
error!("failed to deserialize McpServerElicitationRequestResponse: {err}");
McpServerElicitationRequestResponse {
action: McpServerElicitationAction::Decline,
content: None,
meta: None,
}
}),
Ok(Err(err)) if is_turn_transition_server_request_error(&err) => {
McpServerElicitationRequestResponse {
action: McpServerElicitationAction::Cancel,
content: None,
meta: None,
}
}
Ok(Err(err)) => {
error!("request failed with client error: {err:?}");
McpServerElicitationRequestResponse {
action: McpServerElicitationAction::Decline,
content: None,
meta: None,
}
}
Err(err) => {
error!("request failed: {err:?}");
McpServerElicitationRequestResponse {
action: McpServerElicitationAction::Decline,
content: None,
meta: None,
}
}
}
}
async fn on_request_permissions_response(
pending_response: PendingRequestPermissionsResponse,
conversation: Arc<CodexThread>,
thread_state: Arc<Mutex<ThreadState>>,
) {
let PendingRequestPermissionsResponse {
call_id,
requested_permissions,
request_cwd,
pending_request_id,
outgoing,
receiver,
request_permissions_guard,
} = pending_response;
let response = receiver.await;
resolve_server_request_on_thread_listener(&thread_state, pending_request_id.clone()).await;
drop(request_permissions_guard);
let Some(response) = request_permissions_response_from_client_result(
requested_permissions,
response,
request_cwd.as_path(),
) else {
return;
};
outgoing.track_effective_permissions_approval_response(pending_request_id, response.clone());
if let Err(err) = conversation
.submit(Op::RequestPermissionsResponse {
id: call_id,
response,
})
.await
{
error!("failed to submit RequestPermissionsResponse: {err}");
}
}
struct PendingRequestPermissionsResponse {
call_id: String,
requested_permissions: CoreRequestPermissionProfile,
request_cwd: AbsolutePathBuf,
pending_request_id: RequestId,
outgoing: ThreadScopedOutgoingMessageSender,
receiver: oneshot::Receiver<ClientRequestResult>,
request_permissions_guard: ThreadWatchActiveGuard,
}
fn request_permissions_response_from_client_result(
requested_permissions: CoreRequestPermissionProfile,
response: std::result::Result<ClientRequestResult, oneshot::error::RecvError>,
cwd: &std::path::Path,
) -> Option<CoreRequestPermissionsResponse> {
let value = match response {
Ok(Ok(value)) => value,
Ok(Err(err)) if is_turn_transition_server_request_error(&err) => return None,
Ok(Err(err)) => {
error!("request failed with client error: {err:?}");
return Some(CoreRequestPermissionsResponse {
permissions: Default::default(),
scope: CorePermissionGrantScope::Turn,
strict_auto_review: false,
});
}
Err(err) => {
error!("request failed: {err:?}");
return Some(CoreRequestPermissionsResponse {
permissions: Default::default(),
scope: CorePermissionGrantScope::Turn,
strict_auto_review: false,
});
}
};
let response = serde_json::from_value::<PermissionsRequestApprovalResponse>(value)
.unwrap_or_else(|err| {
error!("failed to deserialize PermissionsRequestApprovalResponse: {err}");
PermissionsRequestApprovalResponse {
permissions: V2GrantedPermissionProfile::default(),
scope: codex_app_server_protocol::PermissionGrantScope::Turn,
strict_auto_review: None,
}
});
let strict_auto_review = response.strict_auto_review.unwrap_or(false);
if strict_auto_review
&& matches!(
response.scope,
codex_app_server_protocol::PermissionGrantScope::Session
)
{
error!("strict auto review is only supported for turn-scoped permission grants");
return Some(CoreRequestPermissionsResponse {
permissions: Default::default(),
scope: CorePermissionGrantScope::Turn,
strict_auto_review: false,
});
}
let granted_permissions: CoreAdditionalPermissionProfile = response.permissions.into();
let permissions = if granted_permissions.is_empty() {
CoreRequestPermissionProfile::default()
} else {
intersect_permission_profiles(requested_permissions.into(), granted_permissions, cwd).into()
};
Some(CoreRequestPermissionsResponse {
permissions,
scope: response.scope.to_core(),
strict_auto_review,
})
}
const REVIEW_FALLBACK_MESSAGE: &str = "Reviewer failed to output a response.";
fn render_review_output_text(output: &ReviewOutputEvent) -> String {
let mut sections = Vec::new();
let explanation = output.overall_explanation.trim();
if !explanation.is_empty() {
sections.push(explanation.to_string());
}
if !output.findings.is_empty() {
let findings = format_review_findings_block(&output.findings, /*selection*/ None);
let trimmed = findings.trim();
if !trimmed.is_empty() {
sections.push(trimmed.to_string());
}
}
if sections.is_empty() {
REVIEW_FALLBACK_MESSAGE.to_string()
} else {
sections.join("\n\n")
}
}
fn map_file_change_approval_decision(decision: FileChangeApprovalDecision) -> ReviewDecision {
match decision {
FileChangeApprovalDecision::Accept => ReviewDecision::Approved,
FileChangeApprovalDecision::AcceptForSession => ReviewDecision::ApprovedForSession,
FileChangeApprovalDecision::Decline => ReviewDecision::Denied,
FileChangeApprovalDecision::Cancel => ReviewDecision::Abort,
}
}
#[allow(clippy::too_many_arguments)]
async fn on_file_change_request_approval_response(
item_id: String,
pending_request_id: RequestId,
receiver: oneshot::Receiver<ClientRequestResult>,
codex: Arc<CodexThread>,
thread_state: Arc<Mutex<ThreadState>>,
permission_guard: ThreadWatchActiveGuard,
) {
let response = receiver.await;
resolve_server_request_on_thread_listener(&thread_state, pending_request_id).await;
drop(permission_guard);
let decision = match response {
Ok(Ok(value)) => {
let response = serde_json::from_value::<FileChangeRequestApprovalResponse>(value)
.unwrap_or_else(|err| {
error!("failed to deserialize FileChangeRequestApprovalResponse: {err}");
FileChangeRequestApprovalResponse {
decision: FileChangeApprovalDecision::Decline,
}
});
map_file_change_approval_decision(response.decision)
}
Ok(Err(err)) if is_turn_transition_server_request_error(&err) => return,
Ok(Err(err)) => {
error!("request failed with client error: {err:?}");
ReviewDecision::Denied
}
Err(err) => {
error!("request failed: {err:?}");
ReviewDecision::Denied
}
};
if let Err(err) = codex
.submit(Op::PatchApproval {
id: item_id,
decision,
})
.await
{
error!("failed to submit PatchApproval: {err}");
}
}
#[allow(clippy::too_many_arguments)]
async fn on_command_execution_request_approval_response(
event_turn_id: String,
conversation_id: ThreadId,
approval_id: Option<String>,
item_id: String,
completion_item: Option<CommandExecutionCompletionItem>,
pending_request_id: RequestId,
receiver: oneshot::Receiver<ClientRequestResult>,
conversation: Arc<CodexThread>,
outgoing: ThreadScopedOutgoingMessageSender,
thread_state: Arc<Mutex<ThreadState>>,
permission_guard: ThreadWatchActiveGuard,
) {
let response = receiver.await;
resolve_server_request_on_thread_listener(&thread_state, pending_request_id).await;
drop(permission_guard);
let (decision, completion_status) = match response {
Ok(Ok(value)) => {
let response = serde_json::from_value::<CommandExecutionRequestApprovalResponse>(value)
.unwrap_or_else(|err| {
error!("failed to deserialize CommandExecutionRequestApprovalResponse: {err}");
CommandExecutionRequestApprovalResponse {
decision: CommandExecutionApprovalDecision::Decline,
}
});
let decision = response.decision;
let (decision, completion_status) = match decision {
CommandExecutionApprovalDecision::Accept => (ReviewDecision::Approved, None),
CommandExecutionApprovalDecision::AcceptForSession => {
(ReviewDecision::ApprovedForSession, None)
}
CommandExecutionApprovalDecision::AcceptWithExecpolicyAmendment {
execpolicy_amendment,
} => (
ReviewDecision::ApprovedExecpolicyAmendment {
proposed_execpolicy_amendment: execpolicy_amendment.into_core(),
},
None,
),
CommandExecutionApprovalDecision::ApplyNetworkPolicyAmendment {
network_policy_amendment,
} => {
let completion_status = match network_policy_amendment.action {
V2NetworkPolicyRuleAction::Allow => None,
V2NetworkPolicyRuleAction::Deny => Some(CommandExecutionStatus::Declined),
};
(
ReviewDecision::NetworkPolicyAmendment {
network_policy_amendment: network_policy_amendment.into_core(),
},
completion_status,
)
}
CommandExecutionApprovalDecision::Decline => (
ReviewDecision::Denied,
Some(CommandExecutionStatus::Declined),
),
CommandExecutionApprovalDecision::Cancel => (
ReviewDecision::Abort,
Some(CommandExecutionStatus::Declined),
),
};
(decision, completion_status)
}
Ok(Err(err)) if is_turn_transition_server_request_error(&err) => return,
Ok(Err(err)) => {
error!("request failed with client error: {err:?}");
(ReviewDecision::Denied, Some(CommandExecutionStatus::Failed))
}
Err(err) => {
error!("request failed: {err:?}");
(ReviewDecision::Denied, Some(CommandExecutionStatus::Failed))
}
};
let suppress_subcommand_completion_item = {
// For regular shell/unified_exec approvals, approval_id is null.
// For zsh-fork subcommand approvals, approval_id is present and
// item_id points to the parent command item.
if approval_id.is_some() {
let state = thread_state.lock().await;
state
.turn_summary
.command_execution_started
.contains(&item_id)
} else {
false
}
};
if let Some(status) = completion_status
&& !suppress_subcommand_completion_item
&& let Some(completion_item) = completion_item
{
complete_command_execution_item(
&conversation_id,
event_turn_id.clone(),
item_id.clone(),
completion_item.command,
completion_item.cwd,
/*process_id*/ None,
CommandExecutionSource::Agent,
completion_item.command_actions,
status,
&outgoing,
&thread_state,
)
.await;
}
if let Err(err) = conversation
.submit(Op::ExecApproval {
id: approval_id.unwrap_or_else(|| item_id.clone()),
turn_id: Some(event_turn_id),
decision,
})
.await
{
error!("failed to submit ExecApproval: {err}");
}
}
fn now_unix_timestamp_ms() -> i64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|duration| duration.as_millis() as i64)
.unwrap_or_default()
}
#[cfg(test)]
mod tests {
use super::*;
use crate::CHANNEL_CAPACITY;
use crate::outgoing_message::ConnectionId;
use crate::outgoing_message::OutgoingEnvelope;
use crate::outgoing_message::OutgoingMessage;
use crate::outgoing_message::OutgoingMessageSender;
use anyhow::Result;
use anyhow::anyhow;
use anyhow::bail;
use chrono::Utc;
use codex_app_server_protocol::AutoReviewDecisionSource;
use codex_app_server_protocol::GuardianApprovalReviewStatus;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::TurnPlanStepStatus;
use codex_login::CodexAuth;
use codex_protocol::items::HookPromptFragment;
use codex_protocol::items::build_hook_prompt_message;
use codex_protocol::models::FileSystemPermissions as CoreFileSystemPermissions;
use codex_protocol::models::NetworkPermissions as CoreNetworkPermissions;
use codex_protocol::permissions::FileSystemAccessMode;
use codex_protocol::permissions::FileSystemPath;
use codex_protocol::permissions::FileSystemSandboxEntry;
use codex_protocol::permissions::FileSystemSpecialPath;
use codex_protocol::plan_tool::PlanItemArg;
use codex_protocol::plan_tool::StepStatus;
use codex_protocol::protocol::AgentMessageEvent;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::CreditsSnapshot;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::GuardianAssessmentEvent;
use codex_protocol::protocol::GuardianAssessmentStatus;
use codex_protocol::protocol::RateLimitSnapshot;
use codex_protocol::protocol::RateLimitWindow;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::TokenUsage;
use codex_protocol::protocol::TokenUsageInfo;
use codex_protocol::protocol::UserMessageEvent;
use codex_thread_store::StoredThread;
use codex_thread_store::StoredThreadHistory;
use codex_utils_absolute_path::AbsolutePathBuf;
use codex_utils_absolute_path::test_support::PathBufExt;
use codex_utils_absolute_path::test_support::test_path_buf;
use core_test_support::load_default_config_for_test;
use pretty_assertions::assert_eq;
use serde_json::json;
use tempfile::TempDir;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
fn new_thread_state() -> Arc<Mutex<ThreadState>> {
Arc::new(Mutex::new(ThreadState::default()))
}
const TEST_TURN_COMPLETED_AT: i64 = 1_716_000_456;
const TEST_TURN_DURATION_MS: i64 = 1_234;
async fn recv_broadcast_message(
rx: &mut mpsc::Receiver<OutgoingEnvelope>,
) -> Result<OutgoingMessage> {
let envelope = rx
.recv()
.await
.ok_or_else(|| anyhow!("should send one message"))?;
match envelope {
OutgoingEnvelope::Broadcast { message } => Ok(message),
OutgoingEnvelope::ToConnection { message, .. } => Ok(message),
}
}
#[test]
fn rollback_response_rebuilds_pathless_thread_from_stored_history() -> Result<()> {
let thread_id = ThreadId::from_string("00000000-0000-0000-0000-000000000789")?;
let created_at = Utc::now();
let history_items = vec![
RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
message: "before rollback".to_string(),
images: None,
local_images: Vec::new(),
text_elements: Vec::new(),
..Default::default()
})),
RolloutItem::EventMsg(EventMsg::AgentMessage(AgentMessageEvent {
message: "after rollback".to_string(),
phase: None,
memory_citation: None,
})),
];
let stored_thread = StoredThread {
thread_id,
rollout_path: None,
forked_from_id: None,
preview: "fallback preview".to_string(),
name: Some("Rollback thread".to_string()),
model_provider: "openai".to_string(),
model: None,
reasoning_effort: None,
created_at,
updated_at: created_at,
archived_at: None,
cwd: test_path_buf("/tmp").abs().into(),
cli_version: "0.0.0".to_string(),
source: SessionSource::Cli,
thread_source: None,
agent_nickname: None,
agent_role: None,
agent_path: None,
git_info: None,
approval_mode: AskForApproval::OnRequest,
sandbox_policy: SandboxPolicy::new_read_only_policy(),
token_usage: None,
first_user_message: Some("before rollback".to_string()),
history: Some(StoredThreadHistory {
thread_id,
items: history_items,
}),
};
let fallback_cwd = test_path_buf("/tmp").abs();
let response = thread_rollback_response_from_stored_thread(
stored_thread,
thread_id.to_string(),
"fallback-provider",
&fallback_cwd,
ThreadStatus::NotLoaded,
)
.expect("rollback response should rebuild from stored history");
assert_eq!(response.thread.id, thread_id.to_string());
assert_eq!(response.thread.path, None);
assert_eq!(response.thread.preview, "fallback preview");
assert_eq!(response.thread.name.as_deref(), Some("Rollback thread"));
assert_eq!(response.thread.status, ThreadStatus::NotLoaded);
assert_eq!(response.thread.turns.len(), 1);
assert_eq!(response.thread.turns[0].items.len(), 2);
Ok(())
}
fn turn_complete_event(turn_id: &str) -> TurnCompleteEvent {
TurnCompleteEvent {
turn_id: turn_id.to_string(),
last_agent_message: None,
completed_at: Some(TEST_TURN_COMPLETED_AT),
duration_ms: Some(TEST_TURN_DURATION_MS),
time_to_first_token_ms: None,
}
}
fn turn_aborted_event(turn_id: &str) -> TurnAbortedEvent {
TurnAbortedEvent {
turn_id: Some(turn_id.to_string()),
reason: codex_protocol::protocol::TurnAbortReason::Interrupted,
completed_at: Some(TEST_TURN_COMPLETED_AT),
duration_ms: Some(TEST_TURN_DURATION_MS),
}
}
fn command_execution_completion_item(command: &str) -> CommandExecutionCompletionItem {
CommandExecutionCompletionItem {
command: command.to_string(),
cwd: test_path_buf("/tmp").abs(),
command_actions: vec![V2ParsedCommand::Unknown {
command: command.to_string(),
}],
}
}
fn guardian_command_assessment(
id: &str,
turn_id: &str,
status: GuardianAssessmentStatus,
) -> GuardianAssessmentEvent {
let (risk_level, user_authorization, rationale) = match status {
GuardianAssessmentStatus::InProgress => (None, None, None),
GuardianAssessmentStatus::Approved => (
Some(codex_protocol::protocol::GuardianRiskLevel::Low),
Some(codex_protocol::protocol::GuardianUserAuthorization::High),
Some("looks safe".to_string()),
),
GuardianAssessmentStatus::Denied => (
Some(codex_protocol::protocol::GuardianRiskLevel::High),
Some(codex_protocol::protocol::GuardianUserAuthorization::Low),
Some("too risky".to_string()),
),
GuardianAssessmentStatus::TimedOut => {
(None, None, Some("review timed out".to_string()))
}
GuardianAssessmentStatus::Aborted => (None, None, None),
};
GuardianAssessmentEvent {
id: format!("review-{id}"),
target_item_id: Some(id.to_string()),
turn_id: turn_id.to_string(),
started_at_ms: 1_000,
completed_at_ms: (!matches!(status, GuardianAssessmentStatus::InProgress))
.then_some(1_042),
status,
risk_level,
user_authorization,
rationale,
decision_source: if matches!(status, GuardianAssessmentStatus::InProgress) {
None
} else {
Some(codex_protocol::protocol::GuardianAssessmentDecisionSource::Agent)
},
action: serde_json::from_value(json!({
"type": "command",
"source": "shell",
"command": format!("rm -f /tmp/{id}.sqlite"),
"cwd": test_path_buf("/tmp"),
}))
.expect("guardian action"),
}
}
struct GuardianAssessmentTestContext {
conversation_id: ThreadId,
conversation: Arc<CodexThread>,
thread_manager: Arc<ThreadManager>,
outgoing: ThreadScopedOutgoingMessageSender,
thread_state: Arc<Mutex<ThreadState>>,
thread_watch_manager: ThreadWatchManager,
}
impl GuardianAssessmentTestContext {
async fn apply_guardian_assessment_event(&self, assessment: GuardianAssessmentEvent) {
let event_turn_id = assessment.turn_id.clone();
apply_bespoke_event_handling(
Event {
id: event_turn_id,
msg: EventMsg::GuardianAssessment(assessment),
},
self.conversation_id,
self.conversation.clone(),
self.thread_manager.clone(),
self.outgoing.clone(),
self.thread_state.clone(),
self.thread_watch_manager.clone(),
Arc::new(tokio::sync::Semaphore::new(/*permits*/ 1)),
"test-provider".to_string(),
)
.await;
}
}
#[test]
fn guardian_assessment_started_uses_event_turn_id_fallback() {
let conversation_id = ThreadId::new();
let action = codex_protocol::protocol::GuardianAssessmentAction::Command {
source: codex_protocol::protocol::GuardianCommandSource::Shell,
command: "rm -rf /tmp/example.sqlite".to_string(),
cwd: test_path_buf("/tmp").abs(),
};
let notification = guardian_auto_approval_review_notification(
&conversation_id,
"turn-from-event",
&GuardianAssessmentEvent {
id: "review-1".to_string(),
target_item_id: Some("item-1".to_string()),
turn_id: String::new(),
started_at_ms: 1_000,
completed_at_ms: None,
status: codex_protocol::protocol::GuardianAssessmentStatus::InProgress,
risk_level: None,
user_authorization: None,
rationale: None,
decision_source: None,
action: action.clone(),
},
);
match notification {
ServerNotification::ItemGuardianApprovalReviewStarted(payload) => {
assert_eq!(payload.thread_id, conversation_id.to_string());
assert_eq!(payload.turn_id, "turn-from-event");
assert_eq!(payload.started_at_ms, 1_000);
assert_eq!(payload.review_id, "review-1");
assert_eq!(payload.target_item_id.as_deref(), Some("item-1"));
assert_eq!(
payload.review.status,
GuardianApprovalReviewStatus::InProgress
);
assert_eq!(payload.review.risk_level, None);
assert_eq!(payload.review.user_authorization, None);
assert_eq!(payload.review.rationale, None);
assert_eq!(payload.action, action.into());
}
other => panic!("unexpected notification: {other:?}"),
}
}
#[test]
fn guardian_assessment_completed_emits_review_payload() {
let conversation_id = ThreadId::new();
let action = codex_protocol::protocol::GuardianAssessmentAction::Command {
source: codex_protocol::protocol::GuardianCommandSource::Shell,
command: "rm -rf /tmp/example.sqlite".to_string(),
cwd: test_path_buf("/tmp").abs(),
};
let notification = guardian_auto_approval_review_notification(
&conversation_id,
"turn-from-event",
&GuardianAssessmentEvent {
id: "review-2".to_string(),
target_item_id: Some("item-2".to_string()),
turn_id: "turn-from-assessment".to_string(),
started_at_ms: 1_000,
completed_at_ms: Some(1_042),
status: codex_protocol::protocol::GuardianAssessmentStatus::Denied,
risk_level: Some(codex_protocol::protocol::GuardianRiskLevel::High),
user_authorization: Some(codex_protocol::protocol::GuardianUserAuthorization::Low),
rationale: Some("too risky".to_string()),
decision_source: Some(
codex_protocol::protocol::GuardianAssessmentDecisionSource::Agent,
),
action: action.clone(),
},
);
match notification {
ServerNotification::ItemGuardianApprovalReviewCompleted(payload) => {
assert_eq!(payload.thread_id, conversation_id.to_string());
assert_eq!(payload.turn_id, "turn-from-assessment");
assert_eq!(payload.started_at_ms, 1_000);
assert_eq!(payload.completed_at_ms, 1_042);
assert_eq!(payload.review_id, "review-2");
assert_eq!(payload.target_item_id.as_deref(), Some("item-2"));
assert_eq!(payload.decision_source, AutoReviewDecisionSource::Agent);
assert_eq!(payload.review.status, GuardianApprovalReviewStatus::Denied);
assert_eq!(
payload.review.risk_level,
Some(codex_app_server_protocol::GuardianRiskLevel::High)
);
assert_eq!(
payload.review.user_authorization,
Some(codex_app_server_protocol::GuardianUserAuthorization::Low)
);
assert_eq!(payload.review.rationale.as_deref(), Some("too risky"));
assert_eq!(payload.action, action.into());
}
other => panic!("unexpected notification: {other:?}"),
}
}
#[test]
fn guardian_assessment_aborted_emits_completed_review_payload() {
let conversation_id = ThreadId::new();
let action = codex_protocol::protocol::GuardianAssessmentAction::NetworkAccess {
target: "api.openai.com:443".to_string(),
host: "api.openai.com".to_string(),
protocol: codex_protocol::protocol::NetworkApprovalProtocol::Https,
port: 443,
};
let notification = guardian_auto_approval_review_notification(
&conversation_id,
"turn-from-event",
&GuardianAssessmentEvent {
id: "review-3".to_string(),
target_item_id: None,
turn_id: "turn-from-assessment".to_string(),
started_at_ms: 1_000,
completed_at_ms: Some(1_042),
status: codex_protocol::protocol::GuardianAssessmentStatus::Aborted,
risk_level: None,
user_authorization: None,
rationale: None,
decision_source: Some(
codex_protocol::protocol::GuardianAssessmentDecisionSource::Agent,
),
action: action.clone(),
},
);
match notification {
ServerNotification::ItemGuardianApprovalReviewCompleted(payload) => {
assert_eq!(payload.thread_id, conversation_id.to_string());
assert_eq!(payload.turn_id, "turn-from-assessment");
assert_eq!(payload.review_id, "review-3");
assert_eq!(payload.target_item_id, None);
assert_eq!(payload.decision_source, AutoReviewDecisionSource::Agent);
assert_eq!(payload.review.status, GuardianApprovalReviewStatus::Aborted);
assert_eq!(payload.review.risk_level, None);
assert_eq!(payload.review.user_authorization, None);
assert_eq!(payload.review.rationale, None);
assert_eq!(payload.action, action.into());
}
other => panic!("unexpected notification: {other:?}"),
}
}
#[tokio::test]
async fn command_execution_started_helper_emits_once() -> Result<()> {
let conversation_id = ThreadId::new();
let thread_state = new_thread_state();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
ThreadId::new(),
);
let completion_item = command_execution_completion_item("printf hi");
let first_start = start_command_execution_item(
&conversation_id,
"turn-1".to_string(),
"cmd-1".to_string(),
completion_item.command.clone(),
completion_item.cwd.clone(),
completion_item.command_actions.clone(),
CommandExecutionSource::Agent,
&outgoing,
&thread_state,
)
.await;
assert!(first_start);
let msg = recv_broadcast_message(&mut rx).await?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::ItemStarted(payload)) => {
assert_eq!(payload.thread_id, conversation_id.to_string());
assert_eq!(payload.turn_id, "turn-1");
assert_eq!(
payload.item,
ThreadItem::CommandExecution {
id: "cmd-1".to_string(),
command: completion_item.command.clone(),
cwd: completion_item.cwd.clone(),
process_id: None,
source: CommandExecutionSource::Agent,
status: CommandExecutionStatus::InProgress,
command_actions: completion_item.command_actions.clone(),
aggregated_output: None,
exit_code: None,
duration_ms: None,
}
);
}
other => bail!("unexpected message: {other:?}"),
}
let second_start = start_command_execution_item(
&conversation_id,
"turn-1".to_string(),
"cmd-1".to_string(),
completion_item.command.clone(),
completion_item.cwd.clone(),
completion_item.command_actions.clone(),
CommandExecutionSource::Agent,
&outgoing,
&thread_state,
)
.await;
assert!(!second_start);
assert!(rx.try_recv().is_err(), "duplicate start should not emit");
Ok(())
}
#[tokio::test]
async fn complete_command_execution_item_emits_declined_once_for_pending_command() -> Result<()>
{
let conversation_id = ThreadId::new();
let thread_state = new_thread_state();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
ThreadId::new(),
);
let completion_item = command_execution_completion_item("printf hi");
start_command_execution_item(
&conversation_id,
"turn-1".to_string(),
"cmd-1".to_string(),
completion_item.command.clone(),
completion_item.cwd.clone(),
completion_item.command_actions.clone(),
CommandExecutionSource::Agent,
&outgoing,
&thread_state,
)
.await;
let _started = recv_broadcast_message(&mut rx).await?;
complete_command_execution_item(
&conversation_id,
"turn-1".to_string(),
"cmd-1".to_string(),
completion_item.command.clone(),
completion_item.cwd.clone(),
/*process_id*/ None,
CommandExecutionSource::Agent,
completion_item.command_actions.clone(),
CommandExecutionStatus::Declined,
&outgoing,
&thread_state,
)
.await;
let completed = recv_broadcast_message(&mut rx).await?;
match completed {
OutgoingMessage::AppServerNotification(ServerNotification::ItemCompleted(payload)) => {
let ThreadItem::CommandExecution { id, status, .. } = payload.item else {
bail!("expected command execution completion");
};
assert_eq!(id, "cmd-1");
assert_eq!(status, CommandExecutionStatus::Declined);
}
other => bail!("unexpected message: {other:?}"),
}
complete_command_execution_item(
&conversation_id,
"turn-1".to_string(),
"cmd-1".to_string(),
completion_item.command,
completion_item.cwd,
/*process_id*/ None,
CommandExecutionSource::Agent,
completion_item.command_actions,
CommandExecutionStatus::Declined,
&outgoing,
&thread_state,
)
.await;
assert!(
rx.try_recv().is_err(),
"completion should not emit after the pending item is cleared"
);
Ok(())
}
#[tokio::test]
async fn guardian_command_execution_notifications_wrap_review_lifecycle() -> Result<()> {
let codex_home = TempDir::new()?;
let config = load_default_config_for_test(&codex_home).await;
let thread_manager = Arc::new(
codex_core::test_support::thread_manager_with_models_provider_and_home(
CodexAuth::create_dummy_chatgpt_auth_for_testing(),
config.model_provider.clone(),
config.codex_home.to_path_buf(),
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
),
);
let codex_core::NewThread {
thread_id: conversation_id,
thread: conversation,
..
} = thread_manager.start_thread(config.clone()).await?;
let thread_state = new_thread_state();
let thread_watch_manager = ThreadWatchManager::new();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
conversation_id,
);
let guardian_context = GuardianAssessmentTestContext {
conversation_id,
conversation: conversation.clone(),
thread_manager: thread_manager.clone(),
outgoing: outgoing.clone(),
thread_state: thread_state.clone(),
thread_watch_manager: thread_watch_manager.clone(),
};
guardian_context
.apply_guardian_assessment_event(guardian_command_assessment(
"cmd-guardian-approved",
"turn-guardian-approved",
GuardianAssessmentStatus::InProgress,
))
.await;
let first = recv_broadcast_message(&mut rx).await?;
match first {
OutgoingMessage::AppServerNotification(ServerNotification::ItemStarted(payload)) => {
assert_eq!(payload.turn_id, "turn-guardian-approved");
let ThreadItem::CommandExecution { id, status, .. } = payload.item else {
bail!("expected command execution item");
};
assert_eq!(id, "cmd-guardian-approved");
assert_eq!(status, CommandExecutionStatus::InProgress);
}
other => bail!("unexpected message: {other:?}"),
}
let second = recv_broadcast_message(&mut rx).await?;
match second {
OutgoingMessage::AppServerNotification(
ServerNotification::ItemGuardianApprovalReviewStarted(payload),
) => {
assert_eq!(payload.review_id, "review-cmd-guardian-approved");
assert_eq!(
payload.target_item_id.as_deref(),
Some("cmd-guardian-approved")
);
assert_eq!(
payload.review.status,
GuardianApprovalReviewStatus::InProgress
);
}
other => bail!("unexpected message: {other:?}"),
}
guardian_context
.apply_guardian_assessment_event(guardian_command_assessment(
"cmd-guardian-approved",
"turn-guardian-approved",
GuardianAssessmentStatus::Approved,
))
.await;
let third = recv_broadcast_message(&mut rx).await?;
match third {
OutgoingMessage::AppServerNotification(
ServerNotification::ItemGuardianApprovalReviewCompleted(payload),
) => {
assert_eq!(payload.review_id, "review-cmd-guardian-approved");
assert_eq!(
payload.target_item_id.as_deref(),
Some("cmd-guardian-approved")
);
assert_eq!(payload.decision_source, AutoReviewDecisionSource::Agent);
assert_eq!(
payload.review.status,
GuardianApprovalReviewStatus::Approved
);
}
other => bail!("unexpected message: {other:?}"),
}
assert!(
rx.try_recv().is_err(),
"approved review should not complete the command item"
);
guardian_context
.apply_guardian_assessment_event(guardian_command_assessment(
"cmd-guardian-denied",
"turn-guardian-denied",
GuardianAssessmentStatus::InProgress,
))
.await;
let fourth = recv_broadcast_message(&mut rx).await?;
match fourth {
OutgoingMessage::AppServerNotification(ServerNotification::ItemStarted(payload)) => {
assert_eq!(payload.turn_id, "turn-guardian-denied");
let ThreadItem::CommandExecution { id, status, .. } = payload.item else {
bail!("expected command execution item");
};
assert_eq!(id, "cmd-guardian-denied");
assert_eq!(status, CommandExecutionStatus::InProgress);
}
other => bail!("unexpected message: {other:?}"),
}
let fifth = recv_broadcast_message(&mut rx).await?;
match fifth {
OutgoingMessage::AppServerNotification(
ServerNotification::ItemGuardianApprovalReviewStarted(payload),
) => {
assert_eq!(payload.review_id, "review-cmd-guardian-denied");
assert_eq!(
payload.target_item_id.as_deref(),
Some("cmd-guardian-denied")
);
assert_eq!(
payload.review.status,
GuardianApprovalReviewStatus::InProgress
);
}
other => bail!("unexpected message: {other:?}"),
}
guardian_context
.apply_guardian_assessment_event(guardian_command_assessment(
"cmd-guardian-denied",
"turn-guardian-denied",
GuardianAssessmentStatus::Denied,
))
.await;
let sixth = recv_broadcast_message(&mut rx).await?;
match sixth {
OutgoingMessage::AppServerNotification(
ServerNotification::ItemGuardianApprovalReviewCompleted(payload),
) => {
assert_eq!(payload.review_id, "review-cmd-guardian-denied");
assert_eq!(
payload.target_item_id.as_deref(),
Some("cmd-guardian-denied")
);
assert_eq!(payload.decision_source, AutoReviewDecisionSource::Agent);
assert_eq!(payload.review.status, GuardianApprovalReviewStatus::Denied);
}
other => bail!("unexpected message: {other:?}"),
}
let seventh = recv_broadcast_message(&mut rx).await?;
match seventh {
OutgoingMessage::AppServerNotification(ServerNotification::ItemCompleted(payload)) => {
let ThreadItem::CommandExecution { id, status, .. } = payload.item else {
bail!("expected command execution completion");
};
assert_eq!(id, "cmd-guardian-denied");
assert_eq!(status, CommandExecutionStatus::Declined);
}
other => bail!("unexpected message: {other:?}"),
}
let mut missing_target = guardian_command_assessment(
"cmd-guardian-missing-target",
"turn-guardian-missing-target",
GuardianAssessmentStatus::InProgress,
);
missing_target.target_item_id = None;
guardian_context
.apply_guardian_assessment_event(missing_target)
.await;
let eighth = recv_broadcast_message(&mut rx).await?;
match eighth {
OutgoingMessage::AppServerNotification(
ServerNotification::ItemGuardianApprovalReviewStarted(payload),
) => {
assert_eq!(payload.review_id, "review-cmd-guardian-missing-target");
assert_eq!(payload.target_item_id, None);
assert_eq!(
payload.review.status,
GuardianApprovalReviewStatus::InProgress
);
}
other => bail!("unexpected message: {other:?}"),
}
assert!(rx.try_recv().is_err(), "no extra messages expected");
conversation.shutdown_and_wait().await?;
Ok(())
}
#[test]
fn file_change_accept_for_session_maps_to_approved_for_session() {
let decision =
map_file_change_approval_decision(FileChangeApprovalDecision::AcceptForSession);
assert_eq!(decision, ReviewDecision::ApprovedForSession);
}
#[test]
fn mcp_server_elicitation_turn_transition_error_maps_to_cancel() {
let error = JSONRPCErrorError {
code: -1,
message: "client request resolved because the turn state was changed".to_string(),
data: Some(serde_json::json!({ "reason": "turnTransition" })),
};
let response = mcp_server_elicitation_response_from_client_result(Ok(Err(error)));
assert_eq!(
response,
McpServerElicitationRequestResponse {
action: McpServerElicitationAction::Cancel,
content: None,
meta: None,
}
);
}
#[test]
fn request_permissions_turn_transition_error_is_ignored() {
let error = JSONRPCErrorError {
code: -1,
message: "client request resolved because the turn state was changed".to_string(),
data: Some(serde_json::json!({ "reason": "turnTransition" })),
};
let response = request_permissions_response_from_client_result(
CoreRequestPermissionProfile::default(),
Ok(Err(error)),
std::env::current_dir().expect("current dir").as_path(),
);
assert_eq!(response, None);
}
#[test]
fn request_permissions_response_accepts_partial_network_and_file_system_grants() {
let input_path = if cfg!(target_os = "windows") {
r"C:\tmp\input"
} else {
"/tmp/input"
};
let output_path = if cfg!(target_os = "windows") {
r"C:\tmp\output"
} else {
"/tmp/output"
};
let ignored_path = if cfg!(target_os = "windows") {
r"C:\tmp\ignored"
} else {
"/tmp/ignored"
};
let absolute_path = |path: &str| {
AbsolutePathBuf::try_from(std::path::PathBuf::from(path)).expect("absolute path")
};
let requested_permissions = CoreRequestPermissionProfile {
network: Some(CoreNetworkPermissions {
enabled: Some(true),
}),
file_system: Some(CoreFileSystemPermissions::from_read_write_roots(
Some(vec![absolute_path(input_path)]),
Some(vec![absolute_path(output_path)]),
)),
};
let cases = vec![
(
serde_json::json!({}),
CoreRequestPermissionProfile::default(),
),
(
serde_json::json!({
"network": {
"enabled": true,
},
}),
CoreRequestPermissionProfile {
network: Some(CoreNetworkPermissions {
enabled: Some(true),
}),
..CoreRequestPermissionProfile::default()
},
),
(
serde_json::json!({
"fileSystem": {
"write": [output_path],
},
}),
CoreRequestPermissionProfile {
file_system: Some(CoreFileSystemPermissions::from_read_write_roots(
/*read*/ None,
Some(vec![absolute_path(output_path)]),
)),
..CoreRequestPermissionProfile::default()
},
),
(
serde_json::json!({
"fileSystem": {
"read": [input_path],
"write": [output_path, ignored_path],
},
"macos": {
"calendar": true,
},
}),
CoreRequestPermissionProfile {
file_system: Some(CoreFileSystemPermissions::from_read_write_roots(
Some(vec![absolute_path(input_path)]),
Some(vec![absolute_path(output_path)]),
)),
..CoreRequestPermissionProfile::default()
},
),
];
let cwd = std::env::current_dir().expect("current dir");
for (granted_permissions, expected_permissions) in cases {
let response = request_permissions_response_from_client_result(
requested_permissions.clone(),
Ok(Ok(serde_json::json!({
"permissions": granted_permissions,
}))),
cwd.as_path(),
)
.expect("response should be accepted");
assert_eq!(
response,
CoreRequestPermissionsResponse {
permissions: expected_permissions,
scope: CorePermissionGrantScope::Turn,
strict_auto_review: false,
}
);
}
}
#[test]
fn request_permissions_response_preserves_session_scope() {
let response = request_permissions_response_from_client_result(
CoreRequestPermissionProfile::default(),
Ok(Ok(serde_json::json!({
"scope": "session",
"permissions": {},
}))),
std::env::current_dir().expect("current dir").as_path(),
)
.expect("response should be accepted");
assert_eq!(
response,
CoreRequestPermissionsResponse {
permissions: CoreRequestPermissionProfile::default(),
scope: CorePermissionGrantScope::Session,
strict_auto_review: false,
}
);
}
#[test]
fn request_permissions_response_rejects_session_scoped_strict_auto_review() {
let response = request_permissions_response_from_client_result(
CoreRequestPermissionProfile::default(),
Ok(Ok(serde_json::json!({
"scope": "session",
"strictAutoReview": true,
"permissions": {
"network": {
"enabled": true,
},
},
}))),
std::env::current_dir().expect("current dir").as_path(),
)
.expect("response should be accepted");
assert_eq!(
response,
CoreRequestPermissionsResponse {
permissions: CoreRequestPermissionProfile::default(),
scope: CorePermissionGrantScope::Turn,
strict_auto_review: false,
}
);
}
#[test]
fn request_permissions_response_preserves_turn_scoped_strict_auto_review() {
let response = request_permissions_response_from_client_result(
CoreRequestPermissionProfile {
network: Some(codex_protocol::models::NetworkPermissions {
enabled: Some(true),
}),
..Default::default()
},
Ok(Ok(serde_json::json!({
"strictAutoReview": true,
"permissions": {
"network": {
"enabled": true,
},
},
}))),
std::env::current_dir().expect("current dir").as_path(),
)
.expect("response should be accepted");
assert_eq!(response.scope, CorePermissionGrantScope::Turn);
assert!(response.strict_auto_review);
}
#[test]
fn request_permissions_response_accepts_explicit_child_grant_for_requested_cwd_scope() {
let temp_dir = TempDir::new().expect("temp dir");
let cwd = AbsolutePathBuf::from_absolute_path(temp_dir.path()).expect("absolute cwd");
let child = cwd.join("child");
let requested_permissions = CoreRequestPermissionProfile {
file_system: Some(CoreFileSystemPermissions {
entries: vec![FileSystemSandboxEntry {
path: FileSystemPath::Special {
value: FileSystemSpecialPath::project_roots(/*subpath*/ None),
},
access: FileSystemAccessMode::Write,
}],
glob_scan_max_depth: None,
}),
..Default::default()
};
let response = request_permissions_response_from_client_result(
requested_permissions,
Ok(Ok(serde_json::json!({
"permissions": {
"fileSystem": {
"write": [child],
},
},
}))),
cwd.as_path(),
)
.expect("response should be accepted");
assert_eq!(
response.permissions,
CoreRequestPermissionProfile {
file_system: Some(CoreFileSystemPermissions::from_read_write_roots(
/*read*/ None,
Some(vec![child]),
)),
..Default::default()
}
);
}
#[test]
fn request_permissions_response_rejects_child_grant_outside_requested_cwd_scope() {
let temp_dir = TempDir::new().expect("temp dir");
let request_cwd = AbsolutePathBuf::from_absolute_path(temp_dir.path().join("request-cwd"))
.expect("absolute request cwd");
let later_cwd = AbsolutePathBuf::from_absolute_path(temp_dir.path().join("later-cwd"))
.expect("absolute later cwd");
let later_child = later_cwd.join("child");
let requested_permissions = CoreRequestPermissionProfile {
file_system: Some(CoreFileSystemPermissions {
entries: vec![FileSystemSandboxEntry {
path: FileSystemPath::Special {
value: FileSystemSpecialPath::project_roots(/*subpath*/ None),
},
access: FileSystemAccessMode::Write,
}],
glob_scan_max_depth: None,
}),
..Default::default()
};
let response = request_permissions_response_from_client_result(
requested_permissions,
Ok(Ok(serde_json::json!({
"permissions": {
"fileSystem": {
"write": [later_child],
},
},
}))),
request_cwd.as_path(),
)
.expect("response should be accepted");
assert_eq!(
response.permissions,
CoreRequestPermissionProfile::default()
);
}
#[test]
fn request_permissions_response_ignores_broader_cwd_grant_for_requested_child_path() {
let temp_dir = TempDir::new().expect("temp dir");
let cwd = AbsolutePathBuf::from_absolute_path(temp_dir.path()).expect("absolute cwd");
let child = cwd.join("child");
let requested_permissions = CoreRequestPermissionProfile {
file_system: Some(CoreFileSystemPermissions::from_read_write_roots(
/*read*/ None,
Some(vec![child]),
)),
..Default::default()
};
let response = request_permissions_response_from_client_result(
requested_permissions,
Ok(Ok(serde_json::json!({
"permissions": {
"fileSystem": {
"entries": [{
"path": {
"type": "special",
"value": {
"kind": "project_roots",
"subpath": null
}
},
"access": "write"
}],
},
},
}))),
cwd.as_path(),
)
.expect("response should be accepted");
assert_eq!(
response.permissions,
CoreRequestPermissionProfile::default()
);
}
#[tokio::test]
async fn test_handle_error_records_message() -> Result<()> {
let conversation_id = ThreadId::new();
let thread_state = new_thread_state();
handle_error(
conversation_id,
TurnError {
message: "boom".to_string(),
codex_error_info: Some(V2CodexErrorInfo::InternalServerError),
additional_details: None,
},
&thread_state,
)
.await;
let turn_summary = find_and_remove_turn_summary(conversation_id, &thread_state).await;
assert_eq!(
turn_summary.last_error,
Some(TurnError {
message: "boom".to_string(),
codex_error_info: Some(V2CodexErrorInfo::InternalServerError),
additional_details: None,
})
);
Ok(())
}
#[tokio::test]
async fn turn_started_omits_active_snapshot_items() -> Result<()> {
let codex_home = TempDir::new()?;
let config = load_default_config_for_test(&codex_home).await;
let thread_manager = Arc::new(
codex_core::test_support::thread_manager_with_models_provider_and_home(
CodexAuth::create_dummy_chatgpt_auth_for_testing(),
config.model_provider.clone(),
config.codex_home.to_path_buf(),
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
),
);
let codex_core::NewThread {
thread_id: conversation_id,
thread: conversation,
..
} = thread_manager.start_thread(config.clone()).await?;
let thread_state = new_thread_state();
{
let mut state = thread_state.lock().await;
state.track_current_turn_event(
"turn-1",
&EventMsg::TurnStarted(codex_protocol::protocol::TurnStartedEvent {
turn_id: "turn-1".to_string(),
started_at: Some(42),
model_context_window: None,
collaboration_mode_kind: Default::default(),
}),
);
state.track_current_turn_event(
"turn-1",
&EventMsg::UserMessage(codex_protocol::protocol::UserMessageEvent {
message: "already tracked".to_string(),
images: None,
local_images: Vec::new(),
text_elements: Vec::new(),
..Default::default()
}),
);
}
let thread_watch_manager = ThreadWatchManager::new();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
conversation_id,
);
apply_bespoke_event_handling(
Event {
id: "turn-1".to_string(),
msg: EventMsg::TurnStarted(codex_protocol::protocol::TurnStartedEvent {
turn_id: "turn-1".to_string(),
started_at: Some(42),
model_context_window: None,
collaboration_mode_kind: Default::default(),
}),
},
conversation_id,
conversation,
thread_manager,
outgoing,
thread_state,
thread_watch_manager,
Arc::new(tokio::sync::Semaphore::new(/*permits*/ 1)),
"test-provider".to_string(),
)
.await;
let msg = recv_broadcast_message(&mut rx).await?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::TurnStarted(n)) => {
assert_eq!(n.turn.id, "turn-1");
assert_eq!(n.turn.items_view, TurnItemsView::NotLoaded);
assert!(n.turn.items.is_empty());
}
other => bail!("unexpected message: {other:?}"),
}
Ok(())
}
#[tokio::test]
async fn test_handle_turn_complete_emits_completed_without_error() -> Result<()> {
let conversation_id = ThreadId::new();
let event_turn_id = "complete1".to_string();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
ThreadId::new(),
);
let thread_state = new_thread_state();
{
let mut state = thread_state.lock().await;
state.track_current_turn_event(
&event_turn_id,
&EventMsg::TurnStarted(codex_protocol::protocol::TurnStartedEvent {
turn_id: event_turn_id.clone(),
started_at: Some(42),
model_context_window: None,
collaboration_mode_kind: Default::default(),
}),
);
state.track_current_turn_event(
&event_turn_id,
&EventMsg::TurnComplete(turn_complete_event(&event_turn_id)),
);
}
handle_turn_complete(
conversation_id,
event_turn_id.clone(),
turn_complete_event(&event_turn_id),
&outgoing,
&thread_state,
)
.await;
let msg = recv_broadcast_message(&mut rx).await?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => {
assert_eq!(n.turn.id, event_turn_id);
assert_eq!(n.turn.status, TurnStatus::Completed);
assert_eq!(n.turn.items_view, TurnItemsView::NotLoaded);
assert!(n.turn.items.is_empty());
assert_eq!(n.turn.error, None);
assert_eq!(n.turn.started_at, Some(42));
assert_eq!(n.turn.completed_at, Some(TEST_TURN_COMPLETED_AT));
assert_eq!(n.turn.duration_ms, Some(TEST_TURN_DURATION_MS));
}
other => bail!("unexpected message: {other:?}"),
}
assert!(rx.try_recv().is_err(), "no extra messages expected");
Ok(())
}
#[tokio::test]
async fn test_handle_turn_interrupted_emits_interrupted_with_error() -> Result<()> {
let conversation_id = ThreadId::new();
let event_turn_id = "interrupt1".to_string();
let thread_state = new_thread_state();
handle_error(
conversation_id,
TurnError {
message: "oops".to_string(),
codex_error_info: None,
additional_details: None,
},
&thread_state,
)
.await;
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
ThreadId::new(),
);
handle_turn_interrupted(
conversation_id,
event_turn_id.clone(),
turn_aborted_event(&event_turn_id),
&outgoing,
&thread_state,
)
.await;
let msg = recv_broadcast_message(&mut rx).await?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => {
assert_eq!(n.turn.id, event_turn_id);
assert_eq!(n.turn.status, TurnStatus::Interrupted);
assert_eq!(n.turn.error, None);
assert_eq!(n.turn.completed_at, Some(TEST_TURN_COMPLETED_AT));
assert_eq!(n.turn.duration_ms, Some(TEST_TURN_DURATION_MS));
}
other => bail!("unexpected message: {other:?}"),
}
assert!(rx.try_recv().is_err(), "no extra messages expected");
Ok(())
}
#[tokio::test]
async fn test_handle_turn_complete_emits_failed_with_error() -> Result<()> {
let conversation_id = ThreadId::new();
let event_turn_id = "complete_err1".to_string();
let thread_state = new_thread_state();
handle_error(
conversation_id,
TurnError {
message: "bad".to_string(),
codex_error_info: Some(V2CodexErrorInfo::Other),
additional_details: None,
},
&thread_state,
)
.await;
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
ThreadId::new(),
);
handle_turn_complete(
conversation_id,
event_turn_id.clone(),
turn_complete_event(&event_turn_id),
&outgoing,
&thread_state,
)
.await;
let msg = recv_broadcast_message(&mut rx).await?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => {
assert_eq!(n.turn.id, event_turn_id);
assert_eq!(n.turn.status, TurnStatus::Failed);
assert_eq!(
n.turn.error,
Some(TurnError {
message: "bad".to_string(),
codex_error_info: Some(V2CodexErrorInfo::Other),
additional_details: None,
})
);
assert_eq!(n.turn.completed_at, Some(TEST_TURN_COMPLETED_AT));
assert_eq!(n.turn.duration_ms, Some(TEST_TURN_DURATION_MS));
}
other => bail!("unexpected message: {other:?}"),
}
assert!(rx.try_recv().is_err(), "no extra messages expected");
Ok(())
}
#[tokio::test]
async fn test_handle_turn_plan_update_emits_notification_for_v2() -> Result<()> {
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
ThreadId::new(),
);
let update = UpdatePlanArgs {
explanation: Some("need plan".to_string()),
plan: vec![
PlanItemArg {
step: "first".to_string(),
status: StepStatus::Pending,
},
PlanItemArg {
step: "second".to_string(),
status: StepStatus::Completed,
},
],
};
let conversation_id = ThreadId::new();
handle_turn_plan_update(conversation_id, "turn-123", update, &outgoing).await;
let msg = recv_broadcast_message(&mut rx).await?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::TurnPlanUpdated(n)) => {
assert_eq!(n.thread_id, conversation_id.to_string());
assert_eq!(n.turn_id, "turn-123");
assert_eq!(n.explanation.as_deref(), Some("need plan"));
assert_eq!(n.plan.len(), 2);
assert_eq!(n.plan[0].step, "first");
assert_eq!(n.plan[0].status, TurnPlanStepStatus::Pending);
assert_eq!(n.plan[1].step, "second");
assert_eq!(n.plan[1].status, TurnPlanStepStatus::Completed);
}
other => bail!("unexpected message: {other:?}"),
}
assert!(rx.try_recv().is_err(), "no extra messages expected");
Ok(())
}
#[tokio::test]
async fn test_handle_token_count_event_emits_usage_and_rate_limits() -> Result<()> {
let conversation_id = ThreadId::new();
let turn_id = "turn-123".to_string();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
ThreadId::new(),
);
let info = TokenUsageInfo {
total_token_usage: TokenUsage {
input_tokens: 100,
cached_input_tokens: 25,
output_tokens: 50,
reasoning_output_tokens: 9,
total_tokens: 200,
},
last_token_usage: TokenUsage {
input_tokens: 10,
cached_input_tokens: 5,
output_tokens: 7,
reasoning_output_tokens: 1,
total_tokens: 23,
},
model_context_window: Some(4096),
};
let rate_limits = RateLimitSnapshot {
limit_id: Some("codex".to_string()),
limit_name: None,
primary: Some(RateLimitWindow {
used_percent: 42.5,
window_minutes: Some(15),
resets_at: Some(1700000000),
}),
secondary: None,
credits: Some(CreditsSnapshot {
has_credits: true,
unlimited: false,
balance: Some("5".to_string()),
}),
plan_type: None,
rate_limit_reached_type: None,
};
handle_token_count_event(
conversation_id,
turn_id.clone(),
TokenCountEvent {
info: Some(info),
rate_limits: Some(rate_limits),
},
&outgoing,
)
.await;
let first = recv_broadcast_message(&mut rx).await?;
match first {
OutgoingMessage::AppServerNotification(
ServerNotification::ThreadTokenUsageUpdated(payload),
) => {
assert_eq!(payload.thread_id, conversation_id.to_string());
assert_eq!(payload.turn_id, turn_id);
let usage = payload.token_usage;
assert_eq!(usage.total.total_tokens, 200);
assert_eq!(usage.total.cached_input_tokens, 25);
assert_eq!(usage.last.output_tokens, 7);
assert_eq!(usage.model_context_window, Some(4096));
}
other => bail!("unexpected notification: {other:?}"),
}
let second = recv_broadcast_message(&mut rx).await?;
match second {
OutgoingMessage::AppServerNotification(
ServerNotification::AccountRateLimitsUpdated(payload),
) => {
assert_eq!(payload.rate_limits.limit_id.as_deref(), Some("codex"));
assert_eq!(payload.rate_limits.limit_name, None);
assert!(payload.rate_limits.primary.is_some());
assert!(payload.rate_limits.credits.is_some());
}
other => bail!("unexpected notification: {other:?}"),
}
Ok(())
}
#[tokio::test]
async fn test_handle_token_count_event_without_usage_info() -> Result<()> {
let conversation_id = ThreadId::new();
let turn_id = "turn-456".to_string();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
ThreadId::new(),
);
handle_token_count_event(
conversation_id,
turn_id.clone(),
TokenCountEvent {
info: None,
rate_limits: None,
},
&outgoing,
)
.await;
assert!(
rx.try_recv().is_err(),
"no notifications should be emitted when token usage info is absent"
);
Ok(())
}
#[tokio::test]
async fn test_handle_turn_complete_emits_error_multiple_turns() -> Result<()> {
// Conversation A will have two turns; Conversation B will have one turn.
let conversation_a = ThreadId::new();
let conversation_b = ThreadId::new();
let thread_state = new_thread_state();
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
ThreadId::new(),
);
// Turn 1 on conversation A
let a_turn1 = "a_turn1".to_string();
handle_error(
conversation_a,
TurnError {
message: "a1".to_string(),
codex_error_info: Some(V2CodexErrorInfo::BadRequest),
additional_details: None,
},
&thread_state,
)
.await;
handle_turn_complete(
conversation_a,
a_turn1.clone(),
turn_complete_event(&a_turn1),
&outgoing,
&thread_state,
)
.await;
// Turn 1 on conversation B
let b_turn1 = "b_turn1".to_string();
handle_error(
conversation_b,
TurnError {
message: "b1".to_string(),
codex_error_info: None,
additional_details: None,
},
&thread_state,
)
.await;
handle_turn_complete(
conversation_b,
b_turn1.clone(),
turn_complete_event(&b_turn1),
&outgoing,
&thread_state,
)
.await;
// Turn 2 on conversation A
let a_turn2 = "a_turn2".to_string();
handle_turn_complete(
conversation_a,
a_turn2.clone(),
turn_complete_event(&a_turn2),
&outgoing,
&thread_state,
)
.await;
// Verify: A turn 1
let msg = recv_broadcast_message(&mut rx).await?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => {
assert_eq!(n.turn.id, a_turn1);
assert_eq!(n.turn.status, TurnStatus::Failed);
assert_eq!(
n.turn.error,
Some(TurnError {
message: "a1".to_string(),
codex_error_info: Some(V2CodexErrorInfo::BadRequest),
additional_details: None,
})
);
}
other => bail!("unexpected message: {other:?}"),
}
// Verify: B turn 1
let msg = recv_broadcast_message(&mut rx).await?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => {
assert_eq!(n.turn.id, b_turn1);
assert_eq!(n.turn.status, TurnStatus::Failed);
assert_eq!(
n.turn.error,
Some(TurnError {
message: "b1".to_string(),
codex_error_info: None,
additional_details: None,
})
);
}
other => bail!("unexpected message: {other:?}"),
}
// Verify: A turn 2
let msg = recv_broadcast_message(&mut rx).await?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::TurnCompleted(n)) => {
assert_eq!(n.turn.id, a_turn2);
assert_eq!(n.turn.status, TurnStatus::Completed);
assert_eq!(n.turn.error, None);
}
other => bail!("unexpected message: {other:?}"),
}
assert!(rx.try_recv().is_err(), "no extra messages expected");
Ok(())
}
#[tokio::test]
async fn test_handle_turn_diff_emits_v2_notification() -> Result<()> {
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
));
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
ThreadId::new(),
);
let unified_diff = "--- a\n+++ b\n".to_string();
let conversation_id = ThreadId::new();
handle_turn_diff(
conversation_id,
"turn-1",
TurnDiffEvent {
unified_diff: unified_diff.clone(),
},
&outgoing,
)
.await;
let msg = recv_broadcast_message(&mut rx).await?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::TurnDiffUpdated(
notification,
)) => {
assert_eq!(notification.thread_id, conversation_id.to_string());
assert_eq!(notification.turn_id, "turn-1");
assert_eq!(notification.diff, unified_diff);
}
other => bail!("unexpected message: {other:?}"),
}
assert!(rx.try_recv().is_err(), "no extra messages expected");
Ok(())
}
#[tokio::test]
async fn test_hook_prompt_raw_response_emits_item_completed() -> Result<()> {
let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
));
let conversation_id = ThreadId::new();
let outgoing = ThreadScopedOutgoingMessageSender::new(
outgoing,
vec![ConnectionId(1)],
conversation_id,
);
let item = build_hook_prompt_message(&[
HookPromptFragment::from_single_hook("Retry with tests.", "hook-run-1"),
HookPromptFragment::from_single_hook("Then summarize cleanly.", "hook-run-2"),
])
.expect("hook prompt message");
maybe_emit_hook_prompt_item_completed(conversation_id, "turn-1", &item, &outgoing).await;
let msg = recv_broadcast_message(&mut rx).await?;
match msg {
OutgoingMessage::AppServerNotification(ServerNotification::ItemCompleted(
notification,
)) => {
assert_eq!(notification.thread_id, conversation_id.to_string());
assert_eq!(notification.turn_id, "turn-1");
assert_eq!(
notification.item,
ThreadItem::HookPrompt {
id: notification.item.id().to_string(),
fragments: vec![
codex_app_server_protocol::HookPromptFragment {
text: "Retry with tests.".into(),
hook_run_id: "hook-run-1".into(),
},
codex_app_server_protocol::HookPromptFragment {
text: "Then summarize cleanly.".into(),
hook_run_id: "hook-run-2".into(),
},
],
}
);
}
other => bail!("unexpected message: {other:?}"),
}
assert!(rx.try_recv().is_err(), "no extra messages expected");
Ok(())
}
}