#!/usr/bin/env python3
"""Monitor whether cWUSDC USD value has propagated to Etherscan and upstream feeds."""

from __future__ import annotations

import argparse
import datetime as dt
import json
import os
import re
import time
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Any

_SCRIPT_PATH = Path(__file__).resolve()
# Repository root is two directories above this script's directory. Fall back
# to the script's own directory when the file is not nested deeply enough, so
# importing the module never fails with IndexError.
ROOT = _SCRIPT_PATH.parents[2] if len(_SCRIPT_PATH.parents) > 2 else _SCRIPT_PATH.parent
REPORT_JSON = ROOT / "reports" / "status" / "cwusdc-etherscan-value-propagation-latest.json"
REPORT_MD = ROOT / "reports" / "status" / "cwusdc-etherscan-value-propagation-latest.md"

CWUSDC = "0x2de5f116bfce3d0f922d9c8351e0c5fc24b9284a"
ETHERSCAN_API = "https://api.etherscan.io/v2/api"
ETHERSCAN_PAGE = f"https://etherscan.io/token/{CWUSDC}"
COINGECKO_PRICE = (
    "https://api.coingecko.com/api/v3/simple/token_price/ethereum?"
    f"contract_addresses={CWUSDC}&vs_currencies=usd&include_market_cap=true&include_24hr_vol=true&include_last_updated_at=true"
)
DEXSCREENER_TOKEN_PAIRS = f"https://api.dexscreener.com/token-pairs/v1/ethereum/{CWUSDC}"
GECKOTERMINAL_POOLS = [
    "https://api.geckoterminal.com/api/v2/networks/eth/pools/0x1cf2e685682c7f7bef508f0af15dfb5cdda01ee3",
    "https://api.geckoterminal.com/api/v2/networks/eth/pools/0xc28706f899266b36bc43cc072b3a921bdf2c48d9",
]

# HTTP statuses that fetch_text_with_retries treats as retryable.
_RETRYABLE_STATUSES = (429, 502, 503)


def load_dotenv(path: Path) -> None:
    """Load ``KEY=VALUE`` lines from *path* into ``os.environ``.

    Blank lines, ``#`` comment lines and lines without ``=`` are skipped.
    Surrounding double/single quotes are stripped from values. Variables that
    are already present in the environment are never overwritten. A missing
    file is silently ignored.
    """
    if not path.exists():
        return
    for raw_line in path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        key = key.strip()
        value = value.strip().strip('"').strip("'")
        if key and key not in os.environ:
            os.environ[key] = value


def fetch_text(url: str, timeout: int = 30) -> tuple[int | None, str, str]:
    """GET *url* and return ``(status, content_type, body_text)``.

    HTTP error responses are returned with their status code and body rather
    than raised. Any other failure yields ``(None, "", str(exception))`` so
    the caller can record it as evidence instead of crashing.
    """
    req = urllib.request.Request(
        url,
        headers={
            "User-Agent": "Mozilla/5.0 DBIS-cwusdc-value-monitor/1.0",
            "Accept": "application/json,text/html;q=0.9,*/*;q=0.8",
        },
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout) as response:
            body = response.read().decode("utf-8", errors="replace")
            return response.status, response.headers.get("content-type", ""), body
    except urllib.error.HTTPError as exc:
        body = exc.read().decode("utf-8", errors="replace") if exc.fp else ""
        content_type = exc.headers.get("content-type", "") if exc.headers else ""
        return exc.code, content_type, body
    except Exception as exc:  # noqa: BLE001 - monitor evidence should capture transient failures
        return None, "", str(exc)


def fetch_json(url: str, timeout: int = 30) -> tuple[int | None, str, Any, str]:
    """GET *url* and return ``(status, content_type, parsed_json, error)``.

    ``error`` is ``""`` on success. When the body is not valid JSON,
    ``parsed_json`` is ``None`` and ``error`` describes the failure: the
    transport error text when the request itself failed (status ``None``),
    otherwise the JSON decoder message.
    """
    status, content_type, text = fetch_text(url, timeout)
    try:
        return status, content_type, json.loads(text), ""
    except json.JSONDecodeError as exc:
        # With no HTTP status, ``text`` is the transport exception message,
        # which is far more useful than "Expecting value: line 1 column 1".
        error = text if status is None and text else str(exc)
        return status, content_type, None, error


