#!/usr/bin/env python3
"""
**Join** the 408-row unified table to *unique* (chain × address) on-chain+explorer
results, then **expand** to one line per table row.

**Default (no `--recompute-checks`):** reads
`contract-inventory-onchain-snapshot.json` from `build-deduped-onchain-inventory.py`
and **does not** re-run RPC / Etherscan for pairs already in that file. Fetches
only for addresses missing from the snapshot (e.g. a row added since the last
deduped run). Use `--recompute-checks` for the previous "run `run_checks_with_cache`
for all unique pairs" behavior.

Output:
    reports/inventory/DEPLOYED_CONTRACTS_UNIFIED_EXTENDED_LIVE_VERIFICATION.md
    reports/inventory/contract-inventory-extended-live-snapshot.json
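
Usage (the flags mirror the argparse options defined below):
    python3 scripts/verify/build-extended-live-inventory.py
    python3 scripts/verify/build-extended-live-inventory.py --recompute-checks
    python3 scripts/verify/build-extended-live-inventory.py --refresh-transient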
"""
from __future__ import annotations

import argparse
import json
import sys
import time
from pathlib import Path
from typing import Any

from inventory_onchain import (
    ROOT,
    UNIFIED_EXTENDED_PATH,
    esc,
    load_etherscan_key,
    parse_table,
    run_check,
)
from onchain_check_cache import cache_path_for, run_checks_with_cache
OUT_MD = ROOT / "reports/inventory/DEPLOYED_CONTRACTS_UNIFIED_EXTENDED_LIVE_VERIFICATION.md"
OUT_JSON = ROOT / "reports/inventory/contract-inventory-extended-live-snapshot.json"
CHECK_CACHE = cache_path_for(ROOT)
DEDUPED_SNAPSHOT = ROOT / "reports/inventory/contract-inventory-onchain-snapshot.json"

def _row_key(chain: str, address: str) -> tuple[str, str]:
    return (str(chain).strip(), (address or "").lower())
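
# Example: _row_key(" 138", "0xAbCd") -> ("138", "0xabcd"): the chain is
# whitespace-stripped and the address lowercased, so snapshot/cache lookups
# match regardless of table formatting.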

def load_deduped_snapshot_as_by_addr(
    path: Path,
) -> tuple[dict[tuple[str, str], dict[str, Any]], str | None]:
    """Return (by_addr map, generated_utc from the JSON) or ({}, None) if unreadable."""
    if not path.is_file():
        return {}, None
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, OSError, UnicodeDecodeError):
        return {}, None
    if not isinstance(data, dict) or "rows" not in data:
        return {}, None
    rows = data.get("rows")
    if not isinstance(rows, list):
        return {}, None
    g = data.get("generated_utc")
    by_addr: dict[tuple[str, str], dict[str, Any]] = {}
    for r in rows:
        if not isinstance(r, dict):
            continue
        ch, addr = r.get("chain"), r.get("address")
        if ch is None or not addr:
            continue
        by_addr[_row_key(str(ch), str(addr))] = r
    return by_addr, (str(g) if g else None)
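
# Expected snapshot shape (inferred from the checks above; written by
# build-deduped-onchain-inventory.py):
#   {"generated_utc": "...", "rows": [{"chain": "1", "address": "0x...", ...}, ...]}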

def main() -> int:
    ap = argparse.ArgumentParser(
        description=(
            "Expand unified table (408) with on-chain+explorer columns; by default "
            "reuses the deduped on-chain snapshot JSON (no duplicate API calls for "
            "those pairs)."
        )
    )
    ap.add_argument(
        "--recompute-checks",
        action="store_true",
        help=(
            "Run the full per-(chain,address) `run_check` path for all unique pairs "
            f"(+ resume cache) instead of loading {DEDUPED_SNAPSHOT.name} first. "
            "Slower; use after changing chain/RPC/verify logic, or to ignore a stale snapshot."
        ),
    )
    ap.add_argument(
        "--no-cache",
        action="store_true",
        help="Ignore the on-disk check cache; re-run all RPC + Sourcify + Etherscan (high API use).",
    )
    ap.add_argument(
        "--refresh-transient",
        action="store_true",
        help="Re-fetch only rows whose cached result looks like a rate limit / RPC failure (saves a full re-run).",
    )
    args = ap.parse_args()
    recompute = args.recompute_checks
    use_cache = not args.no_cache
    refresh = args.refresh_transient

    if not UNIFIED_EXTENDED_PATH.is_file():
        print("Missing", UNIFIED_EXTENDED_PATH, file=sys.stderr)
        return 1
    raw = parse_table(UNIFIED_EXTENDED_PATH)
    if not raw:
        print("No rows", file=sys.stderr)
        return 1

    by_key: dict[tuple[str, str], dict[str, Any]] = {}
    for r in raw:
        k = (r["chain"], r["address"])
        if k not in by_key:
            by_key[k] = {"chain": r["chain"], "address": r["address"]}
    key_order = sorted(
        by_key.keys(),
        key=lambda x: (int(x[0]) if str(x[0]).isdigit() else 999_999, x[1].lower()),
    )
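    # Sort order: numeric chain IDs ascending (non-numeric chains sink to the
    # 999_999 sentinel bucket), then case-insensitive address within a chain.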
    to_run = [by_key[k] for k in key_order]
    print(
        f"Table rows: {len(raw)} | Unique (ch×addr) checks: {len(to_run)} (shared across duplicate labels)"
    )
    to_run_slim: list[dict[str, Any]] = [
        {"chain": e["chain"], "address": e["address"]} for e in to_run
    ]

    by_addr: dict[tuple[str, str], dict[str, Any]] = {}
    snap_gen: str | None = None
    on_chain_mode = "network"
    fetched = 0
    if recompute:
        on_chain_mode = "network"
        print(" on-chain: --recompute-checks (full per-pair `run_check` with resume cache).")
    else:
        snap_by, snap_gen = load_deduped_snapshot_as_by_addr(DEDUPED_SNAPSHOT)
        if not snap_by:
            print(
                f" on-chain: no/empty {DEDUPED_SNAPSHOT.name}"
                f" (run `python3 scripts/verify/build-deduped-onchain-inventory.py` first) — will fetch all pairs."
            )
        else:
            n_hit = 0
            for e in to_run_slim:
                k = _row_key(e["chain"], e["address"])
                r = snap_by.get(k)
                if r is not None:
                    by_addr[k] = r
                    n_hit += 1
            on_chain_mode = "snapshot" if n_hit == len(to_run) else "hybrid"
            print(
                f" on-chain: {n_hit}/{len(to_run)} pair(s) from {DEDUPED_SNAPSHOT.name}"
                f" (generated: {snap_gen or 'n/a'})"
            )

    if recompute:
        need: list[dict[str, Any]] = list(to_run_slim)
    else:
        need = [
            e
            for e in to_run_slim
            if _row_key(e["chain"], e["address"]) not in by_addr
        ]
    if need:
        es_key = load_etherscan_key()
        print("Etherscan key loaded:", bool(es_key))
        print("Check cache file:", CHECK_CACHE)
        fetched = len(need)
        merged = run_checks_with_cache(
            need,
            es_key,
            run_check,
            CHECK_CACHE,
            use_cache=use_cache,
            refresh_transient=refresh,
            max_workers=5,
            progress_every=50,
        )
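        # Assumption: run_checks_with_cache keys its result map with the same
        # (stripped chain, lowercased address) tuples as _row_key, so the merge
        # below and the `missing` check afterwards agree.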
        for k, row in merged.items():
            by_addr[k] = row
    else:
        es_key = load_etherscan_key()
        print("Etherscan key loaded (metadata only, 0 fetches this run):", bool(es_key))

    missing = [
        e
        for e in to_run_slim
        if _row_key(e["chain"], e["address"]) not in by_addr
    ]
    if missing:
        print(
            f" error: still missing {len(missing)} pair(s) after fetch/snapshot — internal bug or cache failure",
            file=sys.stderr,
        )
        return 1
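
    # Expand: one output row per table row; duplicate labels on the same
    # (chain, address) reuse the single shared check result.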
    out_rows: list[dict[str, Any]] = []
    raw_sorted = sorted(raw, key=lambda r: int(r["row_no"]))
    for r in raw_sorted:
        k = _row_key(r["chain"], r["address"])
        check = by_addr.get(k) or {}
        ex_raw = (
            (check.get("source_blockscout") or "")
            if r["chain"] == "138"
            else (check.get("source_etherscan") or "")
        )
        out_rows.append(
            {
                "row_no": r["row_no"],
                "network": r["network"],
                "chain": r["chain"],
                "name": r["name"],
                "address": r["address"],
                "provenance": r.get("provenance", ""),
                "code_on_chain": check.get("code_on_chain"),
                "code_detail": check.get("code_detail"),
                "source_sourcify": check.get("source_sourcify"),
                "explorer_etherscan_or_blockscout": esc(str(ex_raw))[:220],
            }
        )
    ts = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
    snap: dict[str, Any] = {
        "generated_utc": ts,
        "source_file": str(UNIFIED_EXTENDED_PATH.relative_to(ROOT)),
        "table_rows": len(out_rows),
        "unique_chain_address_checks": len(to_run),
        "on_chain_data": {
            "mode": on_chain_mode,
            "deduped_snapshot_file": DEDUPED_SNAPSHOT.name,
            "deduped_snapshot_generated_utc": snap_gen,
            "pairs_fetched_this_run": fetched,
            "recompute_checks": recompute,
        },
        "note": "Per-(chain,address) results are joined to every table row. Default: read from the deduped on-chain snapshot JSON, then fetch only missing pairs.",
        "etherscan_key_used": bool(es_key),
        "rows": out_rows,
    }
    OUT_JSON.write_text(json.dumps(snap, indent=2) + "\n", encoding="utf-8")
    print("Wrote", OUT_JSON)
    lines = [
        "# Extended inventory — **live** verification (all table rows)",
        "",
        f"**Generated (UTC):** {ts}",
        f"**Source table:** [`DEPLOYED_CONTRACTS_UNIFIED_EXTENDED.md`](DEPLOYED_CONTRACTS_UNIFIED_EXTENDED.md) — **{len(out_rows)}** rows.",
        f"**Unique on-chain API passes:** {len(to_run)} (one per chain × address; **shared** when the same address appears in multiple rows).",
        f"**JSON:** [`{OUT_JSON.name}`]({OUT_JSON.name})",
        f"**Regenerate:** `python3 scripts/verify/build-extended-live-inventory.py` — by default reuses [`{DEDUPED_SNAPSHOT.name}`]({DEDUPED_SNAPSHOT.name}) (run `build-deduped-onchain-inventory.py` first) so this step does **not** duplicate Etherscan/RPC calls. Use `--recompute-checks` to re-run all unique pairs via the [check cache](contract-inventory-onchain-check-cache.json); `--no-cache` / `--refresh-transient` apply to that path.",
        "",
        "## What this is",
        "",
        "Same automation as the deduplicated report (`eth_getCode` + Sourcify + Blockscout for 138 + Etherscan V2 where configured), but **one output line per extended row** so you can reconcile **all 408** label/network combinations.",
        "",
        "| # | Ch | Network | Label | Address | Code | Sourcify | Explorer (ABI) | Provenance |",
        "|---:|---:|:---|:---|:---|:---|:---|:---|:---|",
    ]
    for o in out_rows:
        c = by_addr.get(_row_key(o["chain"], o["address"]), {})
        sfy = esc(c.get("source_sourcify") or "")[:200]
        code = c.get("code_on_chain") or ""
        ex = o["explorer_etherscan_or_blockscout"][:200]
        addr = o["address"] if o["address"].startswith("0x") else f"0x{o['address']}"
        lines.append(
            f"| {o['row_no']} | {o['chain']} | {esc(o['network'][:80])} | {esc(o['name'][:100])} | `{addr}` | {code} | {sfy} | {ex} | {esc(o.get('provenance', '')[:280])} |"
        )
    OUT_MD.write_text("\n".join(lines) + "\n", encoding="utf-8")
    print("Wrote", OUT_MD)
    return 0

if __name__ == "__main__":
    raise SystemExit(main())