Gateway outbox worker, MQ wrap, thirdweb adapter updates, and SolaceNet docs.
Made-with: Cursor
This commit is contained in:
31
src/__tests__/unit/core/gateway/gateway-mq-wrap.test.ts
Normal file
31
src/__tests__/unit/core/gateway/gateway-mq-wrap.test.ts
Normal file
@@ -0,0 +1,31 @@
|
||||
import {
|
||||
gatewayMqQueueNeedsSolaceNetCheck,
|
||||
resolveTenantIdFromMqMessage,
|
||||
wrapGatewayMqMessageHandler,
|
||||
} from '@/core/gateway/rails/gateway-mq-wrap';
|
||||
|
||||
describe('gateway-mq-wrap', () => {
|
||||
const origPrefix = process.env.SOLACENET_GATEWAY_MQ_QUEUE_PREFIX;
|
||||
|
||||
afterEach(() => {
|
||||
process.env.SOLACENET_GATEWAY_MQ_QUEUE_PREFIX = origPrefix;
|
||||
});
|
||||
|
||||
it('detects default gateway queue prefix', () => {
|
||||
delete process.env.SOLACENET_GATEWAY_MQ_QUEUE_PREFIX;
|
||||
expect(gatewayMqQueueNeedsSolaceNetCheck('dbis.gateway.inbound')).toBe(true);
|
||||
expect(gatewayMqQueueNeedsSolaceNetCheck('other.queue')).toBe(false);
|
||||
});
|
||||
|
||||
it('resolves tenant from message', () => {
|
||||
expect(resolveTenantIdFromMqMessage({ tenantId: 't1' })).toBe('t1');
|
||||
expect(resolveTenantIdFromMqMessage({ body: { tenantId: 't2' } })).toBe('t2');
|
||||
});
|
||||
|
||||
it('wrapGatewayMqMessageHandler passes through non-gateway queues', async () => {
|
||||
const handler = jest.fn().mockResolvedValue(undefined);
|
||||
const wrapped = wrapGatewayMqMessageHandler('other.queue', handler);
|
||||
await wrapped({ x: 1 });
|
||||
expect(handler).toHaveBeenCalledWith({ x: 1 });
|
||||
});
|
||||
});
|
||||
229
src/__tests__/unit/core/gateway/gateway-outbox.worker.test.ts
Normal file
229
src/__tests__/unit/core/gateway/gateway-outbox.worker.test.ts
Normal file
@@ -0,0 +1,229 @@
|
||||
import type { PrismaClient } from '@prisma/client';
|
||||
import { GatewayOutboxWorker } from '@/workers/gateway-outbox.worker';
|
||||
|
||||
jest.mock('@/core/gateway/adapters/gateway-adapter-registry', () => {
|
||||
const actual = jest.requireActual<typeof import('@/core/gateway/adapters/gateway-adapter-registry')>(
|
||||
'@/core/gateway/adapters/gateway-adapter-registry',
|
||||
);
|
||||
return {
|
||||
...actual,
|
||||
createGatewayRailAdapter: jest.fn(actual.createGatewayRailAdapter),
|
||||
};
|
||||
});
|
||||
|
||||
import { createGatewayRailAdapter } from '@/core/gateway/adapters/gateway-adapter-registry';
|
||||
|
||||
type OutboxRow = {
|
||||
id: string;
|
||||
txnId: string;
|
||||
adapterId: string;
|
||||
payloadHash: string;
|
||||
sendAttempts: number;
|
||||
status: string;
|
||||
createdAt: Date;
|
||||
lastAttemptAt: Date | null;
|
||||
};
|
||||
|
||||
function makeMockPrisma(rows: OutboxRow[]): {
|
||||
prisma: PrismaClient;
|
||||
getRow: (id: string) => OutboxRow | undefined;
|
||||
} {
|
||||
const store = new Map(rows.map((r) => [r.id, { ...r }]));
|
||||
|
||||
const prisma = {
|
||||
gateway_outbox: {
|
||||
findMany: jest.fn(async ({ where, take }: { where: { status: string; sendAttempts: { lt: number } }; take: number }) => {
|
||||
const max = where.sendAttempts.lt;
|
||||
const list = [...store.values()].filter(
|
||||
(r) => r.status === where.status && r.sendAttempts < max,
|
||||
);
|
||||
list.sort((a, b) => a.createdAt.getTime() - b.createdAt.getTime());
|
||||
return list.slice(0, take);
|
||||
}),
|
||||
updateMany: jest.fn(
|
||||
async ({
|
||||
where,
|
||||
data,
|
||||
}: {
|
||||
where: { id: string; status: string; sendAttempts: number };
|
||||
data: { sendAttempts: number; lastAttemptAt: Date };
|
||||
}) => {
|
||||
const r = store.get(where.id);
|
||||
if (!r) return { count: 0 };
|
||||
if (r.status !== where.status || r.sendAttempts !== where.sendAttempts) return { count: 0 };
|
||||
r.sendAttempts = data.sendAttempts;
|
||||
r.lastAttemptAt = data.lastAttemptAt;
|
||||
return { count: 1 };
|
||||
},
|
||||
),
|
||||
update: jest.fn(
|
||||
async ({
|
||||
where,
|
||||
data,
|
||||
}: {
|
||||
where: { id: string };
|
||||
data: Partial<Pick<OutboxRow, 'status' | 'sendAttempts' | 'lastAttemptAt'>>;
|
||||
}) => {
|
||||
const r = store.get(where.id);
|
||||
if (!r) throw new Error('gateway_outbox row not found');
|
||||
Object.assign(r, data);
|
||||
return r;
|
||||
},
|
||||
),
|
||||
},
|
||||
} as unknown as PrismaClient;
|
||||
|
||||
return {
|
||||
prisma,
|
||||
getRow: (id: string) => store.get(id),
|
||||
};
|
||||
}
|
||||
|
||||
describe('GatewayOutboxWorker', () => {
|
||||
const mockCreate = createGatewayRailAdapter as jest.MockedFunction<typeof createGatewayRailAdapter>;
|
||||
|
||||
beforeEach(() => {
|
||||
mockCreate.mockClear();
|
||||
mockCreate.mockImplementation((id) =>
|
||||
jest.requireActual<typeof import('@/core/gateway/adapters/gateway-adapter-registry')>(
|
||||
'@/core/gateway/adapters/gateway-adapter-registry',
|
||||
).createGatewayRailAdapter(id),
|
||||
);
|
||||
});
|
||||
|
||||
it('returns 0 when queue is empty', async () => {
|
||||
const { prisma } = makeMockPrisma([]);
|
||||
const w = new GatewayOutboxWorker(prisma, { batchSize: 10, maxAttempts: 5 });
|
||||
await expect(w.runOnce()).resolves.toBe(0);
|
||||
});
|
||||
|
||||
it('claims row, sends SENT, marks gateway_outbox SENT', async () => {
|
||||
mockCreate.mockReturnValue({
|
||||
initialize: jest.fn().mockResolvedValue(undefined),
|
||||
send: jest.fn().mockResolvedValue({ status: 'SENT' as const, railMessageId: 'mid' }),
|
||||
} as never);
|
||||
|
||||
const row: OutboxRow = {
|
||||
id: 'o1',
|
||||
txnId: 'TX-1',
|
||||
adapterId: 'dbis.adapter.thirdweb',
|
||||
payloadHash: '0xabc',
|
||||
sendAttempts: 0,
|
||||
status: 'PENDING',
|
||||
createdAt: new Date('2020-01-01'),
|
||||
lastAttemptAt: null,
|
||||
};
|
||||
const { prisma, getRow } = makeMockPrisma([row]);
|
||||
const w = new GatewayOutboxWorker(prisma, { batchSize: 10, maxAttempts: 5 });
|
||||
const n = await w.runOnce();
|
||||
expect(n).toBe(1);
|
||||
expect(getRow('o1')?.status).toBe('SENT');
|
||||
});
|
||||
|
||||
it('skips row when compare-and-swap loses (concurrent claim)', async () => {
|
||||
const row: OutboxRow = {
|
||||
id: 'o1',
|
||||
txnId: 'TX-1',
|
||||
adapterId: 'dbis.adapter.thirdweb',
|
||||
payloadHash: '0xabc',
|
||||
sendAttempts: 1,
|
||||
status: 'PENDING',
|
||||
createdAt: new Date('2020-01-01'),
|
||||
lastAttemptAt: new Date(),
|
||||
};
|
||||
const { prisma } = makeMockPrisma([row]);
|
||||
(prisma.gateway_outbox.updateMany as jest.Mock).mockImplementationOnce(async () => ({ count: 0 }));
|
||||
const w = new GatewayOutboxWorker(prisma, { batchSize: 10, maxAttempts: 5 });
|
||||
const n = await w.runOnce();
|
||||
expect(n).toBe(0);
|
||||
expect(mockCreate).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('on send throw, sets PENDING when attempts remain', async () => {
|
||||
mockCreate.mockReturnValue({
|
||||
initialize: jest.fn().mockResolvedValue(undefined),
|
||||
send: jest.fn().mockRejectedValue(new Error('network down')),
|
||||
} as never);
|
||||
|
||||
const row: OutboxRow = {
|
||||
id: 'o1',
|
||||
txnId: 'TX-1',
|
||||
adapterId: 'dbis.adapter.thirdweb',
|
||||
payloadHash: '0xabc',
|
||||
sendAttempts: 0,
|
||||
status: 'PENDING',
|
||||
createdAt: new Date('2020-01-01'),
|
||||
lastAttemptAt: null,
|
||||
};
|
||||
const { prisma, getRow } = makeMockPrisma([row]);
|
||||
const w = new GatewayOutboxWorker(prisma, { batchSize: 10, maxAttempts: 5 });
|
||||
await w.runOnce();
|
||||
expect(getRow('o1')?.status).toBe('PENDING');
|
||||
expect(getRow('o1')?.sendAttempts).toBe(1);
|
||||
});
|
||||
|
||||
it('on send throw at max attempts, sets FAILED', async () => {
|
||||
mockCreate.mockReturnValue({
|
||||
initialize: jest.fn().mockResolvedValue(undefined),
|
||||
send: jest.fn().mockRejectedValue(new Error('still down')),
|
||||
} as never);
|
||||
|
||||
const row: OutboxRow = {
|
||||
id: 'o1',
|
||||
txnId: 'TX-1',
|
||||
adapterId: 'dbis.adapter.thirdweb',
|
||||
payloadHash: '0xabc',
|
||||
sendAttempts: 2,
|
||||
status: 'PENDING',
|
||||
createdAt: new Date('2020-01-01'),
|
||||
lastAttemptAt: null,
|
||||
};
|
||||
const { prisma, getRow } = makeMockPrisma([row]);
|
||||
const w = new GatewayOutboxWorker(prisma, { batchSize: 10, maxAttempts: 3 });
|
||||
await w.runOnce();
|
||||
expect(getRow('o1')?.status).toBe('FAILED');
|
||||
});
|
||||
|
||||
it('unknown adapterId marks FAILED when no retries left after claim', async () => {
|
||||
const row: OutboxRow = {
|
||||
id: 'o1',
|
||||
txnId: 'TX-1',
|
||||
adapterId: 'not.a.registered.adapter',
|
||||
payloadHash: '0xabc',
|
||||
sendAttempts: 2,
|
||||
status: 'PENDING',
|
||||
createdAt: new Date('2020-01-01'),
|
||||
lastAttemptAt: null,
|
||||
};
|
||||
const { prisma, getRow } = makeMockPrisma([row]);
|
||||
const w = new GatewayOutboxWorker(prisma, { batchSize: 10, maxAttempts: 3 });
|
||||
await w.runOnce();
|
||||
expect(mockCreate).not.toHaveBeenCalled();
|
||||
expect(getRow('o1')?.status).toBe('FAILED');
|
||||
});
|
||||
|
||||
it('non-SENT adapter result leaves PENDING when under max attempts', async () => {
|
||||
mockCreate.mockReturnValue({
|
||||
initialize: jest.fn().mockResolvedValue(undefined),
|
||||
send: jest.fn().mockResolvedValue({
|
||||
status: 'FAILED',
|
||||
error: { code: 'X', retryClass: 'RETRYABLE' },
|
||||
}),
|
||||
} as never);
|
||||
|
||||
const row: OutboxRow = {
|
||||
id: 'o1',
|
||||
txnId: 'TX-1',
|
||||
adapterId: 'dbis.adapter.ktt-evidence',
|
||||
payloadHash: '0xabc',
|
||||
sendAttempts: 0,
|
||||
status: 'PENDING',
|
||||
createdAt: new Date('2020-01-01'),
|
||||
lastAttemptAt: null,
|
||||
};
|
||||
const { prisma, getRow } = makeMockPrisma([row]);
|
||||
const w = new GatewayOutboxWorker(prisma, { batchSize: 10, maxAttempts: 5 });
|
||||
await w.runOnce();
|
||||
expect(getRow('o1')?.status).toBe('PENDING');
|
||||
});
|
||||
});
|
||||
@@ -6,7 +6,7 @@ import { AdapterReceiveResult, AdapterSendResult, AdapterValidateResult } from '
|
||||
* dbis.adapter.thirdweb
|
||||
*
|
||||
* Thirdweb Gateway Adapter — contract invocation via Thirdweb SDK (not a bank messaging rail).
|
||||
* Supports multiple chains and contract method invocation.
|
||||
* **ABI / `encodeMethodCall`:** not production-complete — throws until ABI-backed encoding is wired (PG-GW-009).
|
||||
*/
|
||||
export class ThirdwebAdapter extends AdapterBase {
|
||||
private clientId?: string;
|
||||
|
||||
75
src/core/gateway/control/gateway-outbox.service.ts
Normal file
75
src/core/gateway/control/gateway-outbox.service.ts
Normal file
@@ -0,0 +1,75 @@
|
||||
import { createHash } from 'crypto';
|
||||
import prisma from '@/shared/database/prisma';
|
||||
import { logger } from '@/infrastructure/monitoring/logger';
|
||||
|
||||
/**
|
||||
* Rail send outbox (Prisma `gateway_outbox`). Scaffold for PG-GW-W01 send orchestration:
|
||||
* enqueue → worker claims PENDING → marks SENT/FAILED (DLQ semantics = FAILED + operator retry).
|
||||
*/
|
||||
export function hashPayload(payload: string): string {
|
||||
return createHash('sha256').update(payload, 'utf8').digest('hex');
|
||||
}
|
||||
|
||||
export async function enqueueGatewayOutbox(params: {
|
||||
txnId: string;
|
||||
adapterId: string;
|
||||
payload: string;
|
||||
}): Promise<{ id: string }> {
|
||||
const payloadHash = hashPayload(params.payload);
|
||||
const row = await prisma.gateway_outbox.create({
|
||||
data: {
|
||||
txnId: params.txnId,
|
||||
adapterId: params.adapterId,
|
||||
payloadHash,
|
||||
status: 'PENDING',
|
||||
sendAttempts: 0,
|
||||
},
|
||||
});
|
||||
logger.info('gateway_outbox enqueued', { id: row.id, adapterId: params.adapterId, txnId: params.txnId });
|
||||
return { id: row.id };
|
||||
}
|
||||
|
||||
export async function listPendingGatewayOutbox(limit = 50): Promise<
|
||||
Array<{
|
||||
id: string;
|
||||
txnId: string;
|
||||
adapterId: string;
|
||||
payloadHash: string;
|
||||
sendAttempts: number;
|
||||
status: string;
|
||||
}>
|
||||
> {
|
||||
return prisma.gateway_outbox.findMany({
|
||||
where: { status: 'PENDING' },
|
||||
orderBy: { createdAt: 'asc' },
|
||||
take: limit,
|
||||
select: {
|
||||
id: true,
|
||||
txnId: true,
|
||||
adapterId: true,
|
||||
payloadHash: true,
|
||||
sendAttempts: true,
|
||||
status: true,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
export async function markGatewayOutboxSent(id: string): Promise<void> {
|
||||
await prisma.gateway_outbox.update({
|
||||
where: { id },
|
||||
data: { status: 'SENT', lastAttemptAt: new Date() },
|
||||
});
|
||||
}
|
||||
|
||||
export async function markGatewayOutboxFailed(id: string, incrementAttempts = true): Promise<void> {
|
||||
const row = await prisma.gateway_outbox.findUnique({ where: { id } });
|
||||
if (!row) return;
|
||||
await prisma.gateway_outbox.update({
|
||||
where: { id },
|
||||
data: {
|
||||
status: 'FAILED',
|
||||
lastAttemptAt: new Date(),
|
||||
sendAttempts: incrementAttempts ? row.sendAttempts + 1 : row.sendAttempts,
|
||||
},
|
||||
});
|
||||
}
|
||||
10
src/core/gateway/integration/plugin-gateway-facade.ts
Normal file
10
src/core/gateway/integration/plugin-gateway-facade.ts
Normal file
@@ -0,0 +1,10 @@
|
||||
/**
|
||||
* PG-GW-W02 — Optional facade when one physical connector serves both PluginRegistry
|
||||
* (`src/integration/plugins/`) and gateway rail adapters (`adapters/`).
|
||||
*
|
||||
* Implement domain-specific bridging here (e.g. single SWIFT session → plugin + `dbis.adapter.swift-fin`).
|
||||
* Intentionally empty until a concrete dual-surface deployment exists.
|
||||
*/
|
||||
export class PluginGatewayFacade {
|
||||
// Placeholder — add methods when a shared connector is chartered.
|
||||
}
|
||||
@@ -14,4 +14,6 @@ Tenant resolution order: **`body.tenantId`** → **`x-tenant-id`** → **`SOLACE
|
||||
|
||||
**Worker ingress:** `requireGatewayMicroservicesForWorker()` in `gateway-rails-enforcement.ts` — call from file/MQ consumers when enforcement is on.
|
||||
|
||||
**Related:** `gateway-rails-audit.ts`, `../routes/gateway.routes.ts`, `../adapters/gateway-adapter-registry.ts`, `docs/solacenet/RAIL_AND_PROTOCOL_GOVERNANCE.md`, `docs/solacenet/SOLACENET_GATEWAY_RAILS_ENFORCE_RUNBOOK.md`.
|
||||
**Outbox send loop:** `../../workers/gateway-outbox.worker.ts` — `npm run worker:gateway-outbox`.
|
||||
|
||||
**Related:** `gateway-rails-audit.ts`, `../routes/gateway.routes.ts`, `../adapters/gateway-adapter-registry.ts`, `../control/gateway-outbox.service.ts`, `docs/solacenet/RAIL_AND_PROTOCOL_GOVERNANCE.md`, `docs/solacenet/SOLACENET_GATEWAY_RAILS_ENFORCE_RUNBOOK.md`.
|
||||
|
||||
53
src/core/gateway/rails/gateway-mq-wrap.ts
Normal file
53
src/core/gateway/rails/gateway-mq-wrap.ts
Normal file
@@ -0,0 +1,53 @@
|
||||
import { requireGatewayMicroservicesForWorker } from './gateway-rails-enforcement';
|
||||
|
||||
const DEFAULT_PREFIX = 'dbis.gateway.';
|
||||
|
||||
/**
|
||||
* Queue names matching this prefix (or `SOLACENET_GATEWAY_MQ_QUEUE_PREFIX`) run
|
||||
* `requireGatewayMicroservicesForWorker` before the handler when enforcement is on.
|
||||
*/
|
||||
export function gatewayMqQueueNeedsSolaceNetCheck(queueName: string): boolean {
|
||||
const prefix = process.env.SOLACENET_GATEWAY_MQ_QUEUE_PREFIX?.trim() || DEFAULT_PREFIX;
|
||||
return queueName.startsWith(prefix);
|
||||
}
|
||||
|
||||
export function resolveTenantIdFromMqMessage(message: unknown): string {
|
||||
if (message && typeof message === 'object') {
|
||||
const m = message as Record<string, unknown>;
|
||||
const direct = m.tenantId;
|
||||
if (typeof direct === 'string' && direct.length > 0) return direct;
|
||||
const body = m.body;
|
||||
if (body && typeof body === 'object') {
|
||||
const b = body as Record<string, unknown>;
|
||||
if (typeof b.tenantId === 'string' && b.tenantId.length > 0) return b.tenantId;
|
||||
}
|
||||
}
|
||||
return process.env.SOLACENET_DEFAULT_TENANT_ID || 'system';
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrap a RabbitMQ/Kafka-style handler so SolaceNet gateway enforcement applies
|
||||
* to gateway-prefixed queues. Use when wiring real `subscribeToQueue` consumers.
|
||||
*/
|
||||
export function wrapGatewayMqMessageHandler(
|
||||
queueName: string,
|
||||
handler: (message: unknown) => Promise<void>,
|
||||
): (message: unknown) => Promise<void> {
|
||||
if (!gatewayMqQueueNeedsSolaceNetCheck(queueName)) {
|
||||
return handler;
|
||||
}
|
||||
return async (message: unknown) => {
|
||||
const tenantId = resolveTenantIdFromMqMessage(message);
|
||||
const adapterId =
|
||||
message && typeof message === 'object' && typeof (message as Record<string, unknown>).adapterId === 'string'
|
||||
? String((message as Record<string, unknown>).adapterId)
|
||||
: undefined;
|
||||
await requireGatewayMicroservicesForWorker({
|
||||
tenantId,
|
||||
ingressKind: 'mq',
|
||||
detail: queueName,
|
||||
adapterId,
|
||||
});
|
||||
await handler(message);
|
||||
};
|
||||
}
|
||||
@@ -19,6 +19,7 @@ import {
|
||||
import {
|
||||
gatewayRailsEnforcementEnabled,
|
||||
maybeRequireGatewayMicroservices,
|
||||
resolveGatewayTenantId,
|
||||
} from '../rails/gateway-rails-enforcement';
|
||||
import { DbisError } from '@/shared/types';
|
||||
|
||||
@@ -36,6 +37,10 @@ if (process.env.GATEWAY_RAIL_RATE_LIMIT_IN_TEST !== '1') {
|
||||
max,
|
||||
standardHeaders: true,
|
||||
legacyHeaders: false,
|
||||
keyGenerator: (req) => {
|
||||
const tenant = resolveGatewayTenantId(req);
|
||||
return `${req.ip ?? 'unknown'}:${tenant}`;
|
||||
},
|
||||
message: {
|
||||
success: false,
|
||||
error: {
|
||||
|
||||
@@ -348,7 +348,7 @@ export class CapabilityRegistryService {
|
||||
providerId: binding.providerId ?? undefined,
|
||||
region: binding.region,
|
||||
config: binding.config as Record<string, unknown> | undefined,
|
||||
secretsRef: binding.secretsRef,
|
||||
secretsRef: binding.secretsRef ?? undefined,
|
||||
status: binding.status as 'active' | 'inactive',
|
||||
createdAt: binding.createdAt,
|
||||
updatedAt: binding.updatedAt,
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
// Message Bus Integration - RabbitMQ/Kafka
|
||||
|
||||
import { logger } from '@/infrastructure/monitoring/logger';
|
||||
import { wrapGatewayMqMessageHandler } from '@/core/gateway/rails/gateway-mq-wrap';
|
||||
|
||||
export class MessageBusService {
|
||||
/**
|
||||
@@ -18,6 +19,18 @@ export class MessageBusService {
|
||||
// In production, this would subscribe to RabbitMQ or Kafka
|
||||
logger.info(`Subscribing to queue: ${queue}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a handler for amqplib/kafkajs consumers: SolaceNet enforcement on `dbis.gateway.*`
|
||||
* queues (prefix overridable via `SOLACENET_GATEWAY_MQ_QUEUE_PREFIX`) when
|
||||
* `SOLACENET_GATEWAY_RAILS_ENFORCE` is on.
|
||||
*/
|
||||
buildGatewayAwareHandler(
|
||||
queue: string,
|
||||
handler: (message: any) => Promise<void>,
|
||||
): (message: unknown) => Promise<void> {
|
||||
return wrapGatewayMqMessageHandler(queue, handler);
|
||||
}
|
||||
}
|
||||
|
||||
export const messageBusService = new MessageBusService();
|
||||
|
||||
119
src/workers/gateway-outbox.worker.ts
Normal file
119
src/workers/gateway-outbox.worker.ts
Normal file
@@ -0,0 +1,119 @@
|
||||
import type { PrismaClient } from '@prisma/client';
|
||||
import { logger } from '@/infrastructure/monitoring/logger';
|
||||
import {
|
||||
createGatewayRailAdapter,
|
||||
isGatewayRailAdapterId,
|
||||
} from '@/core/gateway/adapters/gateway-adapter-registry';
|
||||
|
||||
export interface GatewayOutboxWorkerOptions {
|
||||
batchSize?: number;
|
||||
/** After this many send attempts, row stays FAILED (DLQ). */
|
||||
maxAttempts?: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Processes `gateway_outbox` rows (PENDING → adapter.send → SENT or PENDING retry / FAILED).
|
||||
* Uses compare-and-swap on `(id, status, sendAttempts)` to limit double-sends under multiple workers.
|
||||
*/
|
||||
export class GatewayOutboxWorker {
|
||||
private readonly batchSize: number;
|
||||
private readonly maxAttempts: number;
|
||||
|
||||
constructor(
|
||||
private readonly prisma: PrismaClient,
|
||||
opts: GatewayOutboxWorkerOptions = {},
|
||||
) {
|
||||
this.batchSize = opts.batchSize ?? parseInt(process.env.GATEWAY_OUTBOX_BATCH_SIZE || '25', 10);
|
||||
this.maxAttempts =
|
||||
opts.maxAttempts ?? parseInt(process.env.GATEWAY_OUTBOX_MAX_ATTEMPTS || '10', 10);
|
||||
}
|
||||
|
||||
async runOnce(): Promise<number> {
|
||||
const candidates = await this.prisma.gateway_outbox.findMany({
|
||||
where: {
|
||||
status: 'PENDING',
|
||||
sendAttempts: { lt: this.maxAttempts },
|
||||
},
|
||||
orderBy: { createdAt: 'asc' },
|
||||
take: this.batchSize,
|
||||
});
|
||||
|
||||
let processed = 0;
|
||||
for (const row of candidates) {
|
||||
const claimed = await this.prisma.gateway_outbox.updateMany({
|
||||
where: {
|
||||
id: row.id,
|
||||
status: 'PENDING',
|
||||
sendAttempts: row.sendAttempts,
|
||||
},
|
||||
data: {
|
||||
sendAttempts: row.sendAttempts + 1,
|
||||
lastAttemptAt: new Date(),
|
||||
},
|
||||
});
|
||||
if (claimed.count !== 1) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const attemptsAfterClaim = row.sendAttempts + 1;
|
||||
try {
|
||||
await this.dispatchSend(row.id, row.txnId, row.adapterId, row.payloadHash, attemptsAfterClaim);
|
||||
processed += 1;
|
||||
} catch (err) {
|
||||
logger.error('GatewayOutboxWorker: dispatch error', {
|
||||
id: row.id,
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
});
|
||||
await this.prisma.gateway_outbox.update({
|
||||
where: { id: row.id },
|
||||
data: {
|
||||
status: attemptsAfterClaim >= this.maxAttempts ? 'FAILED' : 'PENDING',
|
||||
},
|
||||
});
|
||||
processed += 1;
|
||||
}
|
||||
}
|
||||
return processed;
|
||||
}
|
||||
|
||||
private async dispatchSend(
|
||||
id: string,
|
||||
txnId: string,
|
||||
adapterId: string,
|
||||
payloadHash: string,
|
||||
attemptsAfterClaim: number,
|
||||
): Promise<void> {
|
||||
if (!isGatewayRailAdapterId(adapterId)) {
|
||||
logger.warn('GatewayOutboxWorker: unknown adapter', { id, adapterId });
|
||||
await this.prisma.gateway_outbox.update({
|
||||
where: { id },
|
||||
data: { status: attemptsAfterClaim >= this.maxAttempts ? 'FAILED' : 'PENDING' },
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const adapter = createGatewayRailAdapter(adapterId)!;
|
||||
await adapter.initialize({}, undefined);
|
||||
const result = await adapter.send({
|
||||
txnId,
|
||||
payloadHash,
|
||||
envelope: { source: 'gateway_outbox', scaffold: true },
|
||||
});
|
||||
|
||||
if (result.status === 'SENT') {
|
||||
await this.prisma.gateway_outbox.update({
|
||||
where: { id },
|
||||
data: { status: 'SENT', lastAttemptAt: new Date() },
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
await this.prisma.gateway_outbox.update({
|
||||
where: { id },
|
||||
data: {
|
||||
status: attemptsAfterClaim >= this.maxAttempts ? 'FAILED' : 'PENDING',
|
||||
lastAttemptAt: new Date(),
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
40
src/workers/run-gateway-outbox-worker.ts
Normal file
40
src/workers/run-gateway-outbox-worker.ts
Normal file
@@ -0,0 +1,40 @@
|
||||
/**
|
||||
* Loop: claim gateway_outbox rows and invoke rail adapter `send`.
|
||||
* Run: npx ts-node -r tsconfig-paths/register src/workers/run-gateway-outbox-worker.ts
|
||||
* Env: DATABASE_URL, optional GATEWAY_OUTBOX_BATCH_SIZE, GATEWAY_OUTBOX_MAX_ATTEMPTS
|
||||
*/
|
||||
|
||||
import { PrismaClient } from '@prisma/client';
|
||||
import { GatewayOutboxWorker } from './gateway-outbox.worker';
|
||||
import { logger } from '@/infrastructure/monitoring/logger';
|
||||
|
||||
const prisma = new PrismaClient();
|
||||
|
||||
async function main() {
|
||||
logger.info('GatewayOutboxWorker: starting');
|
||||
const worker = new GatewayOutboxWorker(prisma);
|
||||
|
||||
let shutdown = false;
|
||||
const onShutdown = () => {
|
||||
shutdown = true;
|
||||
logger.info('GatewayOutboxWorker: shutdown signal');
|
||||
};
|
||||
process.on('SIGINT', onShutdown);
|
||||
process.on('SIGTERM', onShutdown);
|
||||
|
||||
while (!shutdown) {
|
||||
const n = await worker.runOnce();
|
||||
if (n === 0) {
|
||||
await new Promise((r) => setTimeout(r, 500));
|
||||
}
|
||||
}
|
||||
|
||||
await prisma.$disconnect();
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
main().catch(async (e) => {
|
||||
logger.error('GatewayOutboxWorker: fatal', { error: e instanceof Error ? e.message : e });
|
||||
await prisma.$disconnect().catch(() => undefined);
|
||||
process.exit(1);
|
||||
});
|
||||
Reference in New Issue
Block a user