Files
proxmox/scripts/verify/audit-cwusdc-mainnet-roles.py
defiQUG 4ebf2d7902
Some checks failed
Deploy to Phoenix / validate (push) Failing after 1s
Deploy to Phoenix / deploy (push) Has been skipped
Deploy to Phoenix / deploy-atomic-swap-dapp (push) Has been skipped
Deploy to Phoenix / cloudflare (push) Has been skipped
chore(repo): sync operator workspace (config, scripts, docs, multi-chain)
Add optional Cosmos/Engine-X/act-runner templates, CWUSDC/EI-matrix tooling,
non-EVM route planner in multi-chain-execution (tests passing), token list and
extraction updates, and documentation (MetaMask matrix, GRU/CWUSDC packets).

Ignore institutional evidence tarballs/sha256 under reports/status.

Validated with: bash scripts/verify/run-all-validation.sh --skip-genesis

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-11 16:25:08 -07:00

246 lines
10 KiB
Python

#!/usr/bin/env python3
"""Read-only role/control audit for Ethereum Mainnet cWUSDC."""
from __future__ import annotations
import argparse
import datetime as dt
import json
import os
import subprocess
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Any
# Directory two levels above the directory containing this script; used as the
# working directory for `cast` subprocesses and as the base for report paths.
ROOT = Path(__file__).resolve().parents[2]
# Default output locations for the JSON and Markdown reports (overridable via
# --json-out / --md-out).
REPORT_JSON = ROOT / "reports" / "status" / "cwusdc-mainnet-role-audit-latest.json"
REPORT_MD = ROOT / "reports" / "status" / "cwusdc-mainnet-role-audit-latest.md"
# Default token contract audited when --token is not supplied.
CWUSDC = "0x2de5F116bFcE3d0f922d9C8351e0c5Fc24b9284a"
# Fallback candidate address used when DEPLOYER_ADDRESS is unset or empty.
DEPLOYER = "0x4A666F96fC8764181194447A7dFdb7d471b301C8"
# Fallback candidate address used when CW_BRIDGE_MAINNET is unset or empty.
CW_BRIDGE_MAINNET_FALLBACK = "0x2bF74583206A49Be07E0E8A94197C12987AbD7B5"
# Base URL for Etherscan log queries; the chain is selected with the `chainid`
# query parameter in etherscan_logs().
ETHERSCAN_V2_API = "https://api.etherscan.io/v2/api"
def load_dotenv(path: Path) -> None:
    """Populate ``os.environ`` from a ``KEY=VALUE`` file without overriding it.

    Blank lines, lines starting with ``#`` and lines without ``=`` are skipped.
    A shell-style ``export`` keyword in front of the key is dropped, so files
    that are also sourced by a shell load correctly. Surrounding whitespace and
    surrounding single/double quotes are stripped from the value.

    Args:
        path: Location of the dotenv file. Anything that is not a regular file
            (missing path, directory) is silently ignored.
    """
    if not path.is_file():
        return
    # Decode explicitly as UTF-8 so parsing does not depend on the host locale.
    for raw_line in path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        key = key.strip()
        # Accept `export KEY=VALUE`; otherwise the variable would be stored
        # under the unusable name "export KEY".
        key_parts = key.split(None, 1)
        if len(key_parts) == 2 and key_parts[0] == "export":
            key = key_parts[1].strip()
        value = value.strip().strip('"').strip("'")
        # Values already present in the process environment take precedence.
        if key and key not in os.environ:
            os.environ[key] = value
