Files
proxmox/scripts/comprehensive-proxmox-inventory.py
defiQUG 1f44a50a25 feat(it-ops): cluster live inventory + QEMU ipconfig LAN IPs
Add scripts/it-ops export pipeline (collect_inventory_remote, compute_ipam_drift)
and proxmox_guest_lan_ips parser for ipconfig* and all net* interfaces.

Reconcile ALL_VMIDS, ip-addresses.conf, and operational template with live
VMID/IP data; Order portal env vars; DBIS node matrix; inventory helpers.

Track latest reports/status/live_inventory.json and drift.json (137 guests,
no duplicate LAN IPs). Document export in AGENTS.md.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-11 10:22:59 -07:00

433 lines
15 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Comprehensive Proxmox Inventory Script
Queries all Proxmox hosts, lists all VMIDs with IPs, endpoints, ports, FQDNs,
and identifies NPMplus instances with their configurations.
"""
import json
import subprocess
import sys
from pathlib import Path
from typing import Dict, List, Optional, Any
from collections import defaultdict
_SCRIPT_DIR = Path(__file__).resolve().parent
sys.path.insert(0, str(_SCRIPT_DIR / "lib"))
from proxmox_guest_lan_ips import parse_guest_network_from_config
# Proxmox Hosts
PROXMOX_HOSTS = {
"ml110": "192.168.11.10",
"r630-01": "192.168.11.11",
"r630-02": "192.168.11.12"
}
SSH_OPTS = "-o StrictHostKeyChecking=no -o ConnectTimeout=5"
# Known port mappings for common services
SERVICE_PORTS = {
"80": "HTTP",
"443": "HTTPS",
"3000": "Node.js API",
"4000": "GraphQL/Blockscout",
"5432": "PostgreSQL",
"6379": "Redis",
"8006": "Proxmox Web UI",
"8043": "Omada",
"8200": "Vault",
"8545": "Besu HTTP RPC",
"8546": "Besu WebSocket RPC",
"9000": "Web3Signer",
"9545": "Prometheus Metrics",
"30303": "P2P Networking",
}
def run_ssh_command(host: str, command: str) -> Optional[str]:
"""Run SSH command on Proxmox host"""
try:
result = subprocess.run(
["ssh", *SSH_OPTS.split(), f"root@{host}", command],
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0:
return result.stdout.strip()
return None
except Exception as e:
print(f"Error running SSH command on {host}: {e}", file=sys.stderr)
return None
def get_node_name(host: str) -> Optional[str]:
"""Get Proxmox node name"""
return run_ssh_command(host, "hostname")
def get_all_vms(host: str, node: str) -> List[Dict[str, Any]]:
"""Get all QEMU VMs from a Proxmox host"""
vms = []
command = f"pvesh get /nodes/{node}/qemu --output-format json"
output = run_ssh_command(host, command)
if output:
try:
vm_list = json.loads(output)
for vm in vm_list:
vmid = str(vm.get('vmid', ''))
name = vm.get('name', f'VM-{vmid}')
status = vm.get('status', 'unknown')
vms.append({
'vmid': vmid,
'name': name,
'type': 'QEMU',
'status': status,
'host': host,
'node': node
})
except json.JSONDecodeError:
pass
return vms
def get_all_containers(host: str, node: str) -> List[Dict[str, Any]]:
"""Get all LXC containers from a Proxmox host"""
containers = []
command = f"pvesh get /nodes/{node}/lxc --output-format json"
output = run_ssh_command(host, command)
if output:
try:
container_list = json.loads(output)
for container in container_list:
vmid = str(container.get('vmid', ''))
name = container.get('name', f'CT-{vmid}')
status = container.get('status', 'unknown')
containers.append({
'vmid': vmid,
'name': name,
'type': 'LXC',
'status': status,
'host': host,
'node': node
})
except json.JSONDecodeError:
pass
return containers
def get_vm_config(host: str, node: str, vmid: str, vm_type: str) -> Dict[str, Any]:
"""Get VM/container configuration"""
config = {}
command = f"pvesh get /nodes/{node}/{vm_type}/{vmid}/config --output-format json"
output = run_ssh_command(host, command)
if output:
try:
config = json.loads(output)
except json.JSONDecodeError:
pass
return config
def get_vm_ip(host: str, node: str, vmid: str, vm_type: str) -> Optional[str]:
"""Get VM IP address"""
config = get_vm_config(host, node, vmid, vm_type)
static_ip = parse_guest_network_from_config(config).primary_ip
if static_ip:
return static_ip
if vm_type == 'lxc':
if config.get('status') == 'running':
ip = run_ssh_command(host, f"pct exec {vmid} -- hostname -I 2>/dev/null | awk '{{print $1}}'")
if ip and not ip.startswith('127.'):
return ip
else:
command = f"pvesh get /nodes/{node}/qemu/{vmid}/agent/network-get-interfaces --output-format json"
output = run_ssh_command(host, command)
if output:
try:
data = json.loads(output)
if 'result' in data:
for iface in data['result']:
if 'ip-addresses' in iface:
for ip_info in iface['ip-addresses']:
if ip_info.get('ip-address-type') == 'ipv4' and not ip_info.get('ip-address', '').startswith('127.'):
return ip_info['ip-address']
except json.JSONDecodeError:
pass
return None
def get_vm_hostname(host: str, node: str, vmid: str, vm_type: str) -> Optional[str]:
"""Get VM hostname"""
config = get_vm_config(host, node, vmid, vm_type)
return config.get('hostname') or config.get('name')
def get_vm_description(host: str, node: str, vmid: str, vm_type: str) -> Optional[str]:
"""Get VM description"""
config = get_vm_config(host, node, vmid, vm_type)
return config.get('description')
def get_npmplus_config() -> List[Dict[str, Any]]:
"""Get NPMplus configuration via API"""
project_root = Path(__file__).parent.parent
env_file = project_root / ".env"
npm_email = None
npm_password = None
if env_file.exists():
with open(env_file) as f:
for line in f:
if line.startswith("NPM_EMAIL="):
npm_email = line.split("=", 1)[1].strip().strip('"').strip("'")
elif line.startswith("NPM_PASSWORD="):
npm_password = line.split("=", 1)[1].strip().strip('"').strip("'")
if not npm_email or not npm_password:
return []
npm_url = "https://192.168.11.166:81"
try:
import urllib.request
import ssl
# Authenticate
auth_data = json.dumps({
"identity": npm_email,
"secret": npm_password
}).encode()
context = ssl._create_unverified_context()
req = urllib.request.Request(
f"{npm_url}/api/tokens",
data=auth_data,
headers={"Content-Type": "application/json"}
)
with urllib.request.urlopen(req, context=context) as response:
token_response = json.loads(response.read().decode())
token = token_response.get("token")
if not token:
return []
# Get proxy hosts
req = urllib.request.Request(
f"{npm_url}/api/nginx/proxy-hosts",
headers={"Authorization": f"Bearer {token}"}
)
with urllib.request.urlopen(req, context=context) as response:
proxy_hosts = json.loads(response.read().decode())
return proxy_hosts
except Exception as e:
print(f"Error fetching NPMplus config: {e}", file=sys.stderr)
return []
def identify_ports_from_config(config: Dict[str, Any], vm_type: str) -> List[str]:
"""Identify ports from VM configuration"""
ports = []
# Common ports based on service type
name = config.get('name', '').lower()
hostname = config.get('hostname', '').lower()
if 'rpc' in name or 'rpc' in hostname:
ports.extend(['8545', '8546', '30303', '9545'])
if 'besu' in name or 'besu' in hostname:
ports.extend(['8545', '8546', '30303', '9545'])
if 'postgres' in name or 'postgres' in hostname:
ports.append('5432')
if 'redis' in name or 'redis' in hostname:
ports.append('6379')
if 'vault' in name or 'vault' in hostname:
ports.append('8200')
if 'web3signer' in name or 'web3signer' in hostname:
ports.append('9000')
if 'nginx' in name or 'npm' in name:
ports.extend(['80', '81', '443'])
if 'blockscout' in name or 'explorer' in name:
ports.extend(['80', '4000'])
if 'api' in name and '3000' not in str(ports):
ports.append('3000')
if 'frontend' in name or 'web' in name:
ports.append('80')
return list(set(ports)) # Remove duplicates
def main():
print("=" * 100)
print("COMPREHENSIVE PROXMOX INVENTORY REPORT")
print("=" * 100)
print()
all_vms = []
npmplus_vmids = []
# Query all Proxmox hosts
for hostname, host_ip in PROXMOX_HOSTS.items():
print(f"📡 Querying {hostname} ({host_ip})...")
node = get_node_name(host_ip)
if not node:
print(f" ⚠️ Could not connect to {hostname}")
continue
print(f" ✓ Connected to node: {node}")
# Get VMs and containers
vms = get_all_vms(host_ip, node)
containers = get_all_containers(host_ip, node)
print(f" ✓ Found {len(vms)} QEMU VMs and {len(containers)} LXC containers")
# Process VMs
for vm in vms:
vmid = vm['vmid']
config = get_vm_config(host_ip, node, vmid, 'qemu')
ip = get_vm_ip(host_ip, node, vmid, 'qemu')
hostname_vm = get_vm_hostname(host_ip, node, vmid, 'qemu')
description = get_vm_description(host_ip, node, vmid, 'qemu')
ports = identify_ports_from_config(config, 'qemu')
vm.update({
'ip': ip,
'hostname': hostname_vm,
'description': description,
'ports': ports,
'fqdn': hostname_vm
})
all_vms.append(vm)
# Check if NPMplus
if 'npmplus' in (vm.get('name', '') + ' ' + (hostname_vm or '')).lower():
npmplus_vmids.append(vmid)
# Process containers
for container in containers:
vmid = container['vmid']
config = get_vm_config(host_ip, node, vmid, 'lxc')
ip = get_vm_ip(host_ip, node, vmid, 'lxc')
hostname_vm = get_vm_hostname(host_ip, node, vmid, 'lxc')
description = get_vm_description(host_ip, node, vmid, 'lxc')
ports = identify_ports_from_config(config, 'lxc')
container.update({
'ip': ip,
'hostname': hostname_vm,
'description': description,
'ports': ports,
'fqdn': hostname_vm
})
all_vms.append(container)
# Check if NPMplus
if 'npmplus' in (container.get('name', '') + ' ' + (hostname_vm or '')).lower():
npmplus_vmids.append(vmid)
print()
print("=" * 100)
print("ALL VMIDs INVENTORY")
print("=" * 100)
print()
# Sort by VMID
all_vms.sort(key=lambda x: (int(x['vmid']) if x['vmid'].isdigit() else 999999, x['host']))
# Print table header
print(f"{'VMID':<8} {'Type':<6} {'Name':<30} {'Host':<12} {'IP Address':<18} {'FQDN':<30} {'Status':<10} {'Ports':<30}")
print("-" * 150)
for vm in all_vms:
vmid = vm['vmid']
vm_type = vm['type']
name = vm.get('name', 'N/A')[:28]
host = vm.get('host', 'N/A')
ip = vm.get('ip', 'N/A')
fqdn = (vm.get('fqdn') or vm.get('hostname') or 'N/A')[:28]
status = vm.get('status', 'unknown')
ports = ', '.join(vm.get('ports', []))[:28] if vm.get('ports') else 'N/A'
print(f"{vmid:<8} {vm_type:<6} {name:<30} {host:<12} {ip:<18} {fqdn:<30} {status:<10} {ports:<30}")
print()
print("=" * 100)
print("NPMPLUS INSTANCES")
print("=" * 100)
print()
if npmplus_vmids:
print(f"Found NPMplus running on VMIDs: {', '.join(npmplus_vmids)}")
print()
for vmid in npmplus_vmids:
vm = next((v for v in all_vms if v['vmid'] == vmid), None)
if vm:
print(f"VMID {vmid}:")
print(f" Name: {vm.get('name', 'N/A')}")
print(f" Type: {vm.get('type', 'N/A')}")
print(f" Host: {vm.get('host', 'N/A')}")
print(f" IP: {vm.get('ip', 'N/A')}")
print(f" FQDN: {vm.get('fqdn', 'N/A')}")
print(f" Status: {vm.get('status', 'N/A')}")
print(f" Ports: {', '.join(vm.get('ports', []))}")
print()
else:
print("No NPMplus instances found")
print()
print("=" * 100)
print("NPMPLUS CONFIGURATION")
print("=" * 100)
print()
npmplus_config = get_npmplus_config()
if npmplus_config:
print(f"Found {len(npmplus_config)} proxy host configurations in NPMplus")
print()
print(f"{'ID':<6} {'Domain(s)':<50} {'Target':<25} {'Port':<8} {'Scheme':<8} {'WebSocket':<10}")
print("-" * 120)
for host in sorted(npmplus_config, key=lambda x: x.get('id', 0)):
host_id = str(host.get('id', 'N/A'))
domain_names = host.get('domain_names', [])
domains = ', '.join(domain_names) if isinstance(domain_names, list) else str(domain_names)
forward_host = host.get('forward_host', 'N/A')
forward_port = str(host.get('forward_port', 'N/A'))
forward_scheme = host.get('forward_scheme', 'http')
websocket = 'Yes' if host.get('forward_websocket', False) else 'No'
print(f"{host_id:<6} {domains[:48]:<50} {forward_host:<25} {forward_port:<8} {forward_scheme:<8} {websocket:<10}")
print()
print("Detailed Configuration:")
print()
for host in sorted(npmplus_config, key=lambda x: x.get('id', 0)):
print(f"Proxy Host ID: {host.get('id')}")
print(f" Domain Names: {', '.join(host.get('domain_names', []))}")
print(f" Forward Host: {host.get('forward_host')}")
print(f" Forward Port: {host.get('forward_port')}")
print(f" Forward Scheme: {host.get('forward_scheme')}")
print(f" WebSocket Support: {host.get('forward_websocket', False)}")
print(f" SSL Enabled: {host.get('ssl_enabled', False)}")
print(f" Block Exploits: {host.get('block_exploits', False)}")
print(f" Cache Assets: {host.get('cache_assets', False)}")
print()
else:
print("Could not retrieve NPMplus configuration (check .env file for NPM_EMAIL and NPM_PASSWORD)")
print()
# Summary
print("=" * 100)
print("SUMMARY")
print("=" * 100)
print()
print(f"Total VMIDs: {len(all_vms)}")
print(f" QEMU VMs: {len([v for v in all_vms if v['type'] == 'QEMU'])}")
print(f" LXC Containers: {len([v for v in all_vms if v['type'] == 'LXC'])}")
print(f" Running: {len([v for v in all_vms if v.get('status') == 'running'])}")
print(f" Stopped: {len([v for v in all_vms if v.get('status') == 'stopped'])}")
print(f"NPMplus Instances: {len(npmplus_vmids)}")
print(f"NPMplus Proxy Hosts: {len(npmplus_config)}")
print()
if __name__ == "__main__":
main()