#!/usr/bin/env python3
"""
**Join** the 408-row unified table to *unique* (chain × address) on-chain+explorer
results, then **expand** to one line per table row.

**Default (no `--recompute-checks`):** reads
`contract-inventory-onchain-snapshot.json` from `build-deduped-onchain-inventory.py`
and **does not** re-run RPC / Etherscan for pairs already in that file. Fetches
only for addresses missing from the snapshot (e.g. you added a row before re-running
deduped). Use `--recompute-checks` for the previous “run `run_checks_with_cache` for
all unique pairs” behavior.

Output:
    reports/inventory/DEPLOYED_CONTRACTS_UNIFIED_EXTENDED_LIVE_VERIFICATION.md
    reports/inventory/contract-inventory-extended-live-snapshot.json
"""
from __future__ import annotations

import argparse
import json
import sys
import time
from pathlib import Path
from typing import Any

from inventory_onchain import (
    ROOT,
    UNIFIED_EXTENDED_PATH,
    esc,
    load_etherscan_key,
    parse_table,
    run_check,
)
from onchain_check_cache import cache_path_for, run_checks_with_cache

# Rendered Markdown report: one line per row of the extended unified table.
OUT_MD = ROOT / "reports/inventory/DEPLOYED_CONTRACTS_UNIFIED_EXTENDED_LIVE_VERIFICATION.md"
# Machine-readable snapshot of the same joined data.
OUT_JSON = ROOT / "reports/inventory/contract-inventory-extended-live-snapshot.json"
# Resume cache consulted by run_checks_with_cache when pairs must be fetched.
CHECK_CACHE = cache_path_for(ROOT)
# Output of build-deduped-onchain-inventory.py; reused by default so this
# script does not duplicate Etherscan/RPC calls for already-checked pairs.
DEDUPED_SNAPSHOT = ROOT / "reports/inventory/contract-inventory-onchain-snapshot.json"
def _row_key(chain: str, address: str) -> tuple[str, str]:
|
||
return (str(chain).strip(), (address or "").lower())
|
||
|
||
|
||
def load_deduped_snapshot_as_by_addr(
    path: Path,
) -> tuple[dict[tuple[str, str], dict[str, Any]], str | None]:
    """Return (by_addr map, generated_utc from the JSON) or ({}, None) if unreadable.

    Keys are normalized exactly like _row_key: (stripped chain, lowercased
    address). Malformed rows (non-dict, missing chain, falsy address) are
    skipped rather than raising.
    """
    if not path.is_file():
        return {}, None
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, OSError, UnicodeDecodeError):
        return {}, None
    # The snapshot must be an object with a list under "rows"; anything else
    # is treated as an unreadable/stale file.
    if not isinstance(payload, dict) or "rows" not in payload:
        return {}, None
    row_list = payload.get("rows")
    if not isinstance(row_list, list):
        return {}, None
    generated = payload.get("generated_utc")
    mapping: dict[tuple[str, str], dict[str, Any]] = {}
    for entry in row_list:
        if not isinstance(entry, dict):
            continue
        chain_val = entry.get("chain")
        addr_val = entry.get("address")
        if chain_val is None or not addr_val:
            continue
        mapping[(str(chain_val).strip(), str(addr_val).lower())] = entry
    return mapping, (str(generated) if generated else None)
