Files
proxmox/scripts/verify/build-cwusdc-etherscan-value-dossier.py
defiQUG 349ac415b9
Some checks failed
Deploy to Phoenix / validate (push) Failing after 1s
Deploy to Phoenix / deploy (push) Has been skipped
Deploy to Phoenix / deploy-atomic-swap-dapp (push) Has been skipped
Deploy to Phoenix / cloudflare (push) Has been skipped
chore(cwusdc): optional DexScreener probes and Etherscan-value readiness subset
- Treat DexScreener token v1 APIs as optional; document in non-manual tasks.
- Align tracker checks, handoff/dossier builders, CMC sanity, monitors, and CI
  shell wrapper with ETHERSCAN_VALUE_PATH_READY_IDS and summary fields.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-12 00:00:52 -07:00

538 lines
23 KiB
Python
Executable File

#!/usr/bin/env python3
"""Build a submission-ready cWUSDC Etherscan Value dossier.
The dossier intentionally separates Ethereum Mainnet cWUSDC evidence from
global cUSDC/cWUSDC family context. It is read-only: it runs monitors and proof
generators, then summarizes what can be submitted and what remains externally
blocked.
"""
from __future__ import annotations
import argparse
import datetime as dt
import json
import os
import subprocess
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Any
# Repository root, taken as the third ancestor directory of this file.
ROOT = Path(__file__).resolve().parents[2]

# Directories shared by the report and documentation paths below.
_STATUS_DIR = ROOT / "reports" / "status"
_CONFIG_DOCS_DIR = ROOT / "docs" / "04-configuration"
_ETHERSCAN_DOCS_DIR = _CONFIG_DOCS_DIR / "etherscan"
_COINGECKO_DOCS_DIR = _CONFIG_DOCS_DIR / "coingecko"

# Default output locations for the JSON and Markdown renderings of the dossier.
REPORT_JSON = _STATUS_DIR / "cwusdc-etherscan-value-dossier-latest.json"
REPORT_MD = _STATUS_DIR / "cwusdc-etherscan-value-dossier-latest.md"

# Ethereum Mainnet cWUSDC token contract that the dossier targets.
CWUSDC = "0x2de5F116bFcE3d0f922d9C8351e0c5Fc24b9284a"

# Etherscan V2 endpoints: the supported-chain listing and the general API.
ETHERSCAN_CHAINLIST_URL = "https://api.etherscan.io/v2/chainlist"
ETHERSCAN_V2_API = "https://api.etherscan.io/v2/api"

# Address checked for L2 deposits when neither the CLI flag nor
# DEPLOYER_ADDRESS supplies one.
DEPLOYER_FALLBACK = "0x4A666F96fC8764181194447A7dFdb7d471b301C8"

# Chain ID -> display name of the L2 chains queried for deposit transactions.
L2_DEPOSIT_CHAINS = {"10": "OP Mainnet", "42161": "Arbitrum One Mainnet"}

# Evidence artifacts (JSON reports) that the dossier aggregates.
ARTIFACTS = {
    key: _STATUS_DIR / filename
    for key, filename in (
        ("mainnetSupply", "cwusdc-supply-circulating-attestation-latest.json"),
        ("globalFamilySupply", "global-cusdc-cwusdc-family-supply-proof-latest.json"),
        ("feedAudit", "cusdc-cwusdc-etherscan-feed-audit-latest.json"),
        ("propagation", "cwusdc-etherscan-value-propagation-latest.json"),
    )
}

