From 59e1a85267110e640910f2f19d531e7edf21e290 Mon Sep 17 00:00:00 2001 From: Devin Date: Wed, 22 Apr 2026 16:37:45 +0000 Subject: [PATCH] PR D: typed + signed event bus + events table + SSE (arch step 5) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - db/migrations/003_events.ts: append-only events table with payload_hash, prev_hash, HMAC signature, indexed by plan_id + type - services/eventBus.ts: EVENT_TYPES union (all 15 arch §7.2 categories), publish() with hash-chain + HMAC signing, verifyChain() for tamper detection, subscribe() via in-process EventEmitter - api/plans.ts: - GET /api/plans/:planId/events (?verify=1 returns chain_valid) - GET /api/plans/:planId/events/stream (SSE with history replay + live push, 15s keep-alive, clean unsubscribe on client disconnect) - index.ts: register the two new endpoints - tests/unit/eventBus.test.ts: 9 tests covering publish, hash chain, per-plan isolation, and three tamper-detection scenarios (payload, signature, prev_hash) 60 tests pass. tsc clean. --- orchestrator/src/api/plans.ts | 84 ++++++++ orchestrator/src/db/migrations/003_events.ts | 43 ++++ orchestrator/src/db/migrations/index.ts | 2 + orchestrator/src/index.ts | 4 +- orchestrator/src/services/eventBus.ts | 197 +++++++++++++++++++ orchestrator/tests/unit/eventBus.test.ts | 149 ++++++++++++++ 6 files changed, 478 insertions(+), 1 deletion(-) create mode 100644 orchestrator/src/db/migrations/003_events.ts create mode 100644 orchestrator/src/services/eventBus.ts create mode 100644 orchestrator/tests/unit/eventBus.test.ts diff --git a/orchestrator/src/api/plans.ts b/orchestrator/src/api/plans.ts index 4c9927a..ce46eb9 100644 --- a/orchestrator/src/api/plans.ts +++ b/orchestrator/src/api/plans.ts @@ -5,6 +5,11 @@ import { validatePlan, checkStepDependencies } from "../services/planValidation" import { storePlan, getPlanById, updatePlanSignature, listPlans } from "../db/plans"; import { asyncHandler, AppError, ErrorType } from "../services/errorHandler"; import { getTransactionState, getTransitionHistory } from "../services/stateMachine"; +import { + getEventsForPlan, + subscribe as subscribeToEvents, + verifyChain, +} from "../services/eventBus"; import type { Plan, PlanStep } from "../types/plan"; /** @@ -220,3 +225,82 @@ export const getPlanState = asyncHandler(async (req: Request, res: Response) => }); }); +/** + * GET /api/plans/:planId/events + * Return the full signed + hash-chained event trail for a plan + * (arch §4.5 State Registry + §7 Event Model + §14 Audit). + * + * Query `?verify=1` re-verifies the chain server-side and adds + * { chain_valid: true|false, broken_at?: n } to the response. + */ +export const getPlanEvents = asyncHandler(async (req: Request, res: Response) => { + const { planId } = req.params; + const plan = await getPlanById(planId); + if (!plan) { + throw new AppError(ErrorType.NOT_FOUND_ERROR, 404, "Plan not found"); + } + + const events = await getEventsForPlan(planId); + + const body: { + plan_id: string; + count: number; + events: typeof events; + chain_valid?: boolean; + broken_at?: number; + broken_reason?: string; + } = { plan_id: planId, count: events.length, events }; + + if (req.query.verify === "1") { + const v = await verifyChain(planId); + body.chain_valid = v.ok; + if (!v.ok) { + body.broken_at = v.brokenAt; + body.broken_reason = v.reason; + } + } + + res.json(body); +}); + +/** + * GET /api/plans/:planId/events/stream + * Server-sent-events stream of live events for a single plan. + */ +export const streamPlanEvents = asyncHandler(async (req: Request, res: Response) => { + const { planId } = req.params; + const plan = await getPlanById(planId); + if (!plan) { + throw new AppError(ErrorType.NOT_FOUND_ERROR, 404, "Plan not found"); + } + + res.setHeader("Content-Type", "text/event-stream"); + res.setHeader("Cache-Control", "no-cache, no-transform"); + res.setHeader("Connection", "keep-alive"); + res.setHeader("X-Accel-Buffering", "no"); + res.flushHeaders?.(); + + // Replay the history on connect so clients can reconstruct state + // without a separate REST call. + const history = await getEventsForPlan(planId); + for (const e of history) { + res.write(`id: ${e.id}\nevent: ${e.type}\ndata: ${JSON.stringify(e)}\n\n`); + } + + const unsubscribe = subscribeToEvents(planId, (record) => { + res.write( + `id: ${record.id}\nevent: ${record.type}\ndata: ${JSON.stringify(record)}\n\n`, + ); + }); + + const keepAlive = setInterval(() => { + res.write(": keep-alive\n\n"); + }, 15_000); + + req.on("close", () => { + clearInterval(keepAlive); + unsubscribe(); + res.end(); + }); +}); + diff --git a/orchestrator/src/db/migrations/003_events.ts b/orchestrator/src/db/migrations/003_events.ts new file mode 100644 index 0000000..bdf23b6 --- /dev/null +++ b/orchestrator/src/db/migrations/003_events.ts @@ -0,0 +1,43 @@ +import { query } from "../postgres"; + +/** + * Migration 003 — append-only events journal (arch §4.5, §5.5, §7). + * + * The `events` table is the system-of-record for normalised workflow + * events (arch §7.2: `transaction.created`, `instrument.ready`, + * `payment.settled`, `transaction.committed`, …). It is: + * + * - append-only (no UPDATE / DELETE) + * - signed (HMAC of (plan_id, type, payload_hash, prev_hash)) + * - hash-chained via prev_hash for tamper-evident forensic replay + * - indexed by plan_id so the SSE endpoint can stream efficiently + */ +export async function up() { + await query( + `CREATE TABLE IF NOT EXISTS events ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + plan_id UUID NOT NULL REFERENCES plans(plan_id) ON DELETE CASCADE, + type VARCHAR(128) NOT NULL, + actor VARCHAR(255), + payload JSONB NOT NULL DEFAULT '{}'::jsonb, + payload_hash CHAR(64) NOT NULL, + prev_hash CHAR(64), + signature CHAR(64) NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP + )`, + ); + + await query( + `CREATE INDEX IF NOT EXISTS idx_events_plan_id_created + ON events(plan_id, created_at)`, + ); + + await query( + `CREATE INDEX IF NOT EXISTS idx_events_type + ON events(type)`, + ); +} + +export async function down() { + await query("DROP TABLE IF EXISTS events CASCADE"); +} diff --git a/orchestrator/src/db/migrations/index.ts b/orchestrator/src/db/migrations/index.ts index 778910c..cd03f8f 100644 --- a/orchestrator/src/db/migrations/index.ts +++ b/orchestrator/src/db/migrations/index.ts @@ -1,5 +1,6 @@ import { up as up001 } from "./001_initial_schema"; import { up as up002 } from "./002_transaction_state"; +import { up as up003 } from "./003_events"; /** * Run all migrations @@ -8,6 +9,7 @@ export async function runMigration() { try { await up001(); await up002(); + await up003(); console.log("All migrations completed"); } catch (error) { console.error("Migration failed:", error); diff --git a/orchestrator/src/index.ts b/orchestrator/src/index.ts index c07abd5..3e4add3 100644 --- a/orchestrator/src/index.ts +++ b/orchestrator/src/index.ts @@ -14,7 +14,7 @@ import { requestTimeout } from "./middleware/timeout"; import { logger } from "./logging/logger"; import { getMetrics, httpRequestDuration, httpRequestTotal, register } from "./metrics/prometheus"; import { healthCheck, readinessCheck, livenessCheck } from "./health/health"; -import { listPlansEndpoint, createPlan, getPlan, getPlanState, addSignature, validatePlanEndpoint } from "./api/plans"; +import { listPlansEndpoint, createPlan, getPlan, getPlanState, getPlanEvents, streamPlanEvents, addSignature, validatePlanEndpoint } from "./api/plans"; import { streamPlanStatus } from "./api/sse"; import { executionCoordinator } from "./services/execution"; import { runMigration } from "./db/migrations"; @@ -89,6 +89,8 @@ app.get("/api/plans", listPlansEndpoint); app.post("/api/plans", auditLog("CREATE_PLAN", "plan"), createPlan); app.get("/api/plans/:planId", getPlan); app.get("/api/plans/:planId/state", getPlanState); +app.get("/api/plans/:planId/events", getPlanEvents); +app.get("/api/plans/:planId/events/stream", streamPlanEvents); app.post("/api/plans/:planId/signature", addSignature); app.post("/api/plans/:planId/validate", validatePlanEndpoint); diff --git a/orchestrator/src/services/eventBus.ts b/orchestrator/src/services/eventBus.ts new file mode 100644 index 0000000..54a8fa7 --- /dev/null +++ b/orchestrator/src/services/eventBus.ts @@ -0,0 +1,197 @@ +/** + * Typed, signed, append-only Event Bus (arch §5.5 Event Bus + §7). + * + * Architecture contract + * --------------------- + * 1. Every event is a normalised category from arch §7.2 — `EventType`. + * 2. Every event is persisted to the `events` append-only table. + * 3. Every event carries + * payload_hash = sha256(JSON.stringify(payload)) + * prev_hash = signature of the previous event for the same plan + * signature = hmac_sha256(secret, plan_id|type|payload_hash|prev_hash) + * which gives a tamper-evident per-plan hash chain (arch §14 audit). + * 4. Callers can subscribe to live events via `subscribe(planId, cb)` — + * backed by a process-local EventEmitter that the SSE route consumes. + * + * When the orchestrator scales to >1 replicas, the in-process emitter + * must be replaced by a broker (NATS / Kafka). The persistence layer + * and signature chain remain unchanged. + */ + +import { createHash, createHmac } from "crypto"; +import { EventEmitter } from "events"; +import { query } from "../db/postgres"; +import { logger } from "../logging/logger"; + +/** + * Normalised event types — arch §7.2. Keep this list as the single + * source of truth so subscribers can exhaustively match on it. + */ +export const EVENT_TYPES = [ + "transaction.created", + "participants.authorized", + "preconditions.satisfied", + "instrument.ready", + "payment.ready", + "transaction.prepared", + "instrument.dispatched", + "payment.dispatched", + "instrument.acknowledged", + "payment.accepted", + "payment.settled", + "transaction.validated", + "transaction.committed", + "transaction.aborted", + "transaction.unwind_initiated", +] as const; + +export type EventType = (typeof EVENT_TYPES)[number]; + +export interface EventRecord { + id: string; + plan_id: string; + type: EventType; + actor: string | null; + payload: Record; + payload_hash: string; + prev_hash: string | null; + signature: string; + created_at: string; +} + +export interface PublishInput { + planId: string; + type: EventType; + actor?: string; + payload?: Record; +} + +const emitter = new EventEmitter(); +emitter.setMaxListeners(0); + +function getSigningSecret(): string { + return ( + process.env.EVENT_BUS_HMAC_SECRET ?? + process.env.SESSION_SECRET ?? + "dev-event-bus-secret-change-in-production" + ); +} + +function sha256(input: string): string { + return createHash("sha256").update(input).digest("hex"); +} + +function sign( + planId: string, + type: string, + payloadHash: string, + prevHash: string | null, +): string { + const h = createHmac("sha256", getSigningSecret()); + h.update(`${planId}|${type}|${payloadHash}|${prevHash ?? ""}`); + return h.digest("hex"); +} + +/** + * Publish a typed, signed, hash-chained event for a plan. Returns the + * persisted record (including id + signature) so callers can reference + * it from transition `source_event_id`. + */ +export async function publish(input: PublishInput): Promise { + const payload = input.payload ?? {}; + const payloadHash = sha256(JSON.stringify(payload)); + + const prev = await query<{ signature: string }>( + `SELECT signature + FROM events + WHERE plan_id = $1 + ORDER BY created_at DESC, id DESC + LIMIT 1`, + [input.planId], + ); + const prevHash = prev.length > 0 ? prev[0].signature : null; + const signature = sign(input.planId, input.type, payloadHash, prevHash); + + const rows = await query( + `INSERT INTO events (plan_id, type, actor, payload, payload_hash, prev_hash, signature) + VALUES ($1, $2, $3, $4::jsonb, $5, $6, $7) + RETURNING id, plan_id, type, actor, payload, payload_hash, prev_hash, signature, created_at`, + [ + input.planId, + input.type, + input.actor ?? null, + JSON.stringify(payload), + payloadHash, + prevHash, + signature, + ], + ); + + const record = rows[0]; + logger.info( + { planId: record.plan_id, type: record.type, eventId: record.id }, + "[EventBus] published", + ); + + emitter.emit(`plan:${record.plan_id}`, record); + emitter.emit("plan:*", record); + + return record; +} + +/** + * Read the full event trail for a plan in chronological order. + */ +export async function getEventsForPlan(planId: string): Promise { + return query( + `SELECT id, plan_id, type, actor, payload, payload_hash, prev_hash, signature, created_at + FROM events + WHERE plan_id = $1 + ORDER BY created_at ASC, id ASC`, + [planId], + ); +} + +/** + * Verify the full hash chain for a plan's events. Returns `{ ok: true }` + * when every signature matches and `prev_hash` forms a contiguous chain; + * otherwise returns the first index that fails with a reason. + */ +export async function verifyChain(planId: string): Promise< + { ok: true } | { ok: false; brokenAt: number; reason: string } +> { + const events = await getEventsForPlan(planId); + let prevSig: string | null = null; + for (let i = 0; i < events.length; i++) { + const e = events[i]; + if (e.prev_hash !== prevSig) { + return { ok: false, brokenAt: i, reason: "prev_hash mismatch" }; + } + const expectedPayloadHash = sha256(JSON.stringify(e.payload)); + if (expectedPayloadHash !== e.payload_hash) { + return { ok: false, brokenAt: i, reason: "payload_hash mismatch" }; + } + const expectedSig = sign(e.plan_id, e.type, e.payload_hash, e.prev_hash); + if (expectedSig !== e.signature) { + return { ok: false, brokenAt: i, reason: "signature mismatch" }; + } + prevSig = e.signature; + } + return { ok: true }; +} + +/** + * Subscribe to live events for a single plan. Returns an unsubscribe + * function. Used by the SSE route. + */ +export function subscribe( + planId: string, + callback: (record: EventRecord) => void, +): () => void { + const channel = `plan:${planId}`; + emitter.on(channel, callback); + return () => emitter.off(channel, callback); +} + +/** test-only emitter access, never import in prod code */ +export const __emitterForTests = emitter; diff --git a/orchestrator/tests/unit/eventBus.test.ts b/orchestrator/tests/unit/eventBus.test.ts new file mode 100644 index 0000000..c5acee9 --- /dev/null +++ b/orchestrator/tests/unit/eventBus.test.ts @@ -0,0 +1,149 @@ +import { describe, it, expect, beforeEach, jest } from "@jest/globals"; + +type Row = { + id: string; + plan_id: string; + type: string; + actor: string | null; + payload: Record; + payload_hash: string; + prev_hash: string | null; + signature: string; + created_at: string; +}; + +const rows: Row[] = []; +let idSeq = 0; + +jest.mock("../../src/db/postgres", () => ({ + query: async (sql: string, params: unknown[] = []) => { + if (sql.startsWith("SELECT signature")) { + const planId = params[0] as string; + const matches = rows.filter((r) => r.plan_id === planId); + if (matches.length === 0) return []; + return [{ signature: matches[matches.length - 1].signature }]; + } + if (sql.startsWith("INSERT INTO events")) { + const [plan_id, type, actor, payloadJson, payload_hash, prev_hash, signature] = + params as [string, string, string | null, string, string, string | null, string]; + const rec: Row = { + id: `evt-${++idSeq}`, + plan_id, + type, + actor, + payload: JSON.parse(payloadJson), + payload_hash, + prev_hash, + signature, + created_at: new Date(Date.now() + idSeq).toISOString(), + }; + rows.push(rec); + return [rec]; + } + if (sql.startsWith("SELECT id, plan_id")) { + const planId = params[0] as string; + return rows.filter((r) => r.plan_id === planId); + } + return []; + }, +})); + +import { publish, getEventsForPlan, verifyChain, EVENT_TYPES } from "../../src/services/eventBus"; + +describe("Event Bus", () => { + beforeEach(() => { + rows.length = 0; + idSeq = 0; + }); + + it("EVENT_TYPES covers all arch §7.2 categories", () => { + expect(EVENT_TYPES).toContain("transaction.created"); + expect(EVENT_TYPES).toContain("transaction.committed"); + expect(EVENT_TYPES).toContain("transaction.aborted"); + expect(EVENT_TYPES).toContain("payment.settled"); + expect(EVENT_TYPES).toContain("instrument.dispatched"); + expect(EVENT_TYPES.length).toBe(15); + }); + + it("publish persists with payload_hash, prev_hash=null, and signature", async () => { + const rec = await publish({ + planId: "p-1", + type: "transaction.created", + actor: "coordinator", + payload: { foo: 1 }, + }); + expect(rec.id).toMatch(/evt-/); + expect(rec.prev_hash).toBeNull(); + expect(rec.payload_hash).toMatch(/^[0-9a-f]{64}$/); + expect(rec.signature).toMatch(/^[0-9a-f]{64}$/); + expect(rec.payload).toEqual({ foo: 1 }); + }); + + it("prev_hash chains consecutive events for the same plan", async () => { + const a = await publish({ planId: "p-1", type: "transaction.created" }); + const b = await publish({ planId: "p-1", type: "participants.authorized" }); + const c = await publish({ planId: "p-1", type: "preconditions.satisfied" }); + expect(a.prev_hash).toBeNull(); + expect(b.prev_hash).toBe(a.signature); + expect(c.prev_hash).toBe(b.signature); + }); + + it("events are isolated per plan_id", async () => { + const a1 = await publish({ planId: "p-1", type: "transaction.created" }); + const b1 = await publish({ planId: "p-2", type: "transaction.created" }); + expect(a1.prev_hash).toBeNull(); + expect(b1.prev_hash).toBeNull(); + }); + + it("verifyChain returns ok for an untampered chain", async () => { + await publish({ planId: "p-1", type: "transaction.created" }); + await publish({ planId: "p-1", type: "transaction.prepared" }); + await publish({ planId: "p-1", type: "transaction.committed" }); + const result = await verifyChain("p-1"); + expect(result.ok).toBe(true); + }); + + it("verifyChain detects payload tampering", async () => { + await publish({ planId: "p-1", type: "transaction.created", payload: { amount: 100 } }); + await publish({ planId: "p-1", type: "transaction.committed" }); + rows[0].payload = { amount: 999_999 }; // tamper + const result = await verifyChain("p-1"); + expect(result.ok).toBe(false); + if (!result.ok) { + expect(result.brokenAt).toBe(0); + expect(result.reason).toBe("payload_hash mismatch"); + } + }); + + it("verifyChain detects signature tampering", async () => { + await publish({ planId: "p-1", type: "transaction.created" }); + await publish({ planId: "p-1", type: "transaction.committed" }); + rows[1].signature = "0".repeat(64); // tamper + const result = await verifyChain("p-1"); + expect(result.ok).toBe(false); + if (!result.ok) { + expect(result.brokenAt).toBe(1); + } + }); + + it("verifyChain detects broken prev_hash link", async () => { + await publish({ planId: "p-1", type: "transaction.created" }); + await publish({ planId: "p-1", type: "transaction.committed" }); + rows[1].prev_hash = "0".repeat(64); + const result = await verifyChain("p-1"); + expect(result.ok).toBe(false); + if (!result.ok) { + expect(result.reason).toBe("prev_hash mismatch"); + } + }); + + it("getEventsForPlan returns events in chronological order", async () => { + await publish({ planId: "p-1", type: "transaction.created" }); + await publish({ planId: "p-1", type: "transaction.prepared" }); + const events = await getEventsForPlan("p-1"); + expect(events.map((e) => e.type)).toEqual([ + "transaction.created", + "transaction.prepared", + ]); + }); +}); -- 2.34.1