From bf12f94250316cda6628a69bb5e7b943f859c357 Mon Sep 17 00:00:00 2001 From: Devin Date: Wed, 22 Apr 2026 18:09:02 +0000 Subject: [PATCH] Redis-backed Event Bus broker (feature-flagged via REDIS_URL) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes gap-analysis v2 §7.3 and §10.3. - New services/eventBusBroker.ts: per-process singleton with unique instanceId; subscribes to a Redis pub/sub channel (default ccombo:events) and re-emits inbound records on the in-process EventEmitter. Outbound envelopes carry the instanceId so the originating replica drops its own echo. - eventBus.publish() hands every persisted record to the broker after the local emit, so same-process SSE subscribers see zero change while remote-replica subscribers now receive every event. - When REDIS_URL is unset the broker is strictly inert (no connect, no publish) — single-process behaviour is preserved byte-for-byte. - 3 unit tests for feature-flag semantics; full suite 83/83 green. - Live Redis behaviour will be covered by the Testcontainers suite in PR Q. --- orchestrator/src/services/eventBus.ts | 14 +- orchestrator/src/services/eventBusBroker.ts | 158 ++++++++++++++++++ .../tests/unit/eventBusBroker.test.ts | 64 +++++++ 3 files changed, 233 insertions(+), 3 deletions(-) create mode 100644 orchestrator/src/services/eventBusBroker.ts create mode 100644 orchestrator/tests/unit/eventBusBroker.test.ts diff --git a/orchestrator/src/services/eventBus.ts b/orchestrator/src/services/eventBus.ts index 54a8fa7..e53f484 100644 --- a/orchestrator/src/services/eventBus.ts +++ b/orchestrator/src/services/eventBus.ts @@ -13,15 +13,19 @@ * 4. Callers can subscribe to live events via `subscribe(planId, cb)` — * backed by a process-local EventEmitter that the SSE route consumes. * - * When the orchestrator scales to >1 replicas, the in-process emitter - * must be replaced by a broker (NATS / Kafka). The persistence layer - * and signature chain remain unchanged. + * When the orchestrator scales to >1 replicas, cross-process fan-out + * is bridged by `eventBusBroker.ts`: it publishes every persisted + * record on a Redis pub/sub channel and re-emits inbound records on + * the in-process emitter. Set `REDIS_URL` to enable; without it the + * broker is inert and the bus stays strictly process-local. The + * persistence layer and signature chain are unchanged in both modes. */ import { createHash, createHmac } from "crypto"; import { EventEmitter } from "events"; import { query } from "../db/postgres"; import { logger } from "../logging/logger"; +import { getBroker } from "./eventBusBroker"; /** * Normalised event types — arch §7.2. Keep this list as the single @@ -136,6 +140,10 @@ export async function publish(input: PublishInput): Promise { emitter.emit(`plan:${record.plan_id}`, record); emitter.emit("plan:*", record); + // Fan out to other replicas over the broker (no-op when REDIS_URL + // is unset — preserves single-process behaviour byte-for-byte). + void getBroker(emitter).publish(record); + return record; } diff --git a/orchestrator/src/services/eventBusBroker.ts b/orchestrator/src/services/eventBusBroker.ts new file mode 100644 index 0000000..f0b2842 --- /dev/null +++ b/orchestrator/src/services/eventBusBroker.ts @@ -0,0 +1,158 @@ +/** + * Cross-replica event broker for the Event Bus (arch §5.5, gap v2 §7.3). + * + * When the orchestrator runs as a single process, the in-process + * `EventEmitter` inside `eventBus.ts` is sufficient. When it scales to + * >1 replicas (k8s/ manifests expect this), SSE consumers connected to + * replica A would never see events originated on replica B. This module + * bridges replicas over a Redis pub/sub channel while leaving the + * signature chain, persistence, and public API untouched. + * + * Contract + * -------- + * - Publish side: the caller publishes a fully-persisted `EventRecord` + * (via `eventBus.publish`). That record is ALSO handed to + * `broker.publish(record)` which JSON-encodes it and `PUBLISH`es it + * on `EVENT_BUS_REDIS_CHANNEL` (default `ccombo:events`). + * - Subscribe side: on startup, the broker creates a dedicated subscriber + * client, subscribes to the same channel, and re-emits every incoming + * record on the in-process `EventEmitter` so local subscribers + * (SSE routes, verifyChain callers) see remote events as if they were + * local. + * - Echo suppression: each process carries a random `instanceId`. + * Outbound messages include it; inbound messages whose `instanceId` + * matches are dropped (the local emitter already saw them). + * + * Feature flag: the broker is **inert** (no-op publish, no subscribe + * client) when `REDIS_URL` is unset. Existing single-process behaviour + * is preserved byte-for-byte. + */ + +import { randomUUID } from "crypto"; +import type { EventEmitter } from "events"; +import Redis, { type Redis as RedisClient } from "ioredis"; +import { logger } from "../logging/logger"; +import type { EventRecord } from "./eventBus"; + +const DEFAULT_CHANNEL = "ccombo:events"; + +interface Envelope { + instanceId: string; + record: EventRecord; +} + +export interface EventBusBroker { + readonly enabled: boolean; + readonly instanceId: string; + /** Publish a locally-persisted record to other replicas. */ + publish(record: EventRecord): Promise; + /** Close the broker. Safe to call multiple times. */ + close(): Promise; +} + +/** + * Singleton broker — one per process. Created lazily on first access + * so tests that stub `REDIS_URL` can still exercise both branches. + */ +let current: EventBusBroker | null = null; + +export function getBroker(emitter: EventEmitter): EventBusBroker { + if (current) return current; + current = createBroker(emitter); + return current; +} + +/** Test-only: reset the singleton between tests. */ +export function __resetBrokerForTests(): void { + if (current) { + void current.close(); + } + current = null; +} + +function createBroker(emitter: EventEmitter): EventBusBroker { + const instanceId = randomUUID(); + const url = process.env.REDIS_URL; + + if (!url) { + logger.info( + { instanceId }, + "[EventBusBroker] REDIS_URL unset; cross-replica fan-out disabled", + ); + return { + enabled: false, + instanceId, + publish: async () => {}, + close: async () => {}, + }; + } + + const channel = process.env.EVENT_BUS_REDIS_CHANNEL ?? DEFAULT_CHANNEL; + + const publisher: RedisClient = new Redis(url, { lazyConnect: true }); + const subscriber: RedisClient = new Redis(url, { lazyConnect: true }); + + const wire = async () => { + try { + await publisher.connect(); + await subscriber.connect(); + await subscriber.subscribe(channel); + logger.info( + { instanceId, channel }, + "[EventBusBroker] subscribed to Redis channel", + ); + } catch (err) { + logger.error( + { err, instanceId, channel }, + "[EventBusBroker] failed to connect to Redis; falling back to local-only", + ); + } + }; + + subscriber.on("message", (ch, raw) => { + if (ch !== channel) return; + let env: Envelope; + try { + env = JSON.parse(raw) as Envelope; + } catch (err) { + logger.warn({ err, ch }, "[EventBusBroker] malformed envelope; dropped"); + return; + } + if (env.instanceId === instanceId) { + // Echo of our own publish — local emitter already saw it. + return; + } + emitter.emit(`plan:${env.record.plan_id}`, env.record); + emitter.emit("plan:*", env.record); + }); + + void wire(); + + return { + enabled: true, + instanceId, + publish: async (record) => { + const envelope: Envelope = { instanceId, record }; + try { + await publisher.publish(channel, JSON.stringify(envelope)); + } catch (err) { + logger.error( + { err, instanceId, channel, eventId: record.id }, + "[EventBusBroker] publish failed; record persisted locally only", + ); + } + }, + close: async () => { + try { + await subscriber.quit(); + } catch { + /* already closed */ + } + try { + await publisher.quit(); + } catch { + /* already closed */ + } + }, + }; +} diff --git a/orchestrator/tests/unit/eventBusBroker.test.ts b/orchestrator/tests/unit/eventBusBroker.test.ts new file mode 100644 index 0000000..dc1ec75 --- /dev/null +++ b/orchestrator/tests/unit/eventBusBroker.test.ts @@ -0,0 +1,64 @@ +/** + * Tests for the Redis-backed cross-replica broker (gap v2 §7.3 / §10.3). + * + * These tests exercise the broker's feature-flag semantics without + * spinning up a real Redis: when `REDIS_URL` is unset the broker must + * be strictly inert. Live Redis behaviour is covered in the integration + * suite (PR Q). + */ + +import { EventEmitter } from "events"; +import { getBroker, __resetBrokerForTests } from "../../src/services/eventBusBroker"; +import type { EventRecord } from "../../src/services/eventBus"; + +describe("eventBusBroker (feature flag)", () => { + const savedEnv = { ...process.env }; + + beforeEach(() => { + process.env = { ...savedEnv }; + __resetBrokerForTests(); + }); + + afterEach(() => { + __resetBrokerForTests(); + process.env = { ...savedEnv }; + }); + + it("is inert when REDIS_URL is unset", async () => { + delete process.env.REDIS_URL; + const emitter = new EventEmitter(); + const broker = getBroker(emitter); + expect(broker.enabled).toBe(false); + expect(typeof broker.instanceId).toBe("string"); + const rec: EventRecord = { + id: "00000000-0000-0000-0000-000000000001", + plan_id: "11111111-1111-1111-1111-111111111111", + type: "transaction.created", + actor: null, + payload: {}, + payload_hash: "h", + prev_hash: null, + signature: "s", + created_at: new Date().toISOString(), + }; + await expect(broker.publish(rec)).resolves.toBeUndefined(); + await expect(broker.close()).resolves.toBeUndefined(); + }); + + it("returns a stable singleton within a process", () => { + delete process.env.REDIS_URL; + const emitter = new EventEmitter(); + const a = getBroker(emitter); + const b = getBroker(emitter); + expect(a).toBe(b); + }); + + it("generates a unique instanceId per process", () => { + delete process.env.REDIS_URL; + const emitter = new EventEmitter(); + const a = getBroker(emitter).instanceId; + __resetBrokerForTests(); + const b = getBroker(emitter).instanceId; + expect(a).not.toEqual(b); + }); +}); -- 2.34.1