Add scripts/it-ops export pipeline (collect_inventory_remote, compute_ipam_drift) and proxmox_guest_lan_ips parser for ipconfig* and all net* interfaces. Reconcile ALL_VMIDS, ip-addresses.conf, and operational template with live VMID/IP data; Order portal env vars; DBIS node matrix; inventory helpers. Track latest reports/status/live_inventory.json and drift.json (137 guests, no duplicate LAN IPs). Document export in AGENTS.md. Co-authored-by: Cursor <cursoragent@cursor.com>
114 lines
3.1 KiB
Python
114 lines
3.1 KiB
Python
"""Parse static LAN IPv4/MAC from Proxmox LXC/QEMU guest config (net* / ipconfig*)."""
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
from dataclasses import dataclass
|
|
|
|
# Static IPv4 assignment: ``ip=<dotted quad>`` at the start of the value or right
# after ``,`` / ``=``, terminated by ``/`` (CIDR suffix), ``,`` or end of string.
# Non-numeric settings such as ``ip=dhcp`` and other keys such as ``ip6=`` do
# not match. Octets are only bounded to 1-3 digits here, not to 0-255.
_IP_VALUE_RE = re.compile(
    r"(?:^|[,=])ip=([0-9]{1,3}(?:\.[0-9]{1,3}){3})(?:/|,|$)",
    re.IGNORECASE,
)

# LXC-style MAC: ``hwaddr=<mac>`` anywhere in the value.
_HWADDR_RE = re.compile(r"hwaddr=([0-9A-Fa-f:]+)", re.IGNORECASE)

# NIC model names accepted by Proxmox for QEMU ``net[n]`` entries. The stored
# form is ``<model>=<mac>``; ``macaddr=<mac>`` is the explicit long form.
_QEMU_MAC_KEYS = (
    "virtio",
    "e1000",
    "e1000e",
    "e1000-82540em",
    "e1000-82544gc",
    "e1000-82545em",
    "i82551",
    "i82557b",
    "i82559er",
    "ne2k_isa",
    "ne2k_pci",
    "pcnet",
    "rtl8139",
    "vmxnet3",
    "macaddr",
)

# QEMU-style MAC: ``<model>=<mac>`` (or ``macaddr=<mac>``) at the start of the
# value or after ``,`` / ``=``, terminated by ``,`` or end of string.
_QEMU_MAC_RE = re.compile(
    r"(?:^|[,=])(?:"
    + "|".join(re.escape(name) for name in _QEMU_MAC_KEYS)
    + r")=([0-9A-Fa-f:]+)(?:,|$)",
    re.IGNORECASE,
)

# Backward-compatible name: previously matched ``virtio=<mac>`` only, which
# dropped the MAC of guests using any other NIC model.
_VIRTIO_MAC_RE = _QEMU_MAC_RE

# Interface keys: ``net<N>`` or ``ipconfig<N>`` (whole key, case-insensitive).
_NIC_INDEX_RE = re.compile(r"^(net|ipconfig)(\d+)$", re.IGNORECASE)
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class GuestLanNetwork:
|
|
ips: tuple[str, ...]
|
|
macs: tuple[str, ...]
|
|
primary_lan11: str | None
|
|
|
|
@property
|
|
def primary_ip(self) -> str:
|
|
if self.primary_lan11:
|
|
return self.primary_lan11
|
|
return self.ips[0] if self.ips else ""
|
|
|
|
|
|
def _nic_index(key: str) -> tuple[str, int] | None:
    """Split a config key such as ``net0`` or ``ipconfig1`` into its parts.

    Returns ``(prefix, index)`` with the prefix lowercased and the index as an
    int, or None when *key* does not match ``_NIC_INDEX_RE``.
    """
    match = _NIC_INDEX_RE.match(key)
    if match is None:
        return None
    prefix, number = match.group(1), match.group(2)
    return prefix.lower(), int(number)
