Add optional Cosmos/Engine-X/act-runner templates, CWUSDC/EI-matrix tooling, non-EVM route planner in multi-chain-execution (tests passing), token list and extraction updates, and documentation (MetaMask matrix, GRU/CWUSDC packets). Ignore institutional evidence tarballs/sha256 under reports/status. Validated with: bash scripts/verify/run-all-validation.sh --skip-genesis Co-authored-by: Cursor <cursoragent@cursor.com>
365 lines
14 KiB
Python
Executable File
365 lines
14 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Read-only CWMultiToken bridge readiness evidence.
|
|
|
|
The script checks deployed bridge contracts, configured routes, canonical-to-
|
|
mirrored token mappings, and cW token MINTER/BURNER roles. It does not send
|
|
transactions or trigger bridge transfers.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import subprocess
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
# parents[2] of this script's resolved path; used as the base for config and
# report paths and as the working directory for `cast` subprocesses.
# NOTE(review): presumably the repository root - confirm against the layout.
ROOT = Path(__file__).resolve().parents[2]
# JSON file read by load_token_rows(); its "pairs" entries supply token mappings.
TOKEN_MAPPING = ROOT / "config" / "token-mapping-multichain.json"
# Default output locations for the JSON payload and the Markdown summary.
DEFAULT_JSON = ROOT / "reports" / "status" / "cw-multitoken-bridge-e2e-latest.json"
DEFAULT_MD = ROOT / "reports" / "status" / "cw-multitoken-bridge-e2e-latest.md"

# One tuple per checked chain: (chain id, display name, env-var suffix, selector).
# The suffix is used to build the `CW_BRIDGE_<suffix>` environment variable name;
# the selector is passed as the uint64 argument of the L1 bridge's
# `destinations(address,uint64)` lookup.
# NOTE(review): selectors look like CCIP chain selectors - confirm against the
# bridge deployment configuration.
ACTIVE_CHAINS = [
    (1, "Ethereum Mainnet", "MAINNET", "5009297550715157269"),
    (10, "Optimism", "OPTIMISM", "3734403246176062136"),
    (25, "Cronos", "CRONOS", "1456215246176062136"),
    (56, "BSC", "BSC", "11344663589394136015"),
    (100, "Gnosis", "GNOSIS", "465200170687744372"),
    (137, "Polygon", "POLYGON", "4051577828743386545"),
    (8453, "Base", "BASE", "15971525489660198786"),
    (42161, "Arbitrum", "ARBITRUM", "4949039107694359620"),
    (42220, "Celo", "CELO", "1346049177634351622"),
    (43114, "Avalanche", "AVALANCHE", "6433500567565415381"),
]

# Chain id -> environment variable names tried in order; the first one holding
# a non-blank value supplies that chain's RPC URL.
RPC_CANDIDATES = {
    1: ["ETHEREUM_MAINNET_RPC", "ETH_MAINNET_RPC_URL", "MAINNET_RPC_URL", "RPC_URL_MAINNET"],
    10: ["OPTIMISM_RPC_URL", "OPTIMISM_MAINNET_RPC"],
    25: ["CRONOS_RPC_URL", "CRONOS_MAINNET_RPC", "CRONOS_RPC"],
    56: ["BSC_RPC_URL", "BSC_MAINNET_RPC"],
    100: ["GNOSIS_RPC_URL", "GNOSIS_MAINNET_RPC", "GNOSIS_RPC"],
    137: ["POLYGON_RPC_URL", "POLYGON_MAINNET_RPC"],
    8453: ["BASE_RPC_URL", "BASE_MAINNET_RPC"],
    42161: ["ARBITRUM_RPC_URL", "ARBITRUM_MAINNET_RPC"],
    42220: ["CELO_RPC_URL", "CELO_MAINNET_RPC", "CELO_RPC"],
    43114: ["AVALANCHE_RPC_URL", "AVALANCHE_MAINNET_RPC", "AVALANCHE_RPC"],
}

# Token-mapping keys that are always checked; other `*_cW` keys are only
# included when --full-family is given.
CORE_KEYS = {"Compliant_USDT_cW", "Compliant_USDC_cW"}
|
|
|
|
|
|
def rel(path: Path) -> str:
    """Return *path* as a string relative to ROOT, or unchanged if it lies outside ROOT."""
    try:
        relative = path.relative_to(ROOT)
    except ValueError:
        # relative_to() raises ValueError when ROOT is not a prefix of path.
        return str(path)
    return str(relative)