# Documentation packet whose presence is reported in the dossier.
DOCS = {
    "executionPlan": _ETHERSCAN_DOCS_DIR / "CWUSDC_ETHERSCAN_VALUE_EXECUTION_PLAN.md",
    "bridgeLayerMap": _ETHERSCAN_DOCS_DIR / "CWUSDC_ETHERSCAN_BRIDGE_CROSSCHAIN_LAYER_MAP.md",
    "profilePacket": _ETHERSCAN_DOCS_DIR / "CWUSDC_MAINNET_ETHERSCAN_PROFILE_PACKET.md",
    "e2eRecommendations": _ETHERSCAN_DOCS_DIR / "CWUSDC_ETHERSCAN_E2E_RECOMMENDATIONS.md",
    "trackerPacket": _COINGECKO_DOCS_DIR / "CWUSDC_MAINNET_TRACKER_SUBMISSION_PACKET.md",
}
def load_dotenv(path: Path, env: dict[str, str]) -> dict[str, str]:
    """Overlay ``KEY=VALUE`` pairs from a dotenv file onto an environment.

    Values already present in ``env`` win over the file, and the first
    occurrence of a key inside the file wins over later ones.

    Args:
        path: Location of the dotenv file.
        env: Existing environment mapping; it is never mutated.

    Returns:
        ``env`` itself when the file does not exist, otherwise a new dict
        holding ``env`` plus the keys that only the file defines.
    """
    if not path.exists():
        return env

    combined = dict(env)
    for raw_line in path.read_text().splitlines():
        entry = raw_line.strip()
        # Blank lines and comments carry no assignment.
        if not entry or entry.startswith("#"):
            continue
        name, separator, raw_value = entry.partition("=")
        if not separator:
            continue
        name = name.strip()
        if not name or name in combined:
            continue
        # Drop surrounding whitespace, then any wrapping double/single quotes.
        combined[name] = raw_value.strip().strip('"').strip("'")
    return combined
def run_command(command: list[str], env: dict[str, str]) -> dict[str, Any]:
    """Run a command from the repository root and capture its outcome.

    A non-zero exit status does not raise; it is reported through the
    ``returncode`` and ``ok`` fields instead.

    Args:
        command: Argument vector to execute.
        env: Environment passed to the child process.

    Returns:
        A dict with the command, its return code, stripped stdout/stderr
        text, and an ``ok`` flag that is true only for exit status 0.
    """
    completed = subprocess.run(
        command,
        cwd=ROOT,
        env=env,
        text=True,
        capture_output=True,  # pipes both stdout and stderr
        check=False,
    )
    exit_code = completed.returncode
    return {
        "command": command,
        "returncode": exit_code,
        "stdout": completed.stdout.strip(),
        "stderr": completed.stderr.strip(),
        "ok": exit_code == 0,
    }
def read_json(path: Path) -> Any | None:
if not path.exists():
return None
return json.loads(path.read_text())
def fetch_json_url(url: str, timeout: int = 30) -> Any:
    """Fetch ``url`` and decode the response body as UTF-8 JSON.

    Args:
        url: Address to request.
        timeout: Timeout in seconds passed to ``urlopen``.

    Returns:
        The decoded JSON value.

    Raises:
        Whatever ``urlopen``, UTF-8 decoding, or JSON parsing raises; no
        error is handled here.
    """
    request = urllib.request.Request(url)
    # Identify the dossier builder to the remote service.
    request.add_header("User-Agent", "dbis-cwusdc-etherscan-dossier/1.0")
    with urllib.request.urlopen(request, timeout=timeout) as response:
        body = response.read()
    return json.loads(body.decode("utf-8"))
def load_etherscan_chainlist() -> dict[str, Any]:
    """Fetch the Etherscan V2 chainlist and index it by chain ID.

    Any failure while fetching is captured in the returned dict rather than
    raised, so the dossier can record the diagnostic.

    Returns:
        A dict with ``url``, ``available``, ``totalcount``,
        ``supportedChainIds`` (sorted) and ``statusByChainId``. On success it
        also carries ``comments``; on failure it carries ``error``.
    """
    try:
        payload = fetch_json_url(ETHERSCAN_CHAINLIST_URL)
    except Exception as exc:  # noqa: BLE001 - dossier should capture diagnostics instead of crashing
        return {
            "url": ETHERSCAN_CHAINLIST_URL,
            "available": False,
            "error": str(exc),
            "totalcount": None,
            "supportedChainIds": [],
            "statusByChainId": {},
        }

    def chain_id_sort_key(value: str) -> tuple[int, int, str]:
        # Numeric IDs sort first, in numeric order; anything else follows in
        # string order. A uniform tuple key keeps a mixed list from raising
        # TypeError, which comparing bare int and str keys would do.
        # isdecimal() (unlike isdigit()) only accepts text that int() parses.
        if value.isdecimal():
            return (0, int(value), "")
        return (1, 0, value)

    payload_is_dict = isinstance(payload, dict)
    result = payload.get("result") if payload_is_dict else None
    chains = result if isinstance(result, list) else []
    status_by_chain_id = {
        str(item.get("chainid")): {
            "chainname": item.get("chainname"),
            "blockexplorer": item.get("blockexplorer"),
            "apiurl": item.get("apiurl"),
            "status": item.get("status"),
            "comment": item.get("comment"),
        }
        for item in chains
        if isinstance(item, dict) and item.get("chainid") is not None
    }
    return {
        "url": ETHERSCAN_CHAINLIST_URL,
        "available": True,
        "comments": payload.get("comments") if payload_is_dict else None,
        "totalcount": payload.get("totalcount") if payload_is_dict else len(chains),
        "supportedChainIds": sorted(status_by_chain_id, key=chain_id_sort_key),
        "statusByChainId": status_by_chain_id,
    }
