PR D: typed + signed event bus + events table + SSE (arch step 5) #8

Merged
nsatoshi merged 1 commits from devin/1776875718-event-bus-sse into main 2026-04-22 17:17:41 +00:00
Owner

Implements step 5 from the architecture gap-analysis — replaces the in-memory EventEmitter pattern with a typed, signed, hash-chained, persisted event journal + SSE endpoint. Stacks on PR B.

What lands

db/migrations/003_events.ts — append-only journal

column type note
id UUID pk
plan_id UUID FK → plans(plan_id), cascade delete
type VARCHAR(128) one of arch §7.2
actor VARCHAR(255) nullable (system events)
payload JSONB arbitrary event body
payload_hash CHAR(64) sha256(JSON(payload))
prev_hash CHAR(64) signature of previous event for same plan
signature CHAR(64) hmac_sha256(secret, plan_id|type|payload_hash|prev_hash)
created_at TIMESTAMPTZ default now()

Indexed on (plan_id, created_at) and (type).

services/eventBus.ts

  • EVENT_TYPESall 15 normalised categories from arch §7.2. Single source of truth for subscribers.
  • publish({ planId, type, actor?, payload? }) — hashes the payload, reads the last signature for the plan, signs the new event, INSERTs, emits to the in-process EventEmitter.
  • verifyChain(planId) — replays the chain and returns { ok: true } or { ok: false, brokenAt, reason } on the first payload / signature / prev_hash mismatch.
  • getEventsForPlan(planId) — chronological replay.
  • subscribe(planId, cb) — returns an unsubscribe fn. Used by the SSE route.

HMAC secret precedence: EVENT_BUS_HMAC_SECRETSESSION_SECRET → dev default.

api/plans.ts

  • GET /api/plans/:planId/events — full trail. ?verify=1 runs verifyChain and returns { chain_valid, broken_at?, broken_reason? } so UIs can flag tampered audit logs.
  • GET /api/plans/:planId/events/stream — SSE. On connect we replay history then subscribe to live events; 15s keep-alive; clean unsubscribe on client disconnect.

Tests

tests/unit/eventBus.test.ts — 9 cases: enum completeness, first-event null prev_hash, multi-event chain, per-plan isolation, untampered chain passes, three tamper-detection cases (payload / signature / prev_hash mutation).

Verification

$ npx tsc --noEmit     # clean
$ npx jest             # 60 passed, 5 suites

Not in this PR

  • ExecutionCoordinator still uses its own in-process EventEmitter for status events. PR A's state machine already records source_event_id on every transition; PR E (next) will wire ExecutionCoordinator to eventBus.publish() so every §7.2 event flows through the signed journal and every state transition can be joined back to its source event.
  • The in-process subscriber does not fan out across replicas — for multi-replica orchestrators the publish() call would additionally broadcast to NATS/Kafka, but the persistence layer and hash chain stay unchanged.

Series order

A → B → C → D → E → F → G → H.

Base: devin/1776875351-validating-exception-manager (PR B). The diff shown here is D-only.

Implements **step 5** from the architecture gap-analysis — replaces the in-memory `EventEmitter` pattern with a typed, signed, hash-chained, persisted event journal + SSE endpoint. Stacks on PR B. ## What lands ### `db/migrations/003_events.ts` — append-only journal | column | type | note | | --- | --- | --- | | id | UUID | pk | | plan_id | UUID | FK → plans(plan_id), cascade delete | | type | VARCHAR(128) | one of arch §7.2 | | actor | VARCHAR(255) | nullable (system events) | | payload | JSONB | arbitrary event body | | payload_hash | CHAR(64) | sha256(JSON(payload)) | | prev_hash | CHAR(64) | signature of previous event for same plan | | signature | CHAR(64) | hmac_sha256(secret, plan_id\|type\|payload_hash\|prev_hash) | | created_at | TIMESTAMPTZ | default now() | Indexed on `(plan_id, created_at)` and `(type)`. ### `services/eventBus.ts` - `EVENT_TYPES` — **all 15** normalised categories from arch §7.2. Single source of truth for subscribers. - `publish({ planId, type, actor?, payload? })` — hashes the payload, reads the last signature for the plan, signs the new event, INSERTs, emits to the in-process `EventEmitter`. - `verifyChain(planId)` — replays the chain and returns `{ ok: true }` or `{ ok: false, brokenAt, reason }` on the first payload / signature / prev_hash mismatch. - `getEventsForPlan(planId)` — chronological replay. - `subscribe(planId, cb)` — returns an unsubscribe fn. Used by the SSE route. HMAC secret precedence: `EVENT_BUS_HMAC_SECRET` → `SESSION_SECRET` → dev default. ### `api/plans.ts` - `GET /api/plans/:planId/events` — full trail. `?verify=1` runs `verifyChain` and returns `{ chain_valid, broken_at?, broken_reason? }` so UIs can flag tampered audit logs. - `GET /api/plans/:planId/events/stream` — SSE. On connect we replay history then subscribe to live events; 15s keep-alive; clean unsubscribe on client disconnect. ### Tests `tests/unit/eventBus.test.ts` — 9 cases: enum completeness, first-event null prev_hash, multi-event chain, per-plan isolation, untampered chain passes, three tamper-detection cases (payload / signature / prev_hash mutation). ## Verification ``` $ npx tsc --noEmit # clean $ npx jest # 60 passed, 5 suites ``` ## Not in this PR - ExecutionCoordinator still uses its own in-process `EventEmitter` for `status` events. PR A's state machine already records `source_event_id` on every transition; PR E (next) will wire `ExecutionCoordinator` to `eventBus.publish()` so every §7.2 event flows through the signed journal and every state transition can be joined back to its source event. - The in-process subscriber does not fan out across replicas — for multi-replica orchestrators the `publish()` call would additionally broadcast to NATS/Kafka, but the persistence layer and hash chain stay unchanged. ## Series order A → B → C → **D** → E → F → G → H. Base: `devin/1776875351-validating-exception-manager` (PR B). The diff shown here is D-only.
nsatoshi changed target branch from devin/1776875351-validating-exception-manager to main 2026-04-22 17:16:07 +00:00
nsatoshi added 4 commits 2026-04-22 17:16:07 +00:00
PR A: 12-state transaction machine + issueInstrument step + SoD matrix
Some checks failed
Code Quality / SonarQube Analysis (pull_request) Failing after 23s
Code Quality / Code Quality Checks (pull_request) Failing after 11s
Security Scan / Dependency Vulnerability Scan (pull_request) Failing after 4s
Security Scan / OWASP ZAP Scan (pull_request) Failing after 5s
b24a4df983
Architecture note steps 1, 2, 10 (data model).

