"""Persistent audit event storage with SQLite backend.""" import json import logging import os import sqlite3 import threading import time from pathlib import Path from typing import Any logger = logging.getLogger(__name__) _DB_PATH = Path("data/audit.db") _local = threading.local() _lock = threading.Lock() _initialized_dbs: set[str] = set() def _get_conn() -> sqlite3.Connection: """Get or create a thread-local SQLite connection for audit storage.""" db_path_str = os.environ.get("FUSIONAGI_AUDIT_DB", str(_DB_PATH)) conn = getattr(_local, "conn", None) conn_path = getattr(_local, "conn_path", None) if conn is not None and conn_path == db_path_str: return conn db_path = Path(db_path_str) db_path.parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(str(db_path), check_same_thread=False) conn.execute("PRAGMA journal_mode=WAL") with _lock: if db_path_str not in _initialized_dbs: conn.execute(""" CREATE TABLE IF NOT EXISTS audit_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp REAL NOT NULL, action TEXT NOT NULL, actor TEXT DEFAULT '', resource_type TEXT DEFAULT '', resource_id TEXT DEFAULT '', details TEXT DEFAULT '{}', ip_address TEXT DEFAULT '', tenant_id TEXT DEFAULT '' ) """) conn.execute("CREATE INDEX IF NOT EXISTS idx_audit_ts ON audit_events(timestamp)") conn.execute("CREATE INDEX IF NOT EXISTS idx_audit_action ON audit_events(action)") conn.commit() _initialized_dbs.add(db_path_str) _local.conn = conn _local.conn_path = db_path_str return conn def record_audit_event( action: str, actor: str = "", resource_type: str = "", resource_id: str = "", details: dict[str, Any] | None = None, ip_address: str = "", tenant_id: str = "", ) -> int: """Record an audit event to the persistent store. Args: action: The action performed (e.g. 'session.create', 'prompt.submit'). actor: Who performed the action. resource_type: Type of resource affected. resource_id: ID of the resource affected. details: Additional JSON-serializable details. ip_address: Client IP address. tenant_id: Tenant identifier. Returns: The event ID. """ conn = _get_conn() cursor = conn.execute( """INSERT INTO audit_events (timestamp, action, actor, resource_type, resource_id, details, ip_address, tenant_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", (time.time(), action, actor, resource_type, resource_id, json.dumps(details or {}), ip_address, tenant_id), ) conn.commit() return cursor.lastrowid or 0 def get_audit_events( limit: int = 100, since: float | None = None, action: str | None = None, tenant_id: str | None = None, ) -> list[dict[str, Any]]: """Retrieve audit events with optional filters. Args: limit: Maximum number of events to return. since: Only return events after this Unix timestamp. action: Filter by action type. tenant_id: Filter by tenant. Returns: List of audit event dicts. """ conn = _get_conn() query = "SELECT id, timestamp, action, actor, resource_type, resource_id, details, ip_address, tenant_id FROM audit_events WHERE 1=1" params: list[Any] = [] if since is not None: query += " AND timestamp >= ?" params.append(since) if action: query += " AND action = ?" params.append(action) if tenant_id: query += " AND tenant_id = ?" params.append(tenant_id) query += " ORDER BY timestamp DESC LIMIT ?" params.append(min(limit, 10000)) rows = conn.execute(query, params).fetchall() return [ { "id": r[0], "timestamp": r[1], "action": r[2], "actor": r[3], "resource_type": r[4], "resource_id": r[5], "details": json.loads(r[6]) if r[6] else {}, "ip_address": r[7], "tenant_id": r[8], } for r in rows ] def get_audit_count() -> int: """Return total number of audit events.""" conn = _get_conn() row = conn.execute("SELECT COUNT(*) FROM audit_events").fetchone() return row[0] if row else 0