/**
 * Typed, signed, append-only Event Bus (arch §5.5 Event Bus + §7).
 *
 * Architecture contract
 * ---------------------
 * 1. Every event is a normalised category from arch §7.2 — `EventType`.
 * 2. Every event is persisted to the `events` append-only table.
 * 3. Every event carries
 *      payload_hash = sha256(canonicalJson(payload))
 *      prev_hash    = signature of the previous event for the same plan
 *      signature    = hmac_sha256(secret, plan_id|type|payload_hash|prev_hash)
 *    which gives a tamper-evident per-plan hash chain (arch §14 audit).
 *    `canonicalJson` is `JSON.stringify` with object keys sorted, so the
 *    hash is independent of key order and survives the `jsonb` round trip.
 * 4. Callers can subscribe to live events via `subscribe(planId, cb)` —
 *    backed by a process-local EventEmitter that the SSE route consumes.
 *
 * When the orchestrator scales to >1 replicas, the in-process emitter
 * must be replaced by a broker (NATS / Kafka), and the in-process
 * per-plan publish queue must be replaced by a database-level lock.
 * The persistence layer and signature chain remain unchanged.
 */
import { createHash, createHmac } from "crypto";
import { EventEmitter } from "events";
import { query } from "../db/postgres";
import { logger } from "../logging/logger";

/**
 * Normalised event types — arch §7.2. Keep this list as the single
 * source of truth so subscribers can exhaustively match on it.
 */
export const EVENT_TYPES = [
  "transaction.created",
  "participants.authorized",
  "preconditions.satisfied",
  "instrument.ready",
  "payment.ready",
  "transaction.prepared",
  "instrument.dispatched",
  "payment.dispatched",
  "instrument.acknowledged",
  "payment.accepted",
  "payment.settled",
  "transaction.validated",
  "transaction.committed",
  "transaction.aborted",
  "transaction.unwind_initiated",
] as const;

export type EventType = (typeof EVENT_TYPES)[number];

/** A row of the `events` table as returned by the queries in this module. */
export interface EventRecord {
  id: string;
  plan_id: string;
  type: EventType;
  actor: string | null;
  payload: Record<string, unknown>;
  payload_hash: string;
  prev_hash: string | null;
  signature: string;
  created_at: string;
}

/** Arguments accepted by `publish`. `payload` defaults to `{}`. */
export interface PublishInput {
  planId: string;
  type: EventType;
  actor?: string;
  payload?: Record<string, unknown>;
}

const emitter = new EventEmitter();
// 0 disables the listener-count warning; every SSE client adds a listener.
emitter.setMaxListeners(0);

/**
 * Tail of the in-flight publish queue for each plan. An entry exists only
 * while at least one publish for that plan is pending.
 */
const publishTails = new Map<string, Promise<void>>();

/**
 * Resolve the HMAC key from the environment.
 *
 * NOTE(review): when neither variable is set this falls back to a
 * hard-coded development secret, which makes signatures forgeable by
 * anyone who can read this file. Confirm that production deployments
 * always set `EVENT_BUS_HMAC_SECRET`.
 */
function getSigningSecret(): string {
  return (
    process.env.EVENT_BUS_HMAC_SECRET ??
    process.env.SESSION_SECRET ??
    "dev-event-bus-secret-change-in-production"
  );
}

/** Hex-encoded SHA-256 digest of `input`. */
function sha256(input: string): string {
  return createHash("sha256").update(input).digest("hex");
}

/**
 * Serialise `value` like `JSON.stringify`, but with the keys of every
 * non-array object sorted, so two structurally equal values always
 * produce the same string regardless of key insertion order.
 */
function canonicalJson(value: unknown): string {
  return JSON.stringify(value, (_key: string, val: unknown) => {
    if (val === null || typeof val !== "object" || Array.isArray(val)) {
      return val;
    }
    const source = val as Record<string, unknown>;
    // Null prototype so that a literal "__proto__" key is copied as data.
    const sorted: Record<string, unknown> = Object.create(null);
    for (const key of Object.keys(source).sort()) {
      sorted[key] = source[key];
    }
    return sorted;
  });
}

/**
 * HMAC-SHA256 over `planId|type|payloadHash|prevHash`, hex-encoded.
 * A `null` previous hash (first event of a plan) is signed as "".
 */
function sign(
  planId: string,
  type: string,
  payloadHash: string,
  prevHash: string | null,
): string {
  const h = createHmac("sha256", getSigningSecret());
  h.update(`${planId}|${type}|${payloadHash}|${prevHash ?? ""}`);
  return h.digest("hex");
}

/**
 * Run `task` after every previously queued task for the same plan has
 * settled. Without this, two overlapping publishes for one plan would
 * both read the same "previous" signature and fork the hash chain.
 *
 * This only serialises publishes inside the current process.
 */
function runSerialized<T>(planId: string, task: () => Promise<T>): Promise<T> {
  const previous = publishTails.get(planId) ?? Promise.resolve();
  const result = previous.then(task);
  // The stored tail never rejects, so a failed publish cannot block or
  // fail the publishes queued behind it.
  const tail: Promise<void> = result
    .catch(() => undefined)
    .then(() => {
      // Drop the entry once the queue has drained to avoid leaking plan ids.
      if (publishTails.get(planId) === tail) {
        publishTails.delete(planId);
      }
    });
  publishTails.set(planId, tail);
  return result;
}

