PR D: typed + signed event bus + events table + SSE (arch step 5)
- db/migrations/003_events.ts: append-only events table with
payload_hash, prev_hash, HMAC signature, indexed by plan_id + type
- services/eventBus.ts: EVENT_TYPES union (all 15 arch §7.2
categories), publish() with hash-chain + HMAC signing, verifyChain()
for tamper detection, subscribe() via in-process EventEmitter
- api/plans.ts:
- GET /api/plans/:planId/events (?verify=1 returns chain_valid)
- GET /api/plans/:planId/events/stream (SSE with history replay +
live push, 15s keep-alive, clean unsubscribe on client disconnect)
- index.ts: register the two new endpoints
- tests/unit/eventBus.test.ts: 9 tests covering publish, hash chain,
per-plan isolation, and three tamper-detection scenarios (payload,
signature, prev_hash)
60 tests pass. tsc clean.
This commit is contained in:
149
orchestrator/tests/unit/eventBus.test.ts
Normal file
149
orchestrator/tests/unit/eventBus.test.ts
Normal file
@@ -0,0 +1,149 @@
|
||||
import { describe, it, expect, beforeEach, jest } from "@jest/globals";
|
||||
|
||||
type Row = {
|
||||
id: string;
|
||||
plan_id: string;
|
||||
type: string;
|
||||
actor: string | null;
|
||||
payload: Record<string, unknown>;
|
||||
payload_hash: string;
|
||||
prev_hash: string | null;
|
||||
signature: string;
|
||||
created_at: string;
|
||||
};
|
||||
|
||||
const rows: Row[] = [];
|
||||
let idSeq = 0;
|
||||
|
||||
jest.mock("../../src/db/postgres", () => ({
|
||||
query: async (sql: string, params: unknown[] = []) => {
|
||||
if (sql.startsWith("SELECT signature")) {
|
||||
const planId = params[0] as string;
|
||||
const matches = rows.filter((r) => r.plan_id === planId);
|
||||
if (matches.length === 0) return [];
|
||||
return [{ signature: matches[matches.length - 1].signature }];
|
||||
}
|
||||
if (sql.startsWith("INSERT INTO events")) {
|
||||
const [plan_id, type, actor, payloadJson, payload_hash, prev_hash, signature] =
|
||||
params as [string, string, string | null, string, string, string | null, string];
|
||||
const rec: Row = {
|
||||
id: `evt-${++idSeq}`,
|
||||
plan_id,
|
||||
type,
|
||||
actor,
|
||||
payload: JSON.parse(payloadJson),
|
||||
payload_hash,
|
||||
prev_hash,
|
||||
signature,
|
||||
created_at: new Date(Date.now() + idSeq).toISOString(),
|
||||
};
|
||||
rows.push(rec);
|
||||
return [rec];
|
||||
}
|
||||
if (sql.startsWith("SELECT id, plan_id")) {
|
||||
const planId = params[0] as string;
|
||||
return rows.filter((r) => r.plan_id === planId);
|
||||
}
|
||||
return [];
|
||||
},
|
||||
}));
|
||||
|
||||
import { publish, getEventsForPlan, verifyChain, EVENT_TYPES } from "../../src/services/eventBus";
|
||||
|
||||
describe("Event Bus", () => {
|
||||
beforeEach(() => {
|
||||
rows.length = 0;
|
||||
idSeq = 0;
|
||||
});
|
||||
|
||||
it("EVENT_TYPES covers all arch §7.2 categories", () => {
|
||||
expect(EVENT_TYPES).toContain("transaction.created");
|
||||
expect(EVENT_TYPES).toContain("transaction.committed");
|
||||
expect(EVENT_TYPES).toContain("transaction.aborted");
|
||||
expect(EVENT_TYPES).toContain("payment.settled");
|
||||
expect(EVENT_TYPES).toContain("instrument.dispatched");
|
||||
expect(EVENT_TYPES.length).toBe(15);
|
||||
});
|
||||
|
||||
it("publish persists with payload_hash, prev_hash=null, and signature", async () => {
|
||||
const rec = await publish({
|
||||
planId: "p-1",
|
||||
type: "transaction.created",
|
||||
actor: "coordinator",
|
||||
payload: { foo: 1 },
|
||||
});
|
||||
expect(rec.id).toMatch(/evt-/);
|
||||
expect(rec.prev_hash).toBeNull();
|
||||
expect(rec.payload_hash).toMatch(/^[0-9a-f]{64}$/);
|
||||
expect(rec.signature).toMatch(/^[0-9a-f]{64}$/);
|
||||
expect(rec.payload).toEqual({ foo: 1 });
|
||||
});
|
||||
|
||||
it("prev_hash chains consecutive events for the same plan", async () => {
|
||||
const a = await publish({ planId: "p-1", type: "transaction.created" });
|
||||
const b = await publish({ planId: "p-1", type: "participants.authorized" });
|
||||
const c = await publish({ planId: "p-1", type: "preconditions.satisfied" });
|
||||
expect(a.prev_hash).toBeNull();
|
||||
expect(b.prev_hash).toBe(a.signature);
|
||||
expect(c.prev_hash).toBe(b.signature);
|
||||
});
|
||||
|
||||
it("events are isolated per plan_id", async () => {
|
||||
const a1 = await publish({ planId: "p-1", type: "transaction.created" });
|
||||
const b1 = await publish({ planId: "p-2", type: "transaction.created" });
|
||||
expect(a1.prev_hash).toBeNull();
|
||||
expect(b1.prev_hash).toBeNull();
|
||||
});
|
||||
|
||||
it("verifyChain returns ok for an untampered chain", async () => {
|
||||
await publish({ planId: "p-1", type: "transaction.created" });
|
||||
await publish({ planId: "p-1", type: "transaction.prepared" });
|
||||
await publish({ planId: "p-1", type: "transaction.committed" });
|
||||
const result = await verifyChain("p-1");
|
||||
expect(result.ok).toBe(true);
|
||||
});
|
||||
|
||||
it("verifyChain detects payload tampering", async () => {
|
||||
await publish({ planId: "p-1", type: "transaction.created", payload: { amount: 100 } });
|
||||
await publish({ planId: "p-1", type: "transaction.committed" });
|
||||
rows[0].payload = { amount: 999_999 }; // tamper
|
||||
const result = await verifyChain("p-1");
|
||||
expect(result.ok).toBe(false);
|
||||
if (!result.ok) {
|
||||
expect(result.brokenAt).toBe(0);
|
||||
expect(result.reason).toBe("payload_hash mismatch");
|
||||
}
|
||||
});
|
||||
|
||||
it("verifyChain detects signature tampering", async () => {
|
||||
await publish({ planId: "p-1", type: "transaction.created" });
|
||||
await publish({ planId: "p-1", type: "transaction.committed" });
|
||||
rows[1].signature = "0".repeat(64); // tamper
|
||||
const result = await verifyChain("p-1");
|
||||
expect(result.ok).toBe(false);
|
||||
if (!result.ok) {
|
||||
expect(result.brokenAt).toBe(1);
|
||||
}
|
||||
});
|
||||
|
||||
it("verifyChain detects broken prev_hash link", async () => {
|
||||
await publish({ planId: "p-1", type: "transaction.created" });
|
||||
await publish({ planId: "p-1", type: "transaction.committed" });
|
||||
rows[1].prev_hash = "0".repeat(64);
|
||||
const result = await verifyChain("p-1");
|
||||
expect(result.ok).toBe(false);
|
||||
if (!result.ok) {
|
||||
expect(result.reason).toBe("prev_hash mismatch");
|
||||
}
|
||||
});
|
||||
|
||||
it("getEventsForPlan returns events in chronological order", async () => {
|
||||
await publish({ planId: "p-1", type: "transaction.created" });
|
||||
await publish({ planId: "p-1", type: "transaction.prepared" });
|
||||
const events = await getEventsForPlan("p-1");
|
||||
expect(events.map((e) => e.type)).toEqual([
|
||||
"transaction.created",
|
||||
"transaction.prepared",
|
||||
]);
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user