def cast_call(contract: str, signature: str, *args: str, rpc_url: str) -> str:
    """Invoke ``cast call`` on *contract* and return its trimmed standard output.

    Args:
        contract: Address of the contract to call.
        signature: Function signature passed to ``cast call``.
        *args: Positional call arguments appended after the signature.
        rpc_url: Endpoint passed via ``--rpc-url``.

    Returns:
        The command's stdout with surrounding whitespace removed.

    Raises:
        RuntimeError: If the command exits non-zero; the message is its
            stderr, or its stdout when stderr is empty.
    """
    argv = ["cast", "call", contract, signature]
    argv.extend(args)
    argv += ["--rpc-url", rpc_url]
    completed = subprocess.run(argv, cwd=ROOT, text=True, capture_output=True, check=False)
    if completed.returncode == 0:
        return completed.stdout.strip()
    raise RuntimeError(completed.stderr.strip() or completed.stdout.strip())
def cast_keccak(signature: str) -> str:
    """Return the trimmed output of ``cast keccak`` for *signature*.

    Args:
        signature: Text handed to ``cast keccak`` (here, event signatures).

    Returns:
        The command's stdout with surrounding whitespace removed.

    Raises:
        RuntimeError: If the command exits non-zero; the message is its
            stderr, or its stdout when stderr is empty.
    """
    completed = subprocess.run(
        ["cast", "keccak", signature],
        cwd=ROOT,
        text=True,
        capture_output=True,
        check=False,
    )
    if completed.returncode == 0:
        return completed.stdout.strip()
    raise RuntimeError(completed.stderr.strip() or completed.stdout.strip())
def fetch_json(url: str) -> Any:
    """Fetch *url* and return its body parsed as JSON.

    The request carries a fixed ``User-Agent`` header and a 30-second timeout;
    the body is decoded as UTF-8 before parsing.

    Args:
        url: Address to retrieve.

    Returns:
        The decoded JSON document.
    """
    request = urllib.request.Request(
        url,
        headers={"User-Agent": "dbis-cwusdc-role-audit/1.0"},
    )
    with urllib.request.urlopen(request, timeout=30) as response:
        body = response.read()
        return json.loads(body.decode("utf-8"))
def etherscan_logs(api_key: str, address: str, topic0: str) -> list[dict[str, Any]]:
    """Query the Etherscan logs endpoint for *address* filtered by *topic0*.

    The query targets chain id 1 from block 0 to ``latest``.

    Args:
        api_key: Etherscan API key; when empty no request is made.
        address: Contract address whose logs are requested.
        topic0: First-topic filter.

    Returns:
        The response's ``result`` list, or ``[]`` when no key was given, the
        payload is not a dict, or ``result`` is not a list.
    """
    if not api_key:
        return []
    params = urllib.parse.urlencode(
        {
            "chainid": "1",
            "module": "logs",
            "action": "getLogs",
            "fromBlock": "0",
            "toBlock": "latest",
            "address": address,
            "topic0": topic0,
            "apikey": api_key,
        }
    )
    payload = fetch_json(f"{ETHERSCAN_V2_API}?{params}")
    if not isinstance(payload, dict):
        return []
    logs = payload.get("result")
    # NOTE(review): a non-list `result` (presumably an API error string) is
    # indistinguishable from "no logs" for callers — confirm this is intended.
    if isinstance(logs, list):
        return logs
    return []
def topic_to_address(topic: str) -> str:
    """Return ``0x`` followed by the last 40 characters of *topic*."""
    tail = topic[-40:]
    return f"0x{tail}"
def bool_from_cast(value: str) -> bool:
    """Return True when *value*, trimmed and lower-cased, is ``true`` or ``1``."""
    normalized = value.strip().lower()
    return normalized == "true" or normalized == "1"
