Some checks failed
- Integrate GPU scoring inline into reasoning/multi_path.py (auto-uses GPU when available) - Integrate GPU deduplication into multi_agent/consensus_engine.py - Add semantic_search() method to memory/semantic_graph.py with GPU acceleration - Integrate GPU training into self_improvement/training.py AutoTrainer - Fix all 758 ruff lint issues (whitespace, import sorting, unused imports, ambiguous vars, undefined names) - Fix all 40 mypy type errors across the codebase (no-any-return, union-attr, arg-type, etc.) - Fix deprecated ruff config keys (select/ignore -> [tool.ruff.lint]) - Add .dockerignore to exclude .venv/, tests/, docs/ from Docker builds - Add type hints and docstrings to verification/outcome.py - Fix E402 import ordering in witness_agent.py - Fix F821 undefined names in vector_pgvector.py and native.py - Fix E741 ambiguous variable names in reflective.py and recommender.py All 276 tests pass. 0 ruff errors. 0 mypy errors. Co-Authored-By: Nakamoto, S <defi@defi-oracle.io>
100 lines
3.2 KiB
Python
"""Telemetry tracer: per-head latency, costs, event bus subscription."""
|
|
|
|
import time
|
|
from collections import deque
|
|
from dataclasses import dataclass, field
|
|
from typing import Any
|
|
|
|
from fusionagi._logger import logger
|
|
|
|
# Process-wide singleton tracer; read via get_tracer() and installed via set_tracer().
_tracer: "TelemetryTracer | None" = None
|
|
|
|
|
|
@dataclass
|
|
class TraceEntry:
|
|
"""Single trace entry."""
|
|
|
|
event_type: str
|
|
task_id: str | None
|
|
head_id: str | None
|
|
latency_ms: float | None
|
|
payload: dict[str, Any]
|
|
timestamp: float = field(default_factory=time.monotonic)
|
|
|
|
|
|
class TelemetryTracer:
|
|
"""In-memory ring buffer for traces; subscribes to event bus."""
|
|
|
|
def __init__(self, max_entries: int = 10000) -> None:
|
|
self._entries: deque[TraceEntry] = deque(maxlen=max_entries)
|
|
self._subscription: Any = None
|
|
self._starts: dict[str, float] = {}
|
|
|
|
def subscribe(self, event_bus: Any) -> None:
|
|
"""Subscribe to event bus for message_received, dvadasa_complete."""
|
|
|
|
def on_message(_event_type: str, payload: dict[str, Any]) -> None:
|
|
task_id = payload.get("task_id", "")
|
|
recipient = payload.get("recipient", "")
|
|
self._starts[f"{task_id}:{recipient}"] = time.monotonic()
|
|
|
|
def on_dvadasa(_event_type: str, payload: dict[str, Any]) -> None:
|
|
task_id = payload.get("task_id", "")
|
|
head_count = payload.get("head_count", 0)
|
|
self._entries.append(
|
|
TraceEntry(
|
|
event_type="dvadasa_complete",
|
|
task_id=task_id,
|
|
head_id=None,
|
|
latency_ms=None,
|
|
payload={"head_count": head_count},
|
|
)
|
|
)
|
|
|
|
try:
|
|
event_bus.subscribe("message_received", on_message)
|
|
event_bus.subscribe("dvadasa_complete", on_dvadasa)
|
|
except Exception as e:
|
|
logger.warning("Telemetry subscribe failed", extra={"error": str(e)})
|
|
|
|
def record_head_output(self, task_id: str, head_id: str, start: float | None = None) -> None:
|
|
"""Record head completion with optional latency."""
|
|
key = f"{task_id}:{head_id}"
|
|
end = time.monotonic()
|
|
latency_ms = (end - self._starts.pop(key, end)) * 1000 if start is None else (end - start) * 1000
|
|
self._entries.append(
|
|
TraceEntry(
|
|
event_type="head_output",
|
|
task_id=task_id,
|
|
head_id=head_id,
|
|
latency_ms=latency_ms,
|
|
payload={},
|
|
)
|
|
)
|
|
|
|
def get_traces(self, task_id: str | None = None, limit: int = 100) -> list[dict[str, Any]]:
|
|
"""Return traces, optionally filtered by task_id."""
|
|
out = []
|
|
for e in reversed(self._entries):
|
|
if task_id and e.task_id != task_id:
|
|
continue
|
|
out.append({
|
|
"event_type": e.event_type,
|
|
"task_id": e.task_id,
|
|
"head_id": e.head_id,
|
|
"latency_ms": e.latency_ms,
|
|
"payload": e.payload,
|
|
})
|
|
if len(out) >= limit:
|
|
break
|
|
return out
|
|
|
|
|
|
def get_tracer() -> TelemetryTracer | None:
    """Return the process-wide tracer installed via set_tracer(), or None."""
    return _tracer
|
|
|
|
|
|
def set_tracer(t: TelemetryTracer | None) -> None:
    """Install (or clear, by passing None) the process-wide tracer singleton."""
    global _tracer
    _tracer = t
|