Add optional Cosmos/Engine-X/act-runner templates, CWUSDC/EI-matrix tooling, non-EVM route planner in multi-chain-execution (tests passing), token list and extraction updates, and documentation (MetaMask matrix, GRU/CWUSDC packets). Ignore institutional evidence tarballs/sha256 under reports/status. Validated with: bash scripts/verify/run-all-validation.sh --skip-genesis Co-authored-by: Cursor <cursoragent@cursor.com>
246 lines
10 KiB
Python
246 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""Read-only role/control audit for Ethereum Mainnet cWUSDC."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import datetime as dt
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import urllib.parse
|
|
import urllib.request
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
# Repository root: parents[0] is this script's directory, so parents[2] is
# two directory levels above it.
ROOT = Path(__file__).resolve().parents[2]

# Default report destinations; main() exposes them as --json-out / --md-out.
REPORT_JSON = ROOT.joinpath("reports", "status", "cwusdc-mainnet-role-audit-latest.json")
REPORT_MD = ROOT.joinpath("reports", "status", "cwusdc-mainnet-role-audit-latest.md")

# Default token address audited when --token is not supplied.
CWUSDC = "0x2de5F116bFcE3d0f922d9C8351e0c5Fc24b9284a"

# Fallbacks used by candidate_addresses() when the matching environment
# variables (DEPLOYER_ADDRESS / CW_BRIDGE_MAINNET) are unset or empty.
DEPLOYER = "0x4A666F96fC8764181194447A7dFdb7d471b301C8"
CW_BRIDGE_MAINNET_FALLBACK = "0x2bF74583206A49Be07E0E8A94197C12987AbD7B5"

# Base endpoint for Etherscan log queries; the chain is selected per request
# through the "chainid" query parameter.
ETHERSCAN_V2_API = "https://api.etherscan.io/v2/api"
|
|
|
|
|
|
def load_dotenv(path: Path) -> None:
    """Load ``KEY=value`` pairs from a dotenv file into ``os.environ``.

    Blank lines, ``#`` comment lines and lines without ``=`` are skipped.
    A leading shell-style ``export`` keyword on the key is dropped, and
    surrounding double/single quote characters are stripped from the value.
    Variables that already exist in the environment are never overwritten,
    so real environment settings win over file contents.

    Args:
        path: Location of the dotenv file. A missing file is silently ignored.
    """
    if not path.exists():
        return
    # Read as UTF-8 explicitly so parsing does not depend on the host locale.
    for raw_line in path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        key = key.strip()
        # Accept "export KEY=value" so files that are also sourced by a shell
        # populate KEY rather than a variable literally named "export KEY".
        words = key.split(None, 1)
        if len(words) == 2 and words[0] == "export":
            key = words[1].strip()
        value = value.strip().strip('"').strip("'")
        if key and key not in os.environ:
            os.environ[key] = value
|
|
|
|
|
|
def cast_call(contract: str, signature: str, *args: str, rpc_url: str) -> str:
    """Invoke ``cast call`` for a contract function and return its output.

    Args:
        contract: Address of the contract to call.
        signature: Function signature string passed to ``cast``.
        *args: Positional call arguments, forwarded verbatim.
        rpc_url: RPC endpoint passed via ``--rpc-url``.

    Returns:
        The command's stdout with surrounding whitespace removed.

    Raises:
        RuntimeError: If ``cast`` exits non-zero; the message is its stderr,
            or its stdout when stderr is empty.
    """
    argv = ["cast", "call", contract, signature]
    argv.extend(args)
    argv += ["--rpc-url", rpc_url]
    # check=False: the exit status is inspected manually so the tool's own
    # error text can be surfaced in the exception.
    result = subprocess.run(argv, cwd=ROOT, text=True, capture_output=True, check=False)
    if result.returncode == 0:
        return result.stdout.strip()
    raise RuntimeError(result.stderr.strip() or result.stdout.strip())
|
|
|
|
|
|
def cast_keccak(signature: str) -> str:
    """Hash *signature* with ``cast keccak`` and return the printed digest.

    Args:
        signature: Text to hash, e.g. an event signature.

    Returns:
        The command's stdout with surrounding whitespace removed.

    Raises:
        RuntimeError: If ``cast`` exits non-zero; the message is its stderr,
            or its stdout when stderr is empty.
    """
    result = subprocess.run(
        ["cast", "keccak", signature],
        cwd=ROOT,
        text=True,
        capture_output=True,
        check=False,
    )
    if result.returncode == 0:
        return result.stdout.strip()
    raise RuntimeError(result.stderr.strip() or result.stdout.strip())