def candidate_addresses() -> dict[str, str]:
    """Collect the labelled addresses whose token roles should be checked.

    Addresses come from environment variables (with fallbacks for the deployer
    and the cW bridge) plus three fixed addresses. Entries are kept only when
    the value is non-empty, starts with ``0x`` and is exactly 42 characters.

    Returns:
        Mapping of label to address, in declaration order.
    """
    env = os.environ.get
    labelled = [
        ("deployer", env("DEPLOYER_ADDRESS") or DEPLOYER),
        ("cwBridgeMainnet", env("CW_BRIDGE_MAINNET", "") or CW_BRIDGE_MAINNET_FALLBACK),
        ("ccipRelayBridgeMainnet", env("CCIP_RELAY_BRIDGE_MAINNET", "")),
        ("mainnetCcipWeth9Bridge", env("MAINNET_CCIP_WETH9_BRIDGE", "")),
        ("mainnetCcipWeth10Bridge", env("MAINNET_CCIP_WETH10_BRIDGE", "")),
        ("ccipEthRouter", env("CCIP_ETH_ROUTER", "")),
        ("uniswapV3CwusdcUsdcPool", "0x1Cf2e685682C7F7beF508F0Af15Dfb5CDda01ee3"),
        ("uniswapV2CwusdcUsdcPair", "0xC28706F899266b36BC43cc072b3a921BDf2C48D9"),
        ("engineXVirtualBatchVault", "0xf108586d1FC330EA1D4EA4ff8fd983cde94279B1"),
    ]
    usable: dict[str, str] = {}
    for label, address in labelled:
        if not address:
            continue
        # Shape check only: prefix and length, not hex validity or checksum.
        if address.startswith("0x") and len(address) == 42:
            usable[label] = address
    return usable
def _parse_quantity(value: Any) -> int:
    """Convert a log quantity (int, decimal string or ``0x`` hex string) to int.

    ``None``, an empty string and a bare ``"0x"`` all map to 0; the explorer
    API is known to encode zero quantities as ``"0x"``, which ``int(..., 16)``
    rejects.
    """
    if isinstance(value, int):
        return value
    text = str(value or "").strip()
    if text[:2].lower() == "0x":
        digits = text[2:]
        return int(digits, 16) if digits else 0
    return int(text) if text else 0


def _role_event_record(
    event_name: str,
    item: dict[str, Any],
    topics: list[str],
    roles: dict[str, str],
) -> dict[str, Any]:
    """Turn one raw role log (with at least four topics) into a report row."""
    role_id = topics[1]
    # Map the role id back to its name; unknown roles keep the raw id.
    role_name = next(
        (name for name, value in roles.items() if value.lower() == role_id.lower()),
        role_id,
    )
    raw_block = item.get("blockNumber")
    # Hex block numbers become ints; any other representation is passed through.
    if str(raw_block if raw_block is not None else "").startswith("0x"):
        block_number: Any = _parse_quantity(raw_block)
    else:
        block_number = raw_block
    return {
        "event": event_name,
        "role": role_name,
        "roleId": role_id,
        "account": topic_to_address(topics[2]),
        "sender": topic_to_address(topics[3]),
        "blockNumber": block_number,
        "transactionHash": item.get("transactionHash"),
        "logIndex": item.get("logIndex"),
    }


def _replay_role_events(
    events: list[dict[str, Any]],
    roles: dict[str, str],
) -> dict[str, set[str]]:
    """Replay grant/revoke rows in chain order and return members per role."""
    members: dict[str, set[str]] = {role: set() for role in roles}
    ordered = sorted(
        events,
        key=lambda row: (
            _parse_quantity(row.get("blockNumber")),
            _parse_quantity(row.get("logIndex")),
        ),
    )
    for row in ordered:
        holders = members.get(row["role"])
        if holders is None:
            # Event for a role that is not one of the audited roles.
            continue
        if row["event"] == "RoleGranted":
            holders.add(row["account"])
        elif row["event"] == "RoleRevoked":
            holders.discard(row["account"])
    return members