- types/transactionState.ts: 12 states, allowed-transition table, SoD matrix
- types/plan.ts: add InstrumentTerms + 'issueInstrument' PlanStep type
- services/planValidation.ts: validate SBLC step (BIC, ISO-4217, sha256,
  YYYY-MM-DD expiry, >0 amount)
- services/stateMachine.ts: transition() enforces legality + SoD + appends
  to transaction_state_transitions
- db/migrations/002: plans.transaction_state (CHECK) +
  transaction_state_transitions append-only table
- tests/unit: 13 + 8 unit tests (31 total, all pass)

No behaviour change yet: coordinator still uses legacy status field.
PRs B-G will migrate execution paths onto the new machine.
- services/exceptionManager.ts: single taxonomy (timing/data/control/
  business/system) with §12 codes, deterministic route() table, and
  handle() dispatch to retry/DLQ/escalate
- services/execution.ts: refactor executePlan to drive the full 12-state
  machine (DRAFT -> INITIATED -> ... -> VALIDATING -> COMMITTED -> CLOSED)
  via stateMachine.transition(), with a new validatePhase() that
  reconciles DLT tx hash + bank message id + per-step amounts before
  COMMIT; SoD-gated edges use distinct synthetic actors by default
- api/plans.ts + index.ts: GET /api/plans/:planId/state returning
  current transaction_state + full audit trail of transitions
- tests/unit/exceptionManager.test.ts: 14 tests for classification +
  routing matrix

59 tests pass. tsc clean.
PR C: wire real NotaryRegistry contract on Chain 138 (arch step 4)
Some checks failed
Code Quality / SonarQube Analysis (pull_request) Failing after 20s
Code Quality / Code Quality Checks (pull_request) Failing after 8s
Security Scan / Dependency Vulnerability Scan (pull_request) Failing after 3s
Security Scan / OWASP ZAP Scan (pull_request) Failing after 4s
5bd6a200c3
- services/notaryChain.ts: new ethers-v6 adapter speaking to the
  deployed NotaryRegistry.sol via CHAIN_138_RPC_URL +
  NOTARY_REGISTRY_ADDRESS + ORCHESTRATOR_PRIVATE_KEY. Exposes
  anchorPlan(plan) -> { mode, txHash, planHash, blockNumber } and
  finalizeAnchor(planId, success) -> { mode, txHash, receiptHash }
  with deterministic mock fallback when envs are absent.
- services/notary.ts: refactored to delegate to notaryChain; preserves
  the prior signature and returns extra on-chain fields (mode, txHash,
  blockNumber, contractAddress) when the anchor lands.
- config/env.ts: add CHAIN_138_RPC_URL, CHAIN_138_CHAIN_ID,
  NOTARY_REGISTRY_ADDRESS, ORCHESTRATOR_PRIVATE_KEY (all optional,
  validated via regex where applicable).
- package.json: add ethers@^6.11.0 dependency.
- tests/unit/notaryChain.test.ts: 6 tests covering deterministic
  hashing helpers and the mock fallback path.

tsc clean. 51 tests pass (45 pre-existing + 6 new).
- db/migrations/003_events.ts: append-only events table with
  payload_hash, prev_hash, HMAC signature, indexed by plan_id + type
- services/eventBus.ts: EVENT_TYPES union (all 15 arch §7.2
  categories), publish() with hash-chain + HMAC signing, verifyChain()
  for tamper detection, subscribe() via in-process EventEmitter
- api/plans.ts:
    - GET /api/plans/:planId/events (?verify=1 returns chain_valid)
    - GET /api/plans/:planId/events/stream (SSE with history replay +
      live push, 15s keep-alive, clean unsubscribe on client disconnect)
- index.ts: register the two new endpoints
- tests/unit/eventBus.test.ts: 9 tests covering publish, hash chain,
  per-plan isolation, and three tamper-detection scenarios (payload,
  signature, prev_hash)

60 tests pass. tsc clean.
nsatoshi force-pushed devin/1776875718-event-bus-sse from 18bdaf61d5 to 59e1a85267 2026-04-22 17:17:21 +00:00 Compare
nsatoshi merged commit cb376eda31 into main 2026-04-22 17:17:41 +00:00
nsatoshi deleted branch devin/1776875718-event-bus-sse 2026-04-22 17:17:44 +00:00
Sign in to join this conversation.
No Reviewers
No Label
1 Participants
Notifications
Due Date
No due date set.
Dependencies

No dependencies set.

Reference: d-bis/CurrenciCombo#8