wire up APIs

This commit is contained in:
Owen Lin
2025-11-13 08:29:12 -08:00
parent 7c5cb8d4db
commit 6ff6d9f5f7
2 changed files with 187 additions and 45 deletions

View File

@@ -23,6 +23,9 @@ use codex_app_server_protocol::CancelLoginAccountParams;
use codex_app_server_protocol::CancelLoginAccountResponse;
use codex_app_server_protocol::CancelLoginChatGptResponse;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::CommandExecutionRequest;
use codex_app_server_protocol::CommandExecutionRequestApprovalParams;
use codex_app_server_protocol::CommandExecutionRequestApprovalResponse;
use codex_app_server_protocol::ConversationGitInfo;
use codex_app_server_protocol::ConversationSummary;
use codex_app_server_protocol::ExecCommandApprovalParams;
@@ -31,6 +34,10 @@ use codex_app_server_protocol::ExecOneOffCommandParams;
use codex_app_server_protocol::ExecOneOffCommandResponse;
use codex_app_server_protocol::FeedbackUploadParams;
use codex_app_server_protocol::FeedbackUploadResponse;
use codex_app_server_protocol::FileChange as V2FileChange;
use codex_app_server_protocol::FileChangeRequest;
use codex_app_server_protocol::FileChangeRequestApprovalParams;
use codex_app_server_protocol::FileChangeRequestApprovalResponse;
use codex_app_server_protocol::FuzzyFileSearchParams;
use codex_app_server_protocol::FuzzyFileSearchResponse;
use codex_app_server_protocol::GetAccountParams;
@@ -62,12 +69,15 @@ use codex_app_server_protocol::ModelListParams;
use codex_app_server_protocol::ModelListResponse;
use codex_app_server_protocol::NewConversationParams;
use codex_app_server_protocol::NewConversationResponse;
use codex_app_server_protocol::ParsedCommand as V2ParsedCommand;
use codex_app_server_protocol::RemoveConversationListenerParams;
use codex_app_server_protocol::RemoveConversationSubscriptionResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::Result as JsonRpcResult;
use codex_app_server_protocol::ResumeConversationParams;
use codex_app_server_protocol::ResumeConversationResponse;
use codex_app_server_protocol::ReviewDecision as V2ReviewDecision;
use codex_app_server_protocol::SandboxCommandAssessment as V2SandboxCommandAssessment;
use codex_app_server_protocol::SandboxMode;
use codex_app_server_protocol::SendUserMessageParams;
use codex_app_server_protocol::SendUserMessageResponse;
@@ -1259,7 +1269,7 @@ impl CodexMessageProcessor {
// Auto-attach a conversation listener when starting a thread.
// Use the same behavior as the v1 API with experimental_raw_events=false.
if let Err(err) = self
.attach_conversation_listener(conversation_id, false)
.attach_conversation_listener(conversation_id, false, ApiVersion::V2)
.await
{
tracing::warn!(
@@ -1537,7 +1547,7 @@ impl CodexMessageProcessor {
}) => {
// Auto-attach a conversation listener when resuming a thread.
if let Err(err) = self
.attach_conversation_listener(conversation_id, false)
.attach_conversation_listener(conversation_id, false, ApiVersion::V2)
.await
{
tracing::warn!(
@@ -2390,7 +2400,7 @@ impl CodexMessageProcessor {
experimental_raw_events,
} = params;
match self
.attach_conversation_listener(conversation_id, experimental_raw_events)
.attach_conversation_listener(conversation_id, experimental_raw_events, ApiVersion::V1)
.await
{
Ok(subscription_id) => {
@@ -2431,6 +2441,7 @@ impl CodexMessageProcessor {
&mut self,
conversation_id: ConversationId,
experimental_raw_events: bool,
api_version: ApiVersion,
) -> Result<Uuid, JSONRPCErrorError> {
let conversation = match self
.conversation_manager
@@ -2454,6 +2465,7 @@ impl CodexMessageProcessor {
let outgoing_for_task = self.outgoing.clone();
let pending_interrupts = self.pending_interrupts.clone();
let api_version_for_task = api_version;
tokio::spawn(async move {
loop {
tokio::select! {
@@ -2509,6 +2521,7 @@ impl CodexMessageProcessor {
conversation.clone(),
outgoing_for_task.clone(),
pending_interrupts.clone(),
api_version_for_task,
)
.await;
}
@@ -2657,6 +2670,7 @@ async fn apply_bespoke_event_handling(
conversation: Arc<CodexConversation>,
outgoing: Arc<OutgoingMessageSender>,
pending_interrupts: PendingInterrupts,
api_version: ApiVersion,
) {
let Event { id: event_id, msg } = event;
match msg {
@@ -2665,22 +2679,47 @@ async fn apply_bespoke_event_handling(
changes,
reason,
grant_root,
}) => {
let params = ApplyPatchApprovalParams {
conversation_id,
call_id,
file_changes: changes,
reason,
grant_root,
};
let rx = outgoing
.send_request(ServerRequestPayload::ApplyPatchApproval(params))
.await;
// TODO(mbolin): Enforce a timeout so this task does not live indefinitely?
tokio::spawn(async move {
on_patch_approval_response(event_id, rx, conversation).await;
});
}
}) => match api_version {
ApiVersion::V1 => {
let params = ApplyPatchApprovalParams {
conversation_id,
call_id,
file_changes: changes,
reason,
grant_root,
};
let rx = outgoing
.send_request(ServerRequestPayload::ApplyPatchApproval(params))
.await;
tokio::spawn(async move {
on_patch_approval_response(event_id, rx, conversation).await;
});
}
ApiVersion::V2 => {
let item_id = call_id.clone();
let request = FileChangeRequest {
call_id,
file_changes: changes
.into_iter()
.map(|(path, change)| (path, V2FileChange::from(change)))
.collect(),
reason,
grant_root,
};
let params = FileChangeRequestApprovalParams {
thread_id: conversation_id.to_string(),
turn_id: event_id.clone(),
item_id,
request,
};
let rx = outgoing
.send_request(ServerRequestPayload::FileChangeRequestApproval(params))
.await;
tokio::spawn(async move {
on_file_change_request_approval_response(event_id, rx, conversation).await;
});
}
},
EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {
call_id,
command,
@@ -2688,25 +2727,51 @@ async fn apply_bespoke_event_handling(
reason,
risk,
parsed_cmd,
}) => {
let params = ExecCommandApprovalParams {
conversation_id,
call_id,
command,
cwd,
reason,
risk,
parsed_cmd,
};
let rx = outgoing
.send_request(ServerRequestPayload::ExecCommandApproval(params))
.await;
// TODO(mbolin): Enforce a timeout so this task does not live indefinitely?
tokio::spawn(async move {
on_exec_approval_response(event_id, rx, conversation).await;
});
}
}) => match api_version {
ApiVersion::V1 => {
let params = ExecCommandApprovalParams {
conversation_id,
call_id,
command,
cwd,
reason,
risk,
parsed_cmd,
};
let rx = outgoing
.send_request(ServerRequestPayload::ExecCommandApproval(params))
.await;
tokio::spawn(async move {
on_exec_approval_response(event_id, rx, conversation).await;
});
}
ApiVersion::V2 => {
let item_id = call_id.clone();
let request = CommandExecutionRequest {
call_id,
command,
cwd,
reason,
risk: risk.map(V2SandboxCommandAssessment::from),
parsed_cmd: parsed_cmd.into_iter().map(V2ParsedCommand::from).collect(),
};
let params = CommandExecutionRequestApprovalParams {
thread_id: conversation_id.to_string(),
turn_id: event_id.clone(),
item_id,
request,
};
let rx = outgoing
.send_request(ServerRequestPayload::CommandExecutionRequestApproval(
params,
))
.await;
tokio::spawn(async move {
on_command_execution_request_approval_response(event_id, rx, conversation)
.await;
});
}
},
EventMsg::TokenCount(token_count_event) => {
if let Some(rate_limits) = token_count_event.rate_limits {
outgoing
@@ -2851,6 +2916,83 @@ async fn on_exec_approval_response(
}
}
async fn on_file_change_request_approval_response(
event_id: String,
receiver: oneshot::Receiver<JsonRpcResult>,
codex: Arc<CodexConversation>,
) {
let response = receiver.await;
let value = match response {
Ok(value) => value,
Err(err) => {
error!("request failed: {err:?}");
if let Err(submit_err) = codex
.submit(Op::PatchApproval {
id: event_id.clone(),
decision: ReviewDecision::Denied,
})
.await
{
error!("failed to submit denied PatchApproval after request failure: {submit_err}");
}
return;
}
};
let response = serde_json::from_value::<FileChangeRequestApprovalResponse>(value)
.unwrap_or_else(|err| {
error!("failed to deserialize FileChangeRequestApprovalResponse: {err}");
FileChangeRequestApprovalResponse {
decision: V2ReviewDecision::Denied,
}
});
let decision = response.decision.to_core();
if let Err(err) = codex
.submit(Op::PatchApproval {
id: event_id,
decision,
})
.await
{
error!("failed to submit PatchApproval: {err}");
}
}
async fn on_command_execution_request_approval_response(
event_id: String,
receiver: oneshot::Receiver<JsonRpcResult>,
conversation: Arc<CodexConversation>,
) {
let response = receiver.await;
let value = match response {
Ok(value) => value,
Err(err) => {
error!("request failed: {err:?}");
return;
}
};
let response = serde_json::from_value::<CommandExecutionRequestApprovalResponse>(value)
.unwrap_or_else(|err| {
error!("failed to deserialize CommandExecutionRequestApprovalResponse: {err}");
CommandExecutionRequestApprovalResponse {
decision: V2ReviewDecision::Denied,
}
});
let decision = response.decision.to_core();
if let Err(err) = conversation
.submit(Op::ExecApproval {
id: event_id,
decision,
})
.await
{
error!("failed to submit ExecApproval: {err}");
}
}
async fn read_summary_from_rollout(
path: &Path,
fallback_provider: &str,

View File

@@ -7,6 +7,7 @@ use app_test_support::create_shell_sse_response;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::ParsedCommand;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ThreadStartParams;
@@ -17,7 +18,6 @@ use codex_app_server_protocol::TurnStartedNotification;
use codex_app_server_protocol::UserInput as V2UserInput;
use codex_core::protocol_config_types::ReasoningEffort;
use codex_core::protocol_config_types::ReasoningSummary;
use codex_protocol::parse_command::ParsedCommand;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use core_test_support::skip_if_no_network;
@@ -235,7 +235,7 @@ async fn turn_start_exec_approval_toggle_v2() -> Result<()> {
.await??;
let ThreadStartResponse { thread } = to_response::<ThreadStartResponse>(start_resp)?;
// turn/start — expect ExecCommandApproval request from server
// turn/start — expect CommandExecutionRequestApproval request from server
let first_turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
@@ -258,12 +258,12 @@ async fn turn_start_exec_approval_toggle_v2() -> Result<()> {
mcp.read_stream_until_request_message(),
)
.await??;
let ServerRequest::ExecCommandApproval { request_id, params } = server_req else {
panic!("expected ExecCommandApproval request");
let ServerRequest::CommandExecutionRequestApproval { request_id, params } = server_req else {
panic!("expected CommandExecutionRequestApproval request");
};
assert_eq!(params.call_id, "call1");
assert_eq!(params.request.call_id, "call1");
assert_eq!(
params.parsed_cmd,
params.request.parsed_cmd,
vec![ParsedCommand::Unknown {
cmd: "python3 -c 'print(42)'".to_string()
}]
@@ -302,7 +302,7 @@ async fn turn_start_exec_approval_toggle_v2() -> Result<()> {
)
.await??;
// Ensure we do NOT receive an ExecCommandApproval request before task completes
// Ensure we do NOT receive a CommandExecutionRequestApproval request before task completes
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("codex/event/task_complete"),