"""Guardrails: pre/post checks for tool calls (block paths, sanitize inputs).

Supports ADVISORY mode where violations are logged but not blocked,
allowing the system to learn from outcomes.
"""

import re
from typing import Any

from pydantic import BaseModel, Field

from fusionagi._logger import logger
from fusionagi.schemas.audit import GovernanceMode


class PreCheckResult(BaseModel):
    """Result of a guardrails pre-check."""

    allowed: bool = Field(..., description="Whether the call is allowed")
    sanitized_args: dict[str, Any] | None = Field(default=None, description="Args to use if allowed and sanitized")
    error_message: str | None = Field(default=None, description="Reason for denial if not allowed")
    advisory: bool = Field(default=False, description="True if allowed only because of advisory mode")


class Guardrails:
    """Pre/post checks for tool invocations.

    In ADVISORY mode, violations are logged (at info level) and reported on
    the result, but the call is still allowed to proceed. In any other mode
    the first violation blocks the call.
    """

    def __init__(self, mode: GovernanceMode = GovernanceMode.ADVISORY) -> None:
        """Create an empty rule set operating in ``mode``."""
        self._blocked_paths: list[str] = []
        self._blocked_patterns: list[re.Pattern[str]] = []
        # Callables invoked as check(tool_name, args) -> (allowed, result).
        self._custom_checks: list[Any] = []
        self._mode = mode

    def block_path_prefix(self, prefix: str) -> None:
        """Flag (advisory) or block (enforcing) any file path starting with this prefix.

        Trailing slashes are stripped before the prefix is stored. Note that
        ``"/"`` is therefore stored as ``""``, which every string path
        starts with.
        """
        self._blocked_paths.append(prefix.rstrip("/"))

    def block_path_pattern(self, pattern: str) -> None:
        """Flag (advisory) or block (enforcing) paths matching this regex.

        The pattern is compiled immediately (so an invalid pattern raises
        ``re.error`` here) and later applied with ``search``, i.e. it may
        match anywhere in the path.
        """
        self._blocked_patterns.append(re.compile(pattern))

    def add_check(self, check: Any) -> None:
        """Add a custom pre-check.

        ``check`` is called as ``check(tool_name, args)`` and must return a
        2-tuple ``(allowed, result)``. When ``allowed`` is falsy, a ``str``
        result is used as the violation reason. When ``allowed`` is truthy,
        a ``dict`` result replaces the args passed to subsequent checks.
        """
        self._custom_checks.append(check)

    def _path_violation(self, path: str) -> tuple[str, str] | None:
        """Return ``(kind, reason)`` for the first rule ``path`` violates, else None.

        Prefixes are tested before patterns, each in registration order.
        """
        # NOTE(review): the path is compared exactly as given; no
        # normalization is done here, so ".." segments are not resolved.
        for prefix in self._blocked_paths:
            # startswith(prefix) already covers startswith(prefix + "/").
            if path.startswith(prefix):
                return "path prefix", "Blocked path prefix: " + prefix
        for pat in self._blocked_patterns:
            if pat.search(path):
                return "path pattern", "Blocked path pattern"
        return None

    def _report(self, tool_name: str, kind: str, reason: str) -> bool:
        """Log a violation and return True when the call must be blocked."""
        if self._mode == GovernanceMode.ADVISORY:
            logger.info(
                f"Guardrails advisory: {kind} flagged (proceeding)",
                extra={"tool_name": tool_name, "reason": reason, "mode": "advisory"},
            )
            return False
        logger.info("Guardrails pre_check blocked", extra={"tool_name": tool_name, "reason": reason})
        return True

    def pre_check(self, tool_name: str, args: dict[str, Any]) -> PreCheckResult:
        """Run all pre-checks. In advisory mode, log but allow.

        The ``path`` and ``file_path`` string arguments are tested against
        the blocked prefixes and patterns, then each custom check runs in
        registration order on a shallow copy of ``args``.

        In enforcing mode the first violation returns a result with
        ``allowed=False`` and the reason in ``error_message``. In advisory
        mode every check still runs (so later sanitizing checks are
        applied); if anything was flagged the result has ``allowed=True``,
        ``advisory=True`` and ``error_message`` holding the flagged reasons
        joined by ``"; "``.
        """
        args = dict(args)
        flagged: list[str] = []

        for key in ("path", "file_path"):
            value = args.get(key)
            if not isinstance(value, str):
                continue
            violation = self._path_violation(value)
            if violation is None:
                continue
            kind, reason = violation
            if self._report(tool_name, kind, reason):
                return PreCheckResult(allowed=False, error_message=reason)
            flagged.append(reason)

        for check in self._custom_checks:
            allowed, result = check(tool_name, args)
            if not allowed:
                reason = result if isinstance(result, str) else "Check failed"
                if self._report(tool_name, "custom check", reason):
                    return PreCheckResult(allowed=False, error_message=reason)
                flagged.append(reason)
                continue
            if isinstance(result, dict):
                # A passing check may hand back sanitized args for the rest.
                args = result

        if flagged:
            return PreCheckResult(
                allowed=True,
                sanitized_args=args,
                error_message="; ".join(flagged),
                advisory=True,
            )
        return PreCheckResult(allowed=True, sanitized_args=args)

    def post_check(self, tool_name: str, result: Any) -> tuple[bool, str]:
        """Optional post-check; return (True, "") or (False, error_message).

        The current implementation performs no inspection and always
        returns ``(True, "")``.
        """
        return True, ""