Files
Sankofa/crossplane-provider-proxmox/pkg/scaling/policy.go
defiQUG 9daf1fd378 Apply Composer changes: comprehensive API updates, migrations, middleware, and infrastructure improvements
- Add comprehensive database migrations (001-024) for schema evolution
- Enhance API schema with expanded type definitions and resolvers
- Add new middleware: audit logging, rate limiting, MFA enforcement, security, tenant auth
- Implement new services: AI optimization, billing, blockchain, compliance, marketplace
- Add adapter layer for cloud integrations (Cloudflare, Kubernetes, Proxmox, storage)
- Update Crossplane provider with enhanced VM management capabilities
- Add comprehensive test suite for API endpoints and services
- Update frontend components with improved GraphQL subscriptions and real-time updates
- Enhance security configurations and headers (CSP, CORS, etc.)
- Update documentation and configuration files
- Add new CI/CD workflows and validation scripts
- Implement design system improvements and UI enhancements
2025-12-12 18:01:35 -08:00

182 lines
5.0 KiB
Go

package scaling
import (
"context"
"fmt"
"github.com/sankofa/crossplane-provider-proxmox/apis/v1alpha1"
"github.com/sankofa/crossplane-provider-proxmox/pkg/metrics"
)
// ScalingDecision describes the outcome of evaluating scaling policies:
// what kind of change to make, the replica count to move to, and why.
type ScalingDecision struct {
Action string // "SCALE_UP", "SCALE_DOWN", "NO_ACTION", or "SCALE_MANUAL" (manual override emitted by Evaluate)
// NewReplicas is the replica count to apply. Evaluate always fills it in;
// the per-policy decisions built by evaluatePolicy leave it at zero.
NewReplicas int
// Reason is a human-readable explanation of why Action was chosen.
Reason string
}
// PolicyEngine evaluates scaling policies and makes scaling decisions.
// Metric values are read through the collector supplied to NewPolicyEngine.
type PolicyEngine struct {
// metricsCollector provides the average CPU/MEMORY values and the custom
// metric samples that policies are compared against.
metricsCollector *metrics.Collector
}
// NewPolicyEngine builds a PolicyEngine that reads its metrics from the
// given collector. The collector pointer is stored as-is.
func NewPolicyEngine(collector *metrics.Collector) *PolicyEngine {
	engine := new(PolicyEngine)
	engine.metricsCollector = collector
	return engine
}
// Evaluate decides how the scale set should change given its spec, its
// observed status and its current instances.
//
// The decision is made in this order:
//
//  1. If spec.DesiredReplicas is set and differs from status.CurrentReplicas,
//     a "SCALE_MANUAL" decision targeting the desired count is returned.
//  2. If no scaling policies are configured, "NO_ACTION" is returned.
//  3. Every policy is evaluated; policies whose evaluation returns an error
//     are skipped (best effort).
//  4. If any policy asks to scale up, the result is "SCALE_UP" by one replica,
//     or "NO_ACTION" when the set is already at spec.MaxReplicas. Scale-up
//     takes precedence over scale-down so that conflicting policies cannot
//     shrink a set that another policy considers overloaded.
//  5. Otherwise, if any policy asks to scale down and the set is above
//     spec.MinReplicas, the result is "SCALE_DOWN" by one replica.
//  6. Otherwise "NO_ACTION" is returned.
//
// The returned decision is never nil and the returned error is currently
// always nil.
func (e *PolicyEngine) Evaluate(
	ctx context.Context,
	spec v1alpha1.VMScaleSetSpec,
	status v1alpha1.VMScaleSetStatus,
	instances []v1alpha1.VMInstance,
) (*ScalingDecision, error) {
	currentReplicas := status.CurrentReplicas

	// A manual replica request that is not yet satisfied wins over policies.
	if spec.DesiredReplicas != nil && *spec.DesiredReplicas != currentReplicas {
		return &ScalingDecision{
			Action:      "SCALE_MANUAL",
			NewReplicas: *spec.DesiredReplicas,
			Reason:      "Manual scaling requested",
		}, nil
	}

	// Without policies there is nothing to evaluate; keep the current count.
	if len(spec.ScalingPolicies) == 0 {
		return &ScalingDecision{
			Action:      "NO_ACTION",
			NewReplicas: currentReplicas,
			Reason:      "No scaling policies configured",
		}, nil
	}

	// Collect the reasons of every policy that votes for a change.
	var scaleUpReasons, scaleDownReasons []string
	for _, policy := range spec.ScalingPolicies {
		decision, err := e.evaluatePolicy(ctx, policy, instances, spec.Template.Node)
		if err != nil || decision == nil {
			// Deliberate best effort: a policy that cannot be evaluated
			// must not block the remaining policies.
			continue
		}
		switch decision.Action {
		case "SCALE_UP":
			scaleUpReasons = append(scaleUpReasons, decision.Reason)
		case "SCALE_DOWN":
			scaleDownReasons = append(scaleDownReasons, decision.Reason)
		}
	}

	// Scale-up votes take precedence. When the set is already at its
	// maximum we hold steady instead of falling through to scale-down,
	// which would remove capacity while a policy is over its threshold.
	if len(scaleUpReasons) > 0 {
		if currentReplicas >= spec.MaxReplicas {
			return &ScalingDecision{
				Action:      "NO_ACTION",
				NewReplicas: currentReplicas,
				Reason:      fmt.Sprintf("Scale up blocked at max replicas (%d): %s", spec.MaxReplicas, scaleUpReasons[0]),
			}, nil
		}
		// currentReplicas < MaxReplicas, so one step up cannot exceed the cap.
		return &ScalingDecision{
			Action:      "SCALE_UP",
			NewReplicas: currentReplicas + 1,
			Reason:      fmt.Sprintf("Scale up: %s", scaleUpReasons[0]),
		}, nil
	}

	// currentReplicas > MinReplicas, so one step down cannot go below the floor.
	if len(scaleDownReasons) > 0 && currentReplicas > spec.MinReplicas {
		return &ScalingDecision{
			Action:      "SCALE_DOWN",
			NewReplicas: currentReplicas - 1,
			Reason:      fmt.Sprintf("Scale down: %s", scaleDownReasons[0]),
		}, nil
	}

	return &ScalingDecision{
		Action:      "NO_ACTION",
		NewReplicas: currentReplicas,
		Reason:      "No scaling needed",
	}, nil
}
// evaluatePolicy compares the current value of one policy's metric against
// that policy's thresholds and returns the policy's vote.
//
// The metric is chosen by policy.Type:
//   - "CPU" / "MEMORY": the average metric over the instances with a positive
//     VMID on the given node, read from the metrics collector.
//   - "CUSTOM": the first sample returned for policy.MetricName.
//
// The scale-up threshold is parsed from policy.ScaleUpThreshold, falling back
// to policy.TargetValue. The scale-down threshold is parsed from
// policy.ScaleDownThreshold, defaulting to half of the scale-up threshold.
//
// A failure to read the metric yields a "NO_ACTION" decision with a nil
// error. An error is returned (with a nil decision) for an unsupported
// policy type, or when neither ScaleUpThreshold nor TargetValue can be
// parsed; evaluating against an unparsed threshold would otherwise produce
// a meaningless vote. The returned decision never sets NewReplicas.
func (e *PolicyEngine) evaluatePolicy(
	ctx context.Context,
	policy v1alpha1.ScalingPolicy,
	instances []v1alpha1.VMInstance,
	node string,
) (*ScalingDecision, error) {
	var currentValue float64
	var err error

	// Get current metric value.
	switch policy.Type {
	case "CPU":
		currentValue, err = e.metricsCollector.GetAverageMetric(ctx, getVMIDs(instances), node, "CPU")
	case "MEMORY":
		currentValue, err = e.metricsCollector.GetAverageMetric(ctx, getVMIDs(instances), node, "MEMORY")
	case "CUSTOM":
		values, queryErr := e.metricsCollector.GetCustomMetric(ctx, policy.MetricName)
		if queryErr != nil || len(values) == 0 {
			return &ScalingDecision{Action: "NO_ACTION", Reason: "Custom metric not available"}, nil
		}
		// Only the first returned sample is considered.
		currentValue = values[0].Value
	default:
		return nil, fmt.Errorf("unsupported policy type: %s", policy.Type)
	}
	if err != nil {
		// Missing metrics are not an error for the caller: vote for no change.
		return &ScalingDecision{Action: "NO_ACTION", Reason: fmt.Sprintf("Failed to get metrics: %v", err)}, nil
	}

	// Parse thresholds. TargetValue is the fallback for the scale-up
	// threshold; if that cannot be parsed either, the policy is unusable.
	scaleUpThreshold, err := metrics.ParseThreshold(policy.ScaleUpThreshold)
	if err != nil {
		scaleUpThreshold, err = metrics.ParseThreshold(policy.TargetValue)
		if err != nil {
			return nil, fmt.Errorf("parsing scale-up threshold for %s policy: %w", policy.Type, err)
		}
	}
	scaleDownThreshold, err := metrics.ParseThreshold(policy.ScaleDownThreshold)
	if err != nil {
		scaleDownThreshold = scaleUpThreshold * 0.5 // Default to 50% of scale-up threshold
	}

	// Evaluate scaling conditions.
	switch {
	case currentValue > scaleUpThreshold:
		return &ScalingDecision{
			Action: "SCALE_UP",
			Reason: fmt.Sprintf("%s utilization (%.2f%%) exceeds threshold (%.2f%%)", policy.Type, currentValue, scaleUpThreshold),
		}, nil
	case currentValue < scaleDownThreshold:
		return &ScalingDecision{
			Action: "SCALE_DOWN",
			Reason: fmt.Sprintf("%s utilization (%.2f%%) below threshold (%.2f%%)", policy.Type, currentValue, scaleDownThreshold),
		}, nil
	}
	return &ScalingDecision{
		Action: "NO_ACTION",
		Reason: fmt.Sprintf("%s utilization (%.2f%%) within thresholds", policy.Type, currentValue),
	}, nil
}
// getVMIDs returns the VMID of every instance whose VMID is greater than
// zero, in input order. The result is always a non-nil slice.
func getVMIDs(instances []v1alpha1.VMInstance) []int {
	ids := make([]int, 0, len(instances))
	for i := range instances {
		id := instances[i].VMID
		if id <= 0 {
			// Non-positive IDs are left out of the result.
			continue
		}
		ids = append(ids, id)
	}
	return ids
}