Files
proxmox/scripts/verify/inventory_onchain.py
2026-04-24 10:56:01 -07:00

262 lines
9.5 KiB
Python

"""
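Shared helpers for contract inventory checks: parse DEPLOYED_CONTRACTS_UNIFIED_EXTENDED.md,
check deployed bytecode via eth_getCode, and query Sourcify, Blockscout (chain 138), and
Etherscan V2 for source verification.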
"""
from __future__ import annotations
import json
import os
import re
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Any
# parents[2] of this file's resolved path, i.e. three directory levels up from the file itself
# (presumably the repository root — confirm against the checkout layout).
ROOT = Path(__file__).resolve().parents[2]
# Markdown inventory table consumed by parse_table().
UNIFIED_EXTENDED_PATH = ROOT / "reports/inventory/DEPLOYED_CONTRACTS_UNIFIED_EXTENDED.md"
# Primary JSON-RPC endpoint per chain id (chain ids are strings, matching the parsed table).
# Chains "1" and "138" can be overridden through environment variables.
RPC_BY_CHAIN: dict[str, str | None] = {
    "1": os.environ.get("ETHEREUM_MAINNET_RPC", "https://ethereum.publicnode.com"),
    "10": "https://mainnet.optimism.io",
    "25": "https://evm.cronos.org",
    "50": "https://rpc.xinfin.network",
    "56": "https://bsc-dataseed.binance.org",
    "100": "https://rpc.gnosischain.com",
    "137": "https://polygon-bor.publicnode.com",
    "138": os.environ.get("RPC_URL_138", "https://rpc-core.d-bis.org"),
    "1111": "https://api.wemix.com",
    "8453": "https://mainnet.base.org",
    "42161": "https://arb1.arbitrum.io/rpc",
    "42220": "https://forno.celo.org",
    "43114": "https://api.avax.network/ext/bc/C/rpc",
    "651940": "https://mainnet-rpc.alltra.global",
}
# Base URL of the chain-138 Blockscout instance; trailing "/" stripped so paths can be appended.
BLOCKSCOUT_138 = os.environ.get("BLOCKSCOUT_138_URL", "https://explorer.d-bis.org").rstrip(
    "/"
)
SOURCIFY = "https://sourcify.dev/server"
ETHERSCAN_V2 = "https://api.etherscan.io/v2/api"
# Chain ids for which run_check() queries the Etherscan V2 API (when an API key is available).
ES_CHAINS = {"1", "10", "56", "100", "137", "42161", "8453", "43114", "42220"}
# Additional RPC endpoints get_code() falls back to, per chain id, after the primary one.
EXTRA_RPC_BY_CHAIN: dict[str, list[str]] = {
    "1": [
        "https://eth.llamarpc.com",
        "https://cloudflare-eth.com",
    ],
}
def load_etherscan_key() -> str | None:
    """Return the Etherscan API key, or ``None`` when none is configured.

    Lookup order:

    1. an ``ETHERSCAN_API_KEY=`` line in ``ROOT/.env``;
    2. the same line in ``ROOT/smom-dbis-138/.env``;
    3. the ``ETHERSCAN_API_KEY`` environment variable.

    A value found in a file is ignored when it is empty or looks like a
    template placeholder (contains "your" or "placeholder", case-insensitive).
    Unreadable or undecodable files are skipped rather than raising.
    """
    for env_path in (ROOT / ".env", ROOT / "smom-dbis-138" / ".env"):
        if not env_path.is_file():
            continue
        try:
            # utf-8-sig: decode independently of the locale and drop a BOM if present,
            # so a key on the very first line is still recognised.
            text = env_path.read_text(encoding="utf-8-sig")
        except (OSError, UnicodeDecodeError):
            # Best effort: fall through to the next candidate file / the environment.
            continue
        for raw_line in text.splitlines():
            line = raw_line.strip()
            # Accept shell-style "export KEY=value" lines as well.
            if line.startswith("export "):
                line = line[len("export "):].lstrip()
            if not line.startswith("ETHERSCAN_API_KEY="):
                continue
            value = line.split("=", 1)[1].strip().strip('"').strip("'")
            lowered = value.lower()
            if value and "your" not in lowered and "placeholder" not in lowered:
                return value
    return os.environ.get("ETHERSCAN_API_KEY")
def rpc_call(rpc_url: str, method: str, params: list[Any]) -> dict[str, Any] | None:
    """POST a single JSON-RPC 2.0 request and return the decoded response object.

    Args:
        rpc_url: Endpoint URL.
        method: JSON-RPC method name (e.g. ``"eth_getCode"``).
        params: Positional parameters for the method.

    Returns:
        The decoded JSON object, or ``None`` on any failure: malformed URL,
        transport/HTTP error, timeout, truncated body, invalid JSON, or a JSON
        reply that is not an object. This function never raises for those cases,
        so callers can simply retry or move on to another endpoint.
    """
    # Local import keeps this block self-contained; urllib.request already loads http.client.
    import http.client

    payload = json.dumps(
        {"jsonrpc": "2.0", "id": 1, "method": method, "params": params}
    ).encode()
    try:
        # Request() itself raises ValueError for a malformed URL, so build it inside the try.
        request = urllib.request.Request(
            rpc_url,
            data=payload,
            headers={
                "Content-Type": "application/json",
                "User-Agent": "proxmox-inventory/1.0 (inventory_onchain)",
            },
            method="POST",
        )
        with urllib.request.urlopen(request, timeout=25) as resp:
            decoded = json.loads(resp.read().decode())
    except (OSError, ValueError, http.client.HTTPException):
        # OSError covers URLError/HTTPError/TimeoutError; ValueError covers JSON and
        # Unicode decode errors; HTTPException covers e.g. IncompleteRead.
        return None
    # A JSON-RPC response must be an object; anything else is treated as "no response".
    return decoded if isinstance(decoded, dict) else None
def get_code(rpc_url: str, address: str, chain: str) -> tuple[str | None, str | None]:
    """Check via ``eth_getCode`` whether ``address`` holds contract bytecode.

    The primary ``rpc_url`` is tried first, followed by any fallbacks listed in
    ``EXTRA_RPC_BY_CHAIN`` for ``chain``. Each endpoint gets up to three attempts
    with a growing pause between them.

    Returns:
        ``(status, detail)`` where ``status`` is

        * ``"yes"``  – bytecode present (``detail`` gives the approximate size),
        * ``"none"`` – empty code (EOA or empty slot),
        * ``None``   – no RPC configured, or every endpoint failed.
    """
    if not rpc_url:
        return None, "no RPC configured"
    urls: list[str] = [rpc_url]
    for extra in EXTRA_RPC_BY_CHAIN.get(chain, []):
        if extra not in urls:
            urls.append(extra)
    max_attempts = 3
    for url in urls:
        for attempt in range(max_attempts):
            reply = rpc_call(url, "eth_getCode", [address, "latest"])
            if reply and "result" in reply and not reply.get("error"):
                code = reply.get("result")
                if not code or code in ("0x", "0x0"):
                    return "none", "no contract bytecode (EOA or empty slot)"
                # Two hex digits per byte, minus the "0x" prefix.
                return "yes", f"bytecode ~{len(code) // 2 - 1} bytes"
            # Back off only when another attempt on this endpoint follows; sleeping
            # after the last attempt would just delay the fallback or the error result.
            if attempt + 1 < max_attempts:
                time.sleep(0.3 * (attempt + 1))
    return None, "RPC error or no response"
def _sourcify_label_from_json(chain: str, d: dict[str, Any]) -> str:
if d.get("customCode") == "unsupported_chain":
return "n/a (Sourcify: unsupported chain)"
if "not found" in str(d.get("message", "")).lower() and d.get("match") is None:
return "no — not in Sourcify"
m, cr, rt = d.get("match"), d.get("creationMatch"), d.get("runtimeMatch")
if m or cr or rt:
st = m or cr or rt
stv = st.get("status", "match") if isinstance(st, dict) else "match"
return f"yes — Sourcify ({stv})"[:120]
if m is None and cr is None and rt is None:
return "no — not in Sourcify"
if d.get("address") or str(d.get("chainId", "")) == str(chain):
return "no — not in Sourcify"
return "no — not in Sourcify (empty response)"
def sourcify_status(chain: str, address: str) -> str:
    """Query Sourcify's v2 contract endpoint and return a short verification label.

    HTTP error responses are still inspected: Sourcify reports "not found" and
    "unsupported chain" through JSON bodies on non-2xx statuses, so the body of an
    ``HTTPError`` is parsed like a normal response when it is JSON.

    Returns:
        The label produced by ``_sourcify_label_from_json`` for JSON object replies,
        ``"HTTP <code> (non-JSON body)"`` for an HTTP error whose body cannot be
        read or parsed, or ``"err ..."`` for any other failure. Never raises.
    """
    query_url = f"{SOURCIFY}/v2/contract/{chain}/{address}?fields=all"
    try:
        with urllib.request.urlopen(query_url, timeout=30) as resp:
            payload = json.loads(resp.read().decode())
    except urllib.error.HTTPError as e:
        try:
            raw = e.read() if e.fp else b""
            # errors="replace": a badly encoded error page must end up as "non-JSON",
            # not as an uncaught UnicodeDecodeError.
            payload = json.loads(raw.decode(errors="replace"))
        except (OSError, ValueError):
            return f"HTTP {e.code} (non-JSON body)"
    except Exception as e:
        return f"err {str(e)[:80]}"
    if not isinstance(payload, dict):
        # Valid JSON that is not an object cannot be interpreted as a lookup result.
        return "err unexpected Sourcify response (not a JSON object)"
    return _sourcify_label_from_json(chain, payload)
def blockscout_138_getabi(address: str) -> str:
    """Ask the chain-138 Blockscout ``getabi`` endpoint whether ``address`` is verified.

    Returns a label: ``"yes — ..."`` when an ABI array comes back with status "1",
    ``"no — ..."`` when the message says the contract is not verified,
    ``"err: ..."`` when the request or JSON decoding fails, and ``"other: ..."``
    (truncated message or payload) for anything else.
    """
    endpoint = f"{BLOCKSCOUT_138}/api?module=contract&action=getabi&address={address}"
    try:
        with urllib.request.urlopen(endpoint, timeout=25) as resp:
            raw = resp.read()
            data = json.loads(raw.decode())
    except Exception as exc:
        return f"err: {exc!s}"

    # A verified contract yields status "1" and a JSON-array ABI (serialised, so it starts with "[").
    has_abi = (
        data.get("status") == "1"
        and data.get("result")
        and str(data["result"]).startswith("[")
    )
    if has_abi:
        return "yes — source/ABI on Blockscout (verified in API sense)"

    if data.get("message") and "not verified" in data.get("message", "").lower():
        return "no — not verified (Blockscout getabi)"

    fallback = str(data.get("message", data))
    return f"other: {fallback[:100]}"
# Held by etherscan_v2_getabi() while it waits out the minimum interval, so concurrent
# callers are spaced apart instead of hitting the API at the same moment.
_ETHERSCAN_V2_LOCK = threading.Lock()
# Etherscan free tier: 3 calls/sec; default 0.5s = 2/sec with margin. Override: ETHERSCAN_V2_MIN_INTERVAL=0.45
# Value is in seconds (passed directly to time.sleep).
_ETHERSCAN_V2_MIN_INTERVAL = float(
    os.environ.get("ETHERSCAN_V2_MIN_INTERVAL", "0.5")
)
def etherscan_v2_getabi(api_key: str, chain: str, address: str) -> str:
    """Ask Etherscan V2 ``getabi`` whether ``address`` on ``chain`` is verified.

    Calls are throttled: each one takes the module lock and sleeps for the
    configured minimum interval before issuing the request.

    Returns a label: ``"yes — ..."`` when an ABI array is returned, ``"n/a (...)"``
    for key or unsupported-chain problems, ``"err: ..."`` when the request or JSON
    decoding fails, and ``"no — <reason>"`` / ``"no/unknown"`` otherwise.
    """
    query = urllib.parse.urlencode(
        {
            "chainid": chain,
            "module": "contract",
            "action": "getabi",
            "address": address,
            "apikey": api_key,
        }
    )
    url = f"{ETHERSCAN_V2}?{query}"

    # NOTE(review): the request is kept inside the lock so calls are fully serialised;
    # confirm this matches the intended nesting of the original.
    with _ETHERSCAN_V2_LOCK:
        time.sleep(_ETHERSCAN_V2_MIN_INTERVAL)
        try:
            with urllib.request.urlopen(url, timeout=25) as resp:
                data = json.loads(resp.read().decode())
        except Exception as exc:
            return f"err: {exc!s}"

    abi_returned = (
        data.get("status") == "1"
        and data.get("result")
        and str(data["result"]).startswith("[")
    )
    if abi_returned:
        return "yes — Etherscan-family: verified (ABI returned)"

    # On failure the explanatory text is in "result"; fall back to "message".
    err = str(data.get("result", data.get("message", "")))
    err_lower = err.lower()

    # Parentheses spell out the precedence Python applies to mixed or/and.
    if "API Key" in err or ("Invalid" in err and "key" in err_lower):
        return "n/a (Etherscan key missing/invalid)"
    if "unsupported chainid" in err_lower or "Missing or unsupported chainid" in err:
        return "n/a (Etherscan V2: this chainid not in API)"
    if "Max calls" in err or ("rate" in err_lower and "limit" in err_lower):
        return f"no — {err[:120]}"
    if err:
        return f"no — {err[:100]}"
    return "no/unknown"
def _split_verified_provenance(tail: str) -> tuple[str, str]:
t = tail.rstrip()
if t.endswith("|"):
t = t[:-1].rstrip()
if t.startswith("token-mapping"):
return "Not recorded in repo (mapping row)", t
idx = t.rfind(" | ")
if idx > 0:
return t[:idx].strip(), t[idx + 3 :].strip()
return t, ""
def parse_table(path: Path) -> list[dict[str, str]]:
    """Parse the Markdown inventory table at ``path`` into a list of row dicts.

    Rows include `row_no` (table # column) for extended live reports.

    Only lines shaped like
    ``| <n> | <network> | <chain> | <name> | `0x<40 hex>` | <rest> |``
    are kept; headers, separators and prose are skipped. Each dict has the keys
    ``row_no``, ``network``, ``chain``, ``name``, ``address`` (lower-cased) and
    ``provenance`` (the remaining columns, re-joined with ``" | "``).

    The file is read as UTF-8 regardless of the process locale.
    """
    # Compiled once, outside the per-line loop.
    row_re = re.compile(
        r"^\| (\d+) \| ([^|]+) \| ([^|]+) \| ([^|]+) \| `(0x[a-fA-F0-9]{40})` \| (.+) \|$"
    )
    rows: list[dict[str, str]] = []
    for line in path.read_text(encoding="utf-8").splitlines():
        found = row_re.match(line)
        if not found:
            continue
        ver, prov = _split_verified_provenance(found.group(6))
        prov_merged = f"{ver} | {prov}" if prov else ver
        rows.append(
            {
                "row_no": found.group(1).strip(),
                "network": found.group(2).strip(),
                "chain": found.group(3).strip(),
                "name": found.group(4).strip(),
                "address": found.group(5).lower(),
                "provenance": prov_merged,
            }
        )
    return rows
def esc(s: str) -> str:
    """Escape ``|`` for use inside a Markdown table cell, capped at 2000 characters.

    Every pipe becomes ``\\|``. If the cap would fall between an inserted
    backslash and its pipe, the backslash is dropped as well, so the result never
    ends in a dangling escape character created by this function.
    """
    limit = 2000
    escaped = s.replace("|", "\\|")
    # In `escaped` every "|" is directly preceded by the backslash inserted above, so a
    # pipe sitting exactly at the cut position means the slice would end on that backslash.
    if len(escaped) > limit and escaped[limit] == "|":
        return escaped[: limit - 1]
    return escaped[:limit]
def run_check(entry: dict[str, Any], es_key: str | None) -> dict[str, Any]:
    """Run all on-chain and explorer checks for one inventory row.

    Args:
        entry: Row dict providing at least ``"chain"`` and ``"address"``.
        es_key: Etherscan API key, or ``None`` to skip the Etherscan lookup.

    Returns:
        A dict with ``chain``, ``address``, ``code_on_chain``, ``code_detail`` and
        ``source_sourcify``; plus ``source_blockscout`` for chain "138" and
        ``source_etherscan`` when a key is given and the chain is in ``ES_CHAINS``;
        and a compact ``verification_summary`` string joining the findings.
    """
    chain_id = entry["chain"]
    address = "0x" + entry["address"].replace("0x", "")

    # Small fixed pause before each row's lookups.
    time.sleep(0.04)

    rpc_url = RPC_BY_CHAIN.get(chain_id) or ""
    code_state, code_detail = get_code(rpc_url, address, chain_id)
    sourcify = sourcify_status(chain_id, address)

    result: dict[str, Any] = {
        "chain": chain_id,
        "address": entry["address"],
        "code_on_chain": code_state,
        "code_detail": code_detail,
        "source_sourcify": sourcify,
    }
    if chain_id == "138":
        result["source_blockscout"] = blockscout_138_getabi(address)
    if es_key and chain_id in ES_CHAINS:
        result["source_etherscan"] = etherscan_v2_getabi(es_key, chain_id, address)

    # Collapse the Sourcify label into one of three short tags for the summary.
    if "yes" in sourcify and "Sourcify" in sourcify:
        sourcify_tag = "Sourcify:ok"
    elif "not in Sourcify" in sourcify or sourcify.lower().strip().startswith("no"):
        sourcify_tag = "Sourcify:none"
    else:
        sourcify_tag = "Sourcify:see col"

    summary_parts: list[str] = [f"code={code_state}", sourcify_tag]
    if chain_id == "138" and "source_blockscout" in result:
        blockscout_short = result["source_blockscout"][:80]
        # " | " is the summary separator, so neutralise it inside embedded labels.
        summary_parts.append(blockscout_short.replace(" | ", ";"))
    if "source_etherscan" in result:
        etherscan_short = result["source_etherscan"][:60]
        summary_parts.append("ES:" + etherscan_short.replace(" | ", ";"))

    joined = " | ".join(summary_parts)
    result["verification_summary"] = joined[:500]
    return result