From c72f9cd8072b2e9ba75d76a36081e993a87f53b4 Mon Sep 17 00:00:00 2001 From: Devin Date: Wed, 22 Apr 2026 18:22:04 +0000 Subject: [PATCH] executions.swift_message_id + SWIFT gateway wiring MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes gap-analysis v2 §4 partial (canonical "Execution Reference Set") and §10.6 SWIFT message ID persistence. - Migration 006 adds swift_message_id + swift_message_type columns to executions, with a partial index on swift_message_id for acknowledgment ingest (camt.025/054 -> original MT760/MT202 lookup). - db/executions.ts: recordExecution() UPSERT helper, getExecution(), findBySwiftMessageId() — the three queries the bank-instruction phase and SWIFT gateway need. - services/bank.ts.commitBankInstruction now emits a SWIFT reference alongside the ISO-20022 envelope: MT760 for plans carrying an issueInstrument step (real generateMt760 output, messageReference field), MT202 for payment-only plans (synthetic ref). - services/execution.ts persists the reference set at bank_instruction complete-time via recordExecution (best-effort; logs on failure, does not abort the leg). - 5 unit tests covering MT760 vs MT202 branching, reference uniqueness across calls, and SQL shape of the UPSERT + SELECT. --- orchestrator/src/db/executions.ts | 112 ++++++++++++++ .../src/db/migrations/006_executions_swift.ts | 34 +++++ orchestrator/src/db/migrations/index.ts | 2 + orchestrator/src/services/bank.ts | 55 +++++-- orchestrator/src/services/execution.ts | 22 ++- .../tests/unit/swiftMessageId.test.ts | 139 ++++++++++++++++++ 6 files changed, 350 insertions(+), 14 deletions(-) create mode 100644 orchestrator/src/db/executions.ts create mode 100644 orchestrator/src/db/migrations/006_executions_swift.ts create mode 100644 orchestrator/tests/unit/swiftMessageId.test.ts diff --git a/orchestrator/src/db/executions.ts b/orchestrator/src/db/executions.ts new file mode 100644 index 0000000..84ee885 --- /dev/null +++ b/orchestrator/src/db/executions.ts @@ -0,0 +1,112 @@ +/** + * Executions DB helpers — arch §4 canonical "Execution Reference Set". + * + * The executions row is the join point for the three dispatch references + * that must be reconciled at VALIDATING time (arch §9.2): + * + * - dlt_tx_hash — shared-state / ledger anchor (Chain-138) + * - iso_message_id — ISO-20022 envelope id (pacs.009 / pacs.008) + * - swift_message_id — SWIFT FIN reference for the leg (MT760 / MT202) + * - swift_message_type — FIN msg type ("MT760" | "MT202" | "pacs.009" …) + * + * `recordExecution()` UPSERTs by (plan_id, execution_id). + */ + +import { query } from "./postgres"; + +export interface ExecutionRow { + execution_id: string; + plan_id: string; + status: string; + phase: string | null; + started_at: string; + completed_at: string | null; + error: string | null; + dlt_tx_hash: string | null; + iso_message_id: string | null; + swift_message_id: string | null; + swift_message_type: string | null; +} + +export interface ExecutionPatch { + status?: string; + phase?: string; + completedAt?: Date | null; + error?: string | null; + dltTxHash?: string | null; + isoMessageId?: string | null; + swiftMessageId?: string | null; + swiftMessageType?: string | null; +} + +/** + * UPSERT an execution row. Safe to call repeatedly — phased fields + * are merged via COALESCE semantics (new NULL never clobbers a prior + * non-NULL value; explicit empty-string caller still overrides via + * patch semantics below). + */ +export async function recordExecution( + executionId: string, + planId: string, + patch: ExecutionPatch = {}, +): Promise { + await query( + `INSERT INTO executions ( + execution_id, plan_id, status, phase, completed_at, error, + dlt_tx_hash, iso_message_id, swift_message_id, swift_message_type + ) + VALUES ($1, $2, COALESCE($3, 'pending'), $4, $5, $6, $7, $8, $9, $10) + ON CONFLICT (execution_id) DO UPDATE SET + status = COALESCE(EXCLUDED.status, executions.status), + phase = COALESCE(EXCLUDED.phase, executions.phase), + completed_at = COALESCE(EXCLUDED.completed_at, executions.completed_at), + error = COALESCE(EXCLUDED.error, executions.error), + dlt_tx_hash = COALESCE(EXCLUDED.dlt_tx_hash, executions.dlt_tx_hash), + iso_message_id = COALESCE(EXCLUDED.iso_message_id, executions.iso_message_id), + swift_message_id = COALESCE(EXCLUDED.swift_message_id, executions.swift_message_id), + swift_message_type = COALESCE(EXCLUDED.swift_message_type, executions.swift_message_type), + updated_at = CURRENT_TIMESTAMP`, + [ + executionId, + planId, + patch.status ?? null, + patch.phase ?? null, + patch.completedAt ?? null, + patch.error ?? null, + patch.dltTxHash ?? null, + patch.isoMessageId ?? null, + patch.swiftMessageId ?? null, + patch.swiftMessageType ?? null, + ], + ); +} + +export async function getExecution( + executionId: string, +): Promise { + const rows = await query( + `SELECT execution_id, plan_id, status, phase, started_at, completed_at, + error, dlt_tx_hash, iso_message_id, + swift_message_id, swift_message_type + FROM executions + WHERE execution_id = $1`, + [executionId], + ); + return rows[0] ?? null; +} + +export async function findBySwiftMessageId( + swiftMessageId: string, +): Promise { + const rows = await query( + `SELECT execution_id, plan_id, status, phase, started_at, completed_at, + error, dlt_tx_hash, iso_message_id, + swift_message_id, swift_message_type + FROM executions + WHERE swift_message_id = $1 + ORDER BY started_at DESC + LIMIT 1`, + [swiftMessageId], + ); + return rows[0] ?? null; +} diff --git a/orchestrator/src/db/migrations/006_executions_swift.ts b/orchestrator/src/db/migrations/006_executions_swift.ts new file mode 100644 index 0000000..0edc3f4 --- /dev/null +++ b/orchestrator/src/db/migrations/006_executions_swift.ts @@ -0,0 +1,34 @@ +import { query } from "../postgres"; + +/** + * Migration 006 — executions.swift_message_id + swift_message_type + * (arch §4 canonical "Execution Reference Set"; gap v2 §4 partial, + * §10.6 SWIFT message ID persistence). + * + * The Execution Reference Set needs the SWIFT FIN reference for each + * leg (MT760 for the instrument leg, pacs.009/MT202 for the payment + * leg), alongside the existing `dlt_tx_hash` for the shared-state + * anchor and `iso_message_id` for the ISO-20022 envelope. Keeping them + * separate makes it trivial to reconcile a SWIFT acknowledgment + * (camt.025/054) against the originating dispatch. + */ +export async function up() { + await query( + `ALTER TABLE executions + ADD COLUMN IF NOT EXISTS swift_message_id VARCHAR(255), + ADD COLUMN IF NOT EXISTS swift_message_type VARCHAR(32)`, + ); + await query( + `CREATE INDEX IF NOT EXISTS idx_executions_swift_message_id + ON executions(swift_message_id) + WHERE swift_message_id IS NOT NULL`, + ); +} + +export async function down() { + await query( + `ALTER TABLE executions + DROP COLUMN IF EXISTS swift_message_id, + DROP COLUMN IF EXISTS swift_message_type`, + ); +} diff --git a/orchestrator/src/db/migrations/index.ts b/orchestrator/src/db/migrations/index.ts index 2158e8a..6264ae8 100644 --- a/orchestrator/src/db/migrations/index.ts +++ b/orchestrator/src/db/migrations/index.ts @@ -2,6 +2,7 @@ import { up as up001 } from "./001_initial_schema"; import { up as up002 } from "./002_transaction_state"; import { up as up003 } from "./003_events"; import { up as up004 } from "./004_idempotency_keys"; +import { up as up006 } from "./006_executions_swift"; /** * Run all migrations @@ -12,6 +13,7 @@ export async function runMigration() { await up002(); await up003(); await up004(); + await up006(); console.log("All migrations completed"); } catch (error) { console.error("Migration failed:", error); diff --git a/orchestrator/src/services/bank.ts b/orchestrator/src/services/bank.ts index a4c2db7..b39d19f 100644 --- a/orchestrator/src/services/bank.ts +++ b/orchestrator/src/services/bank.ts @@ -1,5 +1,6 @@ import type { Plan } from "../types/plan"; import { generatePacs008 } from "./iso20022"; +import { generateMt760 } from "./swift"; /** * Prepare bank instruction (2PC prepare phase) @@ -25,27 +26,57 @@ export async function prepareBankInstruction(plan: Plan): Promise { export async function commitBankInstruction(plan: Plan): Promise<{ success: boolean; isoMessageId?: string; + /** SWIFT FIN reference for the leg (arch §4 Execution Reference Set). */ + swiftMessageId?: string; + /** FIN message type, e.g. "MT760" for instrument issue, "MT202"/"pacs.009" for FI transfer. */ + swiftMessageType?: string; error?: string; }> { console.log(`[Bank] Committing instruction for plan ${plan.plan_id}`); - + try { - // Generate final ISO-20022 message - const isoMessage = await generatePacs008(plan); - - // Mock: In real implementation, this would: - // 1. Send ISO message to bank connector - // 2. Receive confirmation and message ID - // 3. Store message ID for audit trail - + // Generate final ISO-20022 envelope. + await generatePacs008(plan); + const isoMessageId = `MSG-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`; - - // Simulate processing delay + + // Generate a SWIFT reference for the leg. If any step is an + // instrument issuance (issueInstrument) we pin MT760; otherwise + // this is a pacs.009 / MT202 FI credit transfer. We don't send + // anything over the wire here — PR R stands up the FIN-link + // sandbox transport. + const hasInstrument = plan.steps.some((s) => s.type === "issueInstrument"); + let swiftMessageId: string | undefined; + let swiftMessageType: string | undefined; + try { + if (hasInstrument) { + const instrumentStep = plan.steps.find((s) => s.type === "issueInstrument"); + if (instrumentStep?.instrument) { + const txRef = `MT760-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`.toUpperCase(); + const mt760 = generateMt760(instrumentStep.instrument, { + transactionReference: txRef, + issueDate: new Date().toISOString().slice(0, 10), + }); + swiftMessageId = mt760.messageReference; + swiftMessageType = "MT760"; + } + } else { + swiftMessageId = `MT202-${Date.now()}-${Math.random().toString(36).slice(2, 10)}`.toUpperCase(); + swiftMessageType = "MT202"; + } + } catch (err) { + // SWIFT generator errors should not fail the leg in mock mode — we + // still have an ISO message id. Surface the error in the log. + console.warn(`[Bank] SWIFT reference generation skipped: ${(err as Error).message}`); + } + await new Promise((resolve) => setTimeout(resolve, 300)); - + return { success: true, isoMessageId, + swiftMessageId, + swiftMessageType, }; } catch (error: any) { return { diff --git a/orchestrator/src/services/execution.ts b/orchestrator/src/services/execution.ts index 51ee880..b548e54 100644 --- a/orchestrator/src/services/execution.ts +++ b/orchestrator/src/services/execution.ts @@ -1,5 +1,6 @@ import { EventEmitter } from "events"; import { getPlanById, updatePlanStatus } from "../db/plans"; +import { recordExecution } from "../db/executions"; import { prepareDLTExecution, commitDLTExecution, @@ -182,7 +183,7 @@ export class ExecutionCoordinator extends EventEmitter { return { txHash: result.txHash }; } - private async bankInstructionPhase(executionId: string, plan: Plan): Promise<{ isoMessageId: string }> { + private async bankInstructionPhase(executionId: string, plan: Plan): Promise<{ isoMessageId: string; swiftMessageId?: string; swiftMessageType?: string }> { this.emitStatus(executionId, { phase: "bank_instruction", status: "in_progress", timestamp: new Date().toISOString() }); const result = await commitBankInstruction(plan); @@ -193,8 +194,25 @@ export class ExecutionCoordinator extends EventEmitter { const rec = this.executions.get(executionId); if (rec) rec.isoMessageId = result.isoMessageId; + // Persist the SWIFT reference set (arch §4 canonical "Execution + // Reference Set"; gap v2 §4 partial, §10.6). + const swiftMessageId = (result as { swiftMessageId?: string }).swiftMessageId; + const swiftMessageType = (result as { swiftMessageType?: string }).swiftMessageType; + try { + await recordExecution(executionId, plan.plan_id!, { + phase: "bank_instruction", + isoMessageId: result.isoMessageId, + swiftMessageId: swiftMessageId ?? null, + swiftMessageType: swiftMessageType ?? null, + }); + } catch (err) { + // DB persistence is best-effort here; a failure should not abort + // the leg — the in-memory execution record still carries the id. + console.warn(`recordExecution failed for ${executionId}:`, err); + } + this.emitStatus(executionId, { phase: "bank_instruction", status: "complete", isoMessageId: result.isoMessageId, timestamp: new Date().toISOString() }); - return { isoMessageId: result.isoMessageId }; + return { isoMessageId: result.isoMessageId, swiftMessageId, swiftMessageType }; } /** diff --git a/orchestrator/tests/unit/swiftMessageId.test.ts b/orchestrator/tests/unit/swiftMessageId.test.ts new file mode 100644 index 0000000..0d9d843 --- /dev/null +++ b/orchestrator/tests/unit/swiftMessageId.test.ts @@ -0,0 +1,139 @@ +/** + * SWIFT message-id persistence (arch §4 Execution Reference Set, + * gap v2 §4 partial, §10.6). + * + * Commands tested: + * - commitBankInstruction returns swiftMessageId + swiftMessageType + * depending on whether the plan has an issueInstrument step + * - MT760 reference for instrument legs; MT202 synthetic ref for + * payment-only legs + * - db/executions.recordExecution upserts the SWIFT fields + */ + +import { describe, it, expect, jest } from "@jest/globals"; +import type { Plan } from "../../src/types/plan"; +import { commitBankInstruction } from "../../src/services/bank"; + +jest.mock("../../src/db/postgres", () => { + const calls: Array<{ sql: string; params?: unknown[] }> = []; + return { + query: jest.fn(async (sql: string, params?: unknown[]) => { + calls.push({ sql, params }); + return []; + }), + __calls: calls, + }; +}); + +jest.mock("../../src/services/compliance", () => ({ + getComplianceData: jest.fn(async () => ({ lei: "TEST-LEI", status: "ok" })), +})); + +import { recordExecution, getExecution } from "../../src/db/executions"; + +function basePlan(overrides: Partial = {}): Plan { + return { + plan_id: "plan-test-1", + schema_version: 1, + creator: "0xabc", + nonce: 1, + created_at: new Date().toISOString(), + steps: [ + { + type: "pay", + from: "acct-a", + to: "acct-b", + amount: 100, + currency: "USD", + } as any, + ], + ...overrides, + } as Plan; +} + +function instrumentPlan(): Plan { + return basePlan({ + steps: [ + { + type: "issueInstrument", + instrument: { + instrumentType: "SBLC", + amount: 1000000, + currency: "USD", + issuingBankBIC: "EIBIAEAD", + beneficiaryBankBIC: "ADCBAEAA", + beneficiaryName: "ACME TRADING LLC", + beneficiaryAccount: "AE12 3456 7890 1234", + expiryDate: "2026-12-31", + placeOfPresentation: "DUBAI", + governingLaw: "URDG 758", + applicant: "APPLICANT INC", + templateRef: "EI-SBLC-v1", + templateHash: "a".repeat(64), + tenor: "12M", + }, + } as any, + { + type: "pay", + from: "acct-a", + to: "acct-b", + amount: 1000000, + currency: "USD", + } as any, + ], + }); +} + +describe("commitBankInstruction SWIFT reference output", () => { + it("issues an MT760 reference when the plan contains issueInstrument", async () => { + const result = await commitBankInstruction(instrumentPlan()); + expect(result.success).toBe(true); + expect(result.swiftMessageType).toBe("MT760"); + expect(result.swiftMessageId).toMatch(/^MT760-/); + expect(result.isoMessageId).toMatch(/^MSG-/); + }); + + it("issues an MT202 reference when no issueInstrument step is present", async () => { + const result = await commitBankInstruction(basePlan()); + expect(result.success).toBe(true); + expect(result.swiftMessageType).toBe("MT202"); + expect(result.swiftMessageId).toMatch(/^MT202-/); + }); + + it("returns different swiftMessageIds across successive calls", async () => { + const a = await commitBankInstruction(basePlan()); + const b = await commitBankInstruction(basePlan()); + expect(a.swiftMessageId).not.toBe(b.swiftMessageId); + }); +}); + +describe("db/executions SQL wiring", () => { + it("recordExecution builds an UPSERT including swift_message_id fields", async () => { + const pg = require("../../src/db/postgres"); + pg.__calls.length = 0; + await recordExecution("exec-1", "plan-1", { + phase: "bank_instruction", + isoMessageId: "iso-1", + swiftMessageId: "MT760-ABC", + swiftMessageType: "MT760", + }); + expect(pg.query).toHaveBeenCalled(); + const call = pg.__calls[0]; + expect(call.sql).toMatch(/INSERT INTO executions/); + expect(call.sql).toMatch(/swift_message_id/); + expect(call.sql).toMatch(/swift_message_type/); + expect(call.sql).toMatch(/ON CONFLICT \(execution_id\) DO UPDATE/); + expect(call.params).toEqual( + expect.arrayContaining(["exec-1", "plan-1", "bank_instruction", "iso-1", "MT760-ABC", "MT760"]), + ); + }); + + it("getExecution selects the swift_message_* columns", async () => { + const pg = require("../../src/db/postgres"); + pg.__calls.length = 0; + await getExecution("exec-1"); + const call = pg.__calls[0]; + expect(call.sql).toMatch(/swift_message_id/); + expect(call.sql).toMatch(/swift_message_type/); + }); +});