def build(args: argparse.Namespace) -> dict[str, Any]:
    """Assemble the role-audit payload for the token given in *args*.

    Steps, in order: load ``.env`` files under ``ROOT``; resolve the RPC URL;
    read the three role ids and their admin roles via ``cast call``; check
    ``hasRole`` for every candidate address; and, when ``ETHERSCAN_API_KEY``
    is set, fetch ``RoleGranted``/``RoleRevoked`` logs and replay them to
    reconstruct role membership.

    Log quantities are parsed tolerantly: a bare ``"0x"`` counts as zero and
    decimal log indexes keep their ordering, so sorting no longer fails on
    zero-valued fields.

    Args:
        args: Parsed CLI arguments; ``args.token`` and ``args.rpc_url`` are read.

    Returns:
        The JSON-serialisable audit document.

    Raises:
        SystemExit: If no RPC URL is available from the arguments or the
            environment.
        RuntimeError: Propagated from ``cast_call``/``cast_keccak`` on failure.
    """
    load_dotenv(ROOT / ".env")
    load_dotenv(ROOT / "smom-dbis-138" / ".env")
    rpc_url = args.rpc_url or os.environ.get("ETHEREUM_MAINNET_RPC") or os.environ.get("MAINNET_RPC_URL")
    if not rpc_url:
        raise SystemExit("ETHEREUM_MAINNET_RPC or --rpc-url is required")

    roles = {
        "DEFAULT_ADMIN_ROLE": cast_call(args.token, "DEFAULT_ADMIN_ROLE()(bytes32)", rpc_url=rpc_url),
        "MINTER_ROLE": cast_call(args.token, "MINTER_ROLE()(bytes32)", rpc_url=rpc_url),
        "BURNER_ROLE": cast_call(args.token, "BURNER_ROLE()(bytes32)", rpc_url=rpc_url),
    }
    role_admins = {
        role_name: cast_call(args.token, "getRoleAdmin(bytes32)(bytes32)", role_id, rpc_url=rpc_url)
        for role_name, role_id in roles.items()
    }

    # Candidates are resolved after the .env files are loaded so their
    # environment overrides are visible.
    checks: dict[str, Any] = {
        label: {
            "address": address,
            "roles": {
                role_name: bool_from_cast(
                    cast_call(args.token, "hasRole(bytes32,address)(bool)", role_id, address, rpc_url=rpc_url)
                )
                for role_name, role_id in roles.items()
            },
        }
        for label, address in candidate_addresses().items()
    }
    privileged = [
        {
            "label": label,
            "address": data["address"],
            "roles": [role for role, has_role in data["roles"].items() if has_role],
        }
        for label, data in checks.items()
        if any(data["roles"].values())
    ]

    api_key = os.environ.get("ETHERSCAN_API_KEY", "")
    # Topic hashes are computed even without an API key because they are
    # part of the report.
    event_topics = {
        "RoleGranted": cast_keccak("RoleGranted(bytes32,address,address)"),
        "RoleRevoked": cast_keccak("RoleRevoked(bytes32,address,address)"),
    }
    events: list[dict[str, Any]] = []
    if api_key:
        for event_name, topic0 in event_topics.items():
            for item in etherscan_logs(api_key, args.token, topic0):
                topics = item.get("topics") or []
                # Role events carry role, account and sender as indexed topics.
                if len(topics) < 4:
                    continue
                events.append(_role_event_record(event_name, item, topics, roles))
    effective_from_events = _replay_role_events(events, roles)

    return {
        "schema": "cwusdc-mainnet-role-audit/v1",
        # timezone.utc is equivalent to dt.UTC but also exists before Python 3.11.
        "generatedAt": dt.datetime.now(dt.timezone.utc).isoformat().replace("+00:00", "Z"),
        "network": {"chainId": 1, "name": "Ethereum Mainnet"},
        "token": {"address": args.token, "symbol": "cWUSDC", "name": "Wrapped cUSDC"},
        "roles": roles,
        "roleAdmins": role_admins,
        "candidateChecks": checks,
        "privilegedCandidates": privileged,
        "eventLogReview": {
            "checked": bool(api_key),
            "topics": event_topics,
            "eventCount": len(events),
            "events": events,
            "effectiveMembersFromEvents": {role: sorted(values) for role, values in effective_from_events.items()},
        },
        "limitations": [
            "This audit checks known candidate addresses only.",
            "Event-log reconstruction is included when ETHERSCAN_API_KEY is available, but provider log limits or pruned responses can still require manual verification.",
            "This is a read-only control snapshot, not a formal third-party audit.",
        ],
    }
