#!/usr/bin/env python3 """Run ON a Proxmox cluster node (as root). Stdout: JSON live guest inventory.""" from __future__ import annotations import json import os import subprocess import sys from datetime import datetime, timezone from pathlib import Path _SCRIPT_DIR = Path(__file__).resolve().parent def _parser_search_paths() -> list[Path]: paths: list[Path] = [] for entry in os.environ.get("PYTHONPATH", "").split(":"): if entry: paths.append(Path(entry)) paths.append(_SCRIPT_DIR) for parent in _SCRIPT_DIR.parents: paths.append(parent / "lib") paths.append(parent / "scripts" / "lib") return paths for _path in _parser_search_paths(): if (_path / "proxmox_guest_lan_ips.py").is_file(): sys.path.insert(0, str(_path)) break from proxmox_guest_lan_ips import ( # noqa: E402 parse_guest_network_from_conf_text, parse_guest_network_from_config, ) def _run(cmd: list[str]) -> str: return subprocess.check_output(cmd, text=True, stderr=subprocess.DEVNULL) def _read_config(path: str) -> str: try: with open(path, encoding="utf-8", errors="replace") as f: return f.read() except OSError: return "" def _guest_network( guest_type: str, node: str, vmid_s: str, body: str ) -> tuple[str, str, tuple[str, ...]]: if body.strip(): net = parse_guest_network_from_conf_text(body) else: net = parse_guest_network_from_config({}) if not net.ips: try: cfg_raw = _run( [ "pvesh", "get", f"/nodes/{node}/{guest_type}/{vmid_s}/config", "--output-format", "json", ] ) net = parse_guest_network_from_config(json.loads(cfg_raw)) except (subprocess.CalledProcessError, json.JSONDecodeError, OSError): pass return net.primary_ip, (net.macs[0] if net.macs else ""), net.ips def main() -> None: collected_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") try: raw = _run( ["pvesh", "get", "/cluster/resources", "--output-format", "json"] ) resources = json.loads(raw) except (subprocess.CalledProcessError, json.JSONDecodeError) as e: json.dump( { "collected_at": collected_at, "error": f"pvesh_cluster_resources_failed: {e}", "guests": [], }, sys.stdout, indent=2, ) return guests: list[dict] = [] for r in resources: t = r.get("type") if t not in ("lxc", "qemu"): continue vmid = r.get("vmid") node = r.get("node") if vmid is None or not node: continue vmid_s = str(vmid) name = r.get("name") or "" status = r.get("status") or "" if t == "lxc": cfg_path = f"/etc/pve/nodes/{node}/lxc/{vmid_s}.conf" else: cfg_path = f"/etc/pve/nodes/{node}/qemu-server/{vmid_s}.conf" body = _read_config(cfg_path) ip, mac, ips = _guest_network(t, str(node), vmid_s, body) guest: dict = { "vmid": vmid_s, "type": t, "node": str(node), "name": name, "status": status, "ip": ip, "mac": mac, "config_path": cfg_path, } if len(ips) > 1: guest["ips"] = list(ips) guests.append(guest) out: dict = { "collected_at": collected_at, "source": "proxmox_cluster_pvesh_plus_config", "guests": sorted(guests, key=lambda g: int(g["vmid"])), } if os.environ.get("IT_COLLECT_IP_NEIGH", "").strip().lower() in ( "1", "yes", "true", ): neigh_lines: list[str] = [] try: raw_neigh = subprocess.check_output( ["ip", "-4", "neigh", "show", "dev", "vmbr0"], text=True, stderr=subprocess.DEVNULL, timeout=30, ) neigh_lines = [ ln.strip() for ln in raw_neigh.splitlines() if ln.strip() ][:500] except (subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError): neigh_lines = [] out["ip_neigh_vmbr0_sample"] = { "collected_at": collected_at, "line_count": len(neigh_lines), "lines": neigh_lines, } json.dump(out, sys.stdout, indent=2) if __name__ == "__main__": main()