def human_token_value(raw: Any, token_address: str | None) -> str | None:
try:
raw_int = int(str(raw))
except (TypeError, ValueError):
return None
decimals = 18 if token_address == "ETH" else 6 if token_address and token_address.lower() == CWUSDC.lower() else 18
whole = raw_int // (10**decimals)
frac = str(raw_int % (10**decimals)).rjust(decimals, "0").rstrip("0")
return f"{whole}" + (f".{frac}" if frac else "")
def human_wei(raw: Any) -> str | None:
try:
raw_int = int(str(raw))
except (TypeError, ValueError):
return None
whole = raw_int // (10**18)
frac = str(raw_int % (10**18)).rjust(18, "0").rstrip("0")
return f"{whole}" + (f".{frac}" if frac else "")
def normalize_deposit_row(row: dict[str, Any]) -> dict[str, Any]:
    """Project one deposit-transaction row onto the dossier's field names.

    Raw amounts are kept as received and accompanied by human-readable
    renderings of the native value and the token value.

    Args:
        row: A single row from a ``getdeposittxs`` result list.

    Returns:
        A dict with renamed fields; keys missing from ``row`` map to ``None``.
    """
    native_raw = row.get("value")
    token_raw = row.get("tokenValue")
    token_address = row.get("tokenAddress")

    normalized: dict[str, Any] = {
        "hash": row.get("hash"),
        "l1TransactionHash": row.get("L1transactionhash"),
        "timeStamp": row.get("timeStamp"),
        "from": row.get("from"),
        "to": row.get("to"),
    }
    normalized["valueRaw"] = native_raw
    normalized["valueEth"] = human_wei(native_raw)
    normalized["tokenAddress"] = token_address
    normalized["tokenValueRaw"] = token_raw
    normalized["tokenValueUnits"] = human_token_value(token_raw, token_address)
    normalized["txreceiptStatus"] = row.get("txreceipt_status")
    normalized["isError"] = row.get("isError")
    return normalized
def etherscan_v2_call(params: dict[str, str], api_key: str) -> dict[str, Any]:
    """Call the Etherscan V2 API and return a diagnostic-friendly envelope.

    The API key is sent with the request but never appears in the returned
    dict: the reported URL carries ``apikey=REDACTED`` and the key is
    scrubbed from any captured error text.

    Args:
        params: Query parameters, excluding the API key.
        api_key: Etherscan API key added to the outgoing request.

    Returns:
        A dict with ``url`` (redacted), ``ok``, ``status``, ``message``,
        ``result`` and ``error``. ``ok`` is true when the API reports status
        ``"1"`` or signals an empty result set; in the latter case ``result``
        is normalised to ``[]``.
    """
    query = {**params, "apikey": api_key}
    url = f"{ETHERSCAN_V2_API}?{urllib.parse.urlencode(query)}"
    redacted_query = {**params, "apikey": "REDACTED"}
    redacted_url = f"{ETHERSCAN_V2_API}?{urllib.parse.urlencode(redacted_query)}"
    try:
        payload = fetch_json_url(url)
    except Exception as exc:  # noqa: BLE001 - capture diagnostics instead of crashing the dossier
        error_text = str(exc)
        if api_key:
            # Defensive: some exception messages echo the request target, so
            # make sure the key (raw or URL-encoded) cannot reach the report.
            for secret in {api_key, urllib.parse.quote_plus(api_key)}:
                error_text = error_text.replace(secret, "REDACTED")
        return {"url": redacted_url, "ok": False, "error": error_text, "status": None, "message": None, "result": None}

    if isinstance(payload, dict):
        status = str(payload.get("status"))
        message = payload.get("message")
        result = payload.get("result")
    else:
        status = message = result = None

    # Etherscan appears to signal an empty result set with status "0" and the
    # marker text in ``message`` alongside an empty list (confirm against the
    # API docs), or with the marker as a string ``result``. Accept either
    # form so "no rows" is not reported as a failed call.
    no_rows_marker = "No transactions found"
    no_rows = any(isinstance(field, str) and no_rows_marker in field for field in (message, result))
    return {
        "url": redacted_url,
        "ok": status == "1" or no_rows,
        "status": status,
        "message": message,
        "result": [] if no_rows else result,
        "error": None,
    }
