Files
proxmox/scripts/verify/check-cw-multitoken-bridge-e2e-readiness.py
defiQUG 4ebf2d7902
Some checks failed
Deploy to Phoenix / validate (push) Failing after 1s
Deploy to Phoenix / deploy (push) Has been skipped
Deploy to Phoenix / deploy-atomic-swap-dapp (push) Has been skipped
Deploy to Phoenix / cloudflare (push) Has been skipped
chore(repo): sync operator workspace (config, scripts, docs, multi-chain)
Add optional Cosmos/Engine-X/act-runner templates, CWUSDC/EI-matrix tooling,
non-EVM route planner in multi-chain-execution (tests passing), token list and
extraction updates, and documentation (MetaMask matrix, GRU/CWUSDC packets).

Ignore institutional evidence tarballs/sha256 under reports/status.

Validated with: bash scripts/verify/run-all-validation.sh --skip-genesis

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-11 16:25:08 -07:00

365 lines
14 KiB
Python
Executable File

#!/usr/bin/env python3
"""Read-only CWMultiToken bridge readiness evidence.
The script checks deployed bridge contracts, configured routes, canonical-to-
mirrored token mappings, and cW token MINTER/BURNER roles. It does not send
transactions or trigger bridge transfers.
"""
from __future__ import annotations
import argparse
import json
import os
import subprocess
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
# Base directory for every path below: the third ancestor of this file
# (``parents[2]`` of the resolved script path).
ROOT = Path(__file__).resolve().parents[2]

# JSON file that load_token_rows() reads the token pairs from.
TOKEN_MAPPING = ROOT.joinpath("config", "token-mapping-multichain.json")

# Default destinations for the JSON and Markdown evidence reports.
DEFAULT_JSON = ROOT.joinpath("reports", "status", "cw-multitoken-bridge-e2e-latest.json")
DEFAULT_MD = ROOT.joinpath("reports", "status", "cw-multitoken-bridge-e2e-latest.md")
# One entry per destination chain that is checked:
#   (chain id, display name, env-var suffix used in CW_BRIDGE_<suffix>, selector)
# The selector is kept as a decimal string because it is handed to ``cast`` as
# a uint64 call argument.
# NOTE(review): selectors are presumably CCIP chain selectors - confirm against
# the bridge deployment configuration.
ACTIVE_CHAINS: list[tuple[int, str, str, str]] = [
    (1, "Ethereum Mainnet", "MAINNET", "5009297550715157269"),
    (10, "Optimism", "OPTIMISM", "3734403246176062136"),
    (25, "Cronos", "CRONOS", "1456215246176062136"),
    (56, "BSC", "BSC", "11344663589394136015"),
    (100, "Gnosis", "GNOSIS", "465200170687744372"),
    (137, "Polygon", "POLYGON", "4051577828743386545"),
    (8453, "Base", "BASE", "15971525489660198786"),
    (42161, "Arbitrum", "ARBITRUM", "4949039107694359620"),
    (42220, "Celo", "CELO", "1346049177634351622"),
    (43114, "Avalanche", "AVALANCHE", "6433500567565415381"),
]

# Environment variable names, in priority order, that may hold the RPC URL for
# each chain id; the first one with a non-blank value is used.
RPC_CANDIDATES: dict[int, list[str]] = {
    1: ["ETHEREUM_MAINNET_RPC", "ETH_MAINNET_RPC_URL", "MAINNET_RPC_URL", "RPC_URL_MAINNET"],
    10: ["OPTIMISM_RPC_URL", "OPTIMISM_MAINNET_RPC"],
    25: ["CRONOS_RPC_URL", "CRONOS_MAINNET_RPC", "CRONOS_RPC"],
    56: ["BSC_RPC_URL", "BSC_MAINNET_RPC"],
    100: ["GNOSIS_RPC_URL", "GNOSIS_MAINNET_RPC", "GNOSIS_RPC"],
    137: ["POLYGON_RPC_URL", "POLYGON_MAINNET_RPC"],
    8453: ["BASE_RPC_URL", "BASE_MAINNET_RPC"],
    42161: ["ARBITRUM_RPC_URL", "ARBITRUM_MAINNET_RPC"],
    42220: ["CELO_RPC_URL", "CELO_MAINNET_RPC", "CELO_RPC"],
    43114: ["AVALANCHE_RPC_URL", "AVALANCHE_MAINNET_RPC", "AVALANCHE_RPC"],
}

