#!/usr/bin/env python3 """Read-only CWMultiToken bridge readiness evidence. The script checks deployed bridge contracts, configured routes, canonical-to- mirrored token mappings, and cW token MINTER/BURNER roles. It does not send transactions or trigger bridge transfers. """ from __future__ import annotations import argparse import json import os import subprocess from datetime import datetime, timezone from pathlib import Path from typing import Any ROOT = Path(__file__).resolve().parents[2] TOKEN_MAPPING = ROOT / "config" / "token-mapping-multichain.json" DEFAULT_JSON = ROOT / "reports" / "status" / "cw-multitoken-bridge-e2e-latest.json" DEFAULT_MD = ROOT / "reports" / "status" / "cw-multitoken-bridge-e2e-latest.md" ACTIVE_CHAINS = [ (1, "Ethereum Mainnet", "MAINNET", "5009297550715157269"), (10, "Optimism", "OPTIMISM", "3734403246176062136"), (25, "Cronos", "CRONOS", "1456215246176062136"), (56, "BSC", "BSC", "11344663589394136015"), (100, "Gnosis", "GNOSIS", "465200170687744372"), (137, "Polygon", "POLYGON", "4051577828743386545"), (8453, "Base", "BASE", "15971525489660198786"), (42161, "Arbitrum", "ARBITRUM", "4949039107694359620"), (42220, "Celo", "CELO", "1346049177634351622"), (43114, "Avalanche", "AVALANCHE", "6433500567565415381"), ] RPC_CANDIDATES = { 1: ["ETHEREUM_MAINNET_RPC", "ETH_MAINNET_RPC_URL", "MAINNET_RPC_URL", "RPC_URL_MAINNET"], 10: ["OPTIMISM_RPC_URL", "OPTIMISM_MAINNET_RPC"], 25: ["CRONOS_RPC_URL", "CRONOS_MAINNET_RPC", "CRONOS_RPC"], 56: ["BSC_RPC_URL", "BSC_MAINNET_RPC"], 100: ["GNOSIS_RPC_URL", "GNOSIS_MAINNET_RPC", "GNOSIS_RPC"], 137: ["POLYGON_RPC_URL", "POLYGON_MAINNET_RPC"], 8453: ["BASE_RPC_URL", "BASE_MAINNET_RPC"], 42161: ["ARBITRUM_RPC_URL", "ARBITRUM_MAINNET_RPC"], 42220: ["CELO_RPC_URL", "CELO_MAINNET_RPC", "CELO_RPC"], 43114: ["AVALANCHE_RPC_URL", "AVALANCHE_MAINNET_RPC", "AVALANCHE_RPC"], } CORE_KEYS = {"Compliant_USDT_cW", "Compliant_USDC_cW"} def rel(path: Path) -> str: try: return str(path.relative_to(ROOT)) except ValueError: return str(path) def env_first(keys: list[str]) -> str: for key in keys: value = os.environ.get(key, "").strip() if value: return value return "" def is_address(value: str) -> bool: return value.startswith("0x") and len(value) == 42 def run_cast(args: list[str], timeout: int = 18) -> tuple[bool, str]: try: proc = subprocess.run(["cast", *args], cwd=ROOT, text=True, capture_output=True, timeout=timeout, check=False) except Exception as exc: # noqa: BLE001 - evidence should capture any runner failure return False, str(exc) out = (proc.stdout or proc.stderr or "").strip() return proc.returncode == 0, out def cast_call(address: str, signature: str, params: list[str], rpc: str) -> tuple[bool, str]: return run_cast(["call", address, signature, *params, "--rpc-url", rpc]) def cast_code(address: str, rpc: str) -> tuple[bool, str]: return run_cast(["code", address, "--rpc-url", rpc]) def cast_keccak(value: str) -> str: ok, out = run_cast(["keccak", value], timeout=5) if not ok: raise RuntimeError(f"cast keccak failed for {value}: {out}") return out.split()[0] def bool_from_cast(value: str) -> bool: return value.strip().lower().splitlines()[-1:] == ["true"] def address_in_cast(value: str, expected: str) -> bool: return expected.lower() in value.lower() def load_token_rows(full_family: bool) -> dict[int, list[dict[str, str]]]: data = json.loads(TOKEN_MAPPING.read_text()) out: dict[int, list[dict[str, str]]] = {} for pair in data.get("pairs") or []: if pair.get("fromChainId") != 138: continue chain_id = int(pair.get("toChainId")) rows: list[dict[str, str]] = [] for token in pair.get("tokens") or []: key = token.get("key", "") if not key.endswith("_cW") and key not in CORE_KEYS: continue if not full_family and key not in CORE_KEYS: continue address_from = token.get("addressFrom", "") address_to = token.get("addressTo", "") if is_address(address_from) and is_address(address_to) and int(address_to, 16) != 0: rows.append( { "key": key, "name": token.get("name", key), "canonical": address_from, "mirrored": address_to, } ) if rows: out[chain_id] = rows return out def check_l1(l1_bridge: str, rpc: str, token_rows: dict[int, list[dict[str, str]]]) -> dict[str, Any]: result: dict[str, Any] = { "address": l1_bridge, "rpcConfigured": bool(rpc), "hasCode": False, "sendRouterReadable": False, "receiveRouterReadable": False, "destinationChecks": [], "passed": False, "errors": [], } if not is_address(l1_bridge): result["errors"].append("CW_L1_BRIDGE_CHAIN138 is unset or invalid.") return result if not rpc: result["errors"].append("RPC_URL_138/CHAIN138_RPC is unset.") return result ok, code = cast_code(l1_bridge, rpc) result["hasCode"] = ok and code not in ("", "0x") for field in ["sendRouter", "receiveRouter"]: ok, out = cast_call(l1_bridge, f"{field}()(address)", [], rpc) result[f"{field}Readable"] = ok and is_address(out.splitlines()[-1].strip()) result[f"{field}"] = out.splitlines()[-1].strip() if ok and out else "" for chain_id, _, _, selector in ACTIVE_CHAINS: rows = token_rows.get(chain_id) or [] for token in rows: ok, out = cast_call( l1_bridge, "destinations(address,uint64)((address,bool))", [token["canonical"], selector], rpc, ) result["destinationChecks"].append( { "chainId": chain_id, "selector": selector, "token": token["key"], "canonical": token["canonical"], "raw": out, "configured": ok and "true" in out.lower(), } ) result["passed"] = ( result["hasCode"] and result["sendRouterReadable"] and result["receiveRouterReadable"] and all(x["configured"] for x in result["destinationChecks"]) ) return result def check_chain( chain_id: int, name: str, suffix: str, selector: str, token_rows: list[dict[str, str]], minter_role: str, burner_role: str, ) -> dict[str, Any]: rpc = env_first(RPC_CANDIDATES[chain_id]) bridge = os.environ.get(f"CW_BRIDGE_{suffix}", "").strip() result: dict[str, Any] = { "chainId": chain_id, "network": name, "selector": selector, "bridge": bridge, "rpcConfigured": bool(rpc), "hasCode": False, "sendRouterReadable": False, "receiveRouterReadable": False, "feeTokenReadable": False, "tokenPairChecks": [], "roleChecks": [], "destination138": {}, "passed": False, "errors": [], } if not rpc: result["errors"].append("RPC unset.") return result if not is_address(bridge): result["errors"].append(f"CW_BRIDGE_{suffix} is unset or invalid.") return result ok, code = cast_code(bridge, rpc) result["hasCode"] = ok and code not in ("", "0x") for field in ["sendRouter", "receiveRouter", "feeToken"]: ok, out = cast_call(bridge, f"{field}()(address)", [], rpc) result[f"{field}Readable"] = ok and is_address(out.splitlines()[-1].strip()) result[field] = out.splitlines()[-1].strip() if ok and out else "" ok, out = cast_call(bridge, "destinations(uint64)((address,bool))", ["138"], rpc) result["destination138"] = {"raw": out, "configured": ok and "true" in out.lower()} for token in token_rows: ok, out = cast_call(bridge, "canonicalToMirrored(address)(address)", [token["canonical"]], rpc) mapped = ok and address_in_cast(out, token["mirrored"]) result["tokenPairChecks"].append( { "token": token["key"], "canonical": token["canonical"], "expectedMirrored": token["mirrored"], "raw": out, "configured": mapped, } ) for role_name, role in [("MINTER_ROLE", minter_role), ("BURNER_ROLE", burner_role)]: ok_role, out_role = cast_call( token["mirrored"], "hasRole(bytes32,address)(bool)", [role, bridge], rpc, ) result["roleChecks"].append( { "token": token["key"], "mirrored": token["mirrored"], "role": role_name, "holder": bridge, "granted": ok_role and bool_from_cast(out_role), "raw": out_role, } ) result["passed"] = ( result["hasCode"] and result["sendRouterReadable"] and result["receiveRouterReadable"] and result["destination138"]["configured"] and all(x["configured"] for x in result["tokenPairChecks"]) and all(x["granted"] for x in result["roleChecks"]) ) return result def write_markdown(payload: dict[str, Any], path: Path) -> None: lines = [ "# cW MultiToken Bridge E2E Readiness", "", f"- Generated: `{payload['generatedAt']}`", f"- Scope: `{payload['scope']}`", f"- All active chains passed: `{payload['summary']['allActiveChainsPassed']}`", f"- L1 passed: `{payload['summary']['l1Passed']}`", f"- Chain pass count: `{payload['summary']['passedChainCount']} / {payload['summary']['activeChainCount']}`", "", "## Chain Status", "", "| Chain | Network | Passed | Bridge | Notes |", "|---:|---|---:|---|---|", ] for row in payload["chains"]: notes = [] if row["errors"]: notes.extend(row["errors"]) failed_pairs = [x["token"] for x in row["tokenPairChecks"] if not x["configured"]] failed_roles = [f"{x['token']} {x['role']}" for x in row["roleChecks"] if not x["granted"]] if failed_pairs: notes.append("missing token pairs: " + ", ".join(failed_pairs[:6])) if failed_roles: notes.append("missing roles: " + ", ".join(failed_roles[:6])) if not row["destination138"].get("configured"): notes.append("destination 138 not configured") lines.append( f"| {row['chainId']} | {row['network']} | `{row['passed']}` | `{row['bridge'] or ''}` | {'; '.join(notes) or 'ok'} |" ) lines.extend(["", "## L1", ""]) l1 = payload["l1"] lines.append(f"- Bridge: `{l1['address']}`") lines.append(f"- Passed: `{l1['passed']}`") if l1["errors"]: lines.append(f"- Errors: `{'; '.join(l1['errors'])}`") missing = [f"{x['chainId']} {x['token']}" for x in l1["destinationChecks"] if not x["configured"]] lines.append(f"- Missing destination checks: `{', '.join(missing[:30]) if missing else 'none'}`") path.parent.mkdir(parents=True, exist_ok=True) path.write_text("\n".join(lines) + "\n") def main() -> int: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--full-family", action="store_true", help="Check every cW mapping, not only cWUSDT/cWUSDC canary routes.") parser.add_argument("--json-out", type=Path, default=DEFAULT_JSON) parser.add_argument("--md-out", type=Path, default=DEFAULT_MD) parser.add_argument("--strict", action="store_true") args = parser.parse_args() token_rows = load_token_rows(args.full_family) minter_role = cast_keccak("MINTER_ROLE") burner_role = cast_keccak("BURNER_ROLE") l1_bridge = os.environ.get("CW_L1_BRIDGE_CHAIN138", "").strip() rpc_138 = env_first(["RPC_URL_138", "CHAIN138_RPC", "CHAIN138_RPC_URL", "RPC_URL"]) l1 = check_l1(l1_bridge, rpc_138, token_rows) chains = [ check_chain(chain_id, name, suffix, selector, token_rows.get(chain_id) or [], minter_role, burner_role) for chain_id, name, suffix, selector in ACTIVE_CHAINS ] passed = [row for row in chains if row["passed"]] payload = { "schema": "cw-multitoken-bridge-e2e-readiness/v1", "generatedAt": datetime.now(timezone.utc).isoformat(), "scope": "full-family" if args.full_family else "core-cwusdt-cwusdc", "summary": { "readyForProduction": l1["passed"] and len(passed) == len(chains), "allActiveChainsPassed": l1["passed"] and len(passed) == len(chains), "l1Passed": l1["passed"], "activeChainCount": len(chains), "passedChainCount": len(passed), "failedChainIds": [row["chainId"] for row in chains if not row["passed"]], }, "roles": {"MINTER_ROLE": minter_role, "BURNER_ROLE": burner_role}, "l1": l1, "chains": chains, } args.json_out.parent.mkdir(parents=True, exist_ok=True) args.json_out.write_text(json.dumps(payload, indent=2) + "\n") write_markdown(payload, args.md_out) print(f"Wrote {rel(args.json_out)}") print(f"Wrote {rel(args.md_out)}") print(f"All active chains passed: {payload['summary']['allActiveChainsPassed']}") if payload["summary"]["failedChainIds"]: print("Failed chains: " + ", ".join(str(x) for x in payload["summary"]["failedChainIds"])) if args.strict and not payload["summary"]["allActiveChainsPassed"]: return 1 return 0 if __name__ == "__main__": raise SystemExit(main())