PR J: Redis-backed Event Bus broker (feature-flagged) #14

Open
nsatoshi wants to merge 1 commits from devin/1776881249-pr-j-redis-event-bus into main
3 changed files with 233 additions and 3 deletions

View File

@@ -13,15 +13,19 @@
* 4. Callers can subscribe to live events via `subscribe(planId, cb)` —
* backed by a process-local EventEmitter that the SSE route consumes.
*
* When the orchestrator scales to >1 replicas, the in-process emitter
* must be replaced by a broker (NATS / Kafka). The persistence layer
* and signature chain remain unchanged.
* When the orchestrator scales to >1 replicas, cross-process fan-out
* is bridged by `eventBusBroker.ts`: it publishes every persisted
* record on a Redis pub/sub channel and re-emits inbound records on
* the in-process emitter. Set `REDIS_URL` to enable; without it the
* broker is inert and the bus stays strictly process-local. The
* persistence layer and signature chain are unchanged in both modes.
*/
import { createHash, createHmac } from "crypto";
import { EventEmitter } from "events";
import { query } from "../db/postgres";
import { logger } from "../logging/logger";
import { getBroker } from "./eventBusBroker";
/**
* Normalised event types — arch §7.2. Keep this list as the single
@@ -136,6 +140,10 @@ export async function publish(input: PublishInput): Promise<EventRecord> {
emitter.emit(`plan:${record.plan_id}`, record);
emitter.emit("plan:*", record);
// Fan out to other replicas over the broker (no-op when REDIS_URL
// is unset — preserves single-process behaviour byte-for-byte).
void getBroker(emitter).publish(record);
return record;
}

View File

@@ -0,0 +1,158 @@
/**
* Cross-replica event broker for the Event Bus (arch §5.5, gap v2 §7.3).
*
* When the orchestrator runs as a single process, the in-process
* `EventEmitter` inside `eventBus.ts` is sufficient. When it scales to
* >1 replicas (k8s/ manifests expect this), SSE consumers connected to
* replica A would never see events originated on replica B. This module
* bridges replicas over a Redis pub/sub channel while leaving the
* signature chain, persistence, and public API untouched.
*
* Contract
* --------
* - Publish side: the caller publishes a fully-persisted `EventRecord`
* (via `eventBus.publish`). That record is ALSO handed to
* `broker.publish(record)` which JSON-encodes it and `PUBLISH`es it
* on `EVENT_BUS_REDIS_CHANNEL` (default `ccombo:events`).
* - Subscribe side: on startup, the broker creates a dedicated subscriber
* client, subscribes to the same channel, and re-emits every incoming
* record on the in-process `EventEmitter` so local subscribers
* (SSE routes, verifyChain callers) see remote events as if they were
* local.
* - Echo suppression: each process carries a random `instanceId`.
* Outbound messages include it; inbound messages whose `instanceId`
* matches are dropped (the local emitter already saw them).
*
* Feature flag: the broker is **inert** (no-op publish, no subscribe
* client) when `REDIS_URL` is unset. Existing single-process behaviour
* is preserved byte-for-byte.
*/
import { randomUUID } from "crypto";
import type { EventEmitter } from "events";
import Redis, { type Redis as RedisClient } from "ioredis";
import { logger } from "../logging/logger";
import type { EventRecord } from "./eventBus";
const DEFAULT_CHANNEL = "ccombo:events";
interface Envelope {
instanceId: string;
record: EventRecord;
}
export interface EventBusBroker {
readonly enabled: boolean;
readonly instanceId: string;
/** Publish a locally-persisted record to other replicas. */
publish(record: EventRecord): Promise<void>;
/** Close the broker. Safe to call multiple times. */
close(): Promise<void>;
}
/**
* Singleton broker — one per process. Created lazily on first access
* so tests that stub `REDIS_URL` can still exercise both branches.
*/
let current: EventBusBroker | null = null;
export function getBroker(emitter: EventEmitter): EventBusBroker {
if (current) return current;
current = createBroker(emitter);
return current;
}
/** Test-only: reset the singleton between tests. */
export function __resetBrokerForTests(): void {
if (current) {
void current.close();
}
current = null;
}
function createBroker(emitter: EventEmitter): EventBusBroker {
const instanceId = randomUUID();
const url = process.env.REDIS_URL;
if (!url) {
logger.info(
{ instanceId },
"[EventBusBroker] REDIS_URL unset; cross-replica fan-out disabled",
);
return {
enabled: false,
instanceId,
publish: async () => {},
close: async () => {},
};
}
const channel = process.env.EVENT_BUS_REDIS_CHANNEL ?? DEFAULT_CHANNEL;
const publisher: RedisClient = new Redis(url, { lazyConnect: true });
const subscriber: RedisClient = new Redis(url, { lazyConnect: true });
const wire = async () => {
try {
await publisher.connect();
await subscriber.connect();
await subscriber.subscribe(channel);
logger.info(
{ instanceId, channel },
"[EventBusBroker] subscribed to Redis channel",
);
} catch (err) {
logger.error(
{ err, instanceId, channel },
"[EventBusBroker] failed to connect to Redis; falling back to local-only",
);
}
};
subscriber.on("message", (ch, raw) => {
if (ch !== channel) return;
let env: Envelope;
try {
env = JSON.parse(raw) as Envelope;
} catch (err) {
logger.warn({ err, ch }, "[EventBusBroker] malformed envelope; dropped");
return;
}
if (env.instanceId === instanceId) {
// Echo of our own publish — local emitter already saw it.
return;
}
emitter.emit(`plan:${env.record.plan_id}`, env.record);
emitter.emit("plan:*", env.record);
});
void wire();
return {
enabled: true,
instanceId,
publish: async (record) => {
const envelope: Envelope = { instanceId, record };
try {
await publisher.publish(channel, JSON.stringify(envelope));
} catch (err) {
logger.error(
{ err, instanceId, channel, eventId: record.id },
"[EventBusBroker] publish failed; record persisted locally only",
);
}
},
close: async () => {
try {
await subscriber.quit();
} catch {
/* already closed */
}
try {
await publisher.quit();
} catch {
/* already closed */
}
},
};
}