# Token-mapping keys that make up the default (non ``--full-family``) scope.
CORE_KEYS: set[str] = {"Compliant_USDT_cW", "Compliant_USDC_cW"}
def rel(path: Path) -> str:
    """Return *path* as a string, relative to ``ROOT`` when it lies beneath it.

    A path outside ``ROOT`` is returned unchanged (as a string).
    """
    try:
        relative = path.relative_to(ROOT)
    except ValueError:
        return str(path)
    return str(relative)
def env_first(keys: list[str]) -> str:
    """Return the first non-blank environment value among *keys*.

    Each value is whitespace-stripped before it is tested; variables that are
    unset or blank are skipped. An empty string is returned when none match.
    """
    stripped_values = (os.environ.get(name, "").strip() for name in keys)
    return next((candidate for candidate in stripped_values if candidate), "")
def is_address(value: str) -> bool:
    """Return True when *value* is a ``0x``-prefixed, 40-hex-digit address.

    Besides the prefix and the total length of 42 characters, every character
    after the prefix must be a hexadecimal digit. Without that check a
    malformed 42-character string would be accepted here and only fail later,
    e.g. when a caller converts it with ``int(value, 16)``.
    """
    if len(value) != 42 or not value.startswith("0x"):
        return False
    # Explicit digit set rather than int(..., 16): int() would also accept
    # signs, underscores and surrounding whitespace.
    return all(ch in "0123456789abcdefABCDEF" for ch in value[2:])
def run_cast(args: list[str], timeout: int = 18) -> tuple[bool, str]:
    """Run ``cast`` with *args* from ``ROOT`` and report ``(succeeded, output)``.

    ``succeeded`` is True only when the process exits with status 0. The
    output is stdout when it is non-empty, otherwise stderr, stripped of
    surrounding whitespace. Any exception raised while launching or waiting
    for the process (including a timeout) is reported as ``(False, str(exc))``
    instead of propagating.
    """
    try:
        completed = subprocess.run(
            ["cast", *args],
            cwd=ROOT,
            text=True,
            capture_output=True,
            timeout=timeout,
            check=False,
        )
    except Exception as exc:  # noqa: BLE001 - evidence should capture any runner failure
        return False, str(exc)
    text_out = completed.stdout or completed.stderr or ""
    return completed.returncode == 0, text_out.strip()
def cast_call(address: str, signature: str, params: list[str], rpc: str) -> tuple[bool, str]:
    """Invoke ``cast call`` for *signature* on *address* through the *rpc* URL.

    Returns the ``(succeeded, output)`` pair produced by ``run_cast``.
    """
    argv = ["call", address, signature]
    argv.extend(params)
    argv.extend(["--rpc-url", rpc])
    return run_cast(argv)
def cast_code(address: str, rpc: str) -> tuple[bool, str]:
    """Invoke ``cast code`` for *address* through the *rpc* URL.

    Returns the ``(succeeded, output)`` pair produced by ``run_cast``.
    """
    argv = ["code", address, "--rpc-url", rpc]
    return run_cast(argv)
def cast_keccak(value: str) -> str:
    """Return the first whitespace-separated token printed by ``cast keccak``.

    Raises:
        RuntimeError: when the ``cast keccak`` invocation does not succeed.
    """
    succeeded, output = run_cast(["keccak", value], timeout=5)
    if succeeded:
        return output.split()[0]
    raise RuntimeError(f"cast keccak failed for {value}: {output}")
