diff --git a/.env.example b/.env.example index 7d1cf6e..6393ed8 100644 --- a/.env.example +++ b/.env.example @@ -66,6 +66,11 @@ CRYPTO_COM_API_SECRET= # GATEWAY_RAIL_MUTATE_MAX=120 # Jest / integration: disable rail mutate limiter # GATEWAY_RAIL_RATE_LIMIT_IN_TEST=1 +# MQ consumers: queue prefix for SolaceNet gateway guard (see gateway-mq-wrap.ts) +# SOLACENET_GATEWAY_MQ_QUEUE_PREFIX=dbis.gateway. +# Gateway outbox worker (npm run worker:gateway-outbox) +# GATEWAY_OUTBOX_BATCH_SIZE=25 +# GATEWAY_OUTBOX_MAX_ATTEMPTS=10 # ---------------------------------------------------------------------------- # Other (add as needed from dbis_core code) diff --git a/docs/solacenet/DBIS_RAIL_SCOPE_DEFERRAL.md b/docs/solacenet/DBIS_RAIL_SCOPE_DEFERRAL.md new file mode 100644 index 0000000..bfe7c27 --- /dev/null +++ b/docs/solacenet/DBIS_RAIL_SCOPE_DEFERRAL.md @@ -0,0 +1,13 @@ +# DBIS Rail on-chain scope — formal deferral (repository record) + +**Gap IDs:** PG-RAIL-001 through PG-RAIL-005 (and dependent mint path PG-RAIL-005). + +**Status:** **Deferred** in this codebase until a chartered program funds contract development, security review, and deployment to target chains. + +**What is not delivered here:** `DBIS_RootRegistry`, `DBIS_ParticipantRegistry`, `DBIS_SignerRegistry`, `DBIS_SettlementRouter`, `DBIS_GRU_MintController` (router-only mint) as live deployed contracts wired to production services. + +**Authoritative planning doc (Proxmox repo):** `docs/dbis-rail/DBIS_RAIL_AND_PROJECT_COMPLETION_MASTER_V1.md`. + +**Narrative rule:** Docs and UIs must **not** imply these contracts are live. Off-chain ISO / MintAuth work (PG-RAIL-006, PG-RAIL-007) may proceed in parallel where specs exist. + +**Last updated:** 2026-04-08 diff --git a/docs/solacenet/FILE_DROP_GATEWAY_INGRESS.example.md b/docs/solacenet/FILE_DROP_GATEWAY_INGRESS.example.md new file mode 100644 index 0000000..ecda174 --- /dev/null +++ b/docs/solacenet/FILE_DROP_GATEWAY_INGRESS.example.md @@ -0,0 +1,21 @@ +# File-drop ingress — SolaceNet gateway enforcement (example) + +**Gap:** PG-GW-W03 — file workers are app-specific; this pattern shows how to call the shared guard. + +```typescript +import { requireGatewayMicroservicesForWorker } from '@/core/gateway/rails/gateway-rails-enforcement'; +import fs from 'fs/promises'; + +async function processInboundFile(path: string, tenantId: string, adapterId: string) { + await requireGatewayMicroservicesForWorker({ + tenantId, + ingressKind: 'file-drop', + detail: path, + adapterId, + }); + const raw = await fs.readFile(path, 'utf8'); + // ... validate / ingest ... +} +``` + +Set `SOLACENET_GATEWAY_RAILS_ENFORCE=1` in the worker environment when policy should apply. diff --git a/docs/solacenet/GATEWAY_ADAPTER_LIFECYCLE.md b/docs/solacenet/GATEWAY_ADAPTER_LIFECYCLE.md new file mode 100644 index 0000000..f50c255 --- /dev/null +++ b/docs/solacenet/GATEWAY_ADAPTER_LIFECYCLE.md @@ -0,0 +1,11 @@ +# Gateway rail adapter lifecycle (DI policy) + +**Gap:** PG-GW-W01 (singleton vs per-request). + +**Policy (until superseded by performance review):** + +1. **HTTP (`/api/v1/gateway/rails/*`)** — `createGatewayRailAdapter(id)` returns a **new instance per request** after `registry` lookup. Do **not** cache adapter instances on the Express router. +2. **Long-lived workers** — if a worker holds a connector with sockets or session state, that worker process owns **one** instance per `adapterId`; document in the worker module. +3. **Secrets** — `initialize(config, secretsRef)` is the hook for KMS/HSM-backed config; REST routes still pass `{}` until per-rail secrets routing is implemented. + +**Related:** `gateway-adapter-registry.ts`, `gateway.routes.ts`. diff --git a/docs/solacenet/PROTOCOL_GAPS_CHECKLIST.md b/docs/solacenet/PROTOCOL_GAPS_CHECKLIST.md index 38ae173..92652ff 100644 --- a/docs/solacenet/PROTOCOL_GAPS_CHECKLIST.md +++ b/docs/solacenet/PROTOCOL_GAPS_CHECKLIST.md @@ -8,6 +8,7 @@ |--------|---------| | `open` | Not implemented or only stub/placeholder | | `partial` | Some path exists; production contract or wiring incomplete | +| `deferred` | Explicit program deferral recorded (see linked doc); not “open” for sprint pickup | | `n/a` | Explicitly out of scope for this repo (document boundary only) | | `done` | Production-capable per defined gate (update row when true) | @@ -83,11 +84,11 @@ Source of truth for deployment truth: `docs/dbis-rail/DBIS_RAIL_AND_PROJECT_COMP | ID | Protocol / contract layer | Status | Gap summary | |----|---------------------------|--------|-------------| -| PG-RAIL-001 | **DBIS_RootRegistry** | `open` | Contract set not implemented in repo | -| PG-RAIL-002 | **DBIS_ParticipantRegistry** | `open` | Same | -| PG-RAIL-003 | **DBIS_SignerRegistry** | `open` | Same | -| PG-RAIL-004 | **DBIS_SettlementRouter** | `open` | Same | -| PG-RAIL-005 | **DBIS_GRU_MintController** (router-only mint) | `open` | Mint path still owner-mint on c* per master doc | +| PG-RAIL-001 | **DBIS_RootRegistry** | `deferred` | [DBIS_RAIL_SCOPE_DEFERRAL.md](DBIS_RAIL_SCOPE_DEFERRAL.md) — not in active implementation until program charter | +| PG-RAIL-002 | **DBIS_ParticipantRegistry** | `deferred` | Same | +| PG-RAIL-003 | **DBIS_SignerRegistry** | `deferred` | Same | +| PG-RAIL-004 | **DBIS_SettlementRouter** | `deferred` | Same | +| PG-RAIL-005 | **DBIS_GRU_MintController** (router-only mint) | `deferred` | Same; owner-mint vs router mint per master doc when un-deferred | | PG-RAIL-006 | **Production ISO Gateway** matching EIP-712 MintAuth pipeline | `partial` | Spec/rulebook complete; production service + on-chain registry not aligned | | PG-RAIL-007 | **MintAuth relayer** operational hardening | `partial` | Documented; full ops gate open | @@ -138,7 +139,7 @@ Below: rows that are **protocol, endpoint, or message-contract** heavy and **not |----|-----|--------|------------| | PG-SN-001 | `gateway-microservices` capability describes rails; **adapters remain scaffolds** | `open` | Critical rails reach `done` in section B or scope reduced in offering JSON | | PG-SN-002 | `solacenet_provider_connector` rows for each live rail provider | `partial` | **Seed:** `npm run seed:gateway-provider` (`dbis-gateway-rail-plane`). **Remaining:** per-institution connectors + bindings | -| PG-SN-003 | smom-dbis-138-publish / tokenization docs: **SolaceNet policy** integration called out as future in places | `partial` | Code paths call `requireCapability` or documented exception | +| PG-SN-003 | smom-dbis-138-publish / tokenization vs SolaceNet | `partial` | **Doc:** [TOKENIZATION_SMOM_AND_SOLACENET.md](TOKENIZATION_SMOM_AND_SOLACENET.md). **Close when:** smom flows hit `dbis_core` tokenization APIs and entitlements are enforced there | --- diff --git a/docs/solacenet/REMAINING_TASKS_FULL_LIST.md b/docs/solacenet/REMAINING_TASKS_FULL_LIST.md index 6f42e7a..56ecb8b 100644 --- a/docs/solacenet/REMAINING_TASKS_FULL_LIST.md +++ b/docs/solacenet/REMAINING_TASKS_FULL_LIST.md @@ -21,7 +21,7 @@ |---|------|-----------|--------| | 1 | **Reconcile marketplace/offering copy** with live connector maturity — or scope down `gateway-microservices-offering.json` until rails are real | PG-SN-001, offering JSON | **Updated 2026-04-08:** description names scaffolds + KTT evidence semantics; keep row until production connectors ship. | | 2 | **Document KTT explicitly** as evidence-only in marketplace/UI copy; separate from symmetric send/receive rails | PG-GW-008, inconsistencies doc | Code already fails `send` by design | -| 3 | **Decide DBIS Rail scope:** implement contracts + router-only mint **or** formally defer and update all docs that imply live Rail | PG-RAIL-001–005, master doc | See `docs/dbis-rail/DBIS_RAIL_AND_PROJECT_COMPLETION_MASTER_V1.md` | +| 3 | **Decide DBIS Rail scope:** implement contracts + router-only mint **or** formally defer and update all docs that imply live Rail | PG-RAIL-001–005, master doc | **Deferred (recorded):** [DBIS_RAIL_SCOPE_DEFERRAL.md](DBIS_RAIL_SCOPE_DEFERRAL.md) + checklist `deferred`. Planning: `docs/dbis-rail/DBIS_RAIL_AND_PROJECT_COMPLETION_MASTER_V1.md` | | 4 | **Fix or quarantine Thirdweb adapter** type issues (`ethers` namespace) for clean `tsc` / CI | PG-GW-009 | **Partial:** `ethers` dependency + `import type { Provider, Signer }`; full `dbis_core` `tsc` may still fail elsewhere. | --- @@ -30,12 +30,12 @@ | # | Task | Gap ID(s) | Notes | |---|------|-----------|--------| -| 5 | **DI / lifecycle policy** for rail adapters (singleton vs per-request) and document | PG-GW-W01 | Registry exists; avoid accidental state bleed | -| 6 | **Orchestrated `send` path** (outbox, idempotency, DLQ) for rails that support outbound messages | PG-GW-W01, PG-GW-001–007 | KTT may stay receive-only | -| 7 | **Extend `SOLACENET_GATEWAY_RAILS_ENFORCE`** to every rail ingress (file drop, MQ, future workers) + **audit log** export for denied/allowed | PG-GW-W03 | HTTP path partially done | -| 8 | **Optional code facade** for PluginRegistry ↔ gateway adapters when one physical connector serves both | PG-GW-W02 | Doc exists; implement if needed | +| 5 | **DI / lifecycle policy** for rail adapters (singleton vs per-request) and document | PG-GW-W01 | **Doc:** [GATEWAY_ADAPTER_LIFECYCLE.md](GATEWAY_ADAPTER_LIFECYCLE.md) | +| 6 | **Orchestrated `send` path** (outbox, idempotency, DLQ) for rails that support outbound messages | PG-GW-W01, PG-GW-001–007 | **Partial:** [gateway-outbox.service.ts](../../src/core/gateway/control/gateway-outbox.service.ts) + [gateway-outbox.worker.ts](../../src/workers/gateway-outbox.worker.ts) (`npm run worker:gateway-outbox`); app-level CAS claim; production rails + optional DB `SKIP LOCKED` still TBD | +| 7 | **Extend `SOLACENET_GATEWAY_RAILS_ENFORCE`** to every rail ingress (file drop, MQ, future workers) + **audit log** export for denied/allowed | PG-GW-W03 | HTTP + audit + [gateway-mq-wrap.ts](../../src/core/gateway/rails/gateway-mq-wrap.ts) + `MessageBusService.buildGatewayAwareHandler`; file-drop = app-specific | +| 8 | **Optional code facade** for PluginRegistry ↔ gateway adapters when one physical connector serves both | PG-GW-W02 | **Stub:** [plugin-gateway-facade.ts](../../src/core/gateway/integration/plugin-gateway-facade.ts) | | 9 | **Register `solacenet_provider_connector`** rows per environment for each live provider | PG-SN-002 | Tie to capability bindings | -|10 | **smom-dbis-138-publish / tokenization:** wire `requireCapability` or document explicit exception | PG-SN-003 | | +|10 | **smom-dbis-138-publish / tokenization:** wire `requireCapability` or document explicit exception | PG-SN-003 | **Doc:** [TOKENIZATION_SMOM_AND_SOLACENET.md](TOKENIZATION_SMOM_AND_SOLACENET.md) | --- @@ -166,6 +166,7 @@ Besu, Explorer, Caliper benchmarks, and other **non-protocol** RTGS matrix rows - [public/GAPS_AND_INCONSISTENCIES.md](public/GAPS_AND_INCONSISTENCIES.md) — narrative gaps - [RAIL_AND_PROTOCOL_GOVERNANCE.md](RAIL_AND_PROTOCOL_GOVERNANCE.md) - [PLUGIN_AND_GATEWAY_BRIDGE.md](PLUGIN_AND_GATEWAY_BRIDGE.md) +- [TASK_BACKLOG_COMPLETION_STATUS.md](TASK_BACKLOG_COMPLETION_STATUS.md) — repo-done vs external vs deferred - Proxmox: `docs/03-deployment/DBIS_RTGS_E2E_REQUIREMENTS_MATRIX.md` - Proxmox: `docs/dbis-rail/DBIS_RAIL_AND_PROJECT_COMPLETION_MASTER_V1.md` - Proxmox: `docs/04-configuration/SOLACENET_PUBLIC_HUB.md` diff --git a/docs/solacenet/SOLACENET_GATEWAY_RAILS_ENFORCE_RUNBOOK.md b/docs/solacenet/SOLACENET_GATEWAY_RAILS_ENFORCE_RUNBOOK.md index e8881bf..1fbad5d 100644 --- a/docs/solacenet/SOLACENET_GATEWAY_RAILS_ENFORCE_RUNBOOK.md +++ b/docs/solacenet/SOLACENET_GATEWAY_RAILS_ENFORCE_RUNBOOK.md @@ -23,6 +23,11 @@ 4. **Smoke test with entitlement:** same call with an entitled tenant — expect **200** and health JSON. 5. **Confirm list endpoint:** `GET /api/v1/gateway/rails` remains **200** (metadata only; still requires normal API auth). +**Gateway outbox worker (send scaffold)** + +- `npm run worker:gateway-outbox` — processes `gateway_outbox` via rail `send` (scaffold until live wires). Env: `GATEWAY_OUTBOX_BATCH_SIZE`, `GATEWAY_OUTBOX_MAX_ATTEMPTS`. +- Example systemd unit (parent repo): `config/systemd/dbis-core-gateway-outbox-worker.example.service` — copy, set `WorkingDirectory` + `EnvironmentFile` with `DATABASE_URL`, then `systemctl enable --now`. + **Rollback** - Unset `SOLACENET_GATEWAY_RAILS_ENFORCE` or set to `0` / `false` and restart the API. diff --git a/docs/solacenet/TASK_BACKLOG_COMPLETION_STATUS.md b/docs/solacenet/TASK_BACKLOG_COMPLETION_STATUS.md new file mode 100644 index 0000000..0f81411 --- /dev/null +++ b/docs/solacenet/TASK_BACKLOG_COMPLETION_STATUS.md @@ -0,0 +1,113 @@ +# Backlog completion status (honest matrix) + +**Purpose:** Map `REMAINING_TASKS_FULL_LIST.md` items (and related audit bullets) to **what the repository can and cannot close** without external systems, contracts, or multi-sprint hardening. + +**Legend** + +| Status | Meaning | +|--------|--------| +| **Repo-done** | Addressed in code or canonical docs in this monorepo / `dbis_core`. | +| **Repo-partial** | Scaffolding, policy doc, or automation exists; production gate still open. | +| **External** | Requires SWIFTNet, DTCC, bank APIs, carrier CPE, jurisdiction endpoints, or paid cloud contracts. | +| **Deferred** | Explicit product/program deferral recorded in repo (implement later or never). | + +--- + +## P0 (tasks 1–4) + +| # | Status | Evidence / blocker | +|---|--------|-------------------| +| 1 | Repo-partial | Marketplace copy updated; **close** when live connectors exist or offering scoped further. | +| 2 | Repo-partial | Narrative in offering + public gaps; **close** when all UI surfaces reviewed. | +| 3 | **Deferred** | [DBIS_RAIL_SCOPE_DEFERRAL.md](DBIS_RAIL_SCOPE_DEFERRAL.md) — on-chain Rail contracts not in scope until chartered. | +| 4 | Repo-partial | `ethers` + types on Thirdweb adapter; **full `tsc`** still blocked by unrelated modules (see `npx tsc --noEmit`). | + +--- + +## P1 Gateway core (5–10) + +| # | Status | Evidence / blocker | +|---|--------|-------------------| +| 5 | Repo-done | [GATEWAY_ADAPTER_LIFECYCLE.md](GATEWAY_ADAPTER_LIFECYCLE.md) | +| 6 | Repo-partial | [gateway-outbox.service.ts](../../src/core/gateway/control/gateway-outbox.service.ts) + [gateway-outbox.worker.ts](../../src/workers/gateway-outbox.worker.ts) + `npm run worker:gateway-outbox` (CAS claim + `adapter.send`). | +| 7 | Repo-partial | HTTP + audit + MQ wrap + [FILE_DROP_GATEWAY_INGRESS.example.md](FILE_DROP_GATEWAY_INGRESS.example.md). | +| 8 | Repo-partial | [plugin-gateway-facade.ts](../../src/core/gateway/integration/plugin-gateway-facade.ts) stub — implement when dual-surface connector exists. | +| 9 | Repo-partial | `npm run seed:gateway-provider`; per-institution rows + bindings = **External / ops**. | +| 10 | Repo-partial (doc) | [TOKENIZATION_SMOM_AND_SOLACENET.md](TOKENIZATION_SMOM_AND_SOLACENET.md) — exception until cross-service API calls exist. | + +--- + +## P1 Rail scaffolds (11–19) + +| # | Status | Blocker | +|---|--------|--------| +| 11–17 | **External** | Institution contracts, network access, certification. | +| 18 | **External** | SoR integration, persistence design, bank agreement. | +| 19 | Repo-partial | Thirdweb encode path still placeholder — mark non-prod in adapter doc / complete encode = **External** (ABI per deployment). | + +--- + +## P1 DBIS Rail off-chain (20–21) + +| # | Status | Blocker | +|---|--------|--------| +| 20–21 | **External** | Production deployment, keys, monitoring, counterparty test environments. | + +--- + +## P1 RTGS spine (22–34) + +| # | Status | Blocker | +|---|--------|--------| +| 22–34 | **External** | See `docs/03-deployment/DBIS_RTGS_E2E_REQUIREMENTS_MATRIX.md` — live endpoints, legal packages, multi-party acceptance. | + +--- + +## P2 Hyperledger / institutional (35–50) + +| # | Status | Blocker | +|---|--------|--------| +| 35–50 | **External** | Topology decisions, vendors, custody/CSD/FX counterparties. | + +--- + +## P2 Quantum / QPS / admin (51–56) + +| # | Status | Evidence / blocker | +|---|--------|-------------------| +| 51–52 | Repo-partial | Types + example telecom schema; **real scoring / boundary** = External. | +| 53–56 | **External** | Real telemetry and bank data feeds into dashboards. | + +--- + +## P3 Carrier (57) + +| # | Status | Blocker | +|---|--------|--------| +| 57 | **External** | Carrier boundary service + regulated interconnect. | + +--- + +## P3 Hygiene (58–63) + +| # | Status | Evidence / blocker | +|---|--------|-------------------| +| 58 | Repo-done | Labeled in [public/GAPS_AND_INCONSISTENCIES.md](public/GAPS_AND_INCONSISTENCIES.md) §2. | +| 59–62 | Repo-partial / Ops | Heuristics remain until data model exists; verify scripts are ongoing. | +| 63 | Ongoing | Process — update [PROTOCOL_GAPS_CHECKLIST.md](PROTOCOL_GAPS_CHECKLIST.md) as gates close. | + +--- + +## Audit “remaining” bullets (post-2026-04-08) + +| Topic | Status | +|-------|--------| +| Outbox / DLQ / idempotency (full) | Repo-partial — DB tables + enqueue helper; **worker + exactly-once** = further work. | +| Per-tenant rate limits | Repo-partial — rail mutate limiter keys on **`ip:tenant`** (`gateway.routes.ts`). | +| Full `dbis_core` `tsc` | **External** to this sprint — hundreds of errors across unrelated modules; fix incrementally. | +| SIEM-signed audit | **External** — log shipping + tamper-evident store. | +| Go ↔ Node single edge | Repo-partial — documented; shared artifact optional. | + +--- + +**Last updated:** 2026-04-08 diff --git a/docs/solacenet/TOKENIZATION_SMOM_AND_SOLACENET.md b/docs/solacenet/TOKENIZATION_SMOM_AND_SOLACENET.md new file mode 100644 index 0000000..a79b093 --- /dev/null +++ b/docs/solacenet/TOKENIZATION_SMOM_AND_SOLACENET.md @@ -0,0 +1,11 @@ +# Chain 138 tokenization (smom-dbis-138) vs SolaceNet (`dbis_core`) + +**Gap:** PG-SN-003 — smom-dbis-138 publish / tokenization paths do not call `dbis_core` `requireCapability`. + +**Status:** **Documented exception.** Token mint/deploy and token-aggregation flows live primarily in **`smom-dbis-138`** (contracts, scripts, services). SolaceNet capability checks apply to **`dbis_core` HTTP/API** surfaces (e.g. `/api/v1/solacenet/tokenization/*`, gateway rails). + +**When to add checks:** If a future **cross-service** flow posts from smom automation into `dbis_core` tokenization APIs, add `requireCapability` at those API boundaries and record entitlements per tenant. + +**References:** `smom-dbis-138/docs/tokenization/`, `dbis_core/src/core/solacenet/capabilities/tokenization/`. + +**Last updated:** 2026-04-08 diff --git a/docs/solacenet/public/GAPS_AND_INCONSISTENCIES.md b/docs/solacenet/public/GAPS_AND_INCONSISTENCIES.md index 2b50d4d..3d41aae 100644 --- a/docs/solacenet/public/GAPS_AND_INCONSISTENCIES.md +++ b/docs/solacenet/public/GAPS_AND_INCONSISTENCIES.md @@ -24,7 +24,7 @@ |--------|--------| | **Dual adapter systems** | **Mitigated (doc):** [PLUGIN_AND_GATEWAY_BRIDGE.md](../PLUGIN_AND_GATEWAY_BRIDGE.md). **Remaining:** optional facade in code if one physical connector serves both shapes. | | **KTT vs other rails** | Marketplace lists **KTT Legacy** next to full **send/receive** rails; code is **evidence-only** and **send** fails by design. | -| **Synthetic success** | SWIFT FIN / TT scaffolds return **SENT** without network I/O; **KTT** returns **FAILED** on send—mixed semantics for “outbox” testing. | +| **Synthetic success** | SWIFT FIN / TT scaffolds return **SENT** without network I/O; **KTT** returns **FAILED** on send—mixed semantics for “outbox” testing. **Operator rule:** dashboards and demos must label these paths **scaffold / non-wire** so they are not read as production ACKs. | | **Thirdweb in gateway folder** | Shares adapter plane with **bank rails**; not a messaging rail—boundary should stay documented for integrators. | | **Offering copy vs reality** | `gateway-microservices-offering.json` describes a regulated fabric; **implementations are not production-grade** connectors yet. | | **QPS dashboard “SWIFT” volume** | Derived from `iso_messages` heuristics (`messageType.includes('SWIFT')`)—**not** true SWIFT FIN volume. | diff --git a/package.json b/package.json index eca8c5d..83ab1a3 100644 --- a/package.json +++ b/package.json @@ -20,6 +20,7 @@ "format": "prettier --write \"src/**/*.ts\"", "prepare": "husky install", "worker:dual-ledger-outbox": "ts-node src/workers/run-dual-ledger-outbox.ts", + "worker:gateway-outbox": "ts-node -r tsconfig-paths/register src/workers/run-gateway-outbox-worker.ts", "db:verify-columns": "psql $DATABASE_URL -f scripts/verify-column-names.sql", "db:audit-balances": "psql $DATABASE_URL -f scripts/audit-balances.sql", "db:run-migrations": "./scripts/run-migrations.sh", diff --git a/src/__tests__/unit/core/gateway/gateway-mq-wrap.test.ts b/src/__tests__/unit/core/gateway/gateway-mq-wrap.test.ts new file mode 100644 index 0000000..41641b2 --- /dev/null +++ b/src/__tests__/unit/core/gateway/gateway-mq-wrap.test.ts @@ -0,0 +1,31 @@ +import { + gatewayMqQueueNeedsSolaceNetCheck, + resolveTenantIdFromMqMessage, + wrapGatewayMqMessageHandler, +} from '@/core/gateway/rails/gateway-mq-wrap'; + +describe('gateway-mq-wrap', () => { + const origPrefix = process.env.SOLACENET_GATEWAY_MQ_QUEUE_PREFIX; + + afterEach(() => { + process.env.SOLACENET_GATEWAY_MQ_QUEUE_PREFIX = origPrefix; + }); + + it('detects default gateway queue prefix', () => { + delete process.env.SOLACENET_GATEWAY_MQ_QUEUE_PREFIX; + expect(gatewayMqQueueNeedsSolaceNetCheck('dbis.gateway.inbound')).toBe(true); + expect(gatewayMqQueueNeedsSolaceNetCheck('other.queue')).toBe(false); + }); + + it('resolves tenant from message', () => { + expect(resolveTenantIdFromMqMessage({ tenantId: 't1' })).toBe('t1'); + expect(resolveTenantIdFromMqMessage({ body: { tenantId: 't2' } })).toBe('t2'); + }); + + it('wrapGatewayMqMessageHandler passes through non-gateway queues', async () => { + const handler = jest.fn().mockResolvedValue(undefined); + const wrapped = wrapGatewayMqMessageHandler('other.queue', handler); + await wrapped({ x: 1 }); + expect(handler).toHaveBeenCalledWith({ x: 1 }); + }); +}); diff --git a/src/__tests__/unit/core/gateway/gateway-outbox.worker.test.ts b/src/__tests__/unit/core/gateway/gateway-outbox.worker.test.ts new file mode 100644 index 0000000..ceeced9 --- /dev/null +++ b/src/__tests__/unit/core/gateway/gateway-outbox.worker.test.ts @@ -0,0 +1,229 @@ +import type { PrismaClient } from '@prisma/client'; +import { GatewayOutboxWorker } from '@/workers/gateway-outbox.worker'; + +jest.mock('@/core/gateway/adapters/gateway-adapter-registry', () => { + const actual = jest.requireActual( + '@/core/gateway/adapters/gateway-adapter-registry', + ); + return { + ...actual, + createGatewayRailAdapter: jest.fn(actual.createGatewayRailAdapter), + }; +}); + +import { createGatewayRailAdapter } from '@/core/gateway/adapters/gateway-adapter-registry'; + +type OutboxRow = { + id: string; + txnId: string; + adapterId: string; + payloadHash: string; + sendAttempts: number; + status: string; + createdAt: Date; + lastAttemptAt: Date | null; +}; + +function makeMockPrisma(rows: OutboxRow[]): { + prisma: PrismaClient; + getRow: (id: string) => OutboxRow | undefined; +} { + const store = new Map(rows.map((r) => [r.id, { ...r }])); + + const prisma = { + gateway_outbox: { + findMany: jest.fn(async ({ where, take }: { where: { status: string; sendAttempts: { lt: number } }; take: number }) => { + const max = where.sendAttempts.lt; + const list = [...store.values()].filter( + (r) => r.status === where.status && r.sendAttempts < max, + ); + list.sort((a, b) => a.createdAt.getTime() - b.createdAt.getTime()); + return list.slice(0, take); + }), + updateMany: jest.fn( + async ({ + where, + data, + }: { + where: { id: string; status: string; sendAttempts: number }; + data: { sendAttempts: number; lastAttemptAt: Date }; + }) => { + const r = store.get(where.id); + if (!r) return { count: 0 }; + if (r.status !== where.status || r.sendAttempts !== where.sendAttempts) return { count: 0 }; + r.sendAttempts = data.sendAttempts; + r.lastAttemptAt = data.lastAttemptAt; + return { count: 1 }; + }, + ), + update: jest.fn( + async ({ + where, + data, + }: { + where: { id: string }; + data: Partial>; + }) => { + const r = store.get(where.id); + if (!r) throw new Error('gateway_outbox row not found'); + Object.assign(r, data); + return r; + }, + ), + }, + } as unknown as PrismaClient; + + return { + prisma, + getRow: (id: string) => store.get(id), + }; +} + +describe('GatewayOutboxWorker', () => { + const mockCreate = createGatewayRailAdapter as jest.MockedFunction; + + beforeEach(() => { + mockCreate.mockClear(); + mockCreate.mockImplementation((id) => + jest.requireActual( + '@/core/gateway/adapters/gateway-adapter-registry', + ).createGatewayRailAdapter(id), + ); + }); + + it('returns 0 when queue is empty', async () => { + const { prisma } = makeMockPrisma([]); + const w = new GatewayOutboxWorker(prisma, { batchSize: 10, maxAttempts: 5 }); + await expect(w.runOnce()).resolves.toBe(0); + }); + + it('claims row, sends SENT, marks gateway_outbox SENT', async () => { + mockCreate.mockReturnValue({ + initialize: jest.fn().mockResolvedValue(undefined), + send: jest.fn().mockResolvedValue({ status: 'SENT' as const, railMessageId: 'mid' }), + } as never); + + const row: OutboxRow = { + id: 'o1', + txnId: 'TX-1', + adapterId: 'dbis.adapter.thirdweb', + payloadHash: '0xabc', + sendAttempts: 0, + status: 'PENDING', + createdAt: new Date('2020-01-01'), + lastAttemptAt: null, + }; + const { prisma, getRow } = makeMockPrisma([row]); + const w = new GatewayOutboxWorker(prisma, { batchSize: 10, maxAttempts: 5 }); + const n = await w.runOnce(); + expect(n).toBe(1); + expect(getRow('o1')?.status).toBe('SENT'); + }); + + it('skips row when compare-and-swap loses (concurrent claim)', async () => { + const row: OutboxRow = { + id: 'o1', + txnId: 'TX-1', + adapterId: 'dbis.adapter.thirdweb', + payloadHash: '0xabc', + sendAttempts: 1, + status: 'PENDING', + createdAt: new Date('2020-01-01'), + lastAttemptAt: new Date(), + }; + const { prisma } = makeMockPrisma([row]); + (prisma.gateway_outbox.updateMany as jest.Mock).mockImplementationOnce(async () => ({ count: 0 })); + const w = new GatewayOutboxWorker(prisma, { batchSize: 10, maxAttempts: 5 }); + const n = await w.runOnce(); + expect(n).toBe(0); + expect(mockCreate).not.toHaveBeenCalled(); + }); + + it('on send throw, sets PENDING when attempts remain', async () => { + mockCreate.mockReturnValue({ + initialize: jest.fn().mockResolvedValue(undefined), + send: jest.fn().mockRejectedValue(new Error('network down')), + } as never); + + const row: OutboxRow = { + id: 'o1', + txnId: 'TX-1', + adapterId: 'dbis.adapter.thirdweb', + payloadHash: '0xabc', + sendAttempts: 0, + status: 'PENDING', + createdAt: new Date('2020-01-01'), + lastAttemptAt: null, + }; + const { prisma, getRow } = makeMockPrisma([row]); + const w = new GatewayOutboxWorker(prisma, { batchSize: 10, maxAttempts: 5 }); + await w.runOnce(); + expect(getRow('o1')?.status).toBe('PENDING'); + expect(getRow('o1')?.sendAttempts).toBe(1); + }); + + it('on send throw at max attempts, sets FAILED', async () => { + mockCreate.mockReturnValue({ + initialize: jest.fn().mockResolvedValue(undefined), + send: jest.fn().mockRejectedValue(new Error('still down')), + } as never); + + const row: OutboxRow = { + id: 'o1', + txnId: 'TX-1', + adapterId: 'dbis.adapter.thirdweb', + payloadHash: '0xabc', + sendAttempts: 2, + status: 'PENDING', + createdAt: new Date('2020-01-01'), + lastAttemptAt: null, + }; + const { prisma, getRow } = makeMockPrisma([row]); + const w = new GatewayOutboxWorker(prisma, { batchSize: 10, maxAttempts: 3 }); + await w.runOnce(); + expect(getRow('o1')?.status).toBe('FAILED'); + }); + + it('unknown adapterId marks FAILED when no retries left after claim', async () => { + const row: OutboxRow = { + id: 'o1', + txnId: 'TX-1', + adapterId: 'not.a.registered.adapter', + payloadHash: '0xabc', + sendAttempts: 2, + status: 'PENDING', + createdAt: new Date('2020-01-01'), + lastAttemptAt: null, + }; + const { prisma, getRow } = makeMockPrisma([row]); + const w = new GatewayOutboxWorker(prisma, { batchSize: 10, maxAttempts: 3 }); + await w.runOnce(); + expect(mockCreate).not.toHaveBeenCalled(); + expect(getRow('o1')?.status).toBe('FAILED'); + }); + + it('non-SENT adapter result leaves PENDING when under max attempts', async () => { + mockCreate.mockReturnValue({ + initialize: jest.fn().mockResolvedValue(undefined), + send: jest.fn().mockResolvedValue({ + status: 'FAILED', + error: { code: 'X', retryClass: 'RETRYABLE' }, + }), + } as never); + + const row: OutboxRow = { + id: 'o1', + txnId: 'TX-1', + adapterId: 'dbis.adapter.ktt-evidence', + payloadHash: '0xabc', + sendAttempts: 0, + status: 'PENDING', + createdAt: new Date('2020-01-01'), + lastAttemptAt: null, + }; + const { prisma, getRow } = makeMockPrisma([row]); + const w = new GatewayOutboxWorker(prisma, { batchSize: 10, maxAttempts: 5 }); + await w.runOnce(); + expect(getRow('o1')?.status).toBe('PENDING'); + }); +}); diff --git a/src/core/gateway/adapters/thirdweb/thirdweb-adapter.ts b/src/core/gateway/adapters/thirdweb/thirdweb-adapter.ts index afd0dc6..2e31ee4 100644 --- a/src/core/gateway/adapters/thirdweb/thirdweb-adapter.ts +++ b/src/core/gateway/adapters/thirdweb/thirdweb-adapter.ts @@ -6,7 +6,7 @@ import { AdapterReceiveResult, AdapterSendResult, AdapterValidateResult } from ' * dbis.adapter.thirdweb * * Thirdweb Gateway Adapter — contract invocation via Thirdweb SDK (not a bank messaging rail). - * Supports multiple chains and contract method invocation. + * **ABI / `encodeMethodCall`:** not production-complete — throws until ABI-backed encoding is wired (PG-GW-009). */ export class ThirdwebAdapter extends AdapterBase { private clientId?: string; diff --git a/src/core/gateway/control/gateway-outbox.service.ts b/src/core/gateway/control/gateway-outbox.service.ts new file mode 100644 index 0000000..e679c7e --- /dev/null +++ b/src/core/gateway/control/gateway-outbox.service.ts @@ -0,0 +1,75 @@ +import { createHash } from 'crypto'; +import prisma from '@/shared/database/prisma'; +import { logger } from '@/infrastructure/monitoring/logger'; + +/** + * Rail send outbox (Prisma `gateway_outbox`). Scaffold for PG-GW-W01 send orchestration: + * enqueue → worker claims PENDING → marks SENT/FAILED (DLQ semantics = FAILED + operator retry). + */ +export function hashPayload(payload: string): string { + return createHash('sha256').update(payload, 'utf8').digest('hex'); +} + +export async function enqueueGatewayOutbox(params: { + txnId: string; + adapterId: string; + payload: string; +}): Promise<{ id: string }> { + const payloadHash = hashPayload(params.payload); + const row = await prisma.gateway_outbox.create({ + data: { + txnId: params.txnId, + adapterId: params.adapterId, + payloadHash, + status: 'PENDING', + sendAttempts: 0, + }, + }); + logger.info('gateway_outbox enqueued', { id: row.id, adapterId: params.adapterId, txnId: params.txnId }); + return { id: row.id }; +} + +export async function listPendingGatewayOutbox(limit = 50): Promise< + Array<{ + id: string; + txnId: string; + adapterId: string; + payloadHash: string; + sendAttempts: number; + status: string; + }> +> { + return prisma.gateway_outbox.findMany({ + where: { status: 'PENDING' }, + orderBy: { createdAt: 'asc' }, + take: limit, + select: { + id: true, + txnId: true, + adapterId: true, + payloadHash: true, + sendAttempts: true, + status: true, + }, + }); +} + +export async function markGatewayOutboxSent(id: string): Promise { + await prisma.gateway_outbox.update({ + where: { id }, + data: { status: 'SENT', lastAttemptAt: new Date() }, + }); +} + +export async function markGatewayOutboxFailed(id: string, incrementAttempts = true): Promise { + const row = await prisma.gateway_outbox.findUnique({ where: { id } }); + if (!row) return; + await prisma.gateway_outbox.update({ + where: { id }, + data: { + status: 'FAILED', + lastAttemptAt: new Date(), + sendAttempts: incrementAttempts ? row.sendAttempts + 1 : row.sendAttempts, + }, + }); +} diff --git a/src/core/gateway/integration/plugin-gateway-facade.ts b/src/core/gateway/integration/plugin-gateway-facade.ts new file mode 100644 index 0000000..08de995 --- /dev/null +++ b/src/core/gateway/integration/plugin-gateway-facade.ts @@ -0,0 +1,10 @@ +/** + * PG-GW-W02 — Optional facade when one physical connector serves both PluginRegistry + * (`src/integration/plugins/`) and gateway rail adapters (`adapters/`). + * + * Implement domain-specific bridging here (e.g. single SWIFT session → plugin + `dbis.adapter.swift-fin`). + * Intentionally empty until a concrete dual-surface deployment exists. + */ +export class PluginGatewayFacade { + // Placeholder — add methods when a shared connector is chartered. +} diff --git a/src/core/gateway/rails/README.md b/src/core/gateway/rails/README.md index 48e0802..4f9017f 100644 --- a/src/core/gateway/rails/README.md +++ b/src/core/gateway/rails/README.md @@ -14,4 +14,6 @@ Tenant resolution order: **`body.tenantId`** → **`x-tenant-id`** → **`SOLACE **Worker ingress:** `requireGatewayMicroservicesForWorker()` in `gateway-rails-enforcement.ts` — call from file/MQ consumers when enforcement is on. -**Related:** `gateway-rails-audit.ts`, `../routes/gateway.routes.ts`, `../adapters/gateway-adapter-registry.ts`, `docs/solacenet/RAIL_AND_PROTOCOL_GOVERNANCE.md`, `docs/solacenet/SOLACENET_GATEWAY_RAILS_ENFORCE_RUNBOOK.md`. +**Outbox send loop:** `../../workers/gateway-outbox.worker.ts` — `npm run worker:gateway-outbox`. + +**Related:** `gateway-rails-audit.ts`, `../routes/gateway.routes.ts`, `../adapters/gateway-adapter-registry.ts`, `../control/gateway-outbox.service.ts`, `docs/solacenet/RAIL_AND_PROTOCOL_GOVERNANCE.md`, `docs/solacenet/SOLACENET_GATEWAY_RAILS_ENFORCE_RUNBOOK.md`. diff --git a/src/core/gateway/rails/gateway-mq-wrap.ts b/src/core/gateway/rails/gateway-mq-wrap.ts new file mode 100644 index 0000000..96e2864 --- /dev/null +++ b/src/core/gateway/rails/gateway-mq-wrap.ts @@ -0,0 +1,53 @@ +import { requireGatewayMicroservicesForWorker } from './gateway-rails-enforcement'; + +const DEFAULT_PREFIX = 'dbis.gateway.'; + +/** + * Queue names matching this prefix (or `SOLACENET_GATEWAY_MQ_QUEUE_PREFIX`) run + * `requireGatewayMicroservicesForWorker` before the handler when enforcement is on. + */ +export function gatewayMqQueueNeedsSolaceNetCheck(queueName: string): boolean { + const prefix = process.env.SOLACENET_GATEWAY_MQ_QUEUE_PREFIX?.trim() || DEFAULT_PREFIX; + return queueName.startsWith(prefix); +} + +export function resolveTenantIdFromMqMessage(message: unknown): string { + if (message && typeof message === 'object') { + const m = message as Record; + const direct = m.tenantId; + if (typeof direct === 'string' && direct.length > 0) return direct; + const body = m.body; + if (body && typeof body === 'object') { + const b = body as Record; + if (typeof b.tenantId === 'string' && b.tenantId.length > 0) return b.tenantId; + } + } + return process.env.SOLACENET_DEFAULT_TENANT_ID || 'system'; +} + +/** + * Wrap a RabbitMQ/Kafka-style handler so SolaceNet gateway enforcement applies + * to gateway-prefixed queues. Use when wiring real `subscribeToQueue` consumers. + */ +export function wrapGatewayMqMessageHandler( + queueName: string, + handler: (message: unknown) => Promise, +): (message: unknown) => Promise { + if (!gatewayMqQueueNeedsSolaceNetCheck(queueName)) { + return handler; + } + return async (message: unknown) => { + const tenantId = resolveTenantIdFromMqMessage(message); + const adapterId = + message && typeof message === 'object' && typeof (message as Record).adapterId === 'string' + ? String((message as Record).adapterId) + : undefined; + await requireGatewayMicroservicesForWorker({ + tenantId, + ingressKind: 'mq', + detail: queueName, + adapterId, + }); + await handler(message); + }; +} diff --git a/src/core/gateway/routes/gateway.routes.ts b/src/core/gateway/routes/gateway.routes.ts index 4d9db15..886634c 100644 --- a/src/core/gateway/routes/gateway.routes.ts +++ b/src/core/gateway/routes/gateway.routes.ts @@ -19,6 +19,7 @@ import { import { gatewayRailsEnforcementEnabled, maybeRequireGatewayMicroservices, + resolveGatewayTenantId, } from '../rails/gateway-rails-enforcement'; import { DbisError } from '@/shared/types'; @@ -36,6 +37,10 @@ if (process.env.GATEWAY_RAIL_RATE_LIMIT_IN_TEST !== '1') { max, standardHeaders: true, legacyHeaders: false, + keyGenerator: (req) => { + const tenant = resolveGatewayTenantId(req); + return `${req.ip ?? 'unknown'}:${tenant}`; + }, message: { success: false, error: { diff --git a/src/core/solacenet/registry/capability-registry.service.ts b/src/core/solacenet/registry/capability-registry.service.ts index c3854f5..2bb95e9 100644 --- a/src/core/solacenet/registry/capability-registry.service.ts +++ b/src/core/solacenet/registry/capability-registry.service.ts @@ -348,7 +348,7 @@ export class CapabilityRegistryService { providerId: binding.providerId ?? undefined, region: binding.region, config: binding.config as Record | undefined, - secretsRef: binding.secretsRef, + secretsRef: binding.secretsRef ?? undefined, status: binding.status as 'active' | 'inactive', createdAt: binding.createdAt, updatedAt: binding.updatedAt, diff --git a/src/integration/messaging/message-bus.service.ts b/src/integration/messaging/message-bus.service.ts index d8fd3a8..feb713f 100644 --- a/src/integration/messaging/message-bus.service.ts +++ b/src/integration/messaging/message-bus.service.ts @@ -1,6 +1,7 @@ // Message Bus Integration - RabbitMQ/Kafka import { logger } from '@/infrastructure/monitoring/logger'; +import { wrapGatewayMqMessageHandler } from '@/core/gateway/rails/gateway-mq-wrap'; export class MessageBusService { /** @@ -18,6 +19,18 @@ export class MessageBusService { // In production, this would subscribe to RabbitMQ or Kafka logger.info(`Subscribing to queue: ${queue}`); } + + /** + * Build a handler for amqplib/kafkajs consumers: SolaceNet enforcement on `dbis.gateway.*` + * queues (prefix overridable via `SOLACENET_GATEWAY_MQ_QUEUE_PREFIX`) when + * `SOLACENET_GATEWAY_RAILS_ENFORCE` is on. + */ + buildGatewayAwareHandler( + queue: string, + handler: (message: any) => Promise, + ): (message: unknown) => Promise { + return wrapGatewayMqMessageHandler(queue, handler); + } } export const messageBusService = new MessageBusService(); diff --git a/src/workers/gateway-outbox.worker.ts b/src/workers/gateway-outbox.worker.ts new file mode 100644 index 0000000..adcb4ed --- /dev/null +++ b/src/workers/gateway-outbox.worker.ts @@ -0,0 +1,119 @@ +import type { PrismaClient } from '@prisma/client'; +import { logger } from '@/infrastructure/monitoring/logger'; +import { + createGatewayRailAdapter, + isGatewayRailAdapterId, +} from '@/core/gateway/adapters/gateway-adapter-registry'; + +export interface GatewayOutboxWorkerOptions { + batchSize?: number; + /** After this many send attempts, row stays FAILED (DLQ). */ + maxAttempts?: number; +} + +/** + * Processes `gateway_outbox` rows (PENDING → adapter.send → SENT or PENDING retry / FAILED). + * Uses compare-and-swap on `(id, status, sendAttempts)` to limit double-sends under multiple workers. + */ +export class GatewayOutboxWorker { + private readonly batchSize: number; + private readonly maxAttempts: number; + + constructor( + private readonly prisma: PrismaClient, + opts: GatewayOutboxWorkerOptions = {}, + ) { + this.batchSize = opts.batchSize ?? parseInt(process.env.GATEWAY_OUTBOX_BATCH_SIZE || '25', 10); + this.maxAttempts = + opts.maxAttempts ?? parseInt(process.env.GATEWAY_OUTBOX_MAX_ATTEMPTS || '10', 10); + } + + async runOnce(): Promise { + const candidates = await this.prisma.gateway_outbox.findMany({ + where: { + status: 'PENDING', + sendAttempts: { lt: this.maxAttempts }, + }, + orderBy: { createdAt: 'asc' }, + take: this.batchSize, + }); + + let processed = 0; + for (const row of candidates) { + const claimed = await this.prisma.gateway_outbox.updateMany({ + where: { + id: row.id, + status: 'PENDING', + sendAttempts: row.sendAttempts, + }, + data: { + sendAttempts: row.sendAttempts + 1, + lastAttemptAt: new Date(), + }, + }); + if (claimed.count !== 1) { + continue; + } + + const attemptsAfterClaim = row.sendAttempts + 1; + try { + await this.dispatchSend(row.id, row.txnId, row.adapterId, row.payloadHash, attemptsAfterClaim); + processed += 1; + } catch (err) { + logger.error('GatewayOutboxWorker: dispatch error', { + id: row.id, + error: err instanceof Error ? err.message : String(err), + }); + await this.prisma.gateway_outbox.update({ + where: { id: row.id }, + data: { + status: attemptsAfterClaim >= this.maxAttempts ? 'FAILED' : 'PENDING', + }, + }); + processed += 1; + } + } + return processed; + } + + private async dispatchSend( + id: string, + txnId: string, + adapterId: string, + payloadHash: string, + attemptsAfterClaim: number, + ): Promise { + if (!isGatewayRailAdapterId(adapterId)) { + logger.warn('GatewayOutboxWorker: unknown adapter', { id, adapterId }); + await this.prisma.gateway_outbox.update({ + where: { id }, + data: { status: attemptsAfterClaim >= this.maxAttempts ? 'FAILED' : 'PENDING' }, + }); + return; + } + + const adapter = createGatewayRailAdapter(adapterId)!; + await adapter.initialize({}, undefined); + const result = await adapter.send({ + txnId, + payloadHash, + envelope: { source: 'gateway_outbox', scaffold: true }, + }); + + if (result.status === 'SENT') { + await this.prisma.gateway_outbox.update({ + where: { id }, + data: { status: 'SENT', lastAttemptAt: new Date() }, + }); + return; + } + + await this.prisma.gateway_outbox.update({ + where: { id }, + data: { + status: attemptsAfterClaim >= this.maxAttempts ? 'FAILED' : 'PENDING', + lastAttemptAt: new Date(), + }, + }); + } +} diff --git a/src/workers/run-gateway-outbox-worker.ts b/src/workers/run-gateway-outbox-worker.ts new file mode 100644 index 0000000..b941336 --- /dev/null +++ b/src/workers/run-gateway-outbox-worker.ts @@ -0,0 +1,40 @@ +/** + * Loop: claim gateway_outbox rows and invoke rail adapter `send`. + * Run: npx ts-node -r tsconfig-paths/register src/workers/run-gateway-outbox-worker.ts + * Env: DATABASE_URL, optional GATEWAY_OUTBOX_BATCH_SIZE, GATEWAY_OUTBOX_MAX_ATTEMPTS + */ + +import { PrismaClient } from '@prisma/client'; +import { GatewayOutboxWorker } from './gateway-outbox.worker'; +import { logger } from '@/infrastructure/monitoring/logger'; + +const prisma = new PrismaClient(); + +async function main() { + logger.info('GatewayOutboxWorker: starting'); + const worker = new GatewayOutboxWorker(prisma); + + let shutdown = false; + const onShutdown = () => { + shutdown = true; + logger.info('GatewayOutboxWorker: shutdown signal'); + }; + process.on('SIGINT', onShutdown); + process.on('SIGTERM', onShutdown); + + while (!shutdown) { + const n = await worker.runOnce(); + if (n === 0) { + await new Promise((r) => setTimeout(r, 500)); + } + } + + await prisma.$disconnect(); + process.exit(0); +} + +main().catch(async (e) => { + logger.error('GatewayOutboxWorker: fatal', { error: e instanceof Error ? e.message : e }); + await prisma.$disconnect().catch(() => undefined); + process.exit(1); +});