|
|
|
|
|
|
def env_first(keys: list[str]) -> str:
    """Return the first non-blank environment value among *keys*.

    Each variable is read in the given order and stripped of surrounding
    whitespace; unset or blank variables are skipped. Returns "" when none
    of the variables holds a usable value.
    """
    stripped_values = (os.environ.get(name, "").strip() for name in keys)
    return next((candidate for candidate in stripped_values if candidate), "")
|
|
|
|
|
|
_HEX_DIGITS = frozenset("0123456789abcdefABCDEF")


def is_address(value: str) -> bool:
    """Return True when *value* is a 20-byte hex address string.

    The value must be a ``str`` of exactly 42 characters: the lowercase
    ``0x`` prefix followed by 40 hexadecimal digits. Checking the digits
    (not just prefix and length) guarantees that a later
    ``int(value, 16)`` on an accepted value cannot raise, and rejecting
    non-strings keeps a JSON ``null`` from raising AttributeError here.
    """
    if not isinstance(value, str):
        return False
    if len(value) != 42 or not value.startswith("0x"):
        return False
    return _HEX_DIGITS.issuperset(value[2:])
|
|
|
|
|
|
def run_cast(args: list[str], timeout: int = 18) -> tuple[bool, str]:
    """Run ``cast`` with *args* from ROOT and report (succeeded, output).

    ``succeeded`` is True only when the process exits with status 0. The
    output is stdout when non-empty, otherwise stderr, stripped of
    surrounding whitespace. Any exception while launching or waiting for
    the process (including a timeout) is reported as ``(False, str(exc))``
    instead of being raised.
    """
    try:
        completed = subprocess.run(
            ["cast", *args],
            cwd=ROOT,
            text=True,
            capture_output=True,
            timeout=timeout,
            check=False,
        )
    except Exception as exc:  # noqa: BLE001 - evidence should capture any runner failure
        return False, str(exc)
    captured = completed.stdout or completed.stderr or ""
    succeeded = completed.returncode == 0
    return succeeded, captured.strip()
|
|
|
|
|
|
def cast_call(address: str, signature: str, params: list[str], rpc: str) -> tuple[bool, str]:
    """Run ``cast call`` for *signature* on *address* via *rpc*; returns run_cast()'s (ok, output)."""
    command = ["call", address, signature]
    command.extend(params)
    command += ["--rpc-url", rpc]
    return run_cast(command)
|
|
|
|
|
|
def cast_code(address: str, rpc: str) -> tuple[bool, str]:
    """Run ``cast code`` for *address* via *rpc*; returns run_cast()'s (ok, output)."""
    command = ["code", address, "--rpc-url", rpc]
    return run_cast(command)
|
|
|
|
|
|
def cast_keccak(value: str) -> str:
    """Return the first whitespace-separated token printed by ``cast keccak <value>``.

    Raises:
        RuntimeError: if the ``cast`` invocation fails, or if it succeeds
            but prints nothing (previously an opaque IndexError).
    """
    ok, out = run_cast(["keccak", value], timeout=5)
    if not ok:
        raise RuntimeError(f"cast keccak failed for {value}: {out}")
    fields = out.split()
    if not fields:
        # A zero exit status with no output gives nothing to use as a hash.
        raise RuntimeError(f"cast keccak returned no output for {value}")
    return fields[0]
|
|
|
|
|
|
def bool_from_cast(value: str) -> bool:
    """Return True when the last line of the stripped, lower-cased *value* is exactly ``true``."""
    normalized_lines = value.strip().lower().splitlines()
    if not normalized_lines:
        return False
    return normalized_lines[-1] == "true"
|
|
|
|
|
|
def address_in_cast(value: str, expected: str) -> bool:
    """Return True when *expected* occurs in *value*, compared case-insensitively via lower()."""
    needle = expected.lower()
    haystack = value.lower()
    return haystack.find(needle) != -1
