"""Claim verification: cross-check claims against known facts and evidence. Provides formal verification of claims produced by the reasoning pipeline before they reach the final output. Each claim is checked for: - Internal consistency (does it contradict other claims in the same response?) - Evidence support (how well-supported is this claim by cited evidence?) - Confidence calibration (is the claimed confidence appropriate?) - Factual grounding (can the claim be grounded in the semantic graph?) """ from __future__ import annotations from dataclasses import dataclass, field from typing import Any, Protocol from fusionagi._logger import logger from fusionagi.schemas.head import HeadClaim, HeadOutput class SemanticGraphLike(Protocol): """Protocol for semantic graph memory.""" def query_units( self, unit_ids: list[str] | None = None, content_contains: str | None = None, limit: int = 50, ) -> list[Any]: ... @dataclass class VerificationResult: """Result of verifying a single claim. Attributes: claim_text: The claim that was verified. verified: Whether the claim passed verification. confidence_calibrated: Whether confidence seems well-calibrated. evidence_score: Evidence support strength (0.0–1.0). consistency_score: Internal consistency with other claims (0.0–1.0). grounding_score: Grounding in known facts (0.0–1.0). issues: List of issues found. overall_score: Composite verification score (0.0–1.0). """ claim_text: str = "" verified: bool = True confidence_calibrated: bool = True evidence_score: float = 0.5 consistency_score: float = 1.0 grounding_score: float = 0.5 issues: list[str] = field(default_factory=list) overall_score: float = 0.5 @dataclass class VerificationReport: """Verification report for all claims in a response. Attributes: results: Per-claim verification results. overall_integrity: Overall response integrity (0.0–1.0). total_claims: Total claims checked. verified_count: How many passed verification. flagged_count: How many were flagged with issues. recommendations: Suggested actions based on verification. """ results: list[VerificationResult] = field(default_factory=list) overall_integrity: float = 0.5 total_claims: int = 0 verified_count: int = 0 flagged_count: int = 0 recommendations: list[str] = field(default_factory=list) class ClaimVerifier: """Verifies claims from head outputs against evidence and known facts. Args: semantic_graph: Optional semantic graph for fact grounding. min_evidence_for_high_conf: Minimum evidence items expected for high-confidence claims (>=0.8). """ def __init__( self, semantic_graph: SemanticGraphLike | None = None, min_evidence_for_high_conf: int = 2, ) -> None: self._graph = semantic_graph self._min_evidence_high = min_evidence_for_high_conf def verify_outputs(self, outputs: list[HeadOutput]) -> VerificationReport: """Verify all claims across all head outputs. Args: outputs: Head outputs to verify. Returns: Comprehensive verification report. """ all_claims: list[tuple[HeadClaim, str]] = [] for out in outputs: for claim in out.claims: all_claims.append((claim, out.head_id.value)) results: list[VerificationResult] = [] for claim, head_id in all_claims: result = self._verify_claim(claim, head_id, all_claims) results.append(result) verified = sum(1 for r in results if r.verified) flagged = sum(1 for r in results if r.issues) overall = ( sum(r.overall_score for r in results) / max(len(results), 1) ) recommendations: list[str] = [] if flagged > len(results) * 0.3: recommendations.append( f"{flagged}/{len(results)} claims flagged — consider second-pass verification" ) uncalibrated = [r for r in results if not r.confidence_calibrated] if uncalibrated: recommendations.append( f"{len(uncalibrated)} claims with miscalibrated confidence" ) low_evidence = [r for r in results if r.evidence_score < 0.3] if low_evidence: recommendations.append( f"{len(low_evidence)} claims lack evidence support" ) report = VerificationReport( results=results, overall_integrity=overall, total_claims=len(results), verified_count=verified, flagged_count=flagged, recommendations=recommendations, ) logger.info( "ClaimVerifier: verification complete", extra={ "total": report.total_claims, "verified": report.verified_count, "flagged": report.flagged_count, "integrity": report.overall_integrity, }, ) return report def _verify_claim( self, claim: HeadClaim, head_id: str, all_claims: list[tuple[HeadClaim, str]], ) -> VerificationResult: """Verify a single claim.""" issues: list[str] = [] evidence_score = self._check_evidence(claim, issues) calibrated = self._check_calibration(claim, evidence_score, issues) consistency_score = self._check_consistency(claim, head_id, all_claims, issues) grounding_score = self._check_grounding(claim, issues) overall = ( 0.35 * evidence_score + 0.25 * consistency_score + 0.25 * grounding_score + 0.15 * (1.0 if calibrated else 0.5) ) return VerificationResult( claim_text=claim.claim_text, verified=len(issues) == 0, confidence_calibrated=calibrated, evidence_score=evidence_score, consistency_score=consistency_score, grounding_score=grounding_score, issues=issues, overall_score=overall, ) def _check_evidence(self, claim: HeadClaim, issues: list[str]) -> float: """Check how well a claim is supported by evidence.""" if not claim.evidence: issues.append("No evidence cited") return 0.1 score = min(1.0, len(claim.evidence) / 3.0) if claim.confidence >= 0.8 and len(claim.evidence) < self._min_evidence_high: issues.append( f"High confidence ({claim.confidence:.2f}) with only " f"{len(claim.evidence)} evidence item(s)" ) score *= 0.7 return score def _check_calibration( self, claim: HeadClaim, evidence_score: float, issues: list[str], ) -> bool: """Check if confidence is well-calibrated relative to evidence.""" if claim.confidence >= 0.9 and evidence_score < 0.3: issues.append( f"Confidence {claim.confidence:.2f} not supported by evidence " f"(evidence score: {evidence_score:.2f})" ) return False if claim.confidence >= 0.8 and evidence_score < 0.2: issues.append("Very high confidence with minimal evidence") return False return True def _check_consistency( self, claim: HeadClaim, head_id: str, all_claims: list[tuple[HeadClaim, str]], issues: list[str], ) -> float: """Check if this claim is consistent with other claims.""" claim_words = set(claim.claim_text.lower().split()) neg_words = {"not", "no", "never", "none", "cannot", "shouldn't", "won't"} claim_has_neg = bool(claim_words & neg_words) contradictions = 0 comparisons = 0 for other_claim, other_head in all_claims: if other_claim is claim: continue other_words = set(other_claim.claim_text.lower().split()) overlap = len(claim_words & other_words) / max(len(claim_words), 1) if overlap < 0.2: continue comparisons += 1 other_has_neg = bool(other_words & neg_words) if claim_has_neg != other_has_neg and overlap > 0.3: contradictions += 1 issues.append( f"Potential contradiction with claim from '{other_head}': " f"'{other_claim.claim_text[:60]}...'" ) if comparisons == 0: return 0.7 return max(0.0, 1.0 - contradictions / max(comparisons, 1)) def _check_grounding(self, claim: HeadClaim, issues: list[str]) -> float: """Check if the claim can be grounded in the semantic graph.""" if self._graph is None: return 0.5 try: claim_keywords = claim.claim_text[:80] units = self._graph.query_units(content_contains=claim_keywords, limit=5) if not units: return 0.3 return min(1.0, 0.3 + len(units) * 0.15) except Exception: logger.debug("ClaimVerifier: grounding check failed (non-fatal)") return 0.5