Enhance mapping and orchestrator services with new features and improvements

- Updated `mapping-service` to include WEB3-ETH-IBAN support, health check endpoint, and improved error handling for account-wallet linking.
- Added new provider connection and status endpoints in `mapping-service`.
- Enhanced `orchestrator` service with health check, trigger management endpoints, and improved error handling for trigger validation and submission.
- Updated dependencies in `package.json` for both services, including `axios`, `uuid`, and type definitions.
- Improved packet service with additional validation and error handling for packet generation and dispatching.
- Introduced webhook service enhancements, including delivery retries, dead letter queue management, and webhook management endpoints.
This commit is contained in:
defiQUG
2025-12-12 13:53:30 -08:00
parent c32fcf48e8
commit d7379f108e
49 changed files with 3656 additions and 146 deletions

View File

@@ -0,0 +1,45 @@
# Orchestrator Service
ISO-20022 orchestrator service managing trigger state machine and rail adapters.
## Features
- ISO-20022 message routing and normalization
- Trigger state machine (CREATED → VALIDATED → SUBMITTED → PENDING → SETTLED/REJECTED)
- On-chain fund locking and release
- Rail adapter coordination (Fedwire, SWIFT, SEPA, RTGS)
- Event publishing
## State Machine
```
CREATED → VALIDATED → SUBMITTED_TO_RAIL → PENDING → SETTLED
REJECTED
```
## API Endpoints
- `GET /v1/orchestrator/triggers/:triggerId` - Get trigger
- `GET /v1/orchestrator/triggers` - List triggers
- `POST /v1/orchestrator/triggers/:triggerId/validate-and-lock` - Validate and lock
- `POST /v1/orchestrator/triggers/:triggerId/mark-submitted` - Mark submitted
- `POST /v1/orchestrator/triggers/:triggerId/confirm-settled` - Confirm settled
- `POST /v1/orchestrator/triggers/:triggerId/confirm-rejected` - Confirm rejected
- `POST /v1/iso/inbound` - Route inbound ISO-20022 message
- `POST /v1/iso/outbound` - Route outbound ISO-20022 message
## Rails
Supported payment rails:
- `fedwire` - Fedwire
- `swift` - SWIFT
- `sepa` - SEPA
- `rtgs` - RTGS
## Configuration
- `REST_API_URL` - Main REST API URL
- `RPC_URL` - Blockchain RPC URL
- `PRIVATE_KEY` - Signer private key

View File

@@ -12,12 +12,19 @@
"express": "^4.18.2",
"@grpc/grpc-js": "^1.9.14",
"@grpc/proto-loader": "^0.7.10",
"axios": "^1.6.2",
"xml2js": "^0.6.2",
"js-yaml": "^4.1.0",
"uuid": "^9.0.1",
"@emoney/blockchain": "workspace:*",
"@emoney/events": "workspace:*"
},
"devDependencies": {
"@types/express": "^4.17.21",
"@types/node": "^20.10.0",
"@types/xml2js": "^0.4.14",
"@types/js-yaml": "^4.0.9",
"@types/uuid": "^9.0.7",
"typescript": "^5.3.0",
"ts-node-dev": "^2.0.0"
}

View File

@@ -5,7 +5,6 @@
import express from 'express';
import { orchestratorRouter } from './routes/orchestrator';
import { triggerStateMachine } from './services/state-machine';
import { isoRouter } from './services/iso-router';
const app = express();
@@ -19,6 +18,11 @@ app.use('/v1/orchestrator', orchestratorRouter);
// ISO-20022 router
app.use('/v1/iso', isoRouter);
// Health check
app.get('/health', (req, res) => {
res.json({ status: 'ok', service: 'orchestrator' });
});
app.listen(PORT, () => {
console.log(`Orchestrator service listening on port ${PORT}`);
});

View File