/**
 * Deliver `record` to the listeners of `channel`. A listener that throws
 * is logged instead of propagating: the event is already persisted at
 * this point, so failing the publish would invite a duplicate retry.
 * Listeners registered after the throwing one on the same channel are
 * still skipped for this event (EventEmitter stops at the first throw).
 */
function emitSafely(channel: string, record: EventRecord): void {
  try {
    emitter.emit(channel, record);
  } catch (err: unknown) {
    logger.error(
      { err, channel, planId: record.plan_id, eventId: record.id },
      "[EventBus] subscriber threw",
    );
  }
}

/** Hash, chain, sign, persist and broadcast one event. */
async function persistAndEmit(input: PublishInput): Promise<EventRecord> {
  const payload = input.payload ?? {};
  const payloadJson = canonicalJson(payload);
  const payloadHash = sha256(payloadJson);

  const prev = await query<{ signature: string }>(
    `SELECT signature
       FROM events
      WHERE plan_id = $1
      ORDER BY created_at DESC, id DESC
      LIMIT 1`,
    [input.planId],
  );
  // No earlier event for this plan means this event starts the chain.
  const prevHash = prev[0]?.signature ?? null;
  const signature = sign(input.planId, input.type, payloadHash, prevHash);

  const rows = await query<EventRecord>(
    `INSERT INTO events
       (plan_id, type, actor, payload, payload_hash, prev_hash, signature)
     VALUES ($1, $2, $3, $4::jsonb, $5, $6, $7)
     RETURNING id, plan_id, type, actor, payload, payload_hash, prev_hash,
               signature, created_at`,
    [
      input.planId,
      input.type,
      input.actor ?? null,
      payloadJson,
      payloadHash,
      prevHash,
      signature,
    ],
  );

  const record = rows[0];
  if (record === undefined) {
    throw new Error("[EventBus] INSERT INTO events returned no row");
  }

  logger.info(
    { planId: record.plan_id, type: record.type, eventId: record.id },
    "[EventBus] published",
  );

  emitSafely(`plan:${record.plan_id}`, record);
  emitSafely("plan:*", record);
  return record;
}

/**
 * Publish a typed, signed, hash-chained event for a plan. Returns the
 * persisted record (including id + signature) so callers can reference
 * it from transition `source_event_id`.
 *
 * Publishes for the same plan are executed one at a time, in call order,
 * within this process. Rejects if either database query fails or the
 * insert returns no row; subscriber exceptions do not reject.
 */
export async function publish(input: PublishInput): Promise<EventRecord> {
  return runSerialized(input.planId, () => persistAndEmit(input));
}

/**
 * Read the full event trail for a plan in chronological order.
 */
export async function getEventsForPlan(planId: string): Promise<EventRecord[]> {
  return query<EventRecord>(
    `SELECT id, plan_id, type, actor, payload, payload_hash, prev_hash,
            signature, created_at
       FROM events
      WHERE plan_id = $1
      ORDER BY created_at ASC, id ASC`,
    [planId],
  );
}

/**
 * Verify the full hash chain for a plan's events. Returns `{ ok: true }`
 * when every signature matches and `prev_hash` forms a contiguous chain;
 * otherwise returns the first index that fails with a reason.
 *
 * A payload hash is accepted when it matches either the canonical
 * (sorted-key) serialisation or the plain `JSON.stringify` serialisation
 * of the stored payload, so events written before canonical hashing was
 * introduced still verify.
 */
export async function verifyChain(planId: string): Promise<
  { ok: true } | { ok: false; brokenAt: number; reason: string }
> {
  const events = await getEventsForPlan(planId);
  let prevSig: string | null = null;

  for (const [i, e] of events.entries()) {
    if (e.prev_hash !== prevSig) {
      return { ok: false, brokenAt: i, reason: "prev_hash mismatch" };
    }

    const canonicalHash = sha256(canonicalJson(e.payload));
    const legacyHash = sha256(JSON.stringify(e.payload));
    if (e.payload_hash !== canonicalHash && e.payload_hash !== legacyHash) {
      return { ok: false, brokenAt: i, reason: "payload_hash mismatch" };
    }

    const expectedSig = sign(e.plan_id, e.type, e.payload_hash, e.prev_hash);
    if (expectedSig !== e.signature) {
      return { ok: false, brokenAt: i, reason: "signature mismatch" };
    }

    prevSig = e.signature;
  }

  return { ok: true };
}

/**
 * Subscribe to live events for a single plan. Returns an unsubscribe
 * function. Used by the SSE route.
 */
export function subscribe(
  planId: string,
  callback: (record: EventRecord) => void,
): () => void {
  const channel = `plan:${planId}`;
  emitter.on(channel, callback);
  return () => emitter.off(channel, callback);
}

/** test-only emitter access, never import in prod code */
export const __emitterForTests = emitter;