Files
logseq/cli-e2e/scripts/agent_bridge_e2e.py
2026-05-22 15:56:19 +08:00

808 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
import argparse
import json
import os
import pathlib
import subprocess
import time
TASK_TITLE = "测试 agent bridge 功能把当前task status设置为done"
EXPECTED_SESSION = "thread-e2e-agent-bridge"
PARALLEL_TASK_TITLES = [
"测试 agent bridge 并行执行任务 1",
"测试 agent bridge 并行执行任务 2",
]
def write_fake_codex(fake_bin):
fake_codex = fake_bin / "codex"
fake_codex.parent.mkdir(parents=True, exist_ok=True)
fake_codex.write_text(
"""#!/usr/bin/env python3
import json
import os
import pathlib
import re
import sys
import time
args = sys.argv[1:]
if args == ["--version"]:
print("codex-cli 0.0.0-e2e")
sys.exit(0)
if args[:5] == ["--sandbox", "danger-full-access", "exec", "--json", "--skip-git-repo-check"]:
prompt = args[5] if len(args) > 5 else ""
block_uuid_match = re.search(r"^Block UUID: (.+)$", prompt, re.MULTILINE)
block_uuid = block_uuid_match.group(1) if block_uuid_match else None
log_path = pathlib.Path(os.environ["CODEX_FAKE_LOG"])
log_path.parent.mkdir(parents=True, exist_ok=True)
with log_path.open("a", encoding="utf8") as f:
f.write(json.dumps(
{"event": "start", "time": time.time(), "args": args, "prompt": prompt, "block_uuid": block_uuid},
ensure_ascii=False) + "\\n")
delay = float(os.environ.get("CODEX_FAKE_DELAY_SECONDS", "0"))
if delay > 0:
time.sleep(delay)
if os.environ.get("CODEX_FAKE_SESSION_BY_UUID") == "1" and block_uuid:
session_id = "thread-e2e-agent-bridge-" + re.sub(r"[^A-Za-z0-9]", "", block_uuid)[-12:]
else:
session_id = "thread-e2e-agent-bridge"
with log_path.open("a", encoding="utf8") as f:
f.write(json.dumps(
{"event": "session", "time": time.time(), "session": session_id, "block_uuid": block_uuid},
ensure_ascii=False) + "\\n")
print(json.dumps({"type": "thread.started", "thread_id": session_id}), flush=True)
sys.exit(0)
if args[:6] == ["--sandbox", "danger-full-access", "exec", "resume", "--json", "--skip-git-repo-check"]:
session_id = args[6] if len(args) > 6 else ""
prompt = args[7] if len(args) > 7 else ""
comment_uuid_match = re.search(r"^Comment UUID: (.+)$", prompt, re.MULTILINE)
comment_uuid = comment_uuid_match.group(1) if comment_uuid_match else None
log_path = pathlib.Path(os.environ["CODEX_FAKE_LOG"])
log_path.parent.mkdir(parents=True, exist_ok=True)
with log_path.open("a", encoding="utf8") as f:
f.write(json.dumps(
{"event": "resume", "time": time.time(), "args": args, "prompt": prompt, "comment_uuid": comment_uuid, "session": session_id},
ensure_ascii=False) + "\\n")
print(json.dumps({"type": "thread.started", "thread_id": session_id}), flush=True)
sys.exit(0)
print("unexpected codex args: " + repr(args), file=sys.stderr)
sys.exit(2)
""",
encoding="utf8",
)
fake_codex.chmod(0o755)
def run_json(cli, repo_root, root_dir, config, graph, query):
result = subprocess.run(
cli
+ [
"--root-dir",
root_dir,
"--config",
config,
"--output",
"json",
"query",
"--graph",
graph,
"--query",
query,
],
cwd=repo_root,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
if result.returncode != 0:
return None
return json.loads(result.stdout).get("data", {}).get("result")
def run_cli(cli, repo_root, root_dir, config, graph, extra_args):
result = subprocess.run(
cli
+ [
"--root-dir",
root_dir,
"--config",
config,
"--output",
"json",
]
+ extra_args,
cwd=repo_root,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
if result.returncode != 0:
raise SystemExit(
"CLI command failed: {}\nstdout:\n{}\nstderr:\n{}".format(
" ".join(extra_args), result.stdout, result.stderr
)
)
return json.loads(result.stdout) if result.stdout.strip() else None
def run_cli_with_env(cli, repo_root, root_dir, config, extra_args, env):
result = subprocess.run(
cli
+ [
"--root-dir",
root_dir,
"--config",
config,
"--output",
"json",
]
+ extra_args,
cwd=repo_root,
env=env,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
return result
def deref_session_value(cli, repo_root, root_dir, config, graph, value):
if not isinstance(value, int):
return value
return run_json(
cli,
repo_root,
root_dir,
config,
graph,
f"[:find ?title . :where [{value} :block/title ?title]]",
)
def read_text(path):
return path.read_text(encoding="utf8", errors="replace")
def wait_for_log(path, text, process):
deadline = time.time() + 30
while time.time() < deadline:
if path.exists() and text in read_text(path):
return
if process.poll() is not None:
raise SystemExit(
"agent bridge exited before {}\nstdout:\n{}".format(
text, read_text(path)
)
)
time.sleep(0.2)
raise SystemExit("agent bridge did not log {!r}\nstdout:\n{}".format(text, read_text(path)))
def find_task_id(cli, repo_root, root_dir, config, graph, title):
task_id = run_json(
cli,
repo_root,
root_dir,
config,
graph,
'[:find ?e . :where [?e :block/title "{}"]]'.format(title),
)
if task_id is None:
raise SystemExit("task block was not found: {}".format(title))
return task_id
def find_task_uuid(cli, repo_root, root_dir, config, graph, title):
task_uuid = run_json(
cli,
repo_root,
root_dir,
config,
graph,
'[:find ?uuid . :where [?e :block/title "{}"] [?e :block/uuid ?uuid]]'.format(title),
)
if task_uuid is None:
raise SystemExit("task block uuid was not found: {}".format(title))
return task_uuid
def create_task(cli, repo_root, root_dir, config, graph, title):
run_cli(
cli,
repo_root,
root_dir,
config,
graph,
[
"upsert",
"task",
"--graph",
graph,
"--target-page",
"AgentBridgeE2E",
"--content",
title,
"--status",
"todo",
],
)
def assign_task(cli, repo_root, root_dir, config, graph, title=TASK_TITLE):
task_id = find_task_id(cli, repo_root, root_dir, config, graph, title)
run_cli(
cli,
repo_root,
root_dir,
config,
graph,
[
"upsert",
"block",
"--graph",
graph,
"--id",
str(task_id),
"--update-properties",
'{{"Assignee" "{}"}}'.format(os.uname().nodename),
],
)
def read_codex_events(codex_log):
if not codex_log.exists():
return []
return [
json.loads(line)
for line in codex_log.read_text(encoding="utf8").splitlines()
if line.strip()
]
def session_query_for_title(title):
return (
'[:find ?session . :where [?e :block/title "{}"] '
"[?e :logseq.property.agent/session-id ?session]]"
).format(title)
def wait_for_task_sessions(cli, repo_root, root_dir, config, graph, titles, bridge, bridge_log, bridge_err):
deadline = time.time() + 45
sessions = {}
while time.time() < deadline:
sessions = {
title: deref_session_value(
cli,
repo_root,
root_dir,
config,
graph,
run_json(cli, repo_root, root_dir, config, graph, session_query_for_title(title)),
)
for title in titles
}
if all(sessions.values()):
return sessions
if bridge.poll() is not None:
raise SystemExit(
"agent bridge exited early with {}\nstdout:\n{}\nstderr:\n{}".format(
bridge.returncode, read_text(bridge_log), read_text(bridge_err)
)
)
time.sleep(0.5)
raise SystemExit(
"agent-session-id was not written for every task; last sessions={!r}\nstdout:\n{}\nstderr:\n{}".format(
sessions, read_text(bridge_log), read_text(bridge_err)
)
)
def wait_for_codex_event(codex_log, event_name, bridge, bridge_log, bridge_err):
deadline = time.time() + 30
events = []
while time.time() < deadline:
events = read_codex_events(codex_log)
matches = [event for event in events if event.get("event") == event_name]
if matches:
return matches
if bridge.poll() is not None:
raise SystemExit(
"agent bridge exited early with {}\nstdout:\n{}\nstderr:\n{}".format(
bridge.returncode, read_text(bridge_log), read_text(bridge_err)
)
)
time.sleep(0.5)
raise SystemExit("codex event {!r} was not observed; events={!r}".format(event_name, events))
def write_comment_blocks_file(path, task_uuid, hostname):
path.write_text(
"""[{{:block/title "Comments"
:block/tags [:logseq.class/Comments]
:logseq.property.comments/blocks [[:block/uuid #uuid "{task_uuid}"]]
:block/children [{{:block/title "[[{hostname}]] please continue from the comment"
:block/tags [:logseq.class/Comment]}}]}}]""".format(
task_uuid=task_uuid,
hostname=hostname,
),
encoding="utf8",
)
def run_comment_mention_check(cli, repo_root, root_dir, config, graph, tmp_dir):
create_task(cli, repo_root, root_dir, config, graph, TASK_TITLE)
fake_bin = tmp_dir / "fake-bin"
bridge_log = tmp_dir / "agent-bridge.log"
bridge_err = tmp_dir / "agent-bridge.err"
codex_log = tmp_dir / "codex-invocations.jsonl"
env = os.environ.copy()
env["PATH"] = str(fake_bin) + os.pathsep + env.get("PATH", "")
env["CODEX_FAKE_LOG"] = str(codex_log)
with bridge_log.open("wb") as out, bridge_err.open("wb") as err:
bridge = subprocess.Popen(
cli
+ [
"--root-dir",
root_dir,
"--config",
config,
"--output",
"human",
"agent",
"bridge",
"--graph",
graph,
],
cwd=repo_root,
env=env,
stdout=out,
stderr=err,
)
try:
wait_for_log(bridge_log, "listening graph changes", bridge)
assign_task(cli, repo_root, root_dir, config, graph)
session = wait_for_task_sessions(
cli, repo_root, root_dir, config, graph, [TASK_TITLE], bridge, bridge_log, bridge_err
)[TASK_TITLE]
task_id = find_task_id(cli, repo_root, root_dir, config, graph, TASK_TITLE)
task_uuid = find_task_uuid(cli, repo_root, root_dir, config, graph, TASK_TITLE)
blocks_file = tmp_dir / "comment-blocks.edn"
write_comment_blocks_file(blocks_file, task_uuid, os.uname().nodename)
run_cli(
cli,
repo_root,
root_dir,
config,
graph,
[
"upsert",
"block",
"--graph",
graph,
"--target-id",
str(task_id),
"--pos",
"last-child",
"--blocks-file",
str(blocks_file),
],
)
resume_events = wait_for_codex_event(codex_log, "resume", bridge, bridge_log, bridge_err)
if resume_events[0].get("session") != session:
raise SystemExit("comment resumed the wrong session: {!r}".format(resume_events[0]))
prompt = resume_events[0].get("prompt", "")
assert "You are handling a Logseq AgentBridge comment request." in prompt, prompt
assert "Comment target context:" in prompt, prompt
assert "Requesting comment:" in prompt, prompt
print("agent bridge resumed comment request in " + session)
finally:
if bridge.poll() is None:
bridge.terminate()
try:
bridge.wait(timeout=5)
except subprocess.TimeoutExpired:
bridge.kill()
bridge.wait(timeout=5)
def run_parallel_assignment_check(cli, repo_root, root_dir, config, graph, tmp_dir):
for title in PARALLEL_TASK_TITLES:
create_task(cli, repo_root, root_dir, config, graph, title)
fake_bin = tmp_dir / "fake-bin"
bridge_log = tmp_dir / "agent-bridge.log"
bridge_err = tmp_dir / "agent-bridge.err"
codex_log = tmp_dir / "codex-invocations.jsonl"
env = os.environ.copy()
env["PATH"] = str(fake_bin) + os.pathsep + env.get("PATH", "")
env["CODEX_FAKE_LOG"] = str(codex_log)
env["CODEX_FAKE_DELAY_SECONDS"] = "5"
env["CODEX_FAKE_SESSION_BY_UUID"] = "1"
with bridge_log.open("wb") as out, bridge_err.open("wb") as err:
bridge = subprocess.Popen(
cli
+ [
"--root-dir",
root_dir,
"--config",
config,
"--output",
"human",
"agent",
"bridge",
"--graph",
graph,
],
cwd=repo_root,
env=env,
stdout=out,
stderr=err,
)
try:
wait_for_log(bridge_log, "listening graph changes", bridge)
for title in PARALLEL_TASK_TITLES:
assign_task(cli, repo_root, root_dir, config, graph, title)
sessions = wait_for_task_sessions(
cli, repo_root, root_dir, config, graph, PARALLEL_TASK_TITLES, bridge, bridge_log, bridge_err
)
events = read_codex_events(codex_log)
start_events = [event for event in events if event.get("event") == "start"]
session_events = [event for event in events if event.get("event") == "session"]
if len(start_events) != 2 or len(session_events) != 2:
raise SystemExit("expected two codex starts and sessions, got: {!r}".format(events))
first_session_time = min(event["time"] for event in session_events)
latest_start_time = max(event["time"] for event in start_events)
if latest_start_time >= first_session_time:
raise SystemExit(
"codex exec did not overlap; starts={!r}, sessions={!r}".format(
[event["time"] for event in start_events],
[event["time"] for event in session_events],
)
)
prompts = "\n".join(event["prompt"] for event in start_events)
for title in PARALLEL_TASK_TITLES:
assert title in prompts, prompts
print("agent bridge routed tasks concurrently: " + ", ".join(sorted(sessions.values())))
finally:
if bridge.poll() is None:
bridge.terminate()
try:
bridge.wait(timeout=5)
except subprocess.TimeoutExpired:
bridge.kill()
bridge.wait(timeout=5)
def edn_string(value):
return json.dumps(value, ensure_ascii=False)
def write_task_template_file(path, marker):
template = "\n".join(
[
marker,
"Graph: {{graph}}",
"Block UUID: {{block-uuid}}",
"AgentBridge name: {{agent-name}}",
"{{task-block-tree}}",
]
)
path.write_text(
"""[{:block/title "Task prompt template"
:block/children [{:block/title "Description: Custom task template for this graph."}
{:block/title "Template variables: {{graph}}, {{block-uuid}}, {{agent-name}}, and {{task-block-tree}}."}
{:block/title %s}]}]"""
% edn_string("```text\n" + template + "\n```"),
encoding="utf8",
)
def write_invalid_task_template_file(path):
path.write_text(
"""[{:block/title "Task prompt template"
:block/children [{:block/title "Description: Invalid task template for lint coverage."}
{:block/title %s}]}]"""
% edn_string("```text\nBroken {{graph}} {{unknown-var}}\n```"),
encoding="utf8",
)
def install_task_template(cli, repo_root, root_dir, config, graph, tmp_dir, marker):
run_cli(
cli,
repo_root,
root_dir,
config,
graph,
["upsert", "page", "--graph", graph, "--page", "AgentBridge"],
)
blocks_file = tmp_dir / ("task-template-" + graph + ".edn")
write_task_template_file(blocks_file, marker)
run_cli(
cli,
repo_root,
root_dir,
config,
graph,
[
"upsert",
"block",
"--graph",
graph,
"--target-page",
"AgentBridge",
"--blocks-file",
str(blocks_file),
],
)
def install_invalid_task_template(cli, repo_root, root_dir, config, graph, tmp_dir):
run_cli(
cli,
repo_root,
root_dir,
config,
graph,
["upsert", "page", "--graph", graph, "--page", "AgentBridge"],
)
blocks_file = tmp_dir / ("invalid-task-template-" + graph + ".edn")
write_invalid_task_template_file(blocks_file)
run_cli(
cli,
repo_root,
root_dir,
config,
graph,
[
"upsert",
"block",
"--graph",
graph,
"--target-page",
"AgentBridge",
"--blocks-file",
str(blocks_file),
],
)
def dry_run_prompt(cli, repo_root, root_dir, config, graph, env):
result = run_cli_with_env(
cli,
repo_root,
root_dir,
config,
["agent", "bridge", "--graph", graph, "--dry-run"],
env,
)
if result.returncode != 0:
raise SystemExit(
"agent bridge dry-run failed for {}\nstdout:\n{}\nstderr:\n{}".format(
graph, result.stdout, result.stderr
)
)
payload = json.loads(result.stdout)
commands = payload.get("data", {}).get("commands", [])
if len(commands) != 1:
raise SystemExit("expected one dry-run command for {}, got {!r}".format(graph, commands))
command = commands[0].get("command", [])
if len(command) < 1:
raise SystemExit("dry-run command did not contain a prompt: {!r}".format(command))
return command[-1]
def assert_contains(text, expected):
if expected not in text:
raise SystemExit("expected {!r} in:\n{}".format(expected, text))
def assert_not_contains(text, unexpected):
if unexpected in text:
raise SystemExit("did not expect {!r} in:\n{}".format(unexpected, text))
def run_prompt_template_graph_check(cli, repo_root, root_dir, config, graph, tmp_dir):
graph_a = graph
graph_b = graph + "-other"
graph_bad = graph + "-invalid"
marker_a = "CUSTOM GRAPH A TEMPLATE"
marker_b = "CUSTOM GRAPH B TEMPLATE"
fake_bin = tmp_dir / "fake-bin"
env = os.environ.copy()
env["PATH"] = str(fake_bin) + os.pathsep + env.get("PATH", "")
env["CODEX_FAKE_LOG"] = str(tmp_dir / "codex-dry-run.jsonl")
for graph_name in [graph_b, graph_bad]:
run_cli(
cli,
repo_root,
root_dir,
config,
graph_name,
["graph", "create", "--graph", graph_name],
)
for graph_name in [graph_a, graph_b, graph_bad]:
create_task(cli, repo_root, root_dir, config, graph_name, TASK_TITLE)
assign_task(cli, repo_root, root_dir, config, graph_name)
install_task_template(cli, repo_root, root_dir, config, graph_a, tmp_dir, marker_a)
prompt_a = dry_run_prompt(cli, repo_root, root_dir, config, graph_a, env)
assert_contains(prompt_a, marker_a)
assert_contains(prompt_a, "Graph: " + graph_a)
assert_not_contains(prompt_a, marker_b)
assert_not_contains(prompt_a, "You are handling a Logseq AgentBridge task.")
prompt_b_default = dry_run_prompt(cli, repo_root, root_dir, config, graph_b, env)
assert_contains(prompt_b_default, "You are handling a Logseq AgentBridge task.")
assert_contains(prompt_b_default, "Graph: " + graph_b)
assert_not_contains(prompt_b_default, marker_a)
install_task_template(cli, repo_root, root_dir, config, graph_b, tmp_dir, marker_b)
prompt_b = dry_run_prompt(cli, repo_root, root_dir, config, graph_b, env)
assert_contains(prompt_b, marker_b)
assert_contains(prompt_b, "Graph: " + graph_b)
assert_not_contains(prompt_b, marker_a)
assert_not_contains(prompt_b, "You are handling a Logseq AgentBridge task.")
prompt_a_again = dry_run_prompt(cli, repo_root, root_dir, config, graph_a, env)
assert_contains(prompt_a_again, marker_a)
assert_not_contains(prompt_a_again, marker_b)
install_invalid_task_template(cli, repo_root, root_dir, config, graph_bad, tmp_dir)
bad_result = run_cli_with_env(
cli,
repo_root,
root_dir,
config,
["agent", "bridge", "--graph", graph_bad, "--dry-run"],
env,
)
if bad_result.returncode == 0:
raise SystemExit("invalid graph prompt template unexpectedly passed:\n" + bad_result.stdout)
assert_contains(bad_result.stdout + bad_result.stderr, "agent-prompt-template-invalid")
print("agent bridge graph prompt templates are isolated and linted")
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--cli", required=True)
parser.add_argument("--root-dir", required=True)
parser.add_argument("--config", required=True)
parser.add_argument("--graph", required=True)
parser.add_argument("--tmp-dir", required=True)
parser.add_argument("--repo-root", required=True)
parser.add_argument("--prepare-fake-codex-only", action="store_true")
parser.add_argument("--assign-after-start", action="store_true")
parser.add_argument("--parallel-assignment-check", action="store_true")
parser.add_argument("--comment-mention-check", action="store_true")
parser.add_argument("--prompt-template-graph-check", action="store_true")
args = parser.parse_args()
repo_root = pathlib.Path(args.repo_root)
tmp_dir = pathlib.Path(args.tmp_dir)
fake_bin = tmp_dir / "fake-bin"
bridge_log = tmp_dir / "agent-bridge.log"
bridge_err = tmp_dir / "agent-bridge.err"
codex_log = tmp_dir / "codex-invocations.jsonl"
cli = ["node", args.cli]
write_fake_codex(fake_bin)
if args.prepare_fake_codex_only:
return
if args.parallel_assignment_check:
run_parallel_assignment_check(cli, repo_root, args.root_dir, args.config, args.graph, tmp_dir)
return
if args.comment_mention_check:
run_comment_mention_check(cli, repo_root, args.root_dir, args.config, args.graph, tmp_dir)
return
if args.prompt_template_graph_check:
run_prompt_template_graph_check(cli, repo_root, args.root_dir, args.config, args.graph, tmp_dir)
return
env = os.environ.copy()
env["PATH"] = str(fake_bin) + os.pathsep + env.get("PATH", "")
env["CODEX_FAKE_LOG"] = str(codex_log)
with bridge_log.open("wb") as out, bridge_err.open("wb") as err:
bridge = subprocess.Popen(
cli
+ [
"--root-dir",
args.root_dir,
"--config",
args.config,
"--output",
"human",
"agent",
"bridge",
"--graph",
args.graph,
],
cwd=repo_root,
env=env,
stdout=out,
stderr=err,
)
try:
if args.assign_after_start:
wait_for_log(bridge_log, "listening graph changes", bridge)
assign_task(cli, repo_root, args.root_dir, args.config, args.graph)
deadline = time.time() + 30
session = None
query = (
'[:find ?session . :where [?e :block/title "{}"] '
"[?e :logseq.property.agent/session-id ?session]]"
).format(TASK_TITLE)
while time.time() < deadline:
session = deref_session_value(
cli,
repo_root,
args.root_dir,
args.config,
args.graph,
run_json(cli, repo_root, args.root_dir, args.config, args.graph, query),
)
if session == EXPECTED_SESSION:
break
if bridge.poll() is not None:
raise SystemExit(
"agent bridge exited early with {}\nstdout:\n{}\nstderr:\n{}".format(
bridge.returncode, read_text(bridge_log), read_text(bridge_err)
)
)
time.sleep(0.5)
else:
raise SystemExit(
"agent-session-id was not written; last session={!r}\nstdout:\n{}\nstderr:\n{}".format(
session, read_text(bridge_log), read_text(bridge_err)
)
)
start_events = [event for event in read_codex_events(codex_log) if event.get("event") == "start"]
assert len(start_events) == 1, start_events
prompt = start_events[0]["prompt"]
assert TASK_TITLE in prompt, prompt
assert "Graph: " + args.graph in prompt, prompt
assert "Block UUID:" in prompt, prompt
print("agent bridge routed task to " + session)
finally:
if bridge.poll() is None:
bridge.terminate()
try:
bridge.wait(timeout=5)
except subprocess.TimeoutExpired:
bridge.kill()
bridge.wait(timeout=5)
if __name__ == "__main__":
main()