|
|
|
|
|
|
def load_token_rows(full_family: bool) -> dict[int, list[dict[str, str]]]:
    """Load the token rows to check from TOKEN_MAPPING, grouped by destination chain id.

    Only pairs whose ``fromChainId`` is 138 are considered. A token is kept
    when its key is in CORE_KEYS, or - with *full_family* set - when its key
    ends in ``_cW``. Tokens whose ``addressFrom``/``addressTo`` fail
    is_address(), or whose ``addressTo`` is the zero address, are dropped.
    Chains left with no rows are omitted from the result.

    Each row has the keys ``key``, ``name``, ``canonical`` (addressFrom)
    and ``mirrored`` (addressTo).
    """
    mapping = json.loads(TOKEN_MAPPING.read_text())
    rows_by_chain: dict[int, list[dict[str, str]]] = {}
    for pair in mapping.get("pairs") or []:
        if pair.get("fromChainId") != 138:
            continue
        destination_id = int(pair.get("toChainId"))
        selected: list[dict[str, str]] = []
        for token in pair.get("tokens") or []:
            key = token.get("key", "")
            has_cw_suffix = key.endswith("_cW")
            is_core = key in CORE_KEYS
            if not (is_core or (full_family and has_cw_suffix)):
                continue
            canonical = token.get("addressFrom", "")
            mirrored = token.get("addressTo", "")
            if not (is_address(canonical) and is_address(mirrored)):
                continue
            if int(mirrored, 16) == 0:
                # A zero mirrored address means no deployed mirror to check.
                continue
            selected.append(
                {
                    "key": key,
                    "name": token.get("name", key),
                    "canonical": canonical,
                    "mirrored": mirrored,
                }
            )
        if selected:
            rows_by_chain[destination_id] = selected
    return rows_by_chain
|
|
|
|
|
|
def check_l1(l1_bridge: str, rpc: str, token_rows: dict[int, list[dict[str, str]]]) -> dict[str, Any]:
    """Collect read-only evidence for the L1 bridge at *l1_bridge* via *rpc*.

    Checks, in order: that the address has code, that ``sendRouter()`` and
    ``receiveRouter()`` return an address, and that
    ``destinations(address,uint64)`` reports a configured entry for every
    (canonical token, chain selector) combination taken from *token_rows*
    and ACTIVE_CHAINS.

    Returns a result dict; ``passed`` is True only when every check above
    succeeded. If the bridge address is invalid or *rpc* is empty, the
    function returns early with an explanatory entry in ``errors`` and no
    on-chain calls are made.
    """
    result: dict[str, Any] = {
        "address": l1_bridge,
        "rpcConfigured": bool(rpc),
        "hasCode": False,
        "sendRouterReadable": False,
        "receiveRouterReadable": False,
        "destinationChecks": [],
        "passed": False,
        "errors": [],
    }
    if not is_address(l1_bridge):
        result["errors"].append("CW_L1_BRIDGE_CHAIN138 is unset or invalid.")
        return result
    if not rpc:
        result["errors"].append("RPC_URL_138/CHAIN138_RPC is unset.")
        return result

    ok, code = cast_code(l1_bridge, rpc)
    result["hasCode"] = ok and code not in ("", "0x")
    for field in ("sendRouter", "receiveRouter"):
        ok, out = cast_call(l1_bridge, f"{field}()(address)", [], rpc)
        # Only the last output line is treated as the returned address. A
        # successful call with empty output counts as unreadable instead of
        # raising IndexError and aborting the whole evidence run.
        output_lines = out.splitlines() if ok else []
        last_line = output_lines[-1].strip() if output_lines else ""
        result[f"{field}Readable"] = is_address(last_line)
        result[field] = last_line

    for chain_id, _name, _suffix, selector in ACTIVE_CHAINS:
        for token in token_rows.get(chain_id) or []:
            ok, out = cast_call(
                l1_bridge,
                "destinations(address,uint64)((address,bool))",
                [token["canonical"], selector],
                rpc,
            )
            result["destinationChecks"].append(
                {
                    "chainId": chain_id,
                    "selector": selector,
                    "token": token["key"],
                    "canonical": token["canonical"],
                    "raw": out,
                    # The decoded tuple is matched textually for its bool member.
                    "configured": ok and "true" in out.lower(),
                }
            )

    # NOTE(review): all() is vacuously True when no destination checks were
    # produced (e.g. empty token_rows) - confirm that is the intended outcome.
    result["passed"] = (
        result["hasCode"]
        and result["sendRouterReadable"]
        and result["receiveRouterReadable"]
        and all(check["configured"] for check in result["destinationChecks"])
    )
    return result
