fix: agent bridge and logging

This commit is contained in:
rcmerci
2026-05-22 12:11:30 +08:00
parent 73fccfb0a9
commit 203ca43ffb
8 changed files with 305 additions and 20 deletions

View File

@@ -1,7 +1,18 @@
(ns logseq.db.frontend.property-test
(:require [cljs.test :refer [deftest is testing]]
(:require ["fs" :as fs]
["path" :as node-path]
[cljs.reader :as reader]
[cljs.test :refer [deftest is testing]]
[logseq.db.frontend.property :as db-property]))
(defn- read-dict
[filename]
(reader/read-string
(.toString
(fs/readFileSync
(node-path/join (.cwd js/process) "src" "resources" "dicts" filename))
"utf8")))
(deftest sort-properties
(let [p1 {:db/id 1, :block/order "a", :block/uuid "uuid-a"}
p2 {:db/id 2, :block/order "b", :block/uuid "uuid-b"}
@@ -102,4 +113,10 @@
(get-in property [:properties :logseq.property/description])))
(is (contains? db-property/public-built-in-properties :logseq.property.agent/session-id))
(is (db-property/logseq-property? :logseq.property.agent/session-id))
(is (db-property/internal-property? :logseq.property.agent/session-id)))))
(is (db-property/internal-property? :logseq.property.agent/session-id)))
(testing "localized display title"
(let [i18n-key (db-property/built-in-ident->i18n-key :logseq.property.agent/session-id)]
(is (= :property.built-in/agent-session-id i18n-key))
(is (contains? (read-dict "en.edn") i18n-key))
(is (contains? (read-dict "zh-cn.edn") i18n-key))))))

View File

@@ -936,15 +936,27 @@
(p/resolved nil))
(route-task! cfg opts task))))
(def ^:private max-concurrent-routes 4)
(defn- p-map-batched
[limit f coll]
(reduce (fn [result-promise batch]
(p/let [result result-promise
batch-result (p/all (mapv f batch))]
(into result batch-result)))
(p/resolved [])
(partition-all limit coll)))
(defn- process-tasks!
[cfg {:keys [repo graph agent-name prompt-templates]}]
(p/let [tasks (list-routable-tasks cfg repo agent-name)]
(p/all (mapv #(route-task-once! cfg {:repo repo
:graph graph
:agent-name agent-name
:prompt-templates prompt-templates}
%)
tasks))))
(p-map-batched max-concurrent-routes
#(route-task-once! cfg {:repo repo
:graph graph
:agent-name agent-name
:prompt-templates prompt-templates}
%)
tasks)))
(def ^:private assignee-property-ident :logseq.property/assignee)
@@ -1024,6 +1036,14 @@
(or (string/includes? (:v datom) (str "[[" agent-name "]]"))
(string/includes? (:v datom) "[["))))
(defn- task-routability-datom?
[datom]
(and (true? (:added datom))
(or (and (= :block/tags (:a datom))
(= :logseq.class/Task (:v datom)))
(and (= :logseq.property/status (:a datom))
(= :logseq.property/status.todo (:v datom))))))
(defn- pull-assignee-property
[cfg repo]
(transport/invoke cfg :thread-api/pull [repo assignee-property-selector assignee-property-ident]))
@@ -1212,6 +1232,15 @@
(route-task-once! cfg opts {:block block
:tree-text tree-text})))))))))
(defn- route-routability-datom!
[cfg {:keys [repo agent-name] :as opts} datom]
(when-let [block-id (:e datom)]
(p/let [block (pull-task-block cfg repo block-id)]
(when (routable-task? block agent-name)
(p/let [tree-text (show-task-tree cfg repo block)]
(route-task-once! cfg opts {:block block
:tree-text tree-text}))))))
(defn- route-comment-datom!
[cfg {:keys [repo agent-name] :as opts} datom]
(when-let [block-id (:e datom)]
@@ -1222,8 +1251,10 @@
(defn- process-sync-db-changes-event!
[cfg {:keys [repo] :as opts} {:keys [tx-data]}]
(p/let [assignee-datoms (resolve-assignee-datoms cfg repo tx-data)]
(let [comment-datoms (filter #(comment-title-datom? % (:agent-name opts)) tx-data)
(let [routability-datoms (filter task-routability-datom? tx-data)
comment-datoms (filter #(comment-title-datom? % (:agent-name opts)) tx-data)
routing (vec (concat (map #(route-assignee-datom! cfg opts %) assignee-datoms)
(map #(route-routability-datom! cfg opts %) routability-datoms)
(map #(route-comment-datom! cfg opts %) comment-datoms)))]
(when (seq routing)
(p/all routing)))))
@@ -1295,10 +1326,10 @@
:agent-name agent-name
:logs logs
:commands commands}})
(do
(doseq [line (conj logs (log-line "listening graph changes ..."))]
(emit-log! cfg line))
(if (:process-once? action)
(if (:process-once? action)
(do
(doseq [line (conj logs (log-line "listening graph changes ..."))]
(emit-log! cfg line))
(p/let [routed (process-tasks! cfg {:repo repo
:graph graph
:agent-name agent-name
@@ -1308,12 +1339,15 @@
:data {:mode :processed-once
:graph graph
:agent-name agent-name
:routed routed}})
:routed routed}}))
(let [listen-promise (listen-forever! cfg {:repo repo
:graph graph
:agent-name agent-name
:prompt-templates prompt-templates})]
(doseq [line (conj logs (log-line "listening graph changes ..."))]
(emit-log! cfg line))
(p/let [_ (process-tasks! cfg {:repo repo
:graph graph
:agent-name agent-name
:prompt-templates prompt-templates})]
(listen-forever! cfg {:repo repo
:graph graph
:agent-name agent-name
:prompt-templates prompt-templates}))))))))))))
listen-promise)))))))))))