def fetch_text_with_retries(url: str, timeout: int, max_attempts: int) -> tuple[int | None, str, str, int]:
    """Retry GeckoTerminal-style rate limits (HTTP 429 / 502 / 503).

    Returns ``(status, content_type, body_text, attempts_made)``. Delays
    between attempts double from 2 seconds and are capped at 45 seconds.
    At least one request is always made, even if *max_attempts* is < 1.
    """
    base_delay_s = 2.0
    attempts_allowed = max(1, max_attempts)
    status: int | None = None
    content_type = ""
    text = ""
    for attempt in range(1, attempts_allowed + 1):
        status, content_type, text = fetch_text(url, timeout)
        if status not in _RETRYABLE_STATUSES or attempt >= attempts_allowed:
            return status, content_type, text, attempt
        time.sleep(min(base_delay_s * (2 ** (attempt - 1)), 45.0))
    return status, content_type, text, attempts_allowed


def fetch_etherscan_api(params: dict[str, str], api_key: str) -> tuple[int | None, str, Any, str]:
    """Call the Etherscan v2 API on chain id 1, retrying rate-limit replies.

    Returns the ``fetch_json`` tuple of the last attempt (at most five
    attempts). Only replies whose ``status`` is ``"0"`` and whose message or
    result mentions "rate limit" are retried.
    """
    query = {"chainid": "1", **params, "apikey": api_key}
    url = f"{ETHERSCAN_API}?{urllib.parse.urlencode(query)}"
    last: tuple[int | None, str, Any, str] = (None, "", None, "")
    for attempt in range(5):
        status, content_type, data, error = fetch_json(url)
        last = (status, content_type, data, error)
        if error or not isinstance(data, dict):
            return last
        message = str(data.get("message", ""))
        result = data.get("result")
        if str(data.get("status")) != "0":
            # Non-error reply: brief pause before returning, presumably to
            # space out consecutive API calls.
            time.sleep(0.25)
            return last
        if "rate limit" in message.lower() or "rate limit" in str(result).lower():
            time.sleep(1.25 + attempt * 0.5)
            continue
        return last
    return last


def extract_div_missing(html: str, element_id: str) -> bool:
    """Return True when the element with *element_id* is followed by a ``-`` placeholder div."""
    # NOTE(review): the HTML tag fragments of this pattern were lost in the
    # whitespace-collapsed source; the <div>...</div> wrapper around the dash
    # is reconstructed - confirm against the live Etherscan markup.
    pattern = rf'id="{re.escape(element_id)}".*?<div[^>]*>\s*-\s*</div>'
    return bool(re.search(pattern, html, flags=re.I | re.S))


def parse_etherscan() -> dict[str, Any]:
    """Scrape the Etherscan token page and report whether market-cap values are shown."""
    status, content_type, html = fetch_text(ETHERSCAN_PAGE)
    has_profile = "Wrapped cUSDC" in html and "cWUSDC" in html
    total_supply_match = re.search(r'id="ContentPlaceHolder1_hdnTotalSupply" value="([^"]+)"', html)
    # NOTE(review): tag fragments in this pattern were stripped from the
    # collapsed source; the <h4>/<div> structure is reconstructed - confirm
    # against the live Etherscan markup.
    holders_match = re.search(
        r"<h4[^>]*>\s*Holders\s*</h4>\s*<div[^>]*>\s*<div[^>]*>\s*([0-9,]+)",
        html,
        re.I,
    )
    market_missing = extract_div_missing(html, "ContentPlaceHolder1_tr_marketcap")
    circ_market_missing = extract_div_missing(html, "ContentPlaceHolder1_tr_circulatingmarketcap")
    value_ready = bool(
        status
        and 200 <= status < 300
        and has_profile
        and not market_missing
        and not circ_market_missing
    )
    return {
        "id": "etherscan_token_page",
        "url": ETHERSCAN_PAGE,
        "status": status,
        "contentType": content_type,
        "profileDetected": has_profile,
        "holdersText": holders_match.group(1) if holders_match else None,
        "totalSupplyText": total_supply_match.group(1) if total_supply_match else None,
        "onchainMarketCapMissing": market_missing,
        "circulatingMarketCapMissing": circ_market_missing,
        "valueReady": value_ready,
    }


