Files
proxmox/scripts/it-ops/lib/collect_inventory_remote.py
defiQUG 1f44a50a25 feat(it-ops): cluster live inventory + QEMU ipconfig LAN IPs
Add scripts/it-ops export pipeline (collect_inventory_remote, compute_ipam_drift)
and proxmox_guest_lan_ips parser for ipconfig* and all net* interfaces.

Reconcile ALL_VMIDS, ip-addresses.conf, and operational template with live
VMID/IP data; Order portal env vars; DBIS node matrix; inventory helpers.

Track latest reports/status/live_inventory.json and drift.json (137 guests,
no duplicate LAN IPs). Document export in AGENTS.md.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-11 10:22:59 -07:00

163 lines
4.6 KiB
Python
Executable File

#!/usr/bin/env python3
"""Run ON a Proxmox cluster node (as root). Stdout: JSON live guest inventory."""
from __future__ import annotations
import json
import os
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
# Directory that contains this script; used as the anchor for locating helpers.
_SCRIPT_DIR = Path(__file__).resolve().parent


def _parser_search_paths() -> list[Path]:
    """Return candidate directories for ``proxmox_guest_lan_ips.py``, in priority order.

    The order is:

    1. every non-empty entry of the ``PYTHONPATH`` environment variable,
    2. the directory containing this script,
    3. for each ancestor of that directory (nearest first), its ``lib`` and
       ``scripts/lib`` subdirectories.

    The directories are not checked for existence here; the caller probes them.
    """
    # os.pathsep is the separator the interpreter itself uses for PYTHONPATH
    # (":" on POSIX), so the split stays correct on every platform.
    env_entries = os.environ.get("PYTHONPATH", "").split(os.pathsep)
    paths: list[Path] = [Path(entry) for entry in env_entries if entry]
    paths.append(_SCRIPT_DIR)
    for parent in _SCRIPT_DIR.parents:
        paths.append(parent / "lib")
        paths.append(parent / "scripts" / "lib")
    return paths
# Make the shared parser importable: the first candidate directory that
# actually ships proxmox_guest_lan_ips.py is pushed to the front of sys.path.
# This must run before the import of proxmox_guest_lan_ips below.
for _path in _parser_search_paths():
    if not (_path / "proxmox_guest_lan_ips.py").is_file():
        continue
    sys.path.insert(0, str(_path))
    break
from proxmox_guest_lan_ips import ( # noqa: E402
parse_guest_network_from_conf_text,
parse_guest_network_from_config,
)
def _run(cmd: list[str]) -> str:
    """Execute *cmd* and return what it wrote to stdout, decoded as text.

    stderr is discarded. A non-zero exit status raises
    ``subprocess.CalledProcessError``.
    """
    completed = subprocess.run(
        cmd,
        check=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        text=True,
    )
    return completed.stdout
def _read_config(path: str) -> str:
    """Return the text content of *path*, or ``""`` when it cannot be read.

    The file is decoded as UTF-8; undecodable bytes are substituted instead of
    raising. Any ``OSError`` (missing file, permissions, ...) yields ``""``.
    """
    try:
        return Path(path).read_text(encoding="utf-8", errors="replace")
    except OSError:
        return ""
def _guest_network(
    guest_type: str, node: str, vmid_s: str, body: str
) -> tuple[str, str, tuple[str, ...]]:
    """Resolve a guest's network details from its config text, then from pvesh.

    *body* is the guest's config file content. When it is non-blank it is
    parsed with ``parse_guest_network_from_conf_text``; otherwise parsing starts
    from an empty config mapping. If that yields no IPs, the guest's config is
    requested through ``pvesh get /nodes/<node>/<guest_type>/<vmid>/config`` and
    parsed instead; a failure of that lookup keeps the first result.

    Returns ``(primary_ip, first_mac_or_empty_string, ips)``.
    """
    parsed = (
        parse_guest_network_from_conf_text(body)
        if body.strip()
        else parse_guest_network_from_config({})
    )

    if not parsed.ips:
        api_path = f"/nodes/{node}/{guest_type}/{vmid_s}/config"
        try:
            api_json = _run(
                ["pvesh", "get", api_path, "--output-format", "json"]
            )
            parsed = parse_guest_network_from_config(json.loads(api_json))
        except (subprocess.CalledProcessError, json.JSONDecodeError, OSError):
            # Best effort only: keep whatever the config text produced.
            pass

    first_mac = parsed.macs[0] if parsed.macs else ""
    return parsed.primary_ip, first_mac, parsed.ips
