mirror of
https://github.com/logseq/logseq.git
synced 2026-05-29 23:19:38 +00:00
808 lines
26 KiB
Python
808 lines
26 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import pathlib
|
|
import subprocess
|
|
import time
|
|
|
|
|
|
# Title of the single task block created and looked up by the default,
# comment-mention and prompt-template checks.
TASK_TITLE = "Test AgentBridge task routing and completion status"
# Session id the fake codex reports unless CODEX_FAKE_SESSION_BY_UUID is "1";
# main() polls until the task's session-id property equals this value.
EXPECTED_SESSION = "thread-e2e-agent-bridge"
# Titles of the tasks that the parallel assignment check creates and assigns
# (Chinese for "test agent bridge parallel task execution 1/2").
PARALLEL_TASK_TITLES = [
    "测试 agent bridge 并行执行任务 1",
    "测试 agent bridge 并行执行任务 2",
]
|
|
|
|
|
|
def write_fake_codex(fake_bin):
    """Install an executable fake ``codex`` command inside *fake_bin*.

    The generated script answers ``--version`` and handles the ``exec`` and
    ``exec resume`` argument lists.  For those it appends JSON lines to the
    file named by the ``CODEX_FAKE_LOG`` environment variable and prints a
    ``thread.started`` event; any other argument list exits with status 2.
    ``CODEX_FAKE_DELAY_SECONDS`` delays the ``exec`` reply, and
    ``CODEX_FAKE_SESSION_BY_UUID=1`` derives the session id from the
    ``Block UUID:`` line of the prompt.
    """
    script_source = """#!/usr/bin/env python3
import json
import os
import pathlib
import re
import sys
import time

args = sys.argv[1:]
if args == ["--version"]:
    print("codex-cli 0.0.0-e2e")
    sys.exit(0)

if args[:5] == ["--sandbox", "danger-full-access", "exec", "--json", "--skip-git-repo-check"]:
    prompt = args[5] if len(args) > 5 else ""
    block_uuid_match = re.search(r"^Block UUID: (.+)$", prompt, re.MULTILINE)
    block_uuid = block_uuid_match.group(1) if block_uuid_match else None
    log_path = pathlib.Path(os.environ["CODEX_FAKE_LOG"])
    log_path.parent.mkdir(parents=True, exist_ok=True)
    with log_path.open("a", encoding="utf8") as f:
        f.write(json.dumps(
            {"event": "start", "time": time.time(), "args": args, "prompt": prompt, "block_uuid": block_uuid},
            ensure_ascii=False) + "\\n")
    delay = float(os.environ.get("CODEX_FAKE_DELAY_SECONDS", "0"))
    if delay > 0:
        time.sleep(delay)
    if os.environ.get("CODEX_FAKE_SESSION_BY_UUID") == "1" and block_uuid:
        session_id = "thread-e2e-agent-bridge-" + re.sub(r"[^A-Za-z0-9]", "", block_uuid)[-12:]
    else:
        session_id = "thread-e2e-agent-bridge"
    with log_path.open("a", encoding="utf8") as f:
        f.write(json.dumps(
            {"event": "session", "time": time.time(), "session": session_id, "block_uuid": block_uuid},
            ensure_ascii=False) + "\\n")
    print(json.dumps({"type": "thread.started", "thread_id": session_id}), flush=True)
    sys.exit(0)

if args[:6] == ["--sandbox", "danger-full-access", "exec", "resume", "--json", "--skip-git-repo-check"]:
    session_id = args[6] if len(args) > 6 else ""
    prompt = args[7] if len(args) > 7 else ""
    comment_uuid_match = re.search(r"^Comment UUID: (.+)$", prompt, re.MULTILINE)
    comment_uuid = comment_uuid_match.group(1) if comment_uuid_match else None
    log_path = pathlib.Path(os.environ["CODEX_FAKE_LOG"])
    log_path.parent.mkdir(parents=True, exist_ok=True)
    with log_path.open("a", encoding="utf8") as f:
        f.write(json.dumps(
            {"event": "resume", "time": time.time(), "args": args, "prompt": prompt, "comment_uuid": comment_uuid, "session": session_id},
            ensure_ascii=False) + "\\n")
    print(json.dumps({"type": "thread.started", "thread_id": session_id}), flush=True)
    sys.exit(0)

print("unexpected codex args: " + repr(args), file=sys.stderr)
sys.exit(2)
"""
    script_path = fake_bin / "codex"
    # The script lives directly in *fake_bin*, so that is the directory to create.
    fake_bin.mkdir(parents=True, exist_ok=True)
    script_path.write_text(script_source, encoding="utf8")
    # Make it runnable so it can be found through PATH like the real CLI.
    script_path.chmod(0o755)