@@ -4,9 +4,43 @@
import { Router, Request, Response } from 'express';
import { triggerStateMachine } from '../services/state-machine';
import { storage } from '../services/storage';
export const orchestratorRouter = Router();
orchestratorRouter.get('/triggers/:triggerId', async (req: Request, res: Response) => {
try {
const trigger = await storage.getTrigger(req.params.triggerId);
if (!trigger) {
return res.status(404).json({ error: 'Trigger not found' });
}
res.json(trigger);
} catch (error: any) {
res.status(404).json({ error: error.message });
}
});
orchestratorRouter.get('/triggers', async (req: Request, res: Response) => {
try {
const { state, rail, accountRef, walletRef, limit, offset } = req.query;
const result = await storage.listTriggers({
state: state as any,
rail: rail as string,
accountRef: accountRef as string,
walletRef: walletRef as string,
limit: limit ? parseInt(limit as string) : 20,
offset: offset ? parseInt(offset as string) : 0,
});
res.json(result);
} catch (error: any) {
res.status(500).json({ error: error.message });
}
});
orchestratorRouter.post('/triggers/:triggerId/validate-and-lock', async (req: Request, res: Response) => {
try {
const trigger = await triggerStateMachine.validateAndLock(req.params.triggerId);
@@ -19,6 +53,11 @@ orchestratorRouter.post('/triggers/:triggerId/validate-and-lock', async (req: Re
orchestratorRouter.post('/triggers/:triggerId/mark-submitted', async (req: Request, res: Response) => {
try {
const { railTxRef } = req.body;
if (!railTxRef) {
return res.status(400).json({ error: 'Missing required field: railTxRef' });
}
const trigger = await triggerStateMachine.markSubmitted(req.params.triggerId, railTxRef);
res.json(trigger);
} catch (error: any) {

View File

@@ -0,0 +1,78 @@
/**
* Blockchain integration for orchestrator
* Handles on-chain operations for trigger state machine
*/
import { blockchainClient } from '@emoney/blockchain';
export interface LockParams {
tokenAddress: string;
account: string;
amount: string;
}
export interface ReleaseParams {
tokenAddress: string;
account: string;
amount: string;
}
export const blockchainService = {
/**
* Lock funds on-chain for trigger
*/
async lockFunds(params: LockParams): Promise<{ txHash: string }> {
// In production, this would lock funds in a smart contract
// For now, we'll use a placeholder that validates the operation
// Validate token exists
const tokenInfo = await blockchainClient.getTokenInfo(params.tokenAddress);
if (!tokenInfo) {
throw new Error(`Token not found: ${params.tokenAddress}`);
}
// Check balance
const balance = await blockchainClient.getTokenBalance(params.tokenAddress, params.account);
if (BigInt(balance) < BigInt(params.amount)) {
throw new Error('Insufficient balance for lock');
}
// In production, call lock function on smart contract
// For now, return a placeholder transaction hash
return {
txHash: `0x${Date.now().toString(16)}`,
};
},
/**
* Release locked funds
*/
async releaseLocks(params: ReleaseParams): Promise<{ txHash: string }> {
// In production, release locks from smart contract
return {
txHash: `0x${Date.now().toString(16)}`,
};
},
/**
* Validate transfer can proceed
*/
async validateTransfer(
tokenAddress: string,
from: string,
to: string,
amount: string
): Promise<{ allowed: boolean; reasonCode?: string }> {
// Check balance
const balance = await blockchainClient.getTokenBalance(tokenAddress, from);
if (BigInt(balance) < BigInt(amount)) {
return { allowed: false, reasonCode: 'INSUFFICIENT_BALANCE' };
}
// Check compliance (via REST API)
// This would be done via httpClient in the state machine
return { allowed: true };
},
};

View File

@@ -0,0 +1,81 @@
/**
* HTTP client for calling main REST API
*/
import axios, { AxiosInstance } from 'axios';
const REST_API_URL = process.env.REST_API_URL || 'http://localhost:3000';
class HttpClient {
private client: AxiosInstance;
constructor() {
this.client = axios.create({
baseURL: REST_API_URL,
timeout: 10000,
headers: {
'Content-Type': 'application/json',
},
});
}
/**
* Get token data
*/
async getToken(code: string): Promise<any> {
try {
const response = await this.client.get(`/v1/tokens/${code}`);
return response.data;
} catch (error: any) {
if (error.response?.status === 404) {
throw new Error(`Token not found: ${code}`);
}
throw new Error(`Failed to fetch token: ${error.message}`);
}
}
/**
* Get account data
*/
async getAccount(accountRefId: string): Promise<any> {
try {
const response = await this.client.get(`/v1/compliance/accounts/${accountRefId}`);
return response.data;
} catch (error: any) {
if (error.response?.status === 404) {
throw new Error(`Account not found: ${accountRefId}`);
}
throw new Error(`Failed to fetch account: ${error.message}`);
}
}
/**
* Get compliance profile
*/
async getComplianceProfile(refId: string): Promise<any> {
try {
const response = await this.client.get(`/v1/compliance/accounts/${refId}`);
return response.data;
} catch (error: any) {
if (error.response?.status === 404) {
return null;
}
throw new Error(`Failed to fetch compliance profile: ${error.message}`);
}
}
/**
* Get encumbrance for account
*/
async getEncumbrance(accountRefId: string): Promise<{ encumbrance: string; hasActiveLien: boolean }> {
try {
const response = await this.client.get(`/v1/liens/accounts/${accountRefId}/encumbrance`);
return response.data;
} catch (error: any) {
throw new Error(`Failed to fetch encumbrance: ${error.message}`);
}
}
}
export const httpClient = new HttpClient();

View File

@@ -7,35 +7,125 @@ import { Router } from 'express';
import { readFileSync } from 'fs';
import { join } from 'path';
import * as yaml from 'js-yaml';
import { parseString } from 'xml2js';
import { v4 as uuidv4 } from 'uuid';
import { createHash } from 'crypto';
import { storage, Trigger } from './storage';
import { httpClient } from './http-client';
import { eventBusClient } from '@emoney/events';
import { TriggerState } from './state-machine';
import { promisify } from 'util';
const parseXMLAsync = promisify(parseString);
// Load ISO-20022 mappings
const mappingsPath = join(__dirname, '../../../packages/schemas/iso20022-mapping/message-mappings.yaml');
const mappings = yaml.load(readFileSync(mappingsPath, 'utf-8')) as any;
let mappings: any = {};
try {
mappings = yaml.load(readFileSync(mappingsPath, 'utf-8')) as any;
} catch (error) {
console.warn('ISO-20022 mappings file not found, using defaults');
mappings = { mappings: {} };
}
export const isoRouter = Router();
/**
* Parse XML payload and extract fields
*/
async function parseXML(xml: string): Promise<any> {
try {
return await parseXMLAsync(xml, { explicitArray: false });
} catch (error: any) {
throw new Error(`Failed to parse XML: ${error.message}`);
}
}
export const isoRouterService = {
/**
* Normalize ISO-20022 message to canonical format
*/
async normalizeMessage(msgType: string, payload: string, rail: string): Promise<any> {
const mapping = mappings.mappings[msgType];
const mapping = mappings.mappings?.[msgType];
if (!mapping) {
throw new Error(`Unknown message type: ${msgType}`);
}
// TODO: Parse XML payload and extract fields according to mapping
// TODO: Create canonical message
throw new Error('Not implemented');
// Parse XML payload
const xmlData = await parseXML(payload);
// Extract fields according to mapping
const canonicalMessage: any = {
msgType,
instructionId: '',
payloadHash: '',
refs: {},
amount: '',
token: '',
};
// Extract instruction ID (varies by message type)
if (msgType === 'pacs.008') {
// Credit Transfer
canonicalMessage.instructionId = xmlData.Document?.CstmrCdtTrfInitn?.GrpHdr?.MsgId || uuidv4();
canonicalMessage.amount = xmlData.Document?.CstmrCdtTrfInitn?.CdtTrfTxInf?.IntrBkSttlmAmt?._ || '';
canonicalMessage.refs.accountRef = xmlData.Document?.CstmrCdtTrfInitn?.CdtTrfTxInf?.Dbtr?.Nm || '';
} else if (msgType === 'pain.001') {
// Payment Initiation
canonicalMessage.instructionId = xmlData.Document?.CstmrCdtTrfInitn?.GrpHdr?.MsgId || uuidv4();
canonicalMessage.amount = xmlData.Document?.CstmrCdtTrfInitn?.PmtInf?.CdtTrfTxInf?.InstdAmt?._ || '';
} else {
// Generic extraction
canonicalMessage.instructionId = xmlData.Document?.GrpHdr?.MsgId || uuidv4();
}
// Generate payload hash
canonicalMessage.payloadHash = createHash('sha256')
.update(payload)
.digest('hex');
return canonicalMessage;
},
/**
* Create trigger from canonical message
*/
async createTrigger(canonicalMessage: any, rail: string): Promise<string> {
// TODO: Create trigger in database/state
// TODO: Publish trigger.created event
throw new Error('Not implemented');
const triggerId = `trigger_${uuidv4()}`;
const trigger: Trigger = {
triggerId,
state: TriggerState.CREATED,
rail,
msgType: canonicalMessage.msgType,
instructionId: canonicalMessage.instructionId,
payloadHash: canonicalMessage.payloadHash,
amount: canonicalMessage.amount || '0',
token: canonicalMessage.token || '',
refs: canonicalMessage.refs || {},
canonicalMessage,
createdAt: Date.now(),
updatedAt: Date.now(),
};
// Save trigger
await storage.saveTrigger(trigger);
// Publish trigger.created event
await eventBusClient.publish('triggers.created', {
eventId: uuidv4(),
eventType: 'triggers.created',
occurredAt: new Date().toISOString(),
correlationId: triggerId,
payload: {
triggerId,
instructionId: trigger.instructionId,
rail,
msgType: trigger.msgType,
},
});
return triggerId;
},
/**
@@ -52,9 +142,52 @@ export const isoRouterService = {
*/
async routeOutbound(msgType: string, payload: string, rail: string, config: any): Promise<string> {
const canonicalMessage = await this.normalizeMessage(msgType, payload, rail);
// TODO: Additional validation for outbound
// Additional validation for outbound
if (!canonicalMessage.instructionId) {
throw new Error('Missing instruction ID in outbound message');
}
if (!canonicalMessage.amount || canonicalMessage.amount === '0') {
throw new Error('Invalid amount in outbound message');
}
const triggerId = await this.createTrigger(canonicalMessage, rail);
return triggerId;
},
};
// ISO-20022 routes
isoRouter.post('/inbound', async (req, res) => {
try {
const { msgType, payload, rail } = req.body;
if (!msgType || !payload || !rail) {
return res.status(400).json({
error: 'Missing required fields: msgType, payload, rail'
});
}
const triggerId = await isoRouterService.routeInbound(msgType, payload, rail);
res.status(201).json({ triggerId });
} catch (error: any) {
res.status(400).json({ error: error.message });
}
});
isoRouter.post('/outbound', async (req, res) => {
try {
const { msgType, payload, rail, config } = req.body;
if (!msgType || !payload || !rail) {
return res.status(400).json({
error: 'Missing required fields: msgType, payload, rail'
});
}
const triggerId = await isoRouterService.routeOutbound(msgType, payload, rail, config);
res.status(201).json({ triggerId });
} catch (error: any) {
res.status(400).json({ error: error.message });
}
});

View File

@@ -0,0 +1,37 @@
/**
* Fedwire adapter implementation
*/
import { RailAdapter } from './rail-adapter';
export class FedwireAdapter implements RailAdapter {
async submitPayment(params: {
instructionId: string;
amount: string;
currency: string;
fromAccount: string;
toAccount: string;
metadata?: Record<string, any>;
}): Promise<{ txRef: string; status: string }> {
// In production, integrate with Fedwire API
// For now, return mock response
const txRef = `fedwire_${Date.now()}`;
return {
txRef,
status: 'submitted',
};
}
async getPaymentStatus(txRef: string): Promise<{ status: string; settled?: boolean }> {
// In production, query Fedwire API
return {
status: 'pending',
settled: false,
};
}
async cancelPayment(txRef: string): Promise<void> {
// In production, cancel via Fedwire API
}
}

View File

@@ -0,0 +1,28 @@
/**
* Base rail adapter interface
*/
export interface RailAdapter {
/**
* Submit payment to rail
*/
submitPayment(params: {
instructionId: string;
amount: string;
currency: string;
fromAccount: string;
toAccount: string;
metadata?: Record<string, any>;
}): Promise<{ txRef: string; status: string }>;
/**
* Get payment status
*/
getPaymentStatus(txRef: string): Promise<{ status: string; settled?: boolean }>;
/**
* Cancel payment
*/
cancelPayment(txRef: string): Promise<void>;
}

View File

@@ -0,0 +1,40 @@
/**
* Rail adapter registry
*/
import { RailAdapter } from './rail-adapter';
import { FedwireAdapter } from './fedwire-adapter';
import { SwiftAdapter } from './swift-adapter';
import { SepaAdapter } from './sepa-adapter';
import { RTGSAdapter } from './rtgs-adapter';
export class RailRegistry {
private adapters = new Map<string, RailAdapter>();
constructor() {
// Register default rail adapters
this.register('fedwire', new FedwireAdapter());
this.register('swift', new SwiftAdapter());
this.register('sepa', new SepaAdapter());
this.register('rtgs', new RTGSAdapter());
}
register(name: string, adapter: RailAdapter): void {
this.adapters.set(name.toLowerCase(), adapter);
}
get(name: string): RailAdapter {
const adapter = this.adapters.get(name.toLowerCase());
if (!adapter) {
throw new Error(`Rail adapter not found: ${name}`);
}
return adapter;
}
listRails(): string[] {
return Array.from(this.adapters.keys());
}
}
export const railRegistry = new RailRegistry();

View File

@@ -0,0 +1,35 @@
/**
* RTGS adapter implementation
*/
import { RailAdapter } from './rail-adapter';
export class RTGSAdapter implements RailAdapter {
async submitPayment(params: {
instructionId: string;
amount: string;
currency: string;
fromAccount: string;
toAccount: string;
metadata?: Record<string, any>;
}): Promise<{ txRef: string; status: string }> {
// In production, integrate with RTGS API
const txRef = `rtgs_${Date.now()}`;
return {
txRef,
status: 'submitted',
};
}
async getPaymentStatus(txRef: string): Promise<{ status: string; settled?: boolean }> {
return {
status: 'pending',
settled: false,
};
}
async cancelPayment(txRef: string): Promise<void> {
// In production, cancel via RTGS API
}
}

View File

@@ -0,0 +1,35 @@
/**
* SEPA adapter implementation
*/
import { RailAdapter } from './rail-adapter';
export class SepaAdapter implements RailAdapter {
async submitPayment(params: {
instructionId: string;
amount: string;
currency: string;
fromAccount: string;
toAccount: string;
metadata?: Record<string, any>;
}): Promise<{ txRef: string; status: string }> {
// In production, integrate with SEPA API
const txRef = `sepa_${Date.now()}`;
return {
txRef,
status: 'submitted',
};
}
async getPaymentStatus(txRef: string): Promise<{ status: string; settled?: boolean }> {
return {
status: 'pending',
settled: false,
};
}
async cancelPayment(txRef: string): Promise<void> {
// In production, cancel via SEPA API
}
}

View File

@@ -0,0 +1,35 @@
/**
* SWIFT adapter implementation
*/
import { RailAdapter } from './rail-adapter';
export class SwiftAdapter implements RailAdapter {
async submitPayment(params: {
instructionId: string;
amount: string;
currency: string;
fromAccount: string;
toAccount: string;
metadata?: Record<string, any>;
}): Promise<{ txRef: string; status: string }> {
// In production, integrate with SWIFT API
const txRef = `swift_${Date.now()}`;
return {
txRef,
status: 'submitted',
};
}
async getPaymentStatus(txRef: string): Promise<{ status: string; settled?: boolean }> {
return {
status: 'pending',
settled: false,
};
}
async cancelPayment(txRef: string): Promise<void> {
// In production, cancel via SWIFT API
}
}

View File

@@ -3,6 +3,12 @@
* Manages trigger lifecycle: CREATED -> VALIDATED -> SUBMITTED -> PENDING -> SETTLED/REJECTED
*/
import { v4 as uuidv4 } from 'uuid';
import { storage, Trigger } from './storage';
import { blockchainService } from './blockchain';
import { httpClient } from './http-client';
import { eventBusClient } from '@emoney/events';
export enum TriggerState {
CREATED = 'CREATED',
VALIDATED = 'VALIDATED',
@@ -14,50 +20,217 @@ export enum TriggerState {
RECALLED = 'RECALLED',
}
export interface Trigger {
triggerId: string;
state: TriggerState;
rail: string;
msgType: string;
instructionId: string;
// ... other fields
}
export const triggerStateMachine = {
/**
* Validate and lock trigger
*/
async validateAndLock(triggerId: string): Promise<Trigger> {
// TODO: Validate trigger, lock funds on-chain
// Transition: CREATED -> VALIDATED
throw new Error('Not implemented');
const trigger = await storage.getTrigger(triggerId);
if (!trigger) {
throw new Error(`Trigger not found: ${triggerId}`);
}
if (trigger.state !== TriggerState.CREATED) {
throw new Error(`Invalid state transition: ${trigger.state} -> VALIDATED`);
}
// Validate trigger data
if (!trigger.token || !trigger.amount) {
throw new Error('Missing required trigger data');
}
// Get token address
const token = await httpClient.getToken(trigger.token);
if (!token) {
throw new Error(`Token not found: ${trigger.token}`);
}
// Check compliance
if (trigger.refs?.accountRef) {
const compliance = await httpClient.getComplianceProfile(trigger.refs.accountRef);
if (!compliance || !compliance.allowed) {
throw new Error('Account not compliant');
}
if (compliance.frozen) {
throw new Error('Account is frozen');
}
}
// Check encumbrance
if (trigger.refs?.accountRef) {
const encumbrance = await httpClient.getEncumbrance(trigger.refs.accountRef);
const freeBalance = BigInt(trigger.amount) - BigInt(encumbrance.encumbrance || '0');
if (freeBalance < BigInt(trigger.amount)) {
throw new Error('Insufficient free balance due to encumbrance');
}
}
// Lock funds on-chain
const lockResult = await blockchainService.lockFunds({
tokenAddress: token.address,
account: trigger.refs?.accountRef || '',
amount: trigger.amount,
});
// Update trigger state
const updated = await storage.updateTrigger(triggerId, {
state: TriggerState.VALIDATED,
lockedAmount: trigger.amount,
});
// Publish trigger.state.updated event
await eventBusClient.publish('triggers.state.updated', {
eventId: uuidv4(),
eventType: 'triggers.state.updated',
occurredAt: new Date().toISOString(),
correlationId: triggerId,
payload: {
triggerId,
state: TriggerState.VALIDATED,
instructionId: trigger.instructionId,
},
});
return updated;
},
/**
* Mark trigger as submitted to rail
*/
async markSubmitted(triggerId: string, railTxRef: string): Promise<Trigger> {
// TODO: Update trigger with rail transaction reference
// Transition: VALIDATED -> SUBMITTED_TO_RAIL -> PENDING
throw new Error('Not implemented');
const trigger = await storage.getTrigger(triggerId);
if (!trigger) {
throw new Error(`Trigger not found: ${triggerId}`);
}
if (trigger.state !== TriggerState.VALIDATED) {
throw new Error(`Invalid state transition: ${trigger.state} -> SUBMITTED_TO_RAIL`);
}
// Update trigger with rail transaction reference
const updated = await storage.updateTrigger(triggerId, {
state: TriggerState.SUBMITTED_TO_RAIL,
railTxRef,
});
// Transition to PENDING after submission
const pending = await storage.updateTrigger(triggerId, {
state: TriggerState.PENDING,
});
// Publish trigger.state.updated event
await eventBusClient.publish('triggers.state.updated', {
eventId: uuidv4(),
eventType: 'triggers.state.updated',
occurredAt: new Date().toISOString(),
correlationId: triggerId,
payload: {
triggerId,
state: TriggerState.PENDING,
railTxRef,
instructionId: trigger.instructionId,
},
});
return pending;
},
/**
* Confirm trigger settled
*/
async confirmSettled(triggerId: string): Promise<Trigger> {
// TODO: Finalize on-chain, release locks if needed
// Transition: PENDING -> SETTLED
throw new Error('Not implemented');
const trigger = await storage.getTrigger(triggerId);
if (!trigger) {
throw new Error(`Trigger not found: ${triggerId}`);
}
if (trigger.state !== TriggerState.PENDING) {
throw new Error(`Invalid state transition: ${trigger.state} -> SETTLED`);
}
// Get token address
const token = await httpClient.getToken(trigger.token);
// Release locks (if any)
if (trigger.lockedAmount) {
await blockchainService.releaseLocks({
tokenAddress: token.address,
account: trigger.refs?.accountRef || '',
amount: trigger.lockedAmount,
});
}
// Update trigger state
const updated = await storage.updateTrigger(triggerId, {
state: TriggerState.SETTLED,
});
// Publish trigger.state.updated event
await eventBusClient.publish('triggers.state.updated', {
eventId: uuidv4(),
eventType: 'triggers.state.updated',
occurredAt: new Date().toISOString(),
correlationId: triggerId,
payload: {
triggerId,
state: TriggerState.SETTLED,
instructionId: trigger.instructionId,
},
});
return updated;
},
/**
* Confirm trigger rejected
*/
async confirmRejected(triggerId: string, reason?: string): Promise<Trigger> {
// TODO: Release locks, handle rejection
// Transition: PENDING -> REJECTED
throw new Error('Not implemented');
const trigger = await storage.getTrigger(triggerId);
if (!trigger) {
throw new Error(`Trigger not found: ${triggerId}`);
}
if (trigger.state !== TriggerState.PENDING) {
throw new Error(`Invalid state transition: ${trigger.state} -> REJECTED`);
}
// Get token address
const token = await httpClient.getToken(trigger.token);
// Release locks
if (trigger.lockedAmount) {
await blockchainService.releaseLocks({
tokenAddress: token.address,
account: trigger.refs?.accountRef || '',
amount: trigger.lockedAmount,
});
}
// Update trigger state
const updated = await storage.updateTrigger(triggerId, {
state: TriggerState.REJECTED,
rejectionReason: reason,
});
// Publish trigger.state.updated event
await eventBusClient.publish('triggers.state.updated', {
eventId: uuidv4(),
eventType: 'triggers.state.updated',
occurredAt: new Date().toISOString(),
correlationId: triggerId,
payload: {
triggerId,
state: TriggerState.REJECTED,
reason,
instructionId: trigger.instructionId,
},
});
return updated;
},
/**
@@ -78,4 +251,3 @@ export const triggerStateMachine = {
return validTransitions[from]?.includes(to) ?? false;
},
};

View File

@@ -0,0 +1,108 @@
/**
* Storage layer for triggers
* In-memory implementation with database-ready interface
*/
import { TriggerState } from './state-machine';
export interface Trigger {
triggerId: string;
state: TriggerState;
rail: string;
msgType: string;
instructionId: string;
payloadHash: string;
amount: string;
token: string;
refs: {
accountRef?: string;
walletRef?: string;
};
canonicalMessage?: any;
railTxRef?: string;
rejectionReason?: string;
lockedAmount?: string;
createdAt: number;
updatedAt: number;
}
export interface StorageAdapter {
saveTrigger(trigger: Trigger): Promise<void>;
getTrigger(triggerId: string): Promise<Trigger | null>;
updateTrigger(triggerId: string, updates: Partial<Trigger>): Promise<Trigger>;
listTriggers(filters: {
state?: TriggerState;
rail?: string;
accountRef?: string;
walletRef?: string;
limit?: number;
offset?: number;
}): Promise<{ triggers: Trigger[]; total: number }>;
}
/**
* In-memory storage implementation
*/
class InMemoryStorage implements StorageAdapter {
private triggers = new Map<string, Trigger>();
async saveTrigger(trigger: Trigger): Promise<void> {
this.triggers.set(trigger.triggerId, { ...trigger });
}
async getTrigger(triggerId: string): Promise<Trigger | null> {
return this.triggers.get(triggerId) || null;
}
async updateTrigger(triggerId: string, updates: Partial<Trigger>): Promise<Trigger> {
const trigger = this.triggers.get(triggerId);
if (!trigger) {
throw new Error(`Trigger not found: ${triggerId}`);
}
const updated = {
...trigger,
...updates,
updatedAt: Date.now(),
};
this.triggers.set(triggerId, updated);
return updated;
}
async listTriggers(filters: {
state?: TriggerState;
rail?: string;
accountRef?: string;
walletRef?: string;
limit?: number;
offset?: number;
}): Promise<{ triggers: Trigger[]; total: number }> {
let triggers = Array.from(this.triggers.values());
if (filters.state) {
triggers = triggers.filter(t => t.state === filters.state);
}
if (filters.rail) {
triggers = triggers.filter(t => t.rail === filters.rail);
}
if (filters.accountRef) {
triggers = triggers.filter(t => t.refs?.accountRef === filters.accountRef);
}
if (filters.walletRef) {
triggers = triggers.filter(t => t.refs?.walletRef === filters.walletRef);
}
const total = triggers.length;
const offset = filters.offset || 0;
const limit = filters.limit || 20;
triggers.sort((a, b) => b.createdAt - a.createdAt);
triggers = triggers.slice(offset, offset + limit);
return { triggers, total };
}
}
export const storage: StorageAdapter = new InMemoryStorage();