def write_md(payload: dict[str, Any], path: Path) -> None:
    """Render the audit *payload* as a Markdown report and write it to *path*.

    The report has four parts: a header with generation time and token
    address, the role-id table, the per-candidate role table, and the
    event-log reconstruction followed by the limitations list.

    Args:
        payload: Audit document in the shape produced by ``build``.
        path: Destination file; its contents are replaced.
    """
    header = [
        "# cWUSDC Mainnet Role Audit",
        "",
        f"- Generated: `{payload['generatedAt']}`",
        f"- Token: `{payload['token']['address']}`",
        "",
    ]

    role_table = ["## Role IDs", "", "| Role | ID | Admin role ID |", "|---|---|---|"]
    for role, role_id in payload["roles"].items():
        admin_id = payload["roleAdmins"][role]
        role_table.append(f"| `{role}` | `{role_id}` | `{admin_id}` |")

    candidate_table = [
        "",
        "## Candidate Role Checks",
        "",
        "| Label | Address | Admin | Minter | Burner |",
        "|---|---|---:|---:|---:|",
    ]
    for label, data in payload["candidateChecks"].items():
        held = data["roles"]
        cells = [
            f"`{label}`",
            f"`{data['address']}`",
            f"`{held['DEFAULT_ADMIN_ROLE']}`",
            f"`{held['MINTER_ROLE']}`",
            f"`{held['BURNER_ROLE']}`",
        ]
        candidate_table.append("| " + " | ".join(cells) + " |")

    review = payload["eventLogReview"]
    event_section = [
        "",
        "## Event-Log Role Reconstruction",
        "",
        f"- Checked: `{review['checked']}`",
        f"- Event count: `{review['eventCount']}`",
        "",
        "| Role | Effective members from events |",
        "|---|---|",
    ]
    for role, members in review["effectiveMembersFromEvents"].items():
        shown = ", ".join(members) if members else "none observed"
        event_section.append(f"| `{role}` | `{shown}` |")

    limitation_section = ["", "## Limitations", ""]
    limitation_section += [f"- {item}" for item in payload["limitations"]]

    document = header + role_table + candidate_table + event_section + limitation_section
    path.write_text("\n".join(document) + "\n")
def _display_path(path: Path) -> Path:
    """Return *path* relative to ``ROOT`` when it lies under it, else unchanged."""
    try:
        return path.relative_to(ROOT)
    except ValueError:
        # Relative paths and paths outside ROOT cannot be made relative to it.
        return path


def main() -> int:
    """Parse CLI arguments, run the audit and write the JSON and Markdown reports.

    Parent directories of both output files are created when missing, and
    output paths outside ``ROOT`` (or relative ones) are reported as given
    instead of raising after the files were written.

    Returns:
        Process exit status; 0 on success.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--token", default=CWUSDC)
    parser.add_argument("--rpc-url", default="")
    parser.add_argument("--json-out", type=Path, default=REPORT_JSON)
    parser.add_argument("--md-out", type=Path, default=REPORT_MD)
    args = parser.parse_args()

    payload = build(args)

    # The two reports may be directed to different directories.
    for target in (args.json_out, args.md_out):
        target.parent.mkdir(parents=True, exist_ok=True)
    args.json_out.write_text(json.dumps(payload, indent=2) + "\n")
    write_md(payload, args.md_out)

    print(f"Wrote {_display_path(args.json_out)}")
    print(f"Wrote {_display_path(args.md_out)}")
    return 0
# Run the audit only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())