def parse_etherscan_tokeninfo(api_key: str) -> dict[str, Any]:
    """Query the Etherscan ``tokeninfo`` API and summarise metadata/price readiness.

    Without an API key the check is reported as skipped and no request is made.
    """
    if not api_key:
        return {
            "id": "etherscan_tokeninfo_api",
            "url": ETHERSCAN_API,
            "status": None,
            "contentType": "",
            "parseError": "",
            "skipped": True,
            "skipReason": "ETHERSCAN_API_KEY is not set.",
            "metadataReady": False,
            "priceReady": False,
        }

    status, content_type, data, error = fetch_etherscan_api(
        {"module": "token", "action": "tokeninfo", "contractaddress": CWUSDC},
        api_key,
    )
    envelope = data if isinstance(data, dict) else {}
    result = envelope.get("result")
    has_entry = isinstance(result, list) and bool(result) and isinstance(result[0], dict)
    # An empty dict stands in for "no entry": every .get() below then yields
    # None, matching the values reported when no token entry is returned.
    info: dict[str, Any] = result[0] if has_entry else {}

    token_price_raw = info.get("tokenPriceUSD")
    try:
        token_price = float(token_price_raw) if token_price_raw not in (None, "") else 0.0
    except (TypeError, ValueError):
        token_price = 0.0

    # str(... or "") tolerates a null / non-string contractAddress.
    contract_address = str(info.get("contractAddress") or "").lower()
    metadata_ready = bool(
        has_entry
        and contract_address == CWUSDC
        and info.get("symbol") == "cWUSDC"
        and info.get("tokenName")
    )
    profile_enriched = bool(
        info.get("image") or info.get("website") or info.get("description") or info.get("twitter")
    )
    return {
        "id": "etherscan_tokeninfo_api",
        "url": ETHERSCAN_API,
        "status": status,
        "contentType": content_type,
        "parseError": error,
        "skipped": False,
        "apiStatus": envelope.get("status"),
        "apiMessage": envelope.get("message"),
        "apiResultPreview": result,
        "metadataReady": metadata_ready,
        "profileEnriched": profile_enriched,
        "priceReady": token_price > 0,
        "tokenPriceUSD": token_price_raw,
        "tokenName": info.get("tokenName"),
        "symbol": info.get("symbol"),
        "divisor": info.get("divisor"),
        "tokenType": info.get("tokenType"),
        "totalSupply": info.get("totalSupply"),
        "blueCheckmark": info.get("blueCheckmark"),
        "image": info.get("image"),
        "website": info.get("website"),
        "descriptionPresent": bool(info.get("description")),
    }


def parse_coingecko() -> dict[str, Any]:
    """Query CoinGecko's contract-price endpoint and report whether a positive USD price exists."""
    status, content_type, data, error = fetch_json(COINGECKO_PRICE)
    entry = data.get(CWUSDC) if isinstance(data, dict) else None
    listed = isinstance(entry, dict)
    usd = entry.get("usd") if listed else None
    return {
        "id": "coingecko_token_price",
        "url": COINGECKO_PRICE,
        "status": status,
        "contentType": content_type,
        "parseError": error,
        "listedByContract": listed,
        "usd": usd,
        "marketCapUsd": entry.get("usd_market_cap") if listed else None,
        "volume24hUsd": entry.get("usd_24h_vol") if listed else None,
        "lastUpdatedAt": entry.get("last_updated_at") if listed else None,
        "priceReady": isinstance(usd, (int, float)) and usd > 0,
        "jsonPreview": data,
    }


def parse_dexscreener() -> dict[str, Any]:
    """Query DexScreener's token-pairs endpoint and report how many pairs are indexed."""
    status, content_type, data, error = fetch_json(DEXSCREENER_TOKEN_PAIRS)
    is_list = isinstance(data, list)
    pair_count = len(data) if is_list else 0
    return {
        "id": "dexscreener_token_pairs",
        "url": DEXSCREENER_TOKEN_PAIRS,
        "status": status,
        "contentType": content_type,
        "parseError": error,
        "pairCount": pair_count,
        "indexed": pair_count > 0,
        # Keep the stored evidence small: at most the first three pairs.
        "jsonPreview": data[:3] if is_list else data,
    }


def parse_geckoterminal(timeout: int, gecko_retries: int) -> list[dict[str, Any]]:
    """Probe each GeckoTerminal pool URL and return one check record per pool."""
    checks: list[dict[str, Any]] = []
    for url in GECKOTERMINAL_POOLS:
        status, content_type, text, attempts = fetch_text_with_retries(url, timeout, gecko_retries)
        data: Any = None
        error = ""
        try:
            data = json.loads(text)
        except json.JSONDecodeError as exc:
            error = str(exc)

        # Walk the payload defensively: an unexpected shape (e.g. a list where
        # an object is expected) must produce "not indexed", not a crash.
        pool = data.get("data") if isinstance(data, dict) else None
        attrs = pool.get("attributes") if isinstance(pool, dict) else None
        indexed = isinstance(attrs, dict)
        volume = attrs.get("volume_usd") if indexed else None
        checks.append(
            {
                "id": "geckoterminal_pool",
                "url": url,
                "status": status,
                "contentType": content_type,
                "parseError": error,
                "fetchAttempts": attempts,
                "indexed": indexed,
                "reserveUsd": attrs.get("reserve_in_usd") if indexed else None,
                "volume24hUsd": volume.get("h24") if isinstance(volume, dict) else None,
            }
        )
    return checks


