"""
On-disk cache for per-(chain, address) `run_check` results to avoid repeating
Etherscan / Sourcify / RPC on every inventory regen.

The cache is intentionally tied to the effective explorer/RPC endpoints used
by the inventory scripts. A prior run against a public Blockscout or a
different RPC should not mask later LAN-backed results for the same
chain/address pair.
"""

from __future__ import annotations

import json
import os
import threading
import time
from pathlib import Path
from typing import Any


# Same directory as other inventory artifacts; safe to commit for team "resume" runs.
def cache_path_for(root: Path) -> Path:
    """Return the canonical cache-file path under *root*."""
    return root / "reports" / "inventory" / "contract-inventory-onchain-check-cache.json"


# Bump when the cached-entry schema changes; caches with another version are discarded.
CACHE_VERSION = 2

# Serializes on-disk writes; run_checks_with_cache saves from the completion loop.
_write_lock = threading.Lock()


def _runtime_fingerprint() -> dict[str, str]:
    """
    Capture the endpoint settings that materially affect explorer/on-chain
    verification results. Keep this small and explicit so cache invalidation
    is predictable.
    """
    return {
        "ETHEREUM_MAINNET_RPC": os.environ.get(
            "ETHEREUM_MAINNET_RPC", "https://ethereum.publicnode.com"
        ),
        "RPC_URL_138": os.environ.get("RPC_URL_138", "https://rpc-core.d-bis.org"),
        "BLOCKSCOUT_138_URL": os.environ.get(
            "BLOCKSCOUT_138_URL", "https://explorer.d-bis.org"
        ).rstrip("/"),
    }


def _key(chain: str, address: str) -> str:
    """
    Build the `chain:address` cache key: address lowercased with the leading
    "0x" dropped; empty addresses map to a sentinel "-" component.
    """
    # FIX: was .replace("0x", ""), which also stripped interior "0x" sequences
    # and could collide distinct addresses; removeprefix only drops the prefix.
    a = (address or "").lower().removeprefix("0x")
    return f"{str(chain).strip()}:{a}" if a else f"{str(chain).strip()}:-"


def _empty_cache(updated_utc: str | None = None) -> dict[str, Any]:
    """Fresh cache skeleton stamped with the current runtime fingerprint."""
    return {
        "version": CACHE_VERSION,
        "updated_utc": updated_utc,
        "runtime_fingerprint": _runtime_fingerprint(),
        "keys": {},
    }


def load_check_cache(path: Path) -> dict[str, Any]:
    """
    Load the cache at *path*.

    Returns a fresh empty cache when the file is missing, unreadable,
    malformed, written by another CACHE_VERSION, or produced against different
    endpoint settings (see _runtime_fingerprint). On version/fingerprint
    mismatch the old timestamp is carried over for observability; entries are
    always dropped.
    """
    if not path.is_file():
        return _empty_cache()
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, OSError, UnicodeDecodeError):
        return _empty_cache()
    if not isinstance(data, dict) or "keys" not in data:
        return _empty_cache()
    if data.get("version") != CACHE_VERSION:
        return _empty_cache(data.get("updated_utc"))
    if data.get("runtime_fingerprint") != _runtime_fingerprint():
        return _empty_cache(data.get("updated_utc"))
    return data


def _atomic_write(path: Path, text: str) -> None:
    """Write *text* to *path* via temp file + os.replace so readers never see a partial file."""
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_name(path.name + ".tmp")
    try:
        tmp.write_text(text, encoding="utf-8")
        os.replace(tmp, path)
    finally:
        # os.replace consumes tmp on success; this cleanup only fires after a
        # write failure, and best-effort is fine (a stale .tmp is harmless).
        if tmp.is_file():
            try:
                tmp.unlink()
            except OSError:
                pass


