102 lines
3.3 KiB
Python
102 lines
3.3 KiB
Python
"""pgvector-backed VectorMemory adapter. Requires: pip install fusionagi[vector]."""
|
|
|
|
from typing import Any
|
|
|
|
from fusionagi._logger import logger
|
|
|
|
|
|
def create_vector_memory_pgvector(
    connection_string: str,
    table_name: str = "embeddings",
    dimension: int = 1536,
) -> "VectorMemoryPgvector | None":
    """
    Create pgvector-backed VectorMemory when pgvector is installed.

    Returns None if psycopg/pgvector are not installed, or if the database
    cannot be reached or initialized (any ``psycopg.Error`` raised while
    constructing the store is logged and swallowed).

    Args:
        connection_string: Connection string passed to ``psycopg.connect``.
        table_name: Table holding the embeddings; created if missing.
        dimension: Declared length of the ``vector`` column.

    Returns:
        A ready ``VectorMemoryPgvector``, or ``None`` when unavailable.
    """
    # Check psycopg first: importing ``pgvector.psycopg`` pulls in psycopg, so a
    # missing psycopg would otherwise be misreported as a missing pgvector.
    try:
        import psycopg
    except ImportError:
        logger.debug("psycopg not installed; use pip install fusionagi[memory]")
        return None

    try:
        from pgvector.psycopg import register_vector  # noqa: F401  (availability check only)
    except ImportError:
        logger.debug("pgvector not installed; use pip install fusionagi[vector]")
        return None

    try:
        return VectorMemoryPgvector(connection_string, table_name, dimension)
    except psycopg.Error as exc:
        # Honour the "None if database unavailable" contract instead of
        # letting connection/DDL failures escape to the caller.
        logger.warning(f"pgvector VectorMemory unavailable: {exc}")
        return None
|
|
|
|
|
|
class VectorMemoryPgvector:
|
|
"""VectorMemory implementation using pgvector."""
|
|
|
|
def __init__(
|
|
self,
|
|
connection_string: str,
|
|
table_name: str = "embeddings",
|
|
dimension: int = 1536,
|
|
) -> None:
|
|
import pgvector
|
|
from pgvector.psycopg import register_vector
|
|
|
|
self._conn_str = connection_string
|
|
self._table = table_name
|
|
self._dim = dimension
|
|
|
|
with psycopg.connect(connection_string) as conn:
|
|
register_vector(conn)
|
|
with conn.cursor() as cur:
|
|
cur.execute("CREATE EXTENSION IF NOT EXISTS vector")
|
|
cur.execute(
|
|
f"""
|
|
CREATE TABLE IF NOT EXISTS {table_name} (
|
|
id TEXT PRIMARY KEY,
|
|
embedding vector({dimension}),
|
|
metadata JSONB,
|
|
created_at TIMESTAMPTZ DEFAULT NOW()
|
|
)
|
|
"""
|
|
)
|
|
conn.commit()
|
|
|
|
def add(self, id: str, embedding: list[float], metadata: dict[str, Any] | None = None) -> None:
|
|
import json
|
|
import psycopg
|
|
from pgvector.psycopg import register_vector
|
|
|
|
with psycopg.connect(self._conn_str) as conn:
|
|
register_vector(conn)
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
f"""
|
|
INSERT INTO {self._table} (id, embedding, metadata)
|
|
VALUES (%s, %s, %s)
|
|
ON CONFLICT (id) DO UPDATE SET embedding = EXCLUDED.embedding, metadata = EXCLUDED.metadata
|
|
""",
|
|
(id, embedding, json.dumps(metadata or {})),
|
|
)
|
|
conn.commit()
|
|
|
|
def search(self, query_embedding: list[float], top_k: int = 10) -> list[dict[str, Any]]:
|
|
import json
|
|
import psycopg
|
|
from pgvector.psycopg import register_vector
|
|
|
|
with psycopg.connect(self._conn_str) as conn:
|
|
register_vector(conn)
|
|
with conn.cursor() as cur:
|
|
cur.execute(
|
|
f"""
|
|
SELECT id, metadata
|
|
FROM {self._table}
|
|
ORDER BY embedding <-> %s
|
|
LIMIT %s
|
|
""",
|
|
(query_embedding, top_k),
|
|
)
|
|
rows = cur.fetchall()
|
|
return [{"id": r[0], "metadata": json.loads(r[1]) if r[1] else {}} for r in rows]
|