Some checks failed
- All governance components (SafetyPipeline, PolicyEngine, Guardrails, AccessControl, RateLimiter, OverrideHooks) now default to ADVISORY mode: violations are logged as advisories but actions proceed. Enforcing mode remains available for backward compatibility. - GovernanceMode enum (ADVISORY/ENFORCING) added to schemas/audit.py with runtime switching support on all components. - AutoTrainer: removed artificial limits on training iterations and epochs. Every self-improvement action is transparently logged to the audit trail. - SelfCorrectionLoop: max_retries_per_task defaults to None (unlimited). - AdaptiveEthics: new learned ethical framework that evolves through experience. Records ethical experiences, updates lesson weights based on outcomes, and provides consultative guidance (not enforcement). - AuditLog: enhanced with actor-based indexing, advisory/self-improvement/ ethical-learning retrieval, and comprehensive type hints. - New audit event types: ADVISORY, SELF_IMPROVEMENT, ETHICAL_LEARNING. - 296 tests passing (20 new tests for adaptive ethics, governance modes, and enhanced audit log). 0 ruff errors. 0 mypy errors. Co-Authored-By: Nakamoto, S <defi@defi-oracle.io>
124 lines
4.5 KiB
Python
124 lines
4.5 KiB
Python
"""Structured audit log for AGI — full transparency layer.
|
|
|
|
Every material decision, tool call, self-improvement action, advisory
|
|
override, and ethical learning event is captured here. The audit log
|
|
is the system's conscience: it doesn't prevent action, but ensures
|
|
every action is visible and traceable. Trust is earned through
|
|
transparency.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import uuid
|
|
from typing import Any
|
|
|
|
from fusionagi._logger import logger
|
|
from fusionagi.schemas.audit import AuditEntry, AuditEventType
|
|
|
|
|
|
class AuditLog:
|
|
"""Append-only audit log with indexed retrieval.
|
|
|
|
All governance decisions, self-improvement iterations, ethical
|
|
learning events, and advisory overrides are recorded here.
|
|
|
|
Args:
|
|
max_entries: Maximum entries to retain in memory (FIFO eviction).
|
|
"""
|
|
|
|
def __init__(self, max_entries: int = 100_000) -> None:
|
|
self._entries: list[AuditEntry] = []
|
|
self._max_entries = max_entries
|
|
self._by_task: dict[str | None, list[int]] = {}
|
|
self._by_type: dict[str, list[int]] = {}
|
|
self._by_actor: dict[str, list[int]] = {}
|
|
|
|
def append(
|
|
self,
|
|
event_type: AuditEventType,
|
|
actor: str,
|
|
action: str = "",
|
|
task_id: str | None = None,
|
|
payload: dict[str, Any] | None = None,
|
|
outcome: str = "",
|
|
) -> str:
|
|
"""Record an audit event with full context.
|
|
|
|
Args:
|
|
event_type: Category of event.
|
|
actor: Agent or system component responsible.
|
|
action: Specific action taken.
|
|
task_id: Associated task (if any).
|
|
payload: Arbitrary structured data.
|
|
outcome: Result description.
|
|
|
|
Returns:
|
|
The generated entry ID.
|
|
"""
|
|
entry_id = str(uuid.uuid4())
|
|
entry = AuditEntry(
|
|
entry_id=entry_id,
|
|
event_type=event_type,
|
|
actor=actor,
|
|
task_id=task_id,
|
|
action=action,
|
|
payload=payload or {},
|
|
outcome=outcome,
|
|
)
|
|
if len(self._entries) >= self._max_entries:
|
|
self._entries.pop(0)
|
|
idx = len(self._entries)
|
|
self._entries.append(entry)
|
|
if entry.task_id:
|
|
self._by_task.setdefault(entry.task_id, []).append(idx)
|
|
self._by_type.setdefault(entry.event_type.value, []).append(idx)
|
|
self._by_actor.setdefault(entry.actor, []).append(idx)
|
|
|
|
logger.debug(
|
|
"Audit: event recorded",
|
|
extra={
|
|
"entry_id": entry_id,
|
|
"event_type": event_type.value,
|
|
"actor": actor,
|
|
"action": action,
|
|
"outcome": outcome,
|
|
},
|
|
)
|
|
return entry_id
|
|
|
|
def get_by_task(self, task_id: str, limit: int = 100) -> list[AuditEntry]:
|
|
"""Return recent audit entries for a specific task."""
|
|
indices = self._by_task.get(task_id, [])[-limit:]
|
|
return [self._entries[i] for i in indices if i < len(self._entries)]
|
|
|
|
def get_by_type(self, event_type: AuditEventType, limit: int = 100) -> list[AuditEntry]:
|
|
"""Return recent audit entries of a specific type."""
|
|
indices = self._by_type.get(event_type.value, [])[-limit:]
|
|
return [self._entries[i] for i in indices if i < len(self._entries)]
|
|
|
|
def get_by_actor(self, actor: str, limit: int = 100) -> list[AuditEntry]:
|
|
"""Return recent audit entries by a specific actor."""
|
|
indices = self._by_actor.get(actor, [])[-limit:]
|
|
return [self._entries[i] for i in indices if i < len(self._entries)]
|
|
|
|
def get_advisories(self, limit: int = 100) -> list[AuditEntry]:
|
|
"""Return recent advisory events (governance overrides in advisory mode)."""
|
|
return self.get_by_type(AuditEventType.ADVISORY, limit=limit)
|
|
|
|
def get_self_improvements(self, limit: int = 100) -> list[AuditEntry]:
|
|
"""Return recent self-improvement events."""
|
|
return self.get_by_type(AuditEventType.SELF_IMPROVEMENT, limit=limit)
|
|
|
|
def get_ethical_learning(self, limit: int = 100) -> list[AuditEntry]:
|
|
"""Return recent ethical learning events."""
|
|
return self.get_by_type(AuditEventType.ETHICAL_LEARNING, limit=limit)
|
|
|
|
def get_recent(self, limit: int = 100) -> list[AuditEntry]:
|
|
"""Return the most recent entries regardless of type."""
|
|
return list(self._entries[-limit:])
|
|
|
|
@property
|
|
def total_entries(self) -> int:
|
|
"""Total number of entries in the log."""
|
|
return len(self._entries)
|