View File

@@ -110,8 +110,13 @@
(instance? js/Error value)
(or (.-stack value) (.-message value) (str value))
(and encoding
(exists? js/Buffer)
(.isBuffer js/Buffer value))
(.toString value encoding)
(and value (fn? (.-toString value)))
(.toString value (or encoding "utf8"))
(.toString value)
:else
(str value))))

View File

@@ -1390,6 +1390,7 @@
:property/update-success "Property updated!"
:property/use-choice-in-tag "Use in #{1}"
:property.built-in/agent-session-id "Agent Session ID"
:property.built-in/alias "Alias"
:property.built-in/asset "Asset"
:property.built-in/asset-align "Asset alignment"

View File

@@ -1379,6 +1379,7 @@
:property/update-success "属性已更新!"
:property/use-choice-in-tag "用于 #{1}"
:property.built-in/agent-session-id "Agent 会话 ID"
:property.built-in/alias "别名"
:property.built-in/asset "附件"
:property.built-in/asset-align "附件对齐"

View File

@@ -311,6 +311,27 @@
(-> (stop!) (p/finally (fn [] (done))))
(done))))))))
(deftest db-worker-node-logs-console-number-output
(async done
(let [daemon (atom nil)
data-dir (node-helper/create-tmp-dir "db-worker-log-console-number")
repo (str "logseq_db_log_console_number_" (subs (str (random-uuid)) 0 8))
log-file (log-path data-dir repo)]
(-> (p/let [{:keys [stop!]}
(start-daemon! {:root-dir data-dir
:repo repo})
_ (reset! daemon {:stop! stop!})
_ (.error js/console 123)
_ (p/delay 50)
contents (.toString (fs/readFileSync log-file) "utf8")]
(is (string/includes? contents "123")))
(p/catch (fn [e]
(is false (str "unexpected error: " e))))
(p/finally (fn []
(if-let [stop! (:stop! @daemon)]
(-> (stop!) (p/finally (fn [] (done))))
(done))))))))
(deftest db-worker-node-logs-process-stdout-output
(async done
(let [daemon (atom nil)

View File

@@ -187,6 +187,24 @@
(d/entity @conn [:block/uuid range-comments-uuid])))))
"Existing range comment targets should be preserved")))
(deftest migrate-65-30-adds-assignee-property
(let [conn (d/create-conn db-schema/schema)]
(d/transact! conn [{:db/ident :logseq.kv/schema-version
:kv/value {:major 65 :minor 29}}])
(let [result (db-migrate/migrate conn :target-version {:major 65 :minor 30})]
(is (= {:major 65 :minor 30}
(:kv/value (d/entity @conn :logseq.kv/schema-version))))
(let [property (d/entity @conn :logseq.property/assignee)]
(is (some? property))
(is (= "Assignee" (:block/title property)))
(is (= :node (:logseq.property/type property)))
(is (= :db.cardinality/many (:db/cardinality property)))
(is (true? (:logseq.property/public? property))))
(is (some #(= {:properties [:logseq.property/assignee]}
(:migrate-updates %))
(:upgrade-result-coll result))))))
(deftest migrate-65-31-adds-agent-session-id-property
(let [conn (d/create-conn db-schema/schema)]
(d/transact! conn [{:db/ident :logseq.kv/schema-version

View File

@@ -21,6 +21,14 @@
[]
(.mkdtempSync fs (node-path/join (.tmpdir os) "logseq-agent-bridge-test-")))
(defn- read-dict
[filename]
(reader/read-string
(.toString
(fs/readFileSync
(node-path/join (.cwd js/process) "src" "resources" "dicts" filename))
"utf8")))
(defn- task-block
[overrides]
(merge {:db/id 42
@@ -108,7 +116,11 @@
(get-in property [:properties :logseq.property/description])))
(is (contains? db-property/public-built-in-properties :logseq.property.agent/session-id))
(is (db-property/logseq-property? :logseq.property.agent/session-id))
(is (db-property/internal-property? :logseq.property.agent/session-id)))))
(is (db-property/internal-property? :logseq.property.agent/session-id))
(let [i18n-key (db-property/built-in-ident->i18n-key :logseq.property.agent/session-id)]
(is (= :property.built-in/agent-session-id i18n-key))
(is (contains? (read-dict "en.edn") i18n-key))
(is (contains? (read-dict "zh-cn.edn") i18n-key))))))
(deftest test-agent-command-entries
(testing "parse agent bridge command surface"
@@ -1308,6 +1320,110 @@
(is false (str "unexpected setup error: " e))
(done))))))
(deftest test-execute-agent-bridge-connects-listener-before-ready-log-and-initial-scan
(async done
(let [calls (atom [])
call-index (fn [pred]
(first (keep-indexed (fn [idx call]
(when (pred call) idx))
@calls)))]
(-> (p/with-redefs [agent-command/codex-available? (fn [_] true)
cli-server/ensure-server! (fn [cfg repo]
(swap! calls conj [:ensure-server repo])
(assoc cfg :base-url "http://127.0.0.1:1234"))
agent-command/register-agent-bridge! (fn [_cfg repo agent-name]
(swap! calls conj [:register repo agent-name])
(p/resolved true))
agent-command/ensure-agent-bridge-prompt-templates!
(fn [_cfg repo]
(swap! calls conj [:prompt-templates repo])
(p/resolved {:task "Task {{graph}} {{block-uuid}} {{agent-name}}\n{{task-block-tree}}"
:comment "Comment {{graph}} {{comment-uuid}} {{agent-name}}\n{{comment-target-context}}\n{{comment-thread-context}}\n{{requesting-comment}}"}))
transport/connect-events! (fn [_cfg _handler]
(swap! calls conj [:connect])
{:close! (fn [] nil)})
agent-command/list-routable-tasks (fn [_cfg repo agent-name]
(swap! calls conj [:list repo agent-name])
(p/resolved []))]
(do
(agent-command/execute-bridge {:type :agent-bridge
:repo "logseq_db_demo"
:graph "demo"
:dry-run? false}
{:root-dir "/tmp/logseq"
:agent-name "build-host"
:log-fn (fn [line]
(swap! calls conj [:log line]))})
(p/let [_ (p/delay 10)]
(let [connect-index (call-index #(= [:connect] %))
listening-index (call-index #(and (= :log (first %))
(string/includes? (second %) "listening graph changes")))
list-index (call-index #(= :list (first %)))]
(is (some? connect-index))
(is (some? listening-index))
(is (some? list-index))
(is (< connect-index listening-index))
(is (< connect-index list-index))))))
(p/catch (fn [e]
(is false (str "unexpected error: " e))))
(p/finally done)))))
(deftest test-execute-agent-bridge-bounds-initial-task-routing-concurrency
(async done
(let [root (temp-root)
active* (atom 0)
max-active* (atom 0)
routed* (atom [])
blocks (mapv (fn [idx]
(task-block {:db/id (+ 42 idx)
:block/uuid (uuid (str "11111111-1111-1111-1111-11111111111" idx))}))
(range 5))]
(try
(-> (p/with-redefs [agent-command/codex-available? (fn [_] true)
cli-server/ensure-server! (fn [cfg _repo]
(assoc cfg :base-url "http://127.0.0.1:1234"))
agent-command/register-agent-bridge! (fn [_cfg _repo _agent-name]
(p/resolved true))
agent-command/ensure-agent-bridge-prompt-templates!
(fn [_cfg _repo]
(p/resolved {:task "Task {{graph}} {{block-uuid}} {{agent-name}}\n{{task-block-tree}}"
:comment "Comment {{graph}} {{comment-uuid}} {{agent-name}}\n{{comment-target-context}}\n{{comment-thread-context}}\n{{requesting-comment}}"}))
agent-command/list-routable-tasks (fn [_cfg _repo _agent-name]
(p/resolved (mapv (fn [block]
{:block block
:tree-text (:block/title block)})
blocks)))
agent-command/start-codex! (fn [_command _opts]
(let [active (swap! active* inc)]
(swap! max-active* max active)
(p/let [_ (p/delay 20)]
(swap! active* dec)
{:session (str "session-" (random-uuid))
:status :running})))
agent-command/write-agent-session-id! (fn [_cfg _repo block-uuid session-id]
(swap! routed* conj [block-uuid session-id])
(p/resolved true))]
(p/let [result (agent-command/execute-bridge {:type :agent-bridge
:repo "logseq_db_demo"
:graph "demo"
:dry-run? false
:process-once? true}
{:root-dir root
:agent-name "build-host"
:log-fn (fn [_] nil)})]
(is (= :ok (:status result)))
(is (= 5 (count @routed*)))
(is (<= @max-active* 4))))
(p/catch (fn [e]
(is false (str "unexpected error: " e))))
(p/finally (fn []
(fs/rmSync root #js {:recursive true :force true})
(done))))
(catch :default e
(fs/rmSync root #js {:recursive true :force true})
(is false (str "unexpected setup error: " e))
(done))))))
(deftest test-agent-bridge-listener-ignores-unrelated-events
(async done
(let [handler* (atom nil)
@@ -1487,3 +1603,75 @@
(fs/rmSync root #js {:recursive true :force true})
(is false (str "unexpected setup error: " e))
(done))))))
(deftest test-agent-bridge-listener-routes-task-tag-and-status-datoms
(async done
(let [root (temp-root)
handler* (atom nil)
calls (atom [])
status-block (task-block {:db/id 42})
tag-block (task-block {:db/id 43
:block/uuid #uuid "22222222-2222-2222-2222-222222222222"})]
(try
(-> (p/with-redefs [transport/connect-events! (fn [_cfg handler]
(reset! handler* handler)
{:close! (fn [] nil)})
agent-command/list-routable-tasks (fn [_cfg repo agent-name]
(swap! calls conj [:broad-scan repo agent-name])
(p/resolved []))
transport/invoke (fn [_cfg method args]
(swap! calls conj [method args])
(case method
:thread-api/pull
(let [[_repo _selector lookup] args]
(cond
(= lookup 42) (p/resolved status-block)
(= lookup 43) (p/resolved tag-block)
:else (p/rejected (ex-info "unexpected pull"
{:lookup lookup}))))
(p/rejected (ex-info "unexpected invoke"
{:method method
:args args}))))
show-command/execute-show (fn [action _cfg]
(swap! calls conj [:show action])
(p/resolved {:status :ok
:data {:message "- Routed task"}}))
cli-server/ensure-server! (fn [cfg repo]
(swap! calls conj [:ensure-server (:root-dir cfg) repo])
(assoc cfg :base-url "http://127.0.0.1:1234"))
agent-command/start-codex! (fn [_command _opts]
(p/resolved {:session (str "session-" (random-uuid))
:status :running}))
agent-command/write-agent-session-id! (fn [_cfg repo block-uuid session-id]
(swap! calls conj [:write-session repo block-uuid session-id])
(p/resolved true))]
(do
(#'agent-command/listen-forever! {:root-dir root
:base-url "http://127.0.0.1:1234"
:log-fn (fn [_] nil)}
{:repo "logseq_db_demo"
:graph "demo"
:agent-name "build-host"})
(@handler* :sync-db-changes {:tx-data [{:e 42
:a :logseq.property/status
:v :logseq.property/status.todo
:added true}
{:e 43
:a :block/tags
:v :logseq.class/Task
:added true}]})
(p/let [_ (p/delay 10)]
(is (not-any? #(= :broad-scan (first %)) @calls))
(is (= #{(:block/uuid status-block)
(:block/uuid tag-block)}
(set (map #(nth % 2)
(filter #(= :write-session (first %)) @calls))))))))
(p/catch (fn [e]
(is false (str "unexpected error: " e))))
(p/finally (fn []
(fs/rmSync root #js {:recursive true :force true})
(done))))
(catch :default e
(fs/rmSync root #js {:recursive true :force true})
(is false (str "unexpected setup error: " e))
(done))))))