Redis-backed Event Bus broker (feature-flagged via REDIS_URL)
Some checks failed
CI / Frontend Lint (pull_request) Failing after 7s
CI / Frontend Type Check (pull_request) Failing after 6s
CI / Frontend Build (pull_request) Failing after 7s
CI / Frontend E2E Tests (pull_request) Failing after 8s
CI / Orchestrator Build (pull_request) Failing after 6s
CI / Contracts Compile (pull_request) Failing after 7s
CI / Contracts Test (pull_request) Failing after 6s
Code Quality / SonarQube Analysis (pull_request) Failing after 19s
Code Quality / Code Quality Checks (pull_request) Failing after 4s
Security Scan / Dependency Vulnerability Scan (pull_request) Failing after 5s
Security Scan / OWASP ZAP Scan (pull_request) Failing after 3s
Some checks failed
CI / Frontend Lint (pull_request) Failing after 7s
CI / Frontend Type Check (pull_request) Failing after 6s
CI / Frontend Build (pull_request) Failing after 7s
CI / Frontend E2E Tests (pull_request) Failing after 8s
CI / Orchestrator Build (pull_request) Failing after 6s
CI / Contracts Compile (pull_request) Failing after 7s
CI / Contracts Test (pull_request) Failing after 6s
Code Quality / SonarQube Analysis (pull_request) Failing after 19s
Code Quality / Code Quality Checks (pull_request) Failing after 4s
Security Scan / Dependency Vulnerability Scan (pull_request) Failing after 5s
Security Scan / OWASP ZAP Scan (pull_request) Failing after 3s
Closes gap-analysis v2 §7.3 and §10.3. - New services/eventBusBroker.ts: per-process singleton with unique instanceId; subscribes to a Redis pub/sub channel (default ccombo:events) and re-emits inbound records on the in-process EventEmitter. Outbound envelopes carry the instanceId so the originating replica drops its own echo. - eventBus.publish() hands every persisted record to the broker after the local emit, so same-process SSE subscribers see zero change while remote-replica subscribers now receive every event. - When REDIS_URL is unset the broker is strictly inert (no connect, no publish) — single-process behaviour is preserved byte-for-byte. - 3 unit tests for feature-flag semantics; full suite 83/83 green. - Live Redis behaviour will be covered by the Testcontainers suite in PR Q.
This commit is contained in:
64
orchestrator/tests/unit/eventBusBroker.test.ts
Normal file
64
orchestrator/tests/unit/eventBusBroker.test.ts
Normal file
@@ -0,0 +1,64 @@
|
||||
/**
|
||||
* Tests for the Redis-backed cross-replica broker (gap v2 §7.3 / §10.3).
|
||||
*
|
||||
* These tests exercise the broker's feature-flag semantics without
|
||||
* spinning up a real Redis: when `REDIS_URL` is unset the broker must
|
||||
* be strictly inert. Live Redis behaviour is covered in the integration
|
||||
* suite (PR Q).
|
||||
*/
|
||||
|
||||
import { EventEmitter } from "events";
|
||||
import { getBroker, __resetBrokerForTests } from "../../src/services/eventBusBroker";
|
||||
import type { EventRecord } from "../../src/services/eventBus";
|
||||
|
||||
describe("eventBusBroker (feature flag)", () => {
|
||||
const savedEnv = { ...process.env };
|
||||
|
||||
beforeEach(() => {
|
||||
process.env = { ...savedEnv };
|
||||
__resetBrokerForTests();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
__resetBrokerForTests();
|
||||
process.env = { ...savedEnv };
|
||||
});
|
||||
|
||||
it("is inert when REDIS_URL is unset", async () => {
|
||||
delete process.env.REDIS_URL;
|
||||
const emitter = new EventEmitter();
|
||||
const broker = getBroker(emitter);
|
||||
expect(broker.enabled).toBe(false);
|
||||
expect(typeof broker.instanceId).toBe("string");
|
||||
const rec: EventRecord = {
|
||||
id: "00000000-0000-0000-0000-000000000001",
|
||||
plan_id: "11111111-1111-1111-1111-111111111111",
|
||||
type: "transaction.created",
|
||||
actor: null,
|
||||
payload: {},
|
||||
payload_hash: "h",
|
||||
prev_hash: null,
|
||||
signature: "s",
|
||||
created_at: new Date().toISOString(),
|
||||
};
|
||||
await expect(broker.publish(rec)).resolves.toBeUndefined();
|
||||
await expect(broker.close()).resolves.toBeUndefined();
|
||||
});
|
||||
|
||||
it("returns a stable singleton within a process", () => {
|
||||
delete process.env.REDIS_URL;
|
||||
const emitter = new EventEmitter();
|
||||
const a = getBroker(emitter);
|
||||
const b = getBroker(emitter);
|
||||
expect(a).toBe(b);
|
||||
});
|
||||
|
||||
it("generates a unique instanceId per process", () => {
|
||||
delete process.env.REDIS_URL;
|
||||
const emitter = new EventEmitter();
|
||||
const a = getBroker(emitter).instanceId;
|
||||
__resetBrokerForTests();
|
||||
const b = getBroker(emitter).instanceId;
|
||||
expect(a).not.toEqual(b);
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user