"""Audit log export endpoint. Exports governance audit trail as CSV or JSON for compliance and review. """ from __future__ import annotations import csv import io import json import time from typing import Any from fastapi import APIRouter, Query from fastapi.responses import StreamingResponse from fusionagi._logger import logger from fusionagi.api.audit_store import get_audit_events from fusionagi.api.dependencies import get_telemetry_tracer router = APIRouter() def _get_audit_records( task_id: str | None = None, limit: int = 1000, since: float | None = None, ) -> list[dict[str, Any]]: """Collect audit records from persistent store, falling back to telemetry tracer.""" # Try persistent audit store first try: records = get_audit_events(limit=limit, since=since) if records: return records except Exception: pass # Fallback to telemetry tracer tracer = get_telemetry_tracer() if not tracer: return [] traces = tracer.get_traces(task_id=task_id, limit=limit) if since: traces = [t for t in traces if t.get("timestamp", 0) >= since] return traces @router.get("/audit/export/json") def export_audit_json( task_id: str | None = None, limit: int = Query(default=1000, le=10000), since: float | None = None, ) -> dict[str, Any]: """Export audit log as JSON. Args: task_id: Filter by task ID. limit: Maximum records (default 1000, max 10000). since: Unix timestamp filter (records after this time). Returns: Dict with records array and metadata. """ records = _get_audit_records(task_id=task_id, limit=limit, since=since) logger.info("Audit log exported (JSON)", extra={"count": len(records)}) return { "format": "json", "count": len(records), "exported_at": time.time(), "records": records, } @router.get("/audit/export/csv") def export_audit_csv( task_id: str | None = None, limit: int = Query(default=1000, le=10000), since: float | None = None, ) -> StreamingResponse: """Export audit log as CSV download. Args: task_id: Filter by task ID. limit: Maximum records (default 1000, max 10000). since: Unix timestamp filter (records after this time). Returns: CSV file as streaming download. """ records = _get_audit_records(task_id=task_id, limit=limit, since=since) # Collect all unique keys across records all_keys: set[str] = set() for r in records: all_keys.update(r.keys()) fieldnames = sorted(all_keys) output = io.StringIO() writer = csv.DictWriter(output, fieldnames=fieldnames, extrasaction="ignore") writer.writeheader() for r in records: # Flatten nested dicts to JSON strings flat = {} for k, v in r.items(): flat[k] = json.dumps(v) if isinstance(v, (dict, list)) else v writer.writerow(flat) output.seek(0) logger.info("Audit log exported (CSV)", extra={"count": len(records)}) return StreamingResponse( iter([output.getvalue()]), media_type="text/csv", headers={ "Content-Disposition": f"attachment; filename=fusionagi_audit_{int(time.time())}.csv", }, )