mirror of
https://github.com/logseq/logseq.git
synced 2026-05-24 12:44:22 +00:00
fix agent bridge concurrent codex exec routing
This commit is contained in:
@@ -10,6 +10,10 @@ import time
|
||||
|
||||
TASK_TITLE = "测试 agent bridge 功能,把当前task status设置为done"
|
||||
EXPECTED_SESSION = "thread-e2e-agent-bridge"
|
||||
PARALLEL_TASK_TITLES = [
|
||||
"测试 agent bridge 并行执行任务 1",
|
||||
"测试 agent bridge 并行执行任务 2",
|
||||
]
|
||||
|
||||
|
||||
def write_fake_codex(fake_bin):
|
||||
@@ -20,7 +24,9 @@ def write_fake_codex(fake_bin):
|
||||
import json
|
||||
import os
|
||||
import pathlib
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
|
||||
args = sys.argv[1:]
|
||||
if args == ["--version"]:
|
||||
@@ -29,11 +35,26 @@ if args == ["--version"]:
|
||||
|
||||
if args[:2] == ["exec", "--json"]:
|
||||
prompt = args[2] if len(args) > 2 else ""
|
||||
block_uuid_match = re.search(r"^Block UUID: (.+)$", prompt, re.MULTILINE)
|
||||
block_uuid = block_uuid_match.group(1) if block_uuid_match else None
|
||||
log_path = pathlib.Path(os.environ["CODEX_FAKE_LOG"])
|
||||
log_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with log_path.open("a", encoding="utf8") as f:
|
||||
f.write(json.dumps({"args": args, "prompt": prompt}, ensure_ascii=False) + "\\n")
|
||||
print(json.dumps({"type": "thread.started", "thread_id": "thread-e2e-agent-bridge"}), flush=True)
|
||||
f.write(json.dumps(
|
||||
{"event": "start", "time": time.time(), "args": args, "prompt": prompt, "block_uuid": block_uuid},
|
||||
ensure_ascii=False) + "\\n")
|
||||
delay = float(os.environ.get("CODEX_FAKE_DELAY_SECONDS", "0"))
|
||||
if delay > 0:
|
||||
time.sleep(delay)
|
||||
if os.environ.get("CODEX_FAKE_SESSION_BY_UUID") == "1" and block_uuid:
|
||||
session_id = "thread-e2e-agent-bridge-" + re.sub(r"[^A-Za-z0-9]", "", block_uuid)[-12:]
|
||||
else:
|
||||
session_id = "thread-e2e-agent-bridge"
|
||||
with log_path.open("a", encoding="utf8") as f:
|
||||
f.write(json.dumps(
|
||||
{"event": "session", "time": time.time(), "session": session_id, "block_uuid": block_uuid},
|
||||
ensure_ascii=False) + "\\n")
|
||||
print(json.dumps({"type": "thread.started", "thread_id": session_id}), flush=True)
|
||||
sys.exit(0)
|
||||
|
||||
print("unexpected codex args: " + repr(args), file=sys.stderr)
|
||||
@@ -128,17 +149,44 @@ def wait_for_log(path, text, process):
|
||||
raise SystemExit("agent bridge did not log {!r}\nstdout:\n{}".format(text, read_text(path)))
|
||||
|
||||
|
||||
def assign_task(cli, repo_root, root_dir, config, graph):
|
||||
def find_task_id(cli, repo_root, root_dir, config, graph, title):
|
||||
task_id = run_json(
|
||||
cli,
|
||||
repo_root,
|
||||
root_dir,
|
||||
config,
|
||||
graph,
|
||||
'[:find ?e . :where [?e :block/title "{}"]]'.format(TASK_TITLE),
|
||||
'[:find ?e . :where [?e :block/title "{}"]]'.format(title),
|
||||
)
|
||||
if task_id is None:
|
||||
raise SystemExit("task block was not found")
|
||||
raise SystemExit("task block was not found: {}".format(title))
|
||||
return task_id
|
||||
|
||||
|
||||
def create_task(cli, repo_root, root_dir, config, graph, title):
|
||||
run_cli(
|
||||
cli,
|
||||
repo_root,
|
||||
root_dir,
|
||||
config,
|
||||
graph,
|
||||
[
|
||||
"upsert",
|
||||
"task",
|
||||
"--graph",
|
||||
graph,
|
||||
"--target-page",
|
||||
"AgentBridgeE2E",
|
||||
"--content",
|
||||
title,
|
||||
"--status",
|
||||
"todo",
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
def assign_task(cli, repo_root, root_dir, config, graph, title=TASK_TITLE):
|
||||
task_id = find_task_id(cli, repo_root, root_dir, config, graph, title)
|
||||
run_cli(
|
||||
cli,
|
||||
repo_root,
|
||||
@@ -158,6 +206,129 @@ def assign_task(cli, repo_root, root_dir, config, graph):
|
||||
)
|
||||
|
||||
|
||||
def read_codex_events(codex_log):
|
||||
if not codex_log.exists():
|
||||
return []
|
||||
return [
|
||||
json.loads(line)
|
||||
for line in codex_log.read_text(encoding="utf8").splitlines()
|
||||
if line.strip()
|
||||
]
|
||||
|
||||
|
||||
def session_query_for_title(title):
|
||||
return (
|
||||
'[:find ?session . :where [?e :block/title "{}"] '
|
||||
'[?p :block/name "agent-session-id"] '
|
||||
"[?p :db/ident ?attr] [?e ?attr ?session]]"
|
||||
).format(title)
|
||||
|
||||
|
||||
def wait_for_task_sessions(cli, repo_root, root_dir, config, graph, titles, bridge, bridge_log, bridge_err):
|
||||
deadline = time.time() + 45
|
||||
sessions = {}
|
||||
while time.time() < deadline:
|
||||
sessions = {
|
||||
title: deref_session_value(
|
||||
cli,
|
||||
repo_root,
|
||||
root_dir,
|
||||
config,
|
||||
graph,
|
||||
run_json(cli, repo_root, root_dir, config, graph, session_query_for_title(title)),
|
||||
)
|
||||
for title in titles
|
||||
}
|
||||
if all(sessions.values()):
|
||||
return sessions
|
||||
if bridge.poll() is not None:
|
||||
raise SystemExit(
|
||||
"agent bridge exited early with {}\nstdout:\n{}\nstderr:\n{}".format(
|
||||
bridge.returncode, read_text(bridge_log), read_text(bridge_err)
|
||||
)
|
||||
)
|
||||
time.sleep(0.5)
|
||||
raise SystemExit(
|
||||
"agent-session-id was not written for every task; last sessions={!r}\nstdout:\n{}\nstderr:\n{}".format(
|
||||
sessions, read_text(bridge_log), read_text(bridge_err)
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def run_parallel_assignment_check(cli, repo_root, root_dir, config, graph, tmp_dir):
|
||||
for title in PARALLEL_TASK_TITLES:
|
||||
create_task(cli, repo_root, root_dir, config, graph, title)
|
||||
|
||||
fake_bin = tmp_dir / "fake-bin"
|
||||
bridge_log = tmp_dir / "agent-bridge.log"
|
||||
bridge_err = tmp_dir / "agent-bridge.err"
|
||||
codex_log = tmp_dir / "codex-invocations.jsonl"
|
||||
|
||||
env = os.environ.copy()
|
||||
env["PATH"] = str(fake_bin) + os.pathsep + env.get("PATH", "")
|
||||
env["CODEX_FAKE_LOG"] = str(codex_log)
|
||||
env["CODEX_FAKE_DELAY_SECONDS"] = "5"
|
||||
env["CODEX_FAKE_SESSION_BY_UUID"] = "1"
|
||||
|
||||
with bridge_log.open("wb") as out, bridge_err.open("wb") as err:
|
||||
bridge = subprocess.Popen(
|
||||
cli
|
||||
+ [
|
||||
"--root-dir",
|
||||
root_dir,
|
||||
"--config",
|
||||
config,
|
||||
"--output",
|
||||
"human",
|
||||
"agent",
|
||||
"bridge",
|
||||
"--graph",
|
||||
graph,
|
||||
],
|
||||
cwd=repo_root,
|
||||
env=env,
|
||||
stdout=out,
|
||||
stderr=err,
|
||||
)
|
||||
|
||||
try:
|
||||
wait_for_log(bridge_log, "listening graph changes", bridge)
|
||||
for title in PARALLEL_TASK_TITLES:
|
||||
assign_task(cli, repo_root, root_dir, config, graph, title)
|
||||
|
||||
sessions = wait_for_task_sessions(
|
||||
cli, repo_root, root_dir, config, graph, PARALLEL_TASK_TITLES, bridge, bridge_log, bridge_err
|
||||
)
|
||||
events = read_codex_events(codex_log)
|
||||
start_events = [event for event in events if event.get("event") == "start"]
|
||||
session_events = [event for event in events if event.get("event") == "session"]
|
||||
if len(start_events) != 2 or len(session_events) != 2:
|
||||
raise SystemExit("expected two codex starts and sessions, got: {!r}".format(events))
|
||||
|
||||
first_session_time = min(event["time"] for event in session_events)
|
||||
latest_start_time = max(event["time"] for event in start_events)
|
||||
if latest_start_time >= first_session_time:
|
||||
raise SystemExit(
|
||||
"codex exec did not overlap; starts={!r}, sessions={!r}".format(
|
||||
[event["time"] for event in start_events],
|
||||
[event["time"] for event in session_events],
|
||||
)
|
||||
)
|
||||
|
||||
prompts = "\n".join(event["prompt"] for event in start_events)
|
||||
for title in PARALLEL_TASK_TITLES:
|
||||
assert title in prompts, prompts
|
||||
print("agent bridge routed tasks concurrently: " + ", ".join(sorted(sessions.values())))
|
||||
finally:
|
||||
if bridge.poll() is None:
|
||||
bridge.terminate()
|
||||
try:
|
||||
bridge.wait(timeout=5)
|
||||
except subprocess.TimeoutExpired:
|
||||
bridge.kill()
|
||||
bridge.wait(timeout=5)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--cli", required=True)
|
||||
@@ -168,6 +339,7 @@ def main():
|
||||
parser.add_argument("--repo-root", required=True)
|
||||
parser.add_argument("--prepare-fake-codex-only", action="store_true")
|
||||
parser.add_argument("--assign-after-start", action="store_true")
|
||||
parser.add_argument("--parallel-assignment-check", action="store_true")
|
||||
args = parser.parse_args()
|
||||
|
||||
repo_root = pathlib.Path(args.repo_root)
|
||||
@@ -182,6 +354,10 @@ def main():
|
||||
if args.prepare_fake_codex_only:
|
||||
return
|
||||
|
||||
if args.parallel_assignment_check:
|
||||
run_parallel_assignment_check(cli, repo_root, args.root_dir, args.config, args.graph, tmp_dir)
|
||||
return
|
||||
|
||||
env = os.environ.copy()
|
||||
env["PATH"] = str(fake_bin) + os.pathsep + env.get("PATH", "")
|
||||
env["CODEX_FAKE_LOG"] = str(codex_log)
|
||||
@@ -245,13 +421,9 @@ def main():
|
||||
)
|
||||
)
|
||||
|
||||
lines = [
|
||||
json.loads(line)
|
||||
for line in codex_log.read_text(encoding="utf8").splitlines()
|
||||
if line.strip()
|
||||
]
|
||||
assert len(lines) == 1, lines
|
||||
prompt = lines[0]["prompt"]
|
||||
start_events = [event for event in read_codex_events(codex_log) if event.get("event") == "start"]
|
||||
assert len(start_events) == 1, start_events
|
||||
prompt = start_events[0]["prompt"]
|
||||
assert TASK_TITLE in prompt, prompt
|
||||
assert "Graph: " + args.graph in prompt, prompt
|
||||
assert "Block UUID:" in prompt, prompt
|
||||
|
||||
@@ -1322,6 +1322,19 @@ PY"
|
||||
["{{cli}} --root-dir {{root-dir-arg}} --config {{config-path-arg}} --output json server stop --graph {{graph-arg}}"],
|
||||
:tags [:agent],
|
||||
:extends :non-sync/graph-json-env}
|
||||
{:id "agent-bridge-routes-assigned-tasks-concurrently",
|
||||
:cmds
|
||||
["python3 '{{repo-root}}/cli-e2e/scripts/agent_bridge_e2e.py' --cli '{{repo-root}}/static/logseq-cli.js' --root-dir '{{root-dir}}' --config '{{config-path}}' --graph '{{graph}}' --tmp-dir '{{tmp-dir}}' --repo-root '{{repo-root}}' --parallel-assignment-check"],
|
||||
:expect {:exit 0, :stdout-contains ["agent bridge routed tasks concurrently:"]},
|
||||
:covers
|
||||
{:commands ["agent bridge"],
|
||||
:options
|
||||
{:global ["--config" "--graph" "--root-dir" "--output"],
|
||||
:agent []}},
|
||||
:cleanup
|
||||
["{{cli}} --root-dir {{root-dir-arg}} --config {{config-path-arg}} --output json server stop --graph {{graph-arg}}"],
|
||||
:tags [:agent],
|
||||
:extends :non-sync/graph-json-env}
|
||||
{:id "agent-bridge-demo-script",
|
||||
:cmds
|
||||
["bash '{{repo-root}}/cli-e2e/scripts/agent_bridge_demo.sh' --cli '{{repo-root}}/static/logseq-cli.js' --root-dir '{{tmp-dir}}/demo-root' --graph cli-e2e-agent-bridge-demo --repo-root '{{repo-root}}'"],
|
||||
|
||||
@@ -601,13 +601,38 @@
|
||||
:backend :codex
|
||||
:preview preview})))
|
||||
|
||||
(defn- claim-routing-block!
|
||||
[routing-blocks* block-id]
|
||||
(loop []
|
||||
(let [routing-blocks @routing-blocks*]
|
||||
(cond
|
||||
(contains? routing-blocks block-id)
|
||||
false
|
||||
|
||||
(compare-and-set! routing-blocks* routing-blocks (conj routing-blocks block-id))
|
||||
true
|
||||
|
||||
:else
|
||||
(recur)))))
|
||||
|
||||
(defn- route-task-once!
|
||||
[cfg {:keys [routing-blocks*] :as opts} {:keys [block] :as task}]
|
||||
(let [block-id (block-uuid-str block)]
|
||||
(if (and routing-blocks* block-id)
|
||||
(if (claim-routing-block! routing-blocks* block-id)
|
||||
(-> (route-task! cfg opts task)
|
||||
(p/finally (fn []
|
||||
(swap! routing-blocks* disj block-id))))
|
||||
(p/resolved nil))
|
||||
(route-task! cfg opts task))))
|
||||
|
||||
(defn- process-tasks!
|
||||
[cfg {:keys [repo graph agent-name]}]
|
||||
(p/let [tasks (list-routable-tasks cfg repo agent-name)]
|
||||
(p/all (mapv #(route-task! cfg {:repo repo
|
||||
:graph graph
|
||||
:agent-name agent-name}
|
||||
%)
|
||||
(p/all (mapv #(route-task-once! cfg {:repo repo
|
||||
:graph graph
|
||||
:agent-name agent-name}
|
||||
%)
|
||||
tasks))))
|
||||
|
||||
(def ^:private assignee-property-ident :logseq.property/assignee)
|
||||
@@ -731,8 +756,8 @@
|
||||
(p/let [block (pull-task-block cfg repo block-id)]
|
||||
(when (routable-task? block agent-name)
|
||||
(p/let [tree-text (show-task-tree cfg repo block)]
|
||||
(route-task! cfg opts {:block block
|
||||
:tree-text tree-text})))))))))
|
||||
(route-task-once! cfg opts {:block block
|
||||
:tree-text tree-text})))))))))
|
||||
|
||||
(defn- process-sync-db-changes-event!
|
||||
[cfg {:keys [repo] :as opts} {:keys [tx-data]}]
|
||||
@@ -742,28 +767,28 @@
|
||||
|
||||
(defn- listen-forever!
|
||||
[cfg {:keys [repo graph agent-name]}]
|
||||
(let [processing* (atom (p/resolved nil))
|
||||
(let [routing-blocks* (atom #{})
|
||||
handle-error! (fn [e]
|
||||
(emit-log! cfg (log-line (str "Codex invocation failed: "
|
||||
(or (ex-message e) (str e)))))
|
||||
(log-bridge-exit! {:repo repo
|
||||
:graph graph
|
||||
:agent-name agent-name
|
||||
:reason :task-processing-failed
|
||||
:exit-code 1
|
||||
:error e})
|
||||
(.exit js/process 1))
|
||||
process! (fn [payload]
|
||||
(swap! processing*
|
||||
(fn [previous]
|
||||
(-> previous
|
||||
(p/catch (fn [_] nil))
|
||||
(p/then (fn [_]
|
||||
(process-sync-db-changes-event! cfg
|
||||
{:repo repo
|
||||
:graph graph
|
||||
:agent-name agent-name}
|
||||
payload)))
|
||||
(p/catch (fn [e]
|
||||
(emit-log! cfg (log-line (str "Codex invocation failed: "
|
||||
(or (ex-message e) (str e)))))
|
||||
(log-bridge-exit! {:repo repo
|
||||
:graph graph
|
||||
:agent-name agent-name
|
||||
:reason :task-processing-failed
|
||||
:exit-code 1
|
||||
:error e})
|
||||
(.exit js/process 1)))))))]
|
||||
(try
|
||||
(-> (process-sync-db-changes-event! cfg
|
||||
{:repo repo
|
||||
:graph graph
|
||||
:agent-name agent-name
|
||||
:routing-blocks* routing-blocks*}
|
||||
payload)
|
||||
(p/catch handle-error!))
|
||||
(catch :default e
|
||||
(handle-error! e))))]
|
||||
(transport/connect-events! cfg
|
||||
(fn [event-type payload]
|
||||
(when (= :sync-db-changes event-type)
|
||||
|
||||
Reference in New Issue
Block a user