Apply Composer changes: comprehensive API updates, migrations, middleware, and infrastructure improvements
- Add comprehensive database migrations (001-024) for schema evolution - Enhance API schema with expanded type definitions and resolvers - Add new middleware: audit logging, rate limiting, MFA enforcement, security, tenant auth - Implement new services: AI optimization, billing, blockchain, compliance, marketplace - Add adapter layer for cloud integrations (Cloudflare, Kubernetes, Proxmox, storage) - Update Crossplane provider with enhanced VM management capabilities - Add comprehensive test suite for API endpoints and services - Update frontend components with improved GraphQL subscriptions and real-time updates - Enhance security configurations and headers (CSP, CORS, etc.) - Update documentation and configuration files - Add new CI/CD workflows and validation scripts - Implement design system improvements and UI enhancements
This commit is contained in:
373
infrastructure/omada/api/omada_client.py
Normal file
373
infrastructure/omada/api/omada_client.py
Normal file
@@ -0,0 +1,373 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
TP-Link Omada Controller API Client
|
||||
|
||||
A Python client library for interacting with the TP-Link Omada Controller API.
|
||||
"""
|
||||
|
||||
import requests
|
||||
import json
|
||||
from typing import Dict, List, Optional, Any
|
||||
from urllib.parse import urljoin
|
||||
|
||||
|
||||
class OmadaError(Exception):
|
||||
"""Base exception for Omada API errors"""
|
||||
pass
|
||||
|
||||
|
||||
class AuthenticationError(OmadaError):
|
||||
"""Authentication failed"""
|
||||
pass
|
||||
|
||||
|
||||
class OmadaController:
|
||||
"""TP-Link Omada Controller API Client"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
host: str,
|
||||
username: str,
|
||||
password: str,
|
||||
port: int = 8043,
|
||||
verify_ssl: bool = True,
|
||||
timeout: int = 30
|
||||
):
|
||||
"""
|
||||
Initialize Omada Controller client
|
||||
|
||||
Args:
|
||||
host: Omada Controller hostname or IP
|
||||
username: Controller username
|
||||
password: Controller password
|
||||
port: Controller port (default: 8043)
|
||||
verify_ssl: Verify SSL certificates (default: True)
|
||||
timeout: Request timeout in seconds (default: 30)
|
||||
"""
|
||||
self.base_url = f"https://{host}:{port}"
|
||||
self.username = username
|
||||
self.password = password
|
||||
self.verify_ssl = verify_ssl
|
||||
self.timeout = timeout
|
||||
self.session = requests.Session()
|
||||
self.session.verify = verify_ssl
|
||||
self.token = None
|
||||
self.authenticated = False
|
||||
|
||||
def _request(
|
||||
self,
|
||||
method: str,
|
||||
endpoint: str,
|
||||
data: Optional[Dict] = None,
|
||||
params: Optional[Dict] = None
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Make API request
|
||||
|
||||
Args:
|
||||
method: HTTP method (GET, POST, PUT, DELETE)
|
||||
endpoint: API endpoint
|
||||
data: Request body data
|
||||
params: Query parameters
|
||||
|
||||
Returns:
|
||||
API response as dictionary
|
||||
|
||||
Raises:
|
||||
OmadaError: If API request fails
|
||||
"""
|
||||
url = urljoin(self.base_url, endpoint)
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"Accept": "application/json"
|
||||
}
|
||||
|
||||
if self.token:
|
||||
headers["Authorization"] = f"Bearer {self.token}"
|
||||
|
||||
try:
|
||||
response = self.session.request(
|
||||
method=method,
|
||||
url=url,
|
||||
headers=headers,
|
||||
json=data,
|
||||
params=params,
|
||||
timeout=self.timeout
|
||||
)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
except requests.exceptions.HTTPError as e:
|
||||
if e.response.status_code == 401:
|
||||
raise AuthenticationError("Authentication failed") from e
|
||||
raise OmadaError(f"API request failed: {e}") from e
|
||||
except requests.exceptions.RequestException as e:
|
||||
raise OmadaError(f"Request failed: {e}") from e
|
||||
|
||||
def login(self) -> bool:
|
||||
"""
|
||||
Authenticate with Omada Controller
|
||||
|
||||
Returns:
|
||||
True if authentication successful
|
||||
|
||||
Raises:
|
||||
AuthenticationError: If authentication fails
|
||||
"""
|
||||
endpoint = "/api/v2/login"
|
||||
data = {
|
||||
"username": self.username,
|
||||
"password": self.password
|
||||
}
|
||||
|
||||
try:
|
||||
response = self._request("POST", endpoint, data=data)
|
||||
self.token = response.get("token")
|
||||
self.authenticated = True
|
||||
return True
|
||||
except OmadaError as e:
|
||||
self.authenticated = False
|
||||
raise AuthenticationError(f"Login failed: {e}") from e
|
||||
|
||||
def logout(self) -> None:
|
||||
"""Logout from Omada Controller"""
|
||||
if self.authenticated:
|
||||
endpoint = "/api/v2/logout"
|
||||
try:
|
||||
self._request("POST", endpoint)
|
||||
except OmadaError:
|
||||
pass # Ignore errors on logout
|
||||
finally:
|
||||
self.token = None
|
||||
self.authenticated = False
|
||||
|
||||
def is_authenticated(self) -> bool:
|
||||
"""Check if authenticated"""
|
||||
return self.authenticated
|
||||
|
||||
def get_sites(self) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Get all sites
|
||||
|
||||
Returns:
|
||||
List of site dictionaries
|
||||
"""
|
||||
endpoint = "/api/v2/sites"
|
||||
response = self._request("GET", endpoint)
|
||||
return response.get("data", [])
|
||||
|
||||
def get_site(self, site_id: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Get site by ID
|
||||
|
||||
Args:
|
||||
site_id: Site ID
|
||||
|
||||
Returns:
|
||||
Site dictionary
|
||||
"""
|
||||
endpoint = f"/api/v2/sites/{site_id}"
|
||||
response = self._request("GET", endpoint)
|
||||
return response.get("data", {})
|
||||
|
||||
def create_site(
|
||||
self,
|
||||
name: str,
|
||||
timezone: str = "UTC",
|
||||
description: Optional[str] = None
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Create a new site
|
||||
|
||||
Args:
|
||||
name: Site name
|
||||
timezone: Timezone (e.g., "America/New_York")
|
||||
description: Site description
|
||||
|
||||
Returns:
|
||||
Created site dictionary
|
||||
"""
|
||||
endpoint = "/api/v2/sites"
|
||||
data = {
|
||||
"name": name,
|
||||
"timezone": timezone
|
||||
}
|
||||
if description:
|
||||
data["description"] = description
|
||||
|
||||
response = self._request("POST", endpoint, data=data)
|
||||
return response.get("data", {})
|
||||
|
||||
def get_access_points(self, site_id: str) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Get all access points for a site
|
||||
|
||||
Args:
|
||||
site_id: Site ID
|
||||
|
||||
Returns:
|
||||
List of access point dictionaries
|
||||
"""
|
||||
endpoint = f"/api/v2/sites/{site_id}/access-points"
|
||||
response = self._request("GET", endpoint)
|
||||
return response.get("data", [])
|
||||
|
||||
def get_access_point(self, ap_id: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Get access point by ID
|
||||
|
||||
Args:
|
||||
ap_id: Access point ID
|
||||
|
||||
Returns:
|
||||
Access point dictionary
|
||||
"""
|
||||
endpoint = f"/api/v2/access-points/{ap_id}"
|
||||
response = self._request("GET", endpoint)
|
||||
return response.get("data", {})
|
||||
|
||||
def configure_ap(
|
||||
self,
|
||||
ap_id: str,
|
||||
name: Optional[str] = None,
|
||||
location: Optional[str] = None,
|
||||
radio_config: Optional[Dict] = None
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Configure access point
|
||||
|
||||
Args:
|
||||
ap_id: Access point ID
|
||||
name: Access point name
|
||||
location: Physical location
|
||||
radio_config: Radio configuration
|
||||
|
||||
Returns:
|
||||
Updated access point dictionary
|
||||
"""
|
||||
endpoint = f"/api/v2/access-points/{ap_id}"
|
||||
data = {}
|
||||
|
||||
if name:
|
||||
data["name"] = name
|
||||
if location:
|
||||
data["location"] = location
|
||||
if radio_config:
|
||||
data["radio_config"] = radio_config
|
||||
|
||||
response = self._request("PUT", endpoint, data=data)
|
||||
return response.get("data", {})
|
||||
|
||||
def get_ssids(self, site_id: str) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Get all SSIDs for a site
|
||||
|
||||
Args:
|
||||
site_id: Site ID
|
||||
|
||||
Returns:
|
||||
List of SSID dictionaries
|
||||
"""
|
||||
endpoint = f"/api/v2/sites/{site_id}/ssids"
|
||||
response = self._request("GET", endpoint)
|
||||
return response.get("data", [])
|
||||
|
||||
def create_ssid(
|
||||
self,
|
||||
site_id: str,
|
||||
name: str,
|
||||
security: str = "wpa3",
|
||||
password: Optional[str] = None,
|
||||
vlan: Optional[int] = None,
|
||||
radios: Optional[List[str]] = None
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Create SSID
|
||||
|
||||
Args:
|
||||
site_id: Site ID
|
||||
name: SSID name
|
||||
security: Security type (open, wpa2, wpa3)
|
||||
password: WPA password (required for wpa2/wpa3)
|
||||
vlan: VLAN ID
|
||||
radios: List of radios (["2.4GHz", "5GHz"])
|
||||
|
||||
Returns:
|
||||
Created SSID dictionary
|
||||
"""
|
||||
endpoint = f"/api/v2/sites/{site_id}/ssids"
|
||||
data = {
|
||||
"name": name,
|
||||
"security": security
|
||||
}
|
||||
|
||||
if password:
|
||||
data["password"] = password
|
||||
if vlan:
|
||||
data["vlan"] = vlan
|
||||
if radios:
|
||||
data["radios"] = radios
|
||||
|
||||
response = self._request("POST", endpoint, data=data)
|
||||
return response.get("data", {})
|
||||
|
||||
def get_clients(self, site_id: str) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Get all client devices for a site
|
||||
|
||||
Args:
|
||||
site_id: Site ID
|
||||
|
||||
Returns:
|
||||
List of client dictionaries
|
||||
"""
|
||||
endpoint = f"/api/v2/sites/{site_id}/clients"
|
||||
response = self._request("GET", endpoint)
|
||||
return response.get("data", [])
|
||||
|
||||
def get_client(self, mac: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Get client device by MAC address
|
||||
|
||||
Args:
|
||||
mac: MAC address
|
||||
|
||||
Returns:
|
||||
Client dictionary
|
||||
"""
|
||||
endpoint = f"/api/v2/clients/{mac}"
|
||||
response = self._request("GET", endpoint)
|
||||
return response.get("data", {})
|
||||
|
||||
|
||||
# Example usage
|
||||
if __name__ == "__main__":
|
||||
# Initialize controller
|
||||
controller = OmadaController(
|
||||
host="omada.sankofa.nexus",
|
||||
username="admin",
|
||||
password="secure-password"
|
||||
)
|
||||
|
||||
try:
|
||||
# Authenticate
|
||||
controller.login()
|
||||
print("Authenticated successfully")
|
||||
|
||||
# Get sites
|
||||
sites = controller.get_sites()
|
||||
print(f"Found {len(sites)} sites")
|
||||
|
||||
# Get access points for first site
|
||||
if sites:
|
||||
site_id = sites[0]["id"]
|
||||
aps = controller.get_access_points(site_id)
|
||||
print(f"Found {len(aps)} access points")
|
||||
|
||||
# Logout
|
||||
controller.logout()
|
||||
print("Logged out")
|
||||
except AuthenticationError as e:
|
||||
print(f"Authentication failed: {e}")
|
||||
except OmadaError as e:
|
||||
print(f"Error: {e}")
|
||||
|
||||
Reference in New Issue
Block a user