def build(gecko_retries: int, http_timeout: int) -> dict[str, Any]:
    """Run every check and assemble the report payload.

    *gecko_retries* and *http_timeout* are applied to the GeckoTerminal
    probes only; the other checks use the fetch helpers' default timeout.
    """
    load_dotenv(ROOT / ".env")
    etherscan_api_key = os.environ.get("ETHERSCAN_API_KEY", "")

    etherscan = parse_etherscan()
    etherscan_tokeninfo = parse_etherscan_tokeninfo(etherscan_api_key)
    coingecko = parse_coingecko()
    dexscreener = parse_dexscreener()
    gecko = parse_geckoterminal(http_timeout, gecko_retries)

    blockers: list[str] = []
    advisory_notes: list[str] = []

    if not etherscan["profileDetected"]:
        blockers.append("Etherscan token profile text was not detected.")
    if etherscan["onchainMarketCapMissing"]:
        blockers.append("Etherscan Onchain Market Cap is still blank.")
    if etherscan["circulatingMarketCapMissing"]:
        blockers.append("Etherscan Circulating Supply Market Cap is still blank.")

    tokeninfo_preview = str(etherscan_tokeninfo.get("apiResultPreview", ""))
    pro_only = "API Pro endpoint" in tokeninfo_preview
    if pro_only:
        blockers.append(
            "Etherscan tokeninfo API is an API Pro endpoint for the current key; "
            "tokeninfo propagation cannot be monitored with the current plan."
        )
    elif not etherscan_tokeninfo["metadataReady"]:
        blockers.append("Etherscan tokeninfo API does not return accepted token metadata for cWUSDC.")
    if not pro_only and not etherscan_tokeninfo["priceReady"]:
        blockers.append("Etherscan tokeninfo API does not return a positive USD token price.")

    if not coingecko["priceReady"]:
        blockers.append("CoinGecko contract price API does not return a positive USD price.")
    if not dexscreener["indexed"]:
        advisory_notes.append(
            "DexScreener token-pairs API has not indexed cWUSDC yet (supplementary DEX-terminal signal; not the same gate as CoinGecko for Etherscan Value)."
        )

    return {
        "schema": "cwusdc-etherscan-value-propagation/v1",
        # timezone.utc is the same object as datetime.UTC but also exists on
        # Python < 3.11.
        "generatedAt": dt.datetime.now(dt.timezone.utc).isoformat().replace("+00:00", "Z"),
        "token": {
            "chainId": 1,
            "address": CWUSDC,
            "caip19": f"eip155:1/erc20:{CWUSDC}",
            "symbol": "cWUSDC",
        },
        "summary": {
            "etherscanValueReady": etherscan["valueReady"],
            "etherscanTokenInfoMetadataReady": etherscan_tokeninfo["metadataReady"],
            "etherscanTokenInfoPriceReady": etherscan_tokeninfo["priceReady"],
            "coingeckoPriceReady": coingecko["priceReady"],
            "readyForEtherscanValuePropagation": etherscan["valueReady"] or coingecko["priceReady"],
            "blockers": blockers,
            "advisoryNotes": advisory_notes,
        },
        "checks": {
            "etherscan": etherscan,
            "etherscanTokenInfo": etherscan_tokeninfo,
            "coingecko": coingecko,
            "dexscreener": dexscreener,
            "geckoterminal": gecko,
        },
    }


