Files
proxmox/scripts/deployment/check-deployer-lp-balances.py
defiQUG dd02f4b59b
All checks were successful
Deploy to Phoenix / validate (push) Successful in 1m11s
Deploy to Phoenix / deploy (push) Successful in 43s
Deploy to Phoenix / deploy-atomic-swap-dapp (push) Successful in 1m32s
phoenix-deploy Deployed to cloudflare-sync
Deploy to Phoenix / cloudflare (push) Successful in 38s
Enhance .env configuration with Infura support and add new RPC endpoints for various networks. Update package.json with new deployment scripts for Engine X. Improve public LP compliance documentation in runbooks and scripts, including guidance for public pair repairs and funding strategies.
2026-05-07 18:19:37 -07:00

574 lines
20 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Enumerate PMM pool addresses (deployment-status.json) and Uniswap V2 pair addresses
(pair-discovery JSON), then report deployer balances with **LP token resolution**.
**Uniswap V2:** The pair contract *is* the LP ERC-20 (`lpToken` = pair).
**DODO PMM (DVM / IDODOPMMPool):** Official DODO Vending Machine pools inherit ERC-20;
`balanceOf(pool)` is the LP share balance — the pool address **is** the LP token.
**DODO V1-style PMM:** Some pools expose ``_BASE_CAPITAL_TOKEN_`` / ``_QUOTE_CAPITAL_TOKEN_``;
LP exposure may be split across two capital ERC-20s (balances reported separately).
When ``balanceOf(pool)`` fails (RPC flake, proxy, or non-DVM), this script by default
re-probes with DODO view calls and retries ``balanceOf`` across the configured RPCs;
pass ``--no-resolve-dodo`` to skip that re-probe.
Deployer: ``--deployer`` / ``DEPLOYER_ADDRESS`` / ``PRIVATE_KEY`` (see below).
Usage:
python3 scripts/deployment/check-deployer-lp-balances.py --summary-only
python3 scripts/deployment/check-deployer-lp-balances.py --json-out /tmp/lp.json
Requires: ``cast`` (Foundry). Environment: ``DEPLOYER_ADDRESS``, ``PRIVATE_KEY``, etc.
"""
from __future__ import annotations
import argparse
import json
import os
import re
import subprocess
import sys
from pathlib import Path
# Repository root: parents[2] of this file's resolved path (for a file stored at
# scripts/deployment/<name>.py this is the directory that contains scripts/).
ROOT = Path(__file__).resolve().parents[2]
# Default input locations relative to ROOT; overridable with --status, --discovery, --env.
DEFAULT_STATUS = ROOT / "cross-chain-pmm-lps" / "config" / "deployment-status.json"
DEFAULT_DISCOVERY = ROOT / "reports" / "extraction" / "promod-uniswap-v2-live-pair-discovery-latest.json"
DEFAULT_ENV = ROOT / "smom-dbis-138" / ".env"
# The zero address; pool entries whose address equals it are skipped by collect_entries().
ZERO = "0x0000000000000000000000000000000000000000"
# Defaults when .env has no RPC for a chain (prefer setting INFURA_PROJECT_ID + load-project-env, or per-chain *_RPC in .env).
# Keyed by chain id as a string. Chains 138 and 651940 have no entry here; rpc_for()
# special-cases 138 and returns "" for any other chain without a default.
DEFAULT_RPC: dict[str, str] = {
    "1": "https://ethereum.publicnode.com",
    "10": "https://mainnet.optimism.io",
    "25": "https://evm.cronos.org",
    "56": "https://bsc-dataseed.binance.org",
    "100": "https://rpc.gnosischain.com",
    "137": "https://polygon-rpc.com",
    "8453": "https://mainnet.base.org",
    "42161": "https://arb1.arbitrum.io/rpc",
    "42220": "https://forno.celo.org",
    "43114": "https://api.avax.network/ext/bc/C/rpc",
    "1111": "https://api.wemix.com",
}
# Extra public RPCs (retry when primary fails — connection resets, rate limits).
# rpc_chain_list() appends these, in order, after the primary URL for the chain.
RPC_FALLBACKS: dict[str, list[str]] = {
    "1": [
        "https://eth.llamarpc.com",
        "https://1rpc.io/eth",
    ],
    "137": ["https://polygon-bor.publicnode.com", "https://1rpc.io/matic"],
    "42161": ["https://arbitrum.llamarpc.com"],
    "56": ["https://bsc.publicnode.com"],
    "8453": ["https://base.llamarpc.com"],
    "10": ["https://optimism.publicnode.com"],
}
# Environment variable names that may hold the RPC URL for each chain id.
# rpc_for() checks them in list order; apply_infura_rpc_defaults() writes to the
# first name in the list when no name for that chain resolves to a value.
RPC_KEYS: dict[str, list[str]] = {
    "1": ["ETHEREUM_MAINNET_RPC", "ETH_MAINNET_RPC_URL"],
    "10": ["OPTIMISM_RPC_URL", "OPTIMISM_MAINNET_RPC"],
    "25": ["CRONOS_RPC_URL", "CRONOS_MAINNET_RPC"],
    "56": ["BSC_RPC_URL", "BSC_MAINNET_RPC"],
    "100": ["GNOSIS_RPC_URL", "GNOSIS_MAINNET_RPC", "GNOSIS_RPC"],
    "137": ["POLYGON_MAINNET_RPC", "POLYGON_RPC_URL"],
    "138": ["RPC_URL_138", "CHAIN_138_RPC_URL"],
    "8453": ["BASE_RPC_URL", "BASE_MAINNET_RPC"],
    "42161": ["ARBITRUM_RPC_URL", "ARBITRUM_MAINNET_RPC"],
    "42220": ["CELO_RPC_URL", "CELO_MAINNET_RPC", "CELO_RPC"],
    "43114": ["AVALANCHE_RPC_URL", "AVALANCHE_MAINNET_RPC"],
    "1111": ["WEMIX_RPC_URL", "WEMIX_RPC"],
    "651940": ["ALL_MAINNET_RPC", "CHAIN_651940_RPC_URL"],
}
def _rpc_env_key_set() -> set[str]:
    """Return every environment variable name that can carry RPC configuration.

    The result is the union of all per-chain names listed in ``RPC_KEYS`` and
    three names that are not tied to a single chain entry.
    """
    names: set[str] = {"INFURA_PROJECT_ID", "INFURA_API_KEY", "RPC_URL_MAINNET"}
    for chain_keys in RPC_KEYS.values():
        names.update(chain_keys)
    return names
def merge_rpc_os_into(env: dict[str, str]) -> None:
    """Overlay RPC-related process environment variables onto *env* in place.

    For each name from ``_rpc_env_key_set()``, a non-blank value in
    ``os.environ`` replaces whatever the dotenv mapping held, so values exported
    by ``source load-project-env.sh`` take precedence over the .env file.
    """
    for name in _rpc_env_key_set():
        from_process = os.environ.get(name, "").strip()
        if not from_process:
            # Unset or whitespace-only: keep the dotenv value (if any).
            continue
        env[name] = from_process
def apply_eth_mainnet_rpc_alias(env: dict[str, str]) -> None:
    """Copy ``ETH_MAINNET_RPC_URL`` into ``ETHEREUM_MAINNET_RPC`` when the latter is empty.

    Mirrors load-project-env.sh, where dotenv files often define only
    ``ETH_MAINNET_RPC_URL``. *env* is modified in place.
    """
    if resolve(env, "ETHEREUM_MAINNET_RPC"):
        return  # canonical name already has a value
    fallback = resolve(env, "ETH_MAINNET_RPC_URL", "")
    if fallback:
        env["ETHEREUM_MAINNET_RPC"] = fallback
def apply_infura_rpc_defaults(env: dict[str, str]) -> None:
    """Fill in Infura RPC URLs for supported chains that have no usable RPC configured.

    The project id is read from ``INFURA_PROJECT_ID`` (or ``INFURA_API_KEY``) in
    *env*. For each supported chain where none of its ``RPC_KEYS`` names
    resolves to a usable URL, the first name in that chain's list is set to the
    Infura endpoint. *env* is modified in place; nothing happens when no
    project id is available.

    A value that still starts with ``$`` is an unexpanded shell placeholder.
    ``rpc_for()`` refuses such values, so they are treated as "not configured"
    here as well; otherwise a placeholder would block the Infura default and
    the script would silently fall back to the public endpoint.
    """
    pid = (env.get("INFURA_PROJECT_ID") or env.get("INFURA_API_KEY") or "").strip()
    # A placeholder project id would only produce broken URLs.
    if not pid or pid.startswith("$"):
        return

    def chain_has_rpc(cid: str) -> bool:
        """True when some env key for *cid* resolves to a usable (non-placeholder) URL."""
        for k in RPC_KEYS.get(cid, []):
            v = resolve(env, k)
            if v and not v.startswith("$"):
                return True
        return False

    infura_by_chain: dict[str, str] = {
        "1": f"https://mainnet.infura.io/v3/{pid}",
        "10": f"https://optimism-mainnet.infura.io/v3/{pid}",
        "56": f"https://bnb-mainnet.infura.io/v3/{pid}",
        "100": f"https://gnosis-mainnet.infura.io/v3/{pid}",
        "137": f"https://polygon-mainnet.infura.io/v3/{pid}",
        "8453": f"https://base-mainnet.infura.io/v3/{pid}",
        "42161": f"https://arbitrum-mainnet.infura.io/v3/{pid}",
        "42220": f"https://celo-mainnet.infura.io/v3/{pid}",
        "43114": f"https://avalanche-mainnet.infura.io/v3/{pid}",
    }
    for cid, url in infura_by_chain.items():
        keys = RPC_KEYS.get(cid, [])
        if not keys or chain_has_rpc(cid):
            continue
        # rpc_for() checks keys in order, so the first name wins.
        env[keys[0]] = url
def load_dotenv(path: Path) -> dict[str, str]:
    """Parse a simple ``KEY=VALUE`` dotenv file into a dict.

    Blank lines, ``#`` comment lines and lines without ``=`` are ignored.
    Keys and values are whitespace-trimmed, and surrounding double/single
    quotes are stripped from values. A leading ``export`` keyword
    (``export KEY=VALUE``, as used in shell-sourceable env files) is dropped
    so the key is ``KEY`` rather than ``"export KEY"``. Placeholders such as
    ``${OTHER:-x}`` are kept verbatim; see ``resolve()``.

    Args:
        path: Location of the dotenv file.

    Returns:
        Mapping of variable names to raw string values; empty when *path* is
        not an existing file.
    """
    out: dict[str, str] = {}
    if not path.is_file():
        return out
    # utf-8-sig: independent of the process locale, and tolerates a leading BOM.
    for raw in path.read_text(encoding="utf-8-sig").splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        k, v = line.split("=", 1)
        k = k.strip()
        parts = k.split(None, 1)
        if len(parts) == 2 and parts[0] == "export":
            k = parts[1].strip()
        if not k:
            # "=value" has no usable name.
            continue
        out[k] = v.strip().strip('"').strip("'")
    return out
def resolve(env: dict[str, str], key: str, default: str = "") -> str:
    """Look up *key* in *env*, expanding a whole-value ``${NAME:-fallback}`` placeholder.

    Expansion follows shell ``:-`` semantics: the fallback is used when
    ``NAME`` is unset *or empty*. Only values that consist entirely of one
    ``${...:-...}`` expression are expanded. Anything else (plain values,
    ``${NAME}`` without a fallback, placeholders followed by extra text) is
    returned unchanged so callers can detect and skip leftover ``$`` values.

    Args:
        env: Mapping of variable names to raw values.
        key: Name to look up.
        default: Returned when the lookup (after expansion) is empty.

    Returns:
        The resolved value, or *default* when it is empty.
    """
    v = env.get(key, "")
    # endswith("}") guards the [2:-1] slice: without it, "${A:-b}/path" would
    # be cut in the wrong place and yield a garbage fallback.
    if v.startswith("${") and v.endswith("}") and ":-" in v:
        name, _, fallback = v[2:-1].partition(":-")
        v = env.get(name, "") or fallback
    return v or default
def rpc_for(env: dict[str, str], cid: str) -> str:
    """Return the primary RPC URL for chain *cid*, or ``""`` when none is known.

    Resolution order:
      1. The first ``RPC_KEYS[cid]`` name whose resolved value is non-empty and
         is not an unexpanded ``$`` placeholder.
      2. ``DEFAULT_RPC[cid]``.
      3. For chain ``"138"`` only: ``RPC_URL_138`` from *env*, else the local
         node at ``http://127.0.0.1:8545``.

    Args:
        env: Merged dotenv/process environment mapping.
        cid: Chain id as a string (e.g. ``"1"``).
    """
    for k in RPC_KEYS.get(cid, []):
        v = resolve(env, k, "")
        if v and not v.startswith("$"):
            return v
    public_default = DEFAULT_RPC.get(cid, "")
    if public_default:
        return public_default
    if cid == "138":
        local_default = "http://127.0.0.1:8545"
        v = resolve(env, "RPC_URL_138", local_default)
        # Same rule as the loop above: never hand an unexpanded placeholder
        # to cast as if it were a URL.
        return local_default if v.startswith("$") else v
    return ""
def rpc_chain_list(env: dict[str, str], cid: str) -> list[str]:
    """Return the ordered, de-duplicated RPC URLs to try for chain *cid*.

    The primary URL from ``rpc_for()`` comes first, followed by the entries of
    ``RPC_FALLBACKS[cid]``. Empty strings are dropped and the first occurrence
    of each URL is kept.
    """
    candidates = [rpc_for(env, cid), *RPC_FALLBACKS.get(cid, [])]
    # dict keys preserve insertion order, giving an order-stable de-duplication.
    return list(dict.fromkeys(url for url in candidates if url))
def deployer_address(env: dict[str, str], override: str | None) -> str:
    """Determine the deployer address to query balances for.

    Precedence:
      1. *override* (the ``--deployer`` CLI value), returned as given.
      2. ``DEPLOYER_ADDRESS`` / ``DEPLOYER`` from the process environment.
      3. An address derived with ``cast wallet address`` from ``PRIVATE_KEY`` /
         ``DEPLOYER_PRIVATE_KEY`` (process environment first, then *env*).
         Keys containing an unexpanded ``${`` placeholder are skipped.
      4. ``DEPLOYER_ADDRESS`` / ``DEPLOYER`` from *env*.

    Returns:
        The address string, or ``""`` when nothing could be determined.
    """
    if override:
        return override
    for k in ("DEPLOYER_ADDRESS", "DEPLOYER"):
        v = (os.environ.get(k) or "").strip()
        if v:
            return v
    for key in ("PRIVATE_KEY", "DEPLOYER_PRIVATE_KEY"):
        pk = (os.environ.get(key) or env.get(key) or "").strip()
        if not pk or "${" in pk:
            continue
        # NOTE(review): the key is passed on the command line and is therefore
        # visible in the process list while cast runs; consider a keystore.
        try:
            r = subprocess.run(
                ["cast", "wallet", "address", pk],
                capture_output=True,
                text=True,
                check=False,
            )
        except OSError:
            # cast is missing or not executable: derivation is impossible for
            # any key, so fall through to the address stored in .env instead
            # of aborting with a traceback.
            break
        if r.returncode == 0 and r.stdout.strip():
            return r.stdout.strip()
    return (env.get("DEPLOYER_ADDRESS") or env.get("DEPLOYER") or "").strip()
def parse_uint(s: str) -> int:
    """Parse the first whitespace-separated token of *s* as a base-10 integer.

    Anything after the first token is ignored.

    Raises:
        IndexError: *s* is empty or whitespace-only.
        ValueError: the first token is not a decimal integer.
    """
    tokens = s.split()
    return int(tokens[0])
def parse_address_line(s: str) -> str | None:
    """Extract the first ``0x`` + 40-hex-digit substring from *s*.

    Returns:
        The matched text with its original letter case, or ``None`` when *s*
        is blank or contains no such substring.
    """
    text = s.strip()
    found = re.search(r"(0x[a-fA-F0-9]{40})", text) if text else None
    if found is None:
        return None
    return found.group(1)
def cast_call(
    to: str, sig: str, rpc: str, timeout: float | None = 60.0
) -> tuple[str, str]:
    """Run ``cast call <to> <sig> --rpc-url <rpc>`` and report the outcome.

    Args:
        to: Contract address to call.
        sig: Function signature, e.g. ``"_BASE_TOKEN_()(address)"``.
        rpc: RPC endpoint URL.
        timeout: Seconds to wait for the ``cast`` process; ``None`` waits
            indefinitely. A finite backstop keeps one hung endpoint from
            stalling the whole run, so callers can move on to the next RPC.

    Returns:
        ``("ok", stdout)`` on success, otherwise ``("err", message)`` with the
        message truncated to 400 characters. A timeout or a ``cast`` binary
        that cannot be started is reported as ``"err"`` rather than raised.
    """
    cmd = ["cast", "call", to, sig, "--rpc-url", rpc]
    try:
        r = subprocess.run(
            cmd, capture_output=True, text=True, check=False, timeout=timeout
        )
    except subprocess.TimeoutExpired:
        return "err", f"timeout after {timeout}s running cast call"
    except OSError as exc:
        return "err", f"could not run cast: {exc}"[:400]
    if r.returncode != 0:
        return "err", (r.stderr or r.stdout or "").strip()[:400]
    return "ok", r.stdout.strip()
def erc20_balance(
    token: str, holder: str, rpc: str, timeout: float | None = 60.0
) -> tuple[str, int | str]:
    """Query ``balanceOf(holder)`` on *token* through ``cast call``.

    Args:
        token: ERC-20 (or LP/pool) contract address.
        holder: Address whose balance is requested.
        rpc: RPC endpoint URL.
        timeout: Seconds to wait for the ``cast`` process; ``None`` waits
            indefinitely. A finite backstop keeps one hung endpoint from
            stalling the whole run, so callers can move on to the next RPC.

    Returns:
        ``("ok", balance)`` with the raw integer balance, otherwise
        ``("err", message)``. Non-zero exit, unparsable output, a timeout and
        a ``cast`` binary that cannot be started are all reported as ``"err"``.
    """
    cmd = ["cast", "call", token, "balanceOf(address)(uint256)", holder, "--rpc-url", rpc]
    try:
        r = subprocess.run(
            cmd, capture_output=True, text=True, check=False, timeout=timeout
        )
    except subprocess.TimeoutExpired:
        return "err", f"timeout after {timeout}s running cast call"
    except OSError as exc:
        return "err", f"could not run cast: {exc}"[:400]
    if r.returncode != 0:
        return "err", (r.stderr or r.stdout or "").strip()[:400]
    try:
        return "ok", parse_uint(r.stdout)
    except (ValueError, IndexError):
        return "err", f"parse:{r.stdout[:120]}"
def erc20_balance_any_rpc(
    token: str, holder: str, rpcs: list[str]
) -> tuple[str, int | str, str]:
    """Try ``erc20_balance`` on each RPC in order until one succeeds.

    Returns:
        ``(status, value_or_error, rpc_used)``. On success the tuple carries
        ``"ok"``, the balance and the endpoint that answered. When every
        endpoint fails it carries ``"err"``, the last error text and the first
        endpoint in *rpcs* (``""`` when *rpcs* is empty).
    """
    failure = ""
    for endpoint in rpcs:
        status, payload = erc20_balance(token, holder, endpoint)
        if status != "ok":
            failure = str(payload)
            continue
        return status, payload, endpoint
    first_endpoint = rpcs[0] if rpcs else ""
    return "err", failure, first_endpoint
def resolve_pmm_row(
    pool: str,
    deployer: str,
    rpcs: list[str],
    do_resolve_dodo: bool,
) -> dict:
    """
    Build a result dict with lp resolution fields.
    Tries: pool ERC20 balance (any RPC) -> DODO _BASE/_QUOTE -> capital tokens.

    Args:
        pool: PMM pool contract address.
        deployer: Address whose balances are queried.
        rpcs: RPC URLs to try, in order.
        do_resolve_dodo: When False, stop after the first failed ``balanceOf(pool)``
            and skip every DODO view-call probe.

    Returns:
        A dict with keys ``contract``, ``lpTokenAddress``, ``lpResolution``,
        ``dodoBaseToken``, ``dodoQuoteToken``, ``lpBalances``, ``balanceRaw``,
        ``status`` (``"ok"`` or ``"erc20_error"`` on return), ``error`` and ``rpcUsed``.

    NOTE(review): this block reached review with its indentation stripped; the
    nesting below is reconstructed from the statements, comments and messages —
    confirm against the repository copy.
    """
    rec: dict = {
        "contract": pool,
        "lpTokenAddress": pool,
        "lpResolution": "unknown",
        "dodoBaseToken": None,
        "dodoQuoteToken": None,
        "lpBalances": [],
        "balanceRaw": 0,
        "status": "pending",
        "error": None,
        "rpcUsed": rpcs[0] if rpcs else None,
    }
    # Stage 1: treat the pool itself as the LP ERC-20.
    st, val, used = erc20_balance_any_rpc(pool, deployer, rpcs)
    rec["rpcUsed"] = used
    if st == "ok":
        rec["status"] = "ok"
        rec["lpResolution"] = "dvm_or_erc20_pool"
        rec["balanceRaw"] = int(val)
        rec["lpBalances"] = [
            {
                "role": "lp_erc20",
                "token": pool,
                "raw": int(val),
                "note": "balanceOf(pool): DVM LP shares are usually the pool contract itself",
            }
        ]
        return rec
    # Remember the stage-1 failure; later stages clear it only if they succeed.
    rec["error"] = str(val)
    if not do_resolve_dodo:
        rec["status"] = "erc20_error"
        rec["lpResolution"] = "unresolved_pass_resolve_dodo"
        return rec
    # Stage 2: probe the DODO view functions; stop at the first RPC that answers.
    base_tok: str | None = None
    quote_tok: str | None = None
    for rpc in rpcs:
        stb, outb = cast_call(pool, "_BASE_TOKEN_()(address)", rpc)
        if stb == "ok":
            base_tok = parse_address_line(outb)
            rec["rpcUsed"] = rpc
            break
    if base_tok:
        rec["dodoBaseToken"] = base_tok
        for rpc in rpcs:
            stq, outq = cast_call(pool, "_QUOTE_TOKEN_()(address)", rpc)
            if stq == "ok":
                quote_tok = parse_address_line(outq)
                rec["dodoQuoteToken"] = quote_tok
                break
        # Retry pool balanceOf after confirming DVM interface (fresh RPC may fix flake)
        st2, val2, used2 = erc20_balance_any_rpc(pool, deployer, rpcs)
        if st2 == "ok":
            rec["status"] = "ok"
            rec["lpResolution"] = "dvm_erc20_pool_after_probe"
            rec["balanceRaw"] = int(val2)
            rec["rpcUsed"] = used2
            rec["error"] = None
            rec["lpBalances"] = [
                {
                    "role": "lp_erc20",
                    "token": pool,
                    "raw": int(val2),
                    "note": "balanceOf(pool) succeeded after _BASE_TOKEN_ probe + RPC retry",
                }
            ]
            return rec
    # Stage 3: look for separate base/quote capital tokens and read the
    # deployer's balance of each one that exists.
    capital_balances: list[dict] = []
    for cap_sig, role in (
        ("_BASE_CAPITAL_TOKEN_()(address)", "base_capital"),
        ("_QUOTE_CAPITAL_TOKEN_()(address)", "quote_capital"),
    ):
        tok_a: str | None = None
        for rpc in rpcs:
            stc, outc = cast_call(pool, cap_sig, rpc)
            if stc == "ok":
                tok_a = parse_address_line(outc)
                if tok_a and tok_a != ZERO:
                    # bused (the RPC that answered) is not recorded anywhere.
                    bst, bval, bused = erc20_balance_any_rpc(tok_a, deployer, rpcs)
                    if bst == "ok":
                        capital_balances.append(
                            {
                                "role": role,
                                "token": tok_a,
                                "raw": int(bval),
                                "note": f"DODO V1-style {cap_sig.split('(')[0]} balanceOf",
                            }
                        )
                # The first RPC that answers the view call settles this capital token.
                break
    if capital_balances:
        rec["status"] = "ok"
        rec["lpResolution"] = "v1_capital_tokens"
        rec["lpTokenAddress"] = pool
        rec["lpBalances"] = capital_balances
        # Sum of raw balances across the capital tokens found (two different ERC-20s).
        rec["balanceRaw"] = sum(x["raw"] for x in capital_balances)
        rec["error"] = None
        return rec
    # Nothing worked: report whether the pool at least answered _BASE_TOKEN_.
    if base_tok:
        rec["lpResolution"] = "dvm_interface_no_balance"
        rec["status"] = "erc20_error"
        rec["error"] = (
            f"Pool responds as DVM (_BASE_TOKEN_={base_tok}) but balanceOf(pool) failed: {rec.get('error', '')[:200]}"
        )
    else:
        rec["status"] = "erc20_error"
        rec["lpResolution"] = "unresolved"
    return rec
def collect_entries(status_path: Path, discovery_path: Path) -> list[tuple]:
    """Collect the pool/pair contracts to check from the two JSON inputs.

    PMM pools come from ``chains.<id>.pmmPools`` / ``pmmPoolsVolatile`` /
    ``gasPmmPools`` in the deployment-status file. Uniswap V2 pairs come from
    ``entries[].pairsChecked`` in the discovery file, which is optional.
    Entries with an empty or zero ``poolAddress`` are ignored.

    Discovery entries without a ``chain_id`` are skipped with a warning on
    stderr: stringifying the missing id would produce the chain ``"None"``,
    which cannot be mapped to an RPC and breaks numeric chain-id sorting.

    Args:
        status_path: deployment-status.json path (must exist).
        discovery_path: pair-discovery JSON path (ignored when not a file).

    Returns:
        ``(chain_id, network_name, venue, "base/quote", address)`` tuples where
        venue is ``"PMM"`` or ``"UniV2"``, de-duplicated on
        ``(chain_id, lowercased address)`` with the first occurrence kept.
    """
    status = json.loads(status_path.read_text(encoding="utf-8"))
    rows: list[tuple] = []
    for cid, ch in (status.get("chains") or {}).items():
        name = ch.get("name", cid)
        pools = (
            (ch.get("pmmPools") or [])
            + (ch.get("pmmPoolsVolatile") or [])
            + (ch.get("gasPmmPools") or [])
        )
        for pool in pools:
            addr = pool.get("poolAddress") or ""
            if not addr or addr == ZERO:
                continue
            label = f"{pool.get('base')}/{pool.get('quote')}"
            rows.append((cid, name, "PMM", label, addr))
    if discovery_path.is_file():
        disc = json.loads(discovery_path.read_text(encoding="utf-8"))
        for ent in disc.get("entries") or []:
            raw_cid = ent.get("chain_id")
            if raw_cid is None:
                print(
                    f"Skipping discovery entry without chain_id (network={ent.get('network')!r})",
                    file=sys.stderr,
                )
                continue
            cid = str(raw_cid)
            name = ent.get("network", cid)
            for pr in ent.get("pairsChecked") or []:
                addr = pr.get("poolAddress") or ""
                if not addr or addr == ZERO:
                    continue
                label = f"{pr.get('base')}/{pr.get('quote')}"
                rows.append((cid, name, "UniV2", label, addr))
    # Addresses are compared case-insensitively so checksummed and lowercase
    # spellings of the same contract collapse into one row.
    seen: set[tuple[str, str]] = set()
    out: list[tuple] = []
    for row in rows:
        k = (row[0], row[4].lower())
        if k in seen:
            continue
        seen.add(k)
        out.append(row)
    return out
def main() -> int:
    """CLI entry point: check the deployer's LP balances and print/write a report.

    Loads the dotenv file and RPC settings, determines the deployer address,
    collects the contracts to check, queries each one, prints a summary and
    optionally writes JSON reports (``--json-out``, ``--errors-json``).

    Returns:
        1 when no deployer address can be determined, otherwise 0 (including
        when individual rows ended in an error status).

    NOTE(review): this block reached review with its indentation stripped; the
    nesting below is reconstructed — confirm against the repository copy.
    """
    ap = argparse.ArgumentParser(
        description="Deployer LP balances with DODO / Uni V2 LP token resolution."
    )
    ap.add_argument("--status", type=Path, default=DEFAULT_STATUS)
    ap.add_argument("--discovery", type=Path, default=DEFAULT_DISCOVERY)
    ap.add_argument("--env", type=Path, default=DEFAULT_ENV)
    ap.add_argument("--deployer", default=None)
    ap.add_argument("--summary-only", action="store_true")
    ap.add_argument("--only-nonzero", action="store_true")
    ap.add_argument(
        "--no-resolve-dodo",
        action="store_true",
        help="Skip DODO _BASE_TOKEN_ / capital-token probes and extra RPC fallbacks (faster; more erc20_error).",
    )
    ap.add_argument(
        "--chain-id",
        type=int,
        default=None,
        metavar="N",
        help="Only check this chain (e.g. 1 for Ethereum). Default: all chains.",
    )
    ap.add_argument(
        "--json-out",
        type=Path,
        default=None,
        help="Full report JSON.",
    )
    ap.add_argument(
        "--errors-json",
        type=Path,
        default=None,
        help="Rows that remain erc20_error or no_rpc.",
    )
    args = ap.parse_args()
    # Build the effective environment: .env file, then process-env overrides,
    # then the mainnet alias, then Infura defaults for chains still lacking an RPC.
    env = load_dotenv(args.env)
    merge_rpc_os_into(env)
    apply_eth_mainnet_rpc_alias(env)
    apply_infura_rpc_defaults(env)
    dep = deployer_address(env, args.deployer)
    if not dep:
        print("No deployer: set PRIVATE_KEY or DEPLOYER_ADDRESS in .env or pass --deployer", file=sys.stderr)
        return 1
    rows = collect_entries(args.status, args.discovery)
    if args.chain_id is not None:
        # Chain ids are strings in the collected rows.
        want = str(args.chain_id)
        rows = [r for r in rows if r[0] == want]
    results: list[dict] = []  # every row, in processing order
    nonzero: list[dict] = []  # rows with status ok and balanceRaw > 0
    errors: list[dict] = []  # rows with no RPC or a non-ok status
    # Process in (numeric chain id, venue, pair label) order.
    # NOTE(review): int(x[0]) raises ValueError if a chain id is not numeric.
    for cid, name, venue, label, addr in sorted(rows, key=lambda x: (int(x[0]), x[2], x[3])):
        rpcs = rpc_chain_list(env, cid)
        base_rec: dict = {
            "chainId": cid,
            "network": name,
            "venue": venue,
            "pair": label,
            "contract": addr,
        }
        if not rpcs or not rpcs[0]:
            base_rec["status"] = "no_rpc"
            base_rec["lpResolution"] = "no_rpc"
            errors.append(base_rec)
            results.append(base_rec)
            continue
        if venue == "UniV2":
            # The pair contract is the LP token, so one balanceOf is enough.
            st, val, used = erc20_balance_any_rpc(addr, dep, rpcs)
            r = {
                **base_rec,
                "lpTokenAddress": addr,
                "lpResolution": "uniswap_v2_pair",
                "rpcUsed": used,
                "lpBalances": [
                    {
                        "role": "pair_lp",
                        "token": addr,
                        "raw": int(val) if st == "ok" else 0,
                        "note": "Uniswap V2 pair contract is the LP ERC-20",
                    }
                ],
                "balanceRaw": int(val) if st == "ok" else 0,
                "status": "ok" if st == "ok" else "erc20_error",
                "error": None if st == "ok" else str(val),
                "dodoBaseToken": None,
                "dodoQuoteToken": None,
            }
            if st != "ok":
                # Status was already set to "erc20_error" in the literal above.
                r["status"] = "erc20_error"
                errors.append(r)
            elif r["balanceRaw"] > 0:
                nonzero.append(r)
            results.append(r)
            continue
        # PMM
        r = {**base_rec, **resolve_pmm_row(addr, dep, rpcs, not args.no_resolve_dodo)}
        if r.get("status") == "ok" and r.get("balanceRaw", 0) > 0:
            nonzero.append(r)
        if r.get("status") != "ok":
            errors.append(r)
        results.append(r)
    # Summary stats
    by_res: dict[str, int] = {}  # lpResolution value -> number of rows
    for r in results:
        lr = r.get("lpResolution") or "unknown"
        by_res[lr] = by_res.get(lr, 0) + 1
    print(f"Deployer: {dep}")
    print(f"Contracts checked: {len(rows)}")
    print(f"Non-zero LP exposure (sum of components): {len(nonzero)}")
    print(f"Errors / no RPC: {len(errors)}")
    print(f"Resolution breakdown: {by_res}")
    if args.no_resolve_dodo:
        print("(Re-run without --no-resolve-dodo to probe DODO interfaces + RPC fallbacks.)")
    if not args.summary_only:
        print("\n=== Non-zero LP / capital balances ===")
        for r in nonzero:
            lp = r.get("lpTokenAddress", r.get("contract"))
            print(
                f" chain {r['chainId']} {r['venue']} {r['pair']} | lpToken={lp} | "
                f"resolution={r.get('lpResolution')} | raw_total={r.get('balanceRaw')}"
            )
            for leg in r.get("lpBalances") or []:
                if leg.get("raw", 0) > 0:
                    print(f" - {leg.get('role')} {leg.get('token')}: {leg.get('raw')} ({leg.get('note', '')[:60]})")
        # NOTE(review): assumed to sit inside the summary-only guard — confirm.
        if not args.only_nonzero and errors:
            print("\nSample unresolved / errors:")
            # Only the first 12 error rows are shown.
            for r in errors[:12]:
                e = r.get("error", r.get("status", ""))
                print(
                    f" chain {r['chainId']} {r['venue']} {r['pair']} | "
                    f"{r.get('lpResolution', '')}: {str(e)[:120]}"
                )
    if args.json_out:
        payload = {
            "deployer": dep,
            "resolveDodo": not args.no_resolve_dodo,
            "summary": {
                "checked": len(rows),
                "nonzero": len(nonzero),
                "errors": len(errors),
                "byLpResolution": by_res,
            },
            "nonzero": nonzero,
            "all": results,
        }
        args.json_out.parent.mkdir(parents=True, exist_ok=True)
        args.json_out.write_text(json.dumps(payload, indent=2) + "\n")
        print(f"Wrote {args.json_out}")
    if args.errors_json:
        err_only = [r for r in results if r.get("status") in ("no_rpc", "erc20_error")]
        args.errors_json.parent.mkdir(parents=True, exist_ok=True)
        args.errors_json.write_text(json.dumps(err_only, indent=2) + "\n")
        print(f"Wrote {args.errors_json} ({len(err_only)} rows)")
    return 0
# Run the CLI only when executed as a script; main()'s return value becomes the exit status.
if __name__ == "__main__":
    sys.exit(main())