diff --git a/cli-e2e/scripts/agent_bridge_e2e.py b/cli-e2e/scripts/agent_bridge_e2e.py index 2c8fec88d3..2f936050db 100644 --- a/cli-e2e/scripts/agent_bridge_e2e.py +++ b/cli-e2e/scripts/agent_bridge_e2e.py @@ -57,6 +57,20 @@ if args[:2] == ["exec", "--json"]: print(json.dumps({"type": "thread.started", "thread_id": session_id}), flush=True) sys.exit(0) +if args[:3] == ["exec", "resume", "--json"]: + session_id = args[3] if len(args) > 3 else "" + prompt = args[4] if len(args) > 4 else "" + comment_uuid_match = re.search(r"^Comment UUID: (.+)$", prompt, re.MULTILINE) + comment_uuid = comment_uuid_match.group(1) if comment_uuid_match else None + log_path = pathlib.Path(os.environ["CODEX_FAKE_LOG"]) + log_path.parent.mkdir(parents=True, exist_ok=True) + with log_path.open("a", encoding="utf8") as f: + f.write(json.dumps( + {"event": "resume", "time": time.time(), "args": args, "prompt": prompt, "comment_uuid": comment_uuid, "session": session_id}, + ensure_ascii=False) + "\\n") + print(json.dumps({"type": "thread.started", "thread_id": session_id}), flush=True) + sys.exit(0) + print("unexpected codex args: " + repr(args), file=sys.stderr) sys.exit(2) """, @@ -163,6 +177,20 @@ def find_task_id(cli, repo_root, root_dir, config, graph, title): return task_id +def find_task_uuid(cli, repo_root, root_dir, config, graph, title): + task_uuid = run_json( + cli, + repo_root, + root_dir, + config, + graph, + '[:find ?uuid . :where [?e :block/title "{}"] [?e :block/uuid ?uuid]]'.format(title), + ) + if task_uuid is None: + raise SystemExit("task block uuid was not found: {}".format(title)) + return task_uuid + + def create_task(cli, repo_root, root_dir, config, graph, title): run_cli( cli, @@ -255,6 +283,120 @@ def wait_for_task_sessions(cli, repo_root, root_dir, config, graph, titles, brid ) +def wait_for_codex_event(codex_log, event_name, bridge, bridge_log, bridge_err): + deadline = time.time() + 30 + events = [] + while time.time() < deadline: + events = read_codex_events(codex_log) + matches = [event for event in events if event.get("event") == event_name] + if matches: + return matches + if bridge.poll() is not None: + raise SystemExit( + "agent bridge exited early with {}\nstdout:\n{}\nstderr:\n{}".format( + bridge.returncode, read_text(bridge_log), read_text(bridge_err) + ) + ) + time.sleep(0.5) + raise SystemExit("codex event {!r} was not observed; events={!r}".format(event_name, events)) + + +def write_comment_blocks_file(path, task_uuid, hostname): + path.write_text( + """[{{:block/title "Comments" + :block/tags [:logseq.class/Comments] + :logseq.property.comments/blocks [[:block/uuid #uuid "{task_uuid}"]] + :block/children [{{:block/title "[[{hostname}]] please continue from the comment" + :block/tags [:logseq.class/Comment]}}]}}]""".format( + task_uuid=task_uuid, + hostname=hostname, + ), + encoding="utf8", + ) + + +def run_comment_mention_check(cli, repo_root, root_dir, config, graph, tmp_dir): + create_task(cli, repo_root, root_dir, config, graph, TASK_TITLE) + + fake_bin = tmp_dir / "fake-bin" + bridge_log = tmp_dir / "agent-bridge.log" + bridge_err = tmp_dir / "agent-bridge.err" + codex_log = tmp_dir / "codex-invocations.jsonl" + + env = os.environ.copy() + env["PATH"] = str(fake_bin) + os.pathsep + env.get("PATH", "") + env["CODEX_FAKE_LOG"] = str(codex_log) + + with bridge_log.open("wb") as out, bridge_err.open("wb") as err: + bridge = subprocess.Popen( + cli + + [ + "--root-dir", + root_dir, + "--config", + config, + "--output", + "human", + "agent", + "bridge", + "--graph", + graph, + ], + cwd=repo_root, + env=env, + stdout=out, + stderr=err, + ) + + try: + wait_for_log(bridge_log, "listening graph changes", bridge) + assign_task(cli, repo_root, root_dir, config, graph) + session = wait_for_task_sessions( + cli, repo_root, root_dir, config, graph, [TASK_TITLE], bridge, bridge_log, bridge_err + )[TASK_TITLE] + + task_id = find_task_id(cli, repo_root, root_dir, config, graph, TASK_TITLE) + task_uuid = find_task_uuid(cli, repo_root, root_dir, config, graph, TASK_TITLE) + blocks_file = tmp_dir / "comment-blocks.edn" + write_comment_blocks_file(blocks_file, task_uuid, os.uname().nodename) + run_cli( + cli, + repo_root, + root_dir, + config, + graph, + [ + "upsert", + "block", + "--graph", + graph, + "--target-id", + str(task_id), + "--pos", + "last-child", + "--blocks-file", + str(blocks_file), + ], + ) + + resume_events = wait_for_codex_event(codex_log, "resume", bridge, bridge_log, bridge_err) + if resume_events[0].get("session") != session: + raise SystemExit("comment resumed the wrong session: {!r}".format(resume_events[0])) + prompt = resume_events[0].get("prompt", "") + assert "You are handling a Logseq AgentBridge comment request." in prompt, prompt + assert "Comment target context:" in prompt, prompt + assert "Requesting comment:" in prompt, prompt + print("agent bridge resumed comment request in " + session) + finally: + if bridge.poll() is None: + bridge.terminate() + try: + bridge.wait(timeout=5) + except subprocess.TimeoutExpired: + bridge.kill() + bridge.wait(timeout=5) + + def run_parallel_assignment_check(cli, repo_root, root_dir, config, graph, tmp_dir): for title in PARALLEL_TASK_TITLES: create_task(cli, repo_root, root_dir, config, graph, title) @@ -340,6 +482,7 @@ def main(): parser.add_argument("--prepare-fake-codex-only", action="store_true") parser.add_argument("--assign-after-start", action="store_true") parser.add_argument("--parallel-assignment-check", action="store_true") + parser.add_argument("--comment-mention-check", action="store_true") args = parser.parse_args() repo_root = pathlib.Path(args.repo_root) @@ -358,6 +501,10 @@ def main(): run_parallel_assignment_check(cli, repo_root, args.root_dir, args.config, args.graph, tmp_dir) return + if args.comment_mention_check: + run_comment_mention_check(cli, repo_root, args.root_dir, args.config, args.graph, tmp_dir) + return + env = os.environ.copy() env["PATH"] = str(fake_bin) + os.pathsep + env.get("PATH", "") env["CODEX_FAKE_LOG"] = str(codex_log) diff --git a/cli-e2e/spec/non_sync_cases.edn b/cli-e2e/spec/non_sync_cases.edn index 4d50ae10bd..9157610758 100644 --- a/cli-e2e/spec/non_sync_cases.edn +++ b/cli-e2e/spec/non_sync_cases.edn @@ -1307,77 +1307,29 @@ PY" :server ["--graph"]}}, :tags [:server], :extends :non-sync/graph-json-env} - {:id "agent-bridge-routes-assigned-task", + {:id "agent-bridge-workflows", :setup - ["{{cli}} --root-dir {{root-dir-arg}} --config {{config-path-arg}} --output json upsert task --graph {{graph-arg}} --target-page AgentBridgeE2E --content '测试 agent bridge 功能,把当前task status设置为done' --status todo >/dev/null"], + ["mkdir -p '{{tmp-dir}}/assigned-root' && printf '{:output-format :json}\\n' > '{{tmp-dir}}/assigned-root/cli.edn' && {{cli}} --root-dir '{{tmp-dir}}/assigned-root' --config '{{tmp-dir}}/assigned-root/cli.edn' --output json graph create --graph cli-e2e-agent-bridge-assigned >/dev/null && {{cli}} --root-dir '{{tmp-dir}}/assigned-root' --config '{{tmp-dir}}/assigned-root/cli.edn' --output json upsert task --graph cli-e2e-agent-bridge-assigned --target-page AgentBridgeE2E --content '测试 agent bridge 功能,把当前task status设置为done' --status todo >/dev/null" + "mkdir -p '{{tmp-dir}}/parallel-root' && printf '{:output-format :json}\\n' > '{{tmp-dir}}/parallel-root/cli.edn' && {{cli}} --root-dir '{{tmp-dir}}/parallel-root' --config '{{tmp-dir}}/parallel-root/cli.edn' --output json graph create --graph cli-e2e-agent-bridge-parallel >/dev/null" + "mkdir -p '{{tmp-dir}}/comment-root' && printf '{:output-format :json}\\n' > '{{tmp-dir}}/comment-root/cli.edn' && {{cli}} --root-dir '{{tmp-dir}}/comment-root' --config '{{tmp-dir}}/comment-root/cli.edn' --output json graph create --graph cli-e2e-agent-bridge-comment >/dev/null"], :cmds - ["python3 '{{repo-root}}/cli-e2e/scripts/agent_bridge_e2e.py' --cli '{{repo-root}}/static/logseq-cli.js' --root-dir '{{root-dir}}' --config '{{config-path}}' --graph '{{graph}}' --tmp-dir '{{tmp-dir}}' --repo-root '{{repo-root}}' --assign-after-start"], - :expect {:exit 0, :stdout-contains ["agent bridge routed task to thread-e2e-agent-bridge"]}, - :covers - {:commands ["agent bridge"], - :options - {:global ["--config" "--graph" "--root-dir" "--output"], - :agent []}}, - :cleanup - ["{{cli}} --root-dir {{root-dir-arg}} --config {{config-path-arg}} --output json server stop --graph {{graph-arg}}"], - :tags [:agent], - :extends :non-sync/graph-json-env} - {:id "agent-bridge-routes-assigned-tasks-concurrently", - :cmds - ["python3 '{{repo-root}}/cli-e2e/scripts/agent_bridge_e2e.py' --cli '{{repo-root}}/static/logseq-cli.js' --root-dir '{{root-dir}}' --config '{{config-path}}' --graph '{{graph}}' --tmp-dir '{{tmp-dir}}' --repo-root '{{repo-root}}' --parallel-assignment-check"], - :expect {:exit 0, :stdout-contains ["agent bridge routed tasks concurrently:"]}, - :covers - {:commands ["agent bridge"], - :options - {:global ["--config" "--graph" "--root-dir" "--output"], - :agent []}}, - :cleanup - ["{{cli}} --root-dir {{root-dir-arg}} --config {{config-path-arg}} --output json server stop --graph {{graph-arg}}"], - :tags [:agent], - :extends :non-sync/graph-json-env} - {:id "agent-bridge-demo-script", - :cmds - ["bash '{{repo-root}}/cli-e2e/scripts/agent_bridge_demo.sh' --cli '{{repo-root}}/static/logseq-cli.js' --root-dir '{{tmp-dir}}/demo-root' --graph cli-e2e-agent-bridge-demo --repo-root '{{repo-root}}'"], + ["python3 '{{repo-root}}/cli-e2e/scripts/agent_bridge_e2e.py' --cli '{{repo-root}}/static/logseq-cli.js' --root-dir '{{tmp-dir}}/assigned-root' --config '{{tmp-dir}}/assigned-root/cli.edn' --graph cli-e2e-agent-bridge-assigned --tmp-dir '{{tmp-dir}}/assigned-work' --repo-root '{{repo-root}}' --assign-after-start" + "python3 '{{repo-root}}/cli-e2e/scripts/agent_bridge_e2e.py' --cli '{{repo-root}}/static/logseq-cli.js' --root-dir '{{tmp-dir}}/parallel-root' --config '{{tmp-dir}}/parallel-root/cli.edn' --graph cli-e2e-agent-bridge-parallel --tmp-dir '{{tmp-dir}}/parallel-work' --repo-root '{{repo-root}}' --parallel-assignment-check" + "python3 '{{repo-root}}/cli-e2e/scripts/agent_bridge_e2e.py' --cli '{{repo-root}}/static/logseq-cli.js' --root-dir '{{tmp-dir}}/comment-root' --config '{{tmp-dir}}/comment-root/cli.edn' --graph cli-e2e-agent-bridge-comment --tmp-dir '{{tmp-dir}}/comment-work' --repo-root '{{repo-root}}' --comment-mention-check" + "bash '{{repo-root}}/cli-e2e/scripts/agent_bridge_demo.sh' --cli '{{repo-root}}/static/logseq-cli.js' --root-dir '{{tmp-dir}}/demo-root' --graph cli-e2e-agent-bridge-demo --repo-root '{{repo-root}}'" + "mkdir -p '{{tmp-dir}}/list-root' && printf '{:output-format :json}\\n' > '{{tmp-dir}}/list-root/cli.edn' && {{cli}} --root-dir '{{tmp-dir}}/list-root' --config '{{tmp-dir}}/list-root/cli.edn' --output json agent bridge list --all"], :expect {:exit 0, - :stdout-contains ["agent bridge demo completed" - "task status: done" - "agent-session-id: thread-agent-bridge-demo"]}, + :stdout-json-paths {[:status] "ok", [:data :sessions] []}}, :covers - {:commands ["agent bridge"], + {:commands ["agent bridge" "agent bridge list"], :options {:global ["--config" "--graph" "--root-dir" "--output"], - :agent []}}, - :tags [:agent]} - {:id "agent-bridge-dry-run-json", - :setup - ["python3 '{{repo-root}}/cli-e2e/scripts/agent_bridge_e2e.py' --cli '{{repo-root}}/static/logseq-cli.js' --root-dir '{{root-dir}}' --config '{{config-path}}' --graph '{{graph}}' --tmp-dir '{{tmp-dir}}' --repo-root '{{repo-root}}' --prepare-fake-codex-only" - "{{cli}} --root-dir {{root-dir-arg}} --config {{config-path-arg}} --output json upsert task --graph {{graph-arg}} --target-page AgentBridgeE2E --content '测试 agent bridge 功能,把当前task status设置为done' --status todo >/dev/null" - "TASK_ID=\"$({{cli}} --root-dir {{root-dir-arg}} --config {{config-path-arg}} --output json query --graph {{graph-arg}} --query '[:find ?e . :where [?e :block/title \"测试 agent bridge 功能,把当前task status设置为done\"]]' | python3 -c 'import sys,json; print(json.load(sys.stdin)[\"data\"][\"result\"])')\"; {{cli}} --root-dir {{root-dir-arg}} --config {{config-path-arg}} --output json upsert block --graph {{graph-arg}} --id \"$TASK_ID\" --update-properties \"{\\\"Assignee\\\" \\\"$(hostname)\\\"}\" >/dev/null"], - :cmds - ["CODEX_FAKE_LOG={{tmp-dir-arg}}/dry-run-codex.jsonl PATH={{tmp-dir-arg}}/fake-bin:$PATH {{cli}} --root-dir {{root-dir-arg}} --config {{config-path-arg}} --output json agent bridge --graph {{graph-arg}} --dry-run"], - :expect - {:exit 0, - :stdout-json-paths {[:status] "ok", [:data :mode] "dry-run"}, - :stdout-contains ["测试 agent bridge 功能,把当前task status设置为done"]}, - :covers - {:commands ["agent bridge"], - :options - {:global ["--config" "--graph" "--root-dir" "--output"], - :agent ["--dry-run"]}}, - :cleanup - ["{{cli}} --root-dir {{root-dir-arg}} --config {{config-path-arg}} --output json server stop --graph {{graph-arg}}"], - :tags [:agent], - :extends :non-sync/graph-json-env} - {:id "agent-bridge-list-all-json", - :cmds - ["{{cli}} --root-dir {{root-dir-arg}} --config {{config-path-arg}} --output json agent bridge list --all"], - :expect {:exit 0, :stdout-json-paths {[:status] "ok", [:data :sessions] []}}, - :covers - {:commands ["agent bridge list"], - :options - {:global ["--config" "--root-dir" "--output"], :agent ["--all"]}}, + :cleanup + ["{{cli}} --root-dir '{{tmp-dir}}/assigned-root' --config '{{tmp-dir}}/assigned-root/cli.edn' --output json server stop --graph cli-e2e-agent-bridge-assigned" + "{{cli}} --root-dir '{{tmp-dir}}/parallel-root' --config '{{tmp-dir}}/parallel-root/cli.edn' --output json server stop --graph cli-e2e-agent-bridge-parallel" + "{{cli}} --root-dir '{{tmp-dir}}/comment-root' --config '{{tmp-dir}}/comment-root/cli.edn' --output json server stop --graph cli-e2e-agent-bridge-comment"], :tags [:agent]} {:id "skill-show-human", :cmds ["{{cli}} skill show"], diff --git a/cli-e2e/spec/non_sync_inventory.edn b/cli-e2e/spec/non_sync_inventory.edn index bf868c6180..7f4478024f 100644 --- a/cli-e2e/spec/non_sync_inventory.edn +++ b/cli-e2e/spec/non_sync_inventory.edn @@ -157,8 +157,7 @@ :agent {:commands ["agent bridge" "agent bridge list"] - :options ["--dry-run" - "--all"]} + :options ["--all"]} :completion {:commands ["completion"] diff --git a/src/main/frontend/worker/db_worker_node.cljs b/src/main/frontend/worker/db_worker_node.cljs index f3c470e126..ba8c697a6e 100644 --- a/src/main/frontend/worker/db_worker_node.cljs +++ b/src/main/frontend/worker/db_worker_node.cljs @@ -1,8 +1,6 @@ (ns frontend.worker.db-worker-node "Node.js daemon entrypoint for db-worker." - (:require ["fs" :as fs] - ["http" :as http] - ["path" :as node-path] + (:require ["http" :as http] [clojure.string :as string] [frontend.worker.db-core :as db-core] [frontend.worker.db-worker-node-lock :as db-lock] @@ -14,13 +12,13 @@ [logseq.cli.root-dir :as root-dir] [logseq.cli.style :as style] [logseq.db :as ldb] + [logseq.db-worker.log :as db-worker-log] [logseq.db-worker.server-list :as server-list] [promesa.core :as p])) (defonce ^:private *ready? (atom false)) (defonce ^:private *sse-clients (atom #{})) (defonce ^:private *lock-info (atom nil)) -(defonce ^:private *file-handler (atom nil)) (defonce ^:private *server-list-file (atom nil)) (defn- server-list-file-path @@ -412,69 +410,6 @@ :sync-download-graph? true} {})) -(defn- pad2 - [value] - (if (< value 10) - (str "0" value) - (str value))) - -(defn- yyyymmdd - [^js date] - (str (.getFullYear date) - (pad2 (inc (.getMonth date))) - (pad2 (.getDate date)))) - -(defn- log-path - [root-dir repo] - (let [root-dir (db-lock/resolve-root-dir root-dir) - repo-dir (db-lock/repo-dir (db-lock/graphs-dir root-dir) repo) - date-str (yyyymmdd (js/Date.))] - (node-path/join repo-dir (str "db-worker-node-" date-str ".log")))) - -(defn- log-files - [repo-dir] - (->> (when (fs/existsSync repo-dir) - (fs/readdirSync repo-dir)) - (filter (fn [^js name] - (re-matches #"db-worker-node-\d{8}\.log" name))) - (sort))) - -(defn- enforce-log-retention! - [repo-dir] - (let [files (log-files repo-dir) - excess (max 0 (- (count files) 7))] - (doseq [name (take excess files)] - (fs/unlinkSync (node-path/join repo-dir name))))) - -(defn- format-log-line - [{:keys [time level message logger-name exception]}] - (let [ts (.toISOString (js/Date. time)) - base (str ts - " [" - (name level) - "] [" - logger-name - "] " - (pr-str message))] - (str base (when exception (str " " (pr-str exception))) "\n"))) - -(defn- install-file-logger! - [{:keys [root-dir repo log-level]}] - (let [root-dir (db-lock/resolve-root-dir root-dir) - repo-dir (db-lock/repo-dir (db-lock/graphs-dir root-dir) repo) - file-path (log-path root-dir repo)] - (fs/mkdirSync repo-dir #js {:recursive true}) - (fs/writeFileSync file-path "" #js {:flag "a"}) - (enforce-log-retention! repo-dir) - (when-let [handler @*file-handler] - (log/remove-handler handler)) - (let [handler (fn [record] - (fs/appendFileSync file-path (format-log-line record)))] - (reset! *file-handler handler) - (log/add-handler handler)) - (log/set-levels {:glogi/root log-level}) - file-path)) - (defn- assert-lock-owner! [] (let [{:keys [path lock]} @*lock-info] @@ -534,7 +469,8 @@ (p/finally (fn [] (when (fn? on-stopped!) - (on-stopped!))))))))) + (on-stopped!)) + (db-worker-log/uninstall!)))))))) (defn- resolve-listening-daemon! [{:keys [server proxy repo host port* stop!* stopped? on-stopped!]} resolve] @@ -606,9 +542,9 @@ (try (let [root-dir (root-dir/ensure-root-dir! root-dir) server-list-file (server-list-file-path root-dir)] - (install-file-logger! {:root-dir root-dir - :repo repo - :log-level (keyword (or log-level "info"))}) + (db-worker-log/install! {:root-dir root-dir + :repo repo + :log-level (keyword (or log-level "info"))}) (log/info :db-worker-node-version {:build-time (build-version/build-time) :revision (build-version/revision)}) (reset! *ready? false) diff --git a/src/main/logseq/cli/command/agent.cljs b/src/main/logseq/cli/command/agent.cljs index c526abcdb0..21075d9d52 100644 --- a/src/main/logseq/cli/command/agent.cljs +++ b/src/main/logseq/cli/command/agent.cljs @@ -192,10 +192,43 @@ "Task block tree:" (or tree-text (:block/title block) "")])) +(defn- build-comment-codex-prompt + [{:keys [graph agent-name comment-tree-text comments-area-tree-text target-tree-texts] + comment-block :comment}] + (string/join + "\n" + ["You are handling a Logseq AgentBridge comment request." + "" + (str "Graph: " graph) + (str "Comment UUID: " (block-uuid-str comment-block)) + (str "AgentBridge name: " agent-name) + "" + "Do not operate outside the target graph." + "Complete the request from the mentioned comment." + "Report the final status, files changed, commands run, verification, and any blockers." + "" + "Comment target context:" + (string/join "\n" (remove string/blank? target-tree-texts)) + "" + "Comment thread context:" + (or comments-area-tree-text (:block/title (:block/parent comment-block)) "") + "" + "Requesting comment:" + (or comment-tree-text (:block/title comment-block) "") + "" + "Reply instructions:" + "For a short reply, append a comment after the requesting comment." + "For a long reply, write a normal block tree after the comments area and append a comment that references that tree." + "If the request is blocked or fails, make that clear in the reply."])) + (defn build-codex-command [prompt {:keys [codex-bin]}] [(or (trim-non-empty codex-bin) "codex") "exec" "--json" prompt]) +(defn- build-codex-resume-command + [session-id prompt {:keys [codex-bin]}] + [(or (trim-non-empty codex-bin) "codex") "exec" "resume" "--json" session-id prompt]) + (defn- shell-quote [value] (let [text (str value)] @@ -652,6 +685,46 @@ {:logseq.property/assignee [:db/id :block/title :block/name :db/ident]} '*]) +(def ^:private comment-block-selector + [:db/id + :block/uuid + :block/title + {:block/tags [:db/ident :block/title]} + {:block/refs [:db/id :block/title :block/name]} + {:block/parent [:db/id :block/uuid :block/title {:block/tags [:db/ident :block/title]}]} + '*]) + +(defn- comment-target-block-selector + [session-property-ident] + (cond-> [:db/id + :block/uuid + :block/title + {:logseq.property/assignee [:db/id :block/title :block/name :db/ident]} + '*] + session-property-ident + (conj {session-property-ident [:db/id :block/title :block/name]}))) + +(defn- comments-area-selector + [session-property-ident] + [:db/id + :block/uuid + :block/title + {:block/tags [:db/ident :block/title]} + {:logseq.property.comments/blocks (comment-target-block-selector session-property-ident)}]) + +(def ^:private comment-start-reaction "eyes") +(def ^:private comment-complete-reaction "white_check_mark") +(def ^:private comment-failed-reaction "x") + +(def ^:private comment-reaction-query + '[:find ?r . + :in $ ?target-uuid ?emoji-id + :where + [?target :block/uuid ?target-uuid] + [?r :logseq.property.reaction/target ?target] + [?r :logseq.property.reaction/emoji-id ?emoji-id] + [(missing? $ ?r :logseq.property/created-by-ref)]]) + (defn- datom-added? [datom] (cond @@ -701,6 +774,15 @@ (and (datom-added? datom) (= assignee-property-ident (datom-attr datom)))) +(defn- comment-title-datom? + [datom agent-name] + (and (datom-added? datom) + (seq agent-name) + (= :block/title (datom-attr datom)) + (string? (datom-value datom)) + (or (string/includes? (datom-value datom) (str "[[" agent-name "]]")) + (string/includes? (datom-value datom) "[[")))) + (defn- pull-assignee-property [cfg repo] (transport/invoke cfg :thread-api/pull [repo assignee-property-selector assignee-property-ident])) @@ -746,6 +828,145 @@ (or (get-in show-result [:data :message]) (:block/title block)))) +(defn- show-block-tree + [cfg repo block] + (p/let [show-result (show-command/execute-show {:type :show + :repo repo + :uuid (block-uuid-str block) + :level 100 + :linked-references? false + :ref-id-footer? false} + cfg)] + (or (get-in show-result [:data :message]) + (:block/title block)))) + +(defn- comment-tag? + [tag] + (or (= :logseq.class/Comment (value-ident tag)) + (= "Comment" (value-title tag)))) + +(defn- comments-area-tag? + [tag] + (or (= :logseq.class/Comments (value-ident tag)) + (= "Comments" (value-title tag)))) + +(defn- comment-block? + [block agent-name] + (and (:block/uuid block) + (some comment-tag? (:block/tags block)) + (or (string/includes? (or (:block/title block) "") (str "[[" agent-name "]]")) + (contains? (set (keep value-title (:block/refs block))) agent-name)))) + +(defn- comments-area? + [block] + (some comments-area-tag? (:block/tags block))) + +(defn- pull-comment-block + [cfg repo block-id] + (transport/invoke cfg :thread-api/pull [repo comment-block-selector block-id])) + +(defn- pull-comments-area + [cfg repo comment-block session-property-ident] + (let [parent-id (get-in comment-block [:block/parent :db/id])] + (when-not parent-id + (throw (ex-info "comment block parent is missing" + {:code :agent-comment-parent-missing + :block (block-uuid-str comment-block)}))) + (transport/invoke cfg :thread-api/pull [repo (comments-area-selector session-property-ident) parent-id]))) + +(defn- ensure-comment-reaction! + [cfg repo target-uuid emoji-id] + (p/let [existing (transport/invoke cfg :thread-api/q [repo [comment-reaction-query target-uuid emoji-id]])] + (when-not (if (sequential? existing) + (seq existing) + (some? existing)) + (transport/invoke cfg :thread-api/apply-outliner-ops + [repo [[:toggle-reaction [target-uuid emoji-id nil]]] {}])))) + +(defn- comment-session-record + [graph agent-name comment-block session-id status] + (assoc (session-record graph agent-name comment-block session-id status) + :request :comment)) + +(defn- comment-target-session-id + [agent-name session-property-ident block] + (when (contains? (assignee-values block) agent-name) + (or (agent-session-id block) + (some-> (get block session-property-ident) + value-title + trim-non-empty)))) + +(defn- route-comment! + [cfg {:keys [repo graph agent-name]} comment-block] + (p/catch + (p/let [comment-uuid (:block/uuid comment-block) + _ (when-not comment-uuid + (throw (ex-info "comment block uuid is missing" + {:code :agent-comment-uuid-missing}))) + session-property (pull-agent-session-id-property cfg repo) + session-property-ident (:db/ident session-property) + comments-area (pull-comments-area cfg repo comment-block session-property-ident) + _ (when-not (comments-area? comments-area) + (throw (ex-info "comment parent is not a comments area" + {:code :agent-comment-parent-invalid + :block (block-uuid-str comment-block)}))) + target-blocks (vec (:logseq.property.comments/blocks comments-area)) + _ (ensure-comment-reaction! cfg repo comment-uuid comment-start-reaction) + target-tree-texts (p/all (mapv #(show-block-tree cfg repo %) target-blocks)) + comments-area-tree-text (show-block-tree cfg repo comments-area) + comment-tree-text (show-block-tree cfg repo comment-block) + prompt (build-comment-codex-prompt {:graph graph + :agent-name agent-name + :comment comment-block + :target-tree-texts target-tree-texts + :comments-area-tree-text comments-area-tree-text + :comment-tree-text comment-tree-text}) + resume-session-id (some #(comment-target-session-id agent-name session-property-ident %) target-blocks) + command (if resume-session-id + (build-codex-resume-command resume-session-id prompt {}) + (build-codex-command prompt {})) + preview (command-preview command) + _ (emit-log! cfg (log-line (str "Codex command prepared for comment " (block-uuid-str comment-block) ": " preview))) + {:keys [session]} (start-codex! command + {:on-exit (fn [code session-id] + (when session-id + (update-session-status! cfg session-id + (if (zero? (or code 1)) + :completed + :failed))) + (-> (ensure-comment-reaction! cfg repo comment-uuid + (if (zero? (or code 1)) + comment-complete-reaction + comment-failed-reaction)) + (p/catch (fn [e] + (log/error :agent-bridge-comment-reaction-failed e)))))}) + _ (when-not (seq session) + (throw (ex-info "codex session id missing" + {:code :codex-session-id-missing}))) + cfg* (cli-server/ensure-server! cfg repo) + _ (record-session! cfg* (comment-session-record graph agent-name comment-block session :running))] + {:block (block-uuid-str comment-block) + :session session + :backend :codex + :preview preview + :request :comment}) + (fn [e] + (if-let [comment-uuid (:block/uuid comment-block)] + (p/let [_ (ensure-comment-reaction! cfg repo comment-uuid comment-failed-reaction)] + (throw e)) + (p/rejected e))))) + +(defn- route-comment-once! + [cfg {:keys [routing-blocks*] :as opts} comment-block] + (let [block-id (block-uuid-str comment-block)] + (if (and routing-blocks* block-id) + (if (claim-routing-block! routing-blocks* block-id) + (-> (route-comment! cfg opts comment-block) + (p/finally (fn [] + (swap! routing-blocks* disj block-id)))) + (p/resolved nil)) + (route-comment! cfg opts comment-block)))) + (defn- route-assignee-datom! [cfg {:keys [repo agent-name] :as opts} datom] (let [block-id (datom-e datom) @@ -759,11 +980,21 @@ (route-task-once! cfg opts {:block block :tree-text tree-text}))))))))) +(defn- route-comment-datom! + [cfg {:keys [repo agent-name] :as opts} datom] + (when-let [block-id (datom-e datom)] + (p/let [comment-block (pull-comment-block cfg repo block-id)] + (when (comment-block? comment-block agent-name) + (route-comment-once! cfg opts comment-block))))) + (defn- process-sync-db-changes-event! [cfg {:keys [repo] :as opts} {:keys [tx-data]}] (p/let [assignee-datoms (resolve-assignee-datoms cfg repo tx-data)] - (when (seq assignee-datoms) - (p/all (mapv #(route-assignee-datom! cfg opts %) assignee-datoms))))) + (let [comment-datoms (filter #(comment-title-datom? % (:agent-name opts)) tx-data) + routing (vec (concat (map #(route-assignee-datom! cfg opts %) assignee-datoms) + (map #(route-comment-datom! cfg opts %) comment-datoms)))] + (when (seq routing) + (p/all routing))))) (defn- listen-forever! [cfg {:keys [repo graph agent-name]}] diff --git a/src/main/logseq/db_worker/daemon.cljs b/src/main/logseq/db_worker/daemon.cljs index 1c56785e63..5c03bed1b9 100644 --- a/src/main/logseq/db_worker/daemon.cljs +++ b/src/main/logseq/db_worker/daemon.cljs @@ -5,6 +5,7 @@ ["http" :as http] [clojure.string :as string] [lambdaisland.glogi :as log] + [logseq.db-worker.log :as db-worker-log] [promesa.core :as p])) (def ^:private valid-owner-sources @@ -231,11 +232,20 @@ (do (log/warn :db-worker-daemon/missing-script {:repo repo :root-dir root-dir}) nil) - (let [child (.spawn child-process (.-execPath js/process) args #js {:detached detached? - :stdio (if detached? - "ignore" - "inherit") - :env env})] + (let [stdio-config (when detached? + (db-worker-log/child-stdio! {:root-dir root-dir + :repo repo})) + _ (when detached? + (aset env db-worker-log/stdio-redirected-env "1")) + child (try + (.spawn child-process (.-execPath js/process) args #js {:detached detached? + :stdio (if detached? + (:stdio stdio-config) + "inherit") + :env env}) + (finally + (when-let [close! (:close! stdio-config)] + (close!))))] (when detached? (.unref child)) child)))) diff --git a/src/main/logseq/db_worker/log.cljs b/src/main/logseq/db_worker/log.cljs new file mode 100644 index 0000000000..228bf7884f --- /dev/null +++ b/src/main/logseq/db_worker/log.cljs @@ -0,0 +1,293 @@ +(ns logseq.db-worker.log + "Unified db-worker-node logging." + (:require ["fs" :as fs] + ["path" :as node-path] + [clojure.string :as string] + [goog.log :as glog] + [lambdaisland.glogi :as log] + [logseq.cli.root-dir :as root-dir] + [logseq.common.graph-dir :as graph-dir])) + +(def stdio-redirected-env "LOGSEQ_DB_WORKER_NODE_STDIO_REDIRECTED_TO_LOG") + +(defonce ^:private *installed (atom nil)) +(defonce ^:private *writing? (atom false)) +(defonce ^:private *forwarding? (atom false)) + +(defn- pad2 + [value] + (if (< value 10) + (str "0" value) + (str value))) + +(defn- yyyymmdd + [^js date] + (str (.getFullYear date) + (pad2 (inc (.getMonth date))) + (pad2 (.getDate date)))) + +(defn resolve-root-dir + [root-dir] + (root-dir/normalize-root-dir root-dir)) + +(defn graphs-dir + [root-dir] + (root-dir/graphs-dir (resolve-root-dir root-dir))) + +(defn repo-dir + [root-dir repo] + (when-not (seq repo) + (throw (ex-info "repo is required" {:code :missing-repo}))) + (node-path/join (graphs-dir root-dir) + (graph-dir/repo->encoded-graph-dir-name repo))) + +(defn log-path + [root-dir repo] + (node-path/join (repo-dir root-dir repo) + (str "db-worker-node-" (yyyymmdd (js/Date.)) ".log"))) + +(defn- log-files + [graph-dir-path] + (->> (when (fs/existsSync graph-dir-path) + (fs/readdirSync graph-dir-path)) + (filter (fn [^js name] + (re-matches #"db-worker-node-\d{8}\.log" name))) + (sort))) + +(defn enforce-retention! + [graph-dir-path] + (let [files (log-files graph-dir-path) + excess (max 0 (- (count files) 7))] + (doseq [name (take excess files)] + (fs/unlinkSync (node-path/join graph-dir-path name))))) + +(defn- ensure-log-file! + [{:keys [root-dir repo]}] + (let [graph-dir-path (repo-dir root-dir repo) + file-path (log-path root-dir repo)] + (fs/mkdirSync graph-dir-path #js {:recursive true}) + (fs/writeFileSync file-path "" #js {:flag "a"}) + (enforce-retention! graph-dir-path) + {:repo-dir graph-dir-path + :file-path file-path})) + +(defn- format-glogi-line + [{:keys [time level message logger-name exception]}] + (let [ts (.toISOString (js/Date. time)) + base (str ts + " [" + (name level) + "] [" + logger-name + "] " + (pr-str message))] + (str base (when exception (str " " (pr-str exception))) "\n"))) + +(defn- goog-get-level + [^js logger] + (if (exists? glog/getLevel) + (^:cljs.analyzer/no-resolve glog/getLevel logger) + (.getLevel logger))) + +(defn- current-root-level + [] + (let [level (goog-get-level log/root-logger)] + (some (fn [[k v]] + (when (= v level) + k)) + log/levels))) + +(defn- value->text + ([value] (value->text value nil)) + ([value encoding] + (cond + (nil? value) + "" + + (string? value) + value + + (instance? js/Error value) + (or (.-stack value) (.-message value) (str value)) + + (and value (fn? (.-toString value))) + (.toString value (or encoding "utf8")) + + :else + (str value)))) + +(defn- args->text + [args] + (->> args + (map value->text) + (string/join " "))) + +(defn- chunk-args->text + [args] + (let [payload (first args) + encoding (when (string? (second args)) + (second args))] + (value->text payload encoding))) + +(defn- append-lines! + [file-path source text] + (when-not (or @*writing? @*forwarding?) + (reset! *writing? true) + (try + (let [text (string/replace (str text) #"\r\n?" "\n") + text (string/replace text #"\n$" "") + lines (if (string/blank? text) + [""] + (string/split text #"\n"))] + (doseq [line lines] + (fs/appendFileSync file-path + (str (.toISOString (js/Date.)) + " [stdio] [" + source + "] " + line + "\n")))) + (finally + (reset! *writing? false))))) + +(defn- call-original + [f this args] + (reset! *forwarding? true) + (try + (.apply f this (to-array args)) + (finally + (reset! *forwarding? false)))) + +(defn- call-print-original + [f value] + (reset! *forwarding? true) + (try + (f value) + (finally + (reset! *forwarding? false)))) + +(defn- wrap-print! + [{:keys [file-path stdio-redirected?] :as state}] + (let [original-print-fn *print-fn* + original-print-err-fn *print-err-fn*] + (set-print-fn! + (fn [value] + (append-lines! file-path "stdout" value) + (when-not stdio-redirected? + (call-print-original original-print-fn value)))) + (set-print-err-fn! + (fn [value] + (append-lines! file-path "stderr" value) + (when-not stdio-redirected? + (call-print-original original-print-err-fn value)))) + (assoc state + :original-print-fn original-print-fn + :original-print-err-fn original-print-err-fn))) + +(defn- wrap-console! + [{:keys [file-path stdio-redirected?] :as state}] + (let [original-log (.-log js/console) + original-warn (.-warn js/console) + original-error (.-error js/console)] + (set! (.-log js/console) + (fn [& args] + (append-lines! file-path "console.log" (args->text args)) + (when-not stdio-redirected? + (call-original original-log js/console args)))) + (set! (.-warn js/console) + (fn [& args] + (append-lines! file-path "console.warn" (args->text args)) + (when-not stdio-redirected? + (call-original original-warn js/console args)))) + (set! (.-error js/console) + (fn [& args] + (append-lines! file-path "console.error" (args->text args)) + (when-not stdio-redirected? + (call-original original-error js/console args)))) + (assoc state + :original-console-log original-log + :original-console-warn original-warn + :original-console-error original-error))) + +(defn- wrap-stream! + [stream source file-path stdio-redirected?] + (let [original-write (.-write stream)] + (set! (.-write stream) + (fn [& args] + (append-lines! file-path source (chunk-args->text args)) + (if stdio-redirected? + true + (call-original original-write stream args)))) + original-write)) + +(defn- wrap-streams! + [{:keys [file-path stdio-redirected?] :as state}] + (assoc state + :original-stdout-write (wrap-stream! (.-stdout js/process) "stdout" file-path stdio-redirected?) + :original-stderr-write (wrap-stream! (.-stderr js/process) "stderr" file-path stdio-redirected?))) + +(defn uninstall! + [] + (when-let [{:keys [handler + original-print-fn + original-print-err-fn + original-console-log + original-console-warn + original-console-error + original-stdout-write + original-stderr-write + original-root-level]} @*installed] + (when handler + (log/remove-handler handler)) + (when original-print-fn + (set-print-fn! original-print-fn)) + (when original-print-err-fn + (set-print-err-fn! original-print-err-fn)) + (when original-console-log + (set! (.-log js/console) original-console-log)) + (when original-console-warn + (set! (.-warn js/console) original-console-warn)) + (when original-console-error + (set! (.-error js/console) original-console-error)) + (when original-stdout-write + (set! (.-write (.-stdout js/process)) original-stdout-write)) + (when original-stderr-write + (set! (.-write (.-stderr js/process)) original-stderr-write)) + (when original-root-level + (log/set-level :glogi/root original-root-level)) + (reset! *installed nil))) + +(defn install! + [{:keys [root-dir repo log-level]}] + (uninstall!) + (let [{:keys [file-path]} (ensure-log-file! {:root-dir root-dir :repo repo}) + stdio-redirected? (= "1" (aget (.-env js/process) stdio-redirected-env)) + original-root-level (current-root-level) + handler (fn [record] + (when-not @*forwarding? + (fs/appendFileSync file-path (format-glogi-line record)))) + state (-> {:file-path file-path + :handler handler + :original-root-level original-root-level + :stdio-redirected? stdio-redirected?} + wrap-print! + wrap-console! + wrap-streams!)] + (log/add-handler handler) + (log/set-levels {:glogi/root (or log-level :info)}) + (reset! *installed state) + file-path)) + +(defn child-stdio! + [{:keys [root-dir repo]}] + (let [{:keys [file-path]} (ensure-log-file! {:root-dir root-dir :repo repo}) + stdout-fd (fs/openSync file-path "a")] + (try + (let [stderr-fd (fs/openSync file-path "a")] + {:stdio #js ["ignore" stdout-fd stderr-fd] + :close! (fn [] + (fs/closeSync stdout-fd) + (fs/closeSync stderr-fd))}) + (catch :default e + (fs/closeSync stdout-fd) + (throw e))))) diff --git a/src/test/frontend/worker/db_worker_node_test.cljs b/src/test/frontend/worker/db_worker_node_test.cljs index b8a7622c43..e4c96b230b 100644 --- a/src/test/frontend/worker/db_worker_node_test.cljs +++ b/src/test/frontend/worker/db_worker_node_test.cljs @@ -17,6 +17,7 @@ [logseq.common.config :as common-config] [logseq.common.version :as build-version] [logseq.db :as ldb] + [logseq.db-worker.log :as db-worker-log] [promesa.core :as p])) (defn- http-request @@ -102,23 +103,9 @@ [root-dir repo] (db-lock/lock-path root-dir repo)) -(defn- pad2 - [value] - (if (< value 10) - (str "0" value) - (str value))) - -(defn- yyyymmdd - [^js date] - (str (.getFullYear date) - (pad2 (inc (.getMonth date))) - (pad2 (.getDate date)))) - (defn- log-path [root-dir repo] - (let [repo-dir (db-lock/repo-dir (db-lock/graphs-dir root-dir) repo) - date-str (yyyymmdd (js/Date.))] - (node-path/join repo-dir (str "db-worker-node-" date-str ".log")))) + (db-worker-log/log-path root-dir repo)) (defn- start-daemon! "Start daemon with quiet logging by default" @@ -152,7 +139,7 @@ (reset! @#'db-worker-node/*ready? false) (reset! @#'db-worker-node/*sse-clients #{}) (reset! @#'db-worker-node/*lock-info nil) - (reset! @#'db-worker-node/*file-handler nil)) + (db-worker-log/uninstall!)) (defn- normalize-db-worker-state-before [] @@ -280,9 +267,74 @@ (-> (stop!) (p/finally (fn [] (done)))) (done)))))))) +(deftest db-worker-node-logs-println-output + (async done + (let [daemon (atom nil) + data-dir (node-helper/create-tmp-dir "db-worker-log-println") + repo (str "logseq_db_log_println_" (subs (str (random-uuid)) 0 8)) + log-file (log-path data-dir repo) + message (str "println output " (random-uuid))] + (-> (p/let [{:keys [stop!]} + (start-daemon! {:root-dir data-dir + :repo repo}) + _ (reset! daemon {:stop! stop!}) + _ (println message) + _ (p/delay 50) + contents (.toString (fs/readFileSync log-file) "utf8")] + (is (string/includes? contents message))) + (p/catch (fn [e] + (is false (str "unexpected error: " e)))) + (p/finally (fn [] + (if-let [stop! (:stop! @daemon)] + (-> (stop!) (p/finally (fn [] (done)))) + (done)))))))) + +(deftest db-worker-node-logs-console-error-output + (async done + (let [daemon (atom nil) + data-dir (node-helper/create-tmp-dir "db-worker-log-console-error") + repo (str "logseq_db_log_console_error_" (subs (str (random-uuid)) 0 8)) + log-file (log-path data-dir repo) + message (str "console error output " (random-uuid))] + (-> (p/let [{:keys [stop!]} + (start-daemon! {:root-dir data-dir + :repo repo}) + _ (reset! daemon {:stop! stop!}) + _ (.error js/console message) + _ (p/delay 50) + contents (.toString (fs/readFileSync log-file) "utf8")] + (is (string/includes? contents message))) + (p/catch (fn [e] + (is false (str "unexpected error: " e)))) + (p/finally (fn [] + (if-let [stop! (:stop! @daemon)] + (-> (stop!) (p/finally (fn [] (done)))) + (done)))))))) + +(deftest db-worker-node-logs-process-stdout-output + (async done + (let [daemon (atom nil) + data-dir (node-helper/create-tmp-dir "db-worker-log-stdout") + repo (str "logseq_db_log_stdout_" (subs (str (random-uuid)) 0 8)) + log-file (log-path data-dir repo) + message (str "stdout output " (random-uuid))] + (-> (p/let [{:keys [stop!]} + (start-daemon! {:root-dir data-dir + :repo repo}) + _ (reset! daemon {:stop! stop!}) + _ (.write (.-stdout js/process) (str message "\n")) + _ (p/delay 50) + contents (.toString (fs/readFileSync log-file) "utf8")] + (is (string/includes? contents message))) + (p/catch (fn [e] + (is false (str "unexpected error: " e)))) + (p/finally (fn [] + (if-let [stop! (:stop! @daemon)] + (-> (stop!) (p/finally (fn [] (done)))) + (done)))))))) + (deftest db-worker-node-log-retention - (let [enforce-log-retention! #'db-worker-node/enforce-log-retention! - data-dir (node-helper/create-tmp-dir "db-worker-log-retention") + (let [data-dir (node-helper/create-tmp-dir "db-worker-log-retention") repo (str "logseq_db_log_retention_" (subs (str (random-uuid)) 0 8)) repo-dir (db-lock/repo-dir data-dir repo) days ["20240101" "20240102" "20240103" "20240104" "20240105" @@ -292,7 +344,7 @@ (fs/mkdirSync repo-dir #js {:recursive true}) (doseq [day days] (fs/writeFileSync (make-log day) "log\n")) - (enforce-log-retention! repo-dir) + (db-worker-log/enforce-retention! repo-dir) (let [remaining (->> (fs/readdirSync repo-dir) (filter (fn [^js name] (re-matches #"db-worker-node-\d{8}\.log" name))) diff --git a/src/test/logseq/cli/command/agent_test.cljs b/src/test/logseq/cli/command/agent_test.cljs index ad553f2631..131d971e44 100644 --- a/src/test/logseq/cli/command/agent_test.cljs +++ b/src/test/logseq/cli/command/agent_test.cljs @@ -31,6 +31,60 @@ "Assignee" "build-host"} overrides)) +(defn- comment-block + [overrides] + (merge {:db/id 50 + :block/uuid #uuid "55555555-5555-5555-5555-555555555555" + :block/title "[[build-host]] please summarize the selected blocks" + :block/tags [{:db/ident :logseq.class/Comment + :block/title "Comment"}] + :block/parent {:db/id 60 + :block/uuid #uuid "66666666-6666-6666-6666-666666666666"}} + overrides)) + +(defn- comments-area-block + [overrides] + (merge {:db/id 60 + :block/uuid #uuid "66666666-6666-6666-6666-666666666666" + :block/title "Comments" + :block/tags [{:db/ident :logseq.class/Comments + :block/title "Comments"}] + :logseq.property.comments/blocks [{:db/id 70 + :block/uuid #uuid "77777777-7777-7777-7777-777777777777" + :block/title "Target block A"} + {:db/id 80 + :block/uuid #uuid "88888888-8888-8888-8888-888888888888" + :block/title "Target block B"}]} + overrides)) + +(defn- assert-comment-request-prompt + [prompt] + (doseq [expected ["You are handling a Logseq AgentBridge comment request." + "Graph: demo" + "Comment UUID: 55555555-5555-5555-5555-555555555555" + "AgentBridge name: build-host" + "Comment target context:" + "- Target block A\n - Target A child" + "- Target block B\n - Target B child" + "Comment thread context:" + "- Comments\n - [[build-host]] please summarize the selected blocks" + "Requesting comment:" + "For a short reply, append a comment after the requesting comment." + "For a long reply, write a normal block tree after the comments area and append a comment that references that tree."]] + (is (string/includes? prompt expected)))) + +(defn- assert-basic-comment-request-prompt + [prompt] + (doseq [expected ["You are handling a Logseq AgentBridge comment request." + "Graph: demo" + "Comment UUID: 55555555-5555-5555-5555-555555555555" + "AgentBridge name: build-host" + "Comment target context:" + "Comment thread context:" + "Requesting comment:" + "Reply instructions:"]] + (is (string/includes? prompt expected)))) + (deftest test-assignee-built-in-property (let [property (get db-property/built-in-properties :logseq.property/assignee)] (is (= "Assignee" (:title property))) @@ -157,6 +211,389 @@ (is (string/starts-with? preview "codex exec --json '")) (is (string/includes? preview "Ship the CLI bridge"))))) +(deftest test-agent-bridge-listener-routes-comment-mention-with-context-and-reactions + (async done + (let [root (temp-root) + calls (atom []) + start-on-exit* (atom nil) + request-comment (comment-block {}) + comments-area (comments-area-block {}) + comment-uuid (:block/uuid request-comment) + tree-by-uuid {"55555555-5555-5555-5555-555555555555" + "- [[build-host]] please summarize the selected blocks" + "66666666-6666-6666-6666-666666666666" + "- Comments\n - [[build-host]] please summarize the selected blocks\n - Earlier comment" + "77777777-7777-7777-7777-777777777777" + "- Target block A\n - Target A child" + "88888888-8888-8888-8888-888888888888" + "- Target block B\n - Target B child"}] + (-> (p/with-redefs [agent-command/list-routable-tasks + (fn [_cfg repo agent-name] + (swap! calls conj [:broad-scan repo agent-name]) + (p/resolved [])) + transport/invoke + (fn [_cfg method args] + (swap! calls conj [method args]) + (case method + :thread-api/pull + (let [[_repo _selector lookup] args] + (case lookup + 50 (p/resolved request-comment) + 60 (p/resolved comments-area) + (p/rejected (ex-info "unexpected pull" + {:lookup lookup})))) + + :thread-api/q + (p/resolved []) + + :thread-api/apply-outliner-ops + (p/resolved {:ok true}) + + (p/rejected (ex-info "unexpected invoke" + {:method method + :args args})))) + show-command/execute-show + (fn [action _cfg] + (swap! calls conj [:show action]) + (p/resolved {:status :ok + :data {:message (get tree-by-uuid (:uuid action))}})) + cli-server/ensure-server! + (fn [cfg repo] + (swap! calls conj [:ensure-server (:root-dir cfg) repo]) + (assoc cfg :base-url "http://127.0.0.1:1234")) + agent-command/start-codex! + (fn [command opts] + (swap! calls conj [:codex command]) + (reset! start-on-exit* (:on-exit opts)) + ((:on-exit opts) 0 "comment-session-123") + (p/resolved {:session "comment-session-123" + :status :running}))] + (#'agent-command/process-sync-db-changes-event! + {:root-dir root + :base-url "http://127.0.0.1:1234" + :log-fn (fn [_] nil)} + {:repo "logseq_db_demo" + :graph "demo" + :agent-name "build-host" + :routing-blocks* (atom #{})} + {:tx-data [{:e 50 + :a :block/title + :v (:block/title request-comment)}]})) + (p/then (fn [_] + (is (not-any? #(= :broad-scan (first %)) @calls)) + (is (some #(= [:thread-api/apply-outliner-ops + ["logseq_db_demo" + [[:toggle-reaction [comment-uuid "eyes" nil]]] + {}]] + %) + @calls)) + (let [[_ command] (some #(when (= :codex (first %)) %) @calls)] + (is (some? command)) + (when command + (assert-comment-request-prompt (nth command 3)))) + (is (fn? @start-on-exit*)) + (p/let [_ (p/delay 10)] + (is (some #(= [:thread-api/apply-outliner-ops + ["logseq_db_demo" + [[:toggle-reaction [comment-uuid "white_check_mark" nil]]] + {}]] + %) + @calls))))) + (p/catch (fn [e] + (is false (str "unexpected error: " e)))) + (p/finally (fn [] + (fs/rmSync root #js {:recursive true :force true}) + (done))))))) + +(deftest test-agent-bridge-listener-routes-normalized-comment-mention-ref + (async done + (let [root (temp-root) + calls (atom []) + request-comment (comment-block {:block/title "[[aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa]] please summarize" + :block/refs [{:db/id 101 + :block/title "build-host"}]}) + comments-area (comments-area-block {}) + comment-uuid (:block/uuid request-comment)] + (-> (p/with-redefs [transport/invoke + (fn [_cfg method args] + (swap! calls conj [method args]) + (case method + :thread-api/pull + (let [[_repo _selector lookup] args] + (case lookup + 50 (p/resolved request-comment) + 60 (p/resolved comments-area) + (p/rejected (ex-info "unexpected pull" + {:lookup lookup})))) + :thread-api/q + (p/resolved []) + :thread-api/apply-outliner-ops + (p/resolved {:ok true}) + (p/rejected (ex-info "unexpected invoke" + {:method method + :args args})))) + show-command/execute-show + (fn [_action _cfg] + (p/resolved {:status :ok + :data {:message "- context"}})) + cli-server/ensure-server! + (fn [cfg _repo] cfg) + agent-command/start-codex! + (fn [command _opts] + (swap! calls conj [:codex command]) + (p/resolved {:session "comment-session-ref" + :status :running}))] + (#'agent-command/process-sync-db-changes-event! + {:root-dir root + :base-url "http://127.0.0.1:1234" + :log-fn (fn [_] nil)} + {:repo "logseq_db_demo" + :graph "demo" + :agent-name "build-host" + :routing-blocks* (atom #{})} + {:tx-data [{:e 50 + :a :block/title + :v (:block/title request-comment)}]})) + (p/then (fn [_] + (is (some #(= :codex (first %)) @calls)) + (is (some #(= [:thread-api/apply-outliner-ops + ["logseq_db_demo" + [[:toggle-reaction [comment-uuid "eyes" nil]]] + {}]] + %) + @calls)))) + (p/catch (fn [e] + (is false (str "unexpected error: " e)))) + (p/finally (fn [] + (fs/rmSync root #js {:recursive true :force true}) + (done))))))) + +(deftest test-agent-bridge-listener-resumes-commented-block-session-when-assignee-matches + (async done + (let [root (temp-root) + calls (atom []) + request-comment (comment-block {}) + commented-block (assoc (first (:logseq.property.comments/blocks (comments-area-block {}))) + "agent-session-id" "existing-session-123" + :logseq.property/assignee [{:db/id 101 + :block/title "build-host"}]) + comments-area (comments-area-block {:logseq.property.comments/blocks [commented-block]})] + (-> (p/with-redefs [transport/invoke + (fn [_cfg method args] + (swap! calls conj [method args]) + (case method + :thread-api/pull + (let [[_repo _selector lookup] args] + (case lookup + 50 (p/resolved request-comment) + 60 (p/resolved comments-area) + (p/rejected (ex-info "unexpected pull" + {:lookup lookup})))) + :thread-api/q + (p/resolved []) + :thread-api/apply-outliner-ops + (p/resolved {:ok true}) + (p/rejected (ex-info "unexpected invoke" + {:method method + :args args})))) + show-command/execute-show + (fn [_action _cfg] + (p/resolved {:status :ok + :data {:message "- context"}})) + cli-server/ensure-server! + (fn [cfg _repo] cfg) + agent-command/start-codex! + (fn [command _opts] + (swap! calls conj [:codex command]) + (p/resolved {:session "existing-session-123" + :status :running}))] + (#'agent-command/process-sync-db-changes-event! + {:root-dir root + :base-url "http://127.0.0.1:1234" + :log-fn (fn [_] nil)} + {:repo "logseq_db_demo" + :graph "demo" + :agent-name "build-host" + :routing-blocks* (atom #{})} + {:tx-data [{:e 50 + :a :block/title + :v (:block/title request-comment)}]})) + (p/then (fn [_] + (let [[_ command] (some #(when (= :codex (first %)) %) @calls)] + (is (some? command)) + (is (= ["codex" "exec" "resume" "--json" "existing-session-123"] + (vec (take 5 command)))) + (when command + (assert-basic-comment-request-prompt (last command)))))) + (p/catch (fn [e] + (is false (str "unexpected error: " e)))) + (p/finally (fn [] + (fs/rmSync root #js {:recursive true :force true}) + (done))))))) + +(deftest test-agent-bridge-listener-starts-temporary-comment-session-when-assignee-mismatches + (async done + (let [root (temp-root) + calls (atom []) + request-comment (comment-block {}) + commented-block (assoc (first (:logseq.property.comments/blocks (comments-area-block {}))) + "agent-session-id" "existing-session-123" + :logseq.property/assignee [{:db/id 101 + :block/title "other-host"}]) + comments-area (comments-area-block {:logseq.property.comments/blocks [commented-block]})] + (-> (p/with-redefs [transport/invoke + (fn [_cfg method args] + (swap! calls conj [method args]) + (case method + :thread-api/pull + (let [[_repo _selector lookup] args] + (case lookup + 50 (p/resolved request-comment) + 60 (p/resolved comments-area) + (p/rejected (ex-info "unexpected pull" + {:lookup lookup})))) + :thread-api/q + (p/resolved []) + :thread-api/apply-outliner-ops + (p/resolved {:ok true}) + (p/rejected (ex-info "unexpected invoke" + {:method method + :args args})))) + show-command/execute-show + (fn [_action _cfg] + (p/resolved {:status :ok + :data {:message "- context"}})) + cli-server/ensure-server! + (fn [cfg _repo] cfg) + agent-command/start-codex! + (fn [command _opts] + (swap! calls conj [:codex command]) + (p/resolved {:session "comment-session-789" + :status :running}))] + (#'agent-command/process-sync-db-changes-event! + {:root-dir root + :base-url "http://127.0.0.1:1234" + :log-fn (fn [_] nil)} + {:repo "logseq_db_demo" + :graph "demo" + :agent-name "build-host" + :routing-blocks* (atom #{})} + {:tx-data [{:e 50 + :a :block/title + :v (:block/title request-comment)}]})) + (p/then (fn [_] + (let [[_ command] (some #(when (= :codex (first %)) %) @calls)] + (is (some? command)) + (is (= ["codex" "exec" "--json"] + (vec (take 3 command)))) + (when command + (assert-basic-comment-request-prompt (last command)))))) + (p/catch (fn [e] + (is false (str "unexpected error: " e)))) + (p/finally (fn [] + (fs/rmSync root #js {:recursive true :force true}) + (done))))))) + +(deftest test-agent-bridge-listener-adds-failure-reaction-when-comment-codex-exits-nonzero + (async done + (let [root (temp-root) + calls (atom []) + start-on-exit* (atom nil) + request-comment (comment-block {}) + comments-area (comments-area-block {}) + comment-uuid (:block/uuid request-comment)] + (-> (p/with-redefs [transport/invoke + (fn [_cfg method args] + (swap! calls conj [method args]) + (case method + :thread-api/pull + (let [[_repo _selector lookup] args] + (case lookup + 50 (p/resolved request-comment) + 60 (p/resolved comments-area) + (p/rejected (ex-info "unexpected pull" + {:lookup lookup})))) + :thread-api/q + (p/resolved []) + :thread-api/apply-outliner-ops + (p/resolved {:ok true}) + (p/rejected (ex-info "unexpected invoke" + {:method method + :args args})))) + show-command/execute-show + (fn [_action _cfg] + (p/resolved {:status :ok + :data {:message "- context"}})) + cli-server/ensure-server! + (fn [cfg _repo] cfg) + agent-command/start-codex! + (fn [_command opts] + (reset! start-on-exit* (:on-exit opts)) + ((:on-exit opts) 1 "comment-session-456") + (p/resolved {:session "comment-session-456" + :status :running}))] + (#'agent-command/process-sync-db-changes-event! + {:root-dir root + :base-url "http://127.0.0.1:1234" + :log-fn (fn [_] nil)} + {:repo "logseq_db_demo" + :graph "demo" + :agent-name "build-host" + :routing-blocks* (atom #{})} + {:tx-data [{:e 50 + :a :block/title + :v (:block/title request-comment)}]})) + (p/then (fn [_] + (is (fn? @start-on-exit*)) + (p/let [_ (p/delay 10)] + (is (some #(= [:thread-api/apply-outliner-ops + ["logseq_db_demo" + [[:toggle-reaction [comment-uuid "x" nil]]] + {}]] + %) + @calls))))) + (p/catch (fn [e] + (is false (str "unexpected error: " e)))) + (p/finally (fn [] + (fs/rmSync root #js {:recursive true :force true}) + (done))))))) + +(deftest test-agent-bridge-listener-ignores-comment-like-title-without-comment-tag + (async done + (let [handler* (atom nil) + calls (atom []) + non-comment (comment-block {:block/tags []})] + (-> (p/with-redefs [transport/connect-events! (fn [_cfg handler] + (reset! handler* handler) + {:close! (fn [] nil)}) + transport/invoke (fn [_cfg method args] + (swap! calls conj [method args]) + (case method + :thread-api/pull + (p/resolved non-comment) + (p/rejected (ex-info "unexpected invoke" + {:method method + :args args})))) + agent-command/start-codex! (fn [command _opts] + (swap! calls conj [:codex command]) + (p/resolved {:session "should-not-run" + :status :running}))] + (do + (#'agent-command/listen-forever! {:root-dir "/tmp/logseq" + :base-url "http://127.0.0.1:1234" + :log-fn (fn [_] nil)} + {:repo "logseq_db_demo" + :graph "demo" + :agent-name "build-host"}) + (@handler* :sync-db-changes {:tx-data [{:e 50 + :a :block/title + :v (:block/title non-comment)}]}) + (p/let [_ (p/delay 10)] + (is (not-any? #(= :codex (first %)) @calls))))) + (p/catch (fn [e] + (is false (str "unexpected error: " e)))) + (p/finally done))))) + (deftest test-codex-session-id-capture (testing "captures the first Codex JSONL session id" (is (= "session-123" @@ -192,7 +629,7 @@ (-> (agent-command/start-codex! ["codex" "exec" "--json" "prompt"] {}) (p/then (fn [result] (is (= {:session "thread-late" - :status :running} + :status :running} (select-keys result [:session :status]))))) (p/catch (fn [e] (is false (str "unexpected error: " e)))) @@ -638,16 +1075,16 @@ transport/invoke (fn [_cfg method args] (swap! calls conj [method args]) (case method - :thread-api/pull - (let [[_repo selector lookup] args] - (cond - (= lookup :logseq.property/assignee) - (p/resolved {:db/id 900 - :db/ident :logseq.property/assignee}) + :thread-api/pull + (let [[_repo selector lookup] args] + (cond + (= lookup :logseq.property/assignee) + (p/resolved {:db/id 900 + :db/ident :logseq.property/assignee}) - (= lookup 101) - (p/resolved {:db/id 101 - :block/title "build-host"}) + (= lookup 101) + (p/resolved {:db/id 101 + :block/title "build-host"}) (= lookup 42) (p/resolved block) diff --git a/src/test/logseq/db_worker/daemon_test.cljs b/src/test/logseq/db_worker/daemon_test.cljs index 0c256f741d..c6cd1b3cbc 100644 --- a/src/test/logseq/db_worker/daemon_test.cljs +++ b/src/test/logseq/db_worker/daemon_test.cljs @@ -36,6 +36,12 @@ (is (not-any? #{"/tmp/server-list"} (:args @captured))) (is (not-any? #{"--host" "--port"} (:args @captured))) (is (= true (get-in @captured [:opts :detached]))) + (let [stdio (get-in @captured [:opts :stdio])] + (is (vector? stdio)) + (is (= "ignore" (first stdio))) + (is (number? (second stdio))) + (is (number? (nth stdio 2)))) + (is (= "1" (get-in @captured [:opts :env :LOGSEQ_DB_WORKER_NODE_STDIO_REDIRECTED_TO_LOG]))) (is (= "1" (get-in @captured [:opts :env :ELECTRON_RUN_AS_NODE]))) (is (= true @unref-called?)) (finally