From a89ffb6b0f22cf4e9d3b80d4eff744ef72ce0f40 Mon Sep 17 00:00:00 2001 From: rcmerci Date: Wed, 11 Mar 2026 21:46:00 +0800 Subject: [PATCH] 057-cli-sync-download-realtime-progress.md --- ...057-cli-sync-download-realtime-progress.md | 436 ++++++++++++++++++ docs/cli/logseq-cli.md | 10 +- src/main/frontend/handler/db_based/sync.cljs | 137 +----- src/main/frontend/worker/db_core.cljs | 16 +- src/main/frontend/worker/sync.cljs | 27 +- src/main/logseq/cli/command/sync.cljs | 54 ++- src/main/logseq/cli/transport.cljs | 92 ++++ .../frontend/handler/db_based/sync_test.cljs | 230 +++------ src/test/frontend/worker/db_sync_test.cljs | 55 +++ src/test/logseq/cli/command/sync_test.cljs | 136 ++++++ src/test/logseq/cli/commands_test.cljs | 8 + src/test/logseq/cli/transport_test.cljs | 40 ++ 12 files changed, 937 insertions(+), 304 deletions(-) create mode 100644 docs/agent-guide/057-cli-sync-download-realtime-progress.md diff --git a/docs/agent-guide/057-cli-sync-download-realtime-progress.md b/docs/agent-guide/057-cli-sync-download-realtime-progress.md new file mode 100644 index 0000000000..83ad162da3 --- /dev/null +++ b/docs/agent-guide/057-cli-sync-download-realtime-progress.md @@ -0,0 +1,436 @@ +# 057: Make `logseq-cli sync download` stream realtime progress and support long-running downloads + +## Summary + +This document defines an execution-ready implementation plan for improving `logseq-cli sync download` for large graphs. + +Today, the CLI waits for a single `/v1/invoke` response and uses a short default timeout. This creates two bad outcomes for large graph downloads: + +1. the terminal is silent while the snapshot is downloading and importing, and +2. healthy long-running work may appear to fail because the CLI request timeout is too short. + +The implementation should reuse the existing sync log/event infrastructure already used by the app, instead of creating a separate CLI-only progress system. + +## Decision records (confirmed) + +The following decisions are fixed for implementation: + +1. `sync download` timeout strategy uses a command-level long-running policy (not a global timeout increase). +2. Progress logs are printed to `stdout`. +3. Add `--progress` option for `sync download`. +4. `--progress` defaults to `true` for human output. +5. For structured output modes (for example `json` / `edn`), progress is automatically disabled unless the user explicitly passes `--progress true`. +6. Download progress log emission should be unified into shared worker/sync code (no duplicate app-only emission path for the same milestones). + +## Problem statement + +`logseq-cli sync download` already uses the same db-worker-node and worker sync stack that powers app sync behavior, but it does not currently reuse the realtime event stream. + +Relevant current-state facts: + +- CLI sends a request to db-worker-node over `/v1/invoke` and waits for one final response. +- db-worker-node already exposes `/v1/events` as an SSE event stream. +- worker/app sync logic already emits `:rtc-log` events. +- app/desktop already displays those logs. +- CLI transport defaults to a `10000` ms timeout, which is not appropriate for a full graph snapshot download/import. + +The implementation goal is therefore not to invent progress tracking from scratch. The goal is to make CLI consume and display the same progress events, and to change timeout behavior so long-running downloads can complete. + +## Goals + +- Show realtime progress during `logseq-cli sync download`. +- Reuse the existing `:rtc-log` event model and db-worker-node SSE stream. +- Unify download progress log emission into shared worker/sync code for the full download flow. +- Add `--progress` to `sync download` with default behavior enabled for human output. +- Print progress to `stdout`. +- Automatically disable progress for structured output modes unless `--progress true` is explicitly set. +- Prevent large graph downloads from failing under the generic short CLI timeout path. +- Preserve the final command result semantics and existing validation behavior. + +## Non-goals + +- Redesign the sync protocol. +- Replace the app RTC log UI. +- Introduce a polling-based progress API. +- Change the existing success/failure meaning of `sync download`. +- Add a CLI-only event schema that diverges from app behavior. + +## Current implementation map + +### CLI entrypoints + +- `src/main/logseq/cli/command/sync.cljs` + - defines `sync download` + - resolves auth, starts db-worker-node, validates empty DB, invokes `:thread-api/db-sync-download-graph-by-id` +- `src/main/logseq/cli/transport.cljs` + - sends HTTP requests to db-worker-node `/v1/invoke` + - currently applies default timeout behavior +- `src/main/logseq/cli/format.cljs` + - formats final CLI output +- `src/main/logseq/cli/config.cljs` + - defines CLI defaults, including timeout +- `src/main/logseq/cli/command/core.cljs` + - defines global CLI options including `--timeout-ms` + +### db-worker-node / worker entrypoints + +- `src/main/frontend/worker/db_worker_node.cljs` + - serves `/v1/invoke` + - serves `/v1/events` as SSE +- `src/main/frontend/worker/db_core.cljs` + - handles `:thread-api/db-sync-download-graph-by-id` + - already emits import/decrypt/save-stage logs +- `src/main/frontend/worker/sync.cljs` + - performs remote snapshot download and sync data fetch +- `src/main/frontend/worker/sync/log_and_state.cljs` + - publishes `:rtc-log` events to connected clients + +### App-side log consumers and app-specific log emission + +- `src/main/frontend/handler/db_based/sync.cljs` + - currently emits useful early download messages such as: + - `Preparing graph snapshot download` + - `Start downloading graph snapshot, file size: ...` + - `Graph snapshot downloaded` +- `src/main/frontend/handler/worker.cljs` +- `src/main/frontend/handler/events.cljs` +- `src/main/frontend/handler/db_based/rtc_flows.cljs` +- `src/main/frontend/components/rtc/indicator.cljs` + +These app-side files are useful references because they show the desired end-user progress semantics. However, the actual shared event source should live in worker/shared sync code, not in app-only UI handlers. + +## Design constraints + +1. **One shared progress model** + - CLI and app should consume the same logical progress events. + - Avoid a separate CLI-only progress protocol. + +2. **Invoke result remains authoritative** + - Realtime logs improve visibility but must not replace final command success/failure semantics. + +3. **Long-running timeout behavior must be command-aware** + - `sync download` should not rely on the same timeout assumptions as short metadata requests. + +4. **Output compatibility matters** + - Streaming progress should not break final human or machine-readable command results. + +## Proposed implementation + +The implementation should be delivered in four phases. + +--- + +## Phase 1: Make timeout handling explicit for long-running `sync download` + +### Objective + +Remove the dependency on the generic short CLI request timeout for the long-running download/import invoke path. + +### Files + +- `src/main/logseq/cli/command/sync.cljs` +- `src/main/logseq/cli/transport.cljs` +- `src/main/logseq/cli/config.cljs` +- `src/main/logseq/cli/command/core.cljs` + +### Tasks + +1. Trace exactly how `:timeout-ms` flows from CLI options/config into `transport/invoke` for `sync download`. +2. Introduce command-specific timeout behavior for `sync download`. +3. Keep the timeout policy explicit in code, rather than relying on an accidental global default. +4. Preserve existing timeout behavior for short non-download CLI commands unless intentionally changed. + +### Recommended implementation direction + +Prefer a command-specific long-task timeout path over raising the global default for all CLI traffic. + +Good options include: + +- passing a much larger timeout only for the final `db-sync-download-graph-by-id` invoke, or +- introducing a dedicated long-running request helper for commands that are expected to take a long time. + +Do **not** solve this by silently changing all CLI requests to use a large default timeout. + +### Acceptance criteria + +- `sync download` no longer depends on the generic `10000` ms timeout for the full download/import request. +- Other CLI requests keep their current short-request behavior unless explicitly updated. +- The effective timeout policy is easy to understand from the command implementation. + +### Verification + +- Unit test or integration test proving that `sync download` can run longer than the old short timeout path. +- Regression test showing short requests are unchanged. + +--- + +## Phase 2: Unify all download-progress logs into shared worker/sync code + +### Objective + +Ensure the full download flow emits shared progress events from the same worker/sync path used by both app and CLI. + +### Files + +- `src/main/frontend/worker/sync.cljs` +- `src/main/frontend/worker/db_core.cljs` +- `src/main/frontend/worker/sync/log_and_state.cljs` +- `src/main/frontend/handler/db_based/sync.cljs` + +### Tasks + +1. Inventory all download-progress log emissions related to `sync download` across app handlers and worker code. +2. Move or extract all shared milestones into worker/shared sync code where the remote snapshot and import/decrypt flow actually executes. +3. Reuse the existing `:rtc.log/download` event family and existing `sub-type` semantics wherever possible. +4. Remove duplicate app-only emission for shared milestones, and keep app handlers as consumers of shared events. +5. Keep milestone wording stable enough to avoid unnecessary UI regression in existing app consumers. + +### Required shared milestones + +The worker/shared path should emit at least these human-meaningful milestones: + +- preparing snapshot download, +- snapshot download started, including file size when available, +- snapshot download completed, +- saving/import/decrypt progress, +- graph ready / download complete. + +The final wording may differ, but the milestones must cover both: + +- network download progress stages, and +- local import/decrypt stages. + +### Acceptance criteria + +- Shared download milestones are emitted from worker/sync code across the full flow (download + import/decrypt). +- There is no competing app-only emission path for the same shared milestones. +- CLI-triggered `db-sync-download-graph-by-id` produces the same family of download progress events as app-triggered flows. +- Existing app consumers can still display download progress using the shared event source. + +### Verification + +- Worker-level or sync-level tests for emitted `:rtc-log` events. +- Manual verification in app that download progress still appears after the refactor. + +--- + +## Phase 3: Add CLI support for db-worker-node SSE event consumption + +### Objective + +Allow the CLI to subscribe to `/v1/events` while a long-running invoke is in progress. + +### Files + +- `src/main/logseq/cli/transport.cljs` +- `src/main/logseq/cli/command/sync.cljs` +- optionally a new helper namespace under `src/main/logseq/cli/` if event-stream logic should be isolated +- reference implementation: + - `src/main/frontend/persist_db/remote.cljs` + - `src/main/frontend/persist_db/node.cljs` + +### Tasks + +1. Add a lightweight CLI-side SSE client for db-worker-node `/v1/events`. +2. Decode incoming event payloads into the same shape consumed elsewhere in the codebase. +3. Support subscription lifecycle management: + - connect before starting the long-running invoke, + - receive events during the invoke, + - close cleanly on success, error, timeout, or interruption. +4. Keep the event client generic enough that future CLI commands could reuse it if needed. + +### Important behavior + +- The SSE stream is for observability, not command truth. +- If SSE disconnects, the invoke result still determines command success/failure. +- If the invoke finishes successfully, the CLI must stop listening and finalize normally. + +### Acceptance criteria + +- CLI can consume db-worker-node `/v1/events` while a command is in flight. +- Incoming `:rtc-log` events are decoded correctly. +- Subscription cleanup is reliable across success and failure paths. + +### Verification + +- Tests for event decode behavior. +- Tests or integration coverage for stream setup/cleanup. +- Manual verification against a running db-worker-node. + +--- + +## Phase 4: Render download progress in `sync download` without breaking final output + +### Objective + +Display realtime download/import progress in the terminal while preserving final result compatibility. + +### Files + +- `src/main/logseq/cli/command/sync.cljs` +- `src/main/logseq/cli/format.cljs` +- any new CLI event/render helper added in Phase 3 +- `docs/cli/logseq-cli.md` + +### Tasks + +1. Add `--progress` option to `sync download` command handling. +2. Define default behavior: progress enabled for human-oriented output mode. +3. Define structured-output behavior: progress automatically disabled for structured modes unless explicitly overridden by `--progress true`. +4. Subscribe to the worker event stream before invoking `db-sync-download-graph-by-id` when progress is enabled. +5. Filter only the relevant download log events for the active graph. +6. Render progress messages in chronological order to `stdout`. +7. Preserve the final success/failure output contract. +8. Document the new behavior in CLI docs, including mode-dependent defaults and override rules. + +### Output policy + +The implementation must make a clear separation between: + +- streaming progress lines, and +- the final command result. + +Confirmed direction: + +- stream progress to `stdout` when progress is enabled, +- add `--progress` option for `sync download`, +- default `--progress` to `true` for human-oriented output, +- automatically disable progress for structured output modes unless the user explicitly passes `--progress true`, +- keep the final result formatter responsible for terminal success/failure summary semantics. + +### Filtering policy + +The command should filter progress events using enough context to avoid printing unrelated logs. + +At minimum, filtering should consider the active graph identity. If a more precise operation-level filter is available without major complexity, prefer it. + +### Acceptance criteria + +- Running `logseq-cli sync download` with human-oriented output prints realtime progress lines to `stdout` during download/import. +- `--progress false` suppresses progress streaming. +- Structured output modes auto-disable progress unless `--progress true` is explicitly provided. +- Final command output still reflects the authoritative invoke result. +- Structured output parsing is not broken under the default mode-dependent progress behavior. + +### Verification + +- Integration test or high-confidence manual test showing visible staged output. +- Verification that final success output still matches expected formatter behavior. +- Verification that failure cases still return the correct final error. + +--- + +## Concrete execution order + +Implement in this order: + +1. **Phase 1 first** so large downloads no longer die under the short timeout path. +2. **Phase 2 second** so the CLI has a complete shared event source to consume. +3. **Phase 3 third** to add CLI event subscription infrastructure. +4. **Phase 4 last** to wire the streaming logs into `sync download` and finalize output behavior. + +Do not start Phase 4 before Phase 2 is complete, or the CLI will only show partial progress from the existing worker import stage. + +## Testing plan + +### Unit / focused tests + +Add or update tests in the most appropriate existing test namespaces for: + +- CLI timeout behavior for `sync download` +- event decoding / event subscription lifecycle +- worker/shared sync download log emission +- filtering and rendering of relevant `:rtc-log` events + +Likely test locations: + +- `src/test/logseq/cli/command/sync_test.cljs` +- `src/test/logseq/cli/integration_test.cljs` +- `src/test/frontend/worker/db_worker_node_test.cljs` +- `src/test/frontend/worker/db_sync_test.cljs` +- `src/test/frontend/handler/db_based/sync_test.cljs` + +The exact namespace choices may differ depending on existing test structure, but the coverage categories above are required. + +### Manual verification checklist + +1. Start a `sync download` against a graph large enough to produce visible staged progress. +2. Confirm the terminal shows early snapshot-download messages. +3. Confirm the terminal shows import/decrypt/save progress. +4. Confirm progress lines are emitted to `stdout` in human-oriented mode. +5. Confirm `--progress false` suppresses streaming progress output. +6. Confirm structured output mode auto-disables progress by default. +7. Confirm structured output mode prints progress only when explicitly using `--progress true`. +8. Confirm successful completion still depends on the invoke result. +9. Confirm a slow download no longer fails under the old short timeout path. +10. Confirm existing app download progress still works after shared-log refactoring. +11. Confirm non-empty DB validation and other preflight failures remain unchanged. + +## Risks and open questions + +### Risk 1: app and CLI may need slightly different rendering + +The event source should be shared, but rendering may differ by client. + +Mitigation: + +- share event emission, not presentation details. +- keep worker messages human-readable enough for both app and CLI. + +### Risk 2: progress events may not uniquely identify one operation + +If multiple operations or graphs are active, CLI could print unrelated logs. + +Mitigation: + +- filter by graph identity at minimum, +- add more precise filtering only if needed and justified by actual ambiguity. + +### Risk 3: output-mode compatibility + +Streaming logs can interfere with structured output modes. + +Mitigation: + +- enforce mode-dependent default behavior (`progress` auto-off for structured output unless explicitly enabled), +- verify human and machine-readable modes explicitly. + +### Resolved decision: timeout strategy + +`sync download` uses command-level long-running timeout handling instead of a global timeout increase. + +### Open question 1 + +Should the CLI event-stream helper remain local to `sync download`, or be introduced as a reusable CLI transport helper? + +Recommendation: + +- prefer a small reusable helper if the abstraction stays simple. + +## Out-of-scope follow-ups + +The following can be considered later and are **not required** for this plan: + +- byte-level progress bars, +- richer TUI formatting, +- resumable download semantics, +- generalized event streaming for all CLI commands. + +## Definition of done + +This work is complete when all of the following are true: + +- `logseq-cli sync download` displays realtime progress while a large graph is downloading/importing. +- The progress messages come from the shared worker/app event model, not a CLI-only ad hoc implementation. +- Download-progress milestones are unified in shared worker/sync code across the full flow. +- `sync download` supports `--progress`, with mode-dependent default behavior as documented. +- In structured output modes, progress is auto-disabled unless explicitly enabled. +- Large downloads are no longer constrained by the old generic short timeout path. +- Final command success/failure semantics remain intact. +- Relevant automated tests and CLI docs are updated. + +## Recommendation + +Execute this as a shared-event refactor plus a command-specific long-request timeout change. + +The key architecture decision is to make worker/shared sync code the source of truth for download progress, then let the CLI subscribe to db-worker-node events just like the app already does. This maximizes reuse, keeps progress semantics aligned across clients, and solves the two real UX issues together: silent long-running work and premature short-timeout failures. \ No newline at end of file diff --git a/docs/cli/logseq-cli.md b/docs/cli/logseq-cli.md index 8e842e8c6f..815ef609c3 100644 --- a/docs/cli/logseq-cli.md +++ b/docs/cli/logseq-cli.md @@ -76,7 +76,7 @@ Auth file contents include the persisted Cognito `id-token`, `access-token`, `re Verbose logging: - `--verbose` enables structured debug logs to stderr for CLI option parsing and db-worker-node API calls. -- stdout remains reserved for command output; large payloads are truncated in debug previews. +- `sync download` can stream realtime progress lines to stdout when progress is enabled; debug previews remain truncated. Timeouts: - `--timeout-ms` continues to control request timeout behavior for CLI transport. @@ -119,7 +119,7 @@ Sync commands: - `sync start --graph ` - start db-sync websocket client for a graph - `sync stop --graph ` - stop db-sync client on a graph daemon - `sync upload --graph ` - upload local graph snapshot to remote -- `sync download --graph ` - download remote graph `` into a same-name local graph directory +- `sync download --graph [--progress true|false]` - download remote graph `` into a same-name local graph directory - `sync remote-graphs [--graph ]` - list remote graphs visible to the current login context - `sync ensure-keys [--graph ]` - ensure user RSA keys for sync/e2ee - `sync grant-access --graph --graph-id --email ` - grant encrypted graph access to a user @@ -144,6 +144,11 @@ Sync download behavior: - If a local graph with the same name already exists, the CLI returns `graph-exists`. - If no remote graph with that name exists, the CLI returns `remote-graph-not-found`. - `sync download` starts `db-worker-node` in create-empty mode so local startup does not write `db-initial-data` before snapshot import. +- The final snapshot download/import invoke uses a command-specific long-running timeout (30 minutes by default) rather than the generic short-request timeout path. +- Progress streaming uses db-worker-node SSE `/v1/events` and shared `:rtc.log/download` events. +- `--progress` defaults to `true` for human output. +- For structured output (`--output json|edn`), progress is auto-disabled unless explicitly overridden with `--progress true`. +- `--progress false` always suppresses progress streaming. - If the target graph DB is not empty at download time, the CLI returns `graph-db-not-empty` and aborts before import. - For e2ee remote graphs in headless CLI mode, run `logseq login` first and set `e2ee-password` via `sync config set` (or in `--config`) before download. @@ -210,6 +215,7 @@ Output formats: - Global `--output ` applies to all commands - Output formatting is controlled via global `--output`, `:output-format` in config, or `LOGSEQ_CLI_OUTPUT`. - Human output is plain text. List/search commands render tables with a final `Count: N` line. For list and search subcommands, the ID column uses `:db/id` (not UUID). If `:db/ident` exists, an `IDENT` column is included. `list property` includes a dedicated `TYPE` column. Search table columns are `ID` and `TITLE`. Block titles can include multiple lines; multi-line rows align additional lines under the `TITLE` column. Times such as list `UPDATED-AT`/`CREATED-AT` and `graph info` `Created at` are shown in human-friendly relative form. Errors include error codes and may include a `Hint:` line. Use `--output json|edn` for structured output. +- `sync download` progress lines are streamed to stdout only when progress is enabled. In `json`/`edn` mode, progress is disabled by default unless `--progress true` is provided. - For `list property`, `TYPE` is returned in default output (without `--expand`) for human and structured (`json`/`edn`) formats. - `upsert page` and `upsert block` return entity ids in `data.result` for JSON/EDN output, and include ids in human output. - Human example: diff --git a/src/main/frontend/handler/db_based/sync.cljs b/src/main/frontend/handler/db_based/sync.cljs index b5718e3bfb..48d7757d2e 100644 --- a/src/main/frontend/handler/db_based/sync.cljs +++ b/src/main/frontend/handler/db_based/sync.cljs @@ -32,94 +32,6 @@ (or config/db-sync-http-base (ws->http-base config/db-sync-ws-url))) -(def ^:private snapshot-text-decoder (js/TextDecoder.)) - -(defn- ->uint8 [data] - (cond - (instance? js/Uint8Array data) data - (instance? js/ArrayBuffer data) (js/Uint8Array. data) - (string? data) (.encode (js/TextEncoder.) data) - :else (js/Uint8Array. data))) - -(defn- decode-snapshot-rows [payload] - (sqlite-util/read-transit-str (.decode snapshot-text-decoder (->uint8 payload)))) - -(defn- frame-len [^js data offset] - (let [view (js/DataView. (.-buffer data) offset 4)] - (.getUint32 view 0 false))) - -(defn- concat-bytes - [^js a ^js b] - (cond - (nil? a) b - (nil? b) a - :else - (let [out (js/Uint8Array. (+ (.-byteLength a) (.-byteLength b)))] - (.set out a 0) - (.set out b (.-byteLength a)) - out))) - -(defn- parse-framed-chunk - [buffer chunk] - (let [data (concat-bytes buffer chunk) - total (.-byteLength data)] - (loop [offset 0 - rows []] - (if (< (- total offset) 4) - {:rows rows - :buffer (when (< offset total) - (.slice data offset total))} - (let [len (frame-len data offset) - next-offset (+ offset 4 len)] - (if (<= next-offset total) - (let [payload (.slice data (+ offset 4) next-offset) - decoded (decode-snapshot-rows payload)] - (recur next-offset (into rows decoded))) - {:rows rows - :buffer (.slice data offset total)})))))) - -(defn- finalize-framed-buffer - [buffer] - (if (or (nil? buffer) (zero? (.-byteLength buffer))) - [] - (let [{:keys [rows buffer]} (parse-framed-chunk nil buffer)] - (if (and (seq rows) (or (nil? buffer) (zero? (.-byteLength buffer)))) - rows - (throw (ex-info "incomplete framed buffer" {:buffer buffer :rows rows})))))) - -(defn- gzip-bytes? - [^js payload] - (and (some? payload) - (>= (.-byteLength payload) 2) - (= 31 (aget payload 0)) - (= 139 (aget payload 1)))) - -(defn- bytes->stream - [^js payload] - (js/ReadableStream. - #js {:start (fn [controller] - (.enqueue controller payload) - (.close controller))})) - -(defn- stream payload) - decompressed (.pipeThrough stream (js/DecompressionStream. "gzip")) - resp (js/Response. decompressed) - buf (.arrayBuffer resp)] - (->uint8 buf)) - (p/rejected (ex-info "gzip decompression not supported" - {:type :db-sync/decompression-not-supported})))) - -(defn- uint8 buf)] - (if (gzip-bytes? payload) - ( (if (and graph-uuid base) - (-> (p/let [_ (js/Promise. user-handler/task--ensure-id&access-token) - pull-resp (fetch-json (str base "/sync/" graph-uuid "/pull") - {:method "GET"} - {:response-schema :sync/pull}) - remote-tx (:t pull-resp) - _ (when-not (integer? remote-tx) - (throw (ex-info "non-integer remote-tx when downloading graph" - {:graph graph-name - :remote-tx remote-tx}))) - resp (js/fetch (str base "/sync/" graph-uuid "/snapshot/stream") - (clj->js (with-auth-headers {:method "GET"}))) - total-bytes (when-let [raw (some-> resp .-headers (.get "content-length"))] - (let [parsed (js/parseInt raw 10)] - (when-not (js/isNaN parsed) parsed))) - _ (state/pub-event! - [:rtc/log {:type :rtc.log/download - :sub-type :download-progress - :graph-uuid graph-uuid - :message (str "Start downloading graph snapshot, file size: " total-bytes)}])] - (when-not (.-ok resp) - (throw (ex-info "snapshot download failed" - {:graph graph-name - :status (.-status resp)}))) - (p/let [snapshot-bytes ( (if (seq graph-uuid) + (p/let [_ (js/Promise. user-handler/task--ensure-id&access-token)] + (state/ resp .-headers (.get "content-length"))] + (let [parsed (js/parseInt raw 10)] + (when-not (js/isNaN parsed) + parsed)))) + (defn- download-graph-with-id! [repo graph-id graph-e2ee?] (let [base (http-base-url) @@ -2270,7 +2285,8 @@ :else (do (require-auth-token! {:repo repo :field :auth-token}) - (p/let [pull-resp (fetch-json (str base "/sync/" graph-id "/pull") + (p/let [_ (download-log! graph-id :download-progress "Preparing graph snapshot download") + pull-resp (fetch-json (str base "/sync/" graph-id "/pull") {:method "GET"} {:response-schema :sync/pull}) remote-tx (:t pull-resp) @@ -2280,12 +2296,19 @@ :value remote-tx})) resp (js/fetch (str base "/sync/" graph-id "/snapshot/stream") (clj->js (with-auth-headers {:method "GET"}))) + total-bytes (response-content-length resp) + _ (download-log! graph-id + :download-progress + (if (number? total-bytes) + (str "Start downloading graph snapshot, file size: " total-bytes) + "Start downloading graph snapshot")) _ (when-not (.-ok resp) (fail-fast :db-sync/snapshot-download-failed {:repo repo :graph-id graph-id :status (.-status resp)})) payload ( line str string/trim)) + (println line))) + +(defn- sync-download-invoke-config + [cfg] + (assoc cfg :timeout-ms (max 0 (or (:sync-download-timeout-ms cfg) + sync-download-timeout-ms)))) + (def ^:private sync-download-non-empty-query '[:find (count ?e) . :where @@ -112,7 +128,9 @@ :repo repo :graph (core/repo->graph repo) :allow-missing-graph true - :require-missing-graph true}}) + :require-missing-graph true + :progress (:progress options) + :progress-explicit? (contains? options :progress)}}) :sync-remote-graphs {:ok? true @@ -314,9 +332,24 @@ (p/catch (fn [error] (exception->error error {:repo (:repo action)}))))) +(defn- sync-download-progress-enabled? + [action config] + (if (:progress-explicit? action) + (true? (:progress action)) + (not (contains? structured-output-formats (:output-format config))))) + +(defn- download-progress-message + [graph-id event-type payload] + (when (and (= :rtc-log event-type) + (map? payload) + (= :rtc.log/download (:type payload)) + (= graph-id (:graph-uuid payload))) + (:message payload))) + (defn- execute-sync-download [action config] - (let [config' (download-config config)] + (let [config' (download-config config) + progress-enabled? (sync-download-progress-enabled? action config')] (-> (p/let [remote-graphs (invoke-global config' :thread-api/db-sync-list-remote-graphs []) @@ -332,8 +365,19 @@ (p/let [cfg (cli-server/ensure-server! config' (:repo action)) _ (transport/invoke cfg :thread-api/set-db-sync-config false [(sync-config config')]) _ (ensure-empty-download-db! cfg (:repo action)) - result (transport/invoke cfg :thread-api/db-sync-download-graph-by-id false - [(:repo action) (:graph-id remote-graph) (:graph-e2ee? remote-graph)])] + download-cfg (sync-download-invoke-config cfg) + graph-id (:graph-id remote-graph) + events-sub (when progress-enabled? + (transport/connect-events! + download-cfg + (fn [event-type payload] + (when-let [message (download-progress-message graph-id event-type payload)] + (print-progress-line! message))))) + result (-> (transport/invoke download-cfg :thread-api/db-sync-download-graph-by-id false + [(:repo action) graph-id (:graph-e2ee? remote-graph)]) + (p/finally (fn [] + (when-let [close! (:close! events-sub)] + (close!)))))] {:status :ok :data (if (map? result) result diff --git a/src/main/logseq/cli/transport.cljs b/src/main/logseq/cli/transport.cljs index 6e72f443f5..0ba83a372d 100644 --- a/src/main/logseq/cli/transport.cljs +++ b/src/main/logseq/cli/transport.cljs @@ -125,6 +125,98 @@ :response response-preview) decoded))))) +(defn- decode-event + [{:keys [type payload]}] + (let [decoded (when (some? payload) + (try + (ldb/read-transit-str payload) + (catch :default _ + payload)))] + (if (and (vector? decoded) + (= 2 (count decoded)) + (keyword? (first decoded))) + [(first decoded) (second decoded)] + [(when type (keyword type)) decoded]))) + +(defn- data-line + [event-text] + (some (fn [line] + (when (string/starts-with? line "data: ") + (subs line 6))) + (string/split-lines event-text))) + +(defn connect-events! + [{:keys [base-url]} on-event] + (let [handler (or on-event (fn [_event-type _payload] nil)) + url (js/URL. (str (string/replace (or base-url "") #"/$" "") "/v1/events")) + buffer (atom "") + *req (atom nil) + *res (atom nil) + *closed? (atom false) + dispatch! (fn [event-text] + (when-let [line (data-line event-text)] + (try + (let [event-map (js->clj (js/JSON.parse line) :keywordize-keys true) + [event-type payload] (decode-event event-map)] + (when (some? event-type) + (handler event-type payload))) + (catch :default e + (log/debug :event :cli.transport/events-parse-failed + :error e + :line line))))) + consume-chunk! (fn [chunk] + (swap! buffer str (.toString chunk "utf8")) + (loop [] + (let [current @buffer + idx (string/index-of current "\n\n")] + (when (some? idx) + (let [event-text (subs current 0 idx) + rest-text (subs current (+ idx 2))] + (reset! buffer rest-text) + (dispatch! event-text) + (recur)))))) + close! (fn [] + (reset! *closed? true) + (when-let [^js res @*res] + (try + (.destroy res) + (catch :default _ nil))) + (when-let [^js req @*req] + (try + (.destroy req) + (catch :default _ nil))) + nil)] + (try + (let [req (.request + (request-module url) + #js {:method "GET" + :hostname (.-hostname url) + :port (request-port url) + :path (str (.-pathname url) (.-search url)) + :headers (clj->js {"Accept" "text/event-stream"})} + (fn [^js res] + (reset! *res res) + (.on res "data" + (fn [chunk] + (when-not @*closed? + (consume-chunk! chunk)))) + (.on res "error" + (fn [e] + (when-not @*closed? + (log/debug :event :cli.transport/events-stream-error + :error e))))))] + (reset! *req req) + (.on req "error" + (fn [e] + (when-not @*closed? + (log/debug :event :cli.transport/events-request-error + :error e)))) + (.end req)) + (catch :default e + (log/debug :event :cli.transport/events-connect-failed + :error e))) + {:close! close!})) + (defn write-output [{:keys [format path data]}] (case format diff --git a/src/test/frontend/handler/db_based/sync_test.cljs b/src/test/frontend/handler/db_based/sync_test.cljs index 189856d5d4..dddff7bb92 100644 --- a/src/test/frontend/handler/db_based/sync_test.cljs +++ b/src/test/frontend/handler/db_based/sync_test.cljs @@ -7,35 +7,8 @@ [frontend.handler.user :as user-handler] [frontend.state :as state] [logseq.db :as ldb] - [logseq.db.sqlite.util :as sqlite-util] [promesa.core :as p])) -(def ^:private test-text-encoder (js/TextEncoder.)) - -(defn- frame-bytes [^js data] - (let [len (.-byteLength data) - out (js/Uint8Array. (+ 4 len)) - view (js/DataView. (.-buffer out))] - (.setUint32 view 0 len false) - (.set out data 4) - out)) - -(defn- encode-framed-rows [rows] - (let [payload (.encode test-text-encoder (sqlite-util/write-transit-str rows))] - (frame-bytes payload))) - -(defn- (p/let [gzip-bytes ( (p/with-redefs [db-sync/http-base (fn [] "http://base") - db-sync/fetch-json (fn [url _opts _schema] - (cond - (string/ends-with? url "/pull") - (p/resolved {:t 42}) - - :else - (p/rejected (ex-info "unexpected fetch-json URL" - {:url url})))) - user-handler/task--ensure-id&access-token (fn [resolve _reject] - (resolve true)) - state/ (p/with-redefs [state/set-state! (fn [k v] + (swap! state-calls conj [k v])) + state/pub-event! (fn [event] + (swap! pub-events conj event) + nil) + user-handler/task--ensure-id&access-token (fn [resolve _reject] + (resolve true)) + state/ (p/let [gzip-bytes ( (p/with-redefs [db-sync/http-base (constantly "http://base") - db-sync/fetch-json (fn [url _opts _schema] - (cond - (string/ends-with? url "/pull") - (p/resolved {:t 8}) - - :else - (p/rejected (ex-info "unexpected fetch-json URL" - {:url url})))) - user-handler/task--ensure-id&access-token (fn [resolve _reject] + (let [worker-calls (atom [])] + (-> (p/with-redefs [user-handler/task--ensure-id&access-token (fn [resolve _reject] (resolve true)) - state/ (p/with-redefs [state/set-state! (fn [k v] + (swap! state-calls conj [k v])) + state/pub-event! (fn [event] + (swap! pub-events conj event) + nil) + user-handler/task--ensure-id&access-token (fn [resolve _reject] + (resolve true)) + state/ error ex-data :code))) + (is (= [[:rtc/downloading-graph-uuid "graph-3"] + [:rtc/downloading-graph-uuid nil]] + @state-calls)) + (is (empty? @pub-events)) + (done))))))) diff --git a/src/test/frontend/worker/db_sync_test.cljs b/src/test/frontend/worker/db_sync_test.cljs index 0b1060bb84..57d57b0db7 100644 --- a/src/test/frontend/worker/db_sync_test.cljs +++ b/src/test/frontend/worker/db_sync_test.cljs @@ -1,5 +1,6 @@ (ns frontend.worker.db-sync-test (:require [cljs.test :refer [deftest is testing async use-fixtures]] + [clojure.string :as string] [datascript.core :as d] [frontend.common.crypt :as crypt] [frontend.worker-common.util :as worker-util] @@ -9,6 +10,7 @@ [frontend.worker.sync :as db-sync] [frontend.worker.sync.client-op :as client-op] [frontend.worker.sync.crypt :as sync-crypt] + [frontend.worker.sync.log-and-state :as sync-log-and-state] [logseq.common.config :as common-config] [logseq.db :as ldb] [logseq.db.frontend.validate :as db-validate] @@ -1323,6 +1325,59 @@ (reset! worker-state/*db-sync-config config-prev) (done)))))))) +(deftest download-graph-by-id-emits-shared-download-milestones-test + (testing "download emits shared rtc.log/download milestones" + (async done + (let [fetch-prev js/fetch + config-prev @worker-state/*db-sync-config + logs (atom []) + rows [["addr-1" "content-1" nil]] + payload (#'db-sync/frame-bytes (#'db-sync/encode-snapshot-rows rows))] + (reset! worker-state/*db-sync-config {:http-base "https://example.com" + :auth-token "token-value"}) + (set! js/fetch + (fn [url _opts] + (cond + (>= (.indexOf url "/pull") 0) + (js/Promise.resolve #js {:ok true + :status 200 + :text (fn [] (js/Promise.resolve "{\"type\":\"pull/ok\",\"t\":9,\"txs\":[]}"))}) + + (>= (.indexOf url "/snapshot/stream") 0) + (js/Promise.resolve #js {:ok true + :status 200 + :headers #js {:get (fn [header] + (when (= header "content-length") + (str (.-byteLength payload))))} + :arrayBuffer (fn [] (js/Promise.resolve (.-buffer payload)))}) + + :else + (js/Promise.reject (js/Error. (str "unexpected fetch url: " url)))))) + (-> (p/with-redefs [sync-log-and-state/rtc-log (fn [type payload] + (swap! logs conj [type payload]) + nil)] + (db-sync/download-graph-by-id! test-repo "graph-1" false)) + (p/then (fn [result] + (is (= "graph-1" (:graph-id result))) + (is (= 9 (:remote-tx result))) + (is (= false (:graph-e2ee? result))) + (is (= rows (vec (:rows result)))) + (let [messages (mapv (fn [[_ payload]] (:message payload)) @logs)] + (is (some #(= "Preparing graph snapshot download" %) messages)) + (is (some #(string/includes? % "Start downloading graph snapshot") messages)) + (is (some #(= "Graph snapshot downloaded" %) messages))) + (is (every? (fn [[type payload]] + (and (= :rtc.log/download type) + (= "graph-1" (:graph-uuid payload)))) + @logs)))) + (p/catch (fn [e] + (is false (str "unexpected error: " e)))) + (p/finally + (fn [] + (set! js/fetch fetch-prev) + (reset! worker-state/*db-sync-config config-prev) + (done)))))))) + (deftest ensure-upload-graph-identity-creates-remote-graph-when-local-graph-id-missing-test (testing "first upload bootstrap creates and persists a remote graph when local metadata is missing" (async done diff --git a/src/test/logseq/cli/command/sync_test.cljs b/src/test/logseq/cli/command/sync_test.cljs index 4e9f83791a..79de171639 100644 --- a/src/test/logseq/cli/command/sync_test.cljs +++ b/src/test/logseq/cli/command/sync_test.cljs @@ -28,6 +28,15 @@ (is (true? (get-in result [:action :allow-missing-graph]))) (is (true? (get-in result [:action :require-missing-graph]))))) + (testing "sync download action keeps progress option and explicit flag" + (let [default-result (sync-command/build-action :sync-download {} [] "logseq_db_demo") + explicit-result (sync-command/build-action :sync-download {:progress false} [] "logseq_db_demo")] + (is (true? (:ok? default-result))) + (is (= false (get-in default-result [:action :progress-explicit?]))) + (is (true? (:ok? explicit-result))) + (is (= false (get-in explicit-result [:action :progress]))) + (is (= true (get-in explicit-result [:action :progress-explicit?]))))) + (testing "sync config set requires name and value" (let [missing-both (sync-command/build-action :sync-config-set {} [] nil) missing-value (sync-command/build-action :sync-config-set {} ["ws-url"] nil)] @@ -313,6 +322,133 @@ (is false (str "unexpected error: " e)))) (p/finally done))))) +(deftest test-execute-sync-download-uses-long-timeout-only-for-download-invoke + (async done + (let [invoke-calls (atom [])] + (-> (p/with-redefs [cli-server/ensure-server! (fn [config _repo] + (p/resolved (assoc config :base-url "http://example"))) + transport/invoke (fn [cfg method direct-pass? args] + (swap! invoke-calls conj {:method method + :direct-pass? direct-pass? + :args args + :timeout-ms (:timeout-ms cfg)}) + (case method + :thread-api/db-sync-list-remote-graphs + (p/resolved [{:graph-id "remote-graph-id" + :graph-name "demo" + :graph-e2ee? false}]) + :thread-api/q + (p/resolved 0) + :thread-api/db-sync-download-graph-by-id + (p/resolved {:ok true}) + (p/resolved nil)))] + (p/let [result (execute-with-runtime-auth {:type :sync-download + :repo "logseq_db_demo" + :graph "demo"} + {:base-url "http://example" + :data-dir "/tmp" + :timeout-ms 10000}) + [set-config-before list-remote-graphs set-config-after check-empty-db download] + @invoke-calls] + (is (= :ok (:status result))) + (is (= :thread-api/set-db-sync-config (:method set-config-before))) + (is (= :thread-api/db-sync-list-remote-graphs (:method list-remote-graphs))) + (is (= :thread-api/set-db-sync-config (:method set-config-after))) + (is (= :thread-api/q (:method check-empty-db))) + (is (= :thread-api/db-sync-download-graph-by-id (:method download))) + (is (= 10000 (:timeout-ms set-config-before))) + (is (= 10000 (:timeout-ms list-remote-graphs))) + (is (= 10000 (:timeout-ms set-config-after))) + (is (= 10000 (:timeout-ms check-empty-db))) + (is (= 1800000 (:timeout-ms download))))) + (p/catch (fn [e] + (is false (str "unexpected error: " e)))) + (p/finally done))))) + +(deftest test-execute-sync-download-progress-mode-behavior + (async done + (let [subscribe-calls (atom []) + close-calls (atom 0) + printed-lines (atom [])] + (-> (p/with-redefs [cli-server/ensure-server! (fn [config _repo] + (p/resolved (assoc config :base-url "http://example"))) + transport/connect-events! (fn [cfg on-event] + (swap! subscribe-calls conj {:base-url (:base-url cfg) + :timeout-ms (:timeout-ms cfg)}) + (on-event :rtc-log {:type :rtc.log/download + :graph-uuid "remote-graph-id" + :message "Preparing graph snapshot download"}) + (on-event :rtc-log {:type :rtc.log/download + :graph-uuid "other-graph-id" + :message "should be filtered"}) + {:close! (fn [] + (swap! close-calls inc))}) + sync-command/print-progress-line! (fn [line] + (swap! printed-lines conj line) + nil) + transport/invoke (fn [_ method _direct-pass? _args] + (case method + :thread-api/db-sync-list-remote-graphs + (p/resolved [{:graph-id "remote-graph-id" + :graph-name "demo" + :graph-e2ee? false}]) + :thread-api/q + (p/resolved 0) + :thread-api/db-sync-download-graph-by-id + (p/resolved {:ok true}) + (p/resolved nil)))] + (p/let [_ (execute-with-runtime-auth {:type :sync-download + :repo "logseq_db_demo" + :graph "demo" + :progress-explicit? false} + {:base-url "http://example" + :data-dir "/tmp" + :output-format nil}) + _ (is (= 1 (count @subscribe-calls))) + _ (is (= ["Preparing graph snapshot download"] @printed-lines)) + _ (is (= 1 @close-calls)) + _ (reset! subscribe-calls []) + _ (reset! printed-lines []) + _ (reset! close-calls 0) + _ (execute-with-runtime-auth {:type :sync-download + :repo "logseq_db_demo" + :graph "demo" + :progress-explicit? false} + {:base-url "http://example" + :data-dir "/tmp" + :output-format :json}) + _ (is (= [] @subscribe-calls)) + _ (is (= [] @printed-lines)) + _ (is (= 0 @close-calls)) + _ (execute-with-runtime-auth {:type :sync-download + :repo "logseq_db_demo" + :graph "demo" + :progress true + :progress-explicit? true} + {:base-url "http://example" + :data-dir "/tmp" + :output-format :json}) + _ (is (= 1 (count @subscribe-calls))) + _ (is (= ["Preparing graph snapshot download"] @printed-lines)) + _ (is (= 1 @close-calls)) + _ (reset! subscribe-calls []) + _ (reset! printed-lines []) + _ (reset! close-calls 0) + _ (execute-with-runtime-auth {:type :sync-download + :repo "logseq_db_demo" + :graph "demo" + :progress false + :progress-explicit? true} + {:base-url "http://example" + :data-dir "/tmp" + :output-format nil})] + (is (= [] @subscribe-calls)) + (is (= [] @printed-lines)) + (is (= 0 @close-calls)))) + (p/catch (fn [e] + (is false (str "unexpected error: " e)))) + (p/finally done))))) + (deftest test-execute-sync-download-uses-graph-config-when-base-url-missing (async done (let [ensure-calls (atom []) diff --git a/src/test/logseq/cli/commands_test.cljs b/src/test/logseq/cli/commands_test.cljs index 3156023b4b..420c778517 100644 --- a/src/test/logseq/cli/commands_test.cljs +++ b/src/test/logseq/cli/commands_test.cljs @@ -1259,6 +1259,14 @@ (is (false? (:ok? result))) (is (= :missing-graph (get-in result [:error :code]))))) + (testing "sync download accepts progress option" + (let [disabled (commands/parse-args ["sync" "download" "--graph" "demo" "--progress" "false"]) + enabled (commands/parse-args ["sync" "download" "--graph" "demo" "--progress" "true"])] + (is (true? (:ok? disabled))) + (is (= false (get-in disabled [:options :progress]))) + (is (true? (:ok? enabled))) + (is (= true (get-in enabled [:options :progress]))))) + (testing "graph import rejects unknown type" (let [result (commands/parse-args ["graph" "import" "--type" "zip" diff --git a/src/test/logseq/cli/transport_test.cljs b/src/test/logseq/cli/transport_test.cljs index e23bb44fd2..12c77c1e0b 100644 --- a/src/test/logseq/cli/transport_test.cljs +++ b/src/test/logseq/cli/transport_test.cljs @@ -2,6 +2,7 @@ (:require [cljs.test :refer [deftest is async testing]] [logseq.cli.test-helper :as test-helper] [logseq.cli.transport :as transport] + [logseq.db :as ldb] [promesa.core :as p])) (def ^:private fs (js/require "fs")) @@ -146,6 +147,45 @@ (is false (str "unexpected error: " e)) (done))))))) +(deftest test-connect-events-decodes-rtc-log-and-cleans-up + (async done + (let [received (atom []) + client-closed? (atom false)] + (-> (p/let [{:keys [url stop!]} (start-server + (fn [^js req ^js res] + (if (= "/v1/events" (.-url req)) + (do + (.writeHead res 200 #js {"Content-Type" "text/event-stream" + "Cache-Control" "no-cache" + "Connection" "keep-alive"}) + (let [payload (ldb/write-transit-str {:type :rtc.log/download + :graph-uuid "graph-1" + :message "Preparing graph snapshot download"}) + data (js/JSON.stringify #js {:type "rtc-log" + :payload payload})] + (.write res (str "data: " data "\n\n"))) + (.on req "close" (fn [] + (reset! client-closed? true)))) + (do + (.writeHead res 404 #js {"Content-Type" "text/plain"}) + (.end res "not-found")))))] + (let [{:keys [close!]} (transport/connect-events! {:base-url url} + (fn [event-type payload] + (swap! received conj [event-type payload])))] + (p/let [_ (p/delay 50) + _ (close!) + _ (p/delay 50)] + (is (= [[:rtc-log {:type :rtc.log/download + :graph-uuid "graph-1" + :message "Preparing graph snapshot download"}]] + @received)) + (is (= true @client-closed?)) + (stop!)))) + (p/then (fn [_] (done))) + (p/catch (fn [e] + (is false (str "unexpected error: " e)) + (done))))))) + (deftest test-read-input (testing "reads edn input" (let [file-path (temp-path "input.edn")]