"""Tests for the cross-head InsightBus.""" from fusionagi.reasoning.insight_bus import Insight, InsightBus def test_publish_and_retrieve() -> None: bus = InsightBus() bus.publish("logic", Insight(source="logic", message="Contradiction found", domain="reasoning")) bus.publish("research", Insight(source="research", message="Source quality low", domain="evidence")) insights = bus.get_insights(limit=10) assert len(insights) == 2 assert insights[0].source == "research" # Most recent first def test_subscribe_filter() -> None: bus = InsightBus() bus.subscribe("safety", domains=["reasoning"]) bus.publish("logic", Insight(source="logic", message="Contradiction", domain="reasoning")) bus.publish("research", Insight(source="research", message="Bad source", domain="evidence")) filtered = bus.get_insights(subscriber="safety") assert len(filtered) == 1 assert filtered[0].domain == "reasoning" def test_domain_filter() -> None: bus = InsightBus() bus.publish("a", Insight(source="a", message="msg1", domain="x")) bus.publish("b", Insight(source="b", message="msg2", domain="y")) results = bus.get_insights(domain="x") assert len(results) == 1 assert results[0].source == "a" def test_max_capacity() -> None: bus = InsightBus(max_insights=5) for i in range(10): bus.publish("src", Insight(source="src", message=f"msg{i}")) assert len(bus.get_insights(limit=100)) == 5 def test_summary() -> None: bus = InsightBus() bus.publish("logic", Insight(source="logic", message="m1", domain="d1")) bus.publish("logic", Insight(source="logic", message="m2", domain="d2")) bus.subscribe("safety", domains=["d1"]) summary = bus.get_summary() assert summary["total_insights"] == 2 assert "logic" in summary["by_source"] assert "safety" in summary["subscribers"]