""" Shared: parse DEPLOYED_CONTRACTS_UNIFIED_EXTENDED.md, eth_getCode, Sourcify, Blockscout 138, Etherscan V2. """ from __future__ import annotations import json import os import re import threading import time import urllib.error import urllib.parse import urllib.request from pathlib import Path from typing import Any ROOT = Path(__file__).resolve().parents[2] UNIFIED_EXTENDED_PATH = ROOT / "reports/inventory/DEPLOYED_CONTRACTS_UNIFIED_EXTENDED.md" RPC_BY_CHAIN: dict[str, str | None] = { "1": os.environ.get("ETHEREUM_MAINNET_RPC", "https://ethereum.publicnode.com"), "10": "https://mainnet.optimism.io", "25": "https://evm.cronos.org", "50": "https://rpc.xinfin.network", "56": "https://bsc-dataseed.binance.org", "100": "https://rpc.gnosischain.com", "137": "https://polygon-bor.publicnode.com", "138": os.environ.get("RPC_URL_138", "https://rpc-core.d-bis.org"), "1111": "https://api.wemix.com", "8453": "https://mainnet.base.org", "42161": "https://arb1.arbitrum.io/rpc", "42220": "https://forno.celo.org", "43114": "https://api.avax.network/ext/bc/C/rpc", "651940": "https://mainnet-rpc.alltra.global", } BLOCKSCOUT_138 = os.environ.get("BLOCKSCOUT_138_URL", "https://explorer.d-bis.org").rstrip( "/" ) SOURCIFY = "https://sourcify.dev/server" ETHERSCAN_V2 = "https://api.etherscan.io/v2/api" ES_CHAINS = {"1", "10", "56", "100", "137", "42161", "8453", "43114", "42220"} EXTRA_RPC_BY_CHAIN: dict[str, list[str]] = { "1": [ "https://eth.llamarpc.com", "https://cloudflare-eth.com", ], } def load_etherscan_key() -> str | None: for p in (ROOT / ".env", ROOT / "smom-dbis-138" / ".env"): if not p.is_file(): continue try: for line in p.read_text().splitlines(): line = line.strip() if not line.startswith("ETHERSCAN_API_KEY="): continue v = line.split("=", 1)[1].strip().strip('"').strip("'") if v and "your" not in v.lower() and "placeholder" not in v.lower(): return v except OSError: continue return os.environ.get("ETHERSCAN_API_KEY") def rpc_call(rpc_url: str, method: str, params: list[Any]) -> dict[str, Any] | None: body = json.dumps( {"jsonrpc": "2.0", "id": 1, "method": method, "params": params} ).encode() req = urllib.request.Request( rpc_url, data=body, headers={ "Content-Type": "application/json", "User-Agent": "proxmox-inventory/1.0 (inventory_onchain)", }, method="POST", ) try: with urllib.request.urlopen(req, timeout=25) as r: return json.loads(r.read().decode()) except (urllib.error.URLError, json.JSONDecodeError, TimeoutError, OSError, ValueError): return None def get_code(rpc_url: str, address: str, chain: str) -> tuple[str | None, str | None]: if not rpc_url: return None, "no RPC configured" urls: list[str] = [rpc_url] for u in EXTRA_RPC_BY_CHAIN.get(chain, []): if u not in urls: urls.append(u) for url in urls: for _attempt in range(3): j = rpc_call(url, "eth_getCode", [address, "latest"]) if j and "result" in j and not j.get("error"): res = j.get("result") if not res or res in ("0x", "0x0"): return "none", "no contract bytecode (EOA or empty slot)" return "yes", f"bytecode ~{len(res) // 2 - 1} bytes" time.sleep(0.3 * (_attempt + 1)) return None, "RPC error or no response" def _sourcify_label_from_json(chain: str, d: dict[str, Any]) -> str: if d.get("customCode") == "unsupported_chain": return "n/a (Sourcify: unsupported chain)" if "not found" in str(d.get("message", "")).lower() and d.get("match") is None: return "no — not in Sourcify" m, cr, rt = d.get("match"), d.get("creationMatch"), d.get("runtimeMatch") if m or cr or rt: st = m or cr or rt stv = st.get("status", "match") if isinstance(st, dict) else "match" return f"yes — Sourcify ({stv})"[:120] if m is None and cr is None and rt is None: return "no — not in Sourcify" if d.get("address") or str(d.get("chainId", "")) == str(chain): return "no — not in Sourcify" return "no — not in Sourcify (empty response)" def sourcify_status(chain: str, address: str) -> str: q = f"{SOURCIFY}/v2/contract/{chain}/{address}?fields=all" try: with urllib.request.urlopen(q, timeout=30) as r: d = json.loads(r.read().decode()) except urllib.error.HTTPError as e: b = (e.read().decode() if e.fp else "") or "" try: d = json.loads(b) except json.JSONDecodeError: return f"HTTP {e.code} (non-JSON body)" return _sourcify_label_from_json(chain, d) except Exception as e: return f"err {str(e)[:80]}" return _sourcify_label_from_json(chain, d) def blockscout_138_getabi(address: str) -> str: q = f"{BLOCKSCOUT_138}/api?module=contract&action=getabi&address={address}" try: with urllib.request.urlopen(q, timeout=25) as r: d = json.loads(r.read().decode()) except Exception as e: return f"err: {e!s}" if d.get("status") == "1" and d.get("result") and str(d["result"]).startswith("["): return "yes — source/ABI on Blockscout (verified in API sense)" if d.get("message") and "not verified" in d.get("message", "").lower(): return "no — not verified (Blockscout getabi)" return f"other: {str(d.get('message', d))[:100]}" _ETHERSCAN_V2_LOCK = threading.Lock() # Etherscan free tier: 3 calls/sec; default 0.5s = 2/sec with margin. Override: ETHERSCAN_V2_MIN_INTERVAL=0.45 _ETHERSCAN_V2_MIN_INTERVAL = float( os.environ.get("ETHERSCAN_V2_MIN_INTERVAL", "0.5") ) def etherscan_v2_getabi(api_key: str, chain: str, address: str) -> str: params = { "chainid": chain, "module": "contract", "action": "getabi", "address": address, "apikey": api_key, } url = f"{ETHERSCAN_V2}?{urllib.parse.urlencode(params)}" with _ETHERSCAN_V2_LOCK: time.sleep(_ETHERSCAN_V2_MIN_INTERVAL) try: with urllib.request.urlopen(url, timeout=25) as r: d = json.loads(r.read().decode()) except Exception as e: return f"err: {e!s}" if d.get("status") == "1" and d.get("result") and str(d["result"]).startswith("["): return "yes — Etherscan-family: verified (ABI returned)" err = str(d.get("result", d.get("message", ""))) if "API Key" in err or "Invalid" in err and "key" in err.lower(): return "n/a (Etherscan key missing/invalid)" if "unsupported chainid" in err.lower() or "Missing or unsupported chainid" in err: return "n/a (Etherscan V2: this chainid not in API)" if "Max calls" in err or "rate" in err.lower() and "limit" in err.lower(): return f"no — {err[:120]}" return f"no — {err[:100]}" if err else "no/unknown" def _split_verified_provenance(tail: str) -> tuple[str, str]: t = tail.rstrip() if t.endswith("|"): t = t[:-1].rstrip() if t.startswith("token-mapping"): return "Not recorded in repo (mapping row)", t idx = t.rfind(" | ") if idx > 0: return t[:idx].strip(), t[idx + 3 :].strip() return t, "" def parse_table(path: Path) -> list[dict[str, str]]: """Rows include `row_no` (table # column) for extended live reports.""" rows: list[dict[str, str]] = [] for line in path.read_text().splitlines(): m2 = re.match( r"^\| (\d+) \| ([^|]+) \| ([^|]+) \| ([^|]+) \| `(0x[a-fA-F0-9]{40})` \| (.+) \|$", line, ) if not m2: continue ver, prov = _split_verified_provenance(m2.group(6)) prov_merged = f"{ver} | {prov}" if prov else ver rows.append( { "row_no": m2.group(1).strip(), "network": m2.group(2).strip(), "chain": m2.group(3).strip(), "name": m2.group(4).strip(), "address": m2.group(5).lower(), "provenance": prov_merged, } ) return rows def esc(s: str) -> str: return s.replace("|", "\\|")[:2000] def run_check(entry: dict[str, Any], es_key: str | None) -> dict[str, Any]: ch = entry["chain"] addr0 = "0x" + entry["address"].replace("0x", "") time.sleep(0.04) code_st, code_note = get_code(RPC_BY_CHAIN.get(ch) or "", addr0, ch) sfy = sourcify_status(ch, addr0) out: dict[str, Any] = { "chain": ch, "address": entry["address"], "code_on_chain": code_st, "code_detail": code_note, "source_sourcify": sfy, } if ch == "138": out["source_blockscout"] = blockscout_138_getabi(addr0) if es_key and ch in ES_CHAINS: out["source_etherscan"] = etherscan_v2_getabi(es_key, ch, addr0) parts: list[str] = [f"code={code_st}"] if "yes" in sfy and "Sourcify" in sfy: parts.append("Sourcify:ok") elif "not in Sourcify" in sfy or sfy.lower().strip().startswith("no"): parts.append("Sourcify:none") else: parts.append("Sourcify:see col") if ch == "138" and "source_blockscout" in out: parts.append(out["source_blockscout"][:80].replace(" | ", ";")) if "source_etherscan" in out: parts.append("ES:" + out["source_etherscan"][:60].replace(" | ", ";")) out["verification_summary"] = " | ".join(parts)[:500] return out