diff --git a/cli-e2e/scripts/agent_bridge_e2e.py b/cli-e2e/scripts/agent_bridge_e2e.py index 1de0825740..2c8fec88d3 100644 --- a/cli-e2e/scripts/agent_bridge_e2e.py +++ b/cli-e2e/scripts/agent_bridge_e2e.py @@ -10,6 +10,10 @@ import time TASK_TITLE = "测试 agent bridge 功能,把当前task status设置为done" EXPECTED_SESSION = "thread-e2e-agent-bridge" +PARALLEL_TASK_TITLES = [ + "测试 agent bridge 并行执行任务 1", + "测试 agent bridge 并行执行任务 2", +] def write_fake_codex(fake_bin): @@ -20,7 +24,9 @@ def write_fake_codex(fake_bin): import json import os import pathlib +import re import sys +import time args = sys.argv[1:] if args == ["--version"]: @@ -29,11 +35,26 @@ if args == ["--version"]: if args[:2] == ["exec", "--json"]: prompt = args[2] if len(args) > 2 else "" + block_uuid_match = re.search(r"^Block UUID: (.+)$", prompt, re.MULTILINE) + block_uuid = block_uuid_match.group(1) if block_uuid_match else None log_path = pathlib.Path(os.environ["CODEX_FAKE_LOG"]) log_path.parent.mkdir(parents=True, exist_ok=True) with log_path.open("a", encoding="utf8") as f: - f.write(json.dumps({"args": args, "prompt": prompt}, ensure_ascii=False) + "\\n") - print(json.dumps({"type": "thread.started", "thread_id": "thread-e2e-agent-bridge"}), flush=True) + f.write(json.dumps( + {"event": "start", "time": time.time(), "args": args, "prompt": prompt, "block_uuid": block_uuid}, + ensure_ascii=False) + "\\n") + delay = float(os.environ.get("CODEX_FAKE_DELAY_SECONDS", "0")) + if delay > 0: + time.sleep(delay) + if os.environ.get("CODEX_FAKE_SESSION_BY_UUID") == "1" and block_uuid: + session_id = "thread-e2e-agent-bridge-" + re.sub(r"[^A-Za-z0-9]", "", block_uuid)[-12:] + else: + session_id = "thread-e2e-agent-bridge" + with log_path.open("a", encoding="utf8") as f: + f.write(json.dumps( + {"event": "session", "time": time.time(), "session": session_id, "block_uuid": block_uuid}, + ensure_ascii=False) + "\\n") + print(json.dumps({"type": "thread.started", "thread_id": session_id}), flush=True) sys.exit(0) print("unexpected codex args: " + repr(args), file=sys.stderr) @@ -128,17 +149,44 @@ def wait_for_log(path, text, process): raise SystemExit("agent bridge did not log {!r}\nstdout:\n{}".format(text, read_text(path))) -def assign_task(cli, repo_root, root_dir, config, graph): +def find_task_id(cli, repo_root, root_dir, config, graph, title): task_id = run_json( cli, repo_root, root_dir, config, graph, - '[:find ?e . :where [?e :block/title "{}"]]'.format(TASK_TITLE), + '[:find ?e . :where [?e :block/title "{}"]]'.format(title), ) if task_id is None: - raise SystemExit("task block was not found") + raise SystemExit("task block was not found: {}".format(title)) + return task_id + + +def create_task(cli, repo_root, root_dir, config, graph, title): + run_cli( + cli, + repo_root, + root_dir, + config, + graph, + [ + "upsert", + "task", + "--graph", + graph, + "--target-page", + "AgentBridgeE2E", + "--content", + title, + "--status", + "todo", + ], + ) + + +def assign_task(cli, repo_root, root_dir, config, graph, title=TASK_TITLE): + task_id = find_task_id(cli, repo_root, root_dir, config, graph, title) run_cli( cli, repo_root, @@ -158,6 +206,129 @@ def assign_task(cli, repo_root, root_dir, config, graph): ) +def read_codex_events(codex_log): + if not codex_log.exists(): + return [] + return [ + json.loads(line) + for line in codex_log.read_text(encoding="utf8").splitlines() + if line.strip() + ] + + +def session_query_for_title(title): + return ( + '[:find ?session . :where [?e :block/title "{}"] ' + '[?p :block/name "agent-session-id"] ' + "[?p :db/ident ?attr] [?e ?attr ?session]]" + ).format(title) + + +def wait_for_task_sessions(cli, repo_root, root_dir, config, graph, titles, bridge, bridge_log, bridge_err): + deadline = time.time() + 45 + sessions = {} + while time.time() < deadline: + sessions = { + title: deref_session_value( + cli, + repo_root, + root_dir, + config, + graph, + run_json(cli, repo_root, root_dir, config, graph, session_query_for_title(title)), + ) + for title in titles + } + if all(sessions.values()): + return sessions + if bridge.poll() is not None: + raise SystemExit( + "agent bridge exited early with {}\nstdout:\n{}\nstderr:\n{}".format( + bridge.returncode, read_text(bridge_log), read_text(bridge_err) + ) + ) + time.sleep(0.5) + raise SystemExit( + "agent-session-id was not written for every task; last sessions={!r}\nstdout:\n{}\nstderr:\n{}".format( + sessions, read_text(bridge_log), read_text(bridge_err) + ) + ) + + +def run_parallel_assignment_check(cli, repo_root, root_dir, config, graph, tmp_dir): + for title in PARALLEL_TASK_TITLES: + create_task(cli, repo_root, root_dir, config, graph, title) + + fake_bin = tmp_dir / "fake-bin" + bridge_log = tmp_dir / "agent-bridge.log" + bridge_err = tmp_dir / "agent-bridge.err" + codex_log = tmp_dir / "codex-invocations.jsonl" + + env = os.environ.copy() + env["PATH"] = str(fake_bin) + os.pathsep + env.get("PATH", "") + env["CODEX_FAKE_LOG"] = str(codex_log) + env["CODEX_FAKE_DELAY_SECONDS"] = "5" + env["CODEX_FAKE_SESSION_BY_UUID"] = "1" + + with bridge_log.open("wb") as out, bridge_err.open("wb") as err: + bridge = subprocess.Popen( + cli + + [ + "--root-dir", + root_dir, + "--config", + config, + "--output", + "human", + "agent", + "bridge", + "--graph", + graph, + ], + cwd=repo_root, + env=env, + stdout=out, + stderr=err, + ) + + try: + wait_for_log(bridge_log, "listening graph changes", bridge) + for title in PARALLEL_TASK_TITLES: + assign_task(cli, repo_root, root_dir, config, graph, title) + + sessions = wait_for_task_sessions( + cli, repo_root, root_dir, config, graph, PARALLEL_TASK_TITLES, bridge, bridge_log, bridge_err + ) + events = read_codex_events(codex_log) + start_events = [event for event in events if event.get("event") == "start"] + session_events = [event for event in events if event.get("event") == "session"] + if len(start_events) != 2 or len(session_events) != 2: + raise SystemExit("expected two codex starts and sessions, got: {!r}".format(events)) + + first_session_time = min(event["time"] for event in session_events) + latest_start_time = max(event["time"] for event in start_events) + if latest_start_time >= first_session_time: + raise SystemExit( + "codex exec did not overlap; starts={!r}, sessions={!r}".format( + [event["time"] for event in start_events], + [event["time"] for event in session_events], + ) + ) + + prompts = "\n".join(event["prompt"] for event in start_events) + for title in PARALLEL_TASK_TITLES: + assert title in prompts, prompts + print("agent bridge routed tasks concurrently: " + ", ".join(sorted(sessions.values()))) + finally: + if bridge.poll() is None: + bridge.terminate() + try: + bridge.wait(timeout=5) + except subprocess.TimeoutExpired: + bridge.kill() + bridge.wait(timeout=5) + + def main(): parser = argparse.ArgumentParser() parser.add_argument("--cli", required=True) @@ -168,6 +339,7 @@ def main(): parser.add_argument("--repo-root", required=True) parser.add_argument("--prepare-fake-codex-only", action="store_true") parser.add_argument("--assign-after-start", action="store_true") + parser.add_argument("--parallel-assignment-check", action="store_true") args = parser.parse_args() repo_root = pathlib.Path(args.repo_root) @@ -182,6 +354,10 @@ def main(): if args.prepare_fake_codex_only: return + if args.parallel_assignment_check: + run_parallel_assignment_check(cli, repo_root, args.root_dir, args.config, args.graph, tmp_dir) + return + env = os.environ.copy() env["PATH"] = str(fake_bin) + os.pathsep + env.get("PATH", "") env["CODEX_FAKE_LOG"] = str(codex_log) @@ -245,13 +421,9 @@ def main(): ) ) - lines = [ - json.loads(line) - for line in codex_log.read_text(encoding="utf8").splitlines() - if line.strip() - ] - assert len(lines) == 1, lines - prompt = lines[0]["prompt"] + start_events = [event for event in read_codex_events(codex_log) if event.get("event") == "start"] + assert len(start_events) == 1, start_events + prompt = start_events[0]["prompt"] assert TASK_TITLE in prompt, prompt assert "Graph: " + args.graph in prompt, prompt assert "Block UUID:" in prompt, prompt diff --git a/cli-e2e/spec/non_sync_cases.edn b/cli-e2e/spec/non_sync_cases.edn index 281e7adfe1..4d50ae10bd 100644 --- a/cli-e2e/spec/non_sync_cases.edn +++ b/cli-e2e/spec/non_sync_cases.edn @@ -1322,6 +1322,19 @@ PY" ["{{cli}} --root-dir {{root-dir-arg}} --config {{config-path-arg}} --output json server stop --graph {{graph-arg}}"], :tags [:agent], :extends :non-sync/graph-json-env} + {:id "agent-bridge-routes-assigned-tasks-concurrently", + :cmds + ["python3 '{{repo-root}}/cli-e2e/scripts/agent_bridge_e2e.py' --cli '{{repo-root}}/static/logseq-cli.js' --root-dir '{{root-dir}}' --config '{{config-path}}' --graph '{{graph}}' --tmp-dir '{{tmp-dir}}' --repo-root '{{repo-root}}' --parallel-assignment-check"], + :expect {:exit 0, :stdout-contains ["agent bridge routed tasks concurrently:"]}, + :covers + {:commands ["agent bridge"], + :options + {:global ["--config" "--graph" "--root-dir" "--output"], + :agent []}}, + :cleanup + ["{{cli}} --root-dir {{root-dir-arg}} --config {{config-path-arg}} --output json server stop --graph {{graph-arg}}"], + :tags [:agent], + :extends :non-sync/graph-json-env} {:id "agent-bridge-demo-script", :cmds ["bash '{{repo-root}}/cli-e2e/scripts/agent_bridge_demo.sh' --cli '{{repo-root}}/static/logseq-cli.js' --root-dir '{{tmp-dir}}/demo-root' --graph cli-e2e-agent-bridge-demo --repo-root '{{repo-root}}'"], diff --git a/src/main/logseq/cli/command/agent.cljs b/src/main/logseq/cli/command/agent.cljs index d644cb23c5..c526abcdb0 100644 --- a/src/main/logseq/cli/command/agent.cljs +++ b/src/main/logseq/cli/command/agent.cljs @@ -601,13 +601,38 @@ :backend :codex :preview preview}))) +(defn- claim-routing-block! + [routing-blocks* block-id] + (loop [] + (let [routing-blocks @routing-blocks*] + (cond + (contains? routing-blocks block-id) + false + + (compare-and-set! routing-blocks* routing-blocks (conj routing-blocks block-id)) + true + + :else + (recur))))) + +(defn- route-task-once! + [cfg {:keys [routing-blocks*] :as opts} {:keys [block] :as task}] + (let [block-id (block-uuid-str block)] + (if (and routing-blocks* block-id) + (if (claim-routing-block! routing-blocks* block-id) + (-> (route-task! cfg opts task) + (p/finally (fn [] + (swap! routing-blocks* disj block-id)))) + (p/resolved nil)) + (route-task! cfg opts task)))) + (defn- process-tasks! [cfg {:keys [repo graph agent-name]}] (p/let [tasks (list-routable-tasks cfg repo agent-name)] - (p/all (mapv #(route-task! cfg {:repo repo - :graph graph - :agent-name agent-name} - %) + (p/all (mapv #(route-task-once! cfg {:repo repo + :graph graph + :agent-name agent-name} + %) tasks)))) (def ^:private assignee-property-ident :logseq.property/assignee) @@ -731,8 +756,8 @@ (p/let [block (pull-task-block cfg repo block-id)] (when (routable-task? block agent-name) (p/let [tree-text (show-task-tree cfg repo block)] - (route-task! cfg opts {:block block - :tree-text tree-text}))))))))) + (route-task-once! cfg opts {:block block + :tree-text tree-text}))))))))) (defn- process-sync-db-changes-event! [cfg {:keys [repo] :as opts} {:keys [tx-data]}] @@ -742,28 +767,28 @@ (defn- listen-forever! [cfg {:keys [repo graph agent-name]}] - (let [processing* (atom (p/resolved nil)) + (let [routing-blocks* (atom #{}) + handle-error! (fn [e] + (emit-log! cfg (log-line (str "Codex invocation failed: " + (or (ex-message e) (str e))))) + (log-bridge-exit! {:repo repo + :graph graph + :agent-name agent-name + :reason :task-processing-failed + :exit-code 1 + :error e}) + (.exit js/process 1)) process! (fn [payload] - (swap! processing* - (fn [previous] - (-> previous - (p/catch (fn [_] nil)) - (p/then (fn [_] - (process-sync-db-changes-event! cfg - {:repo repo - :graph graph - :agent-name agent-name} - payload))) - (p/catch (fn [e] - (emit-log! cfg (log-line (str "Codex invocation failed: " - (or (ex-message e) (str e))))) - (log-bridge-exit! {:repo repo - :graph graph - :agent-name agent-name - :reason :task-processing-failed - :exit-code 1 - :error e}) - (.exit js/process 1)))))))] + (try + (-> (process-sync-db-changes-event! cfg + {:repo repo + :graph graph + :agent-name agent-name + :routing-blocks* routing-blocks*} + payload) + (p/catch handle-error!)) + (catch :default e + (handle-error! e))))] (transport/connect-events! cfg (fn [event-type payload] (when (= :sync-db-changes event-type)