PR J: Redis-backed Event Bus broker (feature-flagged) #14
@@ -13,15 +13,19 @@
|
||||
* 4. Callers can subscribe to live events via `subscribe(planId, cb)` —
|
||||
* backed by a process-local EventEmitter that the SSE route consumes.
|
||||
*
|
||||
* When the orchestrator scales to >1 replicas, the in-process emitter
|
||||
* must be replaced by a broker (NATS / Kafka). The persistence layer
|
||||
* and signature chain remain unchanged.
|
||||
* When the orchestrator scales to >1 replicas, cross-process fan-out
|
||||
* is bridged by `eventBusBroker.ts`: it publishes every persisted
|
||||
* record on a Redis pub/sub channel and re-emits inbound records on
|
||||
* the in-process emitter. Set `REDIS_URL` to enable; without it the
|
||||
* broker is inert and the bus stays strictly process-local. The
|
||||
* persistence layer and signature chain are unchanged in both modes.
|
||||
*/
|
||||
|
||||
import { createHash, createHmac } from "crypto";
|
||||
import { EventEmitter } from "events";
|
||||
import { query } from "../db/postgres";
|
||||
import { logger } from "../logging/logger";
|
||||
import { getBroker } from "./eventBusBroker";
|
||||
|
||||
/**
|
||||
* Normalised event types — arch §7.2. Keep this list as the single
|
||||
@@ -136,6 +140,10 @@ export async function publish(input: PublishInput): Promise<EventRecord> {
|
||||
emitter.emit(`plan:${record.plan_id}`, record);
|
||||
emitter.emit("plan:*", record);
|
||||
|
||||
// Fan out to other replicas over the broker (no-op when REDIS_URL
|
||||
// is unset — preserves single-process behaviour byte-for-byte).
|
||||
void getBroker(emitter).publish(record);
|
||||
|
||||
return record;
|
||||
}
|
||||
|
||||
|
||||
158
orchestrator/src/services/eventBusBroker.ts
Normal file
158
orchestrator/src/services/eventBusBroker.ts
Normal file
@@ -0,0 +1,158 @@
|
||||
/**
|
||||
* Cross-replica event broker for the Event Bus (arch §5.5, gap v2 §7.3).
|
||||
*
|
||||
* When the orchestrator runs as a single process, the in-process
|
||||
* `EventEmitter` inside `eventBus.ts` is sufficient. When it scales to
|
||||
* >1 replicas (k8s/ manifests expect this), SSE consumers connected to
|
||||
* replica A would never see events originated on replica B. This module
|
||||
* bridges replicas over a Redis pub/sub channel while leaving the
|
||||
* signature chain, persistence, and public API untouched.
|
||||
*
|
||||
* Contract
|
||||
* --------
|
||||
* - Publish side: the caller publishes a fully-persisted `EventRecord`
|
||||
* (via `eventBus.publish`). That record is ALSO handed to
|
||||
* `broker.publish(record)` which JSON-encodes it and `PUBLISH`es it
|
||||
* on `EVENT_BUS_REDIS_CHANNEL` (default `ccombo:events`).
|
||||
* - Subscribe side: on startup, the broker creates a dedicated subscriber
|
||||
* client, subscribes to the same channel, and re-emits every incoming
|
||||
* record on the in-process `EventEmitter` so local subscribers
|
||||
* (SSE routes, verifyChain callers) see remote events as if they were
|
||||
* local.
|
||||
* - Echo suppression: each process carries a random `instanceId`.
|
||||
* Outbound messages include it; inbound messages whose `instanceId`
|
||||
* matches are dropped (the local emitter already saw them).
|
||||
*
|
||||
* Feature flag: the broker is **inert** (no-op publish, no subscribe
|
||||
* client) when `REDIS_URL` is unset. Existing single-process behaviour
|
||||
* is preserved byte-for-byte.
|
||||
*/
|
||||
|
||||
import { randomUUID } from "crypto";
|
||||
import type { EventEmitter } from "events";
|
||||
import Redis, { type Redis as RedisClient } from "ioredis";
|
||||
import { logger } from "../logging/logger";
|
||||
import type { EventRecord } from "./eventBus";
|
||||
|
||||
const DEFAULT_CHANNEL = "ccombo:events";
|
||||
|
||||
interface Envelope {
|
||||
instanceId: string;
|
||||
record: EventRecord;
|
||||
}
|
||||
|
||||
export interface EventBusBroker {
|
||||
readonly enabled: boolean;
|
||||
readonly instanceId: string;
|
||||
/** Publish a locally-persisted record to other replicas. */
|
||||
publish(record: EventRecord): Promise<void>;
|
||||
/** Close the broker. Safe to call multiple times. */
|
||||
close(): Promise<void>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Singleton broker — one per process. Created lazily on first access
|
||||
* so tests that stub `REDIS_URL` can still exercise both branches.
|
||||
*/
|
||||
let current: EventBusBroker | null = null;
|
||||
|
||||
export function getBroker(emitter: EventEmitter): EventBusBroker {
|
||||
if (current) return current;
|
||||
current = createBroker(emitter);
|
||||
return current;
|
||||
}
|
||||
|
||||
/** Test-only: reset the singleton between tests. */
|
||||
export function __resetBrokerForTests(): void {
|
||||
if (current) {
|
||||
void current.close();
|
||||
}
|
||||
current = null;
|
||||
}
|
||||
|
||||
function createBroker(emitter: EventEmitter): EventBusBroker {
|
||||
const instanceId = randomUUID();
|
||||
const url = process.env.REDIS_URL;
|
||||
|
||||
if (!url) {
|
||||
logger.info(
|
||||
{ instanceId },
|
||||
"[EventBusBroker] REDIS_URL unset; cross-replica fan-out disabled",
|
||||
);
|
||||
return {
|
||||
enabled: false,
|
||||
instanceId,
|
||||
publish: async () => {},
|
||||
close: async () => {},
|
||||
};
|
||||
}
|
||||
|
||||
const channel = process.env.EVENT_BUS_REDIS_CHANNEL ?? DEFAULT_CHANNEL;
|
||||
|
||||
const publisher: RedisClient = new Redis(url, { lazyConnect: true });
|
||||
const subscriber: RedisClient = new Redis(url, { lazyConnect: true });
|
||||
|
||||
const wire = async () => {
|
||||
try {
|
||||
await publisher.connect();
|
||||
await subscriber.connect();
|
||||
await subscriber.subscribe(channel);
|
||||
logger.info(
|
||||
{ instanceId, channel },
|
||||
"[EventBusBroker] subscribed to Redis channel",
|
||||
);
|
||||
} catch (err) {
|
||||
logger.error(
|
||||
{ err, instanceId, channel },
|
||||
"[EventBusBroker] failed to connect to Redis; falling back to local-only",
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
subscriber.on("message", (ch, raw) => {
|
||||
if (ch !== channel) return;
|
||||
let env: Envelope;
|
||||
try {
|
||||
env = JSON.parse(raw) as Envelope;
|
||||
} catch (err) {
|
||||
logger.warn({ err, ch }, "[EventBusBroker] malformed envelope; dropped");
|
||||
return;
|
||||
}
|
||||
if (env.instanceId === instanceId) {
|
||||
// Echo of our own publish — local emitter already saw it.
|
||||
return;
|
||||
}
|
||||
emitter.emit(`plan:${env.record.plan_id}`, env.record);
|
||||
emitter.emit("plan:*", env.record);
|
||||
});
|
||||
|
||||
void wire();
|
||||
|
||||
return {
|
||||
enabled: true,
|
||||
instanceId,
|
||||
publish: async (record) => {
|
||||
const envelope: Envelope = { instanceId, record };
|
||||
try {
|
||||
await publisher.publish(channel, JSON.stringify(envelope));
|
||||
} catch (err) {
|
||||
logger.error(
|
||||
{ err, instanceId, channel, eventId: record.id },
|
||||
"[EventBusBroker] publish failed; record persisted locally only",
|
||||
);
|
||||
}
|
||||
},
|
||||
close: async () => {
|
||||
try {
|
||||
await subscriber.quit();
|
||||
} catch {
|
||||
/* already closed */
|
||||
}
|
||||
try {
|
||||
await publisher.quit();
|
||||
} catch {
|
||||
/* already closed */
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
64
orchestrator/tests/unit/eventBusBroker.test.ts
Normal file
64
orchestrator/tests/unit/eventBusBroker.test.ts
Normal file
@@ -0,0 +1,64 @@
|
||||
/**
|
||||
* Tests for the Redis-backed cross-replica broker (gap v2 §7.3 / §10.3).
|
||||
*
|
||||
* These tests exercise the broker's feature-flag semantics without
|
||||
* spinning up a real Redis: when `REDIS_URL` is unset the broker must
|
||||
* be strictly inert. Live Redis behaviour is covered in the integration
|
||||
* suite (PR Q).
|
||||
*/
|
||||
|
||||
import { EventEmitter } from "events";
|
||||
import { getBroker, __resetBrokerForTests } from "../../src/services/eventBusBroker";
|
||||
import type { EventRecord } from "../../src/services/eventBus";
|
||||
|
||||
describe("eventBusBroker (feature flag)", () => {
|
||||
const savedEnv = { ...process.env };
|
||||
|
||||
beforeEach(() => {
|
||||
process.env = { ...savedEnv };
|
||||
__resetBrokerForTests();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
__resetBrokerForTests();
|
||||
process.env = { ...savedEnv };
|
||||
});
|
||||
|
||||
it("is inert when REDIS_URL is unset", async () => {
|
||||
delete process.env.REDIS_URL;
|
||||
const emitter = new EventEmitter();
|
||||
const broker = getBroker(emitter);
|
||||
expect(broker.enabled).toBe(false);
|
||||
expect(typeof broker.instanceId).toBe("string");
|
||||
const rec: EventRecord = {
|
||||
id: "00000000-0000-0000-0000-000000000001",
|
||||
plan_id: "11111111-1111-1111-1111-111111111111",
|
||||
type: "transaction.created",
|
||||
actor: null,
|
||||
payload: {},
|
||||
payload_hash: "h",
|
||||
prev_hash: null,
|
||||
signature: "s",
|
||||
created_at: new Date().toISOString(),
|
||||
};
|
||||
await expect(broker.publish(rec)).resolves.toBeUndefined();
|
||||
await expect(broker.close()).resolves.toBeUndefined();
|
||||
});
|
||||
|
||||
it("returns a stable singleton within a process", () => {
|
||||
delete process.env.REDIS_URL;
|
||||
const emitter = new EventEmitter();
|
||||
const a = getBroker(emitter);
|
||||
const b = getBroker(emitter);
|
||||
expect(a).toBe(b);
|
||||
});
|
||||
|
||||
it("generates a unique instanceId per process", () => {
|
||||
delete process.env.REDIS_URL;
|
||||
const emitter = new EventEmitter();
|
||||
const a = getBroker(emitter).instanceId;
|
||||
__resetBrokerForTests();
|
||||
const b = getBroker(emitter).instanceId;
|
||||
expect(a).not.toEqual(b);
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user