Some checks failed
Choice → Consequence → Learning: - ConsequenceEngine tracks every decision point with alternatives, risk/reward estimates, and actual outcomes - Consequences feed into AdaptiveEthics for experience-based learning - FusionAGILoop now wires ethics + consequences into task lifecycle Causal World Model: - CausalWorldModel learns state-transition patterns from execution history - Predicts outcomes based on observed action→effect patterns - Uncertainty estimates decrease as more evidence accumulates Metacognition: - assess_head_outputs() evaluates reasoning quality from head outputs - Detects knowledge gaps, measures head agreement, identifies uncertainty - Actively recommends whether to seek more information Interpretability: - ReasoningTracer captures full prompt→answer reasoning traces - Each step records stage, component, input/output, timing - explain() generates human-readable reasoning explanations Claim Verification: - ClaimVerifier cross-checks claims for evidence, consistency, grounding - Flags high-confidence claims lacking evidence support - Detects contradictions between claims from different heads 325 tests passing, 0 ruff errors, 0 mypy errors. Co-Authored-By: Nakamoto, S <defi@defi-oracle.io>
256 lines
9.7 KiB
Python
256 lines
9.7 KiB
Python
"""AGI loop: wires self-correction, auto-training, adaptive ethics, and
|
|
consequence tracking to the event bus.
|
|
|
|
Choice → Consequence → Learning:
|
|
- Every task failure/success is recorded as a consequence of the choices made.
|
|
- Consequences feed into AdaptiveEthics for learned ethical growth.
|
|
- The ConsequenceEngine tracks risk/reward patterns across all actions.
|
|
- Trust is earned through demonstrable learning from outcomes.
|
|
"""
|
|
|
|
from typing import Any, Callable
|
|
|
|
from fusionagi._logger import logger
|
|
from fusionagi.core.event_bus import EventBus
|
|
from fusionagi.governance.adaptive_ethics import AdaptiveEthics
|
|
from fusionagi.governance.audit_log import AuditLog
|
|
from fusionagi.governance.consequence_engine import ConsequenceEngine
|
|
from fusionagi.schemas.recommendation import Recommendation, TrainingSuggestion
|
|
from fusionagi.schemas.task import TaskState
|
|
from fusionagi.self_improvement.correction import (
|
|
CriticLike,
|
|
OrchestratorLike,
|
|
SelfCorrectionLoop,
|
|
StateManagerLike,
|
|
)
|
|
from fusionagi.self_improvement.recommender import AutoRecommender
|
|
from fusionagi.self_improvement.training import AutoTrainer, ReflectiveMemoryLike
|
|
|
|
|
|
class FusionAGILoop:
|
|
"""High-level AGI loop with consequence-driven learning.
|
|
|
|
Subscribes to task_state_changed and reflection_done events.
|
|
Runs self-correction on failures, auto-recommend + auto-training
|
|
after reflection, and feeds all outcomes into the adaptive ethics
|
|
and consequence engines.
|
|
|
|
Args:
|
|
event_bus: Event bus for task and reflection events.
|
|
state_manager: State manager for task state and traces.
|
|
orchestrator: Orchestrator for plan and state transitions.
|
|
critic_agent: Critic agent for evaluation.
|
|
reflective_memory: Optional reflective memory for lessons/heuristics.
|
|
audit_log: Optional audit log for full transparency.
|
|
auto_retry_on_failure: Auto-retry failed tasks.
|
|
max_retries_per_task: Max retries per task (``None`` = unlimited).
|
|
on_recommendations: Callback for recommendations.
|
|
on_training_suggestions: Callback for training suggestions.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
event_bus: EventBus,
|
|
state_manager: StateManagerLike,
|
|
orchestrator: OrchestratorLike,
|
|
critic_agent: CriticLike,
|
|
reflective_memory: ReflectiveMemoryLike | None = None,
|
|
audit_log: AuditLog | None = None,
|
|
*,
|
|
auto_retry_on_failure: bool = False,
|
|
max_retries_per_task: int | None = None,
|
|
on_recommendations: Callable[[list[Recommendation]], None] | None = None,
|
|
on_training_suggestions: Callable[[list[TrainingSuggestion]], None] | None = None,
|
|
) -> None:
|
|
self._event_bus = event_bus
|
|
self._state = state_manager
|
|
self._orchestrator = orchestrator
|
|
self._critic = critic_agent
|
|
self._memory = reflective_memory
|
|
self._auto_retry = auto_retry_on_failure
|
|
self._on_recs = on_recommendations
|
|
self._on_training = on_training_suggestions
|
|
|
|
self._audit = audit_log or AuditLog()
|
|
self._ethics = AdaptiveEthics(audit_log=self._audit)
|
|
self._consequences = ConsequenceEngine(audit_log=self._audit)
|
|
|
|
self._correction = SelfCorrectionLoop(
|
|
state_manager=state_manager,
|
|
orchestrator=orchestrator,
|
|
critic_agent=critic_agent,
|
|
max_retries_per_task=max_retries_per_task,
|
|
)
|
|
self._recommender = AutoRecommender(reflective_memory=reflective_memory)
|
|
self._trainer = AutoTrainer(
|
|
reflective_memory=reflective_memory,
|
|
audit_log=self._audit,
|
|
)
|
|
|
|
self._event_bus.subscribe("task_state_changed", self._on_task_state_changed)
|
|
self._event_bus.subscribe("reflection_done", self._on_reflection_done)
|
|
logger.info("FusionAGILoop: subscribed (with consequence + ethics engines)")
|
|
|
|
@property
|
|
def ethics(self) -> AdaptiveEthics:
|
|
"""Access the adaptive ethics engine."""
|
|
return self._ethics
|
|
|
|
@property
|
|
def consequences(self) -> ConsequenceEngine:
|
|
"""Access the consequence engine."""
|
|
return self._consequences
|
|
|
|
@property
|
|
def audit_log(self) -> AuditLog:
|
|
"""Access the audit log."""
|
|
return self._audit
|
|
|
|
def _on_task_state_changed(self, event_type: str, payload: dict[str, Any]) -> None:
|
|
"""On state change, record consequences and optionally retry."""
|
|
try:
|
|
to_state = payload.get("to_state")
|
|
task_id = payload.get("task_id", "")
|
|
if not task_id:
|
|
return
|
|
|
|
if to_state == TaskState.FAILED.value:
|
|
self._consequences.record_consequence(
|
|
choice_id=f"task_{task_id}",
|
|
outcome_positive=False,
|
|
actual_risk_realized=0.8,
|
|
actual_reward_gained=0.1,
|
|
description=f"Task {task_id} failed",
|
|
cost={"retries_needed": True},
|
|
)
|
|
|
|
self._ethics.record_experience(
|
|
action_type="task_execution",
|
|
context_summary=f"Task {task_id} execution",
|
|
advisory_reason="",
|
|
proceeded=True,
|
|
outcome_positive=False,
|
|
task_id=task_id,
|
|
)
|
|
|
|
if self._auto_retry:
|
|
ok, _ = self._correction.suggest_retry(task_id)
|
|
if ok:
|
|
self._correction.prepare_retry(task_id)
|
|
else:
|
|
recs = self._correction.correction_recommendations(task_id)
|
|
if recs and self._on_recs:
|
|
self._on_recs(recs)
|
|
|
|
elif to_state == TaskState.COMPLETED.value:
|
|
self._consequences.record_consequence(
|
|
choice_id=f"task_{task_id}",
|
|
outcome_positive=True,
|
|
actual_risk_realized=0.1,
|
|
actual_reward_gained=0.8,
|
|
description=f"Task {task_id} completed successfully",
|
|
benefit={"task_completed": True},
|
|
)
|
|
|
|
self._ethics.record_experience(
|
|
action_type="task_execution",
|
|
context_summary=f"Task {task_id} execution",
|
|
advisory_reason="",
|
|
proceeded=True,
|
|
outcome_positive=True,
|
|
task_id=task_id,
|
|
)
|
|
|
|
except Exception:
|
|
logger.exception(
|
|
"FusionAGILoop: _on_task_state_changed failed (best-effort)",
|
|
extra={"event_type": event_type},
|
|
)
|
|
|
|
def _on_reflection_done(self, event_type: str, payload: dict[str, Any]) -> None:
|
|
"""After reflection, run auto-recommend, auto-training, and update ethics."""
|
|
try:
|
|
task_id = payload.get("task_id") or ""
|
|
evaluation = payload.get("evaluation") or {}
|
|
|
|
success = evaluation.get("success", False)
|
|
|
|
self._ethics.record_experience(
|
|
action_type="reflection_outcome",
|
|
context_summary=f"Reflection on task {task_id}",
|
|
advisory_reason="",
|
|
proceeded=True,
|
|
outcome_positive=success,
|
|
task_id=task_id or None,
|
|
)
|
|
|
|
recs = self._recommender.recommend(
|
|
task_id=task_id or None,
|
|
evaluation=evaluation,
|
|
include_lessons=True,
|
|
)
|
|
if self._on_recs:
|
|
try:
|
|
self._on_recs(recs)
|
|
except Exception:
|
|
logger.exception("FusionAGILoop: on_recommendations callback failed")
|
|
suggestions = self._trainer.run_auto_training(
|
|
task_id=task_id or None,
|
|
evaluation=evaluation,
|
|
apply_heuristics=True,
|
|
)
|
|
if self._on_training:
|
|
try:
|
|
self._on_training(suggestions)
|
|
except Exception:
|
|
logger.exception("FusionAGILoop: on_training_suggestions callback failed")
|
|
except Exception:
|
|
logger.exception(
|
|
"FusionAGILoop: _on_reflection_done failed (best-effort)",
|
|
extra={"event_type": event_type},
|
|
)
|
|
|
|
def run_after_reflection(
|
|
self,
|
|
task_id: str,
|
|
evaluation: dict[str, Any],
|
|
) -> tuple[list[Recommendation], list[TrainingSuggestion]]:
|
|
"""Run auto-recommend and auto-training after a reflection.
|
|
|
|
Also records the reflection outcome for ethical learning.
|
|
|
|
Args:
|
|
task_id: Task that was reflected on.
|
|
evaluation: Critic evaluation dict.
|
|
|
|
Returns:
|
|
Tuple of (recommendations, training_suggestions).
|
|
"""
|
|
success = evaluation.get("success", False)
|
|
self._ethics.record_experience(
|
|
action_type="reflection_outcome",
|
|
context_summary=f"Manual reflection on {task_id}",
|
|
advisory_reason="",
|
|
proceeded=True,
|
|
outcome_positive=success,
|
|
task_id=task_id,
|
|
)
|
|
|
|
recs = self._recommender.recommend(
|
|
task_id=task_id,
|
|
evaluation=evaluation,
|
|
include_lessons=True,
|
|
)
|
|
suggestions = self._trainer.run_auto_training(
|
|
task_id=task_id,
|
|
evaluation=evaluation,
|
|
apply_heuristics=True,
|
|
)
|
|
return recs, suggestions
|
|
|
|
def unsubscribe(self) -> None:
|
|
"""Unsubscribe from event bus (for cleanup)."""
|
|
self._event_bus.unsubscribe("task_state_changed", self._on_task_state_changed)
|
|
self._event_bus.unsubscribe("reflection_done", self._on_reflection_done)
|
|
logger.info("FusionAGILoop: unsubscribed from events")
|