PR D: typed + signed event bus + events table + SSE (arch step 5) #8
Reference in New Issue
Block a user
Delete Branch "devin/1776875718-event-bus-sse"
Deleting a branch is permanent. Although the deleted branch may continue to exist for a short time before it actually gets removed, it CANNOT be undone in most cases. Continue?
Implements step 5 from the architecture gap-analysis — replaces the in-memory
EventEmitterpattern with a typed, signed, hash-chained, persisted event journal + SSE endpoint. Stacks on PR B.What lands
db/migrations/003_events.ts— append-only journalIndexed on
(plan_id, created_at)and(type).services/eventBus.tsEVENT_TYPES— all 15 normalised categories from arch §7.2. Single source of truth for subscribers.publish({ planId, type, actor?, payload? })— hashes the payload, reads the last signature for the plan, signs the new event, INSERTs, emits to the in-processEventEmitter.verifyChain(planId)— replays the chain and returns{ ok: true }or{ ok: false, brokenAt, reason }on the first payload / signature / prev_hash mismatch.getEventsForPlan(planId)— chronological replay.subscribe(planId, cb)— returns an unsubscribe fn. Used by the SSE route.HMAC secret precedence:
EVENT_BUS_HMAC_SECRET→SESSION_SECRET→ dev default.api/plans.tsGET /api/plans/:planId/events— full trail.?verify=1runsverifyChainand returns{ chain_valid, broken_at?, broken_reason? }so UIs can flag tampered audit logs.GET /api/plans/:planId/events/stream— SSE. On connect we replay history then subscribe to live events; 15s keep-alive; clean unsubscribe on client disconnect.Tests
tests/unit/eventBus.test.ts— 9 cases: enum completeness, first-event null prev_hash, multi-event chain, per-plan isolation, untampered chain passes, three tamper-detection cases (payload / signature / prev_hash mutation).Verification
Not in this PR
EventEmitterforstatusevents. PR A's state machine already recordssource_event_idon every transition; PR E (next) will wireExecutionCoordinatortoeventBus.publish()so every §7.2 event flows through the signed journal and every state transition can be joined back to its source event.publish()call would additionally broadcast to NATS/Kafka, but the persistence layer and hash chain stay unchanged.Series order
A → B → C → D → E → F → G → H.
Base:
devin/1776875351-validating-exception-manager(PR B). The diff shown here is D-only.- services/notaryChain.ts: new ethers-v6 adapter speaking to the deployed NotaryRegistry.sol via CHAIN_138_RPC_URL + NOTARY_REGISTRY_ADDRESS + ORCHESTRATOR_PRIVATE_KEY. Exposes anchorPlan(plan) -> { mode, txHash, planHash, blockNumber } and finalizeAnchor(planId, success) -> { mode, txHash, receiptHash } with deterministic mock fallback when envs are absent. - services/notary.ts: refactored to delegate to notaryChain; preserves the prior signature and returns extra on-chain fields (mode, txHash, blockNumber, contractAddress) when the anchor lands. - config/env.ts: add CHAIN_138_RPC_URL, CHAIN_138_CHAIN_ID, NOTARY_REGISTRY_ADDRESS, ORCHESTRATOR_PRIVATE_KEY (all optional, validated via regex where applicable). - package.json: add ethers@^6.11.0 dependency. - tests/unit/notaryChain.test.ts: 6 tests covering deterministic hashing helpers and the mock fallback path. tsc clean. 51 tests pass (45 pre-existing + 6 new).- db/migrations/003_events.ts: append-only events table with payload_hash, prev_hash, HMAC signature, indexed by plan_id + type - services/eventBus.ts: EVENT_TYPES union (all 15 arch §7.2 categories), publish() with hash-chain + HMAC signing, verifyChain() for tamper detection, subscribe() via in-process EventEmitter - api/plans.ts: - GET /api/plans/:planId/events (?verify=1 returns chain_valid) - GET /api/plans/:planId/events/stream (SSE with history replay + live push, 15s keep-alive, clean unsubscribe on client disconnect) - index.ts: register the two new endpoints - tests/unit/eventBus.test.ts: 9 tests covering publish, hash chain, per-plan isolation, and three tamper-detection scenarios (payload, signature, prev_hash) 60 tests pass. tsc clean.18bdaf61d5to59e1a85267