def write_md(payload: dict[str, Any], path: Path) -> None:
    """Render *payload* (as produced by ``build``) to a Markdown report at *path*."""
    summary = payload["summary"]
    checks = payload["checks"]
    page = checks["etherscan"]
    info = checks["etherscanTokenInfo"]
    gecko_price = checks["coingecko"]
    dex = checks["dexscreener"]

    lines = [
        "# cWUSDC Etherscan Value Propagation Monitor",
        "",
        f"- Generated: `{payload['generatedAt']}`",
        f"- Token: `{payload['token']['address']}`",
        f"- CAIP-19: `{payload['token']['caip19']}`",
        f"- Etherscan value ready: `{summary['etherscanValueReady']}`",
        f"- Etherscan tokeninfo metadata ready: `{summary['etherscanTokenInfoMetadataReady']}`",
        f"- Etherscan tokeninfo price ready: `{summary['etherscanTokenInfoPriceReady']}`",
        f"- CoinGecko price ready: `{summary['coingeckoPriceReady']}`",
        "",
        "## Blockers",
        "",
    ]
    if summary["blockers"]:
        lines.extend(f"- {item}" for item in summary["blockers"])
    else:
        lines.append("- None detected by this monitor.")

    advisory = summary.get("advisoryNotes") or []
    if advisory:
        lines.extend(["", "## Advisory (non-gating)", ""])
        lines.extend(f"- {item}" for item in advisory)

    lines.extend(
        [
            "",
            "## Checks",
            "",
            "| Surface | Status | Ready / indexed | Key fields |",
            "|---|---:|---:|---|",
            f"| Etherscan | `{page['status']}` | `{page['valueReady']}` | marketCapMissing={page['onchainMarketCapMissing']}; circulatingMarketCapMissing={page['circulatingMarketCapMissing']}; holders={page['holdersText']} |",
            f"| Etherscan tokeninfo API | `{info['status']}` | `{info['metadataReady']}` / price `{info['priceReady']}` | symbol={info['symbol']}; price={info['tokenPriceUSD']}; image={info['image']}; website={info['website']} |",
            f"| CoinGecko contract price | `{gecko_price['status']}` | `{gecko_price['priceReady']}` | usd={gecko_price['usd']}; marketCap={gecko_price['marketCapUsd']}; lastUpdated={gecko_price['lastUpdatedAt']} |",
            f"| DexScreener token pairs | `{dex['status']}` | `{dex['indexed']}` | pairCount={dex['pairCount']} |",
        ]
    )
    for item in checks["geckoterminal"]:
        attempts = item.get("fetchAttempts", 1)
        # Only mention the attempt count when a retry actually happened.
        extra = f"; httpAttempts={attempts}" if attempts > 1 else ""
        lines.append(
            f"| GeckoTerminal pool | `{item['status']}` | `{item['indexed']}` | reserveUsd={item['reserveUsd']}; volume24hUsd={item['volume24hUsd']}{extra}; url={item['url']} |"
        )
    path.write_text("\n".join(lines) + "\n", encoding="utf-8")


def _display_path(path: Path) -> str:
    """Return *path* relative to ROOT when it lies under ROOT, else unchanged."""
    try:
        return str(path.relative_to(ROOT))
    except ValueError:
        # Output redirected outside the repository (or given as a relative
        # path): show it as supplied instead of crashing after the write.
        return str(path)


def main() -> int:
    """CLI entry point: run the checks, write the JSON and Markdown reports.

    Returns 1 when ``--strict`` is given and the Etherscan value is not ready,
    otherwise 0.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--json-out", type=Path, default=REPORT_JSON)
    parser.add_argument("--md-out", type=Path, default=REPORT_MD)
    parser.add_argument(
        "--gecko-retries",
        type=int,
        default=int(os.environ.get("CWUSDC_GECKO_RETRIES", "5")),
        help="Max HTTP attempts per GeckoTerminal pool URL (429/502/503 backoff).",
    )
    parser.add_argument(
        "--timeout",
        type=int,
        default=int(os.environ.get("CWUSDC_TRACKER_TIMEOUT", "30")),
        help="Per-request HTTP timeout seconds for GeckoTerminal probes.",
    )
    parser.add_argument("--strict", action="store_true")
    args = parser.parse_args()

    payload = build(args.gecko_retries, args.timeout)
    summary = payload["summary"]

    args.json_out.parent.mkdir(parents=True, exist_ok=True)
    args.json_out.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8")
    # The Markdown report may be directed to a different, not-yet-existing directory.
    args.md_out.parent.mkdir(parents=True, exist_ok=True)
    write_md(payload, args.md_out)

    print(f"Wrote {_display_path(args.json_out)}")
    print(f"Wrote {_display_path(args.md_out)}")
    print(f"etherscanValueReady={summary['etherscanValueReady']}")
    if summary["blockers"]:
        print("Blockers: " + "; ".join(summary["blockers"]))
    adv = summary.get("advisoryNotes") or []
    if adv:
        print("Advisory (non-gating): " + "; ".join(adv))

    if args.strict and not summary["etherscanValueReady"]:
        return 1
    return 0


if __name__ == "__main__":
    raise SystemExit(main())