From d0ea93a867a72a4a9cf19d38de77bfece59c0f8f Mon Sep 17 00:00:00 2001 From: rcmerci Date: Tue, 21 Apr 2026 23:11:35 +0800 Subject: [PATCH] enhance(cli-e2e): add jobs-based(default=4) parallel case execution for non-sync cases --- cli-e2e/AGENTS.md | 3 + cli-e2e/bb.edn | 4 +- cli-e2e/src/logseq/cli/e2e/main.clj | 119 +++++++++-- cli-e2e/test/logseq/cli/e2e/main_test.clj | 238 +++++++++++++++++++++- 4 files changed, 349 insertions(+), 15 deletions(-) diff --git a/cli-e2e/AGENTS.md b/cli-e2e/AGENTS.md index f7bca804c7..c3ca7fa89b 100644 --- a/cli-e2e/AGENTS.md +++ b/cli-e2e/AGENTS.md @@ -13,11 +13,14 @@ Shell-first end-to-end tests for logseq CLI. - List declared non-sync case ids: `bb list-cases` - Run non-sync cases with build preflight unless `--skip-build` is provided: `bb test` - `bb test --help` for options + - Increase case-level parallelism with `--jobs N` (default: `4`), for example: `bb test --skip-build --jobs 4` + - Parallelism is case-scoped only; each case still runs setup/main/cleanup sequentially in the existing ephemeral shell model ## Run sync suite - List declared sync case ids: `bb list-sync-cases` - Run sync cases with build preflight unless `--skip-build` is provided: `bb test-sync` - `bb test-sync --help` for options + - `--jobs` is accepted for CLI consistency but sync cases still run serially - Configure sync E2EE password: `--e2ee-password ` (default: `11111`) - Run only sync MVP case: `bb test-sync --skip-build --case sync-upload-download-mvp` diff --git a/cli-e2e/bb.edn b/cli-e2e/bb.edn index 02b811cbfd..339d6512db 100644 --- a/cli-e2e/bb.edn +++ b/cli-e2e/bb.edn @@ -9,12 +9,14 @@ *command-line-args* {:alias {:i :include :h :help} + :spec {:jobs {:default 4}} :coerce {:include [] :help :boolean :dry-run :boolean :skip-build :boolean :verbose :boolean - :timings :boolean}})) + :timings :boolean + :jobs :long}})) unit-test {:doc "Run internal cli-e2e harness unit tests" diff --git a/cli-e2e/src/logseq/cli/e2e/main.clj b/cli-e2e/src/logseq/cli/e2e/main.clj index f4666d8f4c..d0a6a77d6f 100644 --- a/cli-e2e/src/logseq/cli/e2e/main.clj +++ b/cli-e2e/src/logseq/cli/e2e/main.clj @@ -7,7 +7,8 @@ [logseq.cli.e2e.report :as report] [logseq.cli.e2e.runner :as runner] [logseq.cli.e2e.shell :as shell] - [logseq.cli.e2e.sync-fixture :as sync-fixture])) + [logseq.cli.e2e.sync-fixture :as sync-fixture]) + (:import (java.util.concurrent Executors LinkedBlockingQueue TimeUnit))) (defn select-cases [cases {:keys [case include]}] @@ -25,6 +26,8 @@ (vec cases))) (def default-suite :non-sync) +(def default-jobs 1) +(def default-cli-jobs 4) (defn- suite-from-opts [opts] @@ -38,9 +41,18 @@ [started-at] (format "%.2fs" (/ (double (- (System/nanoTime) started-at)) 1000000000.0))) -(defn- run-selected-cases! - [selected-cases run-case run-command {:keys [on-case-start on-case-success on-case-failure detailed-log? timings?]}] - (let [total (count selected-cases)] +(defn- positive-jobs + [jobs] + (let [jobs (or jobs default-jobs)] + (when-not (and (integer? jobs) (pos? jobs)) + (throw (ex-info "--jobs must be a positive integer" + {:jobs jobs}))) + jobs)) + +(defn run-selected-cases! + [selected-cases run-case run-command {:keys [on-case-start on-case-success on-case-failure detailed-log? timings? jobs]}] + (let [total (count selected-cases) + _ (positive-jobs jobs)] (reduce (fn [acc [idx case]] (let [index (inc idx) started-at (System/nanoTime)] @@ -71,13 +83,71 @@ [] (map-indexed vector selected-cases)))) +(defn run-selected-cases-in-parallel! + [selected-cases run-case run-command {:keys [on-case-start on-case-success on-case-failure detailed-log? timings? jobs]}] + (let [total (count selected-cases) + jobs (positive-jobs jobs) + executor (Executors/newFixedThreadPool jobs) + completions (LinkedBlockingQueue.)] + (try + (doseq [[idx case] (map-indexed vector selected-cases)] + (let [index (inc idx)] + (when on-case-start + (on-case-start {:index index + :total total + :case case})) + (.submit executor + ^Runnable + (fn [] + (let [started-at (System/nanoTime)] + (.put completions + (try + (let [result (run-case case {:run-command run-command + :detailed-log? detailed-log? + :timings? timings?})] + {:index index + :total total + :case case + :result result + :elapsed-ms (elapsed-ms started-at)}) + (catch Exception error + {:index index + :total total + :case case + :error error + :elapsed-ms (elapsed-ms started-at)})))))))) + (loop [remaining total + results [] + failure nil] + (if (zero? remaining) + (do + (when failure + (throw failure)) + (->> results + (sort-by :index) + (mapv :result))) + (let [payload (.take completions)] + (if-let [error (:error payload)] + (do + (when on-case-failure + (on-case-failure payload)) + (recur (dec remaining) results (or failure error))) + (do + (when on-case-success + (on-case-success payload)) + (recur (dec remaining) (conj results payload) failure)))))) + (finally + (.shutdown executor) + (.awaitTermination executor 1 TimeUnit/MINUTES))))) + (defn run! - [{:keys [inventory cases skip-build run-command] + [{:keys [inventory cases skip-build run-command jobs] :as opts}] (let [run-command (or run-command shell/run!) run-case (or (:run-case opts) runner/run-case!) suite (suite-from-opts opts) sync-suite? (= suite :sync) + jobs (positive-jobs jobs) targeted-run? (or (:case opts) (seq (:include opts))) on-preflight-start (:on-preflight-start opts) on-preflight-complete (:on-preflight-complete opts) @@ -115,7 +185,13 @@ {:status :ok :cases selected-cases :coverage coverage-result - :results (run-selected-cases! selected-cases run-case* run-command opts)} + :results ((if (and (not sync-suite?) (> jobs 1)) + run-selected-cases-in-parallel! + run-selected-cases!) + selected-cases + run-case* + run-command + (assoc opts :jobs jobs))} (finally (when suite-context (sync-fixture/after-suite! suite-context {:run-command run-command}))))))))) @@ -234,6 +310,12 @@ cmd))) (flush))) +(defn- progress-prefix + [{:keys [parallel? index total]} symbol] + (if parallel? + (str symbol " ") + (format "[%d/%d] %s " index total symbol))) + (defn- print-test-help! [command-name suite] (let [sync-suite? (= suite :sync)] @@ -244,6 +326,7 @@ (println " --skip-build Skip build preflight steps") (println " -i, --include TAG Run only cases with matching tag (repeatable)") (println " --case ID Run a single case by id") + (println (format " --jobs N Run up to N non-sync cases in parallel (Default: %d)" default-cli-jobs)) (when sync-suite? (println " --e2ee-password VALUE E2EE password for sync commands (Default: 11111)")) (println " --verbose Enable verbose output") @@ -251,6 +334,7 @@ (println) (println "Examples:") (println (str " bb -f cli-e2e/bb.edn " command-name " --skip-build")) + (println (str " bb -f cli-e2e/bb.edn " command-name " --skip-build --jobs 4")) (println (str " bb -f cli-e2e/bb.edn " command-name " --skip-build -i smoke")) (println (str " bb -f cli-e2e/bb.edn " command-name " --skip-build --case global-help")) (when sync-suite? @@ -274,6 +358,7 @@ timings? (boolean (:timings opts)) all-step-timings (atom []) detailed-case-log? (some? (:case opts)) + parallel? (and (= suite :non-sync) (> (positive-jobs (:jobs opts)) 1)) base-run-command (or (:run-command opts) shell/run!) run-command (if detailed-case-log? (fn [{:keys [cmd phase step-index step-total] :as command-opts}] @@ -310,13 +395,19 @@ (println (format "==> Prepared %d case(s), starting execution" total)) (flush)) :on-case-start (fn [{:keys [index total case]}] - (println (format "[%d/%d] ▶ %s" index total (:id case))) + (println (str (progress-prefix {:parallel? parallel? + :index index + :total total} + "▶") + (:id case))) (flush)) :on-case-success (fn [{:keys [index total result elapsed-ms]}] (swap! passed inc) - (println (format "[%d/%d] ✓ %s (%dms)" - index - total + (println (format "%s%s (%dms)" + (progress-prefix {:parallel? parallel? + :index index + :total total} + "✓") (:id result) elapsed-ms)) (when timings? @@ -327,9 +418,11 @@ (flush)) :on-case-failure (fn [{:keys [index total case error elapsed-ms]}] (swap! failed inc) - (println (format "[%d/%d] ✗ %s (%dms)" - index - total + (println (format "%s%s (%dms)" + (progress-prefix {:parallel? parallel? + :index index + :total total} + "✗") (:id case) elapsed-ms)) (print-failure-details! error) diff --git a/cli-e2e/test/logseq/cli/e2e/main_test.clj b/cli-e2e/test/logseq/cli/e2e/main_test.clj index 08f25003de..5559d0eb1f 100644 --- a/cli-e2e/test/logseq/cli/e2e/main_test.clj +++ b/cli-e2e/test/logseq/cli/e2e/main_test.clj @@ -1,5 +1,6 @@ (ns logseq.cli.e2e.main-test - (:require [clojure.string :as string] + (:require [babashka.cli :as cli] + [clojure.string :as string] [clojure.test :refer [deftest is testing]] [logseq.cli.e2e.cleanup :as cleanup] [logseq.cli.e2e.main :as main] @@ -28,6 +29,216 @@ :graph {:commands ["graph create" "graph list"] :options ["--type" "--file"]}}}) +(def cli-parse-config + {:alias {:i :include + :h :help} + :spec {:jobs {:default 4}} + :coerce {:include [] + :help :boolean + :dry-run :boolean + :skip-build :boolean + :verbose :boolean + :timings :boolean + :jobs :long}}) + +(deftest cli-opts-parses-jobs-as-integer + (is (= 3 + (:jobs (cli/parse-opts ["--jobs" "3"] cli-parse-config))))) + +(deftest cli-opts-defaults-jobs-to-four + (is (= 4 + (:jobs (cli/parse-opts [] cli-parse-config))))) + +(deftest cli-opts-rejects-non-integer-jobs + (is (thrown-with-msg? + clojure.lang.ExceptionInfo + #"Coerce failure" + (cli/parse-opts ["--jobs" "nope"] cli-parse-config)))) + +(deftest run-rejects-jobs-less-than-one + (is (thrown-with-msg? + clojure.lang.ExceptionInfo + #"--jobs must be a positive integer" + (main/run! {:inventory complete-inventory + :cases sample-cases + :skip-build true + :jobs 0 + :run-command (fn [_] + {:exit 0 + :out "" + :err ""}) + :run-case (fn [case _opts] + {:id (:id case) + :status :ok})})))) + +(deftest run-non-sync-uses-parallel-runner-when-jobs-greater-than-one + (let [parallel-call (atom nil) + serial-called? (atom false)] + (with-redefs [main/run-selected-cases-in-parallel! (fn [selected-cases run-case run-command opts] + (reset! parallel-call {:case-ids (mapv :id selected-cases) + :run-case run-case + :run-command run-command + :jobs (:jobs opts)}) + [{:id "global-help" :status :ok} + {:id "graph-list" :status :ok}]) + main/run-selected-cases! (fn [& _] + (reset! serial-called? true) + (throw (ex-info "serial runner should not be used" {})))] + (let [result (main/run! {:inventory complete-inventory + :cases sample-cases + :include ["smoke"] + :skip-build true + :jobs 2 + :run-command (fn [_] + {:exit 0 + :out "" + :err ""}) + :run-case (fn [case _opts] + {:id (:id case) + :status :ok})})] + (is (= :ok (:status result))) + (is (= ["global-help" "graph-list"] (:case-ids @parallel-call))) + (is (= 2 (:jobs @parallel-call))) + (is (false? @serial-called?)))))) + +(deftest run-sync-suite-stays-serial-when-jobs-greater-than-one + (let [serial-call (atom nil) + parallel-called? (atom false) + sync-inventory {:excluded-command-prefixes ["login" "logout"] + :scopes {:sync {:commands ["sync upload" "sync status"] + :options []}}} + sync-cases [{:id "sync-upload" + :cmds ["node static/logseq-cli.js sync upload"] + :covers {:commands ["sync upload"]}} + {:id "sync-status" + :cmds ["node static/logseq-cli.js sync status"] + :covers {:commands ["sync status"]}}]] + (with-redefs [main/run-selected-cases! (fn [selected-cases run-case run-command opts] + (reset! serial-call {:case-ids (mapv :id selected-cases) + :run-case run-case + :run-command run-command + :jobs (:jobs opts)}) + (mapv (fn [case] + {:id (:id case) + :status :ok}) + selected-cases)) + main/run-selected-cases-in-parallel! (fn [& _] + (reset! parallel-called? true) + (throw (ex-info "parallel runner should not be used" {})))] + (let [result (main/run! {:suite :sync + :inventory sync-inventory + :cases sync-cases + :skip-build true + :jobs 4 + :run-command (fn [_] + {:exit 0 + :out "" + :err ""}) + :run-case (fn [case _opts] + {:id (:id case) + :status :ok})})] + (is (= :ok (:status result))) + (is (= ["sync-upload" "sync-status"] (:case-ids @serial-call))) + (is (= 4 (:jobs @serial-call))) + (is (false? @parallel-called?)))))) + +(deftest run-jobs-one-keeps-serial-runner + (let [serial-call (atom nil) + parallel-called? (atom false)] + (with-redefs [main/run-selected-cases! (fn [selected-cases run-case run-command opts] + (reset! serial-call {:case-ids (mapv :id selected-cases) + :run-case run-case + :run-command run-command + :jobs (:jobs opts)}) + (mapv (fn [case] + {:id (:id case) + :status :ok}) + selected-cases)) + main/run-selected-cases-in-parallel! (fn [& _] + (reset! parallel-called? true) + (throw (ex-info "parallel runner should not be used" {})))] + (let [result (main/run! {:inventory complete-inventory + :cases sample-cases + :include ["smoke"] + :skip-build true + :jobs 1 + :run-command (fn [_] + {:exit 0 + :out "" + :err ""}) + :run-case (fn [case _opts] + {:id (:id case) + :status :ok})})] + (is (= :ok (:status result))) + (is (= ["global-help" "graph-list"] (:case-ids @serial-call))) + (is (= 1 (:jobs @serial-call))) + (is (false? @parallel-called?)))))) + +(deftest parallel-runner-collects-completions-before-rethrowing-failure + (let [started (atom []) + finished (atom []) + started-latch (java.util.concurrent.CountDownLatch. 2) + release-success (promise) + cases [{:id "global-help"} + {:id "graph-list"}] + error (try + (main/run-selected-cases-in-parallel! + cases + (fn [case _opts] + (swap! started conj (:id case)) + (.countDown started-latch) + (.await started-latch) + (if (= "graph-list" (:id case)) + (do + (swap! finished conj [:failed (:id case)]) + (deliver release-success true) + (throw (ex-info "boom" {:id (:id case)}))) + (do + @release-success + (swap! finished conj [:ok (:id case)]) + {:id (:id case) + :status :ok}))) + (fn [_] + {:exit 0 + :out "" + :err ""}) + {:jobs 2}) + nil + (catch Exception ex + ex))] + (is (instance? Exception error)) + (is (= #{"global-help" "graph-list"} + (set @started))) + (is (= #{[:ok "global-help"] + [:failed "graph-list"]} + (set @finished))))) + +(deftest parallel-runner-elapsed-ms-starts-when-case-begins-running + (let [events (atom [])] + (main/run-selected-cases-in-parallel! + [{:id "slow-1"} + {:id "slow-2"} + {:id "fast-after-queue"}] + (fn [test-case _opts] + (clojure.core/case (:id test-case) + "slow-1" (Thread/sleep 180) + "slow-2" (Thread/sleep 180) + "fast-after-queue" (Thread/sleep 10)) + {:id (:id test-case) + :status :ok}) + (fn [_] + {:exit 0 + :out "" + :err ""}) + {:jobs 2 + :on-case-success (fn [payload] + (swap! events conj [(:id (:result payload)) (:elapsed-ms payload)]))}) + (let [elapsed-map (into {} @events)] + (is (< (get elapsed-map "fast-after-queue" 1000) 120) + "queued case should measure only its own execution time, not time spent waiting in the pool") + (is (>= (get elapsed-map "slow-1" 0) 150)) + (is (>= (get elapsed-map "slow-2" 0) 150))))) + (deftest select-cases-supports-case-id (is (= ["graph-create"] (mapv :id (main/select-cases sample-cases {:case "graph-create"}))))) @@ -283,6 +494,27 @@ (is (string/includes? output "[2/2] ✓ graph-list")) (is (string/includes? output "Summary: 2 passed, 0 failed")))) +(deftest test-parallel-output-omits-meaningless-index-prefixes + (let [output (with-out-str + (main/test! {:inventory complete-inventory + :cases sample-cases + :include ["smoke"] + :skip-build true + :jobs 2 + :run-command (fn [_] + {:exit 0 + :out "" + :err ""}) + :run-case (fn [case _opts] + (Thread/sleep (if (= "global-help" (:id case)) 25 5)) + {:id (:id case) + :status :ok})}))] + (is (string/includes? output "▶ global-help")) + (is (string/includes? output "✓ global-help")) + (is (string/includes? output "✓ graph-list")) + (is (not (string/includes? output "[1/2]"))) + (is (not (string/includes? output "[2/2]"))))) + (deftest test-timings-prints-step-details-and-slow-summary (let [output (with-out-str (main/test! {:inventory complete-inventory @@ -410,6 +642,8 @@ (is (string/includes? output "--skip-build")) (is (string/includes? output "--include TAG")) (is (string/includes? output "--case ID")) + (is (string/includes? output "--jobs N")) + (is (string/includes? output "Default: 4")) (is (string/includes? output "--timings")) (is (not (string/includes? output "--e2ee-password"))))) @@ -434,6 +668,8 @@ (is (string/includes? output "--skip-build")) (is (string/includes? output "--include TAG")) (is (string/includes? output "--case ID")) + (is (string/includes? output "--jobs N")) + (is (string/includes? output "Default: 4")) (is (string/includes? output "--timings")) (is (string/includes? output "--e2ee-password VALUE")) (is (string/includes? output "Default: 11111"))))