def bool_from_cast(value: str) -> bool:
    """Return True when the last line of *value* is exactly ``true``.

    The text is stripped and lower-cased as a whole before it is split into
    lines, so only the final line decides the result. Empty input is False.
    """
    normalized_lines = value.strip().lower().splitlines()
    if not normalized_lines:
        return False
    return normalized_lines[-1] == "true"
def address_in_cast(value: str, expected: str) -> bool:
    """Return True when *expected* occurs in *value*, ignoring letter case.

    This is a plain substring test on the lower-cased strings, so an empty
    *expected* always matches.
    """
    haystack = value.lower()
    needle = expected.lower()
    return haystack.find(needle) != -1
def load_token_rows(full_family: bool) -> dict[int, list[dict[str, str]]]:
    """Load the token rows to check from ``TOKEN_MAPPING``, grouped by chain id.

    Only pairs whose ``fromChainId`` is 138 are considered. Within a pair a
    token is kept when:

    * its ``key`` ends with ``_cW`` or is one of ``CORE_KEYS``;
    * *full_family* is true, or the key is one of ``CORE_KEYS``;
    * ``addressFrom`` and ``addressTo`` both pass ``is_address`` and
      ``addressTo`` is not the zero address.

    Returns:
        A dict keyed by ``int(toChainId)``. Each value is a list of dicts with
        the keys ``key``, ``name`` (falls back to the key), ``canonical``
        (``addressFrom``) and ``mirrored`` (``addressTo``). Pairs that yield
        no rows are left out; a later pair for the same destination chain
        replaces the rows of an earlier one.
    """
    mapping = json.loads(TOKEN_MAPPING.read_text())
    rows_by_chain: dict[int, list[dict[str, str]]] = {}
    for pair in mapping.get("pairs") or []:
        if pair.get("fromChainId") != 138:
            continue
        destination_id = int(pair.get("toChainId"))
        selected: list[dict[str, str]] = []
        for entry in pair.get("tokens") or []:
            key = entry.get("key", "")
            in_family = key.endswith("_cW") or key in CORE_KEYS
            if not in_family:
                continue
            # Outside --full-family mode only the core canary keys are checked.
            if not (full_family or key in CORE_KEYS):
                continue
            canonical = entry.get("addressFrom", "")
            mirrored = entry.get("addressTo", "")
            if not (is_address(canonical) and is_address(mirrored)):
                continue
            # An all-zero mirrored address is treated as "no mapping".
            if int(mirrored, 16) == 0:
                continue
            selected.append(
                {
                    "key": key,
                    "name": entry.get("name", key),
                    "canonical": canonical,
                    "mirrored": mirrored,
                }
            )
        if selected:
            rows_by_chain[destination_id] = selected
    return rows_by_chain