def load_l2_deposit_evidence(api_key: str, chainlist: dict[str, Any], address: str) -> dict[str, Any]:
    """Collect recent L2 deposit transactions for ``address`` per chain.

    Each chain in ``L2_DEPOSIT_CHAINS`` is queried through the Etherscan V2
    ``getdeposittxs`` action (first page, 10 rows, newest first), but only if
    the chain ID appears in the chainlist's ``statusByChainId``.

    Args:
        api_key: Etherscan API key; when empty no request is made.
        chainlist: Chainlist dict as produced by ``load_etherscan_chainlist``.
        address: Account address to look up.

    Returns:
        A dict with ``checked``, ``address`` and per-chain entries under
        ``chains``; the unchecked variant carries a ``reason`` instead of the
        scope notes.
    """
    if not api_key:
        return {
            "checked": False,
            "reason": "ETHERSCAN_API_KEY is not set.",
            "address": address,
            "chains": {},
        }

    listed_chain_ids = set(chainlist.get("statusByChainId", {}))
    per_chain: dict[str, Any] = {}
    for chain_id, chain_name in L2_DEPOSIT_CHAINS.items():
        if chain_id in listed_chain_ids:
            query = {
                "chainid": chain_id,
                "module": "account",
                "action": "getdeposittxs",
                "address": address,
                "page": "1",
                "offset": "10",
                "sort": "desc",
            }
            response = etherscan_v2_call(query, api_key)
            rows = response.get("result")
            if not isinstance(rows, list):
                rows = []
            # Rows are requested newest-first, so the first one is the latest.
            newest = normalize_deposit_row(rows[0]) if rows else None
            per_chain[chain_id] = {
                "chainName": chain_name,
                "checked": True,
                "ok": response.get("ok"),
                "status": response.get("status"),
                "message": response.get("message"),
                "sampleCount": len(rows),
                "latest": newest,
                "url": response.get("url"),
                "error": response.get("error"),
            }
        else:
            per_chain[chain_id] = {
                "chainName": chain_name,
                "checked": False,
                "reason": "chain is not present in Etherscan V2 chainlist",
            }

    return {
        "checked": True,
        "address": address,
        "scope": "Etherscan-indexed L2 deposits by address. This is bridge provenance only and does not set Mainnet cWUSDC USD Value.",
        "rawUnitNote": "tokenValue is returned as raw token units. ETH uses 18 decimals; ERC-20 rows must be normalized with that token contract's decimals.",
        "chains": per_chain,
    }
