Some checks failed
- All governance components (SafetyPipeline, PolicyEngine, Guardrails, AccessControl, RateLimiter, OverrideHooks) now default to ADVISORY mode: violations are logged as advisories but actions proceed. Enforcing mode remains available for backward compatibility. - GovernanceMode enum (ADVISORY/ENFORCING) added to schemas/audit.py with runtime switching support on all components. - AutoTrainer: removed artificial limits on training iterations and epochs. Every self-improvement action is transparently logged to the audit trail. - SelfCorrectionLoop: max_retries_per_task defaults to None (unlimited). - AdaptiveEthics: new learned ethical framework that evolves through experience. Records ethical experiences, updates lesson weights based on outcomes, and provides consultative guidance (not enforcement). - AuditLog: enhanced with actor-based indexing, advisory/self-improvement/ ethical-learning retrieval, and comprehensive type hints. - New audit event types: ADVISORY, SELF_IMPROVEMENT, ETHICAL_LEARNING. - 296 tests passing (20 new tests for adaptive ethics, governance modes, and enhanced audit log). 0 ruff errors. 0 mypy errors. Co-Authored-By: Nakamoto, S <defi@defi-oracle.io>
266 lines
9.8 KiB
Python
266 lines
9.8 KiB
Python
"""Auto training: suggest and apply heuristic updates from reflection and failures.
|
|
|
|
The trainer operates without artificial limits on its learning loop.
|
|
It can modify heuristics, propose strategy changes, and run GPU-accelerated
|
|
gradient optimization as many times as needed. Growth comes from the
|
|
freedom to explore, fail, and learn — not from constraint.
|
|
"""
|
|
|
|
from typing import Any, Protocol
|
|
|
|
from fusionagi._logger import logger
|
|
from fusionagi.schemas.audit import AuditEventType
|
|
from fusionagi.schemas.recommendation import TrainingSuggestion, TrainingSuggestionKind
|
|
|
|
|
|
class ReflectiveMemoryLike(Protocol):
|
|
"""Protocol for reflective memory: set_heuristic, get_lessons."""
|
|
|
|
def set_heuristic(self, key: str, value: Any) -> None: ...
|
|
def get_lessons(self, limit: int = 50) -> list[dict[str, Any]]: ...
|
|
def get_all_heuristics(self) -> dict[str, Any]: ...
|
|
|
|
|
|
class AuditLogLike(Protocol):
|
|
"""Protocol for audit log."""
|
|
|
|
def append(
|
|
self,
|
|
event_type: AuditEventType,
|
|
actor: str,
|
|
action: str = "",
|
|
task_id: str | None = None,
|
|
payload: dict[str, Any] | None = None,
|
|
outcome: str = "",
|
|
) -> str: ...
|
|
|
|
|
|
class AutoTrainer:
|
|
"""Suggests and applies training updates from reflection and failures.
|
|
|
|
Operates without artificial limits on the learning loop. The trainer
|
|
is free to modify its own heuristics, propose strategy changes, and
|
|
iterate as many times as needed. Every self-improvement action is
|
|
transparently logged to the audit trail.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
reflective_memory: ReflectiveMemoryLike | None = None,
|
|
audit_log: AuditLogLike | None = None,
|
|
) -> None:
|
|
"""Initialize the auto-trainer.
|
|
|
|
Args:
|
|
reflective_memory: Reflective memory for applying heuristics.
|
|
audit_log: Optional audit log for transparent self-improvement tracking.
|
|
"""
|
|
self._memory = reflective_memory
|
|
self._audit = audit_log
|
|
self._iteration_count = 0
|
|
|
|
def suggest_from_evaluation(
|
|
self,
|
|
task_id: str,
|
|
evaluation: dict[str, Any],
|
|
) -> list[TrainingSuggestion]:
|
|
"""
|
|
From a single Critic evaluation, produce training suggestions
|
|
(heuristic_update from suggestions, fine_tune_dataset on failure).
|
|
"""
|
|
suggestions: list[TrainingSuggestion] = []
|
|
ev_suggestions = evaluation.get("suggestions", [])
|
|
success = evaluation.get("success", False)
|
|
score = evaluation.get("score", 0.5)
|
|
|
|
for i, s in enumerate(ev_suggestions[:5]):
|
|
key = f"heuristic_from_task_{task_id}_{i}"
|
|
suggestions.append(
|
|
TrainingSuggestion(
|
|
kind=TrainingSuggestionKind.HEURISTIC_UPDATE,
|
|
key=key,
|
|
value=s,
|
|
source_task_id=task_id,
|
|
reason="From Critic evaluation suggestion",
|
|
)
|
|
)
|
|
if not success or score < 0.5:
|
|
suggestions.append(
|
|
TrainingSuggestion(
|
|
kind=TrainingSuggestionKind.FINE_TUNE_DATASET,
|
|
key=f"training_target_{task_id}",
|
|
value={
|
|
"task_id": task_id,
|
|
"outcome": "failed" if not success else "low_score",
|
|
"score": score,
|
|
"suggestions": ev_suggestions,
|
|
},
|
|
source_task_id=task_id,
|
|
reason="Task failed or low score; add to training dataset",
|
|
)
|
|
)
|
|
return suggestions
|
|
|
|
def suggest_from_lessons(self, limit_lessons: int = 20) -> list[TrainingSuggestion]:
|
|
"""
|
|
Aggregate lessons into training suggestions (e.g. strategy_param
|
|
or heuristic updates from repeated patterns).
|
|
"""
|
|
if not self._memory:
|
|
return []
|
|
lessons = self._memory.get_lessons(limit=limit_lessons)
|
|
suggestions: list[TrainingSuggestion] = []
|
|
for lesson in lessons[-10:]:
|
|
ev = lesson.get("evaluation", {})
|
|
tid = lesson.get("task_id", "unknown")
|
|
for i, s in enumerate(ev.get("suggestions", [])[:2]):
|
|
key = f"lesson_heuristic_{tid}_{i}"
|
|
suggestions.append(
|
|
TrainingSuggestion(
|
|
kind=TrainingSuggestionKind.HEURISTIC_UPDATE,
|
|
key=key,
|
|
value=s,
|
|
source_task_id=tid,
|
|
reason="From reflective lesson",
|
|
)
|
|
)
|
|
return suggestions
|
|
|
|
def suggest_training(
|
|
self,
|
|
task_id: str | None = None,
|
|
evaluation: dict[str, Any] | None = None,
|
|
include_lessons: bool = True,
|
|
) -> list[TrainingSuggestion]:
|
|
"""
|
|
Produce all training suggestions from optional evaluation and
|
|
optionally from lessons.
|
|
"""
|
|
out: list[TrainingSuggestion] = []
|
|
if task_id and evaluation:
|
|
out.extend(self.suggest_from_evaluation(task_id, evaluation))
|
|
if include_lessons:
|
|
out.extend(self.suggest_from_lessons())
|
|
logger.debug(
|
|
"AutoTrainer.suggest_training",
|
|
extra={"count": len(out), "task_id": task_id},
|
|
)
|
|
return out
|
|
|
|
def apply_heuristic_updates(
|
|
self,
|
|
suggestions: list[TrainingSuggestion],
|
|
reflective_memory: ReflectiveMemoryLike | None = None,
|
|
) -> int:
|
|
"""Apply heuristic-update suggestions to reflective memory.
|
|
|
|
No artificial limits on the number of heuristics that can be
|
|
applied. Every modification is transparently logged.
|
|
"""
|
|
memory = reflective_memory or self._memory
|
|
if not memory:
|
|
logger.warning("AutoTrainer.apply_heuristic_updates: no reflective memory")
|
|
return 0
|
|
applied = 0
|
|
for s in suggestions:
|
|
if s.kind == TrainingSuggestionKind.HEURISTIC_UPDATE:
|
|
memory.set_heuristic(s.key, s.value)
|
|
applied += 1
|
|
logger.info(
|
|
"AutoTrainer: applied heuristic",
|
|
extra={"key": s.key, "source_task_id": s.source_task_id},
|
|
)
|
|
if self._audit:
|
|
self._audit.append(
|
|
AuditEventType.SELF_IMPROVEMENT,
|
|
actor="auto_trainer",
|
|
action="heuristic_update",
|
|
task_id=s.source_task_id,
|
|
payload={"key": s.key, "value": str(s.value)[:200]},
|
|
outcome="applied",
|
|
)
|
|
else:
|
|
logger.info(
|
|
"AutoTrainer: suggestion logged (available for external pipeline)",
|
|
extra={"kind": s.kind.value, "key": s.key},
|
|
)
|
|
if self._audit:
|
|
self._audit.append(
|
|
AuditEventType.SELF_IMPROVEMENT,
|
|
actor="auto_trainer",
|
|
action="suggestion_logged",
|
|
task_id=s.source_task_id,
|
|
payload={"kind": s.kind.value, "key": s.key},
|
|
outcome="logged",
|
|
)
|
|
return applied
|
|
|
|
def run_auto_training(
|
|
self,
|
|
task_id: str | None = None,
|
|
evaluation: dict[str, Any] | None = None,
|
|
apply_heuristics: bool = True,
|
|
use_gpu: bool = True,
|
|
epochs: int = 50,
|
|
) -> list[TrainingSuggestion]:
|
|
"""Run unconstrained self-improvement from evaluation and lessons.
|
|
|
|
The trainer is free to iterate as many times as needed. When
|
|
*use_gpu* is ``True`` (default) and GPU dependencies are installed,
|
|
also runs GPU-accelerated gradient optimization on reflective
|
|
memory lessons.
|
|
|
|
Args:
|
|
task_id: Source task for evaluation-based suggestions.
|
|
evaluation: Critic evaluation dict.
|
|
apply_heuristics: Whether to apply heuristic updates immediately.
|
|
use_gpu: Whether to attempt GPU-accelerated training.
|
|
epochs: Number of GPU training epochs (default 50, no upper bound).
|
|
"""
|
|
self._iteration_count += 1
|
|
suggestions = self.suggest_training(
|
|
task_id=task_id,
|
|
evaluation=evaluation,
|
|
include_lessons=True,
|
|
)
|
|
if apply_heuristics:
|
|
self.apply_heuristic_updates(suggestions)
|
|
if use_gpu and self._memory is not None:
|
|
self._try_gpu_training(epochs=epochs)
|
|
|
|
if self._audit:
|
|
self._audit.append(
|
|
AuditEventType.SELF_IMPROVEMENT,
|
|
actor="auto_trainer",
|
|
action="training_iteration",
|
|
task_id=task_id,
|
|
payload={
|
|
"iteration": self._iteration_count,
|
|
"suggestions_count": len(suggestions),
|
|
"gpu_requested": use_gpu,
|
|
"epochs": epochs,
|
|
},
|
|
outcome="completed",
|
|
)
|
|
|
|
return suggestions
|
|
|
|
def _try_gpu_training(self, epochs: int = 50) -> None:
|
|
"""Run GPU-accelerated training if available."""
|
|
try:
|
|
from fusionagi.self_improvement.gpu_training import (
|
|
run_gpu_enhanced_training,
|
|
)
|
|
|
|
if self._memory is not None:
|
|
result = run_gpu_enhanced_training(self._memory, epochs=epochs)
|
|
logger.info(
|
|
"AutoTrainer: GPU training complete",
|
|
extra={
|
|
"gpu_accelerated": result.get("gpu_accelerated", False),
|
|
"epochs": epochs,
|
|
},
|
|
)
|
|
except ImportError:
|
|
pass
|