proxmox/scripts/verify/build-inventory-completion-matrix.py
#!/usr/bin/env python3
"""
Build the contract-verification completion matrix from the live on-chain snapshot.
Inputs:
- reports/inventory/contract-inventory-onchain-snapshot.json
- reports/inventory/incomplete-duplicate-bytecode-analysis.json
- previous reports/inventory/deployed-contracts-completion-matrix.json (optional,
used only to avoid regressing COMPLETE_SOURCE_PUBLISHED rows when a fresh run
hits Etherscan-family rate limits)
Outputs:
- reports/inventory/deployed-contracts-completion-matrix.json
- reports/inventory/DEPLOYED_CONTRACTS_COMPLETION_MATRIX.md
Also patches the summary block in:
- reports/inventory/DEPLOYED_CONTRACTS_UNIFIED_EXTENDED.md
"""
from __future__ import annotations
import json
import re
import time
from collections import Counter, defaultdict
from pathlib import Path
from typing import Any
from inventory_onchain import ROOT
SNAP = ROOT / "reports/inventory/contract-inventory-onchain-snapshot.json"
DUP = ROOT / "reports/inventory/incomplete-duplicate-bytecode-analysis.json"
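# OLD_MATRIX and OUT_JSON intentionally point at the same file: the previous
# matrix is read first, then overwritten in place by write_json().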
OLD_MATRIX = ROOT / "reports/inventory/deployed-contracts-completion-matrix.json"
OUT_JSON = ROOT / "reports/inventory/deployed-contracts-completion-matrix.json"
OUT_MD = ROOT / "reports/inventory/DEPLOYED_CONTRACTS_COMPLETION_MATRIX.md"
UNIFIED_EXTENDED = ROOT / "reports/inventory/DEPLOYED_CONTRACTS_UNIFIED_EXTENDED.md"
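# Chain 138 addresses whose verification submissions were accepted in a prior
# aggregate run but have not yet settled to verified on Blockscout.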
SUBMITTED_PENDING_138 = {
"0x022267b26400114af01baccb92456fe36cfccd93", # Voting
"0x105f8a15b819948a89153505762444ee9f324684", # CCIP Sender
"0x3f729632e9553ebaccde2e9b4c8f2b285b014f2e", # old DODOPMMProvider
"0x7131f887dbeeb2e44c1ed267d2a68b5b83285afc", # TransactionMirror
"0x86ada6ef91a3b450f89f2b751e93b1b7a3218895", # old DODOPMMIntegration
"0x90563867f2ba94ed277303e200f4311c00982e92", # Oracle Proxy (2026-02-13)
"0xafd9e25ff301a79feabcc56f46969f34808358ce", # Oracle Aggregator (2026-02-13)
"0xb9e29cfa1f89d369671e640d0bb3ad94cab43965", # Governance Multisig
"0xc12236c03b28e675d376774fce2c2c052488430f", # CCIP Receiver
"0xcacfd227a040002e49e2e01626363071324f820a", # CCIPWETH9Bridge
"0xe0e93247376aa097db308b92e6ba36ba015535d0", # CCIPWETH10Bridge
"0xf4aa429be277d1a1a1a744c9e5b3ad821a9b96f7", # Multicall (2026-02-13)
"0xf4bb2e28688e89fcce3c0580d37d36a7672e8a9f", # WETH10
}
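# Display priority for the action matrix: lower sorts earlier. Statuses not
# listed here (COMPLETE_SOURCE_PUBLISHED) fall back to 99 and sort last.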
STATUS_ORDER = {
"CLOSE_GENESIS_PREDEPLOY_MANUAL_IMPORT": 0,
"CLOSE_DUPLICATE_RUNTIME_MANUAL_SOURCE": 1,
"SUBMITTED_PENDING_BLOCKSCOUT": 2,
"WAIT_WEMIXSCAN_API_KEY": 3,
"MANUAL_CRONOS_EXPLORER_IMPORT": 4,
"NEEDS_BLOCKSCOUT_SOURCE_IMPORT": 5,
"NEEDS_ETHERSCAN_FAMILY_VERIFY": 6,
"MAPPING_ONLY_SOURCE_MISSING": 7,
"MANUAL_XDC_EXPLORER_CHECK": 8,
"MANUAL_ALLTRA_EXPLORER_CHECK": 9,
"REMOVE_NON_CONTRACT": 10,
}
SUMMARY_LINE_PREFIX = "- Current deduped closure state:"
def load_json(path: Path) -> dict[str, Any]:
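    """Read and parse a UTF-8 JSON file."""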
return json.loads(path.read_text(encoding="utf-8"))
def normalize_address(address: str) -> str:
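    """Lowercase the address and guarantee a 0x prefix."""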
addr = (address or "").lower()
return addr if addr.startswith("0x") else f"0x{addr}"
def is_rate_limited(msg: str) -> bool:
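    """Return True when an explorer error message looks like rate limiting.

    Heuristic: matches the fragments "rate limit", "max calls", and "3/sec"
    that show up in Etherscan-family throttle responses.
    """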
m = (msg or "").lower()
return "rate limit" in m or "max calls" in m or "3/sec" in m
def is_source_published(row: dict[str, Any]) -> bool:
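    """True if any Sourcify/Blockscout/Etherscan source column contains 'yes'."""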
return any(
"yes" in str(row.get(key, "")).lower()
for key in ("source_sourcify", "source_blockscout", "source_etherscan")
)
def chain_sort_key(chain: str) -> tuple[int, str]:
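    """Sort numeric chain IDs numerically; non-numeric IDs sort last, lexically."""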
return (int(chain) if str(chain).isdigit() else 999_999, str(chain))
def build_duplicate_key_set(dup: dict[str, Any]) -> set[tuple[str, str]]:
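    """Collect (chain, address) keys for the 'incomplete' side of each
    bytecode-duplicate match.
    """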
keys: set[tuple[str, str]] = set()
for match in dup.get("matches", []):
inc = match.get("incomplete", {})
keys.add((str(inc.get("chain", "")), normalize_address(inc.get("address", ""))))
return keys
def build_prior_published_map(old: dict[str, Any]) -> dict[tuple[str, str], dict[str, Any]]:
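    """Index prior COMPLETE_SOURCE_PUBLISHED rows by (chain, address) so a
    rate-limited rerun does not regress them.
    """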
out: dict[tuple[str, str], dict[str, Any]] = {}
for row in old.get("rows", []):
if row.get("completion_status") != "COMPLETE_SOURCE_PUBLISHED":
continue
out[(str(row.get("chain", "")), normalize_address(row.get("address", "")))] = row
return out
def explorer_source_state(row: dict[str, Any]) -> str:
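    """Join the source columns relevant to the row's chain: Blockscout for
    Chain 138, Etherscan plus Sourcify everywhere else.
    """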
parts = []
if row.get("chain") == "138":
if row.get("source_blockscout"):
parts.append(str(row["source_blockscout"]))
else:
if row.get("source_etherscan"):
parts.append(str(row["source_etherscan"]))
if row.get("source_sourcify"):
parts.append(str(row["source_sourcify"]))
return "; ".join(parts)
def classify_row(
row: dict[str, Any],
duplicate_keys: set[tuple[str, str]],
prior_published: dict[tuple[str, str], dict[str, Any]],
) -> tuple[str, str]:
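    """Map one snapshot row to a (completion_status, recommended_action) pair.

    Checks run in priority order: no bytecode, source already published,
    prior published state retained under rate limits, the Chain 138 WETH9
    predeploy, duplicate runtimes, chain-specific manual paths, pending
    Chain 138 submissions, mapping-only rows, then the Etherscan-family
    fallback.
    """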
chain = str(row.get("chain", ""))
address = normalize_address(row.get("address", ""))
labels = str(row.get("labels_merged", ""))
provenance = str(row.get("provenance_merged", ""))
etherscan = str(row.get("source_etherscan", ""))
if row.get("code_on_chain") == "none":
return (
"REMOVE_NON_CONTRACT",
"No bytecode at latest block; remove from deployed-contract verification target or keep only as historical/mapping/failed-deploy evidence.",
)
if is_source_published(row):
return ("COMPLETE_SOURCE_PUBLISHED", "No action for source-publication closure.")
key = (chain, address)
if key in prior_published and is_rate_limited(etherscan):
return (
"COMPLETE_SOURCE_PUBLISHED",
"No action for source-publication closure (prior published state retained while explorer API was rate-limited).",
)
if chain == "138" and address == "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2":
return (
"CLOSE_GENESIS_PREDEPLOY_MANUAL_IMPORT",
"Treat as deployed genesis predeploy with canonical WETH9 runtime; do not redeploy. Use Blockscout manual/imported-source path if explorer source-published flag is mandatory.",
)
if key in duplicate_keys:
return (
"CLOSE_DUPLICATE_RUNTIME_MANUAL_SOURCE",
"Runtime matches already-published implementation; can close as duplicate implementation, but chain explorer may still need manual source publication.",
)
if chain == "1111":
return (
"WAIT_WEMIXSCAN_API_KEY",
"Load WEMIXSCAN_API_KEY and rerun smom-dbis-138/scripts/deployment/verify-wemix-bridges.sh.",
)
if chain == "25":
return (
"MANUAL_CRONOS_EXPLORER_IMPORT",
"Use Cronos manual Standard JSON inputs under smom-dbis-138/.cronos-verify where available; mapping-only rows need explorer confirmation or removal from repo-owned target.",
)
if chain == "50":
return (
"MANUAL_XDC_EXPLORER_CHECK",
"XDC rows are endpoint/config inventory here; verify manually on XDC explorer or keep outside repo-owned source-publication target.",
)
if chain == "651940":
return (
"MANUAL_ALLTRA_EXPLORER_CHECK",
"All Mainnet/Alltra source API is unsupported by this automated pass; verify manually on its explorer.",
)
if chain == "138" and address in SUBMITTED_PENDING_138:
return (
"SUBMITTED_PENDING_BLOCKSCOUT",
"Verification submission was accepted in the aggregate run; keep polling/importing until Blockscout getabi/is_verified flips true.",
)
if chain == "138":
return (
"NEEDS_BLOCKSCOUT_SOURCE_IMPORT",
"Has bytecode on Chain 138 but no Blockscout ABI/source yet; submit exact source or use manual smart-contract import. Check deprecated/superseded labels before spending time.",
)
if "token-mapping" in provenance.lower() or "(to)" in labels or "(from)" in labels:
return (
"MAPPING_ONLY_SOURCE_MISSING",
"This is a routing/mapping address, not necessarily repo-owned deployment. Confirm ownership/scope; if in scope, verify on the chain explorer; otherwise mark external mapping.",
)
return (
"NEEDS_ETHERSCAN_FAMILY_VERIFY",
"Has bytecode but no automated source result; verify with the relevant Etherscan-family explorer or document as external.",
)
def build_rows(
snap: dict[str, Any],
duplicate_keys: set[tuple[str, str]],
prior_published: dict[tuple[str, str], dict[str, Any]],
) -> list[dict[str, Any]]:
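    """Classify every snapshot row and sort by status priority, chain, address."""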
rows: list[dict[str, Any]] = []
for srow in snap.get("rows", []):
chain = str(srow.get("chain", ""))
address = normalize_address(srow.get("address", ""))
status, action = classify_row(srow, duplicate_keys, prior_published)
rows.append(
{
"chain": chain,
"address": address,
"labels": srow.get("labels_merged", ""),
"code_on_chain": srow.get("code_on_chain"),
"code_detail": srow.get("code_detail", ""),
"source_sourcify": srow.get("source_sourcify", ""),
"source_blockscout": srow.get("source_blockscout", ""),
"source_etherscan": srow.get("source_etherscan", ""),
"completion_status": status,
"recommended_action": action,
"provenance": srow.get("provenance_merged", ""),
"explorer_source_state": explorer_source_state({**srow, "chain": chain}),
}
)
rows.sort(
key=lambda r: (
STATUS_ORDER.get(r["completion_status"], 99),
chain_sort_key(r["chain"]),
r["address"],
)
)
return rows
def esc(text: str, limit: int = 700) -> str:
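    """Escape pipes and flatten newlines for a Markdown table cell, truncating
    to at most `limit` characters.
    """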
return str(text).replace("|", "\\|").replace("\n", " ")[:limit]
def build_counts(rows: list[dict[str, Any]]) -> tuple[Counter, dict[str, dict[str, int]]]:
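    """Tally completion statuses overall and per chain; per-chain tallies are
    sorted by descending count, then status name.
    """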
counts = Counter(r["completion_status"] for r in rows)
by_chain_counter: dict[str, Counter] = defaultdict(Counter)
for row in rows:
by_chain_counter[row["chain"]][row["completion_status"]] += 1
by_chain: dict[str, dict[str, int]] = {}
for chain in sorted(by_chain_counter, key=chain_sort_key):
by_chain[chain] = dict(
sorted(by_chain_counter[chain].items(), key=lambda item: (-item[1], item[0]))
)
return counts, by_chain
def write_json(rows: list[dict[str, Any]], counts: Counter, by_chain: dict[str, dict[str, int]]) -> str:
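    """Write the machine-readable matrix payload and return its UTC timestamp."""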
generated = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
not_verified_count = sum(
1 for row in rows if row["completion_status"] != "COMPLETE_SOURCE_PUBLISHED"
)
payload = {
"generated_utc": generated,
"source_snapshot": "contract-inventory-onchain-snapshot.json",
"dedupe_key": "chain_id + address lowercase",
"unique_pairs": len(rows),
"not_verified_rule": "completion_status != COMPLETE_SOURCE_PUBLISHED",
"not_verified_count": not_verified_count,
"counts_by_completion_status": {
key: counts[key] for key, _value in sorted(counts.items(), key=lambda item: (-item[1], item[0]))
},
"counts_by_chain": by_chain,
"rows": rows,
}
OUT_JSON.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8")
return generated
def build_markdown(rows: list[dict[str, Any]], counts: Counter, generated: str) -> None:
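    """Render OUT_MD: executive status, the not-verified rule, recommendations,
    the open/action table, and completed-row counts per chain.
    """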
completed = [row for row in rows if row["completion_status"] == "COMPLETE_SOURCE_PUBLISHED"]
open_rows = [row for row in rows if row["completion_status"] != "COMPLETE_SOURCE_PUBLISHED"]
completed_by_chain = Counter(row["chain"] for row in completed)
lines: list[str] = []
lines.append("# Deployed contracts completion matrix")
lines.append("")
lines.append(f"**Generated (UTC):** {generated}")
lines.append("**Source snapshot:** [`contract-inventory-onchain-snapshot.json`](contract-inventory-onchain-snapshot.json)")
lines.append("**Machine-readable matrix:** [`deployed-contracts-completion-matrix.json`](deployed-contracts-completion-matrix.json)")
lines.append("**Regenerate:** `python3 scripts/verify/build-deduped-onchain-inventory.py && python3 scripts/verify/build-inventory-completion-matrix.py`")
lines.append("")
lines.append("## Executive status")
lines.append("")
lines.append(f"- Unique chain/address pairs checked: **{len(rows)}**")
lines.append(f"- Not verified by matrix rule: **{len(open_rows)}**")
for status, _count in sorted(counts.items(), key=lambda item: (-item[1], item[0])):
lines.append(f"- {status}: **{counts[status]}**")
lines.append("")
lines.append("## Not Verified Rule")
lines.append("")
lines.append("- In this matrix, **not verified** means `completion_status != COMPLETE_SOURCE_PUBLISHED`.")
lines.append("- That includes true source-publication work, manual explorer workflows, and non-verification closeouts such as duplicates, genesis predeploys, and non-contract rows.")
lines.append("")
lines.append("## Recommendations")
lines.append("")
lines.append("- Treat `COMPLETE_SOURCE_PUBLISHED` as closed for the verification/publication list.")
lines.append("- Treat Chain 138 WETH9 as a deployed genesis predeploy with canonical WETH9 runtime; only use manual/imported-source if Blockscout `is_verified=true` is a hard requirement.")
lines.append("- Do not spend verification cycles on `REMOVE_NON_CONTRACT` rows unless the address was supposed to deploy; they have no bytecode and cannot be source-verified.")
lines.append("- For Chain 138 rows marked `SUBMITTED_PENDING_BLOCKSCOUT`, keep polling or use Blockscout manual import; the previous proxy run accepted submissions but the API still reports not verified.")
lines.append("- For mapping-only rows, decide whether the address is repo-owned. If not repo-owned, close as external/counterparty rather than forcing source publication.")
lines.append("")
lines.append("## Open / action rows")
lines.append("")
lines.append("| Ch | Address | Labels | Completion status | Recommended action | Explorer/source state |")
lines.append("|---:|:---|:---|:---|:---|:---|")
for row in open_rows:
lines.append(
f"| {row['chain']} | `{row['address']}` | {esc(row['labels'])} | `{row['completion_status']}` | {esc(row['recommended_action'])} | {esc(row['explorer_source_state'], 500)} |"
)
lines.append("")
lines.append("## Completed rows by chain")
lines.append("")
for chain in sorted(completed_by_chain, key=chain_sort_key):
lines.append(f"- Chain {chain}: **{completed_by_chain[chain]}** source-published rows")
lines.append("")
OUT_MD.write_text("\n".join(lines), encoding="utf-8")
def patch_unified_extended(counts: Counter, unique_pairs: int) -> None:
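    """Rewrite the '- Current deduped closure state:' summary line in
    DEPLOYED_CONTRACTS_UNIFIED_EXTENDED.md, if that report exists.
    """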
if not UNIFIED_EXTENDED.is_file():
return
text = UNIFIED_EXTENDED.read_text(encoding="utf-8")
replacement = (
f"{SUMMARY_LINE_PREFIX} **{unique_pairs}** unique chain/address pairs; "
f"**{counts['COMPLETE_SOURCE_PUBLISHED']}** source-published by automated API evidence, "
f"**{counts['CLOSE_GENESIS_PREDEPLOY_MANUAL_IMPORT']}** Chain 138 genesis-predeploy closeout, "
f"**{counts['CLOSE_DUPLICATE_RUNTIME_MANUAL_SOURCE']}** duplicate-runtime closeouts, "
f"**{counts['SUBMITTED_PENDING_BLOCKSCOUT']}** Chain 138 submissions pending Blockscout settlement, "
f"**{counts['NEEDS_BLOCKSCOUT_SOURCE_IMPORT']}** Chain 138 rows needing Blockscout source import/verification, "
f"**{counts['MAPPING_ONLY_SOURCE_MISSING']}** mapping-only rows needing scope confirmation, "
f"**{counts['NEEDS_ETHERSCAN_FAMILY_VERIFY']}** Etherscan-family rows needing verification, "
f"**{counts['MANUAL_CRONOS_EXPLORER_IMPORT'] + counts['MANUAL_XDC_EXPLORER_CHECK'] + counts['MANUAL_ALLTRA_EXPLORER_CHECK']}** manual/unsupported explorer rows, "
f"**{counts['WAIT_WEMIXSCAN_API_KEY']}** Wemix rows waiting on `WEMIXSCAN_API_KEY`, and "
f"**{counts['REMOVE_NON_CONTRACT']}** no-bytecode rows that cannot be source-verified as contracts."
)
pattern = rf"^{re.escape(SUMMARY_LINE_PREFIX)}.*$"
updated = re.sub(pattern, replacement, text, count=1, flags=re.MULTILINE)
if updated != text:
UNIFIED_EXTENDED.write_text(updated, encoding="utf-8")
def main() -> int:
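    """Build the matrix from the snapshot, write both outputs, and patch the
    unified extended summary; returns 0 on success.
    """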
snap = load_json(SNAP)
dup = load_json(DUP) if DUP.is_file() else {}
old = load_json(OLD_MATRIX) if OLD_MATRIX.is_file() else {"rows": []}
duplicate_keys = build_duplicate_key_set(dup)
prior_published = build_prior_published_map(old)
rows = build_rows(snap, duplicate_keys, prior_published)
counts, by_chain = build_counts(rows)
generated = write_json(rows, counts, by_chain)
build_markdown(rows, counts, generated)
patch_unified_extended(counts, len(rows))
print(f"Wrote {OUT_JSON}")
print(f"Wrote {OUT_MD}")
if UNIFIED_EXTENDED.is_file():
print(f"Patched {UNIFIED_EXTENDED}")
return 0
if __name__ == "__main__":
raise SystemExit(main())