def load_contract_source_verification(api_key: str, address: str) -> dict[str, Any]:
    """Report whether ``address`` has verified source on Ethereum Mainnet.

    Uses the Etherscan V2 ``getsourcecode`` action with ``chainid=1``. The
    contract counts as verified when source code, contract name and a real
    ABI are all present.

    Args:
        api_key: Etherscan API key; when empty no request is made.
        address: Contract address to inspect.

    Returns:
        A dict with ``checked``, ``verified`` and, when a request was made,
        the compiler/licence/proxy metadata reported for the contract.
    """
    if not api_key:
        return {
            "checked": False,
            "reason": "ETHERSCAN_API_KEY is not set.",
            "address": address,
            "verified": False,
        }

    query = {
        "chainid": "1",
        "module": "contract",
        "action": "getsourcecode",
        "address": address,
    }
    response = etherscan_v2_call(query, api_key)

    # Only the first result entry is used, and only if it is a dict.
    entry: dict[str, Any] = {}
    result = response.get("result")
    if isinstance(result, list) and result and isinstance(result[0], dict):
        entry = result[0]

    source_code = str(entry.get("SourceCode") or "")
    abi = str(entry.get("ABI") or "")
    contract_name = str(entry.get("ContractName") or "")
    # This placeholder text in the ABI field marks an unverified contract.
    abi_available = bool(abi) and abi != "Contract source code not verified"
    verified = bool(source_code) and bool(contract_name) and abi_available

    return {
        "checked": True,
        "address": address,
        "ok": response.get("ok"),
        "status": response.get("status"),
        "message": response.get("message"),
        "verified": verified,
        "contractName": contract_name or None,
        "compilerVersion": entry.get("CompilerVersion") or None,
        "optimizationUsed": entry.get("OptimizationUsed") or None,
        "runs": entry.get("Runs") or None,
        "constructorArgumentsPresent": bool(entry.get("ConstructorArguments")),
        "evmVersion": entry.get("EVMVersion") or None,
        "licenseType": entry.get("LicenseType") or None,
        "proxy": entry.get("Proxy") or None,
        "implementation": entry.get("Implementation") or None,
        "sourceCodeBytes": len(source_code),
        "abiAvailable": abi_available,
        "url": response.get("url"),
        "error": response.get("error"),
    }
def rel(path: Path) -> str:
    """Return ``path`` as a string relative to the repository root.

    Raises:
        ValueError: If ``path`` is not located under ``ROOT``.
    """
    relative_path = path.relative_to(ROOT)
    return f"{relative_path}"
