PR D: typed + signed event bus + events table + SSE (arch step 5) (#8)
Some checks failed
CI / Frontend Lint (push) Has been cancelled
CI / Frontend Type Check (push) Has been cancelled
CI / Frontend Build (push) Has been cancelled
CI / Frontend E2E Tests (push) Has been cancelled
CI / Orchestrator Build (push) Has been cancelled
CI / Contracts Compile (push) Has been cancelled
CI / Contracts Test (push) Has been cancelled
Security Scan / Dependency Vulnerability Scan (push) Has been cancelled
Security Scan / OWASP ZAP Scan (push) Has been cancelled
Some checks failed
CI / Frontend Lint (push) Has been cancelled
CI / Frontend Type Check (push) Has been cancelled
CI / Frontend Build (push) Has been cancelled
CI / Frontend E2E Tests (push) Has been cancelled
CI / Orchestrator Build (push) Has been cancelled
CI / Contracts Compile (push) Has been cancelled
CI / Contracts Test (push) Has been cancelled
Security Scan / Dependency Vulnerability Scan (push) Has been cancelled
Security Scan / OWASP ZAP Scan (push) Has been cancelled
This commit was merged in pull request #8.
This commit is contained in:
197
orchestrator/src/services/eventBus.ts
Normal file
197
orchestrator/src/services/eventBus.ts
Normal file
@@ -0,0 +1,197 @@
|
||||
/**
|
||||
* Typed, signed, append-only Event Bus (arch §5.5 Event Bus + §7).
|
||||
*
|
||||
* Architecture contract
|
||||
* ---------------------
|
||||
* 1. Every event is a normalised category from arch §7.2 — `EventType`.
|
||||
* 2. Every event is persisted to the `events` append-only table.
|
||||
* 3. Every event carries
|
||||
* payload_hash = sha256(JSON.stringify(payload))
|
||||
* prev_hash = signature of the previous event for the same plan
|
||||
* signature = hmac_sha256(secret, plan_id|type|payload_hash|prev_hash)
|
||||
* which gives a tamper-evident per-plan hash chain (arch §14 audit).
|
||||
* 4. Callers can subscribe to live events via `subscribe(planId, cb)` —
|
||||
* backed by a process-local EventEmitter that the SSE route consumes.
|
||||
*
|
||||
* When the orchestrator scales to >1 replicas, the in-process emitter
|
||||
* must be replaced by a broker (NATS / Kafka). The persistence layer
|
||||
* and signature chain remain unchanged.
|
||||
*/
|
||||
|
||||
import { createHash, createHmac } from "crypto";
|
||||
import { EventEmitter } from "events";
|
||||
import { query } from "../db/postgres";
|
||||
import { logger } from "../logging/logger";
|
||||
|
||||
/**
|
||||
* Normalised event types — arch §7.2. Keep this list as the single
|
||||
* source of truth so subscribers can exhaustively match on it.
|
||||
*/
|
||||
export const EVENT_TYPES = [
|
||||
"transaction.created",
|
||||
"participants.authorized",
|
||||
"preconditions.satisfied",
|
||||
"instrument.ready",
|
||||
"payment.ready",
|
||||
"transaction.prepared",
|
||||
"instrument.dispatched",
|
||||
"payment.dispatched",
|
||||
"instrument.acknowledged",
|
||||
"payment.accepted",
|
||||
"payment.settled",
|
||||
"transaction.validated",
|
||||
"transaction.committed",
|
||||
"transaction.aborted",
|
||||
"transaction.unwind_initiated",
|
||||
] as const;
|
||||
|
||||
export type EventType = (typeof EVENT_TYPES)[number];
|
||||
|
||||
export interface EventRecord {
|
||||
id: string;
|
||||
plan_id: string;
|
||||
type: EventType;
|
||||
actor: string | null;
|
||||
payload: Record<string, unknown>;
|
||||
payload_hash: string;
|
||||
prev_hash: string | null;
|
||||
signature: string;
|
||||
created_at: string;
|
||||
}
|
||||
|
||||
export interface PublishInput {
|
||||
planId: string;
|
||||
type: EventType;
|
||||
actor?: string;
|
||||
payload?: Record<string, unknown>;
|
||||
}
|
||||
|
||||
const emitter = new EventEmitter();
|
||||
emitter.setMaxListeners(0);
|
||||
|
||||
function getSigningSecret(): string {
|
||||
return (
|
||||
process.env.EVENT_BUS_HMAC_SECRET ??
|
||||
process.env.SESSION_SECRET ??
|
||||
"dev-event-bus-secret-change-in-production"
|
||||
);
|
||||
}
|
||||
|
||||
function sha256(input: string): string {
|
||||
return createHash("sha256").update(input).digest("hex");
|
||||
}
|
||||
|
||||
function sign(
|
||||
planId: string,
|
||||
type: string,
|
||||
payloadHash: string,
|
||||
prevHash: string | null,
|
||||
): string {
|
||||
const h = createHmac("sha256", getSigningSecret());
|
||||
h.update(`${planId}|${type}|${payloadHash}|${prevHash ?? ""}`);
|
||||
return h.digest("hex");
|
||||
}
|
||||
|
||||
/**
|
||||
* Publish a typed, signed, hash-chained event for a plan. Returns the
|
||||
* persisted record (including id + signature) so callers can reference
|
||||
* it from transition `source_event_id`.
|
||||
*/
|
||||
export async function publish(input: PublishInput): Promise<EventRecord> {
|
||||
const payload = input.payload ?? {};
|
||||
const payloadHash = sha256(JSON.stringify(payload));
|
||||
|
||||
const prev = await query<{ signature: string }>(
|
||||
`SELECT signature
|
||||
FROM events
|
||||
WHERE plan_id = $1
|
||||
ORDER BY created_at DESC, id DESC
|
||||
LIMIT 1`,
|
||||
[input.planId],
|
||||
);
|
||||
const prevHash = prev.length > 0 ? prev[0].signature : null;
|
||||
const signature = sign(input.planId, input.type, payloadHash, prevHash);
|
||||
|
||||
const rows = await query<EventRecord>(
|
||||
`INSERT INTO events (plan_id, type, actor, payload, payload_hash, prev_hash, signature)
|
||||
VALUES ($1, $2, $3, $4::jsonb, $5, $6, $7)
|
||||
RETURNING id, plan_id, type, actor, payload, payload_hash, prev_hash, signature, created_at`,
|
||||
[
|
||||
input.planId,
|
||||
input.type,
|
||||
input.actor ?? null,
|
||||
JSON.stringify(payload),
|
||||
payloadHash,
|
||||
prevHash,
|
||||
signature,
|
||||
],
|
||||
);
|
||||
|
||||
const record = rows[0];
|
||||
logger.info(
|
||||
{ planId: record.plan_id, type: record.type, eventId: record.id },
|
||||
"[EventBus] published",
|
||||
);
|
||||
|
||||
emitter.emit(`plan:${record.plan_id}`, record);
|
||||
emitter.emit("plan:*", record);
|
||||
|
||||
return record;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read the full event trail for a plan in chronological order.
|
||||
*/
|
||||
export async function getEventsForPlan(planId: string): Promise<EventRecord[]> {
|
||||
return query<EventRecord>(
|
||||
`SELECT id, plan_id, type, actor, payload, payload_hash, prev_hash, signature, created_at
|
||||
FROM events
|
||||
WHERE plan_id = $1
|
||||
ORDER BY created_at ASC, id ASC`,
|
||||
[planId],
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify the full hash chain for a plan's events. Returns `{ ok: true }`
|
||||
* when every signature matches and `prev_hash` forms a contiguous chain;
|
||||
* otherwise returns the first index that fails with a reason.
|
||||
*/
|
||||
export async function verifyChain(planId: string): Promise<
|
||||
{ ok: true } | { ok: false; brokenAt: number; reason: string }
|
||||
> {
|
||||
const events = await getEventsForPlan(planId);
|
||||
let prevSig: string | null = null;
|
||||
for (let i = 0; i < events.length; i++) {
|
||||
const e = events[i];
|
||||
if (e.prev_hash !== prevSig) {
|
||||
return { ok: false, brokenAt: i, reason: "prev_hash mismatch" };
|
||||
}
|
||||
const expectedPayloadHash = sha256(JSON.stringify(e.payload));
|
||||
if (expectedPayloadHash !== e.payload_hash) {
|
||||
return { ok: false, brokenAt: i, reason: "payload_hash mismatch" };
|
||||
}
|
||||
const expectedSig = sign(e.plan_id, e.type, e.payload_hash, e.prev_hash);
|
||||
if (expectedSig !== e.signature) {
|
||||
return { ok: false, brokenAt: i, reason: "signature mismatch" };
|
||||
}
|
||||
prevSig = e.signature;
|
||||
}
|
||||
return { ok: true };
|
||||
}
|
||||
|
||||
/**
|
||||
* Subscribe to live events for a single plan. Returns an unsubscribe
|
||||
* function. Used by the SSE route.
|
||||
*/
|
||||
export function subscribe(
|
||||
planId: string,
|
||||
callback: (record: EventRecord) => void,
|
||||
): () => void {
|
||||
const channel = `plan:${planId}`;
|
||||
emitter.on(channel, callback);
|
||||
return () => emitter.off(channel, callback);
|
||||
}
|
||||
|
||||
/** test-only emitter access, never import in prod code */
|
||||
export const __emitterForTests = emitter;
|
||||
Reference in New Issue
Block a user