Files
FusionAGI/fusionagi/memory/vector_pgvector.py
defiQUG c052b07662
Some checks failed
Tests / test (3.10) (push) Has been cancelled
Tests / test (3.11) (push) Has been cancelled
Tests / test (3.12) (push) Has been cancelled
Tests / lint (push) Has been cancelled
Tests / docker (push) Has been cancelled
Initial commit: add .gitignore and README
2026-02-09 21:51:42 -08:00

102 lines
3.3 KiB
Python

"""pgvector-backed VectorMemory adapter. Requires: pip install fusionagi[vector]."""
from typing import Any
from fusionagi._logger import logger
def create_vector_memory_pgvector(
    connection_string: str,
    table_name: str = "embeddings",
    dimension: int = 1536,
) -> "VectorMemoryPgvector | None":
    """
    Create pgvector-backed VectorMemory when pgvector is installed.

    Args:
        connection_string: Connection string handed to ``psycopg.connect``.
        table_name: Table that stores the embeddings.
        dimension: Dimension of the ``vector`` column.

    Returns:
        A ``VectorMemoryPgvector`` instance, or None if psycopg/pgvector is
        not installed or the database cannot be set up.
    """
    # psycopg is checked first: pgvector.psycopg imports psycopg itself, so a
    # missing psycopg would otherwise be misreported as a missing pgvector.
    try:
        import psycopg
    except ImportError:
        logger.debug("psycopg not installed; use pip install fusionagi[memory]")
        return None
    try:
        # Imported only to probe availability; the class re-imports what it uses.
        import pgvector  # noqa: F401
        from pgvector.psycopg import register_vector  # noqa: F401
    except ImportError:
        logger.debug("pgvector not installed; use pip install fusionagi[vector]")
        return None
    try:
        return VectorMemoryPgvector(connection_string, table_name, dimension)
    except psycopg.Error as exc:
        # Honour the documented contract: an unavailable database yields None
        # instead of propagating the driver error to the caller.
        logger.debug(f"pgvector database unavailable: {exc}")
        return None
class VectorMemoryPgvector:
"""VectorMemory implementation using pgvector."""
def __init__(
self,
connection_string: str,
table_name: str = "embeddings",
dimension: int = 1536,
) -> None:
import pgvector
from pgvector.psycopg import register_vector
self._conn_str = connection_string
self._table = table_name
self._dim = dimension
with psycopg.connect(connection_string) as conn:
register_vector(conn)
with conn.cursor() as cur:
cur.execute("CREATE EXTENSION IF NOT EXISTS vector")
cur.execute(
f"""
CREATE TABLE IF NOT EXISTS {table_name} (
id TEXT PRIMARY KEY,
embedding vector({dimension}),
metadata JSONB,
created_at TIMESTAMPTZ DEFAULT NOW()
)
"""
)
conn.commit()
def add(self, id: str, embedding: list[float], metadata: dict[str, Any] | None = None) -> None:
import json
import psycopg
from pgvector.psycopg import register_vector
with psycopg.connect(self._conn_str) as conn:
register_vector(conn)
with conn.cursor() as cur:
cur.execute(
f"""
INSERT INTO {self._table} (id, embedding, metadata)
VALUES (%s, %s, %s)
ON CONFLICT (id) DO UPDATE SET embedding = EXCLUDED.embedding, metadata = EXCLUDED.metadata
""",
(id, embedding, json.dumps(metadata or {})),
)
conn.commit()
def search(self, query_embedding: list[float], top_k: int = 10) -> list[dict[str, Any]]:
import json
import psycopg
from pgvector.psycopg import register_vector
with psycopg.connect(self._conn_str) as conn:
register_vector(conn)
with conn.cursor() as cur:
cur.execute(
f"""
SELECT id, metadata
FROM {self._table}
ORDER BY embedding <-> %s
LIMIT %s
""",
(query_embedding, top_k),
)
rows = cur.fetchall()
return [{"id": r[0], "metadata": json.loads(r[1]) if r[1] else {}} for r in rows]