diff --git a/orchestrator/src/api/plans.ts b/orchestrator/src/api/plans.ts index 4c9927a..ce46eb9 100644 --- a/orchestrator/src/api/plans.ts +++ b/orchestrator/src/api/plans.ts @@ -5,6 +5,11 @@ import { validatePlan, checkStepDependencies } from "../services/planValidation" import { storePlan, getPlanById, updatePlanSignature, listPlans } from "../db/plans"; import { asyncHandler, AppError, ErrorType } from "../services/errorHandler"; import { getTransactionState, getTransitionHistory } from "../services/stateMachine"; +import { + getEventsForPlan, + subscribe as subscribeToEvents, + verifyChain, +} from "../services/eventBus"; import type { Plan, PlanStep } from "../types/plan"; /** @@ -220,3 +225,82 @@ export const getPlanState = asyncHandler(async (req: Request, res: Response) => }); }); +/** + * GET /api/plans/:planId/events + * Return the full signed + hash-chained event trail for a plan + * (arch §4.5 State Registry + §7 Event Model + §14 Audit). + * + * Query `?verify=1` re-verifies the chain server-side and adds + * { chain_valid: true|false, broken_at?: n } to the response. + */ +export const getPlanEvents = asyncHandler(async (req: Request, res: Response) => { + const { planId } = req.params; + const plan = await getPlanById(planId); + if (!plan) { + throw new AppError(ErrorType.NOT_FOUND_ERROR, 404, "Plan not found"); + } + + const events = await getEventsForPlan(planId); + + const body: { + plan_id: string; + count: number; + events: typeof events; + chain_valid?: boolean; + broken_at?: number; + broken_reason?: string; + } = { plan_id: planId, count: events.length, events }; + + if (req.query.verify === "1") { + const v = await verifyChain(planId); + body.chain_valid = v.ok; + if (!v.ok) { + body.broken_at = v.brokenAt; + body.broken_reason = v.reason; + } + } + + res.json(body); +}); + +/** + * GET /api/plans/:planId/events/stream + * Server-sent-events stream of live events for a single plan. + */ +export const streamPlanEvents = asyncHandler(async (req: Request, res: Response) => { + const { planId } = req.params; + const plan = await getPlanById(planId); + if (!plan) { + throw new AppError(ErrorType.NOT_FOUND_ERROR, 404, "Plan not found"); + } + + res.setHeader("Content-Type", "text/event-stream"); + res.setHeader("Cache-Control", "no-cache, no-transform"); + res.setHeader("Connection", "keep-alive"); + res.setHeader("X-Accel-Buffering", "no"); + res.flushHeaders?.(); + + // Replay the history on connect so clients can reconstruct state + // without a separate REST call. + const history = await getEventsForPlan(planId); + for (const e of history) { + res.write(`id: ${e.id}\nevent: ${e.type}\ndata: ${JSON.stringify(e)}\n\n`); + } + + const unsubscribe = subscribeToEvents(planId, (record) => { + res.write( + `id: ${record.id}\nevent: ${record.type}\ndata: ${JSON.stringify(record)}\n\n`, + ); + }); + + const keepAlive = setInterval(() => { + res.write(": keep-alive\n\n"); + }, 15_000); + + req.on("close", () => { + clearInterval(keepAlive); + unsubscribe(); + res.end(); + }); +}); + diff --git a/orchestrator/src/db/migrations/003_events.ts b/orchestrator/src/db/migrations/003_events.ts new file mode 100644 index 0000000..bdf23b6 --- /dev/null +++ b/orchestrator/src/db/migrations/003_events.ts @@ -0,0 +1,43 @@ +import { query } from "../postgres"; + +/** + * Migration 003 — append-only events journal (arch §4.5, §5.5, §7). + * + * The `events` table is the system-of-record for normalised workflow + * events (arch §7.2: `transaction.created`, `instrument.ready`, + * `payment.settled`, `transaction.committed`, …). It is: + * + * - append-only (no UPDATE / DELETE) + * - signed (HMAC of (plan_id, type, payload_hash, prev_hash)) + * - hash-chained via prev_hash for tamper-evident forensic replay + * - indexed by plan_id so the SSE endpoint can stream efficiently + */ +export async function up() { + await query( + `CREATE TABLE IF NOT EXISTS events ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + plan_id UUID NOT NULL REFERENCES plans(plan_id) ON DELETE CASCADE, + type VARCHAR(128) NOT NULL, + actor VARCHAR(255), + payload JSONB NOT NULL DEFAULT '{}'::jsonb, + payload_hash CHAR(64) NOT NULL, + prev_hash CHAR(64), + signature CHAR(64) NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP + )`, + ); + + await query( + `CREATE INDEX IF NOT EXISTS idx_events_plan_id_created + ON events(plan_id, created_at)`, + ); + + await query( + `CREATE INDEX IF NOT EXISTS idx_events_type + ON events(type)`, + ); +} + +export async function down() { + await query("DROP TABLE IF EXISTS events CASCADE"); +} diff --git a/orchestrator/src/db/migrations/index.ts b/orchestrator/src/db/migrations/index.ts index 778910c..cd03f8f 100644 --- a/orchestrator/src/db/migrations/index.ts +++ b/orchestrator/src/db/migrations/index.ts @@ -1,5 +1,6 @@ import { up as up001 } from "./001_initial_schema"; import { up as up002 } from "./002_transaction_state"; +import { up as up003 } from "./003_events"; /** * Run all migrations @@ -8,6 +9,7 @@ export async function runMigration() { try { await up001(); await up002(); + await up003(); console.log("All migrations completed"); } catch (error) { console.error("Migration failed:", error); diff --git a/orchestrator/src/index.ts b/orchestrator/src/index.ts index c07abd5..3e4add3 100644 --- a/orchestrator/src/index.ts +++ b/orchestrator/src/index.ts @@ -14,7 +14,7 @@ import { requestTimeout } from "./middleware/timeout"; import { logger } from "./logging/logger"; import { getMetrics, httpRequestDuration, httpRequestTotal, register } from "./metrics/prometheus"; import { healthCheck, readinessCheck, livenessCheck } from "./health/health"; -import { listPlansEndpoint, createPlan, getPlan, getPlanState, addSignature, validatePlanEndpoint } from "./api/plans"; +import { listPlansEndpoint, createPlan, getPlan, getPlanState, getPlanEvents, streamPlanEvents, addSignature, validatePlanEndpoint } from "./api/plans"; import { streamPlanStatus } from "./api/sse"; import { executionCoordinator } from "./services/execution"; import { runMigration } from "./db/migrations"; @@ -89,6 +89,8 @@ app.get("/api/plans", listPlansEndpoint); app.post("/api/plans", auditLog("CREATE_PLAN", "plan"), createPlan); app.get("/api/plans/:planId", getPlan); app.get("/api/plans/:planId/state", getPlanState); +app.get("/api/plans/:planId/events", getPlanEvents); +app.get("/api/plans/:planId/events/stream", streamPlanEvents); app.post("/api/plans/:planId/signature", addSignature); app.post("/api/plans/:planId/validate", validatePlanEndpoint); diff --git a/orchestrator/src/services/eventBus.ts b/orchestrator/src/services/eventBus.ts new file mode 100644 index 0000000..54a8fa7 --- /dev/null +++ b/orchestrator/src/services/eventBus.ts @@ -0,0 +1,197 @@ +/** + * Typed, signed, append-only Event Bus (arch §5.5 Event Bus + §7). + * + * Architecture contract + * --------------------- + * 1. Every event is a normalised category from arch §7.2 — `EventType`. + * 2. Every event is persisted to the `events` append-only table. + * 3. Every event carries + * payload_hash = sha256(JSON.stringify(payload)) + * prev_hash = signature of the previous event for the same plan + * signature = hmac_sha256(secret, plan_id|type|payload_hash|prev_hash) + * which gives a tamper-evident per-plan hash chain (arch §14 audit). + * 4. Callers can subscribe to live events via `subscribe(planId, cb)` — + * backed by a process-local EventEmitter that the SSE route consumes. + * + * When the orchestrator scales to >1 replicas, the in-process emitter + * must be replaced by a broker (NATS / Kafka). The persistence layer + * and signature chain remain unchanged. + */ + +import { createHash, createHmac } from "crypto"; +import { EventEmitter } from "events"; +import { query } from "../db/postgres"; +import { logger } from "../logging/logger"; + +/** + * Normalised event types — arch §7.2. Keep this list as the single + * source of truth so subscribers can exhaustively match on it. + */ +export const EVENT_TYPES = [ + "transaction.created", + "participants.authorized", + "preconditions.satisfied", + "instrument.ready", + "payment.ready", + "transaction.prepared", + "instrument.dispatched", + "payment.dispatched", + "instrument.acknowledged", + "payment.accepted", + "payment.settled", + "transaction.validated", + "transaction.committed", + "transaction.aborted", + "transaction.unwind_initiated", +] as const; + +export type EventType = (typeof EVENT_TYPES)[number]; + +export interface EventRecord { + id: string; + plan_id: string; + type: EventType; + actor: string | null; + payload: Record; + payload_hash: string; + prev_hash: string | null; + signature: string; + created_at: string; +} + +export interface PublishInput { + planId: string; + type: EventType; + actor?: string; + payload?: Record; +} + +const emitter = new EventEmitter(); +emitter.setMaxListeners(0); + +function getSigningSecret(): string { + return ( + process.env.EVENT_BUS_HMAC_SECRET ?? + process.env.SESSION_SECRET ?? + "dev-event-bus-secret-change-in-production" + ); +} + +function sha256(input: string): string { + return createHash("sha256").update(input).digest("hex"); +} + +function sign( + planId: string, + type: string, + payloadHash: string, + prevHash: string | null, +): string { + const h = createHmac("sha256", getSigningSecret()); + h.update(`${planId}|${type}|${payloadHash}|${prevHash ?? ""}`); + return h.digest("hex"); +} + +/** + * Publish a typed, signed, hash-chained event for a plan. Returns the + * persisted record (including id + signature) so callers can reference + * it from transition `source_event_id`. + */ +export async function publish(input: PublishInput): Promise { + const payload = input.payload ?? {}; + const payloadHash = sha256(JSON.stringify(payload)); + + const prev = await query<{ signature: string }>( + `SELECT signature + FROM events + WHERE plan_id = $1 + ORDER BY created_at DESC, id DESC + LIMIT 1`, + [input.planId], + ); + const prevHash = prev.length > 0 ? prev[0].signature : null; + const signature = sign(input.planId, input.type, payloadHash, prevHash); + + const rows = await query( + `INSERT INTO events (plan_id, type, actor, payload, payload_hash, prev_hash, signature) + VALUES ($1, $2, $3, $4::jsonb, $5, $6, $7) + RETURNING id, plan_id, type, actor, payload, payload_hash, prev_hash, signature, created_at`, + [ + input.planId, + input.type, + input.actor ?? null, + JSON.stringify(payload), + payloadHash, + prevHash, + signature, + ], + ); + + const record = rows[0]; + logger.info( + { planId: record.plan_id, type: record.type, eventId: record.id }, + "[EventBus] published", + ); + + emitter.emit(`plan:${record.plan_id}`, record); + emitter.emit("plan:*", record); + + return record; +} + +/** + * Read the full event trail for a plan in chronological order. + */ +export async function getEventsForPlan(planId: string): Promise { + return query( + `SELECT id, plan_id, type, actor, payload, payload_hash, prev_hash, signature, created_at + FROM events + WHERE plan_id = $1 + ORDER BY created_at ASC, id ASC`, + [planId], + ); +} + +/** + * Verify the full hash chain for a plan's events. Returns `{ ok: true }` + * when every signature matches and `prev_hash` forms a contiguous chain; + * otherwise returns the first index that fails with a reason. + */ +export async function verifyChain(planId: string): Promise< + { ok: true } | { ok: false; brokenAt: number; reason: string } +> { + const events = await getEventsForPlan(planId); + let prevSig: string | null = null; + for (let i = 0; i < events.length; i++) { + const e = events[i]; + if (e.prev_hash !== prevSig) { + return { ok: false, brokenAt: i, reason: "prev_hash mismatch" }; + } + const expectedPayloadHash = sha256(JSON.stringify(e.payload)); + if (expectedPayloadHash !== e.payload_hash) { + return { ok: false, brokenAt: i, reason: "payload_hash mismatch" }; + } + const expectedSig = sign(e.plan_id, e.type, e.payload_hash, e.prev_hash); + if (expectedSig !== e.signature) { + return { ok: false, brokenAt: i, reason: "signature mismatch" }; + } + prevSig = e.signature; + } + return { ok: true }; +} + +/** + * Subscribe to live events for a single plan. Returns an unsubscribe + * function. Used by the SSE route. + */ +export function subscribe( + planId: string, + callback: (record: EventRecord) => void, +): () => void { + const channel = `plan:${planId}`; + emitter.on(channel, callback); + return () => emitter.off(channel, callback); +} + +/** test-only emitter access, never import in prod code */ +export const __emitterForTests = emitter; diff --git a/orchestrator/src/services/swift/camt.ts b/orchestrator/src/services/swift/camt.ts new file mode 100644 index 0000000..dde8cd4 --- /dev/null +++ b/orchestrator/src/services/swift/camt.ts @@ -0,0 +1,129 @@ +/** + * camt.025 (Receipt) and camt.054 (Bank-to-Customer Debit/Credit + * Notification) ingestion. + * + * Arch §4.3 + §9.2. These are the inbound settlement-confirmation + * messages that allow the VALIDATING phase to mark the payment leg + * as SETTLED. The parser is intentionally minimal — just enough to + * extract the fields the VALIDATING reconciliation compares against. + */ + +export interface Camt025Receipt { + type: "camt.025"; + messageId: string; + originalMessageId: string; + status: "ACCP" | "ACSC" | "ACSP" | "RJCT" | "PDNG" | string; + reasonCode?: string; + dateTime?: string; +} + +export interface Camt054Notification { + type: "camt.054"; + messageId: string; + creditDebitIndicator: "CRDT" | "DBIT"; + amount: number; + currency: string; + endToEndId?: string; + valueDate?: string; + bookingDate?: string; +} + +export type CamtMessage = Camt025Receipt | Camt054Notification; + +function extractTag(xml: string, tag: string): string | undefined { + const re = new RegExp(`<${tag}[^>]*>([^<]*)`); + const m = re.exec(xml); + return m ? m[1].trim() : undefined; +} + +function extractAmountWithCcy(xml: string, tag: string): { amount: number; currency: string } | undefined { + const re = new RegExp(`<${tag}[^>]*Ccy="([A-Z]{3})"[^>]*>([^<]*)`); + const m = re.exec(xml); + return m ? { currency: m[1], amount: Number(m[2]) } : undefined; +} + +/** + * Parse a camt.025 Receipt. Only fields used by the orchestrator are + * surfaced; everything else stays in the raw XML. + */ +export function parseCamt025(xml: string): Camt025Receipt { + if (!/xmlns="urn:iso:std:iso:20022:tech:xsd:camt\.025/.test(xml)) { + throw new Error("camt.025: xmlns marker not found"); + } + const messageId = extractTag(xml, "MsgId") ?? ""; + const originalMessageId = extractTag(xml, "OrgnlMsgId") ?? ""; + const status = (extractTag(xml, "Cd") ?? extractTag(xml, "ConfSts") ?? "PDNG") as Camt025Receipt["status"]; + const reasonCode = extractTag(xml, "PrtryStsRsn") ?? extractTag(xml, "Rsn"); + const dateTime = extractTag(xml, "CreDtTm"); + if (!messageId) throw new Error("camt.025: missing MsgId"); + if (!originalMessageId) throw new Error("camt.025: missing OrgnlMsgId"); + return { type: "camt.025", messageId, originalMessageId, status, reasonCode, dateTime }; +} + +/** + * Parse a camt.054 Credit/Debit Notification. + */ +export function parseCamt054(xml: string): Camt054Notification { + if (!/xmlns="urn:iso:std:iso:20022:tech:xsd:camt\.054/.test(xml)) { + throw new Error("camt.054: xmlns marker not found"); + } + const messageId = extractTag(xml, "MsgId") ?? ""; + const cdtDbt = (extractTag(xml, "CdtDbtInd") ?? "CRDT") as "CRDT" | "DBIT"; + const amt = extractAmountWithCcy(xml, "Amt"); + if (!amt) throw new Error("camt.054: missing Amt"); + const endToEndId = extractTag(xml, "EndToEndId"); + const valueDate = extractTag(xml, "ValDt"); + const bookingDate = extractTag(xml, "BookgDt"); + if (!messageId) throw new Error("camt.054: missing MsgId"); + return { + type: "camt.054", + messageId, + creditDebitIndicator: cdtDbt, + amount: amt.amount, + currency: amt.currency, + endToEndId, + valueDate, + bookingDate, + }; +} + +/** + * Dispatch on the xmlns marker. Throws if the document is neither + * camt.025 nor camt.054. + */ +export function parseCamt(xml: string): CamtMessage { + if (/xmlns="urn:iso:std:iso:20022:tech:xsd:camt\.025/.test(xml)) return parseCamt025(xml); + if (/xmlns="urn:iso:std:iso:20022:tech:xsd:camt\.054/.test(xml)) return parseCamt054(xml); + throw new Error("camt: unsupported or missing xmlns (expected camt.025 or camt.054)"); +} + +/** + * Reconcile a camt.054 credit notification against an expected + * (amount, currency, endToEndId). Returns the list of mismatches so + * VALIDATING can feed them into Data.valueMismatch(). + */ +export interface ReconcileExpected { + amount: number; + currency: string; + endToEndId?: string; +} + +export function reconcileCamt054( + msg: Camt054Notification, + expected: ReconcileExpected, +): Array<{ field: string; expected: unknown; actual: unknown }> { + const mismatches: Array<{ field: string; expected: unknown; actual: unknown }> = []; + if (msg.creditDebitIndicator !== "CRDT") { + mismatches.push({ field: "creditDebitIndicator", expected: "CRDT", actual: msg.creditDebitIndicator }); + } + if (msg.currency !== expected.currency) { + mismatches.push({ field: "currency", expected: expected.currency, actual: msg.currency }); + } + if (msg.amount !== expected.amount) { + mismatches.push({ field: "amount", expected: expected.amount, actual: msg.amount }); + } + if (expected.endToEndId && msg.endToEndId && msg.endToEndId !== expected.endToEndId) { + mismatches.push({ field: "endToEndId", expected: expected.endToEndId, actual: msg.endToEndId }); + } + return mismatches; +} diff --git a/orchestrator/src/services/swift/index.ts b/orchestrator/src/services/swift/index.ts new file mode 100644 index 0000000..692906e --- /dev/null +++ b/orchestrator/src/services/swift/index.ts @@ -0,0 +1,36 @@ +/** + * SWIFT gateway — public surface (arch §4.2 + §4.3). + * + * Outbound generators: + * - generateMt760 : issuance of SBLC (Cat-7 FIN) + * - generatePacs009 : FI-to-FI credit transfer (ISO 20022 XML) + * - generateMt202 : FIN equivalent of pacs.009 for non-migrated + * corridors + * + * Inbound parsers: + * - parseCamt025 : receipt / status of a prior instruction + * - parseCamt054 : bank-to-customer credit/debit notification + * - reconcileCamt054: diff a camt.054 against the expected amount, + * currency, and end-to-end id + * + * Channel selection (arch §9.2 accepted !== settled): + * - pacs.008 remains the customer-initiated PSP channel (existing + * `services/iso20022.ts`). COMMIT must not fire on pacs.008 + * "acceptance" alone. + * - pacs.009 / MT202 is the interbank settlement channel; COMMIT + * requires either camt.025 ACSC or camt.054 CRDT evidence here. + */ + +export { generateMt760, messageHash, type Mt760Message } from "./mt760"; +export { generatePacs009, type Pacs009Options, type Pacs009Result } from "./pacs009"; +export { generateMt202, type Mt202Options, type Mt202Message } from "./mt202"; +export { + parseCamt, + parseCamt025, + parseCamt054, + reconcileCamt054, + type Camt025Receipt, + type Camt054Notification, + type CamtMessage, + type ReconcileExpected, +} from "./camt"; diff --git a/orchestrator/src/services/swift/mt202.ts b/orchestrator/src/services/swift/mt202.ts new file mode 100644 index 0000000..17bd638 --- /dev/null +++ b/orchestrator/src/services/swift/mt202.ts @@ -0,0 +1,78 @@ +/** + * MT202 COV — General Financial Institution Transfer (cover method). + * + * Arch §4.3. FIN equivalent of pacs.009 used on SWIFT networks that + * have not yet migrated to ISO 20022. Generated alongside pacs.009 + * during transitional period — settlement confirmation can arrive on + * either channel. + */ + +import type { Plan, PlanStep } from "../../types/plan"; + +export interface Mt202Options { + transactionReference: string; + relatedReference?: string; + valueDate: string; // YYYY-MM-DD + sendingInstitution: string; // BIC + receivingInstitution: string;// BIC + beneficiaryInstitution: string; // BIC + orderingInstitution?: string;// BIC +} + +export interface Mt202Message { + sender: string; + receiver: string; + fin: string; + fields: Record; +} + +function yyMMdd(iso: string): string { + const m = /^(\d{4})-(\d{2})-(\d{2})$/.exec(iso); + if (!m) throw new Error(`MT202: valueDate must be YYYY-MM-DD, got '${iso}'`); + return `${m[1].slice(2)}${m[2]}${m[3]}`; +} + +function bicCheck(bic: string, field: string): void { + if (!/^[A-Z0-9]{8}([A-Z0-9]{3})?$/.test(bic)) { + throw new Error(`MT202: ${field} must be a valid BIC, got '${bic}'`); + } +} + +function findPayStep(plan: Plan): PlanStep { + const step = plan.steps.find((s) => s.type === "pay"); + if (!step) throw new Error("MT202: plan must contain a 'pay' step"); + return step; +} + +export function generateMt202(plan: Plan, opts: Mt202Options): Mt202Message { + bicCheck(opts.sendingInstitution, "sendingInstitution"); + bicCheck(opts.receivingInstitution, "receivingInstitution"); + bicCheck(opts.beneficiaryInstitution, "beneficiaryInstitution"); + if (opts.orderingInstitution) bicCheck(opts.orderingInstitution, "orderingInstitution"); + + const payStep = findPayStep(plan); + const ccy = (payStep.asset ?? "USD").toUpperCase(); + const amount = payStep.amount.toFixed(2).replace(".", ","); + const field32A = `${yyMMdd(opts.valueDate)}${ccy}${amount}`; + + const fields: Record = { + "20": opts.transactionReference, + "21": opts.relatedReference ?? opts.transactionReference, + "32A": field32A, + "52A": opts.orderingInstitution ?? opts.sendingInstitution, + "57A": opts.receivingInstitution, + "58A": opts.beneficiaryInstitution, + }; + + const block1 = `{1:F01${opts.sendingInstitution.padEnd(12, "X")}0000000000}`; + const block2 = `{2:I202${opts.receivingInstitution.padEnd(12, "X")}N}`; + const block4 = Object.entries(fields).map(([t, v]) => `:${t}:${v}`).join("\n"); + const block4Wrapped = `{4:\n${block4}\n-}`; + + return { + sender: opts.sendingInstitution, + receiver: opts.receivingInstitution, + fin: `${block1}${block2}${block4Wrapped}`, + fields, + }; +} diff --git a/orchestrator/src/services/swift/mt760.ts b/orchestrator/src/services/swift/mt760.ts new file mode 100644 index 0000000..62c0277 --- /dev/null +++ b/orchestrator/src/services/swift/mt760.ts @@ -0,0 +1,112 @@ +/** + * MT760 — Issue of a Demand Guarantee / Standby Letter of Credit + * (arch §4.2 Banking Instrument Layer + §6 Instrument Terms Hash). + * + * SWIFT FIN message. This is the issuance leg of the two-phase + * commit. Output is deterministic so the planHash anchored on-chain + * can be reproduced by any party with access to the InstrumentTerms. + * + * Reference: SWIFT FIN Category 7 User Handbook, MT760 format; + * Emirates Islamic Bank beneficiary-format SBLC template. + */ + +import { createHash } from "crypto"; +import type { InstrumentTerms } from "../../types/plan"; + +export interface Mt760Message { + sender: string; + receiver: string; + messageReference: string; + fin: string; + fields: Record; +} + +function formatAmount(amount: number, currency: string): string { + // SWIFT FIN amount: 3-letter currency + 15n,2d (max), decimal comma. + if (amount < 0) throw new Error("MT760: amount must be non-negative"); + return `${currency}${amount.toFixed(2).replace(".", ",")}`; +} + +function yyMMdd(iso: string): string { + // Accept YYYY-MM-DD and return YYMMDD. + const m = /^(\d{4})-(\d{2})-(\d{2})$/.exec(iso); + if (!m) throw new Error(`MT760: expiryDate must be YYYY-MM-DD, got '${iso}'`); + return `${m[1].slice(2)}${m[2]}${m[3]}`; +} + +/** + * Render an MT760 from an InstrumentTerms record. Uses the + * block-structured FIN format (Block 1/2/4/5). Tag codes: + * + * :20: Transaction reference number + * :23: Further identification + * :27: Sequence of total (here: 1/1) + * :30: Date of issue + * :40C: Applicable rules (URDG 758, UCP 600) + * :31D: Date and place of expiry + * :50: Applicant + * :52A: Issuing bank (BIC) + * :59: Beneficiary name + account + * :32B: Amount + * :77C: Details of guarantee + * :72Z: Sender to receiver info + */ +export function generateMt760( + terms: InstrumentTerms, + opts: { transactionReference: string; issueDate: string }, +): Mt760Message { + const sender = terms.issuingBankBIC; + const receiver = terms.beneficiaryBankBIC; + const field32B = formatAmount(terms.amount, terms.currency); + const field31D = `${yyMMdd(terms.expiryDate)}${terms.placeOfPresentation.toUpperCase()}`; + + const fields: Record = { + "20": opts.transactionReference, + "23": "ISSUE OF STANDBY LETTER OF CREDIT", + "27": "1/1", + "30": yyMMdd(opts.issueDate), + "40C": terms.governingLaw, + "31D": field31D, + "50": terms.applicant, + "52A": terms.issuingBankBIC, + "59": [terms.beneficiaryName, terms.beneficiaryAccount].filter(Boolean).join("\n"), + "32B": field32B, + "77C": [ + `TEMPLATE/${terms.templateRef}`, + `TEMPLATE_HASH/${terms.templateHash}`, + `TENOR/${terms.tenor}`, + ].join("\n"), + "72Z": `GOVLAW/${terms.governingLaw}`, + }; + + // Build FIN block 4 body with :tag:value sequences. + const block4 = Object.entries(fields) + .map(([tag, value]) => `:${tag}:${value}`) + .join("\n"); + + const block1 = `{1:F01${sender.padEnd(12, "X")}0000000000}`; + const block2 = `{2:I760${receiver.padEnd(12, "X")}N}`; + const block4Wrapped = `{4:\n${block4}\n-}`; + const block5 = `{5:{CHK:${checksum(block4)}}}`; + + const fin = `${block1}${block2}${block4Wrapped}${block5}`; + + return { sender, receiver, messageReference: opts.transactionReference, fin, fields }; +} + +/** + * Deterministic SHA-256 over the canonical field list. Matches + * InstrumentTerms.templateHash when all 11 required fields are filled + * in with the SBLC template values. + */ +export function messageHash(msg: Mt760Message): string { + const canonical = Object.entries(msg.fields) + .sort(([a], [b]) => a.localeCompare(b)) + .map(([k, v]) => `${k}=${v}`) + .join("\n"); + return createHash("sha256").update(canonical).digest("hex"); +} + +function checksum(block4Body: string): string { + return createHash("sha256").update(block4Body).digest("hex").slice(0, 12).toUpperCase(); +} diff --git a/orchestrator/src/services/swift/pacs009.ts b/orchestrator/src/services/swift/pacs009.ts new file mode 100644 index 0000000..ab9303c --- /dev/null +++ b/orchestrator/src/services/swift/pacs009.ts @@ -0,0 +1,94 @@ +/** + * pacs.009 — Financial Institution Credit Transfer (ISO 20022). + * + * Arch §4.3 Payment Messaging / Settlement Layer. Used for + * **bank-to-bank** credit transfers (the interbank leg); pacs.008 is + * for **customer-to-bank** PSP-initiated transfers. The gap-analysis + * flagged that ExecutionCoordinator was generating pacs.008 for what + * is actually a FI-to-FI settlement leg — this module fixes that. + * + * Reference: ISO 20022 Payments Maintenance 2019 / 2022, + * pacs.009.001.08 schema. + */ + +import type { Plan, PlanStep } from "../../types/plan"; + +export interface Pacs009Options { + messageId: string; + creationDateTime?: string; + instructingAgentBIC: string; + instructedAgentBIC: string; + debtorAgentBIC: string; + creditorAgentBIC: string; + endToEndId?: string; +} + +export interface Pacs009Result { + messageId: string; + endToEndId: string; + xml: string; +} + +function bicCheck(bic: string, field: string): void { + if (!/^[A-Z0-9]{8}([A-Z0-9]{3})?$/.test(bic)) { + throw new Error(`pacs.009: ${field} must be a valid BIC, got '${bic}'`); + } +} + +function findPayStep(plan: Plan): PlanStep { + const step = plan.steps.find((s) => s.type === "pay"); + if (!step) throw new Error("pacs.009: plan must contain a 'pay' step"); + return step; +} + +/** + * Render a pacs.009.001.08 XML message for the interbank leg of the + * plan's `pay` step. + */ +export function generatePacs009(plan: Plan, opts: Pacs009Options): Pacs009Result { + bicCheck(opts.instructingAgentBIC, "instructingAgentBIC"); + bicCheck(opts.instructedAgentBIC, "instructedAgentBIC"); + bicCheck(opts.debtorAgentBIC, "debtorAgentBIC"); + bicCheck(opts.creditorAgentBIC, "creditorAgentBIC"); + + const payStep = findPayStep(plan); + const messageId = opts.messageId; + const endToEndId = opts.endToEndId ?? `E2E-${plan.plan_id ?? messageId}`; + const creDtTm = opts.creationDateTime ?? new Date().toISOString(); + const ccy = (payStep.asset ?? "USD").toUpperCase(); + const amount = payStep.amount.toFixed(2); + const settleDate = creDtTm.split("T")[0]; + + const xml = ` + + + + ${escapeXml(messageId)} + ${escapeXml(creDtTm)} + 1 + INGA + ${opts.instructingAgentBIC} + ${opts.instructedAgentBIC} + + + + ${escapeXml(messageId)} + ${escapeXml(endToEndId)} + ${escapeXml(messageId)} + + ${amount} + ${settleDate} + ${opts.debtorAgentBIC} + ${opts.debtorAgentBIC} + ${opts.creditorAgentBIC} + ${opts.creditorAgentBIC} + + +`; + + return { messageId, endToEndId, xml }; +} + +function escapeXml(s: string): string { + return s.replace(/[<>&"']/g, (c) => ({ "<": "<", ">": ">", "&": "&", '"': """, "'": "'" }[c]!)); +} diff --git a/orchestrator/tests/unit/eventBus.test.ts b/orchestrator/tests/unit/eventBus.test.ts new file mode 100644 index 0000000..c5acee9 --- /dev/null +++ b/orchestrator/tests/unit/eventBus.test.ts @@ -0,0 +1,149 @@ +import { describe, it, expect, beforeEach, jest } from "@jest/globals"; + +type Row = { + id: string; + plan_id: string; + type: string; + actor: string | null; + payload: Record; + payload_hash: string; + prev_hash: string | null; + signature: string; + created_at: string; +}; + +const rows: Row[] = []; +let idSeq = 0; + +jest.mock("../../src/db/postgres", () => ({ + query: async (sql: string, params: unknown[] = []) => { + if (sql.startsWith("SELECT signature")) { + const planId = params[0] as string; + const matches = rows.filter((r) => r.plan_id === planId); + if (matches.length === 0) return []; + return [{ signature: matches[matches.length - 1].signature }]; + } + if (sql.startsWith("INSERT INTO events")) { + const [plan_id, type, actor, payloadJson, payload_hash, prev_hash, signature] = + params as [string, string, string | null, string, string, string | null, string]; + const rec: Row = { + id: `evt-${++idSeq}`, + plan_id, + type, + actor, + payload: JSON.parse(payloadJson), + payload_hash, + prev_hash, + signature, + created_at: new Date(Date.now() + idSeq).toISOString(), + }; + rows.push(rec); + return [rec]; + } + if (sql.startsWith("SELECT id, plan_id")) { + const planId = params[0] as string; + return rows.filter((r) => r.plan_id === planId); + } + return []; + }, +})); + +import { publish, getEventsForPlan, verifyChain, EVENT_TYPES } from "../../src/services/eventBus"; + +describe("Event Bus", () => { + beforeEach(() => { + rows.length = 0; + idSeq = 0; + }); + + it("EVENT_TYPES covers all arch §7.2 categories", () => { + expect(EVENT_TYPES).toContain("transaction.created"); + expect(EVENT_TYPES).toContain("transaction.committed"); + expect(EVENT_TYPES).toContain("transaction.aborted"); + expect(EVENT_TYPES).toContain("payment.settled"); + expect(EVENT_TYPES).toContain("instrument.dispatched"); + expect(EVENT_TYPES.length).toBe(15); + }); + + it("publish persists with payload_hash, prev_hash=null, and signature", async () => { + const rec = await publish({ + planId: "p-1", + type: "transaction.created", + actor: "coordinator", + payload: { foo: 1 }, + }); + expect(rec.id).toMatch(/evt-/); + expect(rec.prev_hash).toBeNull(); + expect(rec.payload_hash).toMatch(/^[0-9a-f]{64}$/); + expect(rec.signature).toMatch(/^[0-9a-f]{64}$/); + expect(rec.payload).toEqual({ foo: 1 }); + }); + + it("prev_hash chains consecutive events for the same plan", async () => { + const a = await publish({ planId: "p-1", type: "transaction.created" }); + const b = await publish({ planId: "p-1", type: "participants.authorized" }); + const c = await publish({ planId: "p-1", type: "preconditions.satisfied" }); + expect(a.prev_hash).toBeNull(); + expect(b.prev_hash).toBe(a.signature); + expect(c.prev_hash).toBe(b.signature); + }); + + it("events are isolated per plan_id", async () => { + const a1 = await publish({ planId: "p-1", type: "transaction.created" }); + const b1 = await publish({ planId: "p-2", type: "transaction.created" }); + expect(a1.prev_hash).toBeNull(); + expect(b1.prev_hash).toBeNull(); + }); + + it("verifyChain returns ok for an untampered chain", async () => { + await publish({ planId: "p-1", type: "transaction.created" }); + await publish({ planId: "p-1", type: "transaction.prepared" }); + await publish({ planId: "p-1", type: "transaction.committed" }); + const result = await verifyChain("p-1"); + expect(result.ok).toBe(true); + }); + + it("verifyChain detects payload tampering", async () => { + await publish({ planId: "p-1", type: "transaction.created", payload: { amount: 100 } }); + await publish({ planId: "p-1", type: "transaction.committed" }); + rows[0].payload = { amount: 999_999 }; // tamper + const result = await verifyChain("p-1"); + expect(result.ok).toBe(false); + if (!result.ok) { + expect(result.brokenAt).toBe(0); + expect(result.reason).toBe("payload_hash mismatch"); + } + }); + + it("verifyChain detects signature tampering", async () => { + await publish({ planId: "p-1", type: "transaction.created" }); + await publish({ planId: "p-1", type: "transaction.committed" }); + rows[1].signature = "0".repeat(64); // tamper + const result = await verifyChain("p-1"); + expect(result.ok).toBe(false); + if (!result.ok) { + expect(result.brokenAt).toBe(1); + } + }); + + it("verifyChain detects broken prev_hash link", async () => { + await publish({ planId: "p-1", type: "transaction.created" }); + await publish({ planId: "p-1", type: "transaction.committed" }); + rows[1].prev_hash = "0".repeat(64); + const result = await verifyChain("p-1"); + expect(result.ok).toBe(false); + if (!result.ok) { + expect(result.reason).toBe("prev_hash mismatch"); + } + }); + + it("getEventsForPlan returns events in chronological order", async () => { + await publish({ planId: "p-1", type: "transaction.created" }); + await publish({ planId: "p-1", type: "transaction.prepared" }); + const events = await getEventsForPlan("p-1"); + expect(events.map((e) => e.type)).toEqual([ + "transaction.created", + "transaction.prepared", + ]); + }); +}); diff --git a/orchestrator/tests/unit/swift.test.ts b/orchestrator/tests/unit/swift.test.ts new file mode 100644 index 0000000..805cd8d --- /dev/null +++ b/orchestrator/tests/unit/swift.test.ts @@ -0,0 +1,169 @@ +import { describe, it, expect } from "@jest/globals"; +import { + generateMt760, + messageHash, + generatePacs009, + generateMt202, + parseCamt025, + parseCamt054, + parseCamt, + reconcileCamt054, +} from "../../src/services/swift"; +import type { InstrumentTerms, Plan } from "../../src/types/plan"; + +const TERMS: InstrumentTerms = { + applicant: "ACME TRADING FZE", + issuingBankBIC: "EBILAEAD", + beneficiaryBankBIC: "EMBKAEAD", + beneficiaryName: "BLUE OCEAN SHIPPING LLC", + beneficiaryAccount: "AE070260001015104203701", + amount: 1_500_000, + currency: "USD", + tenor: "365D", + expiryDate: "2027-04-18", + placeOfPresentation: "DUBAI", + governingLaw: "URDG 758", + templateRef: "EIB-SBLC-2024-01", + templateHash: "a".repeat(64), +}; + +const PLAN: Plan = { + plan_id: "11111111-2222-3333-4444-555555555555", + creator: "0xabc", + steps: [{ type: "pay", asset: "USD", amount: 1_500_000 }], +}; + +describe("SWIFT gateway — MT760", () => { + it("renders all 12 required tags", () => { + const msg = generateMt760(TERMS, { transactionReference: "TXN1", issueDate: "2026-04-18" }); + expect(msg.sender).toBe("EBILAEAD"); + expect(msg.receiver).toBe("EMBKAEAD"); + expect(msg.fields["20"]).toBe("TXN1"); + expect(msg.fields["30"]).toBe("260418"); + expect(msg.fields["32B"]).toBe("USD1500000,00"); + expect(msg.fields["31D"]).toBe("270418DUBAI"); + expect(msg.fin).toContain("{1:F01EBILAEADXXXX0000000000}"); + expect(msg.fin).toContain("{2:I760EMBKAEADXXXXN}"); + expect(msg.fin).toContain(":32B:USD1500000,00"); + }); + + it("rejects malformed expiry date", () => { + expect(() => + generateMt760({ ...TERMS, expiryDate: "not-a-date" }, { transactionReference: "T", issueDate: "2026-04-18" }), + ).toThrow(/YYYY-MM-DD/); + }); + + it("rejects negative amount", () => { + expect(() => + generateMt760({ ...TERMS, amount: -1 }, { transactionReference: "T", issueDate: "2026-04-18" }), + ).toThrow(/non-negative/); + }); + + it("messageHash is deterministic", () => { + const a = generateMt760(TERMS, { transactionReference: "T", issueDate: "2026-04-18" }); + const b = generateMt760(TERMS, { transactionReference: "T", issueDate: "2026-04-18" }); + expect(messageHash(a)).toBe(messageHash(b)); + expect(messageHash(a)).toMatch(/^[0-9a-f]{64}$/); + }); +}); + +describe("SWIFT gateway — pacs.009", () => { + const opts = { + messageId: "MSG-1", + creationDateTime: "2026-04-18T10:00:00Z", + instructingAgentBIC: "EBILAEAD", + instructedAgentBIC: "EMBKAEAD", + debtorAgentBIC: "EBILAEAD", + creditorAgentBIC: "EMBKAEAD", + }; + + it("emits well-formed pacs.009.001.08 XML", () => { + const result = generatePacs009(PLAN, opts); + expect(result.messageId).toBe("MSG-1"); + expect(result.xml).toContain("urn:iso:std:iso:20022:tech:xsd:pacs.009.001.08"); + expect(result.xml).toContain("1500000.00"); + expect(result.xml).toContain("EBILAEAD"); + expect(result.xml).toContain("EMBKAEAD"); + expect(result.endToEndId).toBe(`E2E-${PLAN.plan_id}`); + }); + + it("rejects invalid BIC", () => { + expect(() => generatePacs009(PLAN, { ...opts, instructingAgentBIC: "BAD" })).toThrow(/BIC/); + }); + + it("requires a pay step", () => { + expect(() => + generatePacs009({ ...PLAN, steps: [{ type: "borrow", amount: 1, asset: "USD" }] }, opts), + ).toThrow(/pay/); + }); +}); + +describe("SWIFT gateway — MT202", () => { + it("renders the 6 required tags", () => { + const msg = generateMt202(PLAN, { + transactionReference: "TXN-1", + valueDate: "2026-04-18", + sendingInstitution: "EBILAEAD", + receivingInstitution: "EMBKAEAD", + beneficiaryInstitution: "EMBKAEAD", + }); + expect(msg.fields["20"]).toBe("TXN-1"); + expect(msg.fields["32A"]).toBe("260418USD1500000,00"); + expect(msg.fields["58A"]).toBe("EMBKAEAD"); + expect(msg.fin).toContain(":20:TXN-1"); + }); +}); + +describe("SWIFT gateway — camt parsers", () => { + it("parseCamt025 extracts status + ids", () => { + const xml = `R1MSG-1ACSC2026-04-18T10:01:00Z`; + const r = parseCamt025(xml); + expect(r.type).toBe("camt.025"); + expect(r.originalMessageId).toBe("MSG-1"); + expect(r.status).toBe("ACSC"); + }); + + it("parseCamt054 extracts credit amount + endToEndId", () => { + const xml = `N11500000.00CRDT
2026-04-18
2026-04-18
E2E-plan-1
`; + const r = parseCamt054(xml); + expect(r.type).toBe("camt.054"); + expect(r.creditDebitIndicator).toBe("CRDT"); + expect(r.amount).toBe(1_500_000); + expect(r.currency).toBe("USD"); + expect(r.endToEndId).toBe("E2E-plan-1"); + }); + + it("parseCamt dispatches on xmlns marker", () => { + const xml025 = `ROACSC`; + expect(parseCamt(xml025).type).toBe("camt.025"); + }); + + it("parseCamt rejects unknown xmlns", () => { + expect(() => parseCamt('')).toThrow(/unsupported/); + }); + + it("reconcileCamt054 returns empty array when everything matches", () => { + const msg = { + type: "camt.054" as const, + messageId: "N1", + creditDebitIndicator: "CRDT" as const, + amount: 1_500_000, + currency: "USD", + endToEndId: "E2E-1", + }; + expect(reconcileCamt054(msg, { amount: 1_500_000, currency: "USD", endToEndId: "E2E-1" })).toEqual([]); + }); + + it("reconcileCamt054 reports amount + currency + direction mismatches", () => { + const msg = { + type: "camt.054" as const, + messageId: "N1", + creditDebitIndicator: "DBIT" as const, + amount: 1_400_000, + currency: "EUR", + endToEndId: "E2E-2", + }; + const result = reconcileCamt054(msg, { amount: 1_500_000, currency: "USD", endToEndId: "E2E-1" }); + expect(result.map((m) => m.field).sort()).toEqual(["amount", "creditDebitIndicator", "currency", "endToEndId"]); + }); +});