PR D: typed + signed event bus + events table + SSE (arch step 5) #8
@@ -5,6 +5,11 @@ import { validatePlan, checkStepDependencies } from "../services/planValidation"
|
||||
import { storePlan, getPlanById, updatePlanSignature, listPlans } from "../db/plans";
|
||||
import { asyncHandler, AppError, ErrorType } from "../services/errorHandler";
|
||||
import { getTransactionState, getTransitionHistory } from "../services/stateMachine";
|
||||
import {
|
||||
getEventsForPlan,
|
||||
subscribe as subscribeToEvents,
|
||||
verifyChain,
|
||||
} from "../services/eventBus";
|
||||
import type { Plan, PlanStep } from "../types/plan";
|
||||
|
||||
/**
|
||||
@@ -220,3 +225,82 @@ export const getPlanState = asyncHandler(async (req: Request, res: Response) =>
|
||||
});
|
||||
});
|
||||
|
||||
/**
|
||||
* GET /api/plans/:planId/events
|
||||
* Return the full signed + hash-chained event trail for a plan
|
||||
* (arch §4.5 State Registry + §7 Event Model + §14 Audit).
|
||||
*
|
||||
* Query `?verify=1` re-verifies the chain server-side and adds
|
||||
* { chain_valid: true|false, broken_at?: n } to the response.
|
||||
*/
|
||||
export const getPlanEvents = asyncHandler(async (req: Request, res: Response) => {
|
||||
const { planId } = req.params;
|
||||
const plan = await getPlanById(planId);
|
||||
if (!plan) {
|
||||
throw new AppError(ErrorType.NOT_FOUND_ERROR, 404, "Plan not found");
|
||||
}
|
||||
|
||||
const events = await getEventsForPlan(planId);
|
||||
|
||||
const body: {
|
||||
plan_id: string;
|
||||
count: number;
|
||||
events: typeof events;
|
||||
chain_valid?: boolean;
|
||||
broken_at?: number;
|
||||
broken_reason?: string;
|
||||
} = { plan_id: planId, count: events.length, events };
|
||||
|
||||
if (req.query.verify === "1") {
|
||||
const v = await verifyChain(planId);
|
||||
body.chain_valid = v.ok;
|
||||
if (!v.ok) {
|
||||
body.broken_at = v.brokenAt;
|
||||
body.broken_reason = v.reason;
|
||||
}
|
||||
}
|
||||
|
||||
res.json(body);
|
||||
});
|
||||
|
||||
/**
|
||||
* GET /api/plans/:planId/events/stream
|
||||
* Server-sent-events stream of live events for a single plan.
|
||||
*/
|
||||
export const streamPlanEvents = asyncHandler(async (req: Request, res: Response) => {
|
||||
const { planId } = req.params;
|
||||
const plan = await getPlanById(planId);
|
||||
if (!plan) {
|
||||
throw new AppError(ErrorType.NOT_FOUND_ERROR, 404, "Plan not found");
|
||||
}
|
||||
|
||||
res.setHeader("Content-Type", "text/event-stream");
|
||||
res.setHeader("Cache-Control", "no-cache, no-transform");
|
||||
res.setHeader("Connection", "keep-alive");
|
||||
res.setHeader("X-Accel-Buffering", "no");
|
||||
res.flushHeaders?.();
|
||||
|
||||
// Replay the history on connect so clients can reconstruct state
|
||||
// without a separate REST call.
|
||||
const history = await getEventsForPlan(planId);
|
||||
for (const e of history) {
|
||||
res.write(`id: ${e.id}\nevent: ${e.type}\ndata: ${JSON.stringify(e)}\n\n`);
|
||||
}
|
||||
|
||||
const unsubscribe = subscribeToEvents(planId, (record) => {
|
||||
res.write(
|
||||
`id: ${record.id}\nevent: ${record.type}\ndata: ${JSON.stringify(record)}\n\n`,
|
||||
);
|
||||
});
|
||||
|
||||
const keepAlive = setInterval(() => {
|
||||
res.write(": keep-alive\n\n");
|
||||
}, 15_000);
|
||||
|
||||
req.on("close", () => {
|
||||
clearInterval(keepAlive);
|
||||
unsubscribe();
|
||||
res.end();
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
43
orchestrator/src/db/migrations/003_events.ts
Normal file
43
orchestrator/src/db/migrations/003_events.ts
Normal file
@@ -0,0 +1,43 @@
|
||||
import { query } from "../postgres";
|
||||
|
||||
/**
|
||||
* Migration 003 — append-only events journal (arch §4.5, §5.5, §7).
|
||||
*
|
||||
* The `events` table is the system-of-record for normalised workflow
|
||||
* events (arch §7.2: `transaction.created`, `instrument.ready`,
|
||||
* `payment.settled`, `transaction.committed`, …). It is:
|
||||
*
|
||||
* - append-only (no UPDATE / DELETE)
|
||||
* - signed (HMAC of (plan_id, type, payload_hash, prev_hash))
|
||||
* - hash-chained via prev_hash for tamper-evident forensic replay
|
||||
* - indexed by plan_id so the SSE endpoint can stream efficiently
|
||||
*/
|
||||
export async function up() {
|
||||
await query(
|
||||
`CREATE TABLE IF NOT EXISTS events (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
plan_id UUID NOT NULL REFERENCES plans(plan_id) ON DELETE CASCADE,
|
||||
type VARCHAR(128) NOT NULL,
|
||||
actor VARCHAR(255),
|
||||
payload JSONB NOT NULL DEFAULT '{}'::jsonb,
|
||||
payload_hash CHAR(64) NOT NULL,
|
||||
prev_hash CHAR(64),
|
||||
signature CHAR(64) NOT NULL,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
|
||||
)`,
|
||||
);
|
||||
|
||||
await query(
|
||||
`CREATE INDEX IF NOT EXISTS idx_events_plan_id_created
|
||||
ON events(plan_id, created_at)`,
|
||||
);
|
||||
|
||||
await query(
|
||||
`CREATE INDEX IF NOT EXISTS idx_events_type
|
||||
ON events(type)`,
|
||||
);
|
||||
}
|
||||
|
||||
export async function down() {
|
||||
await query("DROP TABLE IF EXISTS events CASCADE");
|
||||
}
|
||||
@@ -1,5 +1,6 @@
|
||||
import { up as up001 } from "./001_initial_schema";
|
||||
import { up as up002 } from "./002_transaction_state";
|
||||
import { up as up003 } from "./003_events";
|
||||
|
||||
/**
|
||||
* Run all migrations
|
||||
@@ -8,6 +9,7 @@ export async function runMigration() {
|
||||
try {
|
||||
await up001();
|
||||
await up002();
|
||||
await up003();
|
||||
console.log("All migrations completed");
|
||||
} catch (error) {
|
||||
console.error("Migration failed:", error);
|
||||
|
||||
@@ -14,7 +14,7 @@ import { requestTimeout } from "./middleware/timeout";
|
||||
import { logger } from "./logging/logger";
|
||||
import { getMetrics, httpRequestDuration, httpRequestTotal, register } from "./metrics/prometheus";
|
||||
import { healthCheck, readinessCheck, livenessCheck } from "./health/health";
|
||||
import { listPlansEndpoint, createPlan, getPlan, getPlanState, addSignature, validatePlanEndpoint } from "./api/plans";
|
||||
import { listPlansEndpoint, createPlan, getPlan, getPlanState, getPlanEvents, streamPlanEvents, addSignature, validatePlanEndpoint } from "./api/plans";
|
||||
import { streamPlanStatus } from "./api/sse";
|
||||
import { executionCoordinator } from "./services/execution";
|
||||
import { runMigration } from "./db/migrations";
|
||||
@@ -89,6 +89,8 @@ app.get("/api/plans", listPlansEndpoint);
|
||||
app.post("/api/plans", auditLog("CREATE_PLAN", "plan"), createPlan);
|
||||
app.get("/api/plans/:planId", getPlan);
|
||||
app.get("/api/plans/:planId/state", getPlanState);
|
||||
app.get("/api/plans/:planId/events", getPlanEvents);
|
||||
app.get("/api/plans/:planId/events/stream", streamPlanEvents);
|
||||
app.post("/api/plans/:planId/signature", addSignature);
|
||||
app.post("/api/plans/:planId/validate", validatePlanEndpoint);
|
||||
|
||||
|
||||
197
orchestrator/src/services/eventBus.ts
Normal file
197
orchestrator/src/services/eventBus.ts
Normal file
@@ -0,0 +1,197 @@
|
||||
/**
|
||||
* Typed, signed, append-only Event Bus (arch §5.5 Event Bus + §7).
|
||||
*
|
||||
* Architecture contract
|
||||
* ---------------------
|
||||
* 1. Every event is a normalised category from arch §7.2 — `EventType`.
|
||||
* 2. Every event is persisted to the `events` append-only table.
|
||||
* 3. Every event carries
|
||||
* payload_hash = sha256(JSON.stringify(payload))
|
||||
* prev_hash = signature of the previous event for the same plan
|
||||
* signature = hmac_sha256(secret, plan_id|type|payload_hash|prev_hash)
|
||||
* which gives a tamper-evident per-plan hash chain (arch §14 audit).
|
||||
* 4. Callers can subscribe to live events via `subscribe(planId, cb)` —
|
||||
* backed by a process-local EventEmitter that the SSE route consumes.
|
||||
*
|
||||
* When the orchestrator scales to >1 replicas, the in-process emitter
|
||||
* must be replaced by a broker (NATS / Kafka). The persistence layer
|
||||
* and signature chain remain unchanged.
|
||||
*/
|
||||
|
||||
import { createHash, createHmac } from "crypto";
|
||||
import { EventEmitter } from "events";
|
||||
import { query } from "../db/postgres";
|
||||
import { logger } from "../logging/logger";
|
||||
|
||||
/**
|
||||
* Normalised event types — arch §7.2. Keep this list as the single
|
||||
* source of truth so subscribers can exhaustively match on it.
|
||||
*/
|
||||
export const EVENT_TYPES = [
|
||||
"transaction.created",
|
||||
"participants.authorized",
|
||||
"preconditions.satisfied",
|
||||
"instrument.ready",
|
||||
"payment.ready",
|
||||
"transaction.prepared",
|
||||
"instrument.dispatched",
|
||||
"payment.dispatched",
|
||||
"instrument.acknowledged",
|
||||
"payment.accepted",
|
||||
"payment.settled",
|
||||
"transaction.validated",
|
||||
"transaction.committed",
|
||||
"transaction.aborted",
|
||||
"transaction.unwind_initiated",
|
||||
] as const;
|
||||
|
||||
export type EventType = (typeof EVENT_TYPES)[number];
|
||||
|
||||
export interface EventRecord {
|
||||
id: string;
|
||||
plan_id: string;
|
||||
type: EventType;
|
||||
actor: string | null;
|
||||
payload: Record<string, unknown>;
|
||||
payload_hash: string;
|
||||
prev_hash: string | null;
|
||||
signature: string;
|
||||
created_at: string;
|
||||
}
|
||||
|
||||
export interface PublishInput {
|
||||
planId: string;
|
||||
type: EventType;
|
||||
actor?: string;
|
||||
payload?: Record<string, unknown>;
|
||||
}
|
||||
|
||||
const emitter = new EventEmitter();
|
||||
emitter.setMaxListeners(0);
|
||||
|
||||
function getSigningSecret(): string {
|
||||
return (
|
||||
process.env.EVENT_BUS_HMAC_SECRET ??
|
||||
process.env.SESSION_SECRET ??
|
||||
"dev-event-bus-secret-change-in-production"
|
||||
);
|
||||
}
|
||||
|
||||
function sha256(input: string): string {
|
||||
return createHash("sha256").update(input).digest("hex");
|
||||
}
|
||||
|
||||
function sign(
|
||||
planId: string,
|
||||
type: string,
|
||||
payloadHash: string,
|
||||
prevHash: string | null,
|
||||
): string {
|
||||
const h = createHmac("sha256", getSigningSecret());
|
||||
h.update(`${planId}|${type}|${payloadHash}|${prevHash ?? ""}`);
|
||||
return h.digest("hex");
|
||||
}
|
||||
|
||||
/**
|
||||
* Publish a typed, signed, hash-chained event for a plan. Returns the
|
||||
* persisted record (including id + signature) so callers can reference
|
||||
* it from transition `source_event_id`.
|
||||
*/
|
||||
export async function publish(input: PublishInput): Promise<EventRecord> {
|
||||
const payload = input.payload ?? {};
|
||||
const payloadHash = sha256(JSON.stringify(payload));
|
||||
|
||||
const prev = await query<{ signature: string }>(
|
||||
`SELECT signature
|
||||
FROM events
|
||||
WHERE plan_id = $1
|
||||
ORDER BY created_at DESC, id DESC
|
||||
LIMIT 1`,
|
||||
[input.planId],
|
||||
);
|
||||
const prevHash = prev.length > 0 ? prev[0].signature : null;
|
||||
const signature = sign(input.planId, input.type, payloadHash, prevHash);
|
||||
|
||||
const rows = await query<EventRecord>(
|
||||
`INSERT INTO events (plan_id, type, actor, payload, payload_hash, prev_hash, signature)
|
||||
VALUES ($1, $2, $3, $4::jsonb, $5, $6, $7)
|
||||
RETURNING id, plan_id, type, actor, payload, payload_hash, prev_hash, signature, created_at`,
|
||||
[
|
||||
input.planId,
|
||||
input.type,
|
||||
input.actor ?? null,
|
||||
JSON.stringify(payload),
|
||||
payloadHash,
|
||||
prevHash,
|
||||
signature,
|
||||
],
|
||||
);
|
||||
|
||||
const record = rows[0];
|
||||
logger.info(
|
||||
{ planId: record.plan_id, type: record.type, eventId: record.id },
|
||||
"[EventBus] published",
|
||||
);
|
||||
|
||||
emitter.emit(`plan:${record.plan_id}`, record);
|
||||
emitter.emit("plan:*", record);
|
||||
|
||||
return record;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read the full event trail for a plan in chronological order.
|
||||
*/
|
||||
export async function getEventsForPlan(planId: string): Promise<EventRecord[]> {
|
||||
return query<EventRecord>(
|
||||
`SELECT id, plan_id, type, actor, payload, payload_hash, prev_hash, signature, created_at
|
||||
FROM events
|
||||
WHERE plan_id = $1
|
||||
ORDER BY created_at ASC, id ASC`,
|
||||
[planId],
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify the full hash chain for a plan's events. Returns `{ ok: true }`
|
||||
* when every signature matches and `prev_hash` forms a contiguous chain;
|
||||
* otherwise returns the first index that fails with a reason.
|
||||
*/
|
||||
export async function verifyChain(planId: string): Promise<
|
||||
{ ok: true } | { ok: false; brokenAt: number; reason: string }
|
||||
> {
|
||||
const events = await getEventsForPlan(planId);
|
||||
let prevSig: string | null = null;
|
||||
for (let i = 0; i < events.length; i++) {
|
||||
const e = events[i];
|
||||
if (e.prev_hash !== prevSig) {
|
||||
return { ok: false, brokenAt: i, reason: "prev_hash mismatch" };
|
||||
}
|
||||
const expectedPayloadHash = sha256(JSON.stringify(e.payload));
|
||||
if (expectedPayloadHash !== e.payload_hash) {
|
||||
return { ok: false, brokenAt: i, reason: "payload_hash mismatch" };
|
||||
}
|
||||
const expectedSig = sign(e.plan_id, e.type, e.payload_hash, e.prev_hash);
|
||||
if (expectedSig !== e.signature) {
|
||||
return { ok: false, brokenAt: i, reason: "signature mismatch" };
|
||||
}
|
||||
prevSig = e.signature;
|
||||
}
|
||||
return { ok: true };
|
||||
}
|
||||
|
||||
/**
|
||||
* Subscribe to live events for a single plan. Returns an unsubscribe
|
||||
* function. Used by the SSE route.
|
||||
*/
|
||||
export function subscribe(
|
||||
planId: string,
|
||||
callback: (record: EventRecord) => void,
|
||||
): () => void {
|
||||
const channel = `plan:${planId}`;
|
||||
emitter.on(channel, callback);
|
||||
return () => emitter.off(channel, callback);
|
||||
}
|
||||
|
||||
/** test-only emitter access, never import in prod code */
|
||||
export const __emitterForTests = emitter;
|
||||
149
orchestrator/tests/unit/eventBus.test.ts
Normal file
149
orchestrator/tests/unit/eventBus.test.ts
Normal file
@@ -0,0 +1,149 @@
|
||||
import { describe, it, expect, beforeEach, jest } from "@jest/globals";
|
||||
|
||||
type Row = {
|
||||
id: string;
|
||||
plan_id: string;
|
||||
type: string;
|
||||
actor: string | null;
|
||||
payload: Record<string, unknown>;
|
||||
payload_hash: string;
|
||||
prev_hash: string | null;
|
||||
signature: string;
|
||||
created_at: string;
|
||||
};
|
||||
|
||||
const rows: Row[] = [];
|
||||
let idSeq = 0;
|
||||
|
||||
jest.mock("../../src/db/postgres", () => ({
|
||||
query: async (sql: string, params: unknown[] = []) => {
|
||||
if (sql.startsWith("SELECT signature")) {
|
||||
const planId = params[0] as string;
|
||||
const matches = rows.filter((r) => r.plan_id === planId);
|
||||
if (matches.length === 0) return [];
|
||||
return [{ signature: matches[matches.length - 1].signature }];
|
||||
}
|
||||
if (sql.startsWith("INSERT INTO events")) {
|
||||
const [plan_id, type, actor, payloadJson, payload_hash, prev_hash, signature] =
|
||||
params as [string, string, string | null, string, string, string | null, string];
|
||||
const rec: Row = {
|
||||
id: `evt-${++idSeq}`,
|
||||
plan_id,
|
||||
type,
|
||||
actor,
|
||||
payload: JSON.parse(payloadJson),
|
||||
payload_hash,
|
||||
prev_hash,
|
||||
signature,
|
||||
created_at: new Date(Date.now() + idSeq).toISOString(),
|
||||
};
|
||||
rows.push(rec);
|
||||
return [rec];
|
||||
}
|
||||
if (sql.startsWith("SELECT id, plan_id")) {
|
||||
const planId = params[0] as string;
|
||||
return rows.filter((r) => r.plan_id === planId);
|
||||
}
|
||||
return [];
|
||||
},
|
||||
}));
|
||||
|
||||
import { publish, getEventsForPlan, verifyChain, EVENT_TYPES } from "../../src/services/eventBus";
|
||||
|
||||
describe("Event Bus", () => {
|
||||
beforeEach(() => {
|
||||
rows.length = 0;
|
||||
idSeq = 0;
|
||||
});
|
||||
|
||||
it("EVENT_TYPES covers all arch §7.2 categories", () => {
|
||||
expect(EVENT_TYPES).toContain("transaction.created");
|
||||
expect(EVENT_TYPES).toContain("transaction.committed");
|
||||
expect(EVENT_TYPES).toContain("transaction.aborted");
|
||||
expect(EVENT_TYPES).toContain("payment.settled");
|
||||
expect(EVENT_TYPES).toContain("instrument.dispatched");
|
||||
expect(EVENT_TYPES.length).toBe(15);
|
||||
});
|
||||
|
||||
it("publish persists with payload_hash, prev_hash=null, and signature", async () => {
|
||||
const rec = await publish({
|
||||
planId: "p-1",
|
||||
type: "transaction.created",
|
||||
actor: "coordinator",
|
||||
payload: { foo: 1 },
|
||||
});
|
||||
expect(rec.id).toMatch(/evt-/);
|
||||
expect(rec.prev_hash).toBeNull();
|
||||
expect(rec.payload_hash).toMatch(/^[0-9a-f]{64}$/);
|
||||
expect(rec.signature).toMatch(/^[0-9a-f]{64}$/);
|
||||
expect(rec.payload).toEqual({ foo: 1 });
|
||||
});
|
||||
|
||||
it("prev_hash chains consecutive events for the same plan", async () => {
|
||||
const a = await publish({ planId: "p-1", type: "transaction.created" });
|
||||
const b = await publish({ planId: "p-1", type: "participants.authorized" });
|
||||
const c = await publish({ planId: "p-1", type: "preconditions.satisfied" });
|
||||
expect(a.prev_hash).toBeNull();
|
||||
expect(b.prev_hash).toBe(a.signature);
|
||||
expect(c.prev_hash).toBe(b.signature);
|
||||
});
|
||||
|
||||
it("events are isolated per plan_id", async () => {
|
||||
const a1 = await publish({ planId: "p-1", type: "transaction.created" });
|
||||
const b1 = await publish({ planId: "p-2", type: "transaction.created" });
|
||||
expect(a1.prev_hash).toBeNull();
|
||||
expect(b1.prev_hash).toBeNull();
|
||||
});
|
||||
|
||||
it("verifyChain returns ok for an untampered chain", async () => {
|
||||
await publish({ planId: "p-1", type: "transaction.created" });
|
||||
await publish({ planId: "p-1", type: "transaction.prepared" });
|
||||
await publish({ planId: "p-1", type: "transaction.committed" });
|
||||
const result = await verifyChain("p-1");
|
||||
expect(result.ok).toBe(true);
|
||||
});
|
||||
|
||||
it("verifyChain detects payload tampering", async () => {
|
||||
await publish({ planId: "p-1", type: "transaction.created", payload: { amount: 100 } });
|
||||
await publish({ planId: "p-1", type: "transaction.committed" });
|
||||
rows[0].payload = { amount: 999_999 }; // tamper
|
||||
const result = await verifyChain("p-1");
|
||||
expect(result.ok).toBe(false);
|
||||
if (!result.ok) {
|
||||
expect(result.brokenAt).toBe(0);
|
||||
expect(result.reason).toBe("payload_hash mismatch");
|
||||
}
|
||||
});
|
||||
|
||||
it("verifyChain detects signature tampering", async () => {
|
||||
await publish({ planId: "p-1", type: "transaction.created" });
|
||||
await publish({ planId: "p-1", type: "transaction.committed" });
|
||||
rows[1].signature = "0".repeat(64); // tamper
|
||||
const result = await verifyChain("p-1");
|
||||
expect(result.ok).toBe(false);
|
||||
if (!result.ok) {
|
||||
expect(result.brokenAt).toBe(1);
|
||||
}
|
||||
});
|
||||
|
||||
it("verifyChain detects broken prev_hash link", async () => {
|
||||
await publish({ planId: "p-1", type: "transaction.created" });
|
||||
await publish({ planId: "p-1", type: "transaction.committed" });
|
||||
rows[1].prev_hash = "0".repeat(64);
|
||||
const result = await verifyChain("p-1");
|
||||
expect(result.ok).toBe(false);
|
||||
if (!result.ok) {
|
||||
expect(result.reason).toBe("prev_hash mismatch");
|
||||
}
|
||||
});
|
||||
|
||||
it("getEventsForPlan returns events in chronological order", async () => {
|
||||
await publish({ planId: "p-1", type: "transaction.created" });
|
||||
await publish({ planId: "p-1", type: "transaction.prepared" });
|
||||
const events = await getEventsForPlan("p-1");
|
||||
expect(events.map((e) => e.type)).toEqual([
|
||||
"transaction.created",
|
||||
"transaction.prepared",
|
||||
]);
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user