|
|
|
|
|
|
def fetch_json(url: str) -> Any:
    """Fetch *url* and decode the response body as UTF-8 JSON.

    The request carries a fixed ``User-Agent`` header and uses a 30 second
    timeout. Errors from ``urllib`` or the JSON decoder propagate unchanged.
    """
    request = urllib.request.Request(
        url,
        headers={"User-Agent": "dbis-cwusdc-role-audit/1.0"},
    )
    with urllib.request.urlopen(request, timeout=30) as response:
        body = response.read()
        return json.loads(body.decode("utf-8"))
|
|
|
|
|
|
def etherscan_logs(api_key: str, address: str, topic0: str) -> list[dict[str, Any]]:
    """Query the Etherscan logs endpoint for one address and one topic0.

    The query is fixed to chain id 1 and spans block 0 to ``latest``.

    Args:
        api_key: Etherscan API key; when empty no request is made.
        address: Contract address whose logs are requested.
        topic0: Event signature hash to filter on.

    Returns:
        The ``result`` list from the response. An empty list is returned when
        no API key is given, when the payload is not a JSON object, or when
        ``result`` is not a list.
    """
    if not api_key:
        return []
    params = dict(
        chainid="1",
        module="logs",
        action="getLogs",
        fromBlock="0",
        toBlock="latest",
        address=address,
        topic0=topic0,
        apikey=api_key,
    )
    payload = fetch_json(f"{ETHERSCAN_V2_API}?{urllib.parse.urlencode(params)}")
    if not isinstance(payload, dict):
        return []
    rows = payload.get("result")
    if isinstance(rows, list):
        return rows
    return []
|
|
|
|
|
|
def topic_to_address(topic: str) -> str:
    """Return ``0x`` followed by the last 40 characters of *topic*.

    For a 32-byte hex log topic this keeps the low 20 bytes. Inputs shorter
    than 40 characters are prefixed as-is, without validation.
    """
    tail = topic[-40:]
    return f"0x{tail}"
|
|
|
|
|
|
def bool_from_cast(value: str) -> bool:
    """Interpret ``cast`` output as a boolean.

    Only ``"true"`` and ``"1"`` (case-insensitive, surrounding whitespace
    ignored) count as true; every other string is false.
    """
    normalized = value.strip().lower()
    return normalized == "true" or normalized == "1"
|
|
|
|
|
|
def candidate_addresses() -> dict[str, str]:
    """Return the labelled addresses whose token roles should be checked.

    Addresses come from environment variables (with built-in fallbacks for
    the deployer and the mainnet cW bridge) plus three hard-coded addresses.
    Entries that are empty, do not start with ``0x`` or are not exactly 42
    characters long are dropped. Insertion order of the labels is preserved.
    """
    env = os.environ.get
    labelled = [
        ("deployer", env("DEPLOYER_ADDRESS") or DEPLOYER),
        ("cwBridgeMainnet", env("CW_BRIDGE_MAINNET", "") or CW_BRIDGE_MAINNET_FALLBACK),
        ("ccipRelayBridgeMainnet", env("CCIP_RELAY_BRIDGE_MAINNET", "")),
        ("mainnetCcipWeth9Bridge", env("MAINNET_CCIP_WETH9_BRIDGE", "")),
        ("mainnetCcipWeth10Bridge", env("MAINNET_CCIP_WETH10_BRIDGE", "")),
        ("ccipEthRouter", env("CCIP_ETH_ROUTER", "")),
        ("uniswapV3CwusdcUsdcPool", "0x1Cf2e685682C7F7beF508F0Af15Dfb5CDda01ee3"),
        ("uniswapV2CwusdcUsdcPair", "0xC28706F899266b36BC43cc072b3a921BDf2C48D9"),
        ("engineXVirtualBatchVault", "0xf108586d1FC330EA1D4EA4ff8fd983cde94279B1"),
    ]
    accepted: dict[str, str] = {}
    for label, address in labelled:
        if not address:
            continue
        # Shape check only: "0x" prefix plus 40 further characters.
        if address.startswith("0x") and len(address) == 42:
            accepted[label] = address
    return accepted
|
|
|
|
|
|
def _quantity_to_int(value: Any) -> int:
    """Parse a block number / log index into an int.

    Accepts ints, ``0x``-prefixed hex strings and decimal strings. ``None``,
    an empty string and a bare ``"0x"`` (hex prefix with no digits) all map
    to 0. Any other malformed value raises ``ValueError``.
    """
    if value is None:
        return 0
    if isinstance(value, int):
        return value
    text = str(value).strip()
    if text.lower().startswith("0x"):
        digits = text[2:]
        # int("0x", 16) raises, so an empty digit string is handled here.
        return int(digits, 16) if digits else 0
    return int(text) if text else 0