def check_l1(l1_bridge: str, rpc: str, token_rows: dict[int, list[dict[str, str]]]) -> dict[str, Any]:
    """Collect read-only evidence for the L1 bridge contract.

    Args:
        l1_bridge: Address of the L1 bridge; must pass ``is_address``.
        rpc: RPC URL used for every ``cast`` invocation.
        token_rows: Token rows per destination chain id, as produced by
            ``load_token_rows``.

    Returns:
        A dict with the bridge ``address``, ``rpcConfigured``, ``hasCode``,
        the ``sendRouter``/``receiveRouter`` values and their ``*Readable``
        flags, one ``destinationChecks`` entry per (active chain, token),
        an ``errors`` list and the overall ``passed`` flag. When the bridge
        address is invalid or the RPC URL is empty, an error is recorded and
        the result is returned early with ``passed`` False.
    """
    result: dict[str, Any] = {
        "address": l1_bridge,
        "rpcConfigured": bool(rpc),
        "hasCode": False,
        "sendRouterReadable": False,
        "receiveRouterReadable": False,
        "destinationChecks": [],
        "passed": False,
        "errors": [],
    }
    if not is_address(l1_bridge):
        result["errors"].append("CW_L1_BRIDGE_CHAIN138 is unset or invalid.")
        return result
    if not rpc:
        result["errors"].append("RPC_URL_138/CHAIN138_RPC is unset.")
        return result

    ok, code = cast_code(l1_bridge, rpc)
    result["hasCode"] = ok and code not in ("", "0x")

    for field in ["sendRouter", "receiveRouter"]:
        ok, out = cast_call(l1_bridge, f"{field}()(address)", [], rpc)
        # A successful call may still print nothing; "".splitlines() is an
        # empty list, so guard the index and record the field as unreadable
        # instead of raising IndexError.
        out_lines = out.splitlines() if ok else []
        value = out_lines[-1].strip() if out_lines else ""
        result[f"{field}Readable"] = ok and is_address(value)
        result[field] = value

    for chain_id, _, _, selector in ACTIVE_CHAINS:
        for token in token_rows.get(chain_id) or []:
            ok, out = cast_call(
                l1_bridge,
                "destinations(address,uint64)((address,bool))",
                [token["canonical"], selector],
                rpc,
            )
            result["destinationChecks"].append(
                {
                    "chainId": chain_id,
                    "selector": selector,
                    "token": token["key"],
                    "canonical": token["canonical"],
                    "raw": out,
                    # The decoded tuple's bool is detected by substring.
                    "configured": ok and "true" in out.lower(),
                }
            )

    # NOTE(review): all() over an empty destinationChecks list is True, so an
    # empty token mapping passes this part vacuously - confirm that is intended.
    result["passed"] = (
        result["hasCode"]
        and result["sendRouterReadable"]
        and result["receiveRouterReadable"]
        and all(x["configured"] for x in result["destinationChecks"])
    )
    return result
def check_chain(
    chain_id: int,
    name: str,
    suffix: str,
    selector: str,
    token_rows: list[dict[str, str]],
    minter_role: str,
    burner_role: str,
) -> dict[str, Any]:
    """Collect read-only evidence for the bridge on one destination chain.

    Args:
        chain_id: Chain id; used to look up the RPC env-var names in
            ``RPC_CANDIDATES``.
        name: Display name, stored as ``network`` in the result.
        suffix: Suffix of the ``CW_BRIDGE_<suffix>`` environment variable that
            holds the bridge address.
        selector: Selector string, stored in the result as-is.
        token_rows: Token rows for this chain (``key``, ``canonical``,
            ``mirrored``).
        minter_role: Role id passed to ``hasRole`` for the MINTER_ROLE check.
        burner_role: Role id passed to ``hasRole`` for the BURNER_ROLE check.

    Returns:
        A dict with the bridge address, router/fee-token values and their
        ``*Readable`` flags, ``destination138``, ``tokenPairChecks``,
        ``roleChecks``, ``errors`` and the overall ``passed`` flag. When the
        RPC URL or bridge address is missing/invalid, an error is recorded and
        the result is returned early with ``passed`` False.
    """
    rpc = env_first(RPC_CANDIDATES[chain_id])
    bridge = os.environ.get(f"CW_BRIDGE_{suffix}", "").strip()
    result: dict[str, Any] = {
        "chainId": chain_id,
        "network": name,
        "selector": selector,
        "bridge": bridge,
        "rpcConfigured": bool(rpc),
        "hasCode": False,
        "sendRouterReadable": False,
        "receiveRouterReadable": False,
        "feeTokenReadable": False,
        "tokenPairChecks": [],
        "roleChecks": [],
        "destination138": {},
        "passed": False,
        "errors": [],
    }
    if not rpc:
        result["errors"].append("RPC unset.")
        return result
    if not is_address(bridge):
        result["errors"].append(f"CW_BRIDGE_{suffix} is unset or invalid.")
        return result

    ok, code = cast_code(bridge, rpc)
    result["hasCode"] = ok and code not in ("", "0x")

    for field in ["sendRouter", "receiveRouter", "feeToken"]:
        ok, out = cast_call(bridge, f"{field}()(address)", [], rpc)
        # A successful call may still print nothing; "".splitlines() is an
        # empty list, so guard the index and record the field as unreadable
        # instead of raising IndexError.
        out_lines = out.splitlines() if ok else []
        value = out_lines[-1].strip() if out_lines else ""
        result[f"{field}Readable"] = ok and is_address(value)
        result[field] = value

    ok, out = cast_call(bridge, "destinations(uint64)((address,bool))", ["138"], rpc)
    result["destination138"] = {"raw": out, "configured": ok and "true" in out.lower()}

    for token in token_rows:
        ok, out = cast_call(bridge, "canonicalToMirrored(address)(address)", [token["canonical"]], rpc)
        mapped = ok and address_in_cast(out, token["mirrored"])
        result["tokenPairChecks"].append(
            {
                "token": token["key"],
                "canonical": token["canonical"],
                "expectedMirrored": token["mirrored"],
                "raw": out,
                "configured": mapped,
            }
        )
        # The bridge itself must hold both roles on the mirrored token.
        for role_name, role in [("MINTER_ROLE", minter_role), ("BURNER_ROLE", burner_role)]:
            ok_role, out_role = cast_call(
                token["mirrored"],
                "hasRole(bytes32,address)(bool)",
                [role, bridge],
                rpc,
            )
            result["roleChecks"].append(
                {
                    "token": token["key"],
                    "mirrored": token["mirrored"],
                    "role": role_name,
                    "holder": bridge,
                    "granted": ok_role and bool_from_cast(out_role),
                    "raw": out_role,
                }
            )

    # feeTokenReadable is recorded as evidence but does not gate the result.
    # NOTE(review): with no token rows both all() calls are vacuously True, so
    # a chain without mappings can pass - confirm that is intended.
    result["passed"] = (
        result["hasCode"]
        and result["sendRouterReadable"]
        and result["receiveRouterReadable"]
        and result["destination138"]["configured"]
        and all(x["configured"] for x in result["tokenPairChecks"])
        and all(x["granted"] for x in result["roleChecks"])
    )
    return result