def main() -> int:
    """Join the unified table to per-(chain, address) check results; write MD + JSON.

    Returns a process exit code: 0 on success, 1 when the source table is
    missing/empty or when pairs are still unresolved after fetch/snapshot.
    """
    ap = argparse.ArgumentParser(
        description="Expand unified table (408) with on-chain+explorer columns; by default reuses the deduped on-chain snapshot JSON (no duplicate API for those pairs)."
    )
    ap.add_argument(
        "--recompute-checks",
        action="store_true",
        help=(
            "Run the full per-(chain,address) `run_check` path for all unique pairs "
            f"(+ resume cache) instead of loading {DEDUPED_SNAPSHOT.name} first. "
            "Slower; use after changing chain/RPC/verify logic, or to ignore a stale snapshot."
        ),
    )
    ap.add_argument(
        "--no-cache",
        action="store_true",
        help="Ignore the on-disk check cache; re-run all RPC + Sourcify + Etherscan (high API use).",
    )
    ap.add_argument(
        "--refresh-transient",
        action="store_true",
        help="Re-fetch only rows whose cached result looks like rate limit / RPC failure (saves a full re-run).",
    )
    args = ap.parse_args()
    recompute = args.recompute_checks
    use_cache = not args.no_cache
    refresh = args.refresh_transient
    if not UNIFIED_EXTENDED_PATH.is_file():
        print("Missing", UNIFIED_EXTENDED_PATH, file=sys.stderr)
        return 1
    raw = parse_table(UNIFIED_EXTENDED_PATH)
    if not raw:
        print("No rows", file=sys.stderr)
        return 1
    # Deduplicate table rows to one entry per (chain, address) pair; results
    # are shared across duplicate labels so each pair is checked only once.
    by_key: dict[tuple[str, str], dict[str, Any]] = {}
    for r in raw:
        k = (r["chain"], r["address"])
        if k not in by_key:
            by_key[k] = {"chain": r["chain"], "address": r["address"]}
    # Numeric chains first (ascending), non-numeric chains last (999_999 bucket),
    # then by lowercased address for a stable processing order.
    key_order = sorted(
        by_key.keys(),
        key=lambda x: (int(x[0]) if str(x[0]).isdigit() else 999_999, x[1].lower()),
    )
    to_run = [by_key[k] for k in key_order]
    print(
        f"Table rows: {len(raw)} | Unique (ch×addr) checks: {len(to_run)} (shared across duplicate labels)"
    )
    to_run_slim: list[dict[str, Any]] = [
        {"chain": e["chain"], "address": e["address"]} for e in to_run
    ]

    # by_addr accumulates check results keyed by _row_key-normalized pairs,
    # filled from the deduped snapshot and/or live fetches below.
    by_addr: dict[tuple[str, str], dict[str, Any]] = {}
    snap_gen: str | None = None
    on_chain_mode = "network"
    fetched = 0

    if recompute:
        on_chain_mode = "network"
        print(" on-chain: --recompute-checks (full per-pair `run_check` with resume cache).")
    else:
        # Prefer the deduped snapshot: pairs found there need no API calls.
        snap_by, snap_gen = load_deduped_snapshot_as_by_addr(DEDUPED_SNAPSHOT)
        if not snap_by:
            print(
                f" on-chain: no/empty {DEDUPED_SNAPSHOT.name}"
                f" (run `python3 scripts/verify/build-deduped-onchain-inventory.py` first) — will fetch all pairs."
            )
        else:
            n_hit = 0
            for e in to_run_slim:
                k = _row_key(e["chain"], e["address"])
                r = snap_by.get(k)
                if r is not None:
                    by_addr[k] = r
                    n_hit += 1
            # "snapshot" when every pair was covered, "hybrid" when some
            # pairs still need a live fetch.
            on_chain_mode = "snapshot" if n_hit == len(to_run) else "hybrid"
            print(
                f" on-chain: {n_hit}/{len(to_run)} pair(s) from {DEDUPED_SNAPSHOT.name}"
                f" (generated: {snap_gen or 'n/a'})"
            )

    # Pairs that still require a live run_check pass.
    if recompute:
        need: list[dict[str, Any]] = list(to_run_slim)
    else:
        need = [
            e
            for e in to_run_slim
            if _row_key(e["chain"], e["address"]) not in by_addr
        ]

    if need:
        es_key = load_etherscan_key()
        print("Etherscan key loaded:", bool(es_key))
        print("Check cache file:", CHECK_CACHE)
        fetched = len(need)
        merged = run_checks_with_cache(
            need,
            es_key,
            run_check,
            CHECK_CACHE,
            use_cache=use_cache,
            refresh_transient=refresh,
            max_workers=5,
            progress_every=50,
        )
        for k, row in merged.items():
            by_addr[k] = row
    else:
        # Key is still loaded so the snapshot can record whether one exists,
        # even though no API calls are made this run.
        es_key = load_etherscan_key()
        print("Etherscan key loaded (metadata only, 0 fetches this run):", bool(es_key))

    # Sanity check: every unique pair must now have a result from either the
    # snapshot or the fetch above.
    missing = [
        e
        for e in to_run_slim
        if _row_key(e["chain"], e["address"]) not in by_addr
    ]
    if missing:
        print(
            f" error: still missing {len(missing)} pair(s) after fetch/snapshot — internal bug or cache failure",
            file=sys.stderr,
        )
        return 1

    # Expand: one output row per original table row, joined to its pair's result.
    out_rows: list[dict[str, Any]] = []
    raw_sorted = sorted(raw, key=lambda r: int(r["row_no"]))
    for r in raw_sorted:
        # NOTE(review): this key lowercases the address but does not strip the
        # chain, unlike _row_key — presumably parse_table already yields
        # stripped chains; confirm, otherwise snapshot hits could be missed.
        k = (r["chain"], r["address"].lower())
        check = by_addr.get(k) or {}
        # Chain 138 is covered by Blockscout; all other chains by Etherscan.
        ex_raw = (
            (check.get("source_blockscout") or "—")
            if r["chain"] == "138"
            else (check.get("source_etherscan") or "—")
        )
        out_rows.append(
            {
                "row_no": r["row_no"],
                "network": r["network"],
                "chain": r["chain"],
                "name": r["name"],
                "address": r["address"],
                "provenance": r.get("provenance", ""),
                "code_on_chain": check.get("code_on_chain"),
                "code_detail": check.get("code_detail"),
                "source_sourcify": check.get("source_sourcify"),
                # Escaped and truncated for safe embedding in Markdown/JSON.
                "explorer_etherscan_or_blockscout": esc(str(ex_raw))[:220],
            }
        )

    # --- JSON snapshot ---
    ts = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
    snap: dict[str, Any] = {
        "generated_utc": ts,
        "source_file": str(UNIFIED_EXTENDED_PATH.relative_to(ROOT)),
        "table_rows": len(out_rows),
        "unique_chain_address_checks": len(to_run),
        "on_chain_data": {
            "mode": on_chain_mode,
            "deduped_snapshot_file": DEDUPED_SNAPSHOT.name,
            "deduped_snapshot_generated_utc": snap_gen,
            "pairs_fetched_this_run": fetched,
            "recompute_checks": recompute,
        },
        "note": "Per-(chain,address) results are joined to every table row. Default: read from the deduped on-chain snapshot JSON, then fetch only missing pairs.",
        "etherscan_key_used": bool(es_key),
        "rows": out_rows,
    }
    OUT_JSON.write_text(json.dumps(snap, indent=2) + "\n", encoding="utf-8")
    print("Wrote", OUT_JSON)

    # --- Markdown report ---
    lines = [
        "# Extended inventory — **live** verification (all table rows)",
        "",
        f"**Generated (UTC):** {ts}",
        f"**Source table:** [`DEPLOYED_CONTRACTS_UNIFIED_EXTENDED.md`](DEPLOYED_CONTRACTS_UNIFIED_EXTENDED.md) — **{len(out_rows)}** rows.",
        f"**Unique on-chain API passes:** {len(to_run)} (one per chain × address; **shared** when the same address appears in multiple rows).",
        f"**JSON:** [`{OUT_JSON.name}`]({OUT_JSON.name})",
        f"**Regenerate:** `python3 scripts/verify/build-extended-live-inventory.py` — by default reuses [`{DEDUPED_SNAPSHOT.name}`]({DEDUPED_SNAPSHOT.name}) (run `build-deduped-onchain-inventory.py` first) so this step does **not** duplicate Etherscan/RPC. Use `--recompute-checks` to re-run all unique pairs via the [check cache](contract-inventory-onchain-check-cache.json); `--no-cache` / `--refresh-transient` apply to that path.",
        "",
        "## What this is",
        "",
        "Same automation as the deduplicated report (`eth_getCode` + Sourcify + Blockscout for 138 + Etherscan V2 where configured), but **one output line per extended row** so you can reconcile **all 408** label/network combinations.",
        "",
        "| # | Ch | Network | Label | Address | Code | Sourcify | Explorer (ABI) | Provenance |",
        "|---:|---:|:---|:---|:---|:---|:---|:---|:---|",
    ]
    for o in out_rows:
        c = by_addr.get((o["chain"], o["address"].lower()), {})
        # Truncate long cell values so the table stays renderable.
        sfy = esc(c.get("source_sourcify", "—"))[:200]
        code = c.get("code_on_chain") or "—"
        ex = o["explorer_etherscan_or_blockscout"][:200]
        # Ensure a 0x prefix for display even if the table stored a bare hex address.
        addr = o["address"] if o["address"].startswith("0x") else f"0x{o['address']}"
        lines.append(
            f"| {o['row_no']} | {o['chain']} | {esc(o['network'][:80])} | {esc(o['name'][:100])} | `{addr}` | {code} | {sfy} | {ex} | {esc(o.get('provenance', '')[:280])} |"
        )
    OUT_MD.write_text("\n".join(lines) + "\n", encoding="utf-8")
    print("Wrote", OUT_MD)
    return 0
if __name__ == "__main__":
    # Propagate main()'s integer result as the process exit code.
    raise SystemExit(main())