def _collect_role_events(
    api_key: str,
    token: str,
    roles: dict[str, str],
    event_topics: dict[str, str],
) -> list[dict[str, Any]]:
    """Fetch role events for *token* and normalise them into report rows."""
    events: list[dict[str, Any]] = []
    for event_name, topic0 in event_topics.items():
        for item in etherscan_logs(api_key, token, topic0):
            topics = item.get("topics") or []
            # Both event signatures have three indexed arguments, so a usable
            # log carries topic0 plus role, account and sender.
            if len(topics) < 4:
                continue
            role_id = topics[1]
            # Map the role hash to its name; unknown roles keep the raw hash.
            role_name = next(
                (name for name, value in roles.items() if value.lower() == role_id.lower()),
                role_id,
            )
            raw_block = item.get("blockNumber")
            events.append(
                {
                    "event": event_name,
                    "role": role_name,
                    "roleId": role_id,
                    "account": topic_to_address(topics[2]),
                    "sender": topic_to_address(topics[3]),
                    # Hex block numbers become ints; anything else is
                    # reported exactly as received.
                    "blockNumber": _quantity_to_int(raw_block) if str(raw_block).startswith("0x") else raw_block,
                    "transactionHash": item.get("transactionHash"),
                    "logIndex": item.get("logIndex"),
                }
            )
    return events


def _replay_role_events(events: list[dict[str, Any]], roles: dict[str, str]) -> dict[str, set[str]]:
    """Replay grant/revoke events in chain order to derive current members."""
    members: dict[str, set[str]] = {role: set() for role in roles}
    ordered = sorted(
        events,
        key=lambda row: (_quantity_to_int(row.get("blockNumber")), _quantity_to_int(row.get("logIndex"))),
    )
    for row in ordered:
        holders = members.get(row["role"])
        if holders is None:
            # Event for a role outside the audited set.
            continue
        if row["event"] == "RoleGranted":
            holders.add(row["account"])
        elif row["event"] == "RoleRevoked":
            holders.discard(row["account"])
    return members


def build(args: argparse.Namespace) -> dict[str, Any]:
    """Assemble the role-audit payload for the token in ``args.token``.

    Loads the repository dotenv files, reads the role identifiers and their
    admin roles with ``cast``, checks ``hasRole`` for every candidate
    address and, when ``ETHERSCAN_API_KEY`` is set, reconstructs role
    membership from ``RoleGranted`` / ``RoleRevoked`` logs.

    Args:
        args: Parsed CLI arguments; ``token`` and ``rpc_url`` are read.

    Returns:
        A JSON-serialisable dict following the
        ``cwusdc-mainnet-role-audit/v1`` schema.

    Raises:
        SystemExit: If no RPC URL is available from the arguments or from
            ``ETHEREUM_MAINNET_RPC`` / ``MAINNET_RPC_URL``.
        RuntimeError: Propagated from a failing ``cast`` invocation.
        ValueError: If a log carries a malformed block number or log index.
    """
    load_dotenv(ROOT / ".env")
    load_dotenv(ROOT / "smom-dbis-138" / ".env")
    rpc_url = args.rpc_url or os.environ.get("ETHEREUM_MAINNET_RPC") or os.environ.get("MAINNET_RPC_URL")
    if not rpc_url:
        raise SystemExit("ETHEREUM_MAINNET_RPC or --rpc-url is required")

    roles = {
        "DEFAULT_ADMIN_ROLE": cast_call(args.token, "DEFAULT_ADMIN_ROLE()(bytes32)", rpc_url=rpc_url),
        "MINTER_ROLE": cast_call(args.token, "MINTER_ROLE()(bytes32)", rpc_url=rpc_url),
        "BURNER_ROLE": cast_call(args.token, "BURNER_ROLE()(bytes32)", rpc_url=rpc_url),
    }
    role_admins = {
        role_name: cast_call(args.token, "getRoleAdmin(bytes32)(bytes32)", role_id, rpc_url=rpc_url)
        for role_name, role_id in roles.items()
    }

    checks: dict[str, Any] = {}
    for label, address in candidate_addresses().items():
        checks[label] = {
            "address": address,
            "roles": {
                role_name: bool_from_cast(
                    cast_call(args.token, "hasRole(bytes32,address)(bool)", role_id, address, rpc_url=rpc_url)
                )
                for role_name, role_id in roles.items()
            },
        }

    privileged = [
        {
            "label": label,
            "address": data["address"],
            "roles": [role for role, has_role in data["roles"].items() if has_role],
        }
        for label, data in checks.items()
        if any(data["roles"].values())
    ]

    api_key = os.environ.get("ETHERSCAN_API_KEY", "")
    event_topics = {
        "RoleGranted": cast_keccak("RoleGranted(bytes32,address,address)"),
        "RoleRevoked": cast_keccak("RoleRevoked(bytes32,address,address)"),
    }
    events = _collect_role_events(api_key, args.token, roles, event_topics) if api_key else []
    effective_from_events = _replay_role_events(events, roles)

    return {
        "schema": "cwusdc-mainnet-role-audit/v1",
        "generatedAt": dt.datetime.now(dt.UTC).isoformat().replace("+00:00", "Z"),
        "network": {"chainId": 1, "name": "Ethereum Mainnet"},
        "token": {"address": args.token, "symbol": "cWUSDC", "name": "Wrapped cUSDC"},
        "roles": roles,
        "roleAdmins": role_admins,
        "candidateChecks": checks,
        "privilegedCandidates": privileged,
        "eventLogReview": {
            "checked": bool(api_key),
            "topics": event_topics,
            "eventCount": len(events),
            "events": events,
            "effectiveMembersFromEvents": {role: sorted(values) for role, values in effective_from_events.items()},
        },
        "limitations": [
            "This audit checks known candidate addresses only.",
            "Event-log reconstruction is included when ETHERSCAN_API_KEY is available, but provider log limits or pruned responses can still require manual verification.",
            "This is a read-only control snapshot, not a formal third-party audit.",
        ],
    }