|
|
|
|
|
|
def check_chain(
    chain_id: int,
    name: str,
    suffix: str,
    selector: str,
    token_rows: list[dict[str, str]],
    minter_role: str,
    burner_role: str,
) -> dict[str, Any]:
    """Collect read-only evidence for the bridge deployed on one destination chain.

    The RPC URL comes from the first non-blank variable in
    ``RPC_CANDIDATES[chain_id]`` and the bridge address from
    ``CW_BRIDGE_<suffix>``. With both available the function checks:

    * the bridge address has code;
    * ``sendRouter()``, ``receiveRouter()`` and ``feeToken()`` return an address;
    * ``destinations(uint64)`` for ``138`` reports a configured entry;
    * for each row in *token_rows*, ``canonicalToMirrored(canonical)``
      contains the expected mirrored address, and the mirrored token's
      ``hasRole`` reports *minter_role* and *burner_role* for the bridge.

    Returns a result dict. ``passed`` requires every check above except the
    fee-token getter, which is recorded but not part of the verdict. If the
    RPC or bridge address is missing, the function returns early with an
    entry in ``errors`` and no on-chain calls are made.
    """
    rpc = env_first(RPC_CANDIDATES[chain_id])
    bridge = os.environ.get(f"CW_BRIDGE_{suffix}", "").strip()
    result: dict[str, Any] = {
        "chainId": chain_id,
        "network": name,
        "selector": selector,
        "bridge": bridge,
        "rpcConfigured": bool(rpc),
        "hasCode": False,
        "sendRouterReadable": False,
        "receiveRouterReadable": False,
        "feeTokenReadable": False,
        "tokenPairChecks": [],
        "roleChecks": [],
        "destination138": {},
        "passed": False,
        "errors": [],
    }
    if not rpc:
        result["errors"].append("RPC unset.")
        return result
    if not is_address(bridge):
        result["errors"].append(f"CW_BRIDGE_{suffix} is unset or invalid.")
        return result

    ok, code = cast_code(bridge, rpc)
    result["hasCode"] = ok and code not in ("", "0x")
    for field in ("sendRouter", "receiveRouter", "feeToken"):
        ok, out = cast_call(bridge, f"{field}()(address)", [], rpc)
        # Only the last output line is treated as the returned address. A
        # successful call with empty output counts as unreadable instead of
        # raising IndexError and aborting the whole evidence run.
        output_lines = out.splitlines() if ok else []
        last_line = output_lines[-1].strip() if output_lines else ""
        result[f"{field}Readable"] = is_address(last_line)
        result[field] = last_line

    ok, out = cast_call(bridge, "destinations(uint64)((address,bool))", ["138"], rpc)
    result["destination138"] = {"raw": out, "configured": ok and "true" in out.lower()}

    role_specs = [("MINTER_ROLE", minter_role), ("BURNER_ROLE", burner_role)]
    for token in token_rows:
        ok, out = cast_call(bridge, "canonicalToMirrored(address)(address)", [token["canonical"]], rpc)
        mapped = ok and address_in_cast(out, token["mirrored"])
        result["tokenPairChecks"].append(
            {
                "token": token["key"],
                "canonical": token["canonical"],
                "expectedMirrored": token["mirrored"],
                "raw": out,
                "configured": mapped,
            }
        )
        # The role holder being checked is the bridge itself, on the mirrored token.
        for role_name, role in role_specs:
            ok_role, out_role = cast_call(
                token["mirrored"],
                "hasRole(bytes32,address)(bool)",
                [role, bridge],
                rpc,
            )
            result["roleChecks"].append(
                {
                    "token": token["key"],
                    "mirrored": token["mirrored"],
                    "role": role_name,
                    "holder": bridge,
                    "granted": ok_role and bool_from_cast(out_role),
                    "raw": out_role,
                }
            )

    # NOTE(review): both all() terms are vacuously True when token_rows is
    # empty - confirm a chain without mapped tokens is meant to be able to pass.
    result["passed"] = (
        result["hasCode"]
        and result["sendRouterReadable"]
        and result["receiveRouterReadable"]
        and result["destination138"]["configured"]
        and all(check["configured"] for check in result["tokenPairChecks"])
        and all(check["granted"] for check in result["roleChecks"])
    )
    return result
