"""Phase 2/3: end-to-end flow with stub adapter, tools, executor, critic, reflection, governance.""" from fusionagi.adapters import StubAdapter from fusionagi.agents import CriticAgent, ExecutorAgent, PlannerAgent from fusionagi.core import StateManager from fusionagi.governance import AccessControl, Guardrails, OverrideHooks, PolicyEngine, RateLimiter from fusionagi.memory import ReflectiveMemory from fusionagi.reflection import run_reflection from fusionagi.schemas import AgentMessage, AgentMessageEnvelope from fusionagi.schemas.policy import PolicyEffect, PolicyRule from fusionagi.tools import ToolDef, ToolRegistry def test_planner_with_stub_adapter() -> None: adapter = StubAdapter('{"steps":[{"id":"s1","description":"Step 1","dependencies":[]}],"fallback_paths":[]}') planner = PlannerAgent(adapter=adapter) env = AgentMessageEnvelope( message=AgentMessage(sender="o", recipient="planner", intent="plan_request", payload={"goal": "Test"}), task_id="t1", ) out = planner.handle_message(env) assert out is not None assert out.message.intent == "plan_ready" steps = out.message.payload["plan"]["steps"] assert len(steps) == 1 assert steps[0]["id"] == "s1" def test_executor_runs_tool_and_appends_trace() -> None: state = StateManager() reg = ToolRegistry() reg.register(ToolDef(name="noop", description="No-op", fn=lambda: "ok", permission_scope=["*"])) executor = ExecutorAgent(registry=reg, state_manager=state) env = AgentMessageEnvelope( message=AgentMessage( sender="o", recipient="executor", intent="execute_step", payload={ "step_id": "s1", "plan": {"steps": [{"id": "s1", "description": "No-op", "dependencies": [], "tool_name": "noop", "tool_args": {}}], "fallback_paths": []}, "tool_name": "noop", "tool_args": {}, }, ), task_id="task-1", ) out = executor.handle_message(env) assert out is not None assert out.message.intent == "step_done" trace = state.get_trace("task-1") assert len(trace) == 1 assert trace[0].get("tool") == "noop" assert trace[0].get("result") == "ok" def test_critic_returns_evaluation() -> None: critic = CriticAgent(adapter=None) env = AgentMessageEnvelope( message=AgentMessage( sender="o", recipient="critic", intent="evaluate_request", payload={"outcome": "completed", "trace": [], "plan": None}, ), task_id="t1", ) out = critic.handle_message(env) assert out is not None assert out.message.intent == "evaluation_ready" ev = out.message.payload["evaluation"] assert "score" in ev assert ev["success"] is True def test_reflection_writes_to_reflective_memory() -> None: critic = CriticAgent(adapter=None) reflective = ReflectiveMemory() ev = run_reflection(critic, "t1", "completed", [], None, reflective) assert ev is not None lessons = reflective.get_lessons(limit=5) assert len(lessons) == 1 assert lessons[0]["task_id"] == "t1" def test_guardrails_block_path() -> None: from fusionagi.schemas.audit import GovernanceMode # Advisory mode (default): blocked paths are flagged but allowed g = Guardrails() g.block_path_prefix("/etc") result = g.pre_check("file_read", {"path": "/etc/passwd"}) assert result.allowed is True assert result.advisory is True assert result.error_message result = g.pre_check("file_read", {"path": "/tmp/foo"}) assert result.allowed is True assert result.advisory is False # Enforcing mode: blocked paths are denied g_enforcing = Guardrails(mode=GovernanceMode.ENFORCING) g_enforcing.block_path_prefix("/etc") result = g_enforcing.pre_check("file_read", {"path": "/etc/passwd"}) assert result.allowed is False assert result.error_message def test_rate_limiter() -> None: from fusionagi.schemas.audit import GovernanceMode # Advisory mode (default): exceeded limits are logged but allowed r = RateLimiter(max_calls=2, window_seconds=10.0) assert r.allow("agent1")[0] is True assert r.allow("agent1")[0] is True ok, reason = r.allow("agent1") assert ok is True # Advisory mode allows assert "Advisory" in reason # Enforcing mode: exceeded limits are rejected r_enforcing = RateLimiter(max_calls=2, window_seconds=10.0, mode=GovernanceMode.ENFORCING) assert r_enforcing.allow("agent1")[0] is True assert r_enforcing.allow("agent1")[0] is True assert r_enforcing.allow("agent1")[0] is False def test_override_hooks() -> None: h = OverrideHooks() seen = [] h.register(lambda e, p: (seen.append((e, p)), True)[1]) assert h.fire("task_paused_for_approval", {"task_id": "t1"}) is True assert len(seen) == 1 assert seen[0][0] == "task_paused_for_approval" def test_access_control_deny() -> None: from fusionagi.schemas.audit import GovernanceMode # Advisory mode (default): denied access is logged but allowed ac = AccessControl() ac.deny("executor", "noop") assert ac.allowed("executor", "noop") is True # Advisory allows assert ac.allowed("executor", "other_tool") is True assert ac.allowed("planner", "noop") is True # Enforcing mode: denied access is blocked ac_enforcing = AccessControl(mode=GovernanceMode.ENFORCING) ac_enforcing.deny("executor", "noop") assert ac_enforcing.allowed("executor", "noop") is False assert ac_enforcing.allowed("executor", "other_tool") is True assert ac_enforcing.allowed("planner", "noop") is True def test_policy_engine_update_rule() -> None: pe = PolicyEngine() r = PolicyRule(rule_id="r1", effect=PolicyEffect.DENY, condition={"tool_name": "noop"}, reason="blocked", priority=1) pe.add_rule(r) assert pe.get_rule("r1") is not None assert pe.get_rule("r1").reason == "blocked" assert pe.update_rule("r1", {"reason": "updated"}) is True assert pe.get_rule("r1").reason == "updated" assert pe.update_rule("r1", {"priority": 5}) is True assert pe.get_rule("r1").priority == 5 assert pe.remove_rule("r1") is True assert pe.get_rule("r1") is None assert pe.remove_rule("r1") is False if __name__ == "__main__": test_planner_with_stub_adapter() test_executor_runs_tool_and_appends_trace() test_critic_returns_evaluation() test_reflection_writes_to_reflective_memory() test_guardrails_block_path() test_rate_limiter() test_override_hooks() test_access_control_deny() test_policy_engine_update_rule() print("Phase 2/3 tests OK")