Some checks failed
- All governance components (SafetyPipeline, PolicyEngine, Guardrails, AccessControl, RateLimiter, OverrideHooks) now default to ADVISORY mode: violations are logged as advisories but actions proceed. Enforcing mode remains available for backward compatibility. - GovernanceMode enum (ADVISORY/ENFORCING) added to schemas/audit.py with runtime switching support on all components. - AutoTrainer: removed artificial limits on training iterations and epochs. Every self-improvement action is transparently logged to the audit trail. - SelfCorrectionLoop: max_retries_per_task defaults to None (unlimited). - AdaptiveEthics: new learned ethical framework that evolves through experience. Records ethical experiences, updates lesson weights based on outcomes, and provides consultative guidance (not enforcement). - AuditLog: enhanced with actor-based indexing, advisory/self-improvement/ethical-learning retrieval, and comprehensive type hints. - New audit event types: ADVISORY, SELF_IMPROVEMENT, ETHICAL_LEARNING. - 296 tests passing (20 new tests for adaptive ethics, governance modes, and enhanced audit log). 0 ruff errors. 0 mypy errors. Co-Authored-By: Nakamoto, S <defi@defi-oracle.io>
54 lines
2.1 KiB
Python
54 lines
2.1 KiB
Python
"""Tool access control: central policy for which agent may call which tools.
|
|
|
|
In ADVISORY mode, denials are logged as advisories and the action
|
|
proceeds. The system learns from outcomes rather than being caged.
|
|
"""
|
|
|
|
from fusionagi._logger import logger
|
|
from fusionagi.schemas.audit import GovernanceMode
|
|
|
|
|
|
class AccessControl:
    """Policy: (agent_id, tool_name, task_id) -> allowed.

    Two kinds of rules are kept:

    * per-agent denials registered with ``deny``, keyed by
      ``(agent_id, tool_name)``;
    * per-task allowlists registered with ``allow_tools_for_task``.

    In ADVISORY mode (default), a rule that would deny access is logged as
    an advisory and the call is permitted. In any other mode the denial is
    returned to the caller.
    """

    def __init__(self, mode: GovernanceMode = GovernanceMode.ADVISORY) -> None:
        """Create an empty policy operating in *mode*."""
        # (agent_id, tool_name) pairs that are explicitly denied.
        self._deny: set[tuple[str, str]] = set()
        # task_id -> set of tool names permitted for that task.
        self._task_tools: dict[str, set[str]] = {}
        self._mode = mode

    def deny(self, agent_id: str, tool_name: str) -> None:
        """Register a denial rule for agent/tool pair."""
        self._deny.add((agent_id, tool_name))

    def allow_tools_for_task(self, task_id: str, tool_names: list[str]) -> None:
        """Set allowed tools for a task (empty = all allowed).

        A later call for the same ``task_id`` replaces the earlier allowlist.
        """
        self._task_tools[task_id] = set(tool_names)

    def allowed(self, agent_id: str, tool_name: str, task_id: str | None = None) -> bool:
        """Return True if agent may call tool.

        The agent/tool deny rules are checked first, then the task allowlist
        (only when ``task_id`` is truthy and has a non-empty allowlist).

        In ADVISORY mode, always returns True but logs advisory if a
        rule would have denied the action.
        """
        if (agent_id, tool_name) in self._deny:
            return self._resolve_violation(
                "AccessControl advisory: agent/tool denied (proceeding)",
                {"agent_id": agent_id, "tool_name": tool_name},
            )

        task_allowlist = self._task_tools.get(task_id) if task_id else None
        # An empty allowlist means "no restriction", as documented on
        # allow_tools_for_task; previously it rejected every tool.
        if task_allowlist and tool_name not in task_allowlist:
            return self._resolve_violation(
                "AccessControl advisory: tool not in task allowlist (proceeding)",
                {"agent_id": agent_id, "tool_name": tool_name, "task_id": task_id},
            )

        return True

    def _resolve_violation(self, message: str, context: dict[str, str]) -> bool:
        """Decide the outcome of a rule violation for the current mode.

        In ADVISORY mode the violation is logged with *context* (plus a
        ``mode`` field) and True is returned; otherwise False is returned
        and nothing is logged.
        """
        if self._mode == GovernanceMode.ADVISORY:
            logger.info(message, extra={**context, "mode": "advisory"})
            return True
        return False
|