def write_markdown(payload: dict[str, Any], path: Path) -> None:
    """Render the readiness *payload* as a Markdown report and write it to *path*.

    The report has a summary list, a "Chain Status" table with one row per
    entry of ``payload["chains"]`` and an "L1" section. Per-chain notes list
    the recorded errors, up to six unmapped token pairs, up to six missing
    roles and a missing destination-138 route; ``ok`` is shown when there are
    no notes. The L1 section lists up to thirty unconfigured destinations.
    Missing parent directories of *path* are created.
    """
    summary = payload["summary"]
    report = [
        "# cW MultiToken Bridge E2E Readiness",
        "",
        f"- Generated: `{payload['generatedAt']}`",
        f"- Scope: `{payload['scope']}`",
        f"- All active chains passed: `{summary['allActiveChainsPassed']}`",
        f"- L1 passed: `{summary['l1Passed']}`",
        f"- Chain pass count: `{summary['passedChainCount']} / {summary['activeChainCount']}`",
        "",
        "## Chain Status",
        "",
        "| Chain | Network | Passed | Bridge | Notes |",
        "|---:|---|---:|---|---|",
    ]

    for chain in payload["chains"]:
        remarks = [*chain["errors"]] if chain["errors"] else []
        unmapped = [check["token"] for check in chain["tokenPairChecks"] if not check["configured"]]
        ungranted = [f"{check['token']} {check['role']}" for check in chain["roleChecks"] if not check["granted"]]
        if unmapped:
            remarks.append("missing token pairs: " + ", ".join(unmapped[:6]))
        if ungranted:
            remarks.append("missing roles: " + ", ".join(ungranted[:6]))
        if not chain["destination138"].get("configured"):
            remarks.append("destination 138 not configured")
        bridge_cell = chain["bridge"] or "<unset>"
        notes_cell = "; ".join(remarks) or "ok"
        report.append(
            f"| {chain['chainId']} | {chain['network']} | `{chain['passed']}` | `{bridge_cell}` | {notes_cell} |"
        )

    l1 = payload["l1"]
    report += ["", "## L1", ""]
    report.append(f"- Bridge: `{l1['address']}`")
    report.append(f"- Passed: `{l1['passed']}`")
    if l1["errors"]:
        joined_errors = "; ".join(l1["errors"])
        report.append(f"- Errors: `{joined_errors}`")
    unconfigured = [f"{check['chainId']} {check['token']}" for check in l1["destinationChecks"] if not check["configured"]]
    missing_text = ", ".join(unconfigured[:30]) if unconfigured else "none"
    report.append(f"- Missing destination checks: `{missing_text}`")

    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text("\n".join(report) + "\n")
