"""Docs connector: read documents (text, markdown, PDF via extraction).""" from pathlib import Path from typing import Any from fusionagi._logger import logger from fusionagi.tools.connectors.base import BaseConnector class DocsConnector(BaseConnector): """Read and search text-based documents. Supports plain text, markdown, and basic PDF text extraction (when ``pdfplumber`` is installed). """ name = "docs" def __init__(self, base_path: str = ".") -> None: self._base = Path(base_path) def invoke(self, action: str, params: dict[str, Any]) -> Any: if action == "read": return self._read(params.get("path", "")) if action == "search": return self._search(params.get("query", ""), params.get("path", ".")) if action == "list": return self._list(params.get("path", "."), params.get("pattern", "*")) return {"error": f"Unknown action: {action}"} def _read(self, path: str) -> dict[str, Any]: target = self._base / path if not target.exists(): return {"content": "", "path": path, "error": f"File not found: {path}"} if target.suffix.lower() == ".pdf": return self._read_pdf(target, path) try: content = target.read_text(encoding="utf-8", errors="replace") return {"content": content, "path": path, "error": None, "size": len(content)} except Exception as e: logger.warning("DocsConnector read failed", extra={"path": path, "error": str(e)}) return {"content": "", "path": path, "error": str(e)} def _read_pdf(self, target: Path, path: str) -> dict[str, Any]: try: import pdfplumber with pdfplumber.open(target) as pdf: pages = [p.extract_text() or "" for p in pdf.pages] content = "\n\n".join(pages) return {"content": content, "path": path, "error": None, "pages": len(pages)} except ImportError: text = target.read_bytes()[:2000].decode("utf-8", errors="replace") return {"content": text, "path": path, "error": "pdfplumber not installed; showing raw bytes"} except Exception as e: return {"content": "", "path": path, "error": f"PDF read failed: {e}"} def _search(self, query: str, path: str) -> dict[str, Any]: results = [] target = self._base / path if not target.exists(): return {"results": [], "query": query, "error": f"Path not found: {path}"} pattern = "**/*" if target.is_dir() else str(target.name) search_dir = target if target.is_dir() else target.parent for fp in search_dir.glob(pattern): if fp.is_file() and fp.suffix in (".txt", ".md", ".rst", ".py", ".json"): try: text = fp.read_text(encoding="utf-8", errors="replace") if query.lower() in text.lower(): idx = text.lower().index(query.lower()) snippet = text[max(0, idx - 50) : idx + len(query) + 50] results.append({"file": str(fp.relative_to(self._base)), "snippet": snippet}) except Exception: continue if len(results) >= 20: break return {"results": results, "query": query, "error": None} def _list(self, path: str, pattern: str) -> dict[str, Any]: target = self._base / path if not target.is_dir(): return {"files": [], "error": f"Not a directory: {path}"} files = [str(f.relative_to(self._base)) for f in target.glob(pattern) if f.is_file()] return {"files": sorted(files)[:100], "error": None} def schema(self) -> dict[str, Any]: return { "name": self.name, "actions": ["read", "search", "list"], "parameters": {"path": "string", "query": "string", "pattern": "string"}, }