Files
proxmox/scripts/lib/non_blockchain_vm_routing_matrix.py
defiQUG 377369a5be
Some checks failed
Deploy to Phoenix / deploy (push) Has been skipped
Deploy to Phoenix / deploy-atomic-swap-dapp (push) Has been skipped
Deploy to Phoenix / cloudflare (push) Has been skipped
Deploy to Phoenix / validate (push) Failing after 2s
feat(gitea-phoenix): gov runtime, deploy/template parity, workflow dedupe docs
- Add gov-portals-runtime.v1.json + schema; jq gate in validate-config-files
- Python: parity-deploy-targets, parity-operational-template (IP strict, hostname WARN),
  parity-gov-portals-runtime; validate-vm-routing-parity.sh wrapper
- check-gov-portal-workflow-canonical-strings.sh for monorepo Pattern A
- PORTAL_WORKFLOW_PARITY.md; template headers; repos README; operator checklist secrets
- report-gitea-cd-parity runs full VM routing parity; task doc marked complete
- GOV_PORTALS_XOM_DEV + GITEA_GOV + MASTER_INDEX + matrix doc cross-links

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-12 15:55:50 -07:00

414 lines
15 KiB
Python

#!/usr/bin/env python3
"""
Generate / validate non-blockchain VM routing matrix vs live inventory.
See config/gitea-phoenix/README.md and docs/04-configuration/GITEA_PHOENIX_NON_BLOCKCHAIN_VM_ROUTING_CLEANUP_TASK.md
"""
from __future__ import annotations
import argparse
import json
import re
import sys
from pathlib import Path
from typing import Any
def load_json(path: Path) -> Any:
return json.loads(path.read_text(encoding="utf-8"))
def guest_excluded(name: str, rules: list[dict[str, Any]]) -> bool:
for r in rules:
if r.get("kind") != "hostname_prefix":
continue
prefix = r.get("value", "")
if prefix and name.startswith(prefix):
return True
return False
def categorize(name: str, vmid: str) -> str:
v = int(vmid)
if "npmplus" in name:
return "npmplus"
if 10233 <= v <= 10240:
return "npmplus"
if v == 5000 or "blockscout" in name:
return "blockscout"
if v in (7801, 7804, 7810) or "gov-portal" in name or "sankofa-portal" in name:
return "gov_portals"
if v == 8604 or "currencicombo" in name.lower():
return "phoenix_app"
if v == 5801 or "atomic-swap" in name:
return "phoenix_app"
if 5700 <= v <= 5705 or "act-runner" in name or "runner" in name:
return "ci_runner"
if 5410 <= v <= 5599 or "ccip" in name.lower() or "chainlink" in name.lower():
return "ccip"
if v >= 10000 or "dbis" in name or "postgres" in name or "redis" in name:
return "sovereign_dbis"
if 100 <= v <= 199:
return "infrastructure"
if "monitoring" in name or v == 130:
return "monitoring"
return "cluster_service"
_VMID_FROM_DESC = re.compile(r"\b(?:CT|VMID|LXC)\s+(\d{3,5})\b", re.IGNORECASE)
def _vmid_from_target_description(description: str) -> str | None:
"""Map deploy-target description text to a single guest VMID when unambiguous."""
if not description:
return None
m = _VMID_FROM_DESC.search(description)
if not m:
return None
return m.group(1)
def enrich_from_deploy_targets(
repo_root: Path,
) -> dict[str, dict[str, Any]]:
"""Map VMID string -> gitea_repos, deploy_target, health_url from deploy-targets.json descriptions + healthchecks."""
path = repo_root / "phoenix-deploy-api" / "deploy-targets.json"
if not path.is_file():
return {}
data = load_json(path)
out: dict[str, dict[str, Any]] = {}
for t in data.get("targets", []):
repo = t.get("repo")
target = t.get("target")
desc = t.get("description") or ""
if not repo:
continue
vmid = _vmid_from_target_description(desc)
hc = t.get("healthcheck") or {}
url = hc.get("url") or ""
if vmid is None and url:
if "192.168.11.51:3000" in url:
vmid = "7801"
elif "blockscout.defi-oracle.io" in url or "/api/config/capabilities" in url:
vmid = "5000"
if vmid is None:
continue
cur = out.setdefault(vmid, {"gitea_repos": [], "deploy_targets": [], "health_urls": []})
if repo not in cur["gitea_repos"]:
cur["gitea_repos"].append(repo)
if target and target not in cur["deploy_targets"]:
cur["deploy_targets"].append(target)
if url and url not in cur["health_urls"]:
cur["health_urls"].append(url)
for vmid, cur in out.items():
cur["deploy_target"] = cur["deploy_targets"][0] if cur["deploy_targets"] else None
cur["health_url"] = cur["health_urls"][0] if cur["health_urls"] else None
del cur["deploy_targets"]
del cur["health_urls"]
return out
def build_entries(
inventory: dict[str, Any],
exclusions: dict[str, Any],
repo_root: Path,
) -> list[dict[str, Any]]:
rules = exclusions.get("rules", [])
enrich = enrich_from_deploy_targets(repo_root)
collected_at = inventory.get("collected_at", "")
source = inventory.get("source", "")
guests = inventory.get("guests", [])
rows: list[dict[str, Any]] = []
for g in guests:
if g.get("status") != "running":
continue
vmid = str(g.get("vmid", "")).strip()
name = (g.get("name") or "").strip()
if not vmid or not name:
continue
if guest_excluded(name, rules):
continue
cat = categorize(name, vmid)
ex = enrich.get(vmid, {})
repos = list(ex.get("gitea_repos") or [])
deploy_target = ex.get("deploy_target")
health_url = ex.get("health_url")
workflow = None
if repos:
slug = repos[0].split("/")[-1].lower()
workflow = f".gitea/workflows/*.yml (see template for {slug})"
notes = (
f"Row from inventory snapshot collected_at={collected_at}; "
"fill gitea_repos / deploy_target / health_url for Phoenix-backed services."
)
if repos:
notes = (
f"Partially enriched from deploy-targets.json healthchecks ({source}); "
"confirm workflow_glob in repo."
)
rows.append(
{
"vmid": vmid,
"hostname": name,
"primary_ip": (g.get("ip") or "") or "",
"category": cat,
"gitea_repos": repos,
"deploy_target": deploy_target,
"workflow_glob": workflow,
"health_url": health_url,
"notes": notes,
}
)
rows.sort(key=lambda r: int(r["vmid"]))
return rows
def cmd_generate(args: argparse.Namespace) -> int:
inv_path = Path(args.inventory)
exc_path = Path(args.exclusions)
out_path = Path(args.out)
repo_root = Path(args.repo_root).resolve()
inventory = load_json(inv_path)
exclusions = load_json(exc_path)
entries = build_entries(inventory, exclusions, repo_root)
doc: dict[str, Any] = {
"schemaVersion": "1",
"generated_from_inventory_collected_at": inventory.get("collected_at"),
"generated_from_inventory_source": inventory.get("source"),
"allowed_missing": [],
"entries": entries,
}
out_path.parent.mkdir(parents=True, exist_ok=True)
out_path.write_text(json.dumps(doc, indent=2) + "\n", encoding="utf-8")
print(f"Wrote {len(entries)} entries to {out_path}", file=sys.stderr)
return 0
def cmd_validate(args: argparse.Namespace) -> int:
inv_path = Path(args.inventory)
mtx_path = Path(args.matrix)
exc_path = Path(args.exclusions)
inventory = load_json(inv_path)
matrix = load_json(mtx_path)
exclusions = load_json(exc_path)
rules = exclusions.get("rules", [])
running = [g for g in inventory.get("guests", []) if g.get("status") == "running"]
in_scope = [
str(g["vmid"])
for g in running
if g.get("name") and not guest_excluded(str(g["name"]), rules)
]
inv_set = set(in_scope)
allowed_raw = matrix.get("allowed_missing") or []
allowed = {str(x["vmid"]): x.get("reason", "") for x in allowed_raw if x.get("vmid")}
entries = matrix.get("entries")
if not isinstance(entries, list) or not entries:
print("ERROR: matrix.entries must be a non-empty array", file=sys.stderr)
return 1
matrix_ids: set[str] = set()
for i, e in enumerate(entries):
req = (
"vmid",
"hostname",
"primary_ip",
"category",
"gitea_repos",
"deploy_target",
"workflow_glob",
"health_url",
"notes",
)
missing = [k for k in req if k not in e]
if missing:
print(f"ERROR: entry[{i}] missing keys: {missing}", file=sys.stderr)
return 1
vid = str(e["vmid"])
if vid in matrix_ids:
print(f"ERROR: duplicate vmid in matrix: {vid}", file=sys.stderr)
return 1
matrix_ids.add(vid)
missing_vmids = sorted(inv_set - matrix_ids - set(allowed.keys()), key=int)
if missing_vmids:
for v in missing_vmids:
print(f"ERROR: running in-scope VMID {v} missing from matrix (not in allowed_missing)", file=sys.stderr)
return 1
extra = sorted(matrix_ids - inv_set, key=int)
for v in extra:
print(f"WARN: matrix vmid {v} not in current inventory running in-scope set", file=sys.stderr)
print(
f"OK: inventory in-scope={len(inv_set)} matrix entries={len(matrix_ids)} allowed_missing={len(allowed)}",
file=sys.stderr,
)
return 0
def cmd_parity_deploy_targets(args: argparse.Namespace) -> int:
"""Each deploy-target with a resolved VMID must list its repo on the matrix row; optional health URL match."""
repo_root = Path(args.repo_root).resolve()
matrix = load_json(Path(args.matrix))
targets_path = repo_root / "phoenix-deploy-api" / "deploy-targets.json"
if not targets_path.is_file():
print("ERROR: deploy-targets.json missing", file=sys.stderr)
return 1
data = load_json(targets_path)
by_vmid: dict[str, dict[str, Any]] = {str(e["vmid"]): e for e in matrix.get("entries", [])}
errs = 0
for t in data.get("targets", []):
repo = t.get("repo")
desc = t.get("description") or ""
if not repo or "/" not in str(repo):
continue
vmid = _vmid_from_target_description(desc)
hc = t.get("healthcheck") or {}
url = (hc.get("url") or "").strip()
if vmid is None and url:
if "192.168.11.51:3000" in url:
vmid = "7801"
elif "blockscout.defi-oracle.io" in url or "/api/config/capabilities" in url:
vmid = "5000"
if vmid is None:
continue
row = by_vmid.get(vmid)
if not row:
print(f"WARN parity-deploy-targets: vmid {vmid} has no matrix row (repo {repo})", file=sys.stderr)
continue
grepos = row.get("gitea_repos") or []
if repo not in grepos:
print(
f"ERROR parity-deploy-targets: repo {repo} for vmid {vmid} not in matrix gitea_repos {grepos}",
file=sys.stderr,
)
errs += 1
# Multi-repo CT (7804): health URL differs per portal — skip URL equality.
if url and row.get("health_url") and len(grepos) <= 1 and row["health_url"] != url:
print(
f"ERROR parity-deploy-targets: vmid {vmid} health_url matrix={row['health_url']!r} "
f"target={url!r} repo={repo}",
file=sys.stderr,
)
errs += 1
if errs:
return 1
print("OK: parity-deploy-targets (repo ⊆ matrix; health match where single-repo row)", file=sys.stderr)
return 0
def cmd_parity_operational_template(args: argparse.Namespace) -> int:
"""Matrix hostname + IP must match config/proxmox-operational-template.json when a service row exists."""
matrix = load_json(Path(args.matrix))
tmpl = load_json(Path(args.template))
services = tmpl.get("services") or []
by_vmid: dict[str, dict[str, Any]] = {}
for s in services:
vid = s.get("vmid")
if vid is None:
continue
by_vmid[str(int(vid))] = s
errs = 0
warns = 0
for row in matrix.get("entries", []):
vid = str(row["vmid"])
s = by_vmid.get(vid)
if not s:
continue
h_t = (s.get("hostname") or "").strip()
ip_t = (s.get("ipv4") or "").strip()
h_m = (row.get("hostname") or "").strip()
ip_m = (row.get("primary_ip") or "").strip()
if h_t and h_m and h_t != h_m:
print(
f"WARN parity-operational: vmid {vid} hostname inventory/matrix={h_m!r} "
f"operational_template={h_t!r} (template may use design names)",
file=sys.stderr,
)
warns += 1
if not ip_t or not ip_m:
continue
if ip_t == ip_m:
continue
# NPMplus primary: live inventory often lists first net (.166); template uses ingress .167.
if vid == "10233" and ip_m == "192.168.11.166" and ip_t == "192.168.11.167":
print(
"WARN parity-operational: vmid 10233 matrix IP .166 vs template .167 (dual-homed; see ALL_VMIDS)",
file=sys.stderr,
)
warns += 1
continue
print(
f"ERROR parity-operational: vmid {vid} ipv4 matrix={ip_m!r} template={ip_t!r}",
file=sys.stderr,
)
errs += 1
if warns:
print(f"WARN: parity-operational-template: {warns} hostname/IP note(s)", file=sys.stderr)
if errs:
return 1
print("OK: parity-operational-template (hostname + ipv4 vs matrix)", file=sys.stderr)
return 0
def cmd_parity_gov_portals_runtime(args: argparse.Namespace) -> int:
"""Matrix row 7804 gitea_repos must match gov-portals-runtime.v1.json portal list."""
matrix = load_json(Path(args.matrix))
runtime = load_json(Path(args.runtime))
portals = runtime.get("portals") or []
expected = sorted(p["gitea_repo"] for p in portals if p.get("gitea_repo"))
row7804 = next((e for e in matrix.get("entries", []) if str(e.get("vmid")) == "7804"), None)
if not row7804:
print("ERROR parity-gov-runtime: no matrix row for vmid 7804", file=sys.stderr)
return 1
got = sorted(row7804.get("gitea_repos") or [])
if got != expected:
print(
f"ERROR parity-gov-runtime: matrix 7804 gitea_repos {got} != runtime {expected}",
file=sys.stderr,
)
return 1
print("OK: parity-gov-portals-runtime (7804 repos vs runtime file)", file=sys.stderr)
return 0
def main() -> int:
ap = argparse.ArgumentParser(description=__doc__)
sub = ap.add_subparsers(dest="cmd", required=True)
g = sub.add_parser("generate", help="Write matrix JSON from inventory + exclusions")
g.add_argument("--inventory", required=True, type=Path)
g.add_argument("--exclusions", required=True, type=Path)
g.add_argument("--out", required=True, type=Path)
g.add_argument("--repo-root", type=Path, default=Path(__file__).resolve().parents[2])
g.set_defaults(func=cmd_generate)
v = sub.add_parser("validate", help="Fail if in-scope inventory VMIDs are not covered")
v.add_argument("--inventory", required=True, type=Path)
v.add_argument("--exclusions", required=True, type=Path)
v.add_argument("--matrix", required=True, type=Path)
v.set_defaults(func=cmd_validate)
pdt = sub.add_parser("parity-deploy-targets", help="Matrix rows cover deploy-target repos (and health for single-repo VMIDs)")
pdt.add_argument("--repo-root", type=Path, default=Path(__file__).resolve().parents[2])
pdt.add_argument("--matrix", required=True, type=Path)
pdt.set_defaults(func=cmd_parity_deploy_targets)
pot = sub.add_parser("parity-operational-template", help="Matrix hostname/IP vs proxmox-operational-template services")
pot.add_argument("--matrix", required=True, type=Path)
pot.add_argument("--template", required=True, type=Path)
pot.set_defaults(func=cmd_parity_operational_template)
pgr = sub.add_parser("parity-gov-portals-runtime", help="Matrix vmid 7804 gitea_repos match gov-portals-runtime.v1.json")
pgr.add_argument("--matrix", required=True, type=Path)
pgr.add_argument("--runtime", required=True, type=Path)
pgr.set_defaults(func=cmd_parity_gov_portals_runtime)
args = ap.parse_args()
return int(args.func(args))
if __name__ == "__main__":
raise SystemExit(main())