manager_utils(status): topo-sort tasks and better load error handling

This commit is contained in:
Rai (Michael Pokorny)
2025-06-24 22:49:49 -07:00
parent 9b2cb58ce5
commit c9613d41e5

View File

@@ -7,7 +7,7 @@ import sys
from datetime import datetime
import click
from tasklib import load_task, repo_root, save_task, task_dir
from tasklib import load_task, repo_root, save_task, task_dir, TaskMeta
import shutil
try:
@@ -28,130 +28,134 @@ def status():
If tabulate is installed, render as GitHub-flavored Markdown table;
otherwise fallback to fixed-width formatting.
"""
# preload all task statuses to filter dependencies
all_meta = {}
# Load all task metadata, reporting load errors with file path
all_meta: dict[str, TaskMeta] = {}
path_map: dict[str, Path] = {}
for md in sorted(task_dir().glob('*.md')):
if md.name == 'task-template.md' or md.name.endswith('-plan.md'):
if md.name in ('task-template.md',) or md.name.endswith('-plan.md'):
continue
try:
meta, _ = load_task(md)
except ValueError:
except Exception as e:
print(f"Error loading {md}: {e}")
continue
all_meta[meta.id] = meta
rows = []
merged_tasks = [] # collect merged tasks for bottom summary
path_map[meta.id] = md
# Build dependency graph
deps_map: dict[str, list[str]] = {}
for tid, meta in all_meta.items():
deps_map[tid] = [d for d in re.findall(r"\d+", meta.dependencies) if d in all_meta]
# Topologically sort tasks by dependencies, fall back on filename order on error
try:
sorted_ids: list[str] = []
temp: set[str] = set()
perm: set[str] = set()
def visit(n: str) -> None:
if n in perm:
return
if n in temp:
raise RuntimeError(f"Circular dependency detected at task {n}")
temp.add(n)
for m in deps_map.get(n, []):
visit(m)
temp.remove(n)
perm.add(n)
sorted_ids.append(n)
for n in all_meta:
visit(n)
sorted_ids.reverse()
except Exception as e:
print(f"Warning: cannot topo-sort tasks ({e}); falling back to filename order")
sorted_ids = [m.id for m in sorted(all_meta.values(), key=lambda m: path_map[m.id].name)]
rows: list[tuple] = []
merged_tasks: list[tuple[str, str]] = []
root = repo_root()
for md in sorted(task_dir().glob('*.md')):
if md.name == 'task-template.md' or md.name.endswith('-plan.md'):
continue
try:
meta, _ = load_task(md)
except ValueError as e:
print(e)
continue
for tid in sorted_ids:
meta = all_meta[tid]
md = path_map[tid]
slug = md.stem
# branch detection
# list matching task branch names cleanly
branches = subprocess.run(
['git', 'for-each-ref', '--format=%(refname:short)', f'refs/heads/agentydragon-{meta.id}-*'],
['git', 'for-each-ref', '--format=%(refname:short)',
f'refs/heads/agentydragon-{tid}-*'],
capture_output=True, text=True, cwd=root
).stdout.strip().splitlines()
branch_exists = 'Y' if branches and branches[0].strip() else 'N'
merged = 'N'
merged_flag = 'N'
if branch_exists == 'Y':
bname = branches[0].lstrip('*+ ').strip()
merged = 'Y' if subprocess.run(
['git', 'merge-base', '--is-ancestor', bname, 'agentydragon'],
cwd=root
).returncode == 0 else 'N'
b = branches[0].lstrip('*+ ').strip()
if subprocess.run(['git', 'merge-base', '--is-ancestor', b, 'agentydragon'], cwd=root).returncode == 0:
merged_flag = 'Y'
# worktree detection
wt_dir = task_dir() / '.worktrees' / slug
wt_exists = wt_dir.exists()
wt_clean = 'NA'
if wt_exists:
status_out = subprocess.run(
['git', 'status', '--porcelain'], cwd=wt_dir,
capture_output=True, text=True
).stdout.strip()
wt_clean = 'Clean' if not status_out else 'Dirty'
# derive branch & merge status (unchanged)
if branches:
bname = branches[0]
# merged into agentydragon?
is_merged = subprocess.run(
['git', 'merge-base', '--is-ancestor', bname, 'agentydragon'],
cwd=root
).returncode == 0
if is_merged:
branch_info = 'merged'
else:
# ahead/behind
a_cnt, b_cnt = subprocess.check_output(
['git', 'rev-list', '--left-right', '--count', f'{bname}...agentydragon'],
cwd=root
).decode().split()
# diffstat
stat = subprocess.check_output(
['git', 'diff', '--shortstat', f'{bname}...agentydragon'], cwd=root
).decode().strip()
diffstat = stat.replace(' file changed', '')
# merge conflict scan
base = subprocess.check_output(
['git', 'merge-base', 'agentydragon', bname], cwd=root
).decode().strip()
mtree = subprocess.check_output(
['git', 'merge-tree', base, 'agentydragon', bname], cwd=root
).decode(errors='ignore')
conflict = 'conflict' if '<<<<<<<' in mtree else 'ok'
if a_cnt == '0' and b_cnt == '0':
branch_info = f'up-to-date (+{diffstat or 0})'
else:
branch_info = f'{b_cnt} behind / {a_cnt} ahead (+{diffstat or 0}) {conflict}'
else:
branch_info = 'no branch'
# worktree status
wt_dir = task_dir() / '.worktrees' / slug
wt_info = 'none'
if wt_dir.exists():
wt_clean = 'clean' if not subprocess.run(
['git', 'status', '--porcelain'], cwd=wt_dir,
capture_output=True, text=True
).stdout.strip() else 'dirty'
wt_info = wt_clean
else:
wt_info = 'none'
# skip fully merged tasks (no branch, no worktree) into summary
if meta.status == 'Merged' and branch_info == 'no branch' and wt_info == 'none':
merged_tasks.append((meta.id, meta.title))
st = subprocess.run(['git', 'status', '--porcelain'], cwd=wt_dir,
capture_output=True, text=True).stdout.strip()
wt_info = 'clean' if not st else 'dirty'
# skip fully merged tasks (no branch, no worktree)
if meta.status == 'Merged' and branch_exists == 'N' and wt_info == 'none':
merged_tasks.append((tid, meta.title))
continue
# filter out merged dependencies by ID
deps = [d.strip() for d in re.findall(r"\d+", meta.dependencies)]
deps = [d for d in deps if all_meta.get(d, None) and all_meta[d].status != 'Merged']
# filter out merged dependencies
deps = [d for d in deps_map.get(tid, []) if all_meta[d].status != 'Merged']
deps_str = ','.join(deps)
# color status and worktree info with ANSI codes
# determine branch_info text
if branch_exists == 'N':
branch_info = 'no branch'
elif merged_flag == 'Y':
branch_info = 'merged'
else:
a_cnt, b_cnt = subprocess.check_output(
['git', 'rev-list', '--left-right', '--count',
f'{branches[0]}...agentydragon'], cwd=root
).decode().split()
stat = subprocess.check_output(
['git', 'diff', '--shortstat', f'{branches[0]}...agentydragon'], cwd=root
).decode().strip().replace(' file changed', '')
base = subprocess.check_output(
['git', 'merge-base', 'agentydragon', branches[0]], cwd=root
).decode().strip()
mtree = subprocess.check_output(
['git', 'merge-tree', base, 'agentydragon', branches[0]], cwd=root
).decode(errors='ignore')
conflict = 'conflict' if '<<<<<<<' in mtree else 'ok'
if a_cnt == '0' and b_cnt == '0':
branch_info = f'up-to-date (+{stat or 0})'
else:
branch_info = f'{b_cnt} behind / {a_cnt} ahead (+{stat or 0}) {conflict}'
# colorize status/worktree
stat_disp = meta.status
wt_disp = wt_info
if wt_info.lower() == 'dirty':
wt_disp = f"\033[31m{wt_info}\033[0m"
if meta.status in ('Done', 'Merged'):
stat_disp = f"\033[32m{meta.status}\033[0m"
wt_disp = wt_info
if wt_info == 'dirty':
wt_disp = f"\033[31m{wt_info}\033[0m"
rows.append((
meta.id, meta.title, stat_disp,
tid, meta.title, stat_disp,
deps_str, meta.last_updated.strftime('%Y-%m-%d %H:%M'),
branch_info, wt_disp
))
headers = ['ID', 'Title', 'Status', 'Dependencies', 'Updated',
'Branch Status', 'Worktree Status']
if tabulate:
# render as Markdown table if tabulate is available
print(tabulate(rows, headers=headers, tablefmt='github'))
else:
# fallback to plain fixed-width formatting
fmt = (
'{:>2} {:<30} {:<12} {:<20} {:<16} {:<40} {:<10}'
)
fmt = '{:>2} {:<30} {:<12} {:<20} {:<16} {:<40} {:<10}'
print(fmt.format(*headers))
for r in rows:
print(fmt.format(*r))
# summary of merged tasks (no branch, no worktree)
if merged_tasks:
items = ' '.join(f"{tid} ({title})" for tid, title in merged_tasks)