|
|
|
|
|
|
def write_markdown(payload: dict[str, Any], path: Path) -> None:
    """Render *payload* as a Markdown readiness summary and write it to *path*.

    The report has a header with the summary fields, one table row per
    entry in ``payload["chains"]`` (notes list errors, up to six
    unconfigured token pairs, up to six missing roles, and a missing
    destination-138 entry), and an L1 section listing up to thirty
    unconfigured destination checks. Parent directories of *path* are
    created as needed.
    """
    summary = payload["summary"]
    report = [
        "# cW MultiToken Bridge E2E Readiness",
        "",
        f"- Generated: `{payload['generatedAt']}`",
        f"- Scope: `{payload['scope']}`",
        f"- All active chains passed: `{summary['allActiveChainsPassed']}`",
        f"- L1 passed: `{summary['l1Passed']}`",
        f"- Chain pass count: `{summary['passedChainCount']} / {summary['activeChainCount']}`",
        "",
        "## Chain Status",
        "",
        "| Chain | Network | Passed | Bridge | Notes |",
        "|---:|---|---:|---|---|",
    ]

    for chain in payload["chains"]:
        notes = list(chain["errors"]) if chain["errors"] else []
        unmapped = [check["token"] for check in chain["tokenPairChecks"] if not check["configured"]]
        ungranted = [
            f"{check['token']} {check['role']}" for check in chain["roleChecks"] if not check["granted"]
        ]
        if unmapped:
            notes.append("missing token pairs: " + ", ".join(unmapped[:6]))
        if ungranted:
            notes.append("missing roles: " + ", ".join(ungranted[:6]))
        if not chain["destination138"].get("configured"):
            notes.append("destination 138 not configured")
        bridge_cell = chain["bridge"] or "<unset>"
        notes_cell = "; ".join(notes) or "ok"
        report.append(
            f"| {chain['chainId']} | {chain['network']} | `{chain['passed']}` | `{bridge_cell}` | {notes_cell} |"
        )

    l1 = payload["l1"]
    report += ["", "## L1", ""]
    report.append(f"- Bridge: `{l1['address']}`")
    report.append(f"- Passed: `{l1['passed']}`")
    if l1["errors"]:
        joined_errors = "; ".join(l1["errors"])
        report.append(f"- Errors: `{joined_errors}`")
    unconfigured = [
        f"{check['chainId']} {check['token']}" for check in l1["destinationChecks"] if not check["configured"]
    ]
    missing_cell = ", ".join(unconfigured[:30]) if unconfigured else "none"
    report.append(f"- Missing destination checks: `{missing_cell}`")

    # The file is only touched once the whole report has rendered successfully.
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text("\n".join(report) + "\n")
|
|
|
|
|
|
def main() -> int:
    """Run every readiness check, write the JSON and Markdown reports, and return the exit code.

    Returns 1 only when ``--strict`` is given and the L1 bridge or any
    active chain failed its checks; otherwise returns 0.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--full-family",
        action="store_true",
        help="Check every cW mapping, not only cWUSDT/cWUSDC canary routes.",
    )
    parser.add_argument("--json-out", type=Path, default=DEFAULT_JSON)
    parser.add_argument("--md-out", type=Path, default=DEFAULT_MD)
    parser.add_argument("--strict", action="store_true")
    args = parser.parse_args()

    token_rows = load_token_rows(args.full_family)
    # Role identifiers are the keccak hashes of the role names, computed via cast.
    minter_role = cast_keccak("MINTER_ROLE")
    burner_role = cast_keccak("BURNER_ROLE")

    l1_bridge = os.environ.get("CW_L1_BRIDGE_CHAIN138", "").strip()
    rpc_138 = env_first(["RPC_URL_138", "CHAIN138_RPC", "CHAIN138_RPC_URL", "RPC_URL"])
    l1 = check_l1(l1_bridge, rpc_138, token_rows)

    chains = []
    for chain_id, name, suffix, selector in ACTIVE_CHAINS:
        chain_rows = token_rows.get(chain_id) or []
        chains.append(check_chain(chain_id, name, suffix, selector, chain_rows, minter_role, burner_role))

    passed_count = sum(1 for row in chains if row["passed"])
    failed_ids = [row["chainId"] for row in chains if not row["passed"]]
    # The overall verdict also requires the L1 bridge to pass.
    all_passed = l1["passed"] and passed_count == len(chains)

    summary = {
        "readyForProduction": all_passed,
        "allActiveChainsPassed": all_passed,
        "l1Passed": l1["passed"],
        "activeChainCount": len(chains),
        "passedChainCount": passed_count,
        "failedChainIds": failed_ids,
    }
    payload = {
        "schema": "cw-multitoken-bridge-e2e-readiness/v1",
        "generatedAt": datetime.now(timezone.utc).isoformat(),
        "scope": "full-family" if args.full_family else "core-cwusdt-cwusdc",
        "summary": summary,
        "roles": {"MINTER_ROLE": minter_role, "BURNER_ROLE": burner_role},
        "l1": l1,
        "chains": chains,
    }

    args.json_out.parent.mkdir(parents=True, exist_ok=True)
    args.json_out.write_text(json.dumps(payload, indent=2) + "\n")
    write_markdown(payload, args.md_out)

    print(f"Wrote {rel(args.json_out)}")
    print(f"Wrote {rel(args.md_out)}")
    print(f"All active chains passed: {all_passed}")
    if failed_ids:
        print("Failed chains: " + ", ".join(str(chain_id) for chain_id in failed_ids))
    return 1 if args.strict and not all_passed else 0
|
|
|
|
|
|
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|