def save_check_cache(path: Path, data: dict[str, Any]) -> None:
    """Stamp *data* with version/timestamp/fingerprint and atomically persist it."""
    data = dict(data)  # shallow copy: don't mutate the caller's top-level dict
    data["version"] = CACHE_VERSION
    data["updated_utc"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
    data["runtime_fingerprint"] = _runtime_fingerprint()
    with _write_lock:
        _atomic_write(path, json.dumps(data, indent=2) + "\n")


def should_refetch_transient(cached: dict[str, Any] | None) -> bool:
    """True if a previous run likely hit rate limits or RPC flakiness and should be retried."""
    if not cached:
        return False
    s_es = str(cached.get("source_etherscan") or "")
    s_code = str(cached.get("code_detail") or "")
    s_sfy = str(cached.get("source_sourcify") or "")
    s_bs = str(cached.get("source_blockscout") or "")
    # Rate-limit phrasing from any of the upstream sources.
    for s in (s_es, s_code, s_sfy, s_bs):
        low = s.lower()
        if "max calls" in low or "3/sec" in low:
            return True
        if "per sec" in low and "rate" in low:
            return True
        if "rate" in low and "limit" in low:
            return True
    # RPC failure left the on-chain code question unanswered.
    if cached.get("code_on_chain") is None and "rpc" in s_code.lower():
        return True
    # Sourcify 429 (throttled) / Blockscout 403 (blocked) are retryable.
    if "http error" in s_sfy.lower() and "429" in s_sfy:
        return True
    if "http error" in s_bs.lower() and "403" in s_bs:
        return True
    return False


def get_cached_for_entry(
    keys_store: dict[str, Any], chain: str, address: str
) -> dict[str, Any] | None:
    """Return the cached result dict for (chain, address), or None if absent/invalid."""
    ent = keys_store.get(_key(chain, address))
    return ent if isinstance(ent, dict) else None


def set_cached_for_entry(data: dict[str, Any], result: dict[str, Any]) -> None:
    """
    Store *result* in data["keys"] under its (chain, address) key.

    Keys starting with "_" are treated as private/transient and not persisted.
    Silently skips results with no chain or address (nothing to key on).
    """
    ch = str(result.get("chain", ""))
    addr = str(result.get("address", ""))
    if not ch or not addr:
        return
    k = _key(ch, addr)
    st = data.setdefault("keys", {})
    if not isinstance(st, dict):
        data["keys"] = st = {}
    st[k] = {kk: result[kk] for kk in result if not str(kk).startswith("_")}


def run_checks_with_cache(
    to_run: list[dict[str, Any]],
    es_key: str | None,
    run_check: Any,
    cache_path: Path,
    use_cache: bool,
    refresh_transient: bool,
    max_workers: int,
    progress_every: int = 50,
) -> dict[tuple[str, str], dict[str, Any]]:
    """
    Load `run_check` results for each entry in to_run, using on-disk cache on
    cache hits to avoid Etherscan / Sourcify / RPC re-queries.

    Each entry must carry "chain" and "address". `run_check` is called as
    run_check(entry_copy, es_key) from a thread pool and must return a dict
    containing at least "chain" and "address". Returns a mapping keyed by
    (chain, address.lower()). Raises RuntimeError if an entry is somehow
    missing from the cache after fetching (internal invariant).
    """
    from concurrent.futures import ThreadPoolExecutor, as_completed

    data: dict[str, Any]
    if use_cache:
        data = load_check_cache(cache_path)
    else:
        # Ignore any prior cache; this run starts from a clean slate.
        data = _empty_cache()
    keys: dict[str, Any] = data.setdefault("keys", {})  # type: ignore[assignment]
    if not isinstance(keys, dict):
        data["keys"] = keys = {}

    to_fetch: list[dict[str, Any]] = []
    for e in to_run:
        c = get_cached_for_entry(keys, e["chain"], e["address"])
        if not use_cache:
            to_fetch.append(e)
        elif c is None:
            to_fetch.append(e)
        elif refresh_transient and should_refetch_transient(c):
            # Cached value looks like a rate-limit/RPC hiccup; retry it.
            to_fetch.append(e)

    n_hits = len(to_run) - len(to_fetch)
    print(
        f" cache: {n_hits} hit(s), {len(to_fetch)} fetch(es) (resume saves Etherscan + other APIs on hits)"
    )

    if to_fetch:
        with ThreadPoolExecutor(max_workers=max_workers) as ex:
            futs = {ex.submit(run_check, dict(e), es_key): e for e in to_fetch}
            n = 0
            for f in as_completed(futs):
                r = f.result()
                set_cached_for_entry(data, r)
                n += 1
                # Checkpoint every 5 completions so a crash loses little work.
                if n % 5 == 0 or n == len(futs):
                    save_check_cache(cache_path, data)
                if n % progress_every == 0:
                    print(f" progress {n}/{len(futs)}")
        # Final save after the pool drains; pure-cache-hit runs don't rewrite.
        save_check_cache(cache_path, data)

    by_addr: dict[tuple[str, str], dict[str, Any]] = {}
    for e in to_run:
        c = get_cached_for_entry(keys, e["chain"], e["address"])
        if c is None:
            raise RuntimeError(
                f"Internal error: no cache after run for {e.get('chain')} {e.get('address')}"
            )
        by_addr[(e["chain"], e["address"].lower())] = c
    if to_run and not to_fetch:
        print(f" (all {len(to_run)} pairs served from cache; 0 fetches this run)")
    return by_addr