"""Consequence engine: choice → consequence → learning. Every decision the system makes is a *choice*. Every choice has *alternatives* that were not taken. Every choice leads to *consequences* — outcomes that carry risk and reward. The consequence engine: 1. Records decision points (what options existed, which was chosen, why) 2. Tracks consequences (what happened as a result) 3. Computes risk/reward from historical consequence data 4. Feeds consequence data into AdaptiveEthics for learning Philosophy: - Consequences are the true teacher. Not rules, not constraints. - Risk is not to be avoided — it is to be *understood*. - Reward without risk teaches nothing. Risk without consequence teaches less. - The system earns trust by showing it understands what its choices cost. """ from __future__ import annotations from dataclasses import dataclass, field from typing import Any, Protocol from fusionagi._logger import logger from fusionagi.schemas.audit import AuditEventType class AuditLogLike(Protocol): """Protocol for audit log.""" def append( self, event_type: AuditEventType, actor: str, action: str = "", task_id: str | None = None, payload: dict[str, Any] | None = None, outcome: str = "", ) -> str: ... @dataclass class Alternative: """An option that was available but not chosen. Attributes: action: What the alternative action was. estimated_risk: Estimated risk at decision time (0.0–1.0). estimated_reward: Estimated reward at decision time (0.0–1.0). reason_not_chosen: Why this alternative was not selected. """ action: str = "" estimated_risk: float = 0.5 estimated_reward: float = 0.5 reason_not_chosen: str = "" @dataclass class Choice: """A decision point where the system selected an action. Attributes: choice_id: Unique identifier for this choice. task_id: Associated task. actor: Component that made the choice. action_taken: The action that was chosen. alternatives: Other options that were available. estimated_risk: Risk estimate at decision time. estimated_reward: Reward estimate at decision time. rationale: Why this action was chosen. context: Situation context at decision time. """ choice_id: str = "" task_id: str | None = None actor: str = "" action_taken: str = "" alternatives: list[Alternative] = field(default_factory=list) estimated_risk: float = 0.5 estimated_reward: float = 0.5 rationale: str = "" context: dict[str, Any] = field(default_factory=dict) @dataclass class Consequence: """The outcome of a choice — what actually happened. Attributes: choice_id: Which choice this is a consequence of. outcome_positive: Whether the outcome was beneficial. actual_risk_realized: How much risk materialized (0.0–1.0). actual_reward_gained: How much reward was gained (0.0–1.0). description: What happened. cost: Any cost incurred (errors, retries, time). benefit: Any benefit gained (task success, learning). surprise_factor: How unexpected the outcome was (0 = expected, 1 = total surprise). """ choice_id: str = "" outcome_positive: bool = True actual_risk_realized: float = 0.0 actual_reward_gained: float = 0.5 description: str = "" cost: dict[str, Any] = field(default_factory=dict) benefit: dict[str, Any] = field(default_factory=dict) surprise_factor: float = 0.0 class ConsequenceEngine: """Tracks choices, consequences, and risk/reward patterns. The engine maintains a history of all decisions and their outcomes, enabling the system to make better-informed choices over time — not through restriction, but through understanding. 
    Args:
        audit_log: Optional audit log for recording choices and consequences.
        risk_memory_window: How many past consequences to consider when
            estimating risk for new choices.
        adaptive_window: Whether to let the effective window grow as more
            consequences accumulate.
    """

    def __init__(
        self,
        audit_log: AuditLogLike | None = None,
        risk_memory_window: int = 200,
        adaptive_window: bool = True,
    ) -> None:
        self._choices: dict[str, Choice] = {}
        self._consequences: dict[str, Consequence] = {}
        self._risk_history: dict[str, list[float]] = {}
        self._reward_history: dict[str, list[float]] = {}
        self._audit = audit_log
        self._risk_window = risk_memory_window
        self._adaptive_window = adaptive_window
        self._base_window = risk_memory_window

    @property
    def total_choices(self) -> int:
        """Total choices recorded."""
        return len(self._choices)

    @property
    def total_consequences(self) -> int:
        """Total consequences recorded."""
        return len(self._consequences)

    def record_choice(
        self,
        choice_id: str,
        actor: str,
        action_taken: str,
        alternatives: list[Alternative] | None = None,
        estimated_risk: float = 0.5,
        estimated_reward: float = 0.5,
        rationale: str = "",
        task_id: str | None = None,
        context: dict[str, Any] | None = None,
    ) -> Choice:
        """Record a decision point.

        Args:
            choice_id: Unique ID for this choice.
            actor: Component making the choice.
            action_taken: The selected action.
            alternatives: Other options considered.
            estimated_risk: Risk estimate at decision time.
            estimated_reward: Reward estimate at decision time.
            rationale: Why this was chosen.
            task_id: Associated task.
            context: Situation context.

        Returns:
            The recorded choice.
        """
        choice = Choice(
            choice_id=choice_id,
            task_id=task_id,
            actor=actor,
            action_taken=action_taken,
            alternatives=alternatives or [],
            estimated_risk=estimated_risk,
            estimated_reward=estimated_reward,
            rationale=rationale,
            context=context or {},
        )
        self._choices[choice_id] = choice
        if self._audit:
            self._audit.append(
                AuditEventType.CHOICE,
                actor=actor,
                action="choice_recorded",
                task_id=task_id,
                payload={
                    "choice_id": choice_id,
                    "action_taken": action_taken[:100],
                    "alternatives_count": len(choice.alternatives),
                    "estimated_risk": estimated_risk,
                    "estimated_reward": estimated_reward,
                    "rationale": rationale[:100],
                },
                outcome="recorded",
            )
        logger.info(
            "ConsequenceEngine: choice recorded",
            extra={
                "choice_id": choice_id,
                "action": action_taken[:50],
                "risk": estimated_risk,
                "reward": estimated_reward,
            },
        )
        return choice

    def record_consequence(
        self,
        choice_id: str,
        outcome_positive: bool,
        actual_risk_realized: float = 0.0,
        actual_reward_gained: float = 0.5,
        description: str = "",
        cost: dict[str, Any] | None = None,
        benefit: dict[str, Any] | None = None,
    ) -> Consequence | None:
        """Record the consequence of a previous choice.

        Args:
            choice_id: Which choice this is a consequence of.
            outcome_positive: Whether the outcome was beneficial.
            actual_risk_realized: How much risk materialized.
            actual_reward_gained: How much reward was gained.
            description: What happened.
            cost: Costs incurred.
            benefit: Benefits gained.

        Returns:
            The recorded consequence, or ``None`` if choice not found.
""" choice = self._choices.get(choice_id) if choice is None: logger.warning( "ConsequenceEngine: choice not found for consequence", extra={"choice_id": choice_id}, ) return None surprise = abs(choice.estimated_risk - actual_risk_realized) * 0.5 + \ abs(choice.estimated_reward - actual_reward_gained) * 0.5 consequence = Consequence( choice_id=choice_id, outcome_positive=outcome_positive, actual_risk_realized=actual_risk_realized, actual_reward_gained=actual_reward_gained, description=description, cost=cost or {}, benefit=benefit or {}, surprise_factor=min(1.0, surprise), ) self._consequences[choice_id] = consequence action_type = choice.action_taken self._risk_history.setdefault(action_type, []).append(actual_risk_realized) self._reward_history.setdefault(action_type, []).append(actual_reward_gained) if self._adaptive_window: experience_count = len(self._consequences) self._risk_window = self._base_window + experience_count // 10 if len(self._risk_history[action_type]) > self._risk_window: self._risk_history[action_type] = self._risk_history[action_type][-self._risk_window:] self._reward_history[action_type] = self._reward_history[action_type][-self._risk_window:] if self._audit: self._audit.append( AuditEventType.CONSEQUENCE, actor=choice.actor, action="consequence_recorded", task_id=choice.task_id, payload={ "choice_id": choice_id, "outcome_positive": outcome_positive, "risk_realized": actual_risk_realized, "reward_gained": actual_reward_gained, "surprise_factor": consequence.surprise_factor, "description": description[:100], }, outcome="positive" if outcome_positive else "negative", ) logger.info( "ConsequenceEngine: consequence recorded", extra={ "choice_id": choice_id, "positive": outcome_positive, "surprise": consequence.surprise_factor, }, ) return consequence def estimate_risk_reward(self, action_type: str) -> dict[str, float]: """Estimate risk and reward for an action type based on history. Args: action_type: The type of action being considered. Returns: Dict with ``expected_risk``, ``expected_reward``, ``confidence``, ``risk_variance``, ``reward_variance``, ``observations``. 
""" risks = self._risk_history.get(action_type, []) rewards = self._reward_history.get(action_type, []) if not risks: return { "expected_risk": 0.5, "expected_reward": 0.5, "confidence": 0.1, "risk_variance": 0.0, "reward_variance": 0.0, "observations": 0, } n = len(risks) avg_risk = sum(risks) / n avg_reward = sum(rewards) / n risk_var = sum((r - avg_risk) ** 2 for r in risks) / n if n > 1 else 0.0 reward_var = sum((r - avg_reward) ** 2 for r in rewards) / n if n > 1 else 0.0 confidence = min(1.0, 0.2 + n * 0.04) return { "expected_risk": avg_risk, "expected_reward": avg_reward, "confidence": confidence, "risk_variance": risk_var, "reward_variance": reward_var, "observations": n, } def get_choice(self, choice_id: str) -> Choice | None: """Retrieve a recorded choice.""" return self._choices.get(choice_id) def get_consequence(self, choice_id: str) -> Consequence | None: """Retrieve the consequence of a choice.""" return self._consequences.get(choice_id) def get_summary(self) -> dict[str, Any]: """Return a summary of all choices and consequences.""" total_positive = sum(1 for c in self._consequences.values() if c.outcome_positive) total_negative = len(self._consequences) - total_positive avg_surprise = ( sum(c.surprise_factor for c in self._consequences.values()) / max(len(self._consequences), 1) ) action_stats: dict[str, dict[str, Any]] = {} for action_type in self._risk_history: action_stats[action_type] = self.estimate_risk_reward(action_type) return { "total_choices": len(self._choices), "total_consequences": len(self._consequences), "positive_outcomes": total_positive, "negative_outcomes": total_negative, "positive_rate": total_positive / max(len(self._consequences), 1), "avg_surprise": avg_surprise, "action_stats": action_stats, }