|
|
|
|
|
|
def run_json(cli, repo_root, root_dir, config, graph, query):
    """Run a ``query`` command through the CLI and return ``data.result``.

    The CLI is started in *repo_root* with JSON output.  Returns None when
    the command exits with a non-zero status, or when the decoded payload has
    no ``data``/``result`` entry.
    """
    query_command = cli + [
        "--root-dir",
        root_dir,
        "--config",
        config,
        "--output",
        "json",
        "query",
        "--graph",
        graph,
        "--query",
        query,
    ]
    completed = subprocess.run(
        query_command,
        cwd=repo_root,
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    if completed.returncode != 0:
        # Failures are reported as "no result" so that callers can keep polling.
        return None
    payload = json.loads(completed.stdout)
    return payload.get("data", {}).get("result")
|
|
|
|
|
|
def run_cli(cli, repo_root, root_dir, config, graph, extra_args):
    """Run a CLI command with JSON output and return the decoded stdout.

    *extra_args* is appended after the shared ``--root-dir``/``--config``/
    ``--output json`` options.  *graph* is accepted but not used here; callers
    pass the graph inside *extra_args*.

    Returns the parsed JSON document, or None when stdout is blank.
    Raises SystemExit with the captured stdout and stderr when the command
    exits with a non-zero status.
    """
    shared_options = ["--root-dir", root_dir, "--config", config, "--output", "json"]
    completed = subprocess.run(
        cli + shared_options + extra_args,
        cwd=repo_root,
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    if completed.returncode != 0:
        failure = "CLI command failed: {}\nstdout:\n{}\nstderr:\n{}".format(
            " ".join(extra_args), completed.stdout, completed.stderr
        )
        raise SystemExit(failure)
    if not completed.stdout.strip():
        return None
    return json.loads(completed.stdout)
|
|
|
|
|
|
def run_cli_with_env(cli, repo_root, root_dir, config, extra_args, env):
    """Run a CLI command under *env* and return the ``CompletedProcess``.

    The exit status is not checked here; the caller inspects
    ``returncode``, ``stdout`` and ``stderr`` (both captured as text).
    """
    shared_options = ["--root-dir", root_dir, "--config", config, "--output", "json"]
    full_command = cli + shared_options + extra_args
    return subprocess.run(
        full_command,
        cwd=repo_root,
        env=env,
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
|
|
|
|
|
|
def deref_session_value(cli, repo_root, root_dir, config, graph, value):
    """Resolve a session property value to its title when it is an integer.

    An ``int`` *value* is treated as an entity id and replaced by the result
    of querying that entity's ``:block/title``.  Any other value is returned
    unchanged.
    """
    if isinstance(value, int):
        title_query = f"[:find ?title . :where [{value} :block/title ?title]]"
        return run_json(cli, repo_root, root_dir, config, graph, title_query)
    return value
|
|
|
|
|
|
def read_text(path):
    """Return the contents of *path* as UTF-8 text.

    Undecodable bytes are substituted instead of raising.
    """
    with path.open(encoding="utf8", errors="replace") as handle:
        return handle.read()
|
|
|
|
|
|
def wait_for_log(path, text, process, timeout=30):
    """Block until the file at *path* contains *text*.

    Args:
        path: Log file to poll; it may not exist yet.
        text: Substring to wait for.
        process: Object with ``poll()`` (for example ``subprocess.Popen``)
            that is expected to keep running while the log is written.
        timeout: Seconds to wait before giving up.

    Raises:
        SystemExit: If *process* has exited before *text* appeared, or if
            *timeout* seconds pass without *text* showing up.
    """
    # A monotonic clock keeps the deadline stable if the wall clock is adjusted.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        # Look at the log first so output written just before exit still counts.
        if path.exists() and text in read_text(path):
            return
        if process.poll() is not None:
            raise SystemExit(
                "agent bridge exited before {}\nstdout:\n{}".format(
                    text, read_text(path)
                )
            )
        time.sleep(0.2)
    raise SystemExit("agent bridge did not log {!r}\nstdout:\n{}".format(text, read_text(path)))
|
|
|
|
|
|
def find_task_id(cli, repo_root, root_dir, config, graph, title):
    """Return the query result for the entity whose ``:block/title`` is *title*.

    Raises:
        SystemExit: If the query yields None, which is also what ``run_json``
            returns when the CLI command fails.
    """
    # Serialise the title as a quoted string literal so quotes or backslashes
    # in it cannot break the query; plain titles produce the same text as
    # wrapping them in double quotes.
    quoted_title = json.dumps(title, ensure_ascii=False)
    task_id = run_json(
        cli,
        repo_root,
        root_dir,
        config,
        graph,
        "[:find ?e . :where [?e :block/title {}]]".format(quoted_title),
    )
    if task_id is None:
        raise SystemExit("task block was not found: {}".format(title))
    return task_id
|
|
|
|
|
|
def find_task_uuid(cli, repo_root, root_dir, config, graph, title):
    """Return the ``:block/uuid`` of the entity whose ``:block/title`` is *title*.

    Raises:
        SystemExit: If the query yields None, which is also what ``run_json``
            returns when the CLI command fails.
    """
    # Serialise the title as a quoted string literal so quotes or backslashes
    # in it cannot break the query; plain titles produce the same text as
    # wrapping them in double quotes.
    quoted_title = json.dumps(title, ensure_ascii=False)
    task_uuid = run_json(
        cli,
        repo_root,
        root_dir,
        config,
        graph,
        "[:find ?uuid . :where [?e :block/title {}] [?e :block/uuid ?uuid]]".format(quoted_title),
    )
    if task_uuid is None:
        raise SystemExit("task block uuid was not found: {}".format(title))
    return task_uuid
|
|
|
|
|
|
def create_task(cli, repo_root, root_dir, config, graph, title):
    """Create a ``todo`` task with content *title* on the ``AgentBridgeE2E`` page."""
    options = {
        "--graph": graph,
        "--target-page": "AgentBridgeE2E",
        "--content": title,
        "--status": "todo",
    }
    upsert_task = ["upsert", "task"]
    # Dicts keep insertion order, so the flags are emitted as listed above.
    for flag, value in options.items():
        upsert_task.extend([flag, value])
    run_cli(cli, repo_root, root_dir, config, graph, upsert_task)
|
|
|
|
|
|
def assign_task(cli, repo_root, root_dir, config, graph, title=TASK_TITLE):
    """Set the ``Assignee`` property of the task titled *title* to this host.

    The task's entity id is looked up first; the assignee value is the node
    name reported by ``os.uname()``.
    """
    block_id = find_task_id(cli, repo_root, root_dir, config, graph, title)
    assignee_map = '{{"Assignee" "{}"}}'.format(os.uname().nodename)
    update_command = [
        "upsert",
        "block",
        "--graph",
        graph,
        "--id",
        str(block_id),
        "--update-properties",
        assignee_map,
    ]
    run_cli(cli, repo_root, root_dir, config, graph, update_command)
|
|
|
|
|
|
def read_codex_events(codex_log):
    """Return the events recorded in the JSON-lines file *codex_log*.

    The file may be appended to by other processes while it is read, so:

    * a missing file yields an empty list;
    * records are split on ``"\\n"`` only, because ``str.splitlines`` would
      also split on characters such as U+2028 that may occur unescaped
      inside a record;
    * a final record without a terminating newline is included when it
      parses and silently skipped when it does not (it is still being
      written).

    Raises:
        ValueError: If a newline-terminated record is not valid UTF-8 JSON.
    """
    try:
        raw = codex_log.read_bytes()
    except FileNotFoundError:
        return []

    # Everything before the last newline is complete; the rest may be partial.
    complete, _, tail = raw.rpartition(b"\n")
    events = [
        json.loads(line.decode("utf8"))
        for line in complete.split(b"\n")
        if line.strip()
    ]
    if tail.strip():
        try:
            events.append(json.loads(tail.decode("utf8")))
        except (UnicodeDecodeError, json.JSONDecodeError):
            # Writer has not finished this record; a later poll will see it.
            pass
    return events
|
|
|
|
|
|
def session_query_for_title(title):
    """Build the query for the agent session id of the block titled *title*.

    The title is serialised as a quoted string literal, so quotes and
    backslashes in it are escaped instead of terminating the string early.
    Titles without such characters produce the same query text as wrapping
    them in double quotes.
    """
    quoted_title = json.dumps(title, ensure_ascii=False)
    return (
        "[:find ?session . :where [?e :block/title {}] "
        "[?e :logseq.property.agent/session-id ?session]]"
    ).format(quoted_title)
|
|
|
|
|
|
def wait_for_task_sessions(
    cli, repo_root, root_dir, config, graph, titles, bridge, bridge_log, bridge_err, timeout=45
):
    """Poll until every task in *titles* has a session id, and return them.

    Args:
        cli, repo_root, root_dir, config, graph: Passed through to the query
            helpers.
        titles: Task titles whose session-id property is awaited.
        bridge: Running bridge process; polled to detect an early exit.
        bridge_log, bridge_err: Files holding the bridge's stdout and stderr,
            quoted in failure messages.
        timeout: Seconds to wait before giving up.

    Returns:
        A dict mapping each title to its (dereferenced) session value.

    Raises:
        SystemExit: If the bridge exits first, or if *timeout* seconds pass
            while at least one task still has no session.
    """
    # A monotonic clock keeps the deadline stable if the wall clock is adjusted.
    deadline = time.monotonic() + timeout
    sessions = {}
    while time.monotonic() < deadline:
        sessions = {
            title: deref_session_value(
                cli,
                repo_root,
                root_dir,
                config,
                graph,
                run_json(cli, repo_root, root_dir, config, graph, session_query_for_title(title)),
            )
            for title in titles
        }
        if all(sessions.values()):
            return sessions
        if bridge.poll() is not None:
            raise SystemExit(
                "agent bridge exited early with {}\nstdout:\n{}\nstderr:\n{}".format(
                    bridge.returncode, read_text(bridge_log), read_text(bridge_err)
                )
            )
        time.sleep(0.5)
    raise SystemExit(
        "agent-session-id was not written for every task; last sessions={!r}\nstdout:\n{}\nstderr:\n{}".format(
            sessions, read_text(bridge_log), read_text(bridge_err)
        )
    )
|
|
|
|
|
|
def wait_for_codex_event(codex_log, event_name, bridge, bridge_log, bridge_err, timeout=30):
    """Poll *codex_log* until it holds an event named *event_name*.

    Args:
        codex_log: JSON-lines file written by the fake codex.
        event_name: Value of the ``event`` key to wait for.
        bridge: Running bridge process; polled to detect an early exit.
        bridge_log, bridge_err: Files holding the bridge's stdout and stderr,
            quoted in the early-exit message.
        timeout: Seconds to wait before giving up.

    Returns:
        The list of all matching events present when the first match is seen.

    Raises:
        SystemExit: If the bridge exits first, or if no matching event shows
            up within *timeout* seconds.
    """
    # A monotonic clock keeps the deadline stable if the wall clock is adjusted.
    deadline = time.monotonic() + timeout
    events = []
    while time.monotonic() < deadline:
        events = read_codex_events(codex_log)
        matches = [event for event in events if event.get("event") == event_name]
        if matches:
            return matches
        if bridge.poll() is not None:
            raise SystemExit(
                "agent bridge exited early with {}\nstdout:\n{}\nstderr:\n{}".format(
                    bridge.returncode, read_text(bridge_log), read_text(bridge_err)
                )
            )
        time.sleep(0.5)
    raise SystemExit("codex event {!r} was not observed; events={!r}".format(event_name, events))
|
|
|
|
|
|
def write_comment_blocks_file(path, task_uuid, hostname):
    """Write an EDN blocks file holding a comment that mentions *hostname*.

    The file describes a ``Comments`` block that references the task by
    *task_uuid* and has one ``Comment`` child whose title starts with a
    ``[[hostname]]`` page reference.
    """
    # Doubled braces are literal EDN braces; single braces are format fields.
    edn_template = """[{{:block/title "Comments"
   :block/tags [:logseq.class/Comments]
   :logseq.property.comments/blocks [[:block/uuid #uuid "{task_uuid}"]]
   :block/children [{{:block/title "[[{hostname}]] please continue from the comment"
                     :block/tags [:logseq.class/Comment]}}]}}]"""
    document = edn_template.format(task_uuid=task_uuid, hostname=hostname)
    path.write_text(document, encoding="utf8")
|
|
|
|
|
|
def run_comment_mention_check(cli, repo_root, root_dir, config, graph, tmp_dir):
    """Check that a comment mentioning this host resumes the task's session.

    Steps: create the task, start the bridge with the fake codex first on
    PATH, assign the task and wait for its session id, add a comment block
    that mentions this host, then verify that the fake codex logged a
    ``resume`` event for the same session with the comment prompt.

    Raises SystemExit or AssertionError when an expectation is not met.  The
    bridge process is always stopped before returning.
    """
    create_task(cli, repo_root, root_dir, config, graph, TASK_TITLE)

    stdout_path = tmp_dir / "agent-bridge.log"
    stderr_path = tmp_dir / "agent-bridge.err"
    invocation_log = tmp_dir / "codex-invocations.jsonl"

    bridge_env = os.environ.copy()
    # Put the fake codex ahead of any real one.
    bridge_env["PATH"] = str(tmp_dir / "fake-bin") + os.pathsep + bridge_env.get("PATH", "")
    bridge_env["CODEX_FAKE_LOG"] = str(invocation_log)

    bridge_command = cli + [
        "--root-dir",
        root_dir,
        "--config",
        config,
        "--output",
        "human",
        "agent",
        "bridge",
        "--graph",
        graph,
    ]

    with stdout_path.open("wb") as out, stderr_path.open("wb") as err:
        bridge = subprocess.Popen(
            bridge_command,
            cwd=repo_root,
            env=bridge_env,
            stdout=out,
            stderr=err,
        )

        try:
            wait_for_log(stdout_path, "listening graph changes", bridge)
            assign_task(cli, repo_root, root_dir, config, graph)
            sessions_by_title = wait_for_task_sessions(
                cli, repo_root, root_dir, config, graph, [TASK_TITLE], bridge, stdout_path, stderr_path
            )
            session = sessions_by_title[TASK_TITLE]

            task_id = find_task_id(cli, repo_root, root_dir, config, graph, TASK_TITLE)
            task_uuid = find_task_uuid(cli, repo_root, root_dir, config, graph, TASK_TITLE)
            blocks_file = tmp_dir / "comment-blocks.edn"
            write_comment_blocks_file(blocks_file, task_uuid, os.uname().nodename)
            # Attach the comment as the last child of the task block.
            insert_comment = [
                "upsert",
                "block",
                "--graph",
                graph,
                "--target-id",
                str(task_id),
                "--pos",
                "last-child",
                "--blocks-file",
                str(blocks_file),
            ]
            run_cli(cli, repo_root, root_dir, config, graph, insert_comment)

            first_resume = wait_for_codex_event(
                invocation_log, "resume", bridge, stdout_path, stderr_path
            )[0]
            if first_resume.get("session") != session:
                raise SystemExit("comment resumed the wrong session: {!r}".format(first_resume))
            prompt = first_resume.get("prompt", "")
            assert "You are handling a Logseq AgentBridge comment request." in prompt, prompt
            assert "Comment target context:" in prompt, prompt
            assert "Requesting comment:" in prompt, prompt
            print("agent bridge resumed comment request in " + session)
        finally:
            # Ask the bridge to stop; force-kill it if it does not exit in time.
            if bridge.poll() is None:
                bridge.terminate()
                try:
                    bridge.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    bridge.kill()
                    bridge.wait(timeout=5)
|
|
|
|
|
|
def run_parallel_assignment_check(cli, repo_root, root_dir, config, graph, tmp_dir):
    """Check that the bridge runs codex for two assigned tasks concurrently.

    The fake codex is configured to sleep five seconds and to derive the
    session id from the block UUID.  After both tasks have a session id, the
    log must hold exactly two ``start`` and two ``session`` events, and the
    latest ``start`` must precede the earliest ``session``.

    Raises SystemExit or AssertionError when an expectation is not met.  The
    bridge process is always stopped before returning.
    """
    for task_title in PARALLEL_TASK_TITLES:
        create_task(cli, repo_root, root_dir, config, graph, task_title)

    stdout_path = tmp_dir / "agent-bridge.log"
    stderr_path = tmp_dir / "agent-bridge.err"
    invocation_log = tmp_dir / "codex-invocations.jsonl"

    bridge_env = os.environ.copy()
    # Put the fake codex ahead of any real one.
    bridge_env["PATH"] = str(tmp_dir / "fake-bin") + os.pathsep + bridge_env.get("PATH", "")
    bridge_env["CODEX_FAKE_LOG"] = str(invocation_log)
    bridge_env["CODEX_FAKE_DELAY_SECONDS"] = "5"
    bridge_env["CODEX_FAKE_SESSION_BY_UUID"] = "1"

    bridge_command = cli + [
        "--root-dir",
        root_dir,
        "--config",
        config,
        "--output",
        "human",
        "agent",
        "bridge",
        "--graph",
        graph,
    ]

    with stdout_path.open("wb") as out, stderr_path.open("wb") as err:
        bridge = subprocess.Popen(
            bridge_command,
            cwd=repo_root,
            env=bridge_env,
            stdout=out,
            stderr=err,
        )

        try:
            wait_for_log(stdout_path, "listening graph changes", bridge)
            for task_title in PARALLEL_TASK_TITLES:
                assign_task(cli, repo_root, root_dir, config, graph, task_title)

            sessions = wait_for_task_sessions(
                cli, repo_root, root_dir, config, graph, PARALLEL_TASK_TITLES, bridge, stdout_path, stderr_path
            )
            events = read_codex_events(invocation_log)
            started = []
            got_session = []
            for event in events:
                kind = event.get("event")
                if kind == "start":
                    started.append(event)
                elif kind == "session":
                    got_session.append(event)
            if len(started) != 2 or len(got_session) != 2:
                raise SystemExit("expected two codex starts and sessions, got: {!r}".format(events))

            start_times = [event["time"] for event in started]
            session_times = [event["time"] for event in got_session]
            # Both runs must have begun before either one reported its session.
            if max(start_times) >= min(session_times):
                raise SystemExit(
                    "codex exec did not overlap; starts={!r}, sessions={!r}".format(
                        start_times,
                        session_times,
                    )
                )

            prompts = "\n".join(event["prompt"] for event in started)
            for task_title in PARALLEL_TASK_TITLES:
                assert task_title in prompts, prompts
            print("agent bridge routed tasks concurrently: " + ", ".join(sorted(sessions.values())))
        finally:
            # Ask the bridge to stop; force-kill it if it does not exit in time.
            if bridge.poll() is None:
                bridge.terminate()
                try:
                    bridge.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    bridge.kill()
                    bridge.wait(timeout=5)
|
|
|
|
|
|
def edn_string(value):
    """Serialise *value* with the JSON encoder, keeping non-ASCII text as is.

    Used to produce a quoted, escaped string literal for EDN files.
    """
    encoder = json.JSONEncoder(ensure_ascii=False)
    return encoder.encode(value)
|
|
|
|
|
|
def write_task_template_file(path, marker):
    """Write an EDN blocks file with a custom task prompt template.

    The template body starts with *marker*, followed by lines using the
    ``{{graph}}``, ``{{block-uuid}}``, ``{{agent-name}}`` and
    ``{{task-block-tree}}`` variables, wrapped in a ``text`` code fence.
    """
    template_lines = (
        marker,
        "Graph: {{graph}}",
        "Block UUID: {{block-uuid}}",
        "AgentBridge name: {{agent-name}}",
        "{{task-block-tree}}",
    )
    fenced_template = "```text\n" + "\n".join(template_lines) + "\n```"
    # JSON string syntax supplies the quoting and escaping for the EDN title.
    quoted_template = json.dumps(fenced_template, ensure_ascii=False)
    document = (
        """[{:block/title "Task prompt template"
  :block/children [{:block/title "Description: Custom task template for this graph."}
                   {:block/title "Template variables: {{graph}}, {{block-uuid}}, {{agent-name}}, and {{task-block-tree}}."}
                   {:block/title %s}]}]"""
        % quoted_template
    )
    path.write_text(document, encoding="utf8")
|
|
|
|
|
|
def write_invalid_task_template_file(path):
    """Write an EDN blocks file whose task prompt template is invalid.

    The template body references ``{{unknown-var}}``; the prompt-template
    check expects the bridge to reject it.
    """
    broken_template = "```text\nBroken {{graph}} {{unknown-var}}\n```"
    # JSON string syntax supplies the quoting and escaping for the EDN title.
    quoted_template = json.dumps(broken_template, ensure_ascii=False)
    document = (
        """[{:block/title "Task prompt template"
  :block/children [{:block/title "Description: Invalid task template for lint coverage."}
                   {:block/title %s}]}]"""
        % quoted_template
    )
    path.write_text(document, encoding="utf8")
|
|
|
|
|
|
def install_task_template(cli, repo_root, root_dir, config, graph, tmp_dir, marker):
    """Add a custom task prompt template to the ``AgentBridge`` page of *graph*.

    The page is upserted first, then a blocks file generated from *marker* is
    written under *tmp_dir* and inserted into the page.
    """
    ensure_page = ["upsert", "page", "--graph", graph, "--page", "AgentBridge"]
    run_cli(cli, repo_root, root_dir, config, graph, ensure_page)

    # One file per graph, so templates of different graphs do not overwrite each other.
    template_path = tmp_dir / ("task-template-" + graph + ".edn")
    write_task_template_file(template_path, marker)

    insert_template = [
        "upsert",
        "block",
        "--graph",
        graph,
        "--target-page",
        "AgentBridge",
        "--blocks-file",
        str(template_path),
    ]
    run_cli(cli, repo_root, root_dir, config, graph, insert_template)
|
|
|
|
|
|
def install_invalid_task_template(cli, repo_root, root_dir, config, graph, tmp_dir):
    """Add an invalid task prompt template to the ``AgentBridge`` page of *graph*.

    The page is upserted first, then the invalid blocks file is written under
    *tmp_dir* and inserted into the page.
    """
    ensure_page = ["upsert", "page", "--graph", graph, "--page", "AgentBridge"]
    run_cli(cli, repo_root, root_dir, config, graph, ensure_page)

    template_path = tmp_dir / ("invalid-task-template-" + graph + ".edn")
    write_invalid_task_template_file(template_path)

    insert_template = [
        "upsert",
        "block",
        "--graph",
        graph,
        "--target-page",
        "AgentBridge",
        "--blocks-file",
        str(template_path),
    ]
    run_cli(cli, repo_root, root_dir, config, graph, insert_template)
|
|
|
|
|
|
def dry_run_prompt(cli, repo_root, root_dir, config, graph, env):
    """Return the prompt the bridge would pass to codex for *graph*.

    Runs ``agent bridge --dry-run`` and returns the last element of the
    single command reported under ``data.commands``.

    Raises:
        SystemExit: If the dry run fails, does not report exactly one
            command, or reports an empty command.
    """
    dry_run_args = ["agent", "bridge", "--graph", graph, "--dry-run"]
    completed = run_cli_with_env(cli, repo_root, root_dir, config, dry_run_args, env)
    if completed.returncode != 0:
        failure = "agent bridge dry-run failed for {}\nstdout:\n{}\nstderr:\n{}".format(
            graph, completed.stdout, completed.stderr
        )
        raise SystemExit(failure)

    reported = json.loads(completed.stdout).get("data", {}).get("commands", [])
    if len(reported) != 1:
        raise SystemExit("expected one dry-run command for {}, got {!r}".format(graph, reported))

    argv = reported[0].get("command", [])
    if len(argv) == 0:
        raise SystemExit("dry-run command did not contain a prompt: {!r}".format(argv))
    # The prompt is the final argument of the command.
    return argv[-1]
|
|
|
|
|
|
def assert_contains(text, expected):
    """Exit with a diagnostic unless *expected* occurs in *text*."""
    if expected in text:
        return
    raise SystemExit("expected {!r} in:\n{}".format(expected, text))
|
|
|
|
|
|
def assert_not_contains(text, unexpected):
    """Exit with a diagnostic if *unexpected* occurs in *text*."""
    if unexpected not in text:
        return
    raise SystemExit("did not expect {!r} in:\n{}".format(unexpected, text))
|
|
|
|
|
|
def run_prompt_template_graph_check(cli, repo_root, root_dir, config, graph, tmp_dir):
    """Check that prompt templates are per-graph and that bad ones are rejected.

    Uses *graph* plus two graphs created here (``<graph>-other`` and
    ``<graph>-invalid``), each holding one assigned task.  Dry runs are used
    to verify that a custom template affects only the graph it was installed
    in, that a graph without one gets the default prompt, and that an invalid
    template makes the dry run fail with ``agent-prompt-template-invalid``.

    Raises SystemExit when an expectation is not met.
    """
    first_graph = graph
    second_graph = graph + "-other"
    invalid_graph = graph + "-invalid"
    first_marker = "CUSTOM GRAPH A TEMPLATE"
    second_marker = "CUSTOM GRAPH B TEMPLATE"
    default_intro = "You are handling a Logseq AgentBridge task."

    env = os.environ.copy()
    # Put the fake codex ahead of any real one.
    env["PATH"] = str(tmp_dir / "fake-bin") + os.pathsep + env.get("PATH", "")
    env["CODEX_FAKE_LOG"] = str(tmp_dir / "codex-dry-run.jsonl")

    # *graph* is not created here; only the two extra graphs are.
    for new_graph in (second_graph, invalid_graph):
        run_cli(
            cli,
            repo_root,
            root_dir,
            config,
            new_graph,
            ["graph", "create", "--graph", new_graph],
        )

    for target_graph in (first_graph, second_graph, invalid_graph):
        create_task(cli, repo_root, root_dir, config, target_graph, TASK_TITLE)
        assign_task(cli, repo_root, root_dir, config, target_graph)

    # The first graph gets its own template and loses the default prompt.
    install_task_template(cli, repo_root, root_dir, config, first_graph, tmp_dir, first_marker)
    first_prompt = dry_run_prompt(cli, repo_root, root_dir, config, first_graph, env)
    assert_contains(first_prompt, first_marker)
    assert_contains(first_prompt, "Graph: " + first_graph)
    assert_not_contains(first_prompt, second_marker)
    assert_not_contains(first_prompt, default_intro)

    # The second graph still uses the default prompt at this point.
    second_default_prompt = dry_run_prompt(cli, repo_root, root_dir, config, second_graph, env)
    assert_contains(second_default_prompt, default_intro)
    assert_contains(second_default_prompt, "Graph: " + second_graph)
    assert_not_contains(second_default_prompt, first_marker)

    # After installing its own template, the second graph uses only that one.
    install_task_template(cli, repo_root, root_dir, config, second_graph, tmp_dir, second_marker)
    second_prompt = dry_run_prompt(cli, repo_root, root_dir, config, second_graph, env)
    assert_contains(second_prompt, second_marker)
    assert_contains(second_prompt, "Graph: " + second_graph)
    assert_not_contains(second_prompt, first_marker)
    assert_not_contains(second_prompt, default_intro)

    # The first graph is unaffected by the second graph's template.
    first_prompt_again = dry_run_prompt(cli, repo_root, root_dir, config, first_graph, env)
    assert_contains(first_prompt_again, first_marker)
    assert_not_contains(first_prompt_again, second_marker)

    install_invalid_task_template(cli, repo_root, root_dir, config, invalid_graph, tmp_dir)
    invalid_run = run_cli_with_env(
        cli,
        repo_root,
        root_dir,
        config,
        ["agent", "bridge", "--graph", invalid_graph, "--dry-run"],
        env,
    )
    if invalid_run.returncode == 0:
        raise SystemExit("invalid graph prompt template unexpectedly passed:\n" + invalid_run.stdout)
    assert_contains(invalid_run.stdout + invalid_run.stderr, "agent-prompt-template-invalid")

    print("agent bridge graph prompt templates are isolated and linted")
|
|
|
|
|
|
def main():
    """Parse the command line and run the selected end-to-end check.

    The fake codex is always written to ``<tmp-dir>/fake-bin`` first.  Then:

    * ``--prepare-fake-codex-only`` stops there;
    * ``--parallel-assignment-check``, ``--comment-mention-check`` and
      ``--prompt-template-graph-check`` run the matching check and return;
    * otherwise the bridge is started and the task titled ``TASK_TITLE`` must
      receive the session id ``EXPECTED_SESSION`` within 30 seconds, with
      exactly one fake codex ``start`` event whose prompt names the task, the
      graph and a block UUID.  With ``--assign-after-start`` the task is
      assigned only after the bridge reports that it is listening.

    Raises SystemExit or AssertionError when an expectation is not met.  The
    bridge process is always stopped before returning.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--cli", required=True)
    parser.add_argument("--root-dir", required=True)
    parser.add_argument("--config", required=True)
    parser.add_argument("--graph", required=True)
    parser.add_argument("--tmp-dir", required=True)
    parser.add_argument("--repo-root", required=True)
    parser.add_argument("--prepare-fake-codex-only", action="store_true")
    parser.add_argument("--assign-after-start", action="store_true")
    parser.add_argument("--parallel-assignment-check", action="store_true")
    parser.add_argument("--comment-mention-check", action="store_true")
    parser.add_argument("--prompt-template-graph-check", action="store_true")
    args = parser.parse_args()

    repo_root = pathlib.Path(args.repo_root)
    tmp_dir = pathlib.Path(args.tmp_dir)
    fake_bin = tmp_dir / "fake-bin"
    bridge_log = tmp_dir / "agent-bridge.log"
    bridge_err = tmp_dir / "agent-bridge.err"
    codex_log = tmp_dir / "codex-invocations.jsonl"
    # The CLI is a script run with node.
    cli = ["node", args.cli]

    write_fake_codex(fake_bin)
    if args.prepare_fake_codex_only:
        return

    if args.parallel_assignment_check:
        run_parallel_assignment_check(cli, repo_root, args.root_dir, args.config, args.graph, tmp_dir)
        return

    if args.comment_mention_check:
        run_comment_mention_check(cli, repo_root, args.root_dir, args.config, args.graph, tmp_dir)
        return

    if args.prompt_template_graph_check:
        run_prompt_template_graph_check(cli, repo_root, args.root_dir, args.config, args.graph, tmp_dir)
        return

    env = os.environ.copy()
    # Put the fake codex ahead of any real one.
    env["PATH"] = str(fake_bin) + os.pathsep + env.get("PATH", "")
    env["CODEX_FAKE_LOG"] = str(codex_log)

    with bridge_log.open("wb") as out, bridge_err.open("wb") as err:
        bridge = subprocess.Popen(
            cli
            + [
                "--root-dir",
                args.root_dir,
                "--config",
                args.config,
                "--output",
                "human",
                "agent",
                "bridge",
                "--graph",
                args.graph,
            ],
            cwd=repo_root,
            env=env,
            stdout=out,
            stderr=err,
        )

        try:
            if args.assign_after_start:
                wait_for_log(bridge_log, "listening graph changes", bridge)
                assign_task(cli, repo_root, args.root_dir, args.config, args.graph)

            # A monotonic clock keeps the deadline stable if the wall clock is adjusted.
            deadline = time.monotonic() + 30
            session = None
            # Same query the other checks use, built by the shared helper.
            query = session_query_for_title(TASK_TITLE)

            while time.monotonic() < deadline:
                session = deref_session_value(
                    cli,
                    repo_root,
                    args.root_dir,
                    args.config,
                    args.graph,
                    run_json(cli, repo_root, args.root_dir, args.config, args.graph, query),
                )
                if session == EXPECTED_SESSION:
                    break
                if bridge.poll() is not None:
                    raise SystemExit(
                        "agent bridge exited early with {}\nstdout:\n{}\nstderr:\n{}".format(
                            bridge.returncode, read_text(bridge_log), read_text(bridge_err)
                        )
                    )
                time.sleep(0.5)
            else:
                # Reached only when the loop ran out of time without a break.
                raise SystemExit(
                    "agent-session-id was not written; last session={!r}\nstdout:\n{}\nstderr:\n{}".format(
                        session, read_text(bridge_log), read_text(bridge_err)
                    )
                )

            start_events = [event for event in read_codex_events(codex_log) if event.get("event") == "start"]
            assert len(start_events) == 1, start_events
            prompt = start_events[0]["prompt"]
            assert TASK_TITLE in prompt, prompt
            assert "Graph: " + args.graph in prompt, prompt
            assert "Block UUID:" in prompt, prompt
            print("agent bridge routed task to " + session)
        finally:
            # Ask the bridge to stop; force-kill it if it does not exit in time.
            if bridge.poll() is None:
                bridge.terminate()
                try:
                    bridge.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    bridge.kill()
                    bridge.wait(timeout=5)
|
|
|
|
|
|
# Run the selected check only when this file is executed as a script.
if __name__ == "__main__":
    main()
|