From 203ca43ffbab6fa1c4f067305b3d2ed9e37cb85a Mon Sep 17 00:00:00 2001 From: rcmerci Date: Fri, 22 May 2026 12:11:30 +0800 Subject: [PATCH] fix: agent bridge and logging --- .../logseq/db/frontend/property_test.cljs | 21 +- src/main/logseq/cli/command/agent.cljs | 66 ++++-- src/main/logseq/db_worker/log.cljs | 7 +- src/resources/dicts/en.edn | 1 + src/resources/dicts/zh-cn.edn | 1 + .../frontend/worker/db_worker_node_test.cljs | 21 ++ src/test/frontend/worker/migrate_test.cljs | 18 ++ src/test/logseq/cli/command/agent_test.cljs | 190 +++++++++++++++++- 8 files changed, 305 insertions(+), 20 deletions(-) diff --git a/deps/db/test/logseq/db/frontend/property_test.cljs b/deps/db/test/logseq/db/frontend/property_test.cljs index a2509b7db4..f28369063c 100644 --- a/deps/db/test/logseq/db/frontend/property_test.cljs +++ b/deps/db/test/logseq/db/frontend/property_test.cljs @@ -1,7 +1,18 @@ (ns logseq.db.frontend.property-test - (:require [cljs.test :refer [deftest is testing]] + (:require ["fs" :as fs] + ["path" :as node-path] + [cljs.reader :as reader] + [cljs.test :refer [deftest is testing]] [logseq.db.frontend.property :as db-property])) +(defn- read-dict + [filename] + (reader/read-string + (.toString + (fs/readFileSync + (node-path/join (.cwd js/process) "src" "resources" "dicts" filename)) + "utf8"))) + (deftest sort-properties (let [p1 {:db/id 1, :block/order "a", :block/uuid "uuid-a"} p2 {:db/id 2, :block/order "b", :block/uuid "uuid-b"} @@ -102,4 +113,10 @@ (get-in property [:properties :logseq.property/description]))) (is (contains? db-property/public-built-in-properties :logseq.property.agent/session-id)) (is (db-property/logseq-property? :logseq.property.agent/session-id)) - (is (db-property/internal-property? :logseq.property.agent/session-id))))) + (is (db-property/internal-property? :logseq.property.agent/session-id))) + + (testing "localized display title" + (let [i18n-key (db-property/built-in-ident->i18n-key :logseq.property.agent/session-id)] + (is (= :property.built-in/agent-session-id i18n-key)) + (is (contains? (read-dict "en.edn") i18n-key)) + (is (contains? (read-dict "zh-cn.edn") i18n-key)))))) diff --git a/src/main/logseq/cli/command/agent.cljs b/src/main/logseq/cli/command/agent.cljs index b0e3232905..5827f81744 100644 --- a/src/main/logseq/cli/command/agent.cljs +++ b/src/main/logseq/cli/command/agent.cljs @@ -936,15 +936,27 @@ (p/resolved nil)) (route-task! cfg opts task)))) +(def ^:private max-concurrent-routes 4) + +(defn- p-map-batched + [limit f coll] + (reduce (fn [result-promise batch] + (p/let [result result-promise + batch-result (p/all (mapv f batch))] + (into result batch-result))) + (p/resolved []) + (partition-all limit coll))) + (defn- process-tasks! [cfg {:keys [repo graph agent-name prompt-templates]}] (p/let [tasks (list-routable-tasks cfg repo agent-name)] - (p/all (mapv #(route-task-once! cfg {:repo repo - :graph graph - :agent-name agent-name - :prompt-templates prompt-templates} - %) - tasks)))) + (p-map-batched max-concurrent-routes + #(route-task-once! cfg {:repo repo + :graph graph + :agent-name agent-name + :prompt-templates prompt-templates} + %) + tasks))) (def ^:private assignee-property-ident :logseq.property/assignee) @@ -1024,6 +1036,14 @@ (or (string/includes? (:v datom) (str "[[" agent-name "]]")) (string/includes? (:v datom) "[[")))) +(defn- task-routability-datom? + [datom] + (and (true? (:added datom)) + (or (and (= :block/tags (:a datom)) + (= :logseq.class/Task (:v datom))) + (and (= :logseq.property/status (:a datom)) + (= :logseq.property/status.todo (:v datom)))))) + (defn- pull-assignee-property [cfg repo] (transport/invoke cfg :thread-api/pull [repo assignee-property-selector assignee-property-ident])) @@ -1212,6 +1232,15 @@ (route-task-once! cfg opts {:block block :tree-text tree-text}))))))))) +(defn- route-routability-datom! + [cfg {:keys [repo agent-name] :as opts} datom] + (when-let [block-id (:e datom)] + (p/let [block (pull-task-block cfg repo block-id)] + (when (routable-task? block agent-name) + (p/let [tree-text (show-task-tree cfg repo block)] + (route-task-once! cfg opts {:block block + :tree-text tree-text})))))) + (defn- route-comment-datom! [cfg {:keys [repo agent-name] :as opts} datom] (when-let [block-id (:e datom)] @@ -1222,8 +1251,10 @@ (defn- process-sync-db-changes-event! [cfg {:keys [repo] :as opts} {:keys [tx-data]}] (p/let [assignee-datoms (resolve-assignee-datoms cfg repo tx-data)] - (let [comment-datoms (filter #(comment-title-datom? % (:agent-name opts)) tx-data) + (let [routability-datoms (filter task-routability-datom? tx-data) + comment-datoms (filter #(comment-title-datom? % (:agent-name opts)) tx-data) routing (vec (concat (map #(route-assignee-datom! cfg opts %) assignee-datoms) + (map #(route-routability-datom! cfg opts %) routability-datoms) (map #(route-comment-datom! cfg opts %) comment-datoms)))] (when (seq routing) (p/all routing))))) @@ -1295,10 +1326,10 @@ :agent-name agent-name :logs logs :commands commands}}) - (do - (doseq [line (conj logs (log-line "listening graph changes ..."))] - (emit-log! cfg line)) - (if (:process-once? action) + (if (:process-once? action) + (do + (doseq [line (conj logs (log-line "listening graph changes ..."))] + (emit-log! cfg line)) (p/let [routed (process-tasks! cfg {:repo repo :graph graph :agent-name agent-name @@ -1308,12 +1339,15 @@ :data {:mode :processed-once :graph graph :agent-name agent-name - :routed routed}}) + :routed routed}})) + (let [listen-promise (listen-forever! cfg {:repo repo + :graph graph + :agent-name agent-name + :prompt-templates prompt-templates})] + (doseq [line (conj logs (log-line "listening graph changes ..."))] + (emit-log! cfg line)) (p/let [_ (process-tasks! cfg {:repo repo :graph graph :agent-name agent-name :prompt-templates prompt-templates})] - (listen-forever! cfg {:repo repo - :graph graph - :agent-name agent-name - :prompt-templates prompt-templates})))))))))))) + listen-promise))))))))))) diff --git a/src/main/logseq/db_worker/log.cljs b/src/main/logseq/db_worker/log.cljs index 228bf7884f..e03766cf8f 100644 --- a/src/main/logseq/db_worker/log.cljs +++ b/src/main/logseq/db_worker/log.cljs @@ -110,8 +110,13 @@ (instance? js/Error value) (or (.-stack value) (.-message value) (str value)) + (and encoding + (exists? js/Buffer) + (.isBuffer js/Buffer value)) + (.toString value encoding) + (and value (fn? (.-toString value))) - (.toString value (or encoding "utf8")) + (.toString value) :else (str value)))) diff --git a/src/resources/dicts/en.edn b/src/resources/dicts/en.edn index 5538c51031..a10bb823b1 100644 --- a/src/resources/dicts/en.edn +++ b/src/resources/dicts/en.edn @@ -1390,6 +1390,7 @@ :property/update-success "Property updated!" :property/use-choice-in-tag "Use in #{1}" + :property.built-in/agent-session-id "Agent Session ID" :property.built-in/alias "Alias" :property.built-in/asset "Asset" :property.built-in/asset-align "Asset alignment" diff --git a/src/resources/dicts/zh-cn.edn b/src/resources/dicts/zh-cn.edn index a4eb1a6cc0..c518eb8935 100644 --- a/src/resources/dicts/zh-cn.edn +++ b/src/resources/dicts/zh-cn.edn @@ -1379,6 +1379,7 @@ :property/update-success "属性已更新!" :property/use-choice-in-tag "用于 #{1}" + :property.built-in/agent-session-id "Agent 会话 ID" :property.built-in/alias "别名" :property.built-in/asset "附件" :property.built-in/asset-align "附件对齐" diff --git a/src/test/frontend/worker/db_worker_node_test.cljs b/src/test/frontend/worker/db_worker_node_test.cljs index e4c96b230b..dbd2beac53 100644 --- a/src/test/frontend/worker/db_worker_node_test.cljs +++ b/src/test/frontend/worker/db_worker_node_test.cljs @@ -311,6 +311,27 @@ (-> (stop!) (p/finally (fn [] (done)))) (done)))))))) +(deftest db-worker-node-logs-console-number-output + (async done + (let [daemon (atom nil) + data-dir (node-helper/create-tmp-dir "db-worker-log-console-number") + repo (str "logseq_db_log_console_number_" (subs (str (random-uuid)) 0 8)) + log-file (log-path data-dir repo)] + (-> (p/let [{:keys [stop!]} + (start-daemon! {:root-dir data-dir + :repo repo}) + _ (reset! daemon {:stop! stop!}) + _ (.error js/console 123) + _ (p/delay 50) + contents (.toString (fs/readFileSync log-file) "utf8")] + (is (string/includes? contents "123"))) + (p/catch (fn [e] + (is false (str "unexpected error: " e)))) + (p/finally (fn [] + (if-let [stop! (:stop! @daemon)] + (-> (stop!) (p/finally (fn [] (done)))) + (done)))))))) + (deftest db-worker-node-logs-process-stdout-output (async done (let [daemon (atom nil) diff --git a/src/test/frontend/worker/migrate_test.cljs b/src/test/frontend/worker/migrate_test.cljs index 751720a000..59d751e86a 100644 --- a/src/test/frontend/worker/migrate_test.cljs +++ b/src/test/frontend/worker/migrate_test.cljs @@ -187,6 +187,24 @@ (d/entity @conn [:block/uuid range-comments-uuid]))))) "Existing range comment targets should be preserved"))) +(deftest migrate-65-30-adds-assignee-property + (let [conn (d/create-conn db-schema/schema)] + (d/transact! conn [{:db/ident :logseq.kv/schema-version + :kv/value {:major 65 :minor 29}}]) + + (let [result (db-migrate/migrate conn :target-version {:major 65 :minor 30})] + (is (= {:major 65 :minor 30} + (:kv/value (d/entity @conn :logseq.kv/schema-version)))) + (let [property (d/entity @conn :logseq.property/assignee)] + (is (some? property)) + (is (= "Assignee" (:block/title property))) + (is (= :node (:logseq.property/type property))) + (is (= :db.cardinality/many (:db/cardinality property))) + (is (true? (:logseq.property/public? property)))) + (is (some #(= {:properties [:logseq.property/assignee]} + (:migrate-updates %)) + (:upgrade-result-coll result)))))) + (deftest migrate-65-31-adds-agent-session-id-property (let [conn (d/create-conn db-schema/schema)] (d/transact! conn [{:db/ident :logseq.kv/schema-version diff --git a/src/test/logseq/cli/command/agent_test.cljs b/src/test/logseq/cli/command/agent_test.cljs index d4fdd3c36f..da7038eff8 100644 --- a/src/test/logseq/cli/command/agent_test.cljs +++ b/src/test/logseq/cli/command/agent_test.cljs @@ -21,6 +21,14 @@ [] (.mkdtempSync fs (node-path/join (.tmpdir os) "logseq-agent-bridge-test-"))) +(defn- read-dict + [filename] + (reader/read-string + (.toString + (fs/readFileSync + (node-path/join (.cwd js/process) "src" "resources" "dicts" filename)) + "utf8"))) + (defn- task-block [overrides] (merge {:db/id 42 @@ -108,7 +116,11 @@ (get-in property [:properties :logseq.property/description]))) (is (contains? db-property/public-built-in-properties :logseq.property.agent/session-id)) (is (db-property/logseq-property? :logseq.property.agent/session-id)) - (is (db-property/internal-property? :logseq.property.agent/session-id))))) + (is (db-property/internal-property? :logseq.property.agent/session-id)) + (let [i18n-key (db-property/built-in-ident->i18n-key :logseq.property.agent/session-id)] + (is (= :property.built-in/agent-session-id i18n-key)) + (is (contains? (read-dict "en.edn") i18n-key)) + (is (contains? (read-dict "zh-cn.edn") i18n-key)))))) (deftest test-agent-command-entries (testing "parse agent bridge command surface" @@ -1308,6 +1320,110 @@ (is false (str "unexpected setup error: " e)) (done)))))) +(deftest test-execute-agent-bridge-connects-listener-before-ready-log-and-initial-scan + (async done + (let [calls (atom []) + call-index (fn [pred] + (first (keep-indexed (fn [idx call] + (when (pred call) idx)) + @calls)))] + (-> (p/with-redefs [agent-command/codex-available? (fn [_] true) + cli-server/ensure-server! (fn [cfg repo] + (swap! calls conj [:ensure-server repo]) + (assoc cfg :base-url "http://127.0.0.1:1234")) + agent-command/register-agent-bridge! (fn [_cfg repo agent-name] + (swap! calls conj [:register repo agent-name]) + (p/resolved true)) + agent-command/ensure-agent-bridge-prompt-templates! + (fn [_cfg repo] + (swap! calls conj [:prompt-templates repo]) + (p/resolved {:task "Task {{graph}} {{block-uuid}} {{agent-name}}\n{{task-block-tree}}" + :comment "Comment {{graph}} {{comment-uuid}} {{agent-name}}\n{{comment-target-context}}\n{{comment-thread-context}}\n{{requesting-comment}}"})) + transport/connect-events! (fn [_cfg _handler] + (swap! calls conj [:connect]) + {:close! (fn [] nil)}) + agent-command/list-routable-tasks (fn [_cfg repo agent-name] + (swap! calls conj [:list repo agent-name]) + (p/resolved []))] + (do + (agent-command/execute-bridge {:type :agent-bridge + :repo "logseq_db_demo" + :graph "demo" + :dry-run? false} + {:root-dir "/tmp/logseq" + :agent-name "build-host" + :log-fn (fn [line] + (swap! calls conj [:log line]))}) + (p/let [_ (p/delay 10)] + (let [connect-index (call-index #(= [:connect] %)) + listening-index (call-index #(and (= :log (first %)) + (string/includes? (second %) "listening graph changes"))) + list-index (call-index #(= :list (first %)))] + (is (some? connect-index)) + (is (some? listening-index)) + (is (some? list-index)) + (is (< connect-index listening-index)) + (is (< connect-index list-index)))))) + (p/catch (fn [e] + (is false (str "unexpected error: " e)))) + (p/finally done))))) + +(deftest test-execute-agent-bridge-bounds-initial-task-routing-concurrency + (async done + (let [root (temp-root) + active* (atom 0) + max-active* (atom 0) + routed* (atom []) + blocks (mapv (fn [idx] + (task-block {:db/id (+ 42 idx) + :block/uuid (uuid (str "11111111-1111-1111-1111-11111111111" idx))})) + (range 5))] + (try + (-> (p/with-redefs [agent-command/codex-available? (fn [_] true) + cli-server/ensure-server! (fn [cfg _repo] + (assoc cfg :base-url "http://127.0.0.1:1234")) + agent-command/register-agent-bridge! (fn [_cfg _repo _agent-name] + (p/resolved true)) + agent-command/ensure-agent-bridge-prompt-templates! + (fn [_cfg _repo] + (p/resolved {:task "Task {{graph}} {{block-uuid}} {{agent-name}}\n{{task-block-tree}}" + :comment "Comment {{graph}} {{comment-uuid}} {{agent-name}}\n{{comment-target-context}}\n{{comment-thread-context}}\n{{requesting-comment}}"})) + agent-command/list-routable-tasks (fn [_cfg _repo _agent-name] + (p/resolved (mapv (fn [block] + {:block block + :tree-text (:block/title block)}) + blocks))) + agent-command/start-codex! (fn [_command _opts] + (let [active (swap! active* inc)] + (swap! max-active* max active) + (p/let [_ (p/delay 20)] + (swap! active* dec) + {:session (str "session-" (random-uuid)) + :status :running}))) + agent-command/write-agent-session-id! (fn [_cfg _repo block-uuid session-id] + (swap! routed* conj [block-uuid session-id]) + (p/resolved true))] + (p/let [result (agent-command/execute-bridge {:type :agent-bridge + :repo "logseq_db_demo" + :graph "demo" + :dry-run? false + :process-once? true} + {:root-dir root + :agent-name "build-host" + :log-fn (fn [_] nil)})] + (is (= :ok (:status result))) + (is (= 5 (count @routed*))) + (is (<= @max-active* 4)))) + (p/catch (fn [e] + (is false (str "unexpected error: " e)))) + (p/finally (fn [] + (fs/rmSync root #js {:recursive true :force true}) + (done)))) + (catch :default e + (fs/rmSync root #js {:recursive true :force true}) + (is false (str "unexpected setup error: " e)) + (done)))))) + (deftest test-agent-bridge-listener-ignores-unrelated-events (async done (let [handler* (atom nil) @@ -1487,3 +1603,75 @@ (fs/rmSync root #js {:recursive true :force true}) (is false (str "unexpected setup error: " e)) (done)))))) + +(deftest test-agent-bridge-listener-routes-task-tag-and-status-datoms + (async done + (let [root (temp-root) + handler* (atom nil) + calls (atom []) + status-block (task-block {:db/id 42}) + tag-block (task-block {:db/id 43 + :block/uuid #uuid "22222222-2222-2222-2222-222222222222"})] + (try + (-> (p/with-redefs [transport/connect-events! (fn [_cfg handler] + (reset! handler* handler) + {:close! (fn [] nil)}) + agent-command/list-routable-tasks (fn [_cfg repo agent-name] + (swap! calls conj [:broad-scan repo agent-name]) + (p/resolved [])) + transport/invoke (fn [_cfg method args] + (swap! calls conj [method args]) + (case method + :thread-api/pull + (let [[_repo _selector lookup] args] + (cond + (= lookup 42) (p/resolved status-block) + (= lookup 43) (p/resolved tag-block) + :else (p/rejected (ex-info "unexpected pull" + {:lookup lookup})))) + (p/rejected (ex-info "unexpected invoke" + {:method method + :args args})))) + show-command/execute-show (fn [action _cfg] + (swap! calls conj [:show action]) + (p/resolved {:status :ok + :data {:message "- Routed task"}})) + cli-server/ensure-server! (fn [cfg repo] + (swap! calls conj [:ensure-server (:root-dir cfg) repo]) + (assoc cfg :base-url "http://127.0.0.1:1234")) + agent-command/start-codex! (fn [_command _opts] + (p/resolved {:session (str "session-" (random-uuid)) + :status :running})) + agent-command/write-agent-session-id! (fn [_cfg repo block-uuid session-id] + (swap! calls conj [:write-session repo block-uuid session-id]) + (p/resolved true))] + (do + (#'agent-command/listen-forever! {:root-dir root + :base-url "http://127.0.0.1:1234" + :log-fn (fn [_] nil)} + {:repo "logseq_db_demo" + :graph "demo" + :agent-name "build-host"}) + (@handler* :sync-db-changes {:tx-data [{:e 42 + :a :logseq.property/status + :v :logseq.property/status.todo + :added true} + {:e 43 + :a :block/tags + :v :logseq.class/Task + :added true}]}) + (p/let [_ (p/delay 10)] + (is (not-any? #(= :broad-scan (first %)) @calls)) + (is (= #{(:block/uuid status-block) + (:block/uuid tag-block)} + (set (map #(nth % 2) + (filter #(= :write-session (first %)) @calls)))))))) + (p/catch (fn [e] + (is false (str "unexpected error: " e)))) + (p/finally (fn [] + (fs/rmSync root #js {:recursive true :force true}) + (done)))) + (catch :default e + (fs/rmSync root #js {:recursive true :force true}) + (is false (str "unexpected setup error: " e)) + (done))))))