#!/usr/bin/env python3 """ Build the contract-verification completion matrix from the live on-chain snapshot. Inputs: - reports/inventory/contract-inventory-onchain-snapshot.json - reports/inventory/incomplete-duplicate-bytecode-analysis.json - previous reports/inventory/deployed-contracts-completion-matrix.json (optional, used only to avoid regressing COMPLETE_SOURCE_PUBLISHED rows when a fresh run hits Etherscan-family rate limits) Outputs: - reports/inventory/deployed-contracts-completion-matrix.json - reports/inventory/DEPLOYED_CONTRACTS_COMPLETION_MATRIX.md Also patches the summary block in: - reports/inventory/DEPLOYED_CONTRACTS_UNIFIED_EXTENDED.md """ from __future__ import annotations import json import re import time from collections import Counter, defaultdict from pathlib import Path from typing import Any from inventory_onchain import ROOT SNAP = ROOT / "reports/inventory/contract-inventory-onchain-snapshot.json" DUP = ROOT / "reports/inventory/incomplete-duplicate-bytecode-analysis.json" OLD_MATRIX = ROOT / "reports/inventory/deployed-contracts-completion-matrix.json" OUT_JSON = ROOT / "reports/inventory/deployed-contracts-completion-matrix.json" OUT_MD = ROOT / "reports/inventory/DEPLOYED_CONTRACTS_COMPLETION_MATRIX.md" UNIFIED_EXTENDED = ROOT / "reports/inventory/DEPLOYED_CONTRACTS_UNIFIED_EXTENDED.md" SUBMITTED_PENDING_138 = { "0x022267b26400114af01baccb92456fe36cfccd93", # Voting "0x105f8a15b819948a89153505762444ee9f324684", # CCIP Sender "0x3f729632e9553ebaccde2e9b4c8f2b285b014f2e", # old DODOPMMProvider "0x7131f887dbeeb2e44c1ed267d2a68b5b83285afc", # TransactionMirror "0x86ada6ef91a3b450f89f2b751e93b1b7a3218895", # old DODOPMMIntegration "0x90563867f2ba94ed277303e200f4311c00982e92", # Oracle Proxy (2026-02-13) "0xafd9e25ff301a79feabcc56f46969f34808358ce", # Oracle Aggregator (2026-02-13) "0xb9e29cfa1f89d369671e640d0bb3ad94cab43965", # Governance Multisig "0xc12236c03b28e675d376774fce2c2c052488430f", # CCIP Receiver "0xcacfd227a040002e49e2e01626363071324f820a", # CCIPWETH9Bridge "0xe0e93247376aa097db308b92e6ba36ba015535d0", # CCIPWETH10Bridge "0xf4aa429be277d1a1a1a744c9e5b3ad821a9b96f7", # Multicall (2026-02-13) "0xf4bb2e28688e89fcce3c0580d37d36a7672e8a9f", # WETH10 } STATUS_ORDER = { "CLOSE_GENESIS_PREDEPLOY_MANUAL_IMPORT": 0, "CLOSE_DUPLICATE_RUNTIME_MANUAL_SOURCE": 1, "SUBMITTED_PENDING_BLOCKSCOUT": 2, "WAIT_WEMIXSCAN_API_KEY": 3, "MANUAL_CRONOS_EXPLORER_IMPORT": 4, "NEEDS_BLOCKSCOUT_SOURCE_IMPORT": 5, "NEEDS_ETHERSCAN_FAMILY_VERIFY": 6, "MAPPING_ONLY_SOURCE_MISSING": 7, "MANUAL_XDC_EXPLORER_CHECK": 8, "MANUAL_ALLTRA_EXPLORER_CHECK": 9, "REMOVE_NON_CONTRACT": 10, } SUMMARY_LINE_PREFIX = "- Current deduped closure state:" def load_json(path: Path) -> dict[str, Any]: return json.loads(path.read_text(encoding="utf-8")) def normalize_address(address: str) -> str: addr = (address or "").lower() return addr if addr.startswith("0x") else f"0x{addr}" def is_rate_limited(msg: str) -> bool: m = (msg or "").lower() return "rate limit" in m or "max calls" in m or "3/sec" in m def is_source_published(row: dict[str, Any]) -> bool: return any( "yes" in str(row.get(key, "")).lower() for key in ("source_sourcify", "source_blockscout", "source_etherscan") ) def chain_sort_key(chain: str) -> tuple[int, str]: return (int(chain) if str(chain).isdigit() else 999_999, str(chain)) def build_duplicate_key_set(dup: dict[str, Any]) -> set[tuple[str, str]]: keys: set[tuple[str, str]] = set() for match in dup.get("matches", []): inc = match.get("incomplete", {}) keys.add((str(inc.get("chain", "")), normalize_address(inc.get("address", "")))) return keys def build_prior_published_map(old: dict[str, Any]) -> dict[tuple[str, str], dict[str, Any]]: out: dict[tuple[str, str], dict[str, Any]] = {} for row in old.get("rows", []): if row.get("completion_status") != "COMPLETE_SOURCE_PUBLISHED": continue out[(str(row.get("chain", "")), normalize_address(row.get("address", "")))] = row return out def explorer_source_state(row: dict[str, Any]) -> str: parts = [] if row.get("chain") == "138": if row.get("source_blockscout"): parts.append(str(row["source_blockscout"])) else: if row.get("source_etherscan"): parts.append(str(row["source_etherscan"])) if row.get("source_sourcify"): parts.append(str(row["source_sourcify"])) return "; ".join(parts) def classify_row( row: dict[str, Any], duplicate_keys: set[tuple[str, str]], prior_published: dict[tuple[str, str], dict[str, Any]], ) -> tuple[str, str]: chain = str(row.get("chain", "")) address = normalize_address(row.get("address", "")) labels = str(row.get("labels_merged", "")) provenance = str(row.get("provenance_merged", "")) etherscan = str(row.get("source_etherscan", "")) if row.get("code_on_chain") == "none": return ( "REMOVE_NON_CONTRACT", "No bytecode at latest block; remove from deployed-contract verification target or keep only as historical/mapping/failed-deploy evidence.", ) if is_source_published(row): return ("COMPLETE_SOURCE_PUBLISHED", "No action for source-publication closure.") key = (chain, address) if key in prior_published and is_rate_limited(etherscan): return ( "COMPLETE_SOURCE_PUBLISHED", "No action for source-publication closure (prior published state retained while explorer API was rate-limited).", ) if chain == "138" and address == "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2": return ( "CLOSE_GENESIS_PREDEPLOY_MANUAL_IMPORT", "Treat as deployed genesis predeploy with canonical WETH9 runtime; do not redeploy. Use Blockscout manual/imported-source path if explorer source-published flag is mandatory.", ) if key in duplicate_keys: return ( "CLOSE_DUPLICATE_RUNTIME_MANUAL_SOURCE", "Runtime matches already-published implementation; can close as duplicate implementation, but chain explorer may still need manual source publication.", ) if chain == "1111": return ( "WAIT_WEMIXSCAN_API_KEY", "Load WEMIXSCAN_API_KEY and rerun smom-dbis-138/scripts/deployment/verify-wemix-bridges.sh.", ) if chain == "25": return ( "MANUAL_CRONOS_EXPLORER_IMPORT", "Use Cronos manual Standard JSON inputs under smom-dbis-138/.cronos-verify where available; mapping-only rows need explorer confirmation or removal from repo-owned target.", ) if chain == "50": return ( "MANUAL_XDC_EXPLORER_CHECK", "XDC rows are endpoint/config inventory here; verify manually on XDC explorer or keep outside repo-owned source-publication target.", ) if chain == "651940": return ( "MANUAL_ALLTRA_EXPLORER_CHECK", "All Mainnet/Alltra source API is unsupported by this automated pass; verify manually on its explorer.", ) if chain == "138" and address in SUBMITTED_PENDING_138: return ( "SUBMITTED_PENDING_BLOCKSCOUT", "Verification submission was accepted in the aggregate run; keep polling/importing until Blockscout getabi/is_verified flips true.", ) if chain == "138": return ( "NEEDS_BLOCKSCOUT_SOURCE_IMPORT", "Has bytecode on Chain 138 but no Blockscout ABI/source yet; submit exact source or use manual smart-contract import. Check deprecated/superseded labels before spending time.", ) if "token-mapping" in provenance.lower() or "(to)" in labels or "(from)" in labels: return ( "MAPPING_ONLY_SOURCE_MISSING", "This is a routing/mapping address, not necessarily repo-owned deployment. Confirm ownership/scope; if in scope, verify on the chain explorer; otherwise mark external mapping.", ) return ( "NEEDS_ETHERSCAN_FAMILY_VERIFY", "Has bytecode but no automated source result; verify with the relevant Etherscan-family explorer or document as external.", ) def build_rows( snap: dict[str, Any], duplicate_keys: set[tuple[str, str]], prior_published: dict[tuple[str, str], dict[str, Any]], ) -> list[dict[str, Any]]: rows: list[dict[str, Any]] = [] for srow in snap.get("rows", []): chain = str(srow.get("chain", "")) address = normalize_address(srow.get("address", "")) status, action = classify_row(srow, duplicate_keys, prior_published) rows.append( { "chain": chain, "address": address, "labels": srow.get("labels_merged", ""), "code_on_chain": srow.get("code_on_chain"), "code_detail": srow.get("code_detail", ""), "source_sourcify": srow.get("source_sourcify", ""), "source_blockscout": srow.get("source_blockscout", ""), "source_etherscan": srow.get("source_etherscan", ""), "completion_status": status, "recommended_action": action, "provenance": srow.get("provenance_merged", ""), "explorer_source_state": explorer_source_state({**srow, "chain": chain}), } ) rows.sort( key=lambda r: ( STATUS_ORDER.get(r["completion_status"], 99), chain_sort_key(r["chain"]), r["address"], ) ) return rows def esc(text: str, limit: int = 700) -> str: return str(text).replace("|", "\\|").replace("\n", " ")[:limit] def build_counts(rows: list[dict[str, Any]]) -> tuple[Counter, dict[str, dict[str, int]]]: counts = Counter(r["completion_status"] for r in rows) by_chain_counter: dict[str, Counter] = defaultdict(Counter) for row in rows: by_chain_counter[row["chain"]][row["completion_status"]] += 1 by_chain: dict[str, dict[str, int]] = {} for chain in sorted(by_chain_counter, key=chain_sort_key): by_chain[chain] = dict( sorted(by_chain_counter[chain].items(), key=lambda item: (-item[1], item[0])) ) return counts, by_chain def write_json(rows: list[dict[str, Any]], counts: Counter, by_chain: dict[str, dict[str, int]]) -> str: generated = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) not_verified_count = sum( 1 for row in rows if row["completion_status"] != "COMPLETE_SOURCE_PUBLISHED" ) payload = { "generated_utc": generated, "source_snapshot": "contract-inventory-onchain-snapshot.json", "dedupe_key": "chain_id + address lowercase", "unique_pairs": len(rows), "not_verified_rule": "completion_status != COMPLETE_SOURCE_PUBLISHED", "not_verified_count": not_verified_count, "counts_by_completion_status": { key: counts[key] for key, _value in sorted(counts.items(), key=lambda item: (-item[1], item[0])) }, "counts_by_chain": by_chain, "rows": rows, } OUT_JSON.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8") return generated def build_markdown(rows: list[dict[str, Any]], counts: Counter, generated: str) -> None: completed = [row for row in rows if row["completion_status"] == "COMPLETE_SOURCE_PUBLISHED"] open_rows = [row for row in rows if row["completion_status"] != "COMPLETE_SOURCE_PUBLISHED"] completed_by_chain = Counter(row["chain"] for row in completed) lines: list[str] = [] lines.append("# Deployed contracts completion matrix") lines.append("") lines.append(f"**Generated (UTC):** {generated}") lines.append("**Source snapshot:** [`contract-inventory-onchain-snapshot.json`](contract-inventory-onchain-snapshot.json)") lines.append("**Machine-readable matrix:** [`deployed-contracts-completion-matrix.json`](deployed-contracts-completion-matrix.json)") lines.append("**Regenerate:** `python3 scripts/verify/build-deduped-onchain-inventory.py && python3 scripts/verify/build-inventory-completion-matrix.py`") lines.append("") lines.append("## Executive status") lines.append("") lines.append(f"- Unique chain/address pairs checked: **{len(rows)}**") lines.append(f"- Not verified by matrix rule: **{len(open_rows)}**") for status, _count in sorted(counts.items(), key=lambda item: (-item[1], item[0])): lines.append(f"- {status}: **{counts[status]}**") lines.append("") lines.append("## Not Verified Rule") lines.append("") lines.append("- In this matrix, **not verified** means `completion_status != COMPLETE_SOURCE_PUBLISHED`.") lines.append("- That includes true source-publication work, manual explorer workflows, and non-verification closeouts such as duplicates, genesis predeploys, and non-contract rows.") lines.append("") lines.append("## Recommendations") lines.append("") lines.append("- Treat `COMPLETE_SOURCE_PUBLISHED` as closed for the verification/publication list.") lines.append("- Treat Chain 138 WETH9 as a deployed genesis predeploy with canonical WETH9 runtime; only use manual/imported-source if Blockscout `is_verified=true` is a hard requirement.") lines.append("- Do not spend verification cycles on `REMOVE_NON_CONTRACT` rows unless the address was supposed to deploy; they have no bytecode and cannot be source-verified.") lines.append("- For Chain 138 rows marked `SUBMITTED_PENDING_BLOCKSCOUT`, keep polling or use Blockscout manual import; the previous proxy run accepted submissions but the API still reports not verified.") lines.append("- For mapping-only rows, decide whether the address is repo-owned. If not repo-owned, close as external/counterparty rather than forcing source publication.") lines.append("") lines.append("## Open / action rows") lines.append("") lines.append("| Ch | Address | Labels | Completion status | Recommended action | Explorer/source state |") lines.append("|---:|:---|:---|:---|:---|:---|") for row in open_rows: lines.append( f"| {row['chain']} | `{row['address']}` | {esc(row['labels'])} | `{row['completion_status']}` | {esc(row['recommended_action'])} | {esc(row['explorer_source_state'], 500)} |" ) lines.append("") lines.append("## Completed rows by chain") lines.append("") for chain in sorted(completed_by_chain, key=chain_sort_key): lines.append(f"- Chain {chain}: **{completed_by_chain[chain]}** source-published rows") lines.append("") OUT_MD.write_text("\n".join(lines), encoding="utf-8") def patch_unified_extended(counts: Counter, unique_pairs: int) -> None: if not UNIFIED_EXTENDED.is_file(): return text = UNIFIED_EXTENDED.read_text(encoding="utf-8") replacement = ( f"{SUMMARY_LINE_PREFIX} **{unique_pairs}** unique chain/address pairs; " f"**{counts['COMPLETE_SOURCE_PUBLISHED']}** source-published by automated API evidence, " f"**{counts['CLOSE_GENESIS_PREDEPLOY_MANUAL_IMPORT']}** Chain 138 genesis-predeploy closeout, " f"**{counts['CLOSE_DUPLICATE_RUNTIME_MANUAL_SOURCE']}** duplicate-runtime closeouts, " f"**{counts['SUBMITTED_PENDING_BLOCKSCOUT']}** Chain 138 submissions pending Blockscout settlement, " f"**{counts['NEEDS_BLOCKSCOUT_SOURCE_IMPORT']}** Chain 138 rows needing Blockscout source import/verification, " f"**{counts['MAPPING_ONLY_SOURCE_MISSING']}** mapping-only rows needing scope confirmation, " f"**{counts['NEEDS_ETHERSCAN_FAMILY_VERIFY']}** Etherscan-family rows needing verification, " f"**{counts['MANUAL_CRONOS_EXPLORER_IMPORT'] + counts['MANUAL_XDC_EXPLORER_CHECK'] + counts['MANUAL_ALLTRA_EXPLORER_CHECK']}** manual/unsupported explorer rows, " f"**{counts['WAIT_WEMIXSCAN_API_KEY']}** Wemix rows waiting on `WEMIXSCAN_API_KEY`, and " f"**{counts['REMOVE_NON_CONTRACT']}** no-bytecode rows that cannot be source-verified as contracts." ) pattern = rf"^{re.escape(SUMMARY_LINE_PREFIX)}.*$" updated = re.sub(pattern, replacement, text, count=1, flags=re.MULTILINE) if updated != text: UNIFIED_EXTENDED.write_text(updated, encoding="utf-8") def main() -> int: snap = load_json(SNAP) dup = load_json(DUP) if DUP.is_file() else {} old = load_json(OLD_MATRIX) if OLD_MATRIX.is_file() else {"rows": []} duplicate_keys = build_duplicate_key_set(dup) prior_published = build_prior_published_map(old) rows = build_rows(snap, duplicate_keys, prior_published) counts, by_chain = build_counts(rows) generated = write_json(rows, counts, by_chain) build_markdown(rows, counts, generated) patch_unified_extended(counts, len(rows)) print(f"Wrote {OUT_JSON}") print(f"Wrote {OUT_MD}") if UNIFIED_EXTENDED.is_file(): print(f"Patched {UNIFIED_EXTENDED}") return 0 if __name__ == "__main__": raise SystemExit(main())