|
|
|
|
|
|
def write_md(payload: dict[str, Any], path: Path) -> None:
    """Render the audit *payload* as a Markdown report and write it to *path*.

    The report has four sections: role IDs with their admin role IDs,
    per-candidate role checks, the event-log reconstruction summary and the
    limitations list.

    Args:
        payload: Audit data in the shape returned by ``build``.
        path: Destination file; its parent directory must already exist.
    """
    doc: list[str] = [
        "# cWUSDC Mainnet Role Audit",
        "",
        f"- Generated: `{payload['generatedAt']}`",
        f"- Token: `{payload['token']['address']}`",
        "",
    ]

    # Section 1: role identifiers next to the role that administers each.
    doc += ["## Role IDs", "", "| Role | ID | Admin role ID |", "|---|---|---|"]
    doc += [
        f"| `{name}` | `{identifier}` | `{payload['roleAdmins'][name]}` |"
        for name, identifier in payload["roles"].items()
    ]

    # Section 2: one row per candidate address with its three role flags.
    doc += ["", "## Candidate Role Checks", "", "| Label | Address | Admin | Minter | Burner |", "|---|---|---:|---:|---:|"]
    for label, entry in payload["candidateChecks"].items():
        flags = entry["roles"]
        doc.append(
            f"| `{label}` | `{entry['address']}` | `{flags['DEFAULT_ADMIN_ROLE']}` | `{flags['MINTER_ROLE']}` | `{flags['BURNER_ROLE']}` |"
        )

    # Section 3: membership reconstructed from grant/revoke events.
    doc += [
        "",
        "## Event-Log Role Reconstruction",
        "",
        f"- Checked: `{payload['eventLogReview']['checked']}`",
        f"- Event count: `{payload['eventLogReview']['eventCount']}`",
        "",
        "| Role | Effective members from events |",
        "|---|---|",
    ]
    for name, members in payload["eventLogReview"]["effectiveMembersFromEvents"].items():
        shown = ", ".join(members) if members else "none observed"
        doc.append(f"| `{name}` | `{shown}` |")

    # Section 4: caveats copied verbatim from the payload.
    doc += ["", "## Limitations", ""]
    doc += [f"- {note}" for note in payload["limitations"]]

    path.write_text("".join(f"{line}\n" for line in doc))
|
|
|
|
|
|
def _display_path(path: Path) -> Path:
    """Return *path* relative to ROOT when it lies under it, else unchanged."""
    try:
        return path.relative_to(ROOT)
    except ValueError:
        # Relative paths and paths outside the repository cannot be expressed
        # relative to ROOT; report them as given instead of crashing.
        return path


def main() -> int:
    """Parse CLI arguments, run the audit and write both report files.

    Options: ``--token`` (default ``CWUSDC``), ``--rpc-url``, ``--json-out``
    and ``--md-out`` (defaulting to the repository status reports).

    Returns:
        0 on success, intended for use as the process exit status.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--token", default=CWUSDC)
    parser.add_argument("--rpc-url", default="")
    parser.add_argument("--json-out", type=Path, default=REPORT_JSON)
    parser.add_argument("--md-out", type=Path, default=REPORT_MD)
    args = parser.parse_args()

    payload = build(args)

    # Both outputs may point at different, not-yet-existing directories.
    args.json_out.parent.mkdir(parents=True, exist_ok=True)
    args.md_out.parent.mkdir(parents=True, exist_ok=True)
    args.json_out.write_text(json.dumps(payload, indent=2) + "\n")
    write_md(payload, args.md_out)

    print(f"Wrote {_display_path(args.json_out)}")
    print(f"Wrote {_display_path(args.md_out)}")
    return 0


# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|