def main() -> int:
    """Run every readiness check, write the JSON/Markdown evidence, return an exit code.

    Command-line flags:
        --full-family  check every cW mapping rather than only the core keys
        --json-out     path of the JSON report (default ``DEFAULT_JSON``)
        --md-out       path of the Markdown report (default ``DEFAULT_MD``)
        --strict       return 1 unless the L1 bridge and all chains passed

    Returns:
        1 when ``--strict`` is given and not everything passed, otherwise 0.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--full-family",
        action="store_true",
        help="Check every cW mapping, not only cWUSDT/cWUSDC canary routes.",
    )
    parser.add_argument("--json-out", type=Path, default=DEFAULT_JSON)
    parser.add_argument("--md-out", type=Path, default=DEFAULT_MD)
    parser.add_argument("--strict", action="store_true")
    args = parser.parse_args()

    token_rows = load_token_rows(args.full_family)
    minter_role = cast_keccak("MINTER_ROLE")
    burner_role = cast_keccak("BURNER_ROLE")

    l1_bridge = os.environ.get("CW_L1_BRIDGE_CHAIN138", "").strip()
    rpc_138 = env_first(["RPC_URL_138", "CHAIN138_RPC", "CHAIN138_RPC_URL", "RPC_URL"])
    l1 = check_l1(l1_bridge, rpc_138, token_rows)

    chains = []
    for chain_id, name, suffix, selector in ACTIVE_CHAINS:
        rows_for_chain = token_rows.get(chain_id) or []
        chains.append(check_chain(chain_id, name, suffix, selector, rows_for_chain, minter_role, burner_role))

    failed_ids = [row["chainId"] for row in chains if not row["passed"]]
    passed_count = len(chains) - len(failed_ids)
    # Overall readiness needs the L1 bridge and every active chain to pass.
    all_passed = l1["passed"] and passed_count == len(chains)

    payload = {
        "schema": "cw-multitoken-bridge-e2e-readiness/v1",
        "generatedAt": datetime.now(timezone.utc).isoformat(),
        "scope": "full-family" if args.full_family else "core-cwusdt-cwusdc",
        "summary": {
            "readyForProduction": all_passed,
            "allActiveChainsPassed": all_passed,
            "l1Passed": l1["passed"],
            "activeChainCount": len(chains),
            "passedChainCount": passed_count,
            "failedChainIds": failed_ids,
        },
        "roles": {"MINTER_ROLE": minter_role, "BURNER_ROLE": burner_role},
        "l1": l1,
        "chains": chains,
    }

    args.json_out.parent.mkdir(parents=True, exist_ok=True)
    args.json_out.write_text(json.dumps(payload, indent=2) + "\n")
    write_markdown(payload, args.md_out)

    print(f"Wrote {rel(args.json_out)}")
    print(f"Wrote {rel(args.md_out)}")
    print(f"All active chains passed: {all_passed}")
    if failed_ids:
        print("Failed chains: " + ", ".join(str(chain_id) for chain_id in failed_ids))

    return 1 if args.strict and not all_passed else 0


if __name__ == "__main__":
    raise SystemExit(main())