|
|
|
|
|
|
def _extract_ipv4(value: str) -> str | None:
    """Return the static IPv4 address of the first ``ip=`` setting in *value*.

    Returns None when ``_IP_VALUE_RE`` finds no dotted-quad ``ip=`` setting
    (which covers non-numeric values such as ``ip=dhcp``), or when the matched
    text is not a valid IPv4 address because an octet exceeds 255.
    """
    match = _IP_VALUE_RE.search(value)
    if match is None:
        return None
    address = match.group(1)
    # The pattern only limits each octet to 1-3 digits, so range-check here
    # to avoid reporting strings like "999.1.1.1" as addresses.
    if any(int(octet) > 255 for octet in address.split(".")):
        return None
    return address
|
|
|
|
|
|
def _extract_mac(value: str) -> str | None:
    """Return the MAC address found in a NIC config value, or None.

    ``_HWADDR_RE`` is tried first and ``_VIRTIO_MAC_RE`` second; the first
    pattern that matches supplies the result (its capture group 1).
    """
    for pattern in (_HWADDR_RE, _VIRTIO_MAC_RE):
        found = pattern.search(value)
        if found:
            return found.group(1)
    return None
|
|
|
|
|
|
def _ordered_nic_keys(keys: list[str]) -> list[str]:
    """Return the NIC-related entries of *keys* in a deterministic order.

    Keys that ``_nic_index`` cannot parse are dropped. The rest are sorted by
    interface index, then with ``ipconfig`` ahead of any other prefix at the
    same index, then by the key text itself.
    """
    parsed_keys = ((name, _nic_index(name)) for name in keys)
    ranked = sorted(
        (parsed[1], 0 if parsed[0] == "ipconfig" else 1, name)
        for name, parsed in parsed_keys
        if parsed is not None
    )
    return [name for _, _, name in ranked]
|
|
|
|
|
|
def parse_guest_network_from_config(config: dict[str, object]) -> GuestLanNetwork:
    """Return static IPv4/MAC from pvesh config dict or parsed .conf key/value map.

    Keys are visited in the order given by ``_ordered_nic_keys``; values that
    are not strings are ignored. An IPv4 address is taken from both
    ``ipconfig*`` and ``net*`` values, a MAC address only from ``net*`` values.
    Duplicates are removed while keeping first-seen order, and the first
    address starting with ``192.168.11.`` becomes ``primary_lan11``.
    """
    found_ips: list[str] = []
    found_macs: list[str] = []
    for name in _ordered_nic_keys(list(config.keys())):
        value = config.get(name)
        if not isinstance(value, str):
            continue
        parsed = _nic_index(name)
        kind = parsed[0] if parsed else ""
        if kind not in ("ipconfig", "net"):
            continue
        address = _extract_ipv4(value)
        if address:
            found_ips.append(address)
        if kind != "net":
            continue
        hw_address = _extract_mac(value)
        if hw_address:
            found_macs.append(hw_address)

    # dict.fromkeys de-duplicates while preserving insertion order.
    unique_ips = tuple(dict.fromkeys(found_ips))
    unique_macs = tuple(dict.fromkeys(found_macs))

    lan11 = None
    for candidate in unique_ips:
        if candidate.startswith("192.168.11."):
            lan11 = candidate
            break

    return GuestLanNetwork(ips=unique_ips, macs=unique_macs, primary_lan11=lan11)
|
|
|
|
|
|
def parse_guest_network_from_conf_text(body: str) -> GuestLanNetwork:
    """Parse the text of a guest ``.conf`` file and return its network data.

    Only the leading (current) section of the file is read: parsing stops at
    the first ``[section]`` header line. Without this, ``net*`` / ``ipconfig*``
    keys repeated in later sections (e.g. snapshots or pending changes) would
    overwrite the live values, because later ``key: value`` lines replace
    earlier ones. Lines starting with ``#`` and lines without a ``:`` are
    skipped. The resulting key/value map is handed to
    ``parse_guest_network_from_config``.
    """
    config: dict[str, str] = {}
    for line in body.splitlines():
        stripped = line.strip()
        if stripped.startswith("[") and stripped.endswith("]"):
            # Start of a trailing section; the current config ends here.
            break
        if stripped.startswith("#"):
            continue
        key, separator, value = line.partition(":")
        if not separator:
            continue
        key = key.strip()
        if not key:
            continue
        config[key] = value.strip()
    return parse_guest_network_from_config(config)
|