def build(args: argparse.Namespace) -> dict[str, Any]:
    """Assemble the dossier payload from local artifacts and Etherscan data.

    When ``args.refresh`` is true the monitor and proof generator scripts are
    re-run first; a failing command is recorded as a blocker rather than
    raised. The JSON artifacts are then read, the Etherscan chainlist, L2
    deposit evidence and contract verification are fetched, and everything is
    summarised into one dict.

    Args:
        args: Parsed CLI arguments; ``refresh`` and ``l2_deposit_address``
            are read.

    Returns:
        The dossier payload with ``schema``, ``generatedAt``, ``purpose``,
        ``target``, ``readiness``, ``evidence``, ``commands`` and
        ``nextActions``.
    """

    def chain_id_sort_key(value: str) -> tuple[int, int, str]:
        # Numeric IDs first in numeric order, then everything else as text.
        # A uniform tuple key keeps a mix of the two from raising TypeError.
        if value.isdecimal():
            return (0, int(value), "")
        return (1, 0, value)

    # Real environment variables take precedence over values from .env.
    env = load_dotenv(ROOT / ".env", dict(os.environ))
    etherscan_api_key = env.get("ETHERSCAN_API_KEY", "")
    l2_deposit_address = args.l2_deposit_address or env.get("DEPLOYER_ADDRESS") or DEPLOYER_FALLBACK

    prereq_urls_command = ["bash", "scripts/verify/check-cwusdc-etherscan-prereq-urls.sh"]
    commands: list[dict[str, Any]] = []
    if args.refresh:
        commands = [
            run_command(["python3", "scripts/verify/generate-cwusdc-supply-circulating-attestation.py"], env),
            run_command(["python3", "scripts/verify/generate-global-cusdc-cwusdc-family-supply-proof.py"], env),
            run_command(["python3", "scripts/verify/audit-cusdc-cwusdc-etherscan-feeds.py"], env),
            run_command(["python3", "scripts/verify/monitor-cwusdc-etherscan-value-propagation.py"], env),
            run_command(list(prereq_urls_command), env),
        ]

    artifacts = {key: read_json(path) for key, path in ARTIFACTS.items()}
    propagation = artifacts["propagation"] or {}
    supply = artifacts["mainnetSupply"] or {}
    global_family = artifacts["globalFamilySupply"] or {}
    feed_audit = artifacts["feedAudit"] or {}
    propagation_summary = propagation.get("summary") or {}

    chainlist = load_etherscan_chainlist()
    etherscan_chain_status = chainlist.get("statusByChainId", {})
    l2_deposits = load_l2_deposit_evidence(etherscan_api_key, chainlist, l2_deposit_address)
    contract_source = load_contract_source_verification(etherscan_api_key, CWUSDC)

    # ``or []`` also covers an explicit ``"entries": null`` in the artifact.
    family_entries = global_family.get("entries") or []
    family_chain_ids = sorted(
        {
            str(item.get("chainId"))
            for item in family_entries
            if isinstance(item, dict) and item.get("chainId") is not None
        },
        key=chain_id_sort_key,
    )
    supported_family_chain_ids = [chain_id for chain_id in family_chain_ids if chain_id in etherscan_chain_status]
    unsupported_family_chain_ids = [chain_id for chain_id in family_chain_ids if chain_id not in etherscan_chain_status]

    blockers = list(propagation_summary.get("blockers") or [])
    propagation_advisory = list(propagation_summary.get("advisoryNotes") or [])
    command_failures = [item for item in commands if not item["ok"]]
    blockers.extend("Command failed: " + " ".join(item["command"]) for item in command_failures)

    ready_evidence = {
        "mainnetSupplyAttestation": bool(supply.get("supply")),
        "globalFamilySupplyContext": bool(global_family.get("summary")),
        "chain138MainnetFeedAudit": bool(feed_audit.get("canonicalRelationship")),
        "mainnetContractSourceVerified": bool(contract_source.get("verified")),
        "propagationMonitor": bool(propagation.get("checks")),
        # None means "not evaluated": the URL check only runs on refresh.
        "publicPrereqUrls": not any(
            item["command"] == prereq_urls_command and not item["ok"]
            for item in commands
        )
        if commands
        else None,
        "documentationPacket": {key: path.exists() for key, path in DOCS.items()},
    }

    next_actions = []
    if blockers:
        next_actions.extend(
            [
                "Submit/update Etherscan token profile for the exact Ethereum Mainnet cWUSDC contract.",
                "Submit/update CoinGecko and CoinMarketCap listings with Mainnet supply proof, DEX evidence, and bridge-family context.",
                "Use the global family supply proof only as context; use the Ethereum Mainnet cWUSDC attestation as the token-page supply basis.",
                "Re-run this dossier after each external approval or tracker response.",
            ]
        )
    else:
        next_actions.append("No blockers detected by local monitors; capture Etherscan screenshots and continue propagation monitoring.")

    return {
        "schema": "cwusdc-etherscan-value-dossier/v1",
        "generatedAt": dt.datetime.now(dt.UTC).isoformat().replace("+00:00", "Z"),
        "purpose": "Single submission and monitoring packet for making Etherscan show USD value for Ethereum Mainnet cWUSDC.",
        "target": {
            "network": "Ethereum Mainnet",
            "chainId": 1,
            "contract": CWUSDC,
            "caip19": f"eip155:1/erc20:{CWUSDC}",
            "name": "Wrapped cUSDC",
            "symbol": "cWUSDC",
            "decimals": 6,
        },
        "readiness": {
            "readyForExternalSubmission": ready_evidence["mainnetSupplyAttestation"]
            and ready_evidence["chain138MainnetFeedAudit"]
            and ready_evidence["mainnetContractSourceVerified"]
            and ready_evidence["propagationMonitor"],
            "etherscanValueReady": propagation_summary.get("etherscanValueReady"),
            "coinGeckoPriceReady": propagation_summary.get("coingeckoPriceReady"),
            "blockers": blockers,
            "advisoryNotes": propagation_advisory,
        },
        "evidence": {
            "artifacts": {key: rel(path) for key, path in ARTIFACTS.items()},
            "docs": {key: rel(path) for key, path in DOCS.items()},
            "readyEvidence": ready_evidence,
            "mainnetContractSourceVerification": contract_source,
            "mainnetSupply": supply.get("supply"),
            "globalFamilyWarning": (global_family.get("caveats") or ["Global family supply is context only; do not use it as Ethereum Etherscan token-page supply."])[0],
            "globalFamilySummary": global_family.get("summary"),
            "feedRelationship": feed_audit.get("canonicalRelationship"),
            "etherscanChainlist": {
                **chainlist,
                "familyChainIds": family_chain_ids,
                "etherscanSupportedFamilyChainIds": supported_family_chain_ids,
                "notEtherscanSupportedFamilyChainIds": unsupported_family_chain_ids,
                "chain138SupportedByEtherscanV2": "138" in etherscan_chain_status,
                "interpretation": "Only chains present in Etherscan V2 chainlist should be described as first-class Etherscan-family API evidence. Chain 138 remains provenance/context evidence unless Etherscan adds chainid 138.",
            },
            "l2DepositTransactions": l2_deposits,
        },
        "commands": commands,
        "nextActions": next_actions,
    }