View File

@@ -0,0 +1,64 @@
/**
* Tests for the Redis-backed cross-replica broker (gap v2 §7.3 / §10.3).
*
* These tests exercise the broker's feature-flag semantics without
* spinning up a real Redis: when `REDIS_URL` is unset the broker must
* be strictly inert. Live Redis behaviour is covered in the integration
* suite (PR Q).
*/
import { EventEmitter } from "events";
import { getBroker, __resetBrokerForTests } from "../../src/services/eventBusBroker";
import type { EventRecord } from "../../src/services/eventBus";
describe("eventBusBroker (feature flag)", () => {
const savedEnv = { ...process.env };
beforeEach(() => {
process.env = { ...savedEnv };
__resetBrokerForTests();
});
afterEach(() => {
__resetBrokerForTests();
process.env = { ...savedEnv };
});
it("is inert when REDIS_URL is unset", async () => {
delete process.env.REDIS_URL;
const emitter = new EventEmitter();
const broker = getBroker(emitter);
expect(broker.enabled).toBe(false);
expect(typeof broker.instanceId).toBe("string");
const rec: EventRecord = {
id: "00000000-0000-0000-0000-000000000001",
plan_id: "11111111-1111-1111-1111-111111111111",
type: "transaction.created",
actor: null,
payload: {},
payload_hash: "h",
prev_hash: null,
signature: "s",
created_at: new Date().toISOString(),
};
await expect(broker.publish(rec)).resolves.toBeUndefined();
await expect(broker.close()).resolves.toBeUndefined();
});
it("returns a stable singleton within a process", () => {
delete process.env.REDIS_URL;
const emitter = new EventEmitter();
const a = getBroker(emitter);
const b = getBroker(emitter);
expect(a).toBe(b);
});
it("generates a unique instanceId per process", () => {
delete process.env.REDIS_URL;
const emitter = new EventEmitter();
const a = getBroker(emitter).instanceId;
__resetBrokerForTests();
const b = getBroker(emitter).instanceId;
expect(a).not.toEqual(b);
});
});