Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c72f9cd807 |
112
orchestrator/src/db/executions.ts
Normal file
112
orchestrator/src/db/executions.ts
Normal file
@@ -0,0 +1,112 @@
|
|||||||
|
/**
|
||||||
|
* Executions DB helpers — arch §4 canonical "Execution Reference Set".
|
||||||
|
*
|
||||||
|
* The executions row is the join point for the three dispatch references
|
||||||
|
* that must be reconciled at VALIDATING time (arch §9.2):
|
||||||
|
*
|
||||||
|
* - dlt_tx_hash — shared-state / ledger anchor (Chain-138)
|
||||||
|
* - iso_message_id — ISO-20022 envelope id (pacs.009 / pacs.008)
|
||||||
|
* - swift_message_id — SWIFT FIN reference for the leg (MT760 / MT202)
|
||||||
|
* - swift_message_type — FIN msg type ("MT760" | "MT202" | "pacs.009" …)
|
||||||
|
*
|
||||||
|
* `recordExecution()` UPSERTs by (plan_id, execution_id).
|
||||||
|
*/
|
||||||
|
|
||||||
|
import { query } from "./postgres";
|
||||||
|
|
||||||
|
export interface ExecutionRow {
|
||||||
|
execution_id: string;
|
||||||
|
plan_id: string;
|
||||||
|
status: string;
|
||||||
|
phase: string | null;
|
||||||
|
started_at: string;
|
||||||
|
completed_at: string | null;
|
||||||
|
error: string | null;
|
||||||
|
dlt_tx_hash: string | null;
|
||||||
|
iso_message_id: string | null;
|
||||||
|
swift_message_id: string | null;
|
||||||
|
swift_message_type: string | null;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface ExecutionPatch {
|
||||||
|
status?: string;
|
||||||
|
phase?: string;
|
||||||
|
completedAt?: Date | null;
|
||||||
|
error?: string | null;
|
||||||
|
dltTxHash?: string | null;
|
||||||
|
isoMessageId?: string | null;
|
||||||
|
swiftMessageId?: string | null;
|
||||||
|
swiftMessageType?: string | null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* UPSERT an execution row. Safe to call repeatedly — phased fields
|
||||||
|
* are merged via COALESCE semantics (new NULL never clobbers a prior
|
||||||
|
* non-NULL value; explicit empty-string caller still overrides via
|
||||||
|
* patch semantics below).
|
||||||
|
*/
|
||||||
|
export async function recordExecution(
|
||||||
|
executionId: string,
|
||||||
|
planId: string,
|
||||||
|
patch: ExecutionPatch = {},
|
||||||
|
): Promise<void> {
|
||||||
|
await query(
|
||||||
|
`INSERT INTO executions (
|
||||||
|
execution_id, plan_id, status, phase, completed_at, error,
|
||||||
|
dlt_tx_hash, iso_message_id, swift_message_id, swift_message_type
|
||||||
|
)
|
||||||
|
VALUES ($1, $2, COALESCE($3, 'pending'), $4, $5, $6, $7, $8, $9, $10)
|
||||||
|
ON CONFLICT (execution_id) DO UPDATE SET
|
||||||
|
status = COALESCE(EXCLUDED.status, executions.status),
|
||||||
|
phase = COALESCE(EXCLUDED.phase, executions.phase),
|
||||||
|
completed_at = COALESCE(EXCLUDED.completed_at, executions.completed_at),
|
||||||
|
error = COALESCE(EXCLUDED.error, executions.error),
|
||||||
|
dlt_tx_hash = COALESCE(EXCLUDED.dlt_tx_hash, executions.dlt_tx_hash),
|
||||||
|
iso_message_id = COALESCE(EXCLUDED.iso_message_id, executions.iso_message_id),
|
||||||
|
swift_message_id = COALESCE(EXCLUDED.swift_message_id, executions.swift_message_id),
|
||||||
|
swift_message_type = COALESCE(EXCLUDED.swift_message_type, executions.swift_message_type),
|
||||||
|
updated_at = CURRENT_TIMESTAMP`,
|
||||||
|
[
|
||||||
|
executionId,
|
||||||
|
planId,
|
||||||
|
patch.status ?? null,
|
||||||
|
patch.phase ?? null,
|
||||||
|
patch.completedAt ?? null,
|
||||||
|
patch.error ?? null,
|
||||||
|
patch.dltTxHash ?? null,
|
||||||
|
patch.isoMessageId ?? null,
|
||||||
|
patch.swiftMessageId ?? null,
|
||||||
|
patch.swiftMessageType ?? null,
|
||||||
|
],
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function getExecution(
|
||||||
|
executionId: string,
|
||||||
|
): Promise<ExecutionRow | null> {
|
||||||
|
const rows = await query<ExecutionRow>(
|
||||||
|
`SELECT execution_id, plan_id, status, phase, started_at, completed_at,
|
||||||
|
error, dlt_tx_hash, iso_message_id,
|
||||||
|
swift_message_id, swift_message_type
|
||||||
|
FROM executions
|
||||||
|
WHERE execution_id = $1`,
|
||||||
|
[executionId],
|
||||||
|
);
|
||||||
|
return rows[0] ?? null;
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function findBySwiftMessageId(
|
||||||
|
swiftMessageId: string,
|
||||||
|
): Promise<ExecutionRow | null> {
|
||||||
|
const rows = await query<ExecutionRow>(
|
||||||
|
`SELECT execution_id, plan_id, status, phase, started_at, completed_at,
|
||||||
|
error, dlt_tx_hash, iso_message_id,
|
||||||
|
swift_message_id, swift_message_type
|
||||||
|
FROM executions
|
||||||
|
WHERE swift_message_id = $1
|
||||||
|
ORDER BY started_at DESC
|
||||||
|
LIMIT 1`,
|
||||||
|
[swiftMessageId],
|
||||||
|
);
|
||||||
|
return rows[0] ?? null;
|
||||||
|
}
|
||||||
34
orchestrator/src/db/migrations/006_executions_swift.ts
Normal file
34
orchestrator/src/db/migrations/006_executions_swift.ts
Normal file
@@ -0,0 +1,34 @@
|
|||||||
|
import { query } from "../postgres";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Migration 006 — executions.swift_message_id + swift_message_type
|
||||||
|
* (arch §4 canonical "Execution Reference Set"; gap v2 §4 partial,
|
||||||
|
* §10.6 SWIFT message ID persistence).
|
||||||
|
*
|
||||||
|
* The Execution Reference Set needs the SWIFT FIN reference for each
|
||||||
|
* leg (MT760 for the instrument leg, pacs.009/MT202 for the payment
|
||||||
|
* leg), alongside the existing `dlt_tx_hash` for the shared-state
|
||||||
|
* anchor and `iso_message_id` for the ISO-20022 envelope. Keeping them
|
||||||
|
* separate makes it trivial to reconcile a SWIFT acknowledgment
|
||||||
|
* (camt.025/054) against the originating dispatch.
|
||||||
|
*/
|
||||||
|
export async function up() {
|
||||||
|
await query(
|
||||||
|
`ALTER TABLE executions
|
||||||
|
ADD COLUMN IF NOT EXISTS swift_message_id VARCHAR(255),
|
||||||
|
ADD COLUMN IF NOT EXISTS swift_message_type VARCHAR(32)`,
|
||||||
|
);
|
||||||
|
await query(
|
||||||
|
`CREATE INDEX IF NOT EXISTS idx_executions_swift_message_id
|
||||||
|
ON executions(swift_message_id)
|
||||||
|
WHERE swift_message_id IS NOT NULL`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function down() {
|
||||||
|
await query(
|
||||||
|
`ALTER TABLE executions
|
||||||
|
DROP COLUMN IF EXISTS swift_message_id,
|
||||||
|
DROP COLUMN IF EXISTS swift_message_type`,
|
||||||
|
);
|
||||||
|
}
|
||||||
@@ -2,6 +2,7 @@ import { up as up001 } from "./001_initial_schema";
|
|||||||
import { up as up002 } from "./002_transaction_state";
|
import { up as up002 } from "./002_transaction_state";
|
||||||
import { up as up003 } from "./003_events";
|
import { up as up003 } from "./003_events";
|
||||||
import { up as up004 } from "./004_idempotency_keys";
|
import { up as up004 } from "./004_idempotency_keys";
|
||||||
|
import { up as up006 } from "./006_executions_swift";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Run all migrations
|
* Run all migrations
|
||||||
@@ -12,6 +13,7 @@ export async function runMigration() {
|
|||||||
await up002();
|
await up002();
|
||||||
await up003();
|
await up003();
|
||||||
await up004();
|
await up004();
|
||||||
|
await up006();
|
||||||
console.log("All migrations completed");
|
console.log("All migrations completed");
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error("Migration failed:", error);
|
console.error("Migration failed:", error);
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
import type { Plan } from "../types/plan";
|
import type { Plan } from "../types/plan";
|
||||||
import { generatePacs008 } from "./iso20022";
|
import { generatePacs008 } from "./iso20022";
|
||||||
|
import { generateMt760 } from "./swift";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Prepare bank instruction (2PC prepare phase)
|
* Prepare bank instruction (2PC prepare phase)
|
||||||
@@ -25,27 +26,57 @@ export async function prepareBankInstruction(plan: Plan): Promise<boolean> {
|
|||||||
export async function commitBankInstruction(plan: Plan): Promise<{
|
export async function commitBankInstruction(plan: Plan): Promise<{
|
||||||
success: boolean;
|
success: boolean;
|
||||||
isoMessageId?: string;
|
isoMessageId?: string;
|
||||||
|
/** SWIFT FIN reference for the leg (arch §4 Execution Reference Set). */
|
||||||
|
swiftMessageId?: string;
|
||||||
|
/** FIN message type, e.g. "MT760" for instrument issue, "MT202"/"pacs.009" for FI transfer. */
|
||||||
|
swiftMessageType?: string;
|
||||||
error?: string;
|
error?: string;
|
||||||
}> {
|
}> {
|
||||||
console.log(`[Bank] Committing instruction for plan ${plan.plan_id}`);
|
console.log(`[Bank] Committing instruction for plan ${plan.plan_id}`);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Generate final ISO-20022 message
|
// Generate final ISO-20022 envelope.
|
||||||
const isoMessage = await generatePacs008(plan);
|
await generatePacs008(plan);
|
||||||
|
|
||||||
// Mock: In real implementation, this would:
|
|
||||||
// 1. Send ISO message to bank connector
|
|
||||||
// 2. Receive confirmation and message ID
|
|
||||||
// 3. Store message ID for audit trail
|
|
||||||
|
|
||||||
const isoMessageId = `MSG-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
|
const isoMessageId = `MSG-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
|
||||||
|
|
||||||
// Simulate processing delay
|
// Generate a SWIFT reference for the leg. If any step is an
|
||||||
|
// instrument issuance (issueInstrument) we pin MT760; otherwise
|
||||||
|
// this is a pacs.009 / MT202 FI credit transfer. We don't send
|
||||||
|
// anything over the wire here — PR R stands up the FIN-link
|
||||||
|
// sandbox transport.
|
||||||
|
const hasInstrument = plan.steps.some((s) => s.type === "issueInstrument");
|
||||||
|
let swiftMessageId: string | undefined;
|
||||||
|
let swiftMessageType: string | undefined;
|
||||||
|
try {
|
||||||
|
if (hasInstrument) {
|
||||||
|
const instrumentStep = plan.steps.find((s) => s.type === "issueInstrument");
|
||||||
|
if (instrumentStep?.instrument) {
|
||||||
|
const txRef = `MT760-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`.toUpperCase();
|
||||||
|
const mt760 = generateMt760(instrumentStep.instrument, {
|
||||||
|
transactionReference: txRef,
|
||||||
|
issueDate: new Date().toISOString().slice(0, 10),
|
||||||
|
});
|
||||||
|
swiftMessageId = mt760.messageReference;
|
||||||
|
swiftMessageType = "MT760";
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
swiftMessageId = `MT202-${Date.now()}-${Math.random().toString(36).slice(2, 10)}`.toUpperCase();
|
||||||
|
swiftMessageType = "MT202";
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
// SWIFT generator errors should not fail the leg in mock mode — we
|
||||||
|
// still have an ISO message id. Surface the error in the log.
|
||||||
|
console.warn(`[Bank] SWIFT reference generation skipped: ${(err as Error).message}`);
|
||||||
|
}
|
||||||
|
|
||||||
await new Promise((resolve) => setTimeout(resolve, 300));
|
await new Promise((resolve) => setTimeout(resolve, 300));
|
||||||
|
|
||||||
return {
|
return {
|
||||||
success: true,
|
success: true,
|
||||||
isoMessageId,
|
isoMessageId,
|
||||||
|
swiftMessageId,
|
||||||
|
swiftMessageType,
|
||||||
};
|
};
|
||||||
} catch (error: any) {
|
} catch (error: any) {
|
||||||
return {
|
return {
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
import { EventEmitter } from "events";
|
import { EventEmitter } from "events";
|
||||||
import { getPlanById, updatePlanStatus } from "../db/plans";
|
import { getPlanById, updatePlanStatus } from "../db/plans";
|
||||||
|
import { recordExecution } from "../db/executions";
|
||||||
import {
|
import {
|
||||||
prepareDLTExecution,
|
prepareDLTExecution,
|
||||||
commitDLTExecution,
|
commitDLTExecution,
|
||||||
@@ -182,7 +183,7 @@ export class ExecutionCoordinator extends EventEmitter {
|
|||||||
return { txHash: result.txHash };
|
return { txHash: result.txHash };
|
||||||
}
|
}
|
||||||
|
|
||||||
private async bankInstructionPhase(executionId: string, plan: Plan): Promise<{ isoMessageId: string }> {
|
private async bankInstructionPhase(executionId: string, plan: Plan): Promise<{ isoMessageId: string; swiftMessageId?: string; swiftMessageType?: string }> {
|
||||||
this.emitStatus(executionId, { phase: "bank_instruction", status: "in_progress", timestamp: new Date().toISOString() });
|
this.emitStatus(executionId, { phase: "bank_instruction", status: "in_progress", timestamp: new Date().toISOString() });
|
||||||
|
|
||||||
const result = await commitBankInstruction(plan);
|
const result = await commitBankInstruction(plan);
|
||||||
@@ -193,8 +194,25 @@ export class ExecutionCoordinator extends EventEmitter {
|
|||||||
const rec = this.executions.get(executionId);
|
const rec = this.executions.get(executionId);
|
||||||
if (rec) rec.isoMessageId = result.isoMessageId;
|
if (rec) rec.isoMessageId = result.isoMessageId;
|
||||||
|
|
||||||
|
// Persist the SWIFT reference set (arch §4 canonical "Execution
|
||||||
|
// Reference Set"; gap v2 §4 partial, §10.6).
|
||||||
|
const swiftMessageId = (result as { swiftMessageId?: string }).swiftMessageId;
|
||||||
|
const swiftMessageType = (result as { swiftMessageType?: string }).swiftMessageType;
|
||||||
|
try {
|
||||||
|
await recordExecution(executionId, plan.plan_id!, {
|
||||||
|
phase: "bank_instruction",
|
||||||
|
isoMessageId: result.isoMessageId,
|
||||||
|
swiftMessageId: swiftMessageId ?? null,
|
||||||
|
swiftMessageType: swiftMessageType ?? null,
|
||||||
|
});
|
||||||
|
} catch (err) {
|
||||||
|
// DB persistence is best-effort here; a failure should not abort
|
||||||
|
// the leg — the in-memory execution record still carries the id.
|
||||||
|
console.warn(`recordExecution failed for ${executionId}:`, err);
|
||||||
|
}
|
||||||
|
|
||||||
this.emitStatus(executionId, { phase: "bank_instruction", status: "complete", isoMessageId: result.isoMessageId, timestamp: new Date().toISOString() });
|
this.emitStatus(executionId, { phase: "bank_instruction", status: "complete", isoMessageId: result.isoMessageId, timestamp: new Date().toISOString() });
|
||||||
return { isoMessageId: result.isoMessageId };
|
return { isoMessageId: result.isoMessageId, swiftMessageId, swiftMessageType };
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
139
orchestrator/tests/unit/swiftMessageId.test.ts
Normal file
139
orchestrator/tests/unit/swiftMessageId.test.ts
Normal file
@@ -0,0 +1,139 @@
|
|||||||
|
/**
|
||||||
|
* SWIFT message-id persistence (arch §4 Execution Reference Set,
|
||||||
|
* gap v2 §4 partial, §10.6).
|
||||||
|
*
|
||||||
|
* Commands tested:
|
||||||
|
* - commitBankInstruction returns swiftMessageId + swiftMessageType
|
||||||
|
* depending on whether the plan has an issueInstrument step
|
||||||
|
* - MT760 reference for instrument legs; MT202 synthetic ref for
|
||||||
|
* payment-only legs
|
||||||
|
* - db/executions.recordExecution upserts the SWIFT fields
|
||||||
|
*/
|
||||||
|
|
||||||
|
import { describe, it, expect, jest } from "@jest/globals";
|
||||||
|
import type { Plan } from "../../src/types/plan";
|
||||||
|
import { commitBankInstruction } from "../../src/services/bank";
|
||||||
|
|
||||||
|
jest.mock("../../src/db/postgres", () => {
|
||||||
|
const calls: Array<{ sql: string; params?: unknown[] }> = [];
|
||||||
|
return {
|
||||||
|
query: jest.fn(async (sql: string, params?: unknown[]) => {
|
||||||
|
calls.push({ sql, params });
|
||||||
|
return [];
|
||||||
|
}),
|
||||||
|
__calls: calls,
|
||||||
|
};
|
||||||
|
});
|
||||||
|
|
||||||
|
jest.mock("../../src/services/compliance", () => ({
|
||||||
|
getComplianceData: jest.fn(async () => ({ lei: "TEST-LEI", status: "ok" })),
|
||||||
|
}));
|
||||||
|
|
||||||
|
import { recordExecution, getExecution } from "../../src/db/executions";
|
||||||
|
|
||||||
|
function basePlan(overrides: Partial<Plan> = {}): Plan {
|
||||||
|
return {
|
||||||
|
plan_id: "plan-test-1",
|
||||||
|
schema_version: 1,
|
||||||
|
creator: "0xabc",
|
||||||
|
nonce: 1,
|
||||||
|
created_at: new Date().toISOString(),
|
||||||
|
steps: [
|
||||||
|
{
|
||||||
|
type: "pay",
|
||||||
|
from: "acct-a",
|
||||||
|
to: "acct-b",
|
||||||
|
amount: 100,
|
||||||
|
currency: "USD",
|
||||||
|
} as any,
|
||||||
|
],
|
||||||
|
...overrides,
|
||||||
|
} as Plan;
|
||||||
|
}
|
||||||
|
|
||||||
|
function instrumentPlan(): Plan {
|
||||||
|
return basePlan({
|
||||||
|
steps: [
|
||||||
|
{
|
||||||
|
type: "issueInstrument",
|
||||||
|
instrument: {
|
||||||
|
instrumentType: "SBLC",
|
||||||
|
amount: 1000000,
|
||||||
|
currency: "USD",
|
||||||
|
issuingBankBIC: "EIBIAEAD",
|
||||||
|
beneficiaryBankBIC: "ADCBAEAA",
|
||||||
|
beneficiaryName: "ACME TRADING LLC",
|
||||||
|
beneficiaryAccount: "AE12 3456 7890 1234",
|
||||||
|
expiryDate: "2026-12-31",
|
||||||
|
placeOfPresentation: "DUBAI",
|
||||||
|
governingLaw: "URDG 758",
|
||||||
|
applicant: "APPLICANT INC",
|
||||||
|
templateRef: "EI-SBLC-v1",
|
||||||
|
templateHash: "a".repeat(64),
|
||||||
|
tenor: "12M",
|
||||||
|
},
|
||||||
|
} as any,
|
||||||
|
{
|
||||||
|
type: "pay",
|
||||||
|
from: "acct-a",
|
||||||
|
to: "acct-b",
|
||||||
|
amount: 1000000,
|
||||||
|
currency: "USD",
|
||||||
|
} as any,
|
||||||
|
],
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
describe("commitBankInstruction SWIFT reference output", () => {
|
||||||
|
it("issues an MT760 reference when the plan contains issueInstrument", async () => {
|
||||||
|
const result = await commitBankInstruction(instrumentPlan());
|
||||||
|
expect(result.success).toBe(true);
|
||||||
|
expect(result.swiftMessageType).toBe("MT760");
|
||||||
|
expect(result.swiftMessageId).toMatch(/^MT760-/);
|
||||||
|
expect(result.isoMessageId).toMatch(/^MSG-/);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("issues an MT202 reference when no issueInstrument step is present", async () => {
|
||||||
|
const result = await commitBankInstruction(basePlan());
|
||||||
|
expect(result.success).toBe(true);
|
||||||
|
expect(result.swiftMessageType).toBe("MT202");
|
||||||
|
expect(result.swiftMessageId).toMatch(/^MT202-/);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns different swiftMessageIds across successive calls", async () => {
|
||||||
|
const a = await commitBankInstruction(basePlan());
|
||||||
|
const b = await commitBankInstruction(basePlan());
|
||||||
|
expect(a.swiftMessageId).not.toBe(b.swiftMessageId);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("db/executions SQL wiring", () => {
|
||||||
|
it("recordExecution builds an UPSERT including swift_message_id fields", async () => {
|
||||||
|
const pg = require("../../src/db/postgres");
|
||||||
|
pg.__calls.length = 0;
|
||||||
|
await recordExecution("exec-1", "plan-1", {
|
||||||
|
phase: "bank_instruction",
|
||||||
|
isoMessageId: "iso-1",
|
||||||
|
swiftMessageId: "MT760-ABC",
|
||||||
|
swiftMessageType: "MT760",
|
||||||
|
});
|
||||||
|
expect(pg.query).toHaveBeenCalled();
|
||||||
|
const call = pg.__calls[0];
|
||||||
|
expect(call.sql).toMatch(/INSERT INTO executions/);
|
||||||
|
expect(call.sql).toMatch(/swift_message_id/);
|
||||||
|
expect(call.sql).toMatch(/swift_message_type/);
|
||||||
|
expect(call.sql).toMatch(/ON CONFLICT \(execution_id\) DO UPDATE/);
|
||||||
|
expect(call.params).toEqual(
|
||||||
|
expect.arrayContaining(["exec-1", "plan-1", "bank_instruction", "iso-1", "MT760-ABC", "MT760"]),
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("getExecution selects the swift_message_* columns", async () => {
|
||||||
|
const pg = require("../../src/db/postgres");
|
||||||
|
pg.__calls.length = 0;
|
||||||
|
await getExecution("exec-1");
|
||||||
|
const call = pg.__calls[0];
|
||||||
|
expect(call.sql).toMatch(/swift_message_id/);
|
||||||
|
expect(call.sql).toMatch(/swift_message_type/);
|
||||||
|
});
|
||||||
|
});
|
||||||
Reference in New Issue
Block a user