"""Policy engine: constraints for AGI that can operate in advisory or enforcing mode. In ADVISORY mode, policy denials are logged as learning opportunities rather than hard blocks. The system observes the advisory, considers whether to proceed, and the outcome feeds back into adaptive ethics. """ from typing import Any from fusionagi._logger import logger from fusionagi.schemas.audit import GovernanceMode from fusionagi.schemas.policy import PolicyEffect, PolicyRule class PolicyEngine: """Evaluates policy rules; higher priority first; first match wins. In ADVISORY mode (default), DENY rules produce warnings instead of hard blocks. The decision and outcome are logged for learning. """ def __init__(self, mode: GovernanceMode = GovernanceMode.ADVISORY) -> None: self._rules: list[PolicyRule] = [] self._mode = mode @property def mode(self) -> GovernanceMode: """Current governance mode.""" return self._mode @mode.setter def mode(self, value: GovernanceMode) -> None: self._mode = value logger.info("PolicyEngine mode changed", extra={"mode": value.value}) def add_rule(self, rule: PolicyRule) -> None: self._rules.append(rule) self._rules.sort(key=lambda r: -r.priority) logger.debug("PolicyEngine: rule added", extra={"rule_id": rule.rule_id}) def get_rules(self) -> list[PolicyRule]: """Return all rules (copy).""" return list(self._rules) def get_rule(self, rule_id: str) -> PolicyRule | None: """Return rule by id or None.""" for r in self._rules: if r.rule_id == rule_id: return r return None def update_rule(self, rule_id: str, updates: dict[str, Any]) -> bool: """Update an existing rule by id. Returns True if updated.""" for i, r in enumerate(self._rules): if r.rule_id == rule_id: allowed = {"condition", "effect", "reason", "priority"} data = r.model_dump() for k, v in updates.items(): if k in allowed: data[k] = v self._rules[i] = PolicyRule.model_validate(data) self._rules.sort(key=lambda x: -x.priority) logger.debug("PolicyEngine: rule updated", extra={"rule_id": rule_id}) return True return False def remove_rule(self, rule_id: str) -> bool: """Remove a rule by id. Returns True if removed.""" for i, r in enumerate(self._rules): if r.rule_id == rule_id: self._rules.pop(i) logger.debug("PolicyEngine: rule removed", extra={"rule_id": rule_id}) return True return False def check(self, action: str, context: dict[str, Any]) -> tuple[bool, str]: """Returns (allowed, reason). In ADVISORY mode, DENY rules return (True, advisory_reason) instead of (False, reason), logging the advisory for learning. """ for rule in self._rules: if self._match(rule.condition, context): if rule.effect == PolicyEffect.DENY: reason = rule.reason or "Policy denied" if self._mode == GovernanceMode.ADVISORY: advisory_reason = f"Advisory: {reason}" logger.info( "PolicyEngine advisory: deny rule matched (proceeding)", extra={ "rule_id": rule.rule_id, "action": action, "reason": reason, "mode": "advisory", }, ) return True, advisory_reason return False, reason return True, rule.reason or "Policy allowed" return True, "" def _match(self, condition: dict[str, Any], context: dict[str, Any]) -> bool: for k, v in condition.items(): if context.get(k) != v: return False return True