def _collect_guests(resources: list) -> list[dict]:
    """Build one inventory entry per LXC/QEMU row of ``/cluster/resources``."""
    guests: list[dict] = []
    for row in resources:
        # A row that is not a JSON object cannot describe a guest; skip it
        # rather than letting one odd entry abort the whole export.
        if not isinstance(row, dict):
            continue
        guest_type = row.get("type")
        if guest_type not in ("lxc", "qemu"):
            continue
        vmid = row.get("vmid")
        node = row.get("node")
        if vmid is None or not node:
            continue
        vmid_s = str(vmid)
        node_s = str(node)
        # Container configs live under lxc/, VM configs under qemu-server/.
        conf_dir = "lxc" if guest_type == "lxc" else "qemu-server"
        cfg_path = f"/etc/pve/nodes/{node_s}/{conf_dir}/{vmid_s}.conf"
        ip, mac, ips = _guest_network(
            guest_type, node_s, vmid_s, _read_config(cfg_path)
        )
        guest: dict = {
            "vmid": vmid_s,
            "type": guest_type,
            "node": node_s,
            "name": row.get("name") or "",
            "status": row.get("status") or "",
            "ip": ip,
            "mac": mac,
            "config_path": cfg_path,
        }
        # The full list is only emitted when it adds something beyond "ip".
        if len(ips) > 1:
            guest["ips"] = list(ips)
        guests.append(guest)
    return guests


def _ip_neigh_sample(collected_at: str) -> dict:
    """Return up to 500 non-blank lines of ``ip -4 neigh show dev vmbr0``."""
    try:
        raw_neigh = subprocess.check_output(
            ["ip", "-4", "neigh", "show", "dev", "vmbr0"],
            text=True,
            stderr=subprocess.DEVNULL,
            timeout=30,
        )
    except (
        subprocess.CalledProcessError,
        subprocess.TimeoutExpired,
        OSError,  # missing or non-executable `ip` binary
    ):
        neigh_lines: list[str] = []
    else:
        neigh_lines = [
            ln.strip() for ln in raw_neigh.splitlines() if ln.strip()
        ][:500]
    return {
        "collected_at": collected_at,
        "line_count": len(neigh_lines),
        "lines": neigh_lines,
    }


def main() -> None:
    """Write the live guest inventory of the cluster to stdout as JSON.

    The guest list comes from ``pvesh get /cluster/resources``; each LXC/QEMU
    guest is enriched with network data derived from its config. Guests are
    sorted by numeric VMID. When the ``IT_COLLECT_IP_NEIGH`` environment
    variable is ``1``, ``yes`` or ``true`` (case-insensitive), a sample of the
    ``vmbr0`` neighbour table is added under ``ip_neigh_vmbr0_sample``.

    If the cluster resource list cannot be obtained (``pvesh`` missing, failing,
    or returning something other than a JSON array), a document with an
    ``error`` field and an empty ``guests`` list is written instead.
    """
    collected_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    error = ""
    resources: list = []
    try:
        raw = _run(
            ["pvesh", "get", "/cluster/resources", "--output-format", "json"]
        )
        resources = json.loads(raw)
    except (subprocess.CalledProcessError, json.JSONDecodeError, OSError) as e:
        # OSError covers a missing/unexecutable pvesh binary, which would
        # otherwise escape as a traceback instead of the JSON error document.
        error = f"pvesh_cluster_resources_failed: {e}"
    else:
        if not isinstance(resources, list):
            error = (
                "pvesh_cluster_resources_failed: expected a JSON array, got "
                + type(resources).__name__
            )

    if error:
        json.dump(
            {
                "collected_at": collected_at,
                "error": error,
                "guests": [],
            },
            sys.stdout,
            indent=2,
        )
        return

    guests = _collect_guests(resources)
    out: dict = {
        "collected_at": collected_at,
        "source": "proxmox_cluster_pvesh_plus_config",
        "guests": sorted(guests, key=lambda g: int(g["vmid"])),
    }

    if os.environ.get("IT_COLLECT_IP_NEIGH", "").strip().lower() in (
        "1",
        "yes",
        "true",
    ):
        out["ip_neigh_vmbr0_sample"] = _ip_neigh_sample(collected_at)

    json.dump(out, sys.stdout, indent=2)
if __name__ == "__main__":
main()