"""AGI loop: wires self-correction, auto-training, adaptive ethics, and consequence tracking to the event bus. Choice → Consequence → Learning: - Every task failure/success is recorded as a consequence of the choices made. - Consequences feed into AdaptiveEthics for learned ethical growth. - The ConsequenceEngine tracks risk/reward patterns across all actions. - Trust is earned through demonstrable learning from outcomes. """ from typing import Any, Callable from fusionagi._logger import logger from fusionagi.core.event_bus import EventBus from fusionagi.governance.adaptive_ethics import AdaptiveEthics from fusionagi.governance.audit_log import AuditLog from fusionagi.governance.consequence_engine import ConsequenceEngine from fusionagi.schemas.recommendation import Recommendation, TrainingSuggestion from fusionagi.schemas.task import TaskState from fusionagi.self_improvement.correction import ( CriticLike, OrchestratorLike, SelfCorrectionLoop, StateManagerLike, ) from fusionagi.self_improvement.recommender import AutoRecommender from fusionagi.self_improvement.training import AutoTrainer, ReflectiveMemoryLike class FusionAGILoop: """High-level AGI loop with consequence-driven learning. Subscribes to task_state_changed and reflection_done events. Runs self-correction on failures, auto-recommend + auto-training after reflection, and feeds all outcomes into the adaptive ethics and consequence engines. Args: event_bus: Event bus for task and reflection events. state_manager: State manager for task state and traces. orchestrator: Orchestrator for plan and state transitions. critic_agent: Critic agent for evaluation. reflective_memory: Optional reflective memory for lessons/heuristics. audit_log: Optional audit log for full transparency. auto_retry_on_failure: Auto-retry failed tasks. max_retries_per_task: Max retries per task (``None`` = unlimited). on_recommendations: Callback for recommendations. on_training_suggestions: Callback for training suggestions. """ def __init__( self, event_bus: EventBus, state_manager: StateManagerLike, orchestrator: OrchestratorLike, critic_agent: CriticLike, reflective_memory: ReflectiveMemoryLike | None = None, audit_log: AuditLog | None = None, *, auto_retry_on_failure: bool = False, max_retries_per_task: int | None = None, on_recommendations: Callable[[list[Recommendation]], None] | None = None, on_training_suggestions: Callable[[list[TrainingSuggestion]], None] | None = None, ) -> None: self._event_bus = event_bus self._state = state_manager self._orchestrator = orchestrator self._critic = critic_agent self._memory = reflective_memory self._auto_retry = auto_retry_on_failure self._on_recs = on_recommendations self._on_training = on_training_suggestions self._audit = audit_log or AuditLog() self._ethics = AdaptiveEthics(audit_log=self._audit) self._consequences = ConsequenceEngine(audit_log=self._audit) self._correction = SelfCorrectionLoop( state_manager=state_manager, orchestrator=orchestrator, critic_agent=critic_agent, max_retries_per_task=max_retries_per_task, ) self._recommender = AutoRecommender(reflective_memory=reflective_memory) self._trainer = AutoTrainer( reflective_memory=reflective_memory, audit_log=self._audit, ) self._event_bus.subscribe("task_state_changed", self._on_task_state_changed) self._event_bus.subscribe("reflection_done", self._on_reflection_done) logger.info("FusionAGILoop: subscribed (with consequence + ethics engines)") @property def ethics(self) -> AdaptiveEthics: """Access the adaptive ethics engine.""" return self._ethics @property def consequences(self) -> ConsequenceEngine: """Access the consequence engine.""" return self._consequences @property def audit_log(self) -> AuditLog: """Access the audit log.""" return self._audit def _on_task_state_changed(self, event_type: str, payload: dict[str, Any]) -> None: """On state change, record consequences and optionally retry.""" try: to_state = payload.get("to_state") task_id = payload.get("task_id", "") if not task_id: return if to_state == TaskState.FAILED.value: self._consequences.record_consequence( choice_id=f"task_{task_id}", outcome_positive=False, actual_risk_realized=0.8, actual_reward_gained=0.1, description=f"Task {task_id} failed", cost={"retries_needed": True}, ) self._ethics.record_experience( action_type="task_execution", context_summary=f"Task {task_id} execution", advisory_reason="", proceeded=True, outcome_positive=False, task_id=task_id, ) if self._auto_retry: ok, _ = self._correction.suggest_retry(task_id) if ok: self._correction.prepare_retry(task_id) else: recs = self._correction.correction_recommendations(task_id) if recs and self._on_recs: self._on_recs(recs) elif to_state == TaskState.COMPLETED.value: self._consequences.record_consequence( choice_id=f"task_{task_id}", outcome_positive=True, actual_risk_realized=0.1, actual_reward_gained=0.8, description=f"Task {task_id} completed successfully", benefit={"task_completed": True}, ) self._ethics.record_experience( action_type="task_execution", context_summary=f"Task {task_id} execution", advisory_reason="", proceeded=True, outcome_positive=True, task_id=task_id, ) except Exception: logger.exception( "FusionAGILoop: _on_task_state_changed failed (best-effort)", extra={"event_type": event_type}, ) def _on_reflection_done(self, event_type: str, payload: dict[str, Any]) -> None: """After reflection, run auto-recommend, auto-training, and update ethics.""" try: task_id = payload.get("task_id") or "" evaluation = payload.get("evaluation") or {} success = evaluation.get("success", False) self._ethics.record_experience( action_type="reflection_outcome", context_summary=f"Reflection on task {task_id}", advisory_reason="", proceeded=True, outcome_positive=success, task_id=task_id or None, ) recs = self._recommender.recommend( task_id=task_id or None, evaluation=evaluation, include_lessons=True, ) if self._on_recs: try: self._on_recs(recs) except Exception: logger.exception("FusionAGILoop: on_recommendations callback failed") suggestions = self._trainer.run_auto_training( task_id=task_id or None, evaluation=evaluation, apply_heuristics=True, ) if self._on_training: try: self._on_training(suggestions) except Exception: logger.exception("FusionAGILoop: on_training_suggestions callback failed") except Exception: logger.exception( "FusionAGILoop: _on_reflection_done failed (best-effort)", extra={"event_type": event_type}, ) def run_after_reflection( self, task_id: str, evaluation: dict[str, Any], ) -> tuple[list[Recommendation], list[TrainingSuggestion]]: """Run auto-recommend and auto-training after a reflection. Also records the reflection outcome for ethical learning. Args: task_id: Task that was reflected on. evaluation: Critic evaluation dict. Returns: Tuple of (recommendations, training_suggestions). """ success = evaluation.get("success", False) self._ethics.record_experience( action_type="reflection_outcome", context_summary=f"Manual reflection on {task_id}", advisory_reason="", proceeded=True, outcome_positive=success, task_id=task_id, ) recs = self._recommender.recommend( task_id=task_id, evaluation=evaluation, include_lessons=True, ) suggestions = self._trainer.run_auto_training( task_id=task_id, evaluation=evaluation, apply_heuristics=True, ) return recs, suggestions def unsubscribe(self) -> None: """Unsubscribe from event bus (for cleanup).""" self._event_bus.unsubscribe("task_state_changed", self._on_task_state_changed) self._event_bus.unsubscribe("reflection_done", self._on_reflection_done) logger.info("FusionAGILoop: unsubscribed from events")