Add scripts/it-ops export pipeline (collect_inventory_remote, compute_ipam_drift) and proxmox_guest_lan_ips parser for ipconfig* and all net* interfaces. Reconcile ALL_VMIDS, ip-addresses.conf, and operational template with live VMID/IP data; Order portal env vars; DBIS node matrix; inventory helpers. Track latest reports/status/live_inventory.json and drift.json (137 guests, no duplicate LAN IPs). Document export in AGENTS.md. Co-authored-by: Cursor <cursoragent@cursor.com>
163 lines
4.6 KiB
Python
Executable File
163 lines
4.6 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Run ON a Proxmox cluster node (as root). Stdout: JSON live guest inventory."""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
_SCRIPT_DIR = Path(__file__).resolve().parent
|
|
|
|
|
|
def _parser_search_paths() -> list[Path]:
|
|
paths: list[Path] = []
|
|
for entry in os.environ.get("PYTHONPATH", "").split(":"):
|
|
if entry:
|
|
paths.append(Path(entry))
|
|
paths.append(_SCRIPT_DIR)
|
|
for parent in _SCRIPT_DIR.parents:
|
|
paths.append(parent / "lib")
|
|
paths.append(parent / "scripts" / "lib")
|
|
return paths
|
|
|
|
|
|
for _path in _parser_search_paths():
|
|
if (_path / "proxmox_guest_lan_ips.py").is_file():
|
|
sys.path.insert(0, str(_path))
|
|
break
|
|
|
|
from proxmox_guest_lan_ips import ( # noqa: E402
|
|
parse_guest_network_from_conf_text,
|
|
parse_guest_network_from_config,
|
|
)
|
|
|
|
|
|
def _run(cmd: list[str]) -> str:
|
|
return subprocess.check_output(cmd, text=True, stderr=subprocess.DEVNULL)
|
|
|
|
|
|
def _read_config(path: str) -> str:
|
|
try:
|
|
with open(path, encoding="utf-8", errors="replace") as f:
|
|
return f.read()
|
|
except OSError:
|
|
return ""
|
|
|
|
|
|
def _guest_network(
|
|
guest_type: str, node: str, vmid_s: str, body: str
|
|
) -> tuple[str, str, tuple[str, ...]]:
|
|
if body.strip():
|
|
net = parse_guest_network_from_conf_text(body)
|
|
else:
|
|
net = parse_guest_network_from_config({})
|
|
if not net.ips:
|
|
try:
|
|
cfg_raw = _run(
|
|
[
|
|
"pvesh",
|
|
"get",
|
|
f"/nodes/{node}/{guest_type}/{vmid_s}/config",
|
|
"--output-format",
|
|
"json",
|
|
]
|
|
)
|
|
net = parse_guest_network_from_config(json.loads(cfg_raw))
|
|
except (subprocess.CalledProcessError, json.JSONDecodeError, OSError):
|
|
pass
|
|
return net.primary_ip, (net.macs[0] if net.macs else ""), net.ips
|
|
|
|
|
|
def main() -> None:
|
|
collected_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
try:
|
|
raw = _run(
|
|
["pvesh", "get", "/cluster/resources", "--output-format", "json"]
|
|
)
|
|
resources = json.loads(raw)
|
|
except (subprocess.CalledProcessError, json.JSONDecodeError) as e:
|
|
json.dump(
|
|
{
|
|
"collected_at": collected_at,
|
|
"error": f"pvesh_cluster_resources_failed: {e}",
|
|
"guests": [],
|
|
},
|
|
sys.stdout,
|
|
indent=2,
|
|
)
|
|
return
|
|
|
|
guests: list[dict] = []
|
|
for r in resources:
|
|
t = r.get("type")
|
|
if t not in ("lxc", "qemu"):
|
|
continue
|
|
vmid = r.get("vmid")
|
|
node = r.get("node")
|
|
if vmid is None or not node:
|
|
continue
|
|
vmid_s = str(vmid)
|
|
name = r.get("name") or ""
|
|
status = r.get("status") or ""
|
|
|
|
if t == "lxc":
|
|
cfg_path = f"/etc/pve/nodes/{node}/lxc/{vmid_s}.conf"
|
|
else:
|
|
cfg_path = f"/etc/pve/nodes/{node}/qemu-server/{vmid_s}.conf"
|
|
|
|
body = _read_config(cfg_path)
|
|
ip, mac, ips = _guest_network(t, str(node), vmid_s, body)
|
|
|
|
guest: dict = {
|
|
"vmid": vmid_s,
|
|
"type": t,
|
|
"node": str(node),
|
|
"name": name,
|
|
"status": status,
|
|
"ip": ip,
|
|
"mac": mac,
|
|
"config_path": cfg_path,
|
|
}
|
|
if len(ips) > 1:
|
|
guest["ips"] = list(ips)
|
|
guests.append(guest)
|
|
|
|
out: dict = {
|
|
"collected_at": collected_at,
|
|
"source": "proxmox_cluster_pvesh_plus_config",
|
|
"guests": sorted(guests, key=lambda g: int(g["vmid"])),
|
|
}
|
|
|
|
if os.environ.get("IT_COLLECT_IP_NEIGH", "").strip().lower() in (
|
|
"1",
|
|
"yes",
|
|
"true",
|
|
):
|
|
neigh_lines: list[str] = []
|
|
try:
|
|
raw_neigh = subprocess.check_output(
|
|
["ip", "-4", "neigh", "show", "dev", "vmbr0"],
|
|
text=True,
|
|
stderr=subprocess.DEVNULL,
|
|
timeout=30,
|
|
)
|
|
neigh_lines = [
|
|
ln.strip() for ln in raw_neigh.splitlines() if ln.strip()
|
|
][:500]
|
|
except (subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError):
|
|
neigh_lines = []
|
|
out["ip_neigh_vmbr0_sample"] = {
|
|
"collected_at": collected_at,
|
|
"line_count": len(neigh_lines),
|
|
"lines": neigh_lines,
|
|
}
|
|
|
|
json.dump(out, sys.stdout, indent=2)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|