PR D: typed + signed event bus + events table + SSE (arch step 5) #8

Merged
nsatoshi merged 1 commit from devin/1776875718-event-bus-sse into main 2026-04-22 17:17:41 +00:00
6 changed files with 478 additions and 1 deletion

View File

@@ -5,6 +5,11 @@ import { validatePlan, checkStepDependencies } from "../services/planValidation"
import { storePlan, getPlanById, updatePlanSignature, listPlans } from "../db/plans";
import { asyncHandler, AppError, ErrorType } from "../services/errorHandler";
import { getTransactionState, getTransitionHistory } from "../services/stateMachine";
import {
getEventsForPlan,
subscribe as subscribeToEvents,
verifyChain,
} from "../services/eventBus";
import type { Plan, PlanStep } from "../types/plan";
/**
@@ -220,3 +225,82 @@ export const getPlanState = asyncHandler(async (req: Request, res: Response) =>
});
});
/**
* GET /api/plans/:planId/events
* Return the full signed + hash-chained event trail for a plan
* (arch §4.5 State Registry + §7 Event Model + §14 Audit).
*
* Query `?verify=1` re-verifies the chain server-side and adds
* { chain_valid: boolean, broken_at?: number, broken_reason?: string }
* to the response.
*/
export const getPlanEvents = asyncHandler(async (req: Request, res: Response) => {
const { planId } = req.params;
const plan = await getPlanById(planId);
if (!plan) {
throw new AppError(ErrorType.NOT_FOUND_ERROR, 404, "Plan not found");
}
const events = await getEventsForPlan(planId);
const body: {
plan_id: string;
count: number;
events: typeof events;
chain_valid?: boolean;
broken_at?: number;
broken_reason?: string;
} = { plan_id: planId, count: events.length, events };
if (req.query.verify === "1") {
const v = await verifyChain(planId);
body.chain_valid = v.ok;
if (!v.ok) {
body.broken_at = v.brokenAt;
body.broken_reason = v.reason;
}
}
res.json(body);
});
/**
* GET /api/plans/:planId/events/stream
* Server-Sent Events (SSE) stream of live events for a single plan.
*/
export const streamPlanEvents = asyncHandler(async (req: Request, res: Response) => {
const { planId } = req.params;
const plan = await getPlanById(planId);
if (!plan) {
throw new AppError(ErrorType.NOT_FOUND_ERROR, 404, "Plan not found");
}
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache, no-transform");
res.setHeader("Connection", "keep-alive");
res.setHeader("X-Accel-Buffering", "no");
res.flushHeaders?.();
// Replay the history on connect so clients can reconstruct state
// without a separate REST call.
const history = await getEventsForPlan(planId);
for (const e of history) {
res.write(`id: ${e.id}\nevent: ${e.type}\ndata: ${JSON.stringify(e)}\n\n`);
}
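// NOTE: events published between the history read above and subscribe()
// below are not delivered (a snapshot-then-subscribe gap); tolerable for
// a process-local emitter, worth revisiting once a broker is introduced.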
const unsubscribe = subscribeToEvents(planId, (record) => {
res.write(
`id: ${record.id}\nevent: ${record.type}\ndata: ${JSON.stringify(record)}\n\n`,
);
});
const keepAlive = setInterval(() => {
res.write(": keep-alive\n\n");
}, 15_000);
req.on("close", () => {
clearInterval(keepAlive);
unsubscribe();
res.end();
});
});
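
A minimal consumer sketch for the two endpoints above (not part of this PR): it assumes a dev base URL, an existing plan id, and an `EventSource` implementation (browser, or the `eventsource` package under Node).

// Hypothetical client: audit the trail, then tail live events.
const BASE = "http://localhost:3000"; // assumed dev host

async function auditAndTail(planId: string): Promise<void> {
  // 1. Fetch the trail and ask the server to re-verify the hash chain.
  const res = await fetch(`${BASE}/api/plans/${planId}/events?verify=1`);
  const trail = await res.json();
  if (trail.chain_valid === false) {
    throw new Error(`chain broken at ${trail.broken_at}: ${trail.broken_reason}`);
  }

  // 2. Tail the stream. History is replayed on connect, so the client can
  //    rebuild state from the stream alone.
  const source = new EventSource(`${BASE}/api/plans/${planId}/events/stream`);
  source.addEventListener("transaction.committed", (msg) => {
    console.log("committed:", JSON.parse((msg as MessageEvent).data));
  });
}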

View File

@@ -0,0 +1,43 @@
import { query } from "../postgres";
/**
* Migration 003 — append-only events journal (arch §4.5, §5.5, §7).
*
* The `events` table is the system-of-record for normalised workflow
* events (arch §7.2: `transaction.created`, `instrument.ready`,
* `payment.settled`, `transaction.committed`, …). It is:
*
* - append-only (no UPDATE / DELETE)
* - signed (HMAC of (plan_id, type, payload_hash, prev_hash))
* - hash-chained via prev_hash for tamper-evident forensic replay
* - indexed by plan_id so the SSE endpoint can stream efficiently
*/
export async function up() {
await query(
`CREATE TABLE IF NOT EXISTS events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
plan_id UUID NOT NULL REFERENCES plans(plan_id) ON DELETE CASCADE,
type VARCHAR(128) NOT NULL,
actor VARCHAR(255),
payload JSONB NOT NULL DEFAULT '{}'::jsonb,
payload_hash CHAR(64) NOT NULL,
prev_hash CHAR(64),
signature CHAR(64) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
)`,
);
await query(
`CREATE INDEX IF NOT EXISTS idx_events_plan_id_created
ON events(plan_id, created_at)`,
);
await query(
`CREATE INDEX IF NOT EXISTS idx_events_type
ON events(type)`,
);
}
export async function down() {
await query("DROP TABLE IF EXISTS events CASCADE");
}
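
The migration comment calls the table append-only, but nothing in the schema enforces that. A sketch of a hypothetical follow-up migration (004, not in this PR) that enforces it with a trigger:

// Hypothetical migration 004: reject UPDATE/DELETE on events.
import { query } from "../postgres";

export async function up() {
  await query(
    `CREATE OR REPLACE FUNCTION events_append_only() RETURNS trigger AS $$
     BEGIN
       RAISE EXCEPTION 'events is append-only';
     END;
     $$ LANGUAGE plpgsql`,
  );
  await query(
    `CREATE TRIGGER trg_events_append_only
       BEFORE UPDATE OR DELETE ON events
       FOR EACH ROW EXECUTE FUNCTION events_append_only()`,
  );
}

Note the trigger would also fire on the `ON DELETE CASCADE` from `plans`, so cascading plan deletion would have to be reconciled with it first.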

View File

@@ -1,5 +1,6 @@
import { up as up001 } from "./001_initial_schema";
import { up as up002 } from "./002_transaction_state";
import { up as up003 } from "./003_events";
/**
* Run all migrations
@@ -8,6 +9,7 @@ export async function runMigration() {
try {
await up001();
await up002();
await up003();
console.log("All migrations completed");
} catch (error) {
console.error("Migration failed:", error);

View File

@@ -14,7 +14,7 @@ import { requestTimeout } from "./middleware/timeout";
import { logger } from "./logging/logger";
import { getMetrics, httpRequestDuration, httpRequestTotal, register } from "./metrics/prometheus";
import { healthCheck, readinessCheck, livenessCheck } from "./health/health";
import { listPlansEndpoint, createPlan, getPlan, getPlanState, addSignature, validatePlanEndpoint } from "./api/plans";
import { listPlansEndpoint, createPlan, getPlan, getPlanState, getPlanEvents, streamPlanEvents, addSignature, validatePlanEndpoint } from "./api/plans";
import { streamPlanStatus } from "./api/sse";
import { executionCoordinator } from "./services/execution";
import { runMigration } from "./db/migrations";
@@ -89,6 +89,8 @@ app.get("/api/plans", listPlansEndpoint);
app.post("/api/plans", auditLog("CREATE_PLAN", "plan"), createPlan);
app.get("/api/plans/:planId", getPlan);
app.get("/api/plans/:planId/state", getPlanState);
app.get("/api/plans/:planId/events", getPlanEvents);
app.get("/api/plans/:planId/events/stream", streamPlanEvents);
app.post("/api/plans/:planId/signature", addSignature);
app.post("/api/plans/:planId/validate", validatePlanEndpoint);

View File

@@ -0,0 +1,197 @@
/**
* Typed, signed, append-only Event Bus (arch §5.5 Event Bus + §7).
*
* Architecture contract
* ---------------------
* 1. Every event is a normalised category from arch §7.2 — `EventType`.
* 2. Every event is persisted to the `events` append-only table.
* 3. Every event carries
* payload_hash = sha256(JSON.stringify(payload))
* prev_hash = signature of the previous event for the same plan
* signature = hmac_sha256(secret, plan_id|type|payload_hash|prev_hash)
* which gives a tamper-evident per-plan hash chain (arch §14 audit).
* 4. Callers can subscribe to live events via `subscribe(planId, cb)` —
* backed by a process-local EventEmitter that the SSE route consumes.
*
* When the orchestrator scales beyond one replica, the in-process
* emitter must be replaced by a broker (e.g. NATS or Kafka). The
* persistence layer and signature chain remain unchanged.
*/
import { createHash, createHmac } from "crypto";
import { EventEmitter } from "events";
import { query } from "../db/postgres";
import { logger } from "../logging/logger";
/**
* Normalised event types — arch §7.2. Keep this list as the single
* source of truth so subscribers can exhaustively match on it.
*/
export const EVENT_TYPES = [
"transaction.created",
"participants.authorized",
"preconditions.satisfied",
"instrument.ready",
"payment.ready",
"transaction.prepared",
"instrument.dispatched",
"payment.dispatched",
"instrument.acknowledged",
"payment.accepted",
"payment.settled",
"transaction.validated",
"transaction.committed",
"transaction.aborted",
"transaction.unwind_initiated",
] as const;
export type EventType = (typeof EVENT_TYPES)[number];
export interface EventRecord {
id: string;
plan_id: string;
type: EventType;
actor: string | null;
payload: Record<string, unknown>;
payload_hash: string;
prev_hash: string | null;
signature: string;
created_at: string;
}
export interface PublishInput {
planId: string;
type: EventType;
actor?: string;
payload?: Record<string, unknown>;
}
const emitter = new EventEmitter();
emitter.setMaxListeners(0);
function getSigningSecret(): string {
return (
process.env.EVENT_BUS_HMAC_SECRET ??
process.env.SESSION_SECRET ??
"dev-event-bus-secret-change-in-production"
);
}
function sha256(input: string): string {
return createHash("sha256").update(input).digest("hex");
}
function sign(
planId: string,
type: string,
payloadHash: string,
prevHash: string | null,
): string {
const h = createHmac("sha256", getSigningSecret());
h.update(`${planId}|${type}|${payloadHash}|${prevHash ?? ""}`);
return h.digest("hex");
}
/**
* Publish a typed, signed, hash-chained event for a plan. Returns the
* persisted record (including id + signature) so callers can reference
* it from a transition's `source_event_id`.
*/
export async function publish(input: PublishInput): Promise<EventRecord> {
const payload = input.payload ?? {};
const payloadHash = sha256(JSON.stringify(payload));
const prev = await query<{ signature: string }>(
`SELECT signature
FROM events
WHERE plan_id = $1
ORDER BY created_at DESC, id DESC
LIMIT 1`,
[input.planId],
);
const prevHash = prev.length > 0 ? prev[0].signature : null;
const signature = sign(input.planId, input.type, payloadHash, prevHash);
const rows = await query<EventRecord>(
`INSERT INTO events (plan_id, type, actor, payload, payload_hash, prev_hash, signature)
VALUES ($1, $2, $3, $4::jsonb, $5, $6, $7)
RETURNING id, plan_id, type, actor, payload, payload_hash, prev_hash, signature, created_at`,
[
input.planId,
input.type,
input.actor ?? null,
JSON.stringify(payload),
payloadHash,
prevHash,
signature,
],
);
const record = rows[0];
logger.info(
{ planId: record.plan_id, type: record.type, eventId: record.id },
"[EventBus] published",
);
emitter.emit(`plan:${record.plan_id}`, record);
emitter.emit("plan:*", record);
return record;
}
/**
* Read the full event trail for a plan in chronological order.
*/
export async function getEventsForPlan(planId: string): Promise<EventRecord[]> {
return query<EventRecord>(
`SELECT id, plan_id, type, actor, payload, payload_hash, prev_hash, signature, created_at
FROM events
WHERE plan_id = $1
ORDER BY created_at ASC, id ASC`,
[planId],
);
}
/**
* Verify the full hash chain for a plan's events. Returns `{ ok: true }`
* when every signature matches and `prev_hash` forms a contiguous chain;
* otherwise returns the index of the first failing event plus a reason.
*/
export async function verifyChain(planId: string): Promise<
{ ok: true } | { ok: false; brokenAt: number; reason: string }
> {
const events = await getEventsForPlan(planId);
let prevSig: string | null = null;
for (let i = 0; i < events.length; i++) {
const e = events[i];
if (e.prev_hash !== prevSig) {
return { ok: false, brokenAt: i, reason: "prev_hash mismatch" };
}
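// NOTE: jsonb round-trips do not preserve key order, so JSON.stringify
// of a multi-key payload read back from Postgres may not byte-match the
// serialisation hashed at publish time; a canonical JSON encoding would
// make this check robust.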
const expectedPayloadHash = sha256(JSON.stringify(e.payload));
if (expectedPayloadHash !== e.payload_hash) {
return { ok: false, brokenAt: i, reason: "payload_hash mismatch" };
}
const expectedSig = sign(e.plan_id, e.type, e.payload_hash, e.prev_hash);
if (expectedSig !== e.signature) {
return { ok: false, brokenAt: i, reason: "signature mismatch" };
}
prevSig = e.signature;
}
return { ok: true };
}
/**
* Subscribe to live events for a single plan. Returns an unsubscribe
* function. Used by the SSE route.
*/
export function subscribe(
planId: string,
callback: (record: EventRecord) => void,
): () => void {
const channel = `plan:${planId}`;
emitter.on(channel, callback);
return () => emitter.off(channel, callback);
}
/** Test-only emitter access; never import in production code. */
export const __emitterForTests = emitter;
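
For reference, a usage sketch that exercises the contract in the header comment: publish two events, check the linkage, and recompute one HMAC by hand. The `planId` is illustrative, and the secret lookup here is simplified (the real `getSigningSecret` also consults `SESSION_SECRET`).

// Sketch: publish two chained events and recheck the math by hand.
import { createHmac } from "crypto";
import { publish, verifyChain } from "./eventBus";

async function demo(planId: string): Promise<void> {
  const a = await publish({ planId, type: "transaction.created", actor: "coordinator" });
  const b = await publish({ planId, type: "transaction.prepared", actor: "coordinator" });

  // prev_hash links b back to a ...
  console.assert(b.prev_hash === a.signature);

  // ... and signature = hmac_sha256(secret, plan_id|type|payload_hash|prev_hash).
  const secret =
    process.env.EVENT_BUS_HMAC_SECRET ?? "dev-event-bus-secret-change-in-production";
  const expected = createHmac("sha256", secret)
    .update(`${b.plan_id}|${b.type}|${b.payload_hash}|${b.prev_hash ?? ""}`)
    .digest("hex");
  console.assert(expected === b.signature);

  // Full chain verification, the same routine ?verify=1 runs server-side.
  console.assert((await verifyChain(planId)).ok);
}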

View File

@@ -0,0 +1,149 @@
import { describe, it, expect, beforeEach, jest } from "@jest/globals";
type Row = {
id: string;
plan_id: string;
type: string;
actor: string | null;
payload: Record<string, unknown>;
payload_hash: string;
prev_hash: string | null;
signature: string;
created_at: string;
};
const rows: Row[] = [];
let idSeq = 0;
jest.mock("../../src/db/postgres", () => ({
query: async (sql: string, params: unknown[] = []) => {
if (sql.startsWith("SELECT signature")) {
const planId = params[0] as string;
const matches = rows.filter((r) => r.plan_id === planId);
if (matches.length === 0) return [];
return [{ signature: matches[matches.length - 1].signature }];
}
if (sql.startsWith("INSERT INTO events")) {
const [plan_id, type, actor, payloadJson, payload_hash, prev_hash, signature] =
params as [string, string, string | null, string, string, string | null, string];
const rec: Row = {
id: `evt-${++idSeq}`,
plan_id,
type,
actor,
payload: JSON.parse(payloadJson),
payload_hash,
prev_hash,
signature,
created_at: new Date(Date.now() + idSeq).toISOString(),
};
rows.push(rec);
return [rec];
}
if (sql.startsWith("SELECT id, plan_id")) {
const planId = params[0] as string;
return rows.filter((r) => r.plan_id === planId);
}
return [];
},
}));
import { publish, getEventsForPlan, verifyChain, EVENT_TYPES } from "../../src/services/eventBus";
describe("Event Bus", () => {
beforeEach(() => {
rows.length = 0;
idSeq = 0;
});
it("EVENT_TYPES covers all arch §7.2 categories", () => {
expect(EVENT_TYPES).toContain("transaction.created");
expect(EVENT_TYPES).toContain("transaction.committed");
expect(EVENT_TYPES).toContain("transaction.aborted");
expect(EVENT_TYPES).toContain("payment.settled");
expect(EVENT_TYPES).toContain("instrument.dispatched");
expect(EVENT_TYPES.length).toBe(15);
});
it("publish persists with payload_hash, prev_hash=null, and signature", async () => {
const rec = await publish({
planId: "p-1",
type: "transaction.created",
actor: "coordinator",
payload: { foo: 1 },
});
expect(rec.id).toMatch(/evt-/);
expect(rec.prev_hash).toBeNull();
expect(rec.payload_hash).toMatch(/^[0-9a-f]{64}$/);
expect(rec.signature).toMatch(/^[0-9a-f]{64}$/);
expect(rec.payload).toEqual({ foo: 1 });
});
it("prev_hash chains consecutive events for the same plan", async () => {
const a = await publish({ planId: "p-1", type: "transaction.created" });
const b = await publish({ planId: "p-1", type: "participants.authorized" });
const c = await publish({ planId: "p-1", type: "preconditions.satisfied" });
expect(a.prev_hash).toBeNull();
expect(b.prev_hash).toBe(a.signature);
expect(c.prev_hash).toBe(b.signature);
});
it("events are isolated per plan_id", async () => {
const a1 = await publish({ planId: "p-1", type: "transaction.created" });
const b1 = await publish({ planId: "p-2", type: "transaction.created" });
expect(a1.prev_hash).toBeNull();
expect(b1.prev_hash).toBeNull();
});
it("verifyChain returns ok for an untampered chain", async () => {
await publish({ planId: "p-1", type: "transaction.created" });
await publish({ planId: "p-1", type: "transaction.prepared" });
await publish({ planId: "p-1", type: "transaction.committed" });
const result = await verifyChain("p-1");
expect(result.ok).toBe(true);
});
it("verifyChain detects payload tampering", async () => {
await publish({ planId: "p-1", type: "transaction.created", payload: { amount: 100 } });
await publish({ planId: "p-1", type: "transaction.committed" });
rows[0].payload = { amount: 999_999 }; // tamper
const result = await verifyChain("p-1");
expect(result.ok).toBe(false);
if (!result.ok) {
expect(result.brokenAt).toBe(0);
expect(result.reason).toBe("payload_hash mismatch");
}
});
it("verifyChain detects signature tampering", async () => {
await publish({ planId: "p-1", type: "transaction.created" });
await publish({ planId: "p-1", type: "transaction.committed" });
rows[1].signature = "0".repeat(64); // tamper
const result = await verifyChain("p-1");
expect(result.ok).toBe(false);
if (!result.ok) {
expect(result.brokenAt).toBe(1);
}
});
it("verifyChain detects broken prev_hash link", async () => {
await publish({ planId: "p-1", type: "transaction.created" });
await publish({ planId: "p-1", type: "transaction.committed" });
rows[1].prev_hash = "0".repeat(64);
const result = await verifyChain("p-1");
expect(result.ok).toBe(false);
if (!result.ok) {
expect(result.reason).toBe("prev_hash mismatch");
}
});
it("getEventsForPlan returns events in chronological order", async () => {
await publish({ planId: "p-1", type: "transaction.created" });
await publish({ planId: "p-1", type: "transaction.prepared" });
const events = await getEventsForPlan("p-1");
expect(events.map((e) => e.type)).toEqual([
"transaction.created",
"transaction.prepared",
]);
});
});