def write_md(payload: dict[str, Any], path: Path) -> None:
    """Render the dossier payload as a Markdown report and write it to ``path``.

    Sections are emitted in a fixed order: header, blockers, optional
    advisory notes, artifact and documentation tables, contract verification,
    supply boundary, chainlist boundary, L2 deposit table, next actions, and
    (when any commands ran) a command-result table.

    Args:
        payload: Dossier payload as returned by ``build``.
        path: Destination file; it is overwritten.
    """
    readiness = payload["readiness"]
    evidence = payload["evidence"]
    target = payload["target"]

    doc: list[str] = [
        "# cWUSDC Etherscan Value Dossier",
        "",
        f"- Generated: `{payload['generatedAt']}`",
        f"- Target: `{target['contract']}`",
        f"- CAIP-19: `{target['caip19']}`",
        f"- Ready for external submission: `{readiness['readyForExternalSubmission']}`",
        f"- Etherscan value ready: `{readiness['etherscanValueReady']}`",
        f"- CoinGecko price ready: `{readiness['coinGeckoPriceReady']}`",
        "",
        "## Blockers",
        "",
    ]

    blockers = readiness["blockers"]
    if blockers:
        doc += [f"- {item}" for item in blockers]
    else:
        doc.append("- None detected by this dossier.")

    advisory = readiness.get("advisoryNotes") or []
    if advisory:
        doc += ["", "## Advisory (non-gating)", ""]
        doc += [f"- {item}" for item in advisory]

    doc += ["", "## Evidence Artifacts", "", "| Artifact | Path |", "|---|---|"]
    doc += [f"| `{key}` | `{value}` |" for key, value in evidence["artifacts"].items()]

    doc += ["", "## Documentation Packet", "", "| Document | Path |", "|---|---|"]
    doc += [f"| `{key}` | `{value}` |" for key, value in evidence["docs"].items()]

    # Each evidence section may be None in the payload; read through an
    # empty dict so the report shows `None` instead of failing.
    verification = evidence["mainnetContractSourceVerification"] or {}
    supply = evidence["mainnetSupply"] or {}
    family_warning = evidence["globalFamilyWarning"]
    chainlist = evidence["etherscanChainlist"] or {}
    deposits = evidence["l2DepositTransactions"]
    deposits_summary = deposits or {}

    family_ids = ", ".join(chainlist.get("familyChainIds") or [])
    supported_ids = ", ".join(chainlist.get("etherscanSupportedFamilyChainIds") or [])
    unsupported_ids = ", ".join(chainlist.get("notEtherscanSupportedFamilyChainIds") or [])

    doc += [
        "",
        "## Mainnet Contract Verification",
        "",
        f"- Checked: `{verification.get('checked')}`",
        f"- Verified: `{verification.get('verified')}`",
        f"- Contract name: `{verification.get('contractName')}`",
        f"- Compiler: `{verification.get('compilerVersion')}`",
        f"- License: `{verification.get('licenseType')}`",
        f"- Proxy: `{verification.get('proxy')}`",
        "",
        "## Supply Boundary",
        "",
        f"- Ethereum Mainnet cWUSDC supply basis: `{supply.get('totalSupplyUnits')}`",
        f"- Circulating supply basis: `{supply.get('circulatingSupplyUnits')}`",
        f"- Global family warning: {family_warning}",
        "",
        "## Etherscan Chainlist Boundary",
        "",
        f"- Etherscan V2 chainlist total: `{chainlist.get('totalcount')}`",
        f"- Family chain IDs: `{family_ids}`",
        f"- Etherscan-supported family chain IDs: `{supported_ids}`",
        f"- Not Etherscan-supported family chain IDs: `{unsupported_ids}`",
        f"- Chain 138 supported by Etherscan V2: `{chainlist.get('chain138SupportedByEtherscanV2')}`",
        "",
        "## L2 Deposit Transaction Boundary",
        "",
        f"- Address checked: `{deposits_summary.get('address')}`",
        f"- Checked: `{deposits_summary.get('checked')}`",
        "- Scope: Etherscan-indexed OP/Arbitrum deposit provenance only; it does not set Mainnet cWUSDC USD Value.",
        "- Unit note: `value` is raw wei; `tokenValue` is raw token units. `1195403000000000` in `value` is `0.001195403 ETH`; `598200000000000` with `tokenAddress=ETH` is `0.0005982 ETH`.",
        "",
        "| Chain | Checked | Sample deposits | Latest tx | Native value | Token value |",
        "|---|---:|---:|---|---:|---:|",
    ]

    for chain_id, chain_entry in (deposits.get("chains") or {}).items():
        latest = chain_entry.get("latest") or {}
        cells = [
            f"`{chain_id}` {chain_entry.get('chainName')}",
            f"`{chain_entry.get('checked')}`",
            f"`{chain_entry.get('sampleCount', 0)}`",
            f"`{latest.get('hash')}`",
            f"`{latest.get('valueEth')}` ETH",
            f"`{latest.get('tokenValueUnits')}` {latest.get('tokenAddress') or ''}",
        ]
        doc.append("| " + " | ".join(cells) + " |")

    doc += ["", "## Next Actions", ""]
    doc += [f"- {item}" for item in payload["nextActions"]]

    executed = payload["commands"]
    if executed:
        doc += ["", "## Command Results", "", "| Command | Exit |", "|---|---:|"]
        doc += [f"| `{' '.join(item['command'])}` | `{item['returncode']}` |" for item in executed]

    path.write_text("\n".join(doc) + "\n")
