Files
proxmox/scripts/lib/non_blockchain_vm_routing_matrix.py
defiQUG e00e1f9b54
Some checks failed
Deploy to Phoenix / validate (push) Failing after 1s
Deploy to Phoenix / deploy (push) Has been skipped
Deploy to Phoenix / deploy-atomic-swap-dapp (push) Has been skipped
Deploy to Phoenix / cloudflare (push) Has been skipped
feat(gitea-phoenix): non-blockchain VM routing matrix + inventory closure CI
- Add besu hostname-prefix exclusions and JSON Schema for matrix rows
- Generate 94-row matrix from committed live_inventory (non-besu guests)
- Enrich VMIDs from deploy-targets descriptions (7804, 5000, 7801, …)
- Python generate/validate; shell wrapper; hook validate-config-files.sh
- Docs: task status, matrix doc links, MASTER_INDEX

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-12 15:43:39 -07:00

273 lines
9.4 KiB
Python

#!/usr/bin/env python3
"""
Generate / validate non-blockchain VM routing matrix vs live inventory.
See config/gitea-phoenix/README.md and docs/04-configuration/GITEA_PHOENIX_NON_BLOCKCHAIN_VM_ROUTING_CLEANUP_TASK.md
"""
from __future__ import annotations
import argparse
import json
import re
import sys
from pathlib import Path
from typing import Any
def load_json(path: Path) -> Any:
return json.loads(path.read_text(encoding="utf-8"))
def guest_excluded(name: str, rules: list[dict[str, Any]]) -> bool:
for r in rules:
if r.get("kind") != "hostname_prefix":
continue
prefix = r.get("value", "")
if prefix and name.startswith(prefix):
return True
return False
def categorize(name: str, vmid: str) -> str:
v = int(vmid)
if "npmplus" in name:
return "npmplus"
if 10233 <= v <= 10240:
return "npmplus"
if v == 5000 or "blockscout" in name:
return "blockscout"
if v in (7801, 7804, 7810) or "gov-portal" in name or "sankofa-portal" in name:
return "gov_portals"
if v == 8604 or "currencicombo" in name.lower():
return "phoenix_app"
if v == 5801 or "atomic-swap" in name:
return "phoenix_app"
if 5700 <= v <= 5705 or "act-runner" in name or "runner" in name:
return "ci_runner"
if 5410 <= v <= 5599 or "ccip" in name.lower() or "chainlink" in name.lower():
return "ccip"
if v >= 10000 or "dbis" in name or "postgres" in name or "redis" in name:
return "sovereign_dbis"
if 100 <= v <= 199:
return "infrastructure"
if "monitoring" in name or v == 130:
return "monitoring"
return "cluster_service"
_VMID_FROM_DESC = re.compile(r"\b(?:CT|VMID|LXC)\s+(\d{3,5})\b", re.IGNORECASE)
def _vmid_from_target_description(description: str) -> str | None:
"""Map deploy-target description text to a single guest VMID when unambiguous."""
if not description:
return None
m = _VMID_FROM_DESC.search(description)
if not m:
return None
return m.group(1)
def enrich_from_deploy_targets(
repo_root: Path,
) -> dict[str, dict[str, Any]]:
"""Map VMID string -> gitea_repos, deploy_target, health_url from deploy-targets.json descriptions + healthchecks."""
path = repo_root / "phoenix-deploy-api" / "deploy-targets.json"
if not path.is_file():
return {}
data = load_json(path)
out: dict[str, dict[str, Any]] = {}
for t in data.get("targets", []):
repo = t.get("repo")
target = t.get("target")
desc = t.get("description") or ""
if not repo:
continue
vmid = _vmid_from_target_description(desc)
hc = t.get("healthcheck") or {}
url = hc.get("url") or ""
if vmid is None and url:
if "192.168.11.51:3000" in url:
vmid = "7801"
elif "blockscout.defi-oracle.io" in url or "/api/config/capabilities" in url:
vmid = "5000"
if vmid is None:
continue
cur = out.setdefault(vmid, {"gitea_repos": [], "deploy_targets": [], "health_urls": []})
if repo not in cur["gitea_repos"]:
cur["gitea_repos"].append(repo)
if target and target not in cur["deploy_targets"]:
cur["deploy_targets"].append(target)
if url and url not in cur["health_urls"]:
cur["health_urls"].append(url)
for vmid, cur in out.items():
cur["deploy_target"] = cur["deploy_targets"][0] if cur["deploy_targets"] else None
cur["health_url"] = cur["health_urls"][0] if cur["health_urls"] else None
del cur["deploy_targets"]
del cur["health_urls"]
return out
def build_entries(
inventory: dict[str, Any],
exclusions: dict[str, Any],
repo_root: Path,
) -> list[dict[str, Any]]:
rules = exclusions.get("rules", [])
enrich = enrich_from_deploy_targets(repo_root)
collected_at = inventory.get("collected_at", "")
source = inventory.get("source", "")
guests = inventory.get("guests", [])
rows: list[dict[str, Any]] = []
for g in guests:
if g.get("status") != "running":
continue
vmid = str(g.get("vmid", "")).strip()
name = (g.get("name") or "").strip()
if not vmid or not name:
continue
if guest_excluded(name, rules):
continue
cat = categorize(name, vmid)
ex = enrich.get(vmid, {})
repos = list(ex.get("gitea_repos") or [])
deploy_target = ex.get("deploy_target")
health_url = ex.get("health_url")
workflow = None
if repos:
slug = repos[0].split("/")[-1].lower()
workflow = f".gitea/workflows/*.yml (see template for {slug})"
notes = (
f"Row from inventory snapshot collected_at={collected_at}; "
"fill gitea_repos / deploy_target / health_url for Phoenix-backed services."
)
if repos:
notes = (
f"Partially enriched from deploy-targets.json healthchecks ({source}); "
"confirm workflow_glob in repo."
)
rows.append(
{
"vmid": vmid,
"hostname": name,
"primary_ip": (g.get("ip") or "") or "",
"category": cat,
"gitea_repos": repos,
"deploy_target": deploy_target,
"workflow_glob": workflow,
"health_url": health_url,
"notes": notes,
}
)
rows.sort(key=lambda r: int(r["vmid"]))
return rows
def cmd_generate(args: argparse.Namespace) -> int:
inv_path = Path(args.inventory)
exc_path = Path(args.exclusions)
out_path = Path(args.out)
repo_root = Path(args.repo_root).resolve()
inventory = load_json(inv_path)
exclusions = load_json(exc_path)
entries = build_entries(inventory, exclusions, repo_root)
doc: dict[str, Any] = {
"schemaVersion": "1",
"generated_from_inventory_collected_at": inventory.get("collected_at"),
"generated_from_inventory_source": inventory.get("source"),
"allowed_missing": [],
"entries": entries,
}
out_path.parent.mkdir(parents=True, exist_ok=True)
out_path.write_text(json.dumps(doc, indent=2) + "\n", encoding="utf-8")
print(f"Wrote {len(entries)} entries to {out_path}", file=sys.stderr)
return 0
def cmd_validate(args: argparse.Namespace) -> int:
inv_path = Path(args.inventory)
mtx_path = Path(args.matrix)
exc_path = Path(args.exclusions)
inventory = load_json(inv_path)
matrix = load_json(mtx_path)
exclusions = load_json(exc_path)
rules = exclusions.get("rules", [])
running = [g for g in inventory.get("guests", []) if g.get("status") == "running"]
in_scope = [
str(g["vmid"])
for g in running
if g.get("name") and not guest_excluded(str(g["name"]), rules)
]
inv_set = set(in_scope)
allowed_raw = matrix.get("allowed_missing") or []
allowed = {str(x["vmid"]): x.get("reason", "") for x in allowed_raw if x.get("vmid")}
entries = matrix.get("entries")
if not isinstance(entries, list) or not entries:
print("ERROR: matrix.entries must be a non-empty array", file=sys.stderr)
return 1
matrix_ids: set[str] = set()
for i, e in enumerate(entries):
req = (
"vmid",
"hostname",
"primary_ip",
"category",
"gitea_repos",
"deploy_target",
"workflow_glob",
"health_url",
"notes",
)
missing = [k for k in req if k not in e]
if missing:
print(f"ERROR: entry[{i}] missing keys: {missing}", file=sys.stderr)
return 1
vid = str(e["vmid"])
if vid in matrix_ids:
print(f"ERROR: duplicate vmid in matrix: {vid}", file=sys.stderr)
return 1
matrix_ids.add(vid)
missing_vmids = sorted(inv_set - matrix_ids - set(allowed.keys()), key=int)
if missing_vmids:
for v in missing_vmids:
print(f"ERROR: running in-scope VMID {v} missing from matrix (not in allowed_missing)", file=sys.stderr)
return 1
extra = sorted(matrix_ids - inv_set, key=int)
for v in extra:
print(f"WARN: matrix vmid {v} not in current inventory running in-scope set", file=sys.stderr)
print(
f"OK: inventory in-scope={len(inv_set)} matrix entries={len(matrix_ids)} allowed_missing={len(allowed)}",
file=sys.stderr,
)
return 0
def main() -> int:
ap = argparse.ArgumentParser(description=__doc__)
sub = ap.add_subparsers(dest="cmd", required=True)
g = sub.add_parser("generate", help="Write matrix JSON from inventory + exclusions")
g.add_argument("--inventory", required=True, type=Path)
g.add_argument("--exclusions", required=True, type=Path)
g.add_argument("--out", required=True, type=Path)
g.add_argument("--repo-root", type=Path, default=Path(__file__).resolve().parents[2])
g.set_defaults(func=cmd_generate)
v = sub.add_parser("validate", help="Fail if in-scope inventory VMIDs are not covered")
v.add_argument("--inventory", required=True, type=Path)
v.add_argument("--exclusions", required=True, type=Path)
v.add_argument("--matrix", required=True, type=Path)
v.set_defaults(func=cmd_validate)
args = ap.parse_args()
return int(args.func(args))
if __name__ == "__main__":
raise SystemExit(main())