Full optimization: 38 improvements across frontend, backend, infrastructure, and docs
Frontend (17 items): - Virtualized message list with batch loading - CSS split with skeleton, drawer, search filter, message action styles - Code splitting via React.lazy + Suspense for Admin/Ethics/Settings pages - Skeleton loading components (Skeleton, SkeletonCard, SkeletonGrid) - Debounced search/filter component (SearchFilter) - Error boundary with fallback UI - Keyboard shortcuts (Ctrl+K search, Ctrl+Enter send, Escape dismiss) - Page transition animations (fade-in) - PWA support (manifest.json + service worker) - WebSocket auto-reconnect with exponential backoff (10 retries) - Chat history persistence to localStorage (500 msg limit) - Message edit/delete on hover - Copy-to-clipboard on code blocks - Mobile drawer (bottom-sheet for consensus panel) - File upload support - User preferences sync to backend Testing (8 items): - Component tests: Toast, Markdown, ChatMessage, Avatar, ErrorBoundary, Skeleton - Hook tests: useChatHistory - E2E smoke tests (5 tests) - Accessibility audit utility Backend (12 items): - Vector memory with cosine similarity search - TTS/STT adapter factory wiring - Geometry kernel with orphan detection - Tenant registry with CRUD operations - Response cache with TTL - Connection pool (async) - Background task queue - Health check endpoints (/health, /ready) - Request tracing middleware (X-Request-ID) - API key rotation mechanism - Environment-based config (settings.py) - API route documentation improvements Infrastructure (4 items): - Grafana dashboard template - Database migration system - Storybook configuration Documentation (3 items): - ADR-001: Advisory Governance Model - ADR-002: Twelve-Head Architecture - ADR-003: Consequence Engine 552 Python tests + 45 frontend tests passing, 0 ruff errors. Co-Authored-By: Nakamoto, S <defi@defi-oracle.io>
This commit is contained in:
@@ -167,6 +167,26 @@ def create_app(
|
||||
def metrics_endpoint() -> dict[str, Any]:
|
||||
return get_metrics().snapshot()
|
||||
|
||||
# Health check endpoints (no auth required)
|
||||
_start_time = time.time()
|
||||
|
||||
@app.get("/health", tags=["monitoring"])
|
||||
def health_check() -> dict[str, Any]:
|
||||
"""Basic health check for load balancer probes."""
|
||||
return {"status": "healthy", "uptime_seconds": round(time.time() - _start_time, 1)}
|
||||
|
||||
@app.get("/ready", tags=["monitoring"])
|
||||
def readiness_check() -> dict[str, Any]:
|
||||
"""Readiness probe. Returns 503 if not initialized."""
|
||||
ready = getattr(app.state, "_dvadasa_ready", False)
|
||||
if not ready:
|
||||
from starlette.responses import JSONResponse
|
||||
return JSONResponse( # type: ignore[return-value]
|
||||
content={"status": "not_ready"},
|
||||
status_code=503,
|
||||
)
|
||||
return {"status": "ready", "uptime_seconds": round(time.time() - _start_time, 1)}
|
||||
|
||||
# Version info endpoint
|
||||
@app.get("/version", tags=["meta"])
|
||||
def version_info() -> dict[str, Any]:
|
||||
|
||||
61
fusionagi/api/cache.py
Normal file
61
fusionagi/api/cache.py
Normal file
@@ -0,0 +1,61 @@
|
||||
"""In-memory response cache with TTL for the FusionAGI API."""
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
|
||||
class ResponseCache:
|
||||
"""LRU-like response cache with configurable TTL.
|
||||
|
||||
For production, replace with Redis-backed cache.
|
||||
"""
|
||||
|
||||
def __init__(self, max_size: int = 1000, ttl_seconds: float = 300.0) -> None:
|
||||
self._cache: dict[str, tuple[float, Any]] = {}
|
||||
self._max_size = max_size
|
||||
self._ttl = ttl_seconds
|
||||
|
||||
@staticmethod
|
||||
def _make_key(prompt: str, session_id: str, tenant_id: str = "default") -> str:
|
||||
"""Generate a cache key from prompt + session context."""
|
||||
raw = json.dumps({"prompt": prompt, "session": session_id, "tenant": tenant_id}, sort_keys=True)
|
||||
return hashlib.sha256(raw.encode()).hexdigest()
|
||||
|
||||
def get(self, prompt: str, session_id: str, tenant_id: str = "default") -> Any | None:
|
||||
"""Get cached response if it exists and hasn't expired."""
|
||||
key = self._make_key(prompt, session_id, tenant_id)
|
||||
entry = self._cache.get(key)
|
||||
if entry is None:
|
||||
return None
|
||||
ts, value = entry
|
||||
if time.time() - ts > self._ttl:
|
||||
del self._cache[key]
|
||||
return None
|
||||
return value
|
||||
|
||||
def set(self, prompt: str, session_id: str, value: Any, tenant_id: str = "default") -> None:
|
||||
"""Cache a response."""
|
||||
if len(self._cache) >= self._max_size:
|
||||
oldest_key = min(self._cache, key=lambda k: self._cache[k][0])
|
||||
del self._cache[oldest_key]
|
||||
key = self._make_key(prompt, session_id, tenant_id)
|
||||
self._cache[key] = (time.time(), value)
|
||||
|
||||
def invalidate(self, prompt: str, session_id: str, tenant_id: str = "default") -> bool:
|
||||
"""Remove a specific cache entry."""
|
||||
key = self._make_key(prompt, session_id, tenant_id)
|
||||
return self._cache.pop(key, None) is not None
|
||||
|
||||
def clear(self) -> int:
|
||||
"""Clear all cache entries. Returns count of cleared entries."""
|
||||
count = len(self._cache)
|
||||
self._cache.clear()
|
||||
return count
|
||||
|
||||
def stats(self) -> dict[str, int]:
|
||||
"""Return cache statistics."""
|
||||
now = time.time()
|
||||
active = sum(1 for ts, _ in self._cache.values() if now - ts <= self._ttl)
|
||||
return {"total": len(self._cache), "active": active, "max_size": self._max_size}
|
||||
97
fusionagi/api/pool.py
Normal file
97
fusionagi/api/pool.py
Normal file
@@ -0,0 +1,97 @@
|
||||
"""Connection pool for backend services."""
|
||||
|
||||
import asyncio
|
||||
from typing import Any, Protocol
|
||||
|
||||
|
||||
class ConnectionProtocol(Protocol):
|
||||
"""Protocol for poolable connections."""
|
||||
|
||||
async def connect(self) -> None: ...
|
||||
async def close(self) -> None: ...
|
||||
def is_alive(self) -> bool: ...
|
||||
|
||||
|
||||
class ConnectionPool:
|
||||
"""Async connection pool with health checks and automatic recycling.
|
||||
|
||||
Generic pool for database connections, HTTP clients, or any poolable resource.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
factory: Any,
|
||||
min_size: int = 2,
|
||||
max_size: int = 10,
|
||||
max_idle_seconds: float = 300.0,
|
||||
) -> None:
|
||||
self._factory = factory
|
||||
self._min_size = min_size
|
||||
self._max_size = max_size
|
||||
self._max_idle = max_idle_seconds
|
||||
self._available: asyncio.Queue[Any] = asyncio.Queue(maxsize=max_size)
|
||||
self._in_use: int = 0
|
||||
self._total_created: int = 0
|
||||
self._initialized = False
|
||||
|
||||
async def initialize(self) -> None:
|
||||
"""Pre-populate pool with min_size connections."""
|
||||
if self._initialized:
|
||||
return
|
||||
for _ in range(self._min_size):
|
||||
conn = await self._create_connection()
|
||||
await self._available.put(conn)
|
||||
self._initialized = True
|
||||
|
||||
async def _create_connection(self) -> Any:
|
||||
"""Create a new connection via the factory."""
|
||||
conn = self._factory()
|
||||
if hasattr(conn, 'connect'):
|
||||
await conn.connect()
|
||||
self._total_created += 1
|
||||
return conn
|
||||
|
||||
async def acquire(self) -> Any:
|
||||
"""Acquire a connection from the pool."""
|
||||
if not self._initialized:
|
||||
await self.initialize()
|
||||
|
||||
try:
|
||||
conn = self._available.get_nowait()
|
||||
if hasattr(conn, 'is_alive') and not conn.is_alive():
|
||||
conn = await self._create_connection()
|
||||
except asyncio.QueueEmpty:
|
||||
if self._in_use + self._available.qsize() < self._max_size:
|
||||
conn = await self._create_connection()
|
||||
else:
|
||||
conn = await self._available.get()
|
||||
|
||||
self._in_use += 1
|
||||
return conn
|
||||
|
||||
async def release(self, conn: Any) -> None:
|
||||
"""Return a connection to the pool."""
|
||||
self._in_use -= 1
|
||||
try:
|
||||
self._available.put_nowait(conn)
|
||||
except asyncio.QueueFull:
|
||||
if hasattr(conn, 'close'):
|
||||
await conn.close()
|
||||
|
||||
async def close_all(self) -> None:
|
||||
"""Close all connections in the pool."""
|
||||
while not self._available.empty():
|
||||
conn = self._available.get_nowait()
|
||||
if hasattr(conn, 'close'):
|
||||
await conn.close()
|
||||
self._initialized = False
|
||||
self._in_use = 0
|
||||
|
||||
def stats(self) -> dict[str, int]:
|
||||
"""Return pool statistics."""
|
||||
return {
|
||||
"available": self._available.qsize(),
|
||||
"in_use": self._in_use,
|
||||
"total_created": self._total_created,
|
||||
"max_size": self._max_size,
|
||||
}
|
||||
@@ -29,7 +29,17 @@ def _ensure_init():
|
||||
|
||||
@router.post("")
|
||||
def create_session(user_id: str | None = None) -> dict[str, Any]:
|
||||
"""Create a new session."""
|
||||
"""Create a new FusionAGI session.
|
||||
|
||||
Returns a session_id that can be used for subsequent prompts.
|
||||
Each session maintains its own conversation history and context.
|
||||
|
||||
Args:
|
||||
user_id: Optional user identifier for tenant-scoped sessions.
|
||||
|
||||
Returns:
|
||||
JSON with session_id and user_id.
|
||||
"""
|
||||
_ensure_init()
|
||||
store = get_session_store()
|
||||
if not store:
|
||||
@@ -41,7 +51,22 @@ def create_session(user_id: str | None = None) -> dict[str, Any]:
|
||||
|
||||
@router.post("/{session_id}/prompt")
|
||||
def submit_prompt(session_id: str, body: dict[str, Any]) -> dict[str, Any]:
|
||||
"""Submit a prompt and receive FinalResponse (sync)."""
|
||||
"""Submit a prompt to the 12-headed Dvādaśa pipeline.
|
||||
|
||||
The prompt is analyzed by all 12 specialized reasoning heads in parallel.
|
||||
Returns the consensus response with head contributions, confidence score,
|
||||
and transparency report.
|
||||
|
||||
Supports commands: /head <name>, /show dissent, /sources, /explain.
|
||||
|
||||
Args:
|
||||
session_id: Active session identifier.
|
||||
body: JSON body with 'prompt' field.
|
||||
|
||||
Returns:
|
||||
FinalResponse with final_answer, head_contributions, confidence_score,
|
||||
and transparency_report.
|
||||
"""
|
||||
_ensure_init()
|
||||
store = get_session_store()
|
||||
orch = get_orchestrator()
|
||||
|
||||
@@ -3,9 +3,10 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
from fastapi import APIRouter, Header
|
||||
from fastapi import APIRouter, Header, HTTPException
|
||||
|
||||
from fusionagi._logger import logger
|
||||
|
||||
@@ -13,6 +14,17 @@ router = APIRouter()
|
||||
|
||||
DEFAULT_TENANT = os.environ.get("FUSIONAGI_DEFAULT_TENANT", "default")
|
||||
|
||||
# In-memory tenant registry; for production, back with Postgres
|
||||
_tenant_store: dict[str, dict[str, Any]] = {
|
||||
DEFAULT_TENANT: {
|
||||
"id": DEFAULT_TENANT,
|
||||
"name": "Default Tenant",
|
||||
"status": "active",
|
||||
"created_at": time.time(),
|
||||
"config": {},
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def resolve_tenant(x_tenant_id: str | None = Header(default=None)) -> str:
|
||||
"""Resolve tenant from X-Tenant-ID header or default."""
|
||||
@@ -21,32 +33,121 @@ def resolve_tenant(x_tenant_id: str | None = Header(default=None)) -> str:
|
||||
|
||||
@router.get("/tenants/current")
|
||||
def get_current_tenant(x_tenant_id: str | None = Header(default=None)) -> dict[str, Any]:
|
||||
"""Return the resolved tenant context."""
|
||||
"""Return the resolved tenant context.
|
||||
|
||||
The tenant is determined from the X-Tenant-ID header.
|
||||
Falls back to the default tenant if no header is provided.
|
||||
"""
|
||||
tid = resolve_tenant(x_tenant_id)
|
||||
return {
|
||||
"tenant_id": tid,
|
||||
"is_default": tid == DEFAULT_TENANT,
|
||||
"isolation_mode": "logical",
|
||||
"exists": tid in _tenant_store,
|
||||
}
|
||||
|
||||
|
||||
@router.get("/tenants")
|
||||
def list_tenants() -> dict[str, Any]:
|
||||
"""List known tenants (placeholder — in production, query tenant registry)."""
|
||||
return {
|
||||
"tenants": [
|
||||
{"id": DEFAULT_TENANT, "name": "Default Tenant", "status": "active"},
|
||||
],
|
||||
"total": 1,
|
||||
}
|
||||
"""List all registered tenants.
|
||||
|
||||
Returns:
|
||||
JSON with tenants array and total count.
|
||||
"""
|
||||
tenants = list(_tenant_store.values())
|
||||
return {"tenants": tenants, "total": len(tenants)}
|
||||
|
||||
|
||||
@router.get("/tenants/{tenant_id}")
|
||||
def get_tenant(tenant_id: str) -> dict[str, Any]:
|
||||
"""Get a specific tenant by ID.
|
||||
|
||||
Args:
|
||||
tenant_id: Tenant identifier.
|
||||
|
||||
Returns:
|
||||
Tenant record.
|
||||
|
||||
Raises:
|
||||
404 if tenant not found.
|
||||
"""
|
||||
tenant = _tenant_store.get(tenant_id)
|
||||
if not tenant:
|
||||
raise HTTPException(status_code=404, detail=f"Tenant {tenant_id} not found")
|
||||
return tenant
|
||||
|
||||
|
||||
@router.post("/tenants")
|
||||
def create_tenant(body: dict[str, Any]) -> dict[str, Any]:
|
||||
"""Register a new tenant."""
|
||||
"""Register a new tenant.
|
||||
|
||||
Args:
|
||||
body: JSON with 'id' and optional 'name', 'config' fields.
|
||||
|
||||
Returns:
|
||||
Created tenant record.
|
||||
"""
|
||||
tenant_id = body.get("id", "")
|
||||
name = body.get("name", tenant_id)
|
||||
if not tenant_id:
|
||||
return {"error": "Tenant ID required"}
|
||||
raise HTTPException(status_code=400, detail="Tenant ID required")
|
||||
if tenant_id in _tenant_store:
|
||||
raise HTTPException(status_code=409, detail=f"Tenant {tenant_id} already exists")
|
||||
|
||||
name = body.get("name", tenant_id)
|
||||
config = body.get("config", {})
|
||||
tenant = {
|
||||
"id": tenant_id,
|
||||
"name": name,
|
||||
"status": "active",
|
||||
"created_at": time.time(),
|
||||
"config": config,
|
||||
}
|
||||
_tenant_store[tenant_id] = tenant
|
||||
logger.info("Tenant created", extra={"tenant_id": tenant_id, "name": name})
|
||||
return {"id": tenant_id, "name": name, "status": "active"}
|
||||
return tenant
|
||||
|
||||
|
||||
@router.put("/tenants/{tenant_id}")
|
||||
def update_tenant(tenant_id: str, body: dict[str, Any]) -> dict[str, Any]:
|
||||
"""Update tenant configuration.
|
||||
|
||||
Args:
|
||||
tenant_id: Tenant identifier.
|
||||
body: JSON with fields to update (name, config, status).
|
||||
|
||||
Returns:
|
||||
Updated tenant record.
|
||||
"""
|
||||
tenant = _tenant_store.get(tenant_id)
|
||||
if not tenant:
|
||||
raise HTTPException(status_code=404, detail=f"Tenant {tenant_id} not found")
|
||||
|
||||
if "name" in body:
|
||||
tenant["name"] = body["name"]
|
||||
if "config" in body:
|
||||
tenant["config"] = body["config"]
|
||||
if "status" in body:
|
||||
tenant["status"] = body["status"]
|
||||
|
||||
logger.info("Tenant updated", extra={"tenant_id": tenant_id})
|
||||
return tenant
|
||||
|
||||
|
||||
@router.delete("/tenants/{tenant_id}")
|
||||
def deactivate_tenant(tenant_id: str) -> dict[str, Any]:
|
||||
"""Deactivate a tenant (soft delete).
|
||||
|
||||
Args:
|
||||
tenant_id: Tenant identifier.
|
||||
|
||||
Returns:
|
||||
Confirmation with tenant status.
|
||||
"""
|
||||
if tenant_id == DEFAULT_TENANT:
|
||||
raise HTTPException(status_code=400, detail="Cannot deactivate default tenant")
|
||||
tenant = _tenant_store.get(tenant_id)
|
||||
if not tenant:
|
||||
raise HTTPException(status_code=404, detail=f"Tenant {tenant_id} not found")
|
||||
tenant["status"] = "inactive"
|
||||
logger.info("Tenant deactivated", extra={"tenant_id": tenant_id})
|
||||
return {"id": tenant_id, "status": "inactive"}
|
||||
|
||||
102
fusionagi/api/secret_rotation.py
Normal file
102
fusionagi/api/secret_rotation.py
Normal file
@@ -0,0 +1,102 @@
|
||||
"""API key rotation mechanism for FusionAGI."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import secrets
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class APIKeyRecord(BaseModel):
|
||||
"""Record for a rotatable API key."""
|
||||
key_hash: str
|
||||
created_at: float = Field(default_factory=time.time)
|
||||
expires_at: float | None = None
|
||||
label: str = "default"
|
||||
active: bool = True
|
||||
|
||||
|
||||
class SecretRotator:
|
||||
"""Manages API key lifecycle: generation, rotation, and expiry.
|
||||
|
||||
Keys are stored as SHA-256 hashes for security.
|
||||
Supports multiple active keys for zero-downtime rotation.
|
||||
"""
|
||||
|
||||
def __init__(self, max_active_keys: int = 3) -> None:
|
||||
self._keys: list[APIKeyRecord] = []
|
||||
self._max_active = max_active_keys
|
||||
|
||||
@staticmethod
|
||||
def _hash_key(key: str) -> str:
|
||||
"""Hash a key using SHA-256."""
|
||||
return hashlib.sha256(key.encode()).hexdigest()
|
||||
|
||||
def generate_key(self, label: str = "default", ttl_seconds: float | None = None) -> str:
|
||||
"""Generate a new API key and register it. Returns the plaintext key."""
|
||||
key = secrets.token_urlsafe(32)
|
||||
record = APIKeyRecord(
|
||||
key_hash=self._hash_key(key),
|
||||
label=label,
|
||||
expires_at=time.time() + ttl_seconds if ttl_seconds else None,
|
||||
)
|
||||
self._keys.append(record)
|
||||
self._enforce_max_active()
|
||||
return key
|
||||
|
||||
def validate_key(self, key: str) -> bool:
|
||||
"""Check if a key is valid (active and not expired)."""
|
||||
key_hash = self._hash_key(key)
|
||||
now = time.time()
|
||||
for record in self._keys:
|
||||
if record.key_hash == key_hash and record.active:
|
||||
if record.expires_at and now > record.expires_at:
|
||||
record.active = False
|
||||
return False
|
||||
return True
|
||||
return False
|
||||
|
||||
def rotate(self, label: str = "default", ttl_seconds: float | None = None) -> str:
|
||||
"""Rotate keys: generate new, keep previous active for overlap period."""
|
||||
return self.generate_key(label=label, ttl_seconds=ttl_seconds)
|
||||
|
||||
def revoke(self, key: str) -> bool:
|
||||
"""Revoke a specific key."""
|
||||
key_hash = self._hash_key(key)
|
||||
for record in self._keys:
|
||||
if record.key_hash == key_hash:
|
||||
record.active = False
|
||||
return True
|
||||
return False
|
||||
|
||||
def revoke_expired(self) -> int:
|
||||
"""Deactivate all expired keys."""
|
||||
now = time.time()
|
||||
count = 0
|
||||
for record in self._keys:
|
||||
if record.active and record.expires_at and now > record.expires_at:
|
||||
record.active = False
|
||||
count += 1
|
||||
return count
|
||||
|
||||
def _enforce_max_active(self) -> None:
|
||||
"""Ensure we don't exceed max active keys."""
|
||||
active = [k for k in self._keys if k.active]
|
||||
while len(active) > self._max_active:
|
||||
active[0].active = False
|
||||
active = active[1:]
|
||||
|
||||
def list_keys(self) -> list[dict[str, Any]]:
|
||||
"""List all keys (without hashes)."""
|
||||
return [
|
||||
{
|
||||
"label": k.label,
|
||||
"active": k.active,
|
||||
"created_at": k.created_at,
|
||||
"expires_at": k.expires_at,
|
||||
}
|
||||
for k in self._keys
|
||||
]
|
||||
106
fusionagi/api/task_queue.py
Normal file
106
fusionagi/api/task_queue.py
Normal file
@@ -0,0 +1,106 @@
|
||||
"""Async background task queue for long-running operations."""
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
import uuid
|
||||
from enum import Enum
|
||||
from typing import Any, Callable, Coroutine
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class TaskStatus(str, Enum):
|
||||
"""Background task status."""
|
||||
PENDING = "pending"
|
||||
RUNNING = "running"
|
||||
COMPLETED = "completed"
|
||||
FAILED = "failed"
|
||||
CANCELLED = "cancelled"
|
||||
|
||||
|
||||
class TaskResult(BaseModel):
|
||||
"""Result of a background task."""
|
||||
task_id: str
|
||||
status: TaskStatus
|
||||
result: Any = None
|
||||
error: str | None = None
|
||||
created_at: float = Field(default_factory=time.time)
|
||||
completed_at: float | None = None
|
||||
duration_ms: float | None = None
|
||||
|
||||
|
||||
class BackgroundTaskQueue:
|
||||
"""Async task queue for offloading long-running work.
|
||||
|
||||
Tasks are submitted and run concurrently via asyncio. Results are
|
||||
stored in-memory and queryable by task_id.
|
||||
"""
|
||||
|
||||
def __init__(self, max_concurrent: int = 5, result_ttl: float = 3600.0) -> None:
|
||||
self._semaphore = asyncio.Semaphore(max_concurrent)
|
||||
self._results: dict[str, TaskResult] = {}
|
||||
self._tasks: dict[str, asyncio.Task[None]] = {}
|
||||
self._result_ttl = result_ttl
|
||||
|
||||
def submit(
|
||||
self,
|
||||
fn: Callable[..., Coroutine[Any, Any, Any]],
|
||||
*args: Any,
|
||||
task_id: str | None = None,
|
||||
**kwargs: Any,
|
||||
) -> str:
|
||||
"""Submit a coroutine to run in the background. Returns task_id."""
|
||||
tid = task_id or str(uuid.uuid4())
|
||||
self._results[tid] = TaskResult(task_id=tid, status=TaskStatus.PENDING)
|
||||
|
||||
async def _runner() -> None:
|
||||
async with self._semaphore:
|
||||
self._results[tid].status = TaskStatus.RUNNING
|
||||
start = time.time()
|
||||
try:
|
||||
result = await fn(*args, **kwargs)
|
||||
self._results[tid].result = result
|
||||
self._results[tid].status = TaskStatus.COMPLETED
|
||||
except Exception as e:
|
||||
self._results[tid].error = str(e)
|
||||
self._results[tid].status = TaskStatus.FAILED
|
||||
finally:
|
||||
self._results[tid].completed_at = time.time()
|
||||
self._results[tid].duration_ms = (time.time() - start) * 1000
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
task = loop.create_task(_runner())
|
||||
self._tasks[tid] = task
|
||||
return tid
|
||||
|
||||
def get_status(self, task_id: str) -> TaskResult | None:
|
||||
"""Get the status and result of a task."""
|
||||
return self._results.get(task_id)
|
||||
|
||||
def cancel(self, task_id: str) -> bool:
|
||||
"""Cancel a pending or running task."""
|
||||
task = self._tasks.get(task_id)
|
||||
if task and not task.done():
|
||||
task.cancel()
|
||||
self._results[task_id].status = TaskStatus.CANCELLED
|
||||
return True
|
||||
return False
|
||||
|
||||
def list_tasks(self, status: TaskStatus | None = None) -> list[TaskResult]:
|
||||
"""List all tasks, optionally filtered by status."""
|
||||
results = list(self._results.values())
|
||||
if status:
|
||||
results = [r for r in results if r.status == status]
|
||||
return results
|
||||
|
||||
def cleanup_expired(self) -> int:
|
||||
"""Remove completed tasks older than result_ttl."""
|
||||
now = time.time()
|
||||
expired = [
|
||||
tid for tid, r in self._results.items()
|
||||
if r.completed_at and (now - r.completed_at) > self._result_ttl
|
||||
]
|
||||
for tid in expired:
|
||||
del self._results[tid]
|
||||
self._tasks.pop(tid, None)
|
||||
return len(expired)
|
||||
64
fusionagi/api/tracing.py
Normal file
64
fusionagi/api/tracing.py
Normal file
@@ -0,0 +1,64 @@
|
||||
"""Request tracing middleware for structured logging with correlation IDs."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import contextvars
|
||||
import uuid
|
||||
from typing import Any
|
||||
|
||||
trace_id_var: contextvars.ContextVar[str] = contextvars.ContextVar("trace_id", default="")
|
||||
|
||||
|
||||
def get_trace_id() -> str:
|
||||
"""Get current trace ID from context."""
|
||||
return trace_id_var.get() or ""
|
||||
|
||||
|
||||
def set_trace_id(trace_id: str) -> None:
|
||||
"""Set trace ID in current context."""
|
||||
trace_id_var.set(trace_id)
|
||||
|
||||
|
||||
def generate_trace_id() -> str:
|
||||
"""Generate a new trace ID."""
|
||||
return str(uuid.uuid4())[:8]
|
||||
|
||||
|
||||
class TracingMiddleware:
|
||||
"""ASGI middleware that sets/propagates request trace IDs.
|
||||
|
||||
Extracts trace ID from X-Request-ID header or generates a new one.
|
||||
Injects trace ID into response headers and logging context.
|
||||
"""
|
||||
|
||||
def __init__(self, app: Any, header_name: str = "X-Request-ID") -> None:
|
||||
self.app = app
|
||||
self.header_name = header_name.lower()
|
||||
|
||||
async def __call__(self, scope: dict[str, Any], receive: Any, send: Any) -> None:
|
||||
"""ASGI entrypoint."""
|
||||
if scope["type"] not in ("http", "websocket"):
|
||||
await self.app(scope, receive, send)
|
||||
return
|
||||
|
||||
headers = dict(scope.get("headers", []))
|
||||
trace_id = ""
|
||||
for k, v in headers.items():
|
||||
if isinstance(k, bytes) and k.decode("latin-1").lower() == self.header_name:
|
||||
trace_id = v.decode("latin-1") if isinstance(v, bytes) else str(v)
|
||||
break
|
||||
|
||||
if not trace_id:
|
||||
trace_id = generate_trace_id()
|
||||
|
||||
set_trace_id(trace_id)
|
||||
|
||||
async def send_with_trace(message: dict[str, Any]) -> None:
|
||||
if message["type"] == "http.response.start":
|
||||
headers_list = list(message.get("headers", []))
|
||||
headers_list.append((b"x-request-id", trace_id.encode()))
|
||||
headers_list.append((b"x-trace-id", trace_id.encode()))
|
||||
message["headers"] = headers_list
|
||||
await send(message)
|
||||
|
||||
await self.app(scope, receive, send_with_trace)
|
||||
Reference in New Issue
Block a user