enhance(cli-e2e): add jobs-based(default=4) parallel case execution for non-sync cases

This commit is contained in:
rcmerci
2026-04-21 23:11:35 +08:00
parent 3503570f8e
commit d0ea93a867
4 changed files with 349 additions and 15 deletions

View File

@@ -13,11 +13,14 @@ Shell-first end-to-end tests for logseq CLI.
- List declared non-sync case ids: `bb list-cases`
- Run non-sync cases with build preflight unless `--skip-build` is provided: `bb test`
- `bb test --help` for options
- Increase case-level parallelism with `--jobs N` (default: `4`), for example: `bb test --skip-build --jobs 4`
- Parallelism is case-scoped only; each case still runs setup/main/cleanup sequentially in the existing ephemeral shell model
## Run sync suite
- List declared sync case ids: `bb list-sync-cases`
- Run sync cases with build preflight unless `--skip-build` is provided: `bb test-sync`
- `bb test-sync --help` for options
- `--jobs` is accepted for CLI consistency but sync cases still run serially
- Configure sync E2EE password: `--e2ee-password <value>` (default: `11111`)
- Run only sync MVP case: `bb test-sync --skip-build --case sync-upload-download-mvp`

View File

@@ -9,12 +9,14 @@
*command-line-args*
{:alias {:i :include
:h :help}
:spec {:jobs {:default 4}}
:coerce {:include []
:help :boolean
:dry-run :boolean
:skip-build :boolean
:verbose :boolean
:timings :boolean}}))
:timings :boolean
:jobs :long}}))
unit-test
{:doc "Run internal cli-e2e harness unit tests"

View File

@@ -7,7 +7,8 @@
[logseq.cli.e2e.report :as report]
[logseq.cli.e2e.runner :as runner]
[logseq.cli.e2e.shell :as shell]
[logseq.cli.e2e.sync-fixture :as sync-fixture]))
[logseq.cli.e2e.sync-fixture :as sync-fixture])
(:import (java.util.concurrent Executors LinkedBlockingQueue TimeUnit)))
(defn select-cases
[cases {:keys [case include]}]
@@ -25,6 +26,8 @@
(vec cases)))
(def default-suite :non-sync)
(def default-jobs 1)
(def default-cli-jobs 4)
(defn- suite-from-opts
[opts]
@@ -38,9 +41,18 @@
[started-at]
(format "%.2fs" (/ (double (- (System/nanoTime) started-at)) 1000000000.0)))
(defn- run-selected-cases!
[selected-cases run-case run-command {:keys [on-case-start on-case-success on-case-failure detailed-log? timings?]}]
(let [total (count selected-cases)]
(defn- positive-jobs
[jobs]
(let [jobs (or jobs default-jobs)]
(when-not (and (integer? jobs) (pos? jobs))
(throw (ex-info "--jobs must be a positive integer"
{:jobs jobs})))
jobs))
(defn run-selected-cases!
[selected-cases run-case run-command {:keys [on-case-start on-case-success on-case-failure detailed-log? timings? jobs]}]
(let [total (count selected-cases)
_ (positive-jobs jobs)]
(reduce (fn [acc [idx case]]
(let [index (inc idx)
started-at (System/nanoTime)]
@@ -71,13 +83,71 @@
[]
(map-indexed vector selected-cases))))
(defn run-selected-cases-in-parallel!
[selected-cases run-case run-command {:keys [on-case-start on-case-success on-case-failure detailed-log? timings? jobs]}]
(let [total (count selected-cases)
jobs (positive-jobs jobs)
executor (Executors/newFixedThreadPool jobs)
completions (LinkedBlockingQueue.)]
(try
(doseq [[idx case] (map-indexed vector selected-cases)]
(let [index (inc idx)]
(when on-case-start
(on-case-start {:index index
:total total
:case case}))
(.submit executor
^Runnable
(fn []
(let [started-at (System/nanoTime)]
(.put completions
(try
(let [result (run-case case {:run-command run-command
:detailed-log? detailed-log?
:timings? timings?})]
{:index index
:total total
:case case
:result result
:elapsed-ms (elapsed-ms started-at)})
(catch Exception error
{:index index
:total total
:case case
:error error
:elapsed-ms (elapsed-ms started-at)}))))))))
(loop [remaining total
results []
failure nil]
(if (zero? remaining)
(do
(when failure
(throw failure))
(->> results
(sort-by :index)
(mapv :result)))
(let [payload (.take completions)]
(if-let [error (:error payload)]
(do
(when on-case-failure
(on-case-failure payload))
(recur (dec remaining) results (or failure error)))
(do
(when on-case-success
(on-case-success payload))
(recur (dec remaining) (conj results payload) failure))))))
(finally
(.shutdown executor)
(.awaitTermination executor 1 TimeUnit/MINUTES)))))
(defn run!
[{:keys [inventory cases skip-build run-command]
[{:keys [inventory cases skip-build run-command jobs]
:as opts}]
(let [run-command (or run-command shell/run!)
run-case (or (:run-case opts) runner/run-case!)
suite (suite-from-opts opts)
sync-suite? (= suite :sync)
jobs (positive-jobs jobs)
targeted-run? (or (:case opts) (seq (:include opts)))
on-preflight-start (:on-preflight-start opts)
on-preflight-complete (:on-preflight-complete opts)
@@ -115,7 +185,13 @@
{:status :ok
:cases selected-cases
:coverage coverage-result
:results (run-selected-cases! selected-cases run-case* run-command opts)}
:results ((if (and (not sync-suite?) (> jobs 1))
run-selected-cases-in-parallel!
run-selected-cases!)
selected-cases
run-case*
run-command
(assoc opts :jobs jobs))}
(finally
(when suite-context
(sync-fixture/after-suite! suite-context {:run-command run-command})))))))))
@@ -234,6 +310,12 @@
cmd)))
(flush)))
(defn- progress-prefix
[{:keys [parallel? index total]} symbol]
(if parallel?
(str symbol " ")
(format "[%d/%d] %s " index total symbol)))
(defn- print-test-help!
[command-name suite]
(let [sync-suite? (= suite :sync)]
@@ -244,6 +326,7 @@
(println " --skip-build Skip build preflight steps")
(println " -i, --include TAG Run only cases with matching tag (repeatable)")
(println " --case ID Run a single case by id")
(println (format " --jobs N Run up to N non-sync cases in parallel (Default: %d)" default-cli-jobs))
(when sync-suite?
(println " --e2ee-password VALUE E2EE password for sync commands (Default: 11111)"))
(println " --verbose Enable verbose output")
@@ -251,6 +334,7 @@
(println)
(println "Examples:")
(println (str " bb -f cli-e2e/bb.edn " command-name " --skip-build"))
(println (str " bb -f cli-e2e/bb.edn " command-name " --skip-build --jobs 4"))
(println (str " bb -f cli-e2e/bb.edn " command-name " --skip-build -i smoke"))
(println (str " bb -f cli-e2e/bb.edn " command-name " --skip-build --case global-help"))
(when sync-suite?
@@ -274,6 +358,7 @@
timings? (boolean (:timings opts))
all-step-timings (atom [])
detailed-case-log? (some? (:case opts))
parallel? (and (= suite :non-sync) (> (positive-jobs (:jobs opts)) 1))
base-run-command (or (:run-command opts) shell/run!)
run-command (if detailed-case-log?
(fn [{:keys [cmd phase step-index step-total] :as command-opts}]
@@ -310,13 +395,19 @@
(println (format "==> Prepared %d case(s), starting execution" total))
(flush))
:on-case-start (fn [{:keys [index total case]}]
(println (format "[%d/%d] ▶ %s" index total (:id case)))
(println (str (progress-prefix {:parallel? parallel?
:index index
:total total}
"▶")
(:id case)))
(flush))
:on-case-success (fn [{:keys [index total result elapsed-ms]}]
(swap! passed inc)
(println (format "[%d/%d] ✓ %s (%dms)"
index
total
(println (format "%s%s (%dms)"
(progress-prefix {:parallel? parallel?
:index index
:total total}
"✓")
(:id result)
elapsed-ms))
(when timings?
@@ -327,9 +418,11 @@
(flush))
:on-case-failure (fn [{:keys [index total case error elapsed-ms]}]
(swap! failed inc)
(println (format "[%d/%d] ✗ %s (%dms)"
index
total
(println (format "%s%s (%dms)"
(progress-prefix {:parallel? parallel?
:index index
:total total}
"✗")
(:id case)
elapsed-ms))
(print-failure-details! error)

View File

@@ -1,5 +1,6 @@
(ns logseq.cli.e2e.main-test
(:require [clojure.string :as string]
(:require [babashka.cli :as cli]
[clojure.string :as string]
[clojure.test :refer [deftest is testing]]
[logseq.cli.e2e.cleanup :as cleanup]
[logseq.cli.e2e.main :as main]
@@ -28,6 +29,216 @@
:graph {:commands ["graph create" "graph list"]
:options ["--type" "--file"]}}})
(def cli-parse-config
{:alias {:i :include
:h :help}
:spec {:jobs {:default 4}}
:coerce {:include []
:help :boolean
:dry-run :boolean
:skip-build :boolean
:verbose :boolean
:timings :boolean
:jobs :long}})
(deftest cli-opts-parses-jobs-as-integer
(is (= 3
(:jobs (cli/parse-opts ["--jobs" "3"] cli-parse-config)))))
(deftest cli-opts-defaults-jobs-to-four
(is (= 4
(:jobs (cli/parse-opts [] cli-parse-config)))))
(deftest cli-opts-rejects-non-integer-jobs
(is (thrown-with-msg?
clojure.lang.ExceptionInfo
#"Coerce failure"
(cli/parse-opts ["--jobs" "nope"] cli-parse-config))))
(deftest run-rejects-jobs-less-than-one
(is (thrown-with-msg?
clojure.lang.ExceptionInfo
#"--jobs must be a positive integer"
(main/run! {:inventory complete-inventory
:cases sample-cases
:skip-build true
:jobs 0
:run-command (fn [_]
{:exit 0
:out ""
:err ""})
:run-case (fn [case _opts]
{:id (:id case)
:status :ok})}))))
(deftest run-non-sync-uses-parallel-runner-when-jobs-greater-than-one
(let [parallel-call (atom nil)
serial-called? (atom false)]
(with-redefs [main/run-selected-cases-in-parallel! (fn [selected-cases run-case run-command opts]
(reset! parallel-call {:case-ids (mapv :id selected-cases)
:run-case run-case
:run-command run-command
:jobs (:jobs opts)})
[{:id "global-help" :status :ok}
{:id "graph-list" :status :ok}])
main/run-selected-cases! (fn [& _]
(reset! serial-called? true)
(throw (ex-info "serial runner should not be used" {})))]
(let [result (main/run! {:inventory complete-inventory
:cases sample-cases
:include ["smoke"]
:skip-build true
:jobs 2
:run-command (fn [_]
{:exit 0
:out ""
:err ""})
:run-case (fn [case _opts]
{:id (:id case)
:status :ok})})]
(is (= :ok (:status result)))
(is (= ["global-help" "graph-list"] (:case-ids @parallel-call)))
(is (= 2 (:jobs @parallel-call)))
(is (false? @serial-called?))))))
(deftest run-sync-suite-stays-serial-when-jobs-greater-than-one
(let [serial-call (atom nil)
parallel-called? (atom false)
sync-inventory {:excluded-command-prefixes ["login" "logout"]
:scopes {:sync {:commands ["sync upload" "sync status"]
:options []}}}
sync-cases [{:id "sync-upload"
:cmds ["node static/logseq-cli.js sync upload"]
:covers {:commands ["sync upload"]}}
{:id "sync-status"
:cmds ["node static/logseq-cli.js sync status"]
:covers {:commands ["sync status"]}}]]
(with-redefs [main/run-selected-cases! (fn [selected-cases run-case run-command opts]
(reset! serial-call {:case-ids (mapv :id selected-cases)
:run-case run-case
:run-command run-command
:jobs (:jobs opts)})
(mapv (fn [case]
{:id (:id case)
:status :ok})
selected-cases))
main/run-selected-cases-in-parallel! (fn [& _]
(reset! parallel-called? true)
(throw (ex-info "parallel runner should not be used" {})))]
(let [result (main/run! {:suite :sync
:inventory sync-inventory
:cases sync-cases
:skip-build true
:jobs 4
:run-command (fn [_]
{:exit 0
:out ""
:err ""})
:run-case (fn [case _opts]
{:id (:id case)
:status :ok})})]
(is (= :ok (:status result)))
(is (= ["sync-upload" "sync-status"] (:case-ids @serial-call)))
(is (= 4 (:jobs @serial-call)))
(is (false? @parallel-called?))))))
(deftest run-jobs-one-keeps-serial-runner
(let [serial-call (atom nil)
parallel-called? (atom false)]
(with-redefs [main/run-selected-cases! (fn [selected-cases run-case run-command opts]
(reset! serial-call {:case-ids (mapv :id selected-cases)
:run-case run-case
:run-command run-command
:jobs (:jobs opts)})
(mapv (fn [case]
{:id (:id case)
:status :ok})
selected-cases))
main/run-selected-cases-in-parallel! (fn [& _]
(reset! parallel-called? true)
(throw (ex-info "parallel runner should not be used" {})))]
(let [result (main/run! {:inventory complete-inventory
:cases sample-cases
:include ["smoke"]
:skip-build true
:jobs 1
:run-command (fn [_]
{:exit 0
:out ""
:err ""})
:run-case (fn [case _opts]
{:id (:id case)
:status :ok})})]
(is (= :ok (:status result)))
(is (= ["global-help" "graph-list"] (:case-ids @serial-call)))
(is (= 1 (:jobs @serial-call)))
(is (false? @parallel-called?))))))
(deftest parallel-runner-collects-completions-before-rethrowing-failure
(let [started (atom [])
finished (atom [])
started-latch (java.util.concurrent.CountDownLatch. 2)
release-success (promise)
cases [{:id "global-help"}
{:id "graph-list"}]
error (try
(main/run-selected-cases-in-parallel!
cases
(fn [case _opts]
(swap! started conj (:id case))
(.countDown started-latch)
(.await started-latch)
(if (= "graph-list" (:id case))
(do
(swap! finished conj [:failed (:id case)])
(deliver release-success true)
(throw (ex-info "boom" {:id (:id case)})))
(do
@release-success
(swap! finished conj [:ok (:id case)])
{:id (:id case)
:status :ok})))
(fn [_]
{:exit 0
:out ""
:err ""})
{:jobs 2})
nil
(catch Exception ex
ex))]
(is (instance? Exception error))
(is (= #{"global-help" "graph-list"}
(set @started)))
(is (= #{[:ok "global-help"]
[:failed "graph-list"]}
(set @finished)))))
(deftest parallel-runner-elapsed-ms-starts-when-case-begins-running
(let [events (atom [])]
(main/run-selected-cases-in-parallel!
[{:id "slow-1"}
{:id "slow-2"}
{:id "fast-after-queue"}]
(fn [test-case _opts]
(clojure.core/case (:id test-case)
"slow-1" (Thread/sleep 180)
"slow-2" (Thread/sleep 180)
"fast-after-queue" (Thread/sleep 10))
{:id (:id test-case)
:status :ok})
(fn [_]
{:exit 0
:out ""
:err ""})
{:jobs 2
:on-case-success (fn [payload]
(swap! events conj [(:id (:result payload)) (:elapsed-ms payload)]))})
(let [elapsed-map (into {} @events)]
(is (< (get elapsed-map "fast-after-queue" 1000) 120)
"queued case should measure only its own execution time, not time spent waiting in the pool")
(is (>= (get elapsed-map "slow-1" 0) 150))
(is (>= (get elapsed-map "slow-2" 0) 150)))))
(deftest select-cases-supports-case-id
(is (= ["graph-create"]
(mapv :id (main/select-cases sample-cases {:case "graph-create"})))))
@@ -283,6 +494,27 @@
(is (string/includes? output "[2/2] ✓ graph-list"))
(is (string/includes? output "Summary: 2 passed, 0 failed"))))
(deftest test-parallel-output-omits-meaningless-index-prefixes
(let [output (with-out-str
(main/test! {:inventory complete-inventory
:cases sample-cases
:include ["smoke"]
:skip-build true
:jobs 2
:run-command (fn [_]
{:exit 0
:out ""
:err ""})
:run-case (fn [case _opts]
(Thread/sleep (if (= "global-help" (:id case)) 25 5))
{:id (:id case)
:status :ok})}))]
(is (string/includes? output "▶ global-help"))
(is (string/includes? output "✓ global-help"))
(is (string/includes? output "✓ graph-list"))
(is (not (string/includes? output "[1/2]")))
(is (not (string/includes? output "[2/2]")))))
(deftest test-timings-prints-step-details-and-slow-summary
(let [output (with-out-str
(main/test! {:inventory complete-inventory
@@ -410,6 +642,8 @@
(is (string/includes? output "--skip-build"))
(is (string/includes? output "--include TAG"))
(is (string/includes? output "--case ID"))
(is (string/includes? output "--jobs N"))
(is (string/includes? output "Default: 4"))
(is (string/includes? output "--timings"))
(is (not (string/includes? output "--e2ee-password")))))
@@ -434,6 +668,8 @@
(is (string/includes? output "--skip-build"))
(is (string/includes? output "--include TAG"))
(is (string/includes? output "--case ID"))
(is (string/includes? output "--jobs N"))
(is (string/includes? output "Default: 4"))
(is (string/includes? output "--timings"))
(is (string/includes? output "--e2ee-password VALUE"))
(is (string/includes? output "Default: 11111"))))