def main() -> int:
    """Parse CLI arguments, build the dossier, and write both report files.

    Returns:
        ``1`` when ``--strict`` is set and the dossier lists blockers,
        otherwise ``0``.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--no-refresh", action="store_true", help="Only aggregate existing reports; do not rerun checks.")
    parser.add_argument("--json-out", type=Path, default=REPORT_JSON)
    parser.add_argument("--md-out", type=Path, default=REPORT_MD)
    parser.add_argument("--l2-deposit-address", default="", help="Address to check with Etherscan getdeposittxs.")
    parser.add_argument("--strict", action="store_true")
    args = parser.parse_args()
    args.refresh = not args.no_refresh

    def display_path(out_path: Path) -> str:
        # Show paths relative to the repository when possible. A custom
        # output location outside ROOT (or a relative path) must not crash
        # the run after the files have already been written.
        try:
            return str(out_path.relative_to(ROOT))
        except ValueError:
            return str(out_path)

    payload = build(args)

    # Both outputs may point at directories that do not exist yet.
    for out_path in (args.json_out, args.md_out):
        out_path.parent.mkdir(parents=True, exist_ok=True)
    args.json_out.write_text(json.dumps(payload, indent=2) + "\n")
    write_md(payload, args.md_out)

    readiness = payload["readiness"]
    print(f"Wrote {display_path(args.json_out)}")
    print(f"Wrote {display_path(args.md_out)}")
    print(f"readyForExternalSubmission={readiness['readyForExternalSubmission']}")
    if readiness["blockers"]:
        print("Blockers: " + "; ".join(readiness["blockers"]))
    adv = readiness.get("advisoryNotes") or []
    if adv:
        print("Advisory (non-gating): " + "; ".join(adv))
    if args.strict and readiness["blockers"]:
        return 1
    return 0


if __name__ == "__main__":
    raise SystemExit(main())