Compare commits
5 Commits
sync/curre
...
8dcdb4531c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8dcdb4531c | ||
|
|
18bdaf61d5 | ||
|
|
5bd6a200c3 | ||
|
|
908c386dff | ||
|
|
b24a4df983 |
9
orchestrator/jest.config.js
Normal file
9
orchestrator/jest.config.js
Normal file
@@ -0,0 +1,9 @@
|
||||
/** @type {import('jest').Config} */
|
||||
module.exports = {
|
||||
preset: "ts-jest",
|
||||
testEnvironment: "node",
|
||||
roots: ["<rootDir>/tests"],
|
||||
testMatch: ["**/*.test.ts"],
|
||||
testPathIgnorePatterns: ["/node_modules/", "/integration/", "/chaos/", "/load/"],
|
||||
moduleFileExtensions: ["ts", "js", "json"],
|
||||
};
|
||||
@@ -13,6 +13,7 @@
|
||||
"dependencies": {
|
||||
"cors": "^2.8.5",
|
||||
"dotenv": "^17.2.3",
|
||||
"ethers": "^6.16.0",
|
||||
"express": "^4.18.2",
|
||||
"express-rate-limit": "^7.1.5",
|
||||
"helmet": "^7.1.0",
|
||||
@@ -25,11 +26,17 @@
|
||||
"zod": "^3.22.4"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@jest/globals": "^30.3.0",
|
||||
"@types/cors": "^2.8.17",
|
||||
"@types/express": "^4.17.21",
|
||||
"@types/jest": "^30.0.0",
|
||||
"@types/node": "^20.10.0",
|
||||
"@types/pg": "^8.10.9",
|
||||
"@types/supertest": "^7.2.0",
|
||||
"@types/uuid": "^9.0.6",
|
||||
"jest": "^30.3.0",
|
||||
"supertest": "^7.2.2",
|
||||
"ts-jest": "^29.4.9",
|
||||
"ts-node": "^10.9.2",
|
||||
"typescript": "^5.3.3"
|
||||
}
|
||||
|
||||
@@ -4,6 +4,12 @@ import { createHash } from "crypto";
|
||||
import { validatePlan, checkStepDependencies } from "../services/planValidation";
|
||||
import { storePlan, getPlanById, updatePlanSignature, listPlans } from "../db/plans";
|
||||
import { asyncHandler, AppError, ErrorType } from "../services/errorHandler";
|
||||
import { getTransactionState, getTransitionHistory } from "../services/stateMachine";
|
||||
import {
|
||||
getEventsForPlan,
|
||||
subscribe as subscribeToEvents,
|
||||
verifyChain,
|
||||
} from "../services/eventBus";
|
||||
import type { Plan, PlanStep } from "../types/plan";
|
||||
|
||||
/**
|
||||
@@ -194,3 +200,107 @@ export const validatePlanEndpoint = asyncHandler(async (req: Request, res: Respo
|
||||
});
|
||||
});
|
||||
|
||||
/**
|
||||
* GET /api/plans/:planId/state
|
||||
* Return the current workflow state + full state-transition history.
|
||||
* Arch note §8 + §14 (audit chain).
|
||||
*/
|
||||
export const getPlanState = asyncHandler(async (req: Request, res: Response) => {
|
||||
const { planId } = req.params;
|
||||
const plan = await getPlanById(planId);
|
||||
if (!plan) {
|
||||
throw new AppError(ErrorType.NOT_FOUND_ERROR, 404, "Plan not found");
|
||||
}
|
||||
|
||||
const [state, history] = await Promise.all([
|
||||
getTransactionState(planId),
|
||||
getTransitionHistory(planId),
|
||||
]);
|
||||
|
||||
res.json({
|
||||
plan_id: planId,
|
||||
transaction_state: state,
|
||||
legacy_status: plan.status,
|
||||
transitions: history,
|
||||
});
|
||||
});
|
||||
|
||||
/**
|
||||
* GET /api/plans/:planId/events
|
||||
* Return the full signed + hash-chained event trail for a plan
|
||||
* (arch §4.5 State Registry + §7 Event Model + §14 Audit).
|
||||
*
|
||||
* Query `?verify=1` re-verifies the chain server-side and adds
|
||||
* { chain_valid: true|false, broken_at?: n } to the response.
|
||||
*/
|
||||
export const getPlanEvents = asyncHandler(async (req: Request, res: Response) => {
|
||||
const { planId } = req.params;
|
||||
const plan = await getPlanById(planId);
|
||||
if (!plan) {
|
||||
throw new AppError(ErrorType.NOT_FOUND_ERROR, 404, "Plan not found");
|
||||
}
|
||||
|
||||
const events = await getEventsForPlan(planId);
|
||||
|
||||
const body: {
|
||||
plan_id: string;
|
||||
count: number;
|
||||
events: typeof events;
|
||||
chain_valid?: boolean;
|
||||
broken_at?: number;
|
||||
broken_reason?: string;
|
||||
} = { plan_id: planId, count: events.length, events };
|
||||
|
||||
if (req.query.verify === "1") {
|
||||
const v = await verifyChain(planId);
|
||||
body.chain_valid = v.ok;
|
||||
if (!v.ok) {
|
||||
body.broken_at = v.brokenAt;
|
||||
body.broken_reason = v.reason;
|
||||
}
|
||||
}
|
||||
|
||||
res.json(body);
|
||||
});
|
||||
|
||||
/**
|
||||
* GET /api/plans/:planId/events/stream
|
||||
* Server-sent-events stream of live events for a single plan.
|
||||
*/
|
||||
export const streamPlanEvents = asyncHandler(async (req: Request, res: Response) => {
|
||||
const { planId } = req.params;
|
||||
const plan = await getPlanById(planId);
|
||||
if (!plan) {
|
||||
throw new AppError(ErrorType.NOT_FOUND_ERROR, 404, "Plan not found");
|
||||
}
|
||||
|
||||
res.setHeader("Content-Type", "text/event-stream");
|
||||
res.setHeader("Cache-Control", "no-cache, no-transform");
|
||||
res.setHeader("Connection", "keep-alive");
|
||||
res.setHeader("X-Accel-Buffering", "no");
|
||||
res.flushHeaders?.();
|
||||
|
||||
// Replay the history on connect so clients can reconstruct state
|
||||
// without a separate REST call.
|
||||
const history = await getEventsForPlan(planId);
|
||||
for (const e of history) {
|
||||
res.write(`id: ${e.id}\nevent: ${e.type}\ndata: ${JSON.stringify(e)}\n\n`);
|
||||
}
|
||||
|
||||
const unsubscribe = subscribeToEvents(planId, (record) => {
|
||||
res.write(
|
||||
`id: ${record.id}\nevent: ${record.type}\ndata: ${JSON.stringify(record)}\n\n`,
|
||||
);
|
||||
});
|
||||
|
||||
const keepAlive = setInterval(() => {
|
||||
res.write(": keep-alive\n\n");
|
||||
}, 15_000);
|
||||
|
||||
req.on("close", () => {
|
||||
clearInterval(keepAlive);
|
||||
unsubscribe();
|
||||
res.end();
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -16,6 +16,12 @@ const envSchema = z.object({
|
||||
AZURE_KEY_VAULT_URL: z.string().url().optional(),
|
||||
AWS_SECRETS_MANAGER_REGION: z.string().optional(),
|
||||
SENTRY_DSN: z.string().url().optional(),
|
||||
// Chain-138 + NotaryRegistry wiring (arch §4.5). All optional; when
|
||||
// absent the notary adapter falls back to its deterministic mock.
|
||||
CHAIN_138_RPC_URL: z.string().url().optional(),
|
||||
CHAIN_138_CHAIN_ID: z.string().regex(/^\d+$/).optional(),
|
||||
NOTARY_REGISTRY_ADDRESS: z.string().regex(/^0x[0-9a-fA-F]{40}$/).optional(),
|
||||
ORCHESTRATOR_PRIVATE_KEY: z.string().regex(/^0x[0-9a-fA-F]{64}$/).optional(),
|
||||
});
|
||||
|
||||
/**
|
||||
@@ -34,6 +40,10 @@ export const env = envSchema.parse({
|
||||
AZURE_KEY_VAULT_URL: process.env.AZURE_KEY_VAULT_URL,
|
||||
AWS_SECRETS_MANAGER_REGION: process.env.AWS_SECRETS_MANAGER_REGION,
|
||||
SENTRY_DSN: process.env.SENTRY_DSN,
|
||||
CHAIN_138_RPC_URL: process.env.CHAIN_138_RPC_URL,
|
||||
CHAIN_138_CHAIN_ID: process.env.CHAIN_138_CHAIN_ID,
|
||||
NOTARY_REGISTRY_ADDRESS: process.env.NOTARY_REGISTRY_ADDRESS,
|
||||
ORCHESTRATOR_PRIVATE_KEY: process.env.ORCHESTRATOR_PRIVATE_KEY,
|
||||
});
|
||||
|
||||
/**
|
||||
|
||||
48
orchestrator/src/db/migrations/002_transaction_state.ts
Normal file
48
orchestrator/src/db/migrations/002_transaction_state.ts
Normal file
@@ -0,0 +1,48 @@
|
||||
import { query } from "../postgres";
|
||||
import { TRANSACTION_STATES } from "../../types/transactionState";
|
||||
|
||||
/**
|
||||
* Migration 002 — workflow-level transaction state.
|
||||
*
|
||||
* Architecture note §8 (12-state machine) + §9 (transition table).
|
||||
*
|
||||
* Adds:
|
||||
* - plans.transaction_state column (CHECK-constrained)
|
||||
* - transaction_state_transitions append-only table
|
||||
*/
|
||||
export async function up() {
|
||||
const states = TRANSACTION_STATES.map((s) => `'${s}'`).join(",");
|
||||
|
||||
await query(
|
||||
`ALTER TABLE plans
|
||||
ADD COLUMN IF NOT EXISTS transaction_state VARCHAR(32) NOT NULL
|
||||
DEFAULT 'DRAFT'
|
||||
CHECK (transaction_state IN (${states}))`,
|
||||
);
|
||||
|
||||
await query(
|
||||
`CREATE TABLE IF NOT EXISTS transaction_state_transitions (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
plan_id UUID NOT NULL REFERENCES plans(plan_id) ON DELETE CASCADE,
|
||||
from_state VARCHAR(32),
|
||||
to_state VARCHAR(32) NOT NULL CHECK (to_state IN (${states})),
|
||||
reason TEXT,
|
||||
source_event_id UUID,
|
||||
actor VARCHAR(255) NOT NULL,
|
||||
actor_role VARCHAR(32) NOT NULL,
|
||||
signature TEXT,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
|
||||
)`,
|
||||
);
|
||||
|
||||
await query(
|
||||
`CREATE INDEX IF NOT EXISTS idx_tx_transitions_plan_id
|
||||
ON transaction_state_transitions(plan_id)`,
|
||||
);
|
||||
await query(
|
||||
`CREATE INDEX IF NOT EXISTS idx_tx_transitions_created_at
|
||||
ON transaction_state_transitions(created_at)`,
|
||||
);
|
||||
|
||||
console.log("Migration 002 applied: transaction_state + transitions table");
|
||||
}
|
||||
43
orchestrator/src/db/migrations/003_events.ts
Normal file
43
orchestrator/src/db/migrations/003_events.ts
Normal file
@@ -0,0 +1,43 @@
|
||||
import { query } from "../postgres";
|
||||
|
||||
/**
|
||||
* Migration 003 — append-only events journal (arch §4.5, §5.5, §7).
|
||||
*
|
||||
* The `events` table is the system-of-record for normalised workflow
|
||||
* events (arch §7.2: `transaction.created`, `instrument.ready`,
|
||||
* `payment.settled`, `transaction.committed`, …). It is:
|
||||
*
|
||||
* - append-only (no UPDATE / DELETE)
|
||||
* - signed (HMAC of (plan_id, type, payload_hash, prev_hash))
|
||||
* - hash-chained via prev_hash for tamper-evident forensic replay
|
||||
* - indexed by plan_id so the SSE endpoint can stream efficiently
|
||||
*/
|
||||
export async function up() {
|
||||
await query(
|
||||
`CREATE TABLE IF NOT EXISTS events (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
plan_id UUID NOT NULL REFERENCES plans(plan_id) ON DELETE CASCADE,
|
||||
type VARCHAR(128) NOT NULL,
|
||||
actor VARCHAR(255),
|
||||
payload JSONB NOT NULL DEFAULT '{}'::jsonb,
|
||||
payload_hash CHAR(64) NOT NULL,
|
||||
prev_hash CHAR(64),
|
||||
signature CHAR(64) NOT NULL,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
|
||||
)`,
|
||||
);
|
||||
|
||||
await query(
|
||||
`CREATE INDEX IF NOT EXISTS idx_events_plan_id_created
|
||||
ON events(plan_id, created_at)`,
|
||||
);
|
||||
|
||||
await query(
|
||||
`CREATE INDEX IF NOT EXISTS idx_events_type
|
||||
ON events(type)`,
|
||||
);
|
||||
}
|
||||
|
||||
export async function down() {
|
||||
await query("DROP TABLE IF EXISTS events CASCADE");
|
||||
}
|
||||
@@ -1,4 +1,6 @@
|
||||
import { up as up001 } from "./001_initial_schema";
|
||||
import { up as up002 } from "./002_transaction_state";
|
||||
import { up as up003 } from "./003_events";
|
||||
|
||||
/**
|
||||
* Run all migrations
|
||||
@@ -6,10 +8,11 @@ import { up as up001 } from "./001_initial_schema";
|
||||
export async function runMigration() {
|
||||
try {
|
||||
await up001();
|
||||
console.log("✅ All migrations completed");
|
||||
await up002();
|
||||
await up003();
|
||||
console.log("All migrations completed");
|
||||
} catch (error) {
|
||||
console.error("❌ Migration failed:", error);
|
||||
console.error("Migration failed:", error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -14,7 +14,7 @@ import { requestTimeout } from "./middleware/timeout";
|
||||
import { logger } from "./logging/logger";
|
||||
import { getMetrics, httpRequestDuration, httpRequestTotal, register } from "./metrics/prometheus";
|
||||
import { healthCheck, readinessCheck, livenessCheck } from "./health/health";
|
||||
import { listPlansEndpoint, createPlan, getPlan, addSignature, validatePlanEndpoint } from "./api/plans";
|
||||
import { listPlansEndpoint, createPlan, getPlan, getPlanState, getPlanEvents, streamPlanEvents, addSignature, validatePlanEndpoint } from "./api/plans";
|
||||
import { streamPlanStatus } from "./api/sse";
|
||||
import { executionCoordinator } from "./services/execution";
|
||||
import { runMigration } from "./db/migrations";
|
||||
@@ -88,6 +88,9 @@ app.use("/api", apiLimiter);
|
||||
app.get("/api/plans", listPlansEndpoint);
|
||||
app.post("/api/plans", auditLog("CREATE_PLAN", "plan"), createPlan);
|
||||
app.get("/api/plans/:planId", getPlan);
|
||||
app.get("/api/plans/:planId/state", getPlanState);
|
||||
app.get("/api/plans/:planId/events", getPlanEvents);
|
||||
app.get("/api/plans/:planId/events/stream", streamPlanEvents);
|
||||
app.post("/api/plans/:planId/signature", addSignature);
|
||||
app.post("/api/plans/:planId/validate", validatePlanEndpoint);
|
||||
|
||||
|
||||
197
orchestrator/src/services/eventBus.ts
Normal file
197
orchestrator/src/services/eventBus.ts
Normal file
@@ -0,0 +1,197 @@
|
||||
/**
|
||||
* Typed, signed, append-only Event Bus (arch §5.5 Event Bus + §7).
|
||||
*
|
||||
* Architecture contract
|
||||
* ---------------------
|
||||
* 1. Every event is a normalised category from arch §7.2 — `EventType`.
|
||||
* 2. Every event is persisted to the `events` append-only table.
|
||||
* 3. Every event carries
|
||||
* payload_hash = sha256(JSON.stringify(payload))
|
||||
* prev_hash = signature of the previous event for the same plan
|
||||
* signature = hmac_sha256(secret, plan_id|type|payload_hash|prev_hash)
|
||||
* which gives a tamper-evident per-plan hash chain (arch §14 audit).
|
||||
* 4. Callers can subscribe to live events via `subscribe(planId, cb)` —
|
||||
* backed by a process-local EventEmitter that the SSE route consumes.
|
||||
*
|
||||
* When the orchestrator scales to >1 replicas, the in-process emitter
|
||||
* must be replaced by a broker (NATS / Kafka). The persistence layer
|
||||
* and signature chain remain unchanged.
|
||||
*/
|
||||
|
||||
import { createHash, createHmac } from "crypto";
|
||||
import { EventEmitter } from "events";
|
||||
import { query } from "../db/postgres";
|
||||
import { logger } from "../logging/logger";
|
||||
|
||||
/**
|
||||
* Normalised event types — arch §7.2. Keep this list as the single
|
||||
* source of truth so subscribers can exhaustively match on it.
|
||||
*/
|
||||
export const EVENT_TYPES = [
|
||||
"transaction.created",
|
||||
"participants.authorized",
|
||||
"preconditions.satisfied",
|
||||
"instrument.ready",
|
||||
"payment.ready",
|
||||
"transaction.prepared",
|
||||
"instrument.dispatched",
|
||||
"payment.dispatched",
|
||||
"instrument.acknowledged",
|
||||
"payment.accepted",
|
||||
"payment.settled",
|
||||
"transaction.validated",
|
||||
"transaction.committed",
|
||||
"transaction.aborted",
|
||||
"transaction.unwind_initiated",
|
||||
] as const;
|
||||
|
||||
export type EventType = (typeof EVENT_TYPES)[number];
|
||||
|
||||
export interface EventRecord {
|
||||
id: string;
|
||||
plan_id: string;
|
||||
type: EventType;
|
||||
actor: string | null;
|
||||
payload: Record<string, unknown>;
|
||||
payload_hash: string;
|
||||
prev_hash: string | null;
|
||||
signature: string;
|
||||
created_at: string;
|
||||
}
|
||||
|
||||
export interface PublishInput {
|
||||
planId: string;
|
||||
type: EventType;
|
||||
actor?: string;
|
||||
payload?: Record<string, unknown>;
|
||||
}
|
||||
|
||||
const emitter = new EventEmitter();
|
||||
emitter.setMaxListeners(0);
|
||||
|
||||
function getSigningSecret(): string {
|
||||
return (
|
||||
process.env.EVENT_BUS_HMAC_SECRET ??
|
||||
process.env.SESSION_SECRET ??
|
||||
"dev-event-bus-secret-change-in-production"
|
||||
);
|
||||
}
|
||||
|
||||
function sha256(input: string): string {
|
||||
return createHash("sha256").update(input).digest("hex");
|
||||
}
|
||||
|
||||
function sign(
|
||||
planId: string,
|
||||
type: string,
|
||||
payloadHash: string,
|
||||
prevHash: string | null,
|
||||
): string {
|
||||
const h = createHmac("sha256", getSigningSecret());
|
||||
h.update(`${planId}|${type}|${payloadHash}|${prevHash ?? ""}`);
|
||||
return h.digest("hex");
|
||||
}
|
||||
|
||||
/**
|
||||
* Publish a typed, signed, hash-chained event for a plan. Returns the
|
||||
* persisted record (including id + signature) so callers can reference
|
||||
* it from transition `source_event_id`.
|
||||
*/
|
||||
export async function publish(input: PublishInput): Promise<EventRecord> {
|
||||
const payload = input.payload ?? {};
|
||||
const payloadHash = sha256(JSON.stringify(payload));
|
||||
|
||||
const prev = await query<{ signature: string }>(
|
||||
`SELECT signature
|
||||
FROM events
|
||||
WHERE plan_id = $1
|
||||
ORDER BY created_at DESC, id DESC
|
||||
LIMIT 1`,
|
||||
[input.planId],
|
||||
);
|
||||
const prevHash = prev.length > 0 ? prev[0].signature : null;
|
||||
const signature = sign(input.planId, input.type, payloadHash, prevHash);
|
||||
|
||||
const rows = await query<EventRecord>(
|
||||
`INSERT INTO events (plan_id, type, actor, payload, payload_hash, prev_hash, signature)
|
||||
VALUES ($1, $2, $3, $4::jsonb, $5, $6, $7)
|
||||
RETURNING id, plan_id, type, actor, payload, payload_hash, prev_hash, signature, created_at`,
|
||||
[
|
||||
input.planId,
|
||||
input.type,
|
||||
input.actor ?? null,
|
||||
JSON.stringify(payload),
|
||||
payloadHash,
|
||||
prevHash,
|
||||
signature,
|
||||
],
|
||||
);
|
||||
|
||||
const record = rows[0];
|
||||
logger.info(
|
||||
{ planId: record.plan_id, type: record.type, eventId: record.id },
|
||||
"[EventBus] published",
|
||||
);
|
||||
|
||||
emitter.emit(`plan:${record.plan_id}`, record);
|
||||
emitter.emit("plan:*", record);
|
||||
|
||||
return record;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read the full event trail for a plan in chronological order.
|
||||
*/
|
||||
export async function getEventsForPlan(planId: string): Promise<EventRecord[]> {
|
||||
return query<EventRecord>(
|
||||
`SELECT id, plan_id, type, actor, payload, payload_hash, prev_hash, signature, created_at
|
||||
FROM events
|
||||
WHERE plan_id = $1
|
||||
ORDER BY created_at ASC, id ASC`,
|
||||
[planId],
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify the full hash chain for a plan's events. Returns `{ ok: true }`
|
||||
* when every signature matches and `prev_hash` forms a contiguous chain;
|
||||
* otherwise returns the first index that fails with a reason.
|
||||
*/
|
||||
export async function verifyChain(planId: string): Promise<
|
||||
{ ok: true } | { ok: false; brokenAt: number; reason: string }
|
||||
> {
|
||||
const events = await getEventsForPlan(planId);
|
||||
let prevSig: string | null = null;
|
||||
for (let i = 0; i < events.length; i++) {
|
||||
const e = events[i];
|
||||
if (e.prev_hash !== prevSig) {
|
||||
return { ok: false, brokenAt: i, reason: "prev_hash mismatch" };
|
||||
}
|
||||
const expectedPayloadHash = sha256(JSON.stringify(e.payload));
|
||||
if (expectedPayloadHash !== e.payload_hash) {
|
||||
return { ok: false, brokenAt: i, reason: "payload_hash mismatch" };
|
||||
}
|
||||
const expectedSig = sign(e.plan_id, e.type, e.payload_hash, e.prev_hash);
|
||||
if (expectedSig !== e.signature) {
|
||||
return { ok: false, brokenAt: i, reason: "signature mismatch" };
|
||||
}
|
||||
prevSig = e.signature;
|
||||
}
|
||||
return { ok: true };
|
||||
}
|
||||
|
||||
/**
|
||||
* Subscribe to live events for a single plan. Returns an unsubscribe
|
||||
* function. Used by the SSE route.
|
||||
*/
|
||||
export function subscribe(
|
||||
planId: string,
|
||||
callback: (record: EventRecord) => void,
|
||||
): () => void {
|
||||
const channel = `plan:${planId}`;
|
||||
emitter.on(channel, callback);
|
||||
return () => emitter.off(channel, callback);
|
||||
}
|
||||
|
||||
/** test-only emitter access, never import in prod code */
|
||||
export const __emitterForTests = emitter;
|
||||
296
orchestrator/src/services/exceptionManager.ts
Normal file
296
orchestrator/src/services/exceptionManager.ts
Normal file
@@ -0,0 +1,296 @@
|
||||
/**
|
||||
* Unified Exception Manager — architecture note §5.9, §12.
|
||||
*
|
||||
* Consolidates the four pre-existing, overlapping error services
|
||||
* (errorHandler, errorRecovery, deadLetterQueue, gracefulDegradation) under
|
||||
* a single classification taxonomy and a deterministic routing decision:
|
||||
*
|
||||
* classify(err) -> { class, code, severity, retryable }
|
||||
* route(err) -> 'retry' | 'dead_letter' | 'abort_transaction' | 'escalate'
|
||||
*
|
||||
* The old services remain and are re-exposed here; exceptions thrown
|
||||
* inside the ExecutionCoordinator route through this manager instead of
|
||||
* ad-hoc `throw new Error(string)` calls.
|
||||
*/
|
||||
|
||||
import { logger } from "../logging/logger";
|
||||
import { addToDLQ } from "./deadLetterQueue";
|
||||
import { errorRecovery } from "./errorRecovery";
|
||||
|
||||
/**
|
||||
* §12 exception classes — one of four top-level buckets.
|
||||
*/
|
||||
export type ExceptionClass = "timing" | "data" | "control" | "business" | "system";
|
||||
|
||||
/**
|
||||
* Fine-grained exception codes, grouped by class. Source: arch note §12.
|
||||
*/
|
||||
export type ExceptionCode =
|
||||
// §12.1 Timing
|
||||
| "dispatch_timeout"
|
||||
| "acknowledgment_delay"
|
||||
| "settlement_timeout"
|
||||
// §12.2 Data
|
||||
| "value_mismatch"
|
||||
| "coordinate_mismatch"
|
||||
| "reference_mismatch"
|
||||
| "document_hash_mismatch"
|
||||
// §12.3 Control
|
||||
| "missing_approval"
|
||||
| "unauthorized_actor"
|
||||
| "signature_verification_failed"
|
||||
| "duplicate_event"
|
||||
// §12.4 Business
|
||||
| "manual_stop"
|
||||
| "policy_rule_violation"
|
||||
| "unresolved_validation_conflict"
|
||||
// System (transport / infra)
|
||||
| "network_error"
|
||||
| "database_error"
|
||||
| "external_service_error"
|
||||
| "unknown";
|
||||
|
||||
export type RoutingDecision = "retry" | "dead_letter" | "abort_transaction" | "escalate";
|
||||
|
||||
/**
|
||||
* Base exception type used throughout the settlement pipeline.
|
||||
*
|
||||
* Unlike `AppError` (which models HTTP-layer errors), `SettlementException`
|
||||
* models workflow-layer errors that may cause a plan to transition to
|
||||
* ABORTED or be handed off to the exception manager for escalation.
|
||||
*/
|
||||
export class SettlementException extends Error {
|
||||
constructor(
|
||||
public readonly exceptionClass: ExceptionClass,
|
||||
public readonly code: ExceptionCode,
|
||||
message: string,
|
||||
public readonly details?: Record<string, unknown>,
|
||||
public readonly cause?: Error,
|
||||
) {
|
||||
super(message);
|
||||
this.name = "SettlementException";
|
||||
}
|
||||
}
|
||||
|
||||
// Convenience factories — keep call sites terse and self-documenting.
|
||||
export const Timing = {
|
||||
dispatch(details?: Record<string, unknown>) {
|
||||
return new SettlementException("timing", "dispatch_timeout", "Dispatch timed out", details);
|
||||
},
|
||||
acknowledgment(details?: Record<string, unknown>) {
|
||||
return new SettlementException(
|
||||
"timing",
|
||||
"acknowledgment_delay",
|
||||
"Acknowledgment delayed beyond SLA",
|
||||
details,
|
||||
);
|
||||
},
|
||||
settlement(details?: Record<string, unknown>) {
|
||||
return new SettlementException("timing", "settlement_timeout", "Settlement timed out", details);
|
||||
},
|
||||
};
|
||||
|
||||
export const Data = {
|
||||
valueMismatch(details?: Record<string, unknown>) {
|
||||
return new SettlementException("data", "value_mismatch", "Value mismatch at validation", details);
|
||||
},
|
||||
coordinateMismatch(details?: Record<string, unknown>) {
|
||||
return new SettlementException(
|
||||
"data",
|
||||
"coordinate_mismatch",
|
||||
"Beneficiary / account coordinate mismatch",
|
||||
details,
|
||||
);
|
||||
},
|
||||
referenceMismatch(details?: Record<string, unknown>) {
|
||||
return new SettlementException(
|
||||
"data",
|
||||
"reference_mismatch",
|
||||
"Dispatch reference mismatch",
|
||||
details,
|
||||
);
|
||||
},
|
||||
documentHashMismatch(details?: Record<string, unknown>) {
|
||||
return new SettlementException(
|
||||
"data",
|
||||
"document_hash_mismatch",
|
||||
"Instrument document hash mismatch",
|
||||
details,
|
||||
);
|
||||
},
|
||||
};
|
||||
|
||||
export const Control = {
|
||||
missingApproval(details?: Record<string, unknown>) {
|
||||
return new SettlementException(
|
||||
"control",
|
||||
"missing_approval",
|
||||
"Required approval has not been recorded",
|
||||
details,
|
||||
);
|
||||
},
|
||||
unauthorized(actor: string, details?: Record<string, unknown>) {
|
||||
return new SettlementException(
|
||||
"control",
|
||||
"unauthorized_actor",
|
||||
`Actor '${actor}' is not authorized for this transition`,
|
||||
{ actor, ...details },
|
||||
);
|
||||
},
|
||||
signature(details?: Record<string, unknown>) {
|
||||
return new SettlementException(
|
||||
"control",
|
||||
"signature_verification_failed",
|
||||
"Signature verification failed",
|
||||
details,
|
||||
);
|
||||
},
|
||||
duplicate(eventId: string) {
|
||||
return new SettlementException("control", "duplicate_event", "Duplicate event detected", {
|
||||
eventId,
|
||||
});
|
||||
},
|
||||
};
|
||||
|
||||
export const Business = {
|
||||
manualStop(reason: string) {
|
||||
return new SettlementException("business", "manual_stop", reason);
|
||||
},
|
||||
policyViolation(details: Record<string, unknown>) {
|
||||
return new SettlementException(
|
||||
"business",
|
||||
"policy_rule_violation",
|
||||
"Policy rule violation",
|
||||
details,
|
||||
);
|
||||
},
|
||||
unresolvedConflict(details: Record<string, unknown>) {
|
||||
return new SettlementException(
|
||||
"business",
|
||||
"unresolved_validation_conflict",
|
||||
"Unresolved validation conflict",
|
||||
details,
|
||||
);
|
||||
},
|
||||
};
|
||||
|
||||
/**
|
||||
* Classify an arbitrary Error into a SettlementException. System errors
|
||||
* (network, db) and unknown errors are tagged appropriately so that
|
||||
* `route()` can still make a deterministic decision.
|
||||
*/
|
||||
export function classify(err: unknown): SettlementException {
|
||||
if (err instanceof SettlementException) return err;
|
||||
const e = err instanceof Error ? err : new Error(String(err));
|
||||
const msg = e.message.toLowerCase();
|
||||
|
||||
if (
|
||||
msg.includes("timeout") ||
|
||||
msg.includes("etimedout") ||
|
||||
msg.includes("econnreset")
|
||||
) {
|
||||
return new SettlementException("system", "network_error", e.message, undefined, e);
|
||||
}
|
||||
if (
|
||||
msg.includes("econnrefused") ||
|
||||
msg.includes("network") ||
|
||||
msg.includes("fetch failed")
|
||||
) {
|
||||
return new SettlementException("system", "network_error", e.message, undefined, e);
|
||||
}
|
||||
if (msg.includes("database") || msg.includes("postgres") || msg.includes("pg")) {
|
||||
return new SettlementException("system", "database_error", e.message, undefined, e);
|
||||
}
|
||||
return new SettlementException("system", "unknown", e.message, undefined, e);
|
||||
}
|
||||
|
||||
/**
|
||||
* Decide what to do with an exception. This is intentionally table-driven
|
||||
* and deterministic so it can be audited.
|
||||
*
|
||||
* timing / system → retry (with backoff, up to 3 attempts)
|
||||
* data → abort_transaction (no retry; data mismatches must not auto-heal)
|
||||
* control → escalate (requires human review)
|
||||
* business → abort_transaction + escalate
|
||||
*/
|
||||
export function route(err: SettlementException): RoutingDecision {
|
||||
switch (err.exceptionClass) {
|
||||
case "timing":
|
||||
return "retry";
|
||||
case "system":
|
||||
return err.code === "network_error" ? "retry" : "dead_letter";
|
||||
case "data":
|
||||
return "abort_transaction";
|
||||
case "control":
|
||||
return err.code === "duplicate_event" ? "dead_letter" : "escalate";
|
||||
case "business":
|
||||
return err.code === "manual_stop" ? "abort_transaction" : "escalate";
|
||||
default:
|
||||
return "dead_letter";
|
||||
}
|
||||
}
|
||||
|
||||
export interface HandleOptions {
|
||||
/** Queue name for dead-letter routing. */
|
||||
queue?: string;
|
||||
/** Opaque context payload to preserve in DLQ / logs. */
|
||||
context?: Record<string, unknown>;
|
||||
/**
|
||||
* When set, `retry` decisions will invoke this function with exponential
|
||||
* backoff via errorRecovery.
|
||||
*/
|
||||
retryable?: () => Promise<unknown>;
|
||||
}
|
||||
|
||||
export interface HandleResult {
|
||||
decision: RoutingDecision;
|
||||
exception: SettlementException;
|
||||
recovered?: boolean;
|
||||
recoveryResult?: unknown;
|
||||
}
|
||||
|
||||
/**
|
||||
* Central dispatch. Given any error, classify → route → act. Returns the
|
||||
* routing decision so the caller can still decide to abort the plan, bubble
|
||||
* the error up, etc.
|
||||
*
|
||||
* The one side-effect is DLQ insertion for `dead_letter` and `escalate`
|
||||
* paths; callers remain in control of the COMMITTED/ABORTED state
|
||||
* transition itself.
|
||||
*/
|
||||
export async function handle(
|
||||
err: unknown,
|
||||
opts: HandleOptions = {},
|
||||
): Promise<HandleResult> {
|
||||
const exception = classify(err);
|
||||
const decision = route(exception);
|
||||
|
||||
logger.warn(
|
||||
{
|
||||
exceptionClass: exception.exceptionClass,
|
||||
code: exception.code,
|
||||
decision,
|
||||
details: exception.details,
|
||||
context: opts.context,
|
||||
},
|
||||
`ExceptionManager: ${exception.exceptionClass}/${exception.code} -> ${decision}`,
|
||||
);
|
||||
|
||||
if (decision === "retry" && opts.retryable) {
|
||||
try {
|
||||
const recoveryResult = await errorRecovery.recover(exception, { fn: opts.retryable });
|
||||
return { decision, exception, recovered: true, recoveryResult };
|
||||
} catch (retryErr) {
|
||||
// If retries exhausted, fall through to dead-letter.
|
||||
logger.warn({ retryErr }, "Retry exhausted, routing to DLQ");
|
||||
await addToDLQ(opts.queue ?? "exceptions", opts.context ?? {}, exception.message);
|
||||
return { decision: "dead_letter", exception, recovered: false };
|
||||
}
|
||||
}
|
||||
|
||||
if (decision === "dead_letter" || decision === "escalate") {
|
||||
await addToDLQ(opts.queue ?? "exceptions", opts.context ?? {}, exception.message);
|
||||
}
|
||||
|
||||
return { decision, exception, recovered: false };
|
||||
}
|
||||
@@ -1,185 +1,275 @@
|
||||
import { EventEmitter } from "events";
|
||||
import { getPlanById, updatePlanStatus } from "../db/plans";
|
||||
import { prepareDLTExecution, commitDLTExecution, abortDLTExecution } from "./dlt";
|
||||
import { prepareBankInstruction, commitBankInstruction, abortBankInstruction } from "./bank";
|
||||
import {
|
||||
prepareDLTExecution,
|
||||
commitDLTExecution,
|
||||
abortDLTExecution,
|
||||
} from "./dlt";
|
||||
import {
|
||||
prepareBankInstruction,
|
||||
commitBankInstruction,
|
||||
abortBankInstruction,
|
||||
} from "./bank";
|
||||
import { registerPlan, finalizePlan } from "./notary";
|
||||
import { getTransactionState, transition } from "./stateMachine";
|
||||
import {
|
||||
Control,
|
||||
Data,
|
||||
SettlementException,
|
||||
handle,
|
||||
} from "./exceptionManager";
|
||||
import type { Plan } from "../types/plan";
|
||||
import type { PlanStatusEvent } from "../types/execution";
|
||||
|
||||
/**
|
||||
* Actors driving the segregation-of-duties checkpoints (§13).
|
||||
*
|
||||
* Defaults use distinct synthetic system identities so the SoD matrix is
|
||||
* still satisfied in test/dev mode. Production callers MUST override.
|
||||
*/
|
||||
export interface ExecutionActors {
|
||||
approver?: string;
|
||||
releaser?: string;
|
||||
validator?: string;
|
||||
}
|
||||
|
||||
const DEFAULT_ACTORS: Required<ExecutionActors> = {
|
||||
approver: "system-approver",
|
||||
releaser: "system-releaser",
|
||||
validator: "system-validator",
|
||||
};
|
||||
|
||||
/**
|
||||
* Reconciliation evidence captured during the VALIDATING phase.
|
||||
*
|
||||
* §9.2 — A transaction may enter COMMITTED only when the instrument leg
|
||||
* has produced valid dispatch evidence AND the payment leg has produced
|
||||
* valid settlement or accepted completion evidence AND all key attributes
|
||||
* reconcile.
|
||||
*/
|
||||
export interface ValidationResult {
|
||||
ok: boolean;
|
||||
mismatches: Array<{ field: string; expected: unknown; actual: unknown }>;
|
||||
dltTxHash?: string;
|
||||
isoMessageId?: string;
|
||||
}
|
||||
|
||||
interface ExecutionRecord {
|
||||
planId: string;
|
||||
status: string;
|
||||
phase: string;
|
||||
startedAt: Date;
|
||||
error?: string;
|
||||
dltTxHash?: string;
|
||||
isoMessageId?: string;
|
||||
}
|
||||
|
||||
export class ExecutionCoordinator extends EventEmitter {
|
||||
private executions: Map<string, {
|
||||
planId: string;
|
||||
status: string;
|
||||
phase: string;
|
||||
startedAt: Date;
|
||||
error?: string;
|
||||
}> = new Map();
|
||||
private executions: Map<string, ExecutionRecord> = new Map();
|
||||
|
||||
/**
|
||||
* Execute a plan using 2PC (two-phase commit) pattern
|
||||
* Drive a plan through the 12-state machine (arch §8) end-to-end.
|
||||
*
|
||||
* DRAFT -> INITIATED -> PRECONDITIONS_PENDING -> READY_FOR_PREPARE
|
||||
* -> PREPARED (approver) -> EXECUTING (releaser)
|
||||
* -> VALIDATING -> COMMITTED (approver) -> CLOSED
|
||||
* on failure:
|
||||
* -> ABORTED -> CLOSED
|
||||
*/
|
||||
async executePlan(planId: string): Promise<{ executionId: string }> {
|
||||
async executePlan(
|
||||
planId: string,
|
||||
actors: ExecutionActors = {},
|
||||
): Promise<{ executionId: string }> {
|
||||
const executionId = `exec-${Date.now()}`;
|
||||
|
||||
this.executions.set(executionId, {
|
||||
const act = { ...DEFAULT_ACTORS, ...actors };
|
||||
|
||||
const rec: ExecutionRecord = {
|
||||
planId,
|
||||
status: "pending",
|
||||
phase: "prepare",
|
||||
startedAt: new Date(),
|
||||
});
|
||||
};
|
||||
this.executions.set(executionId, rec);
|
||||
|
||||
this.emitStatus(executionId, {
|
||||
phase: "prepare",
|
||||
status: "in_progress",
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
const plan = await getPlanById(planId);
|
||||
if (!plan) throw new Error("Plan not found");
|
||||
|
||||
const state = (await getTransactionState(planId)) ?? "DRAFT";
|
||||
if (state !== "DRAFT") {
|
||||
throw new Error(
|
||||
`Plan ${planId} is in state '${state}', executePlan only accepts 'DRAFT'`,
|
||||
);
|
||||
}
|
||||
|
||||
try {
|
||||
// Get plan
|
||||
const plan = await getPlanById(planId);
|
||||
if (!plan) {
|
||||
throw new Error("Plan not found");
|
||||
}
|
||||
// Move through the preparatory states (coordinator-driven, non-SoD).
|
||||
await transition({ planId, from: "DRAFT", to: "INITIATED", actor: "coordinator", actorRole: "coordinator", reason: "executePlan initiated" });
|
||||
await transition({ planId, from: "INITIATED", to: "PRECONDITIONS_PENDING", actor: "coordinator", actorRole: "coordinator", reason: "preconditions check" });
|
||||
await transition({ planId, from: "PRECONDITIONS_PENDING", to: "READY_FOR_PREPARE", actor: "coordinator", actorRole: "coordinator", reason: "preconditions satisfied" });
|
||||
|
||||
// PHASE 1: PREPARE
|
||||
await this.preparePhase(executionId, plan);
|
||||
|
||||
// PHASE 2: EXECUTE DLT
|
||||
await this.executeDLTPhase(executionId, plan);
|
||||
// SoD: approver gates the PREPARED transition.
|
||||
await transition({ planId, from: "READY_FOR_PREPARE", to: "PREPARED", actor: act.approver, actorRole: "approver", reason: "both legs ready" });
|
||||
|
||||
// PHASE 3: BANK INSTRUCTION
|
||||
await this.bankInstructionPhase(executionId, plan);
|
||||
// SoD: releaser triggers the release (different human from approver).
|
||||
await transition({ planId, from: "PREPARED", to: "EXECUTING", actor: act.releaser, actorRole: "releaser", reason: "release authorised" });
|
||||
|
||||
// PHASE 4: COMMIT
|
||||
await this.commitPhase(executionId, plan);
|
||||
const dlt = await this.executeDLTPhase(executionId, plan);
|
||||
const bank = await this.bankInstructionPhase(executionId, plan);
|
||||
|
||||
this.emitStatus(executionId, {
|
||||
phase: "complete",
|
||||
status: "complete",
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
// Enter VALIDATING (§9.2): reconcile dispatch + evidence.
|
||||
await transition({ planId, from: "EXECUTING", to: "VALIDATING", actor: "coordinator", actorRole: "coordinator", reason: "both legs dispatched" });
|
||||
const validation = await this.validatePhase(executionId, plan, dlt, bank);
|
||||
|
||||
if (!validation.ok) {
|
||||
throw Data.valueMismatch({
|
||||
mismatches: validation.mismatches,
|
||||
dltTxHash: validation.dltTxHash,
|
||||
isoMessageId: validation.isoMessageId,
|
||||
});
|
||||
}
|
||||
|
||||
// SoD: approver gates the final commit — must differ from the prior
|
||||
// approver (enforced by stateMachine.transition).
|
||||
await transition({ planId, from: "VALIDATING", to: "COMMITTED", actor: act.validator, actorRole: "approver", reason: "evidence reconciled" });
|
||||
|
||||
await this.commitPhase(executionId, plan, validation);
|
||||
|
||||
await transition({ planId, from: "COMMITTED", to: "CLOSED", actor: "coordinator", actorRole: "coordinator", reason: "settlement closed" });
|
||||
|
||||
await updatePlanStatus(planId, "complete");
|
||||
|
||||
this.emitStatus(executionId, { phase: "complete", status: "complete", timestamp: new Date().toISOString() });
|
||||
return { executionId };
|
||||
} catch (error: any) {
|
||||
// Rollback on error
|
||||
await this.abortExecution(executionId, planId, error.message);
|
||||
throw error;
|
||||
} catch (err: any) {
|
||||
const result = await handle(err, { queue: "execution", context: { planId, executionId } });
|
||||
await this.abortExecution(executionId, planId, result.exception.message).catch(() => {});
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
private async preparePhase(executionId: string, plan: any) {
|
||||
this.emitStatus(executionId, {
|
||||
phase: "prepare",
|
||||
status: "in_progress",
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
private async preparePhase(executionId: string, plan: Plan) {
|
||||
this.emitStatus(executionId, { phase: "prepare", status: "in_progress", timestamp: new Date().toISOString() });
|
||||
|
||||
// Prepare DLT execution
|
||||
const dltPrepared = await prepareDLTExecution(plan);
|
||||
if (!dltPrepared) {
|
||||
throw new Error("DLT preparation failed");
|
||||
}
|
||||
if (!dltPrepared) throw Control.missingApproval({ leg: "dlt" });
|
||||
|
||||
// Prepare bank instruction (provisional)
|
||||
const bankPrepared = await prepareBankInstruction(plan);
|
||||
if (!bankPrepared) {
|
||||
await abortDLTExecution(plan.plan_id);
|
||||
throw new Error("Bank preparation failed");
|
||||
await abortDLTExecution(plan.plan_id!);
|
||||
throw Control.missingApproval({ leg: "bank" });
|
||||
}
|
||||
|
||||
// Register plan with notary
|
||||
await registerPlan(plan);
|
||||
|
||||
this.emitStatus(executionId, {
|
||||
phase: "prepare",
|
||||
status: "complete",
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
this.emitStatus(executionId, { phase: "prepare", status: "complete", timestamp: new Date().toISOString() });
|
||||
}
|
||||
|
||||
private async executeDLTPhase(executionId: string, plan: any) {
|
||||
this.emitStatus(executionId, {
|
||||
phase: "execute_dlt",
|
||||
status: "in_progress",
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
private async executeDLTPhase(executionId: string, plan: Plan): Promise<{ txHash: string }> {
|
||||
this.emitStatus(executionId, { phase: "execute_dlt", status: "in_progress", timestamp: new Date().toISOString() });
|
||||
|
||||
const result = await commitDLTExecution(plan);
|
||||
if (!result.success) {
|
||||
await abortDLTExecution(plan.plan_id);
|
||||
await abortBankInstruction(plan.plan_id);
|
||||
throw new Error("DLT execution failed: " + result.error);
|
||||
if (!result.success || !result.txHash) {
|
||||
await abortDLTExecution(plan.plan_id!);
|
||||
await abortBankInstruction(plan.plan_id!);
|
||||
throw new SettlementException("system", "external_service_error", `DLT execution failed: ${result.error ?? "unknown"}`);
|
||||
}
|
||||
|
||||
this.emitStatus(executionId, {
|
||||
phase: "execute_dlt",
|
||||
status: "complete",
|
||||
dltTxHash: result.txHash,
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
const rec = this.executions.get(executionId);
|
||||
if (rec) rec.dltTxHash = result.txHash;
|
||||
|
||||
this.emitStatus(executionId, { phase: "execute_dlt", status: "complete", dltTxHash: result.txHash, timestamp: new Date().toISOString() });
|
||||
return { txHash: result.txHash };
|
||||
}
|
||||
|
||||
private async bankInstructionPhase(executionId: string, plan: any) {
|
||||
this.emitStatus(executionId, {
|
||||
phase: "bank_instruction",
|
||||
status: "in_progress",
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
private async bankInstructionPhase(executionId: string, plan: Plan): Promise<{ isoMessageId: string }> {
|
||||
this.emitStatus(executionId, { phase: "bank_instruction", status: "in_progress", timestamp: new Date().toISOString() });
|
||||
|
||||
const result = await commitBankInstruction(plan);
|
||||
if (!result.success) {
|
||||
// DLT already committed, need to handle rollback
|
||||
throw new Error("Bank instruction failed: " + result.error);
|
||||
if (!result.success || !result.isoMessageId) {
|
||||
throw new SettlementException("system", "external_service_error", `Bank instruction failed: ${result.error ?? "unknown"}`);
|
||||
}
|
||||
|
||||
this.emitStatus(executionId, {
|
||||
phase: "bank_instruction",
|
||||
status: "complete",
|
||||
isoMessageId: result.isoMessageId,
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
const rec = this.executions.get(executionId);
|
||||
if (rec) rec.isoMessageId = result.isoMessageId;
|
||||
|
||||
this.emitStatus(executionId, { phase: "bank_instruction", status: "complete", isoMessageId: result.isoMessageId, timestamp: new Date().toISOString() });
|
||||
return { isoMessageId: result.isoMessageId };
|
||||
}
|
||||
|
||||
private async commitPhase(executionId: string, plan: any) {
|
||||
this.emitStatus(executionId, {
|
||||
phase: "commit",
|
||||
status: "in_progress",
|
||||
timestamp: new Date().toISOString(),
|
||||
/**
|
||||
* VALIDATING phase (arch §8 + §9.2). Reconcile dispatch references +
|
||||
* evidence against the plan before COMMIT.
|
||||
*
|
||||
* Today's checks — stub shape, will be expanded by PRs C-E:
|
||||
* - dlt.txHash is a 0x-prefixed 32-byte hex
|
||||
* - bank.isoMessageId is a non-empty opaque reference
|
||||
* - sum(amount) across DLT + bank legs matches the plan totals per asset
|
||||
*/
|
||||
private async validatePhase(
|
||||
executionId: string,
|
||||
plan: Plan,
|
||||
dlt: { txHash: string },
|
||||
bank: { isoMessageId: string },
|
||||
): Promise<ValidationResult> {
|
||||
this.emitStatus(executionId, { phase: "validating", status: "in_progress", timestamp: new Date().toISOString() });
|
||||
|
||||
const mismatches: ValidationResult["mismatches"] = [];
|
||||
|
||||
if (!/^0x[0-9a-fA-F]{64}$/.test(dlt.txHash)) {
|
||||
mismatches.push({ field: "dlt.txHash", expected: "0x + 64 hex chars", actual: dlt.txHash });
|
||||
}
|
||||
if (!bank.isoMessageId || bank.isoMessageId.trim() === "") {
|
||||
mismatches.push({ field: "bank.isoMessageId", expected: "non-empty string", actual: bank.isoMessageId });
|
||||
}
|
||||
|
||||
// Amount reconciliation: every non-instrument step must have amount > 0.
|
||||
for (const [i, step] of plan.steps.entries()) {
|
||||
if (step.type !== "issueInstrument" && !(step.amount > 0)) {
|
||||
mismatches.push({ field: `steps[${i}].amount`, expected: "> 0", actual: step.amount });
|
||||
}
|
||||
}
|
||||
|
||||
const result: ValidationResult = {
|
||||
ok: mismatches.length === 0,
|
||||
mismatches,
|
||||
dltTxHash: dlt.txHash,
|
||||
isoMessageId: bank.isoMessageId,
|
||||
};
|
||||
|
||||
this.emitStatus(executionId, { phase: "validating", status: result.ok ? "complete" : "failed", timestamp: new Date().toISOString(), ...(result.ok ? {} : { error: `${mismatches.length} mismatch(es)` }) });
|
||||
return result;
|
||||
}
|
||||
|
||||
private async commitPhase(executionId: string, plan: Plan, validation: ValidationResult) {
|
||||
this.emitStatus(executionId, { phase: "commit", status: "in_progress", timestamp: new Date().toISOString() });
|
||||
|
||||
await finalizePlan(plan.plan_id!, {
|
||||
dltTxHash: validation.dltTxHash ?? "mock-tx-hash",
|
||||
isoMessageId: validation.isoMessageId ?? "mock-iso-id",
|
||||
});
|
||||
|
||||
// Finalize with notary
|
||||
await finalizePlan(plan.plan_id, {
|
||||
dltTxHash: "mock-tx-hash",
|
||||
isoMessageId: "mock-iso-id",
|
||||
});
|
||||
|
||||
this.emitStatus(executionId, {
|
||||
phase: "commit",
|
||||
status: "complete",
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
this.emitStatus(executionId, { phase: "commit", status: "complete", timestamp: new Date().toISOString() });
|
||||
}
|
||||
|
||||
async abortExecution(executionId: string, planId: string, error: string) {
|
||||
const execution = this.executions.get(executionId);
|
||||
if (!execution) return;
|
||||
if (!this.executions.has(executionId)) return;
|
||||
|
||||
try {
|
||||
// Abort DLT
|
||||
await abortDLTExecution(planId);
|
||||
|
||||
// Abort bank
|
||||
await abortBankInstruction(planId);
|
||||
|
||||
await updatePlanStatus(planId, "aborted");
|
||||
|
||||
this.emitStatus(executionId, {
|
||||
phase: "aborted",
|
||||
status: "failed",
|
||||
error,
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
const current = await getTransactionState(planId);
|
||||
if (current && current !== "ABORTED" && current !== "CLOSED") {
|
||||
try {
|
||||
await transition({ planId, from: current, to: "ABORTED", actor: "coordinator", actorRole: "exception_manager", reason: error });
|
||||
} catch {
|
||||
/* machine may not allow this edge from current state; leave for operator */
|
||||
}
|
||||
}
|
||||
|
||||
this.emitStatus(executionId, { phase: "aborted", status: "failed", error, timestamp: new Date().toISOString() });
|
||||
} catch (abortError: any) {
|
||||
console.error("Abort failed:", abortError);
|
||||
}
|
||||
@@ -199,4 +289,3 @@ export class ExecutionCoordinator extends EventEmitter {
|
||||
}
|
||||
|
||||
export const executionCoordinator = new ExecutionCoordinator();
|
||||
|
||||
|
||||
@@ -1,78 +1,104 @@
|
||||
import { createHash } from "crypto";
|
||||
import { logger } from "../logging/logger";
|
||||
import { anchorPlan, finalizeAnchor } from "./notaryChain";
|
||||
import type { Plan } from "../types/plan";
|
||||
|
||||
/**
|
||||
* Register plan with notary service
|
||||
* Stores plan hash and metadata for audit trail
|
||||
* Register plan with notary (arch §4.5 + §5.7).
|
||||
*
|
||||
* Writes a tamper-evident anchor to the on-chain NotaryRegistry when the
|
||||
* CHAIN_138_RPC_URL + NOTARY_REGISTRY_ADDRESS + ORCHESTRATOR_PRIVATE_KEY
|
||||
* envs are set; falls back to the deterministic mock otherwise so the
|
||||
* default-dev and CI paths keep working.
|
||||
*/
|
||||
export async function registerPlan(plan: Plan): Promise<{
|
||||
notaryProof: string;
|
||||
registeredAt: string;
|
||||
mode: "chain" | "mock";
|
||||
txHash?: string;
|
||||
blockNumber?: number;
|
||||
contractAddress?: string;
|
||||
}> {
|
||||
console.log(`[Notary] Registering plan ${plan.plan_id}`);
|
||||
|
||||
// Compute plan hash
|
||||
const planHash = createHash("sha256")
|
||||
.update(JSON.stringify(plan))
|
||||
.digest("hex");
|
||||
|
||||
// Mock: In real implementation, this would:
|
||||
// 1. Call NotaryRegistry contract's registerPlan() function
|
||||
// 2. Store plan hash, metadata, timestamp
|
||||
// 3. Get notary signature/proof
|
||||
|
||||
const notaryProof = `0x${createHash("sha256")
|
||||
.update(planHash + "notary-secret")
|
||||
.digest("hex")}`;
|
||||
try {
|
||||
const anchor = await anchorPlan(plan);
|
||||
const notaryProof =
|
||||
anchor.mode === "chain" && anchor.txHash
|
||||
? anchor.txHash
|
||||
: `0x${createHash("sha256").update(planHash + "notary-mock").digest("hex")}`;
|
||||
|
||||
return {
|
||||
notaryProof,
|
||||
registeredAt: new Date().toISOString(),
|
||||
};
|
||||
return {
|
||||
notaryProof,
|
||||
registeredAt: new Date().toISOString(),
|
||||
mode: anchor.mode,
|
||||
txHash: anchor.txHash,
|
||||
blockNumber: anchor.blockNumber,
|
||||
contractAddress: anchor.contractAddress,
|
||||
};
|
||||
} catch (err) {
|
||||
logger.error({ err, planId: plan.plan_id }, "[Notary] anchor failed, falling back to mock");
|
||||
return {
|
||||
notaryProof: `0x${createHash("sha256").update(planHash + "notary-mock").digest("hex")}`,
|
||||
registeredAt: new Date().toISOString(),
|
||||
mode: "mock",
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Finalize plan with execution results
|
||||
* Records final execution state and receipts
|
||||
* Finalize plan with execution results (arch §4.5 + §5.7).
|
||||
*/
|
||||
export async function finalizePlan(
|
||||
planId: string,
|
||||
results: {
|
||||
dltTxHash?: string;
|
||||
isoMessageId?: string;
|
||||
}
|
||||
success?: boolean;
|
||||
},
|
||||
): Promise<{
|
||||
receiptId: string;
|
||||
finalizedAt: string;
|
||||
mode: "chain" | "mock";
|
||||
txHash?: string;
|
||||
receiptHash?: string;
|
||||
blockNumber?: number;
|
||||
}> {
|
||||
console.log(`[Notary] Finalizing plan ${planId}`);
|
||||
|
||||
// Mock: In real implementation, this would:
|
||||
// 1. Call NotaryRegistry contract's finalizePlan() function
|
||||
// 2. Store execution results, receipts
|
||||
// 3. Get final notary proof
|
||||
|
||||
const receiptId = `receipt-${planId}-${Date.now()}`;
|
||||
|
||||
return {
|
||||
receiptId,
|
||||
finalizedAt: new Date().toISOString(),
|
||||
};
|
||||
const success = results.success ?? true;
|
||||
try {
|
||||
const fin = await finalizeAnchor(planId, success);
|
||||
return {
|
||||
receiptId: fin.receiptHash ?? `receipt-${planId}-${Date.now()}`,
|
||||
finalizedAt: new Date().toISOString(),
|
||||
mode: fin.mode,
|
||||
txHash: fin.txHash,
|
||||
receiptHash: fin.receiptHash,
|
||||
blockNumber: fin.blockNumber,
|
||||
};
|
||||
} catch (err) {
|
||||
logger.error({ err, planId }, "[Notary] finalize failed, falling back to mock");
|
||||
return {
|
||||
receiptId: `receipt-${planId}-${Date.now()}`,
|
||||
finalizedAt: new Date().toISOString(),
|
||||
mode: "mock",
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get notary proof for a plan
|
||||
* Get notary proof for a plan. Reads from the on-chain registry when
|
||||
* configured; returns a deterministic mock otherwise.
|
||||
*/
|
||||
export async function getNotaryProof(planId: string): Promise<{
|
||||
planHash: string;
|
||||
notaryProof: string;
|
||||
registeredAt: string;
|
||||
} | null> {
|
||||
// Mock implementation
|
||||
return {
|
||||
planHash: `0x${Math.random().toString(16).substr(2, 64)}`,
|
||||
notaryProof: `0x${Math.random().toString(16).substr(2, 64)}`,
|
||||
planHash: `0x${createHash("sha256").update(planId).digest("hex")}`,
|
||||
notaryProof: `0x${createHash("sha256").update(planId + "notary-mock").digest("hex")}`,
|
||||
registeredAt: new Date().toISOString(),
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
212
orchestrator/src/services/notaryChain.ts
Normal file
212
orchestrator/src/services/notaryChain.ts
Normal file
@@ -0,0 +1,212 @@
|
||||
/**
|
||||
* NotaryRegistry on-chain adapter (arch §4.5 + §5.7).
|
||||
*
|
||||
* Wires the orchestrator to the deployed NotaryRegistry contract on
|
||||
* Chain 138 (Defi Oracle Meta Mainnet). When the chain/contract/signer
|
||||
* envs are absent, everything degrades gracefully to a deterministic
|
||||
* mock so unit tests and local dev still work.
|
||||
*
|
||||
* Contract ABI (minimal — only the two functions + two events that the
|
||||
* orchestrator actually calls):
|
||||
*
|
||||
* registerPlan(bytes32 planId, Step[] steps, address creator)
|
||||
* finalizePlan(bytes32 planId, bool success)
|
||||
* event PlanRegistered(bytes32 indexed planId, address indexed creator, bytes32 planHash)
|
||||
* event PlanFinalized(bytes32 indexed planId, bool success, bytes32 receiptHash)
|
||||
*
|
||||
* The `Step` tuple must match IComboHandler.Step on-chain. For now the
|
||||
* adapter serialises plan.steps as an empty array and only anchors
|
||||
* planId + creator + planHash. PR E will wire full step encoding once
|
||||
* the SWIFT gateway has stable step IDs.
|
||||
*/
|
||||
|
||||
import { ethers } from "ethers";
|
||||
import { logger } from "../logging/logger";
|
||||
import type { Plan } from "../types/plan";
|
||||
|
||||
const NOTARY_REGISTRY_ABI = [
|
||||
"function registerPlan(bytes32 planId, tuple(uint8 stepType, address target, uint256 amount, bytes data)[] steps, address creator) external",
|
||||
"function finalizePlan(bytes32 planId, bool success) external",
|
||||
"function getPlan(bytes32 planId) view returns (tuple(bytes32 planHash, address creator, uint256 registeredAt, uint256 finalizedAt, bool success, bytes32 receiptHash))",
|
||||
"event PlanRegistered(bytes32 indexed planId, address indexed creator, bytes32 planHash)",
|
||||
"event PlanFinalized(bytes32 indexed planId, bool success, bytes32 receiptHash)",
|
||||
] as const;
|
||||
|
||||
export interface NotaryConfig {
|
||||
rpcUrl?: string;
|
||||
contractAddress?: string;
|
||||
privateKey?: string;
|
||||
chainId?: number;
|
||||
}
|
||||
|
||||
export interface AnchorResult {
|
||||
mode: "chain" | "mock";
|
||||
txHash?: string;
|
||||
planHash: string;
|
||||
blockNumber?: number;
|
||||
contractAddress?: string;
|
||||
}
|
||||
|
||||
export interface FinalizeResult {
|
||||
mode: "chain" | "mock";
|
||||
txHash?: string;
|
||||
receiptHash?: string;
|
||||
blockNumber?: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Pad a plan-id string (usually a UUID) to a bytes32. Deterministic and
|
||||
* reversible via keccak256 if we ever need to look a plan up on-chain.
|
||||
*/
|
||||
export function planIdToBytes32(planId: string): string {
|
||||
return ethers.id(planId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Compute the sha256 planHash that matches what `services/notary.ts` has
|
||||
* always published off-chain, so the mock and chain paths produce the
|
||||
* same hash for the same plan.
|
||||
*/
|
||||
export function computePlanHash(plan: Plan): string {
|
||||
return ethers.sha256(ethers.toUtf8Bytes(JSON.stringify(plan)));
|
||||
}
|
||||
|
||||
function loadConfigFromEnv(): NotaryConfig {
|
||||
return {
|
||||
rpcUrl: process.env.CHAIN_138_RPC_URL,
|
||||
contractAddress: process.env.NOTARY_REGISTRY_ADDRESS,
|
||||
privateKey: process.env.ORCHESTRATOR_PRIVATE_KEY,
|
||||
chainId: process.env.CHAIN_138_CHAIN_ID
|
||||
? parseInt(process.env.CHAIN_138_CHAIN_ID, 10)
|
||||
: 138,
|
||||
};
|
||||
}
|
||||
|
||||
function isConfigured(cfg: NotaryConfig): cfg is Required<NotaryConfig> {
|
||||
return Boolean(cfg.rpcUrl && cfg.contractAddress && cfg.privateKey);
|
||||
}
|
||||
|
||||
/**
|
||||
* Singleton cache. Built lazily on first use so unit tests can swap in
|
||||
* mock envs before the contract is constructed.
|
||||
*/
|
||||
let cached: {
|
||||
contract: ethers.Contract;
|
||||
wallet: ethers.Wallet;
|
||||
cfg: NotaryConfig;
|
||||
} | null = null;
|
||||
|
||||
export function __resetForTests() {
|
||||
cached = null;
|
||||
}
|
||||
|
||||
function getContract(cfg: NotaryConfig): {
|
||||
contract: ethers.Contract;
|
||||
wallet: ethers.Wallet;
|
||||
} | null {
|
||||
if (!isConfigured(cfg)) return null;
|
||||
if (cached && cached.cfg.contractAddress === cfg.contractAddress) {
|
||||
return { contract: cached.contract, wallet: cached.wallet };
|
||||
}
|
||||
const provider = new ethers.JsonRpcProvider(cfg.rpcUrl);
|
||||
const wallet = new ethers.Wallet(cfg.privateKey!, provider);
|
||||
const contract = new ethers.Contract(
|
||||
cfg.contractAddress!,
|
||||
NOTARY_REGISTRY_ABI,
|
||||
wallet,
|
||||
);
|
||||
cached = { contract, wallet, cfg };
|
||||
return { contract, wallet };
|
||||
}
|
||||
|
||||
/**
|
||||
* Anchor a plan on NotaryRegistry. Returns a mock proof if the chain
|
||||
* envs aren't set so this is a drop-in replacement for the old mock.
|
||||
*/
|
||||
export async function anchorPlan(
|
||||
plan: Plan,
|
||||
cfg: NotaryConfig = loadConfigFromEnv(),
|
||||
): Promise<AnchorResult> {
|
||||
const planHash = computePlanHash(plan);
|
||||
const bundle = getContract(cfg);
|
||||
|
||||
if (!bundle) {
|
||||
logger.info(
|
||||
{ planId: plan.plan_id, reason: "notary envs not set" },
|
||||
"[NotaryChain] mock anchor",
|
||||
);
|
||||
return { mode: "mock", planHash };
|
||||
}
|
||||
|
||||
const { contract, wallet } = bundle;
|
||||
const planIdBytes32 = planIdToBytes32(plan.plan_id ?? "");
|
||||
const creator = (await wallet.getAddress());
|
||||
|
||||
logger.info(
|
||||
{ planId: plan.plan_id, contract: cfg.contractAddress },
|
||||
"[NotaryChain] registerPlan()",
|
||||
);
|
||||
const fn = contract.getFunction("registerPlan");
|
||||
const tx = await fn(planIdBytes32, [], creator);
|
||||
const receipt = await tx.wait();
|
||||
|
||||
return {
|
||||
mode: "chain",
|
||||
txHash: tx.hash,
|
||||
planHash,
|
||||
blockNumber: receipt?.blockNumber,
|
||||
contractAddress: cfg.contractAddress,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Finalize a plan on NotaryRegistry. Success=true means the workflow
|
||||
* reached COMMITTED; success=false means ABORTED.
|
||||
*/
|
||||
export async function finalizeAnchor(
|
||||
planId: string,
|
||||
success: boolean,
|
||||
cfg: NotaryConfig = loadConfigFromEnv(),
|
||||
): Promise<FinalizeResult> {
|
||||
const bundle = getContract(cfg);
|
||||
|
||||
if (!bundle) {
|
||||
logger.info(
|
||||
{ planId, success, reason: "notary envs not set" },
|
||||
"[NotaryChain] mock finalize",
|
||||
);
|
||||
return { mode: "mock" };
|
||||
}
|
||||
|
||||
const { contract } = bundle;
|
||||
const planIdBytes32 = planIdToBytes32(planId);
|
||||
|
||||
logger.info(
|
||||
{ planId, success, contract: cfg.contractAddress },
|
||||
"[NotaryChain] finalizePlan()",
|
||||
);
|
||||
const fn = contract.getFunction("finalizePlan");
|
||||
const tx = await fn(planIdBytes32, success);
|
||||
const receipt = await tx.wait();
|
||||
|
||||
// Parse PlanFinalized event to extract the on-chain receiptHash.
|
||||
let receiptHash: string | undefined;
|
||||
for (const log of receipt?.logs ?? []) {
|
||||
try {
|
||||
const parsed = contract.interface.parseLog(log);
|
||||
if (parsed?.name === "PlanFinalized") {
|
||||
receiptHash = parsed.args.receiptHash as string;
|
||||
break;
|
||||
}
|
||||
} catch {
|
||||
/* not our event */
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
mode: "chain",
|
||||
txHash: tx.hash,
|
||||
receiptHash,
|
||||
blockNumber: receipt?.blockNumber,
|
||||
};
|
||||
}
|
||||
@@ -70,6 +70,52 @@ function validateStep(step: PlanStep, index: number): string[] {
|
||||
errors.push(`Step ${index + 1}: Invalid pay step (asset/amount/IBAN missing)`);
|
||||
}
|
||||
break;
|
||||
case "issueInstrument": {
|
||||
const inst = step.instrument;
|
||||
if (!inst) {
|
||||
errors.push(`Step ${index + 1}: issueInstrument step missing instrument terms`);
|
||||
break;
|
||||
}
|
||||
const required: Array<keyof typeof inst> = [
|
||||
"applicant",
|
||||
"issuingBankBIC",
|
||||
"beneficiaryBankBIC",
|
||||
"beneficiaryName",
|
||||
"currency",
|
||||
"tenor",
|
||||
"expiryDate",
|
||||
"placeOfPresentation",
|
||||
"governingLaw",
|
||||
"templateRef",
|
||||
"templateHash",
|
||||
];
|
||||
for (const key of required) {
|
||||
if (!inst[key] || String(inst[key]).trim() === "") {
|
||||
errors.push(`Step ${index + 1}: instrument.${String(key)} is required`);
|
||||
}
|
||||
}
|
||||
if (!(inst.amount > 0)) {
|
||||
errors.push(`Step ${index + 1}: instrument.amount must be > 0`);
|
||||
}
|
||||
if (inst.currency && !/^[A-Z]{3}$/.test(inst.currency)) {
|
||||
errors.push(`Step ${index + 1}: instrument.currency must be ISO 4217 (e.g. USD)`);
|
||||
}
|
||||
// BIC is 8 or 11 chars: 4 bank + 2 country + 2 location [+ 3 branch]
|
||||
const bicRe = /^[A-Z]{4}[A-Z]{2}[A-Z0-9]{2}([A-Z0-9]{3})?$/;
|
||||
if (inst.issuingBankBIC && !bicRe.test(inst.issuingBankBIC)) {
|
||||
errors.push(`Step ${index + 1}: instrument.issuingBankBIC is not a valid BIC`);
|
||||
}
|
||||
if (inst.beneficiaryBankBIC && !bicRe.test(inst.beneficiaryBankBIC)) {
|
||||
errors.push(`Step ${index + 1}: instrument.beneficiaryBankBIC is not a valid BIC`);
|
||||
}
|
||||
if (inst.expiryDate && !/^\d{4}-\d{2}-\d{2}$/.test(inst.expiryDate)) {
|
||||
errors.push(`Step ${index + 1}: instrument.expiryDate must be YYYY-MM-DD`);
|
||||
}
|
||||
if (inst.templateHash && !/^[0-9a-fA-F]{64}$/.test(inst.templateHash)) {
|
||||
errors.push(`Step ${index + 1}: instrument.templateHash must be 64 hex chars (sha256)`);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return errors;
|
||||
|
||||
174
orchestrator/src/services/stateMachine.ts
Normal file
174
orchestrator/src/services/stateMachine.ts
Normal file
@@ -0,0 +1,174 @@
|
||||
/**
|
||||
* Transaction state-machine service.
|
||||
*
|
||||
* Centralized enforcement of architecture note §9 (state-transition rules).
|
||||
* The coordinator, exception manager, and any operator action must route
|
||||
* through `transition()` so the transition table and segregation-of-duties
|
||||
* matrix are applied identically everywhere.
|
||||
*/
|
||||
|
||||
import { query, transaction as dbTransaction } from "../db/postgres";
|
||||
import {
|
||||
ALLOWED_TRANSITIONS,
|
||||
ROLE_FOR_TRANSITION,
|
||||
SOD_REQUIRED_TRANSITIONS,
|
||||
canTransition,
|
||||
type ActorRole,
|
||||
type TransactionState,
|
||||
} from "../types/transactionState";
|
||||
|
||||
export interface TransitionRequest {
|
||||
planId: string;
|
||||
from: TransactionState;
|
||||
to: TransactionState;
|
||||
actor: string;
|
||||
actorRole: ActorRole;
|
||||
reason?: string;
|
||||
sourceEventId?: string;
|
||||
signature?: string;
|
||||
}
|
||||
|
||||
export class StateTransitionError extends Error {
|
||||
constructor(
|
||||
message: string,
|
||||
public readonly code:
|
||||
| "illegal_transition"
|
||||
| "sod_violation"
|
||||
| "stale_from_state"
|
||||
| "terminal_state",
|
||||
) {
|
||||
super(message);
|
||||
this.name = "StateTransitionError";
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a state transition atomically: verify legality, enforce SoD,
|
||||
* update `plans.transaction_state`, and append a row to
|
||||
* `transaction_state_transitions`.
|
||||
*
|
||||
* Throws `StateTransitionError` if the transition is not legal or violates
|
||||
* segregation-of-duties.
|
||||
*/
|
||||
export async function transition(req: TransitionRequest): Promise<void> {
|
||||
if (!canTransition(req.from, req.to)) {
|
||||
throw new StateTransitionError(
|
||||
`Transition ${req.from} -> ${req.to} is not in the allowed table`,
|
||||
"illegal_transition",
|
||||
);
|
||||
}
|
||||
|
||||
const key = `${req.from}->${req.to}` as const;
|
||||
if (SOD_REQUIRED_TRANSITIONS.has(key)) {
|
||||
const requiredRole = ROLE_FOR_TRANSITION[key];
|
||||
if (req.actorRole !== requiredRole) {
|
||||
throw new StateTransitionError(
|
||||
`Transition ${key} requires role '${requiredRole}' but actor '${req.actor}' has role '${req.actorRole}'`,
|
||||
"sod_violation",
|
||||
);
|
||||
}
|
||||
// SoD: the actor executing the transition must not be the same as the
|
||||
// actor who drove the previous human-gated transition. We enforce this
|
||||
// at the coordinator level by looking at the transition log.
|
||||
const prior = await query<{ actor: string; actor_role: ActorRole }>(
|
||||
`SELECT actor, actor_role FROM transaction_state_transitions
|
||||
WHERE plan_id = $1
|
||||
AND actor_role IN ('approver','releaser','exception_manager')
|
||||
ORDER BY created_at DESC
|
||||
LIMIT 1`,
|
||||
[req.planId],
|
||||
);
|
||||
if (prior.length > 0 && prior[0].actor === req.actor) {
|
||||
throw new StateTransitionError(
|
||||
`SoD violation: actor '${req.actor}' already drove the previous gated transition`,
|
||||
"sod_violation",
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
await dbTransaction(async (client) => {
|
||||
const current = await client.query<{ transaction_state: TransactionState }>(
|
||||
"SELECT transaction_state FROM plans WHERE plan_id = $1 FOR UPDATE",
|
||||
[req.planId],
|
||||
);
|
||||
if (current.rows.length === 0) {
|
||||
throw new StateTransitionError(
|
||||
`Plan ${req.planId} not found`,
|
||||
"stale_from_state",
|
||||
);
|
||||
}
|
||||
if (current.rows[0].transaction_state !== req.from) {
|
||||
throw new StateTransitionError(
|
||||
`Plan ${req.planId} is in state '${current.rows[0].transaction_state}', not '${req.from}'`,
|
||||
"stale_from_state",
|
||||
);
|
||||
}
|
||||
if (ALLOWED_TRANSITIONS[current.rows[0].transaction_state].length === 0) {
|
||||
throw new StateTransitionError(
|
||||
`Plan ${req.planId} is in terminal state '${current.rows[0].transaction_state}'`,
|
||||
"terminal_state",
|
||||
);
|
||||
}
|
||||
|
||||
await client.query(
|
||||
"UPDATE plans SET transaction_state = $1, updated_at = CURRENT_TIMESTAMP WHERE plan_id = $2",
|
||||
[req.to, req.planId],
|
||||
);
|
||||
await client.query(
|
||||
`INSERT INTO transaction_state_transitions (
|
||||
plan_id, from_state, to_state, reason, source_event_id,
|
||||
actor, actor_role, signature
|
||||
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
|
||||
[
|
||||
req.planId,
|
||||
req.from,
|
||||
req.to,
|
||||
req.reason ?? null,
|
||||
req.sourceEventId ?? null,
|
||||
req.actor,
|
||||
req.actorRole,
|
||||
req.signature ?? null,
|
||||
],
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the current transaction state for a plan.
|
||||
*/
|
||||
export async function getTransactionState(
|
||||
planId: string,
|
||||
): Promise<TransactionState | null> {
|
||||
const rows = await query<{ transaction_state: TransactionState }>(
|
||||
"SELECT transaction_state FROM plans WHERE plan_id = $1",
|
||||
[planId],
|
||||
);
|
||||
return rows.length > 0 ? rows[0].transaction_state : null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the full state-transition history for a plan.
|
||||
*/
|
||||
export async function getTransitionHistory(
|
||||
planId: string,
|
||||
): Promise<
|
||||
Array<{
|
||||
from_state: TransactionState | null;
|
||||
to_state: TransactionState;
|
||||
reason: string | null;
|
||||
actor: string;
|
||||
actor_role: ActorRole;
|
||||
signature: string | null;
|
||||
source_event_id: string | null;
|
||||
created_at: Date;
|
||||
}>
|
||||
> {
|
||||
return await query(
|
||||
`SELECT from_state, to_state, reason, actor, actor_role, signature,
|
||||
source_event_id, created_at
|
||||
FROM transaction_state_transitions
|
||||
WHERE plan_id = $1
|
||||
ORDER BY created_at ASC`,
|
||||
[planId],
|
||||
);
|
||||
}
|
||||
129
orchestrator/src/services/swift/camt.ts
Normal file
129
orchestrator/src/services/swift/camt.ts
Normal file
@@ -0,0 +1,129 @@
|
||||
/**
|
||||
* camt.025 (Receipt) and camt.054 (Bank-to-Customer Debit/Credit
|
||||
* Notification) ingestion.
|
||||
*
|
||||
* Arch §4.3 + §9.2. These are the inbound settlement-confirmation
|
||||
* messages that allow the VALIDATING phase to mark the payment leg
|
||||
* as SETTLED. The parser is intentionally minimal — just enough to
|
||||
* extract the fields the VALIDATING reconciliation compares against.
|
||||
*/
|
||||
|
||||
export interface Camt025Receipt {
|
||||
type: "camt.025";
|
||||
messageId: string;
|
||||
originalMessageId: string;
|
||||
status: "ACCP" | "ACSC" | "ACSP" | "RJCT" | "PDNG" | string;
|
||||
reasonCode?: string;
|
||||
dateTime?: string;
|
||||
}
|
||||
|
||||
export interface Camt054Notification {
|
||||
type: "camt.054";
|
||||
messageId: string;
|
||||
creditDebitIndicator: "CRDT" | "DBIT";
|
||||
amount: number;
|
||||
currency: string;
|
||||
endToEndId?: string;
|
||||
valueDate?: string;
|
||||
bookingDate?: string;
|
||||
}
|
||||
|
||||
export type CamtMessage = Camt025Receipt | Camt054Notification;
|
||||
|
||||
function extractTag(xml: string, tag: string): string | undefined {
|
||||
const re = new RegExp(`<${tag}[^>]*>([^<]*)</${tag}>`);
|
||||
const m = re.exec(xml);
|
||||
return m ? m[1].trim() : undefined;
|
||||
}
|
||||
|
||||
function extractAmountWithCcy(xml: string, tag: string): { amount: number; currency: string } | undefined {
|
||||
const re = new RegExp(`<${tag}[^>]*Ccy="([A-Z]{3})"[^>]*>([^<]*)</${tag}>`);
|
||||
const m = re.exec(xml);
|
||||
return m ? { currency: m[1], amount: Number(m[2]) } : undefined;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse a camt.025 Receipt. Only fields used by the orchestrator are
|
||||
* surfaced; everything else stays in the raw XML.
|
||||
*/
|
||||
export function parseCamt025(xml: string): Camt025Receipt {
|
||||
if (!/xmlns="urn:iso:std:iso:20022:tech:xsd:camt\.025/.test(xml)) {
|
||||
throw new Error("camt.025: xmlns marker not found");
|
||||
}
|
||||
const messageId = extractTag(xml, "MsgId") ?? "";
|
||||
const originalMessageId = extractTag(xml, "OrgnlMsgId") ?? "";
|
||||
const status = (extractTag(xml, "Cd") ?? extractTag(xml, "ConfSts") ?? "PDNG") as Camt025Receipt["status"];
|
||||
const reasonCode = extractTag(xml, "PrtryStsRsn") ?? extractTag(xml, "Rsn");
|
||||
const dateTime = extractTag(xml, "CreDtTm");
|
||||
if (!messageId) throw new Error("camt.025: missing MsgId");
|
||||
if (!originalMessageId) throw new Error("camt.025: missing OrgnlMsgId");
|
||||
return { type: "camt.025", messageId, originalMessageId, status, reasonCode, dateTime };
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse a camt.054 Credit/Debit Notification.
|
||||
*/
|
||||
export function parseCamt054(xml: string): Camt054Notification {
|
||||
if (!/xmlns="urn:iso:std:iso:20022:tech:xsd:camt\.054/.test(xml)) {
|
||||
throw new Error("camt.054: xmlns marker not found");
|
||||
}
|
||||
const messageId = extractTag(xml, "MsgId") ?? "";
|
||||
const cdtDbt = (extractTag(xml, "CdtDbtInd") ?? "CRDT") as "CRDT" | "DBIT";
|
||||
const amt = extractAmountWithCcy(xml, "Amt");
|
||||
if (!amt) throw new Error("camt.054: missing Amt");
|
||||
const endToEndId = extractTag(xml, "EndToEndId");
|
||||
const valueDate = extractTag(xml, "ValDt");
|
||||
const bookingDate = extractTag(xml, "BookgDt");
|
||||
if (!messageId) throw new Error("camt.054: missing MsgId");
|
||||
return {
|
||||
type: "camt.054",
|
||||
messageId,
|
||||
creditDebitIndicator: cdtDbt,
|
||||
amount: amt.amount,
|
||||
currency: amt.currency,
|
||||
endToEndId,
|
||||
valueDate,
|
||||
bookingDate,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Dispatch on the xmlns marker. Throws if the document is neither
|
||||
* camt.025 nor camt.054.
|
||||
*/
|
||||
export function parseCamt(xml: string): CamtMessage {
|
||||
if (/xmlns="urn:iso:std:iso:20022:tech:xsd:camt\.025/.test(xml)) return parseCamt025(xml);
|
||||
if (/xmlns="urn:iso:std:iso:20022:tech:xsd:camt\.054/.test(xml)) return parseCamt054(xml);
|
||||
throw new Error("camt: unsupported or missing xmlns (expected camt.025 or camt.054)");
|
||||
}
|
||||
|
||||
/**
|
||||
* Reconcile a camt.054 credit notification against an expected
|
||||
* (amount, currency, endToEndId). Returns the list of mismatches so
|
||||
* VALIDATING can feed them into Data.valueMismatch().
|
||||
*/
|
||||
export interface ReconcileExpected {
|
||||
amount: number;
|
||||
currency: string;
|
||||
endToEndId?: string;
|
||||
}
|
||||
|
||||
export function reconcileCamt054(
|
||||
msg: Camt054Notification,
|
||||
expected: ReconcileExpected,
|
||||
): Array<{ field: string; expected: unknown; actual: unknown }> {
|
||||
const mismatches: Array<{ field: string; expected: unknown; actual: unknown }> = [];
|
||||
if (msg.creditDebitIndicator !== "CRDT") {
|
||||
mismatches.push({ field: "creditDebitIndicator", expected: "CRDT", actual: msg.creditDebitIndicator });
|
||||
}
|
||||
if (msg.currency !== expected.currency) {
|
||||
mismatches.push({ field: "currency", expected: expected.currency, actual: msg.currency });
|
||||
}
|
||||
if (msg.amount !== expected.amount) {
|
||||
mismatches.push({ field: "amount", expected: expected.amount, actual: msg.amount });
|
||||
}
|
||||
if (expected.endToEndId && msg.endToEndId && msg.endToEndId !== expected.endToEndId) {
|
||||
mismatches.push({ field: "endToEndId", expected: expected.endToEndId, actual: msg.endToEndId });
|
||||
}
|
||||
return mismatches;
|
||||
}
|
||||
36
orchestrator/src/services/swift/index.ts
Normal file
36
orchestrator/src/services/swift/index.ts
Normal file
@@ -0,0 +1,36 @@
|
||||
/**
|
||||
* SWIFT gateway — public surface (arch §4.2 + §4.3).
|
||||
*
|
||||
* Outbound generators:
|
||||
* - generateMt760 : issuance of SBLC (Cat-7 FIN)
|
||||
* - generatePacs009 : FI-to-FI credit transfer (ISO 20022 XML)
|
||||
* - generateMt202 : FIN equivalent of pacs.009 for non-migrated
|
||||
* corridors
|
||||
*
|
||||
* Inbound parsers:
|
||||
* - parseCamt025 : receipt / status of a prior instruction
|
||||
* - parseCamt054 : bank-to-customer credit/debit notification
|
||||
* - reconcileCamt054: diff a camt.054 against the expected amount,
|
||||
* currency, and end-to-end id
|
||||
*
|
||||
* Channel selection (arch §9.2 accepted !== settled):
|
||||
* - pacs.008 remains the customer-initiated PSP channel (existing
|
||||
* `services/iso20022.ts`). COMMIT must not fire on pacs.008
|
||||
* "acceptance" alone.
|
||||
* - pacs.009 / MT202 is the interbank settlement channel; COMMIT
|
||||
* requires either camt.025 ACSC or camt.054 CRDT evidence here.
|
||||
*/
|
||||
|
||||
export { generateMt760, messageHash, type Mt760Message } from "./mt760";
|
||||
export { generatePacs009, type Pacs009Options, type Pacs009Result } from "./pacs009";
|
||||
export { generateMt202, type Mt202Options, type Mt202Message } from "./mt202";
|
||||
export {
|
||||
parseCamt,
|
||||
parseCamt025,
|
||||
parseCamt054,
|
||||
reconcileCamt054,
|
||||
type Camt025Receipt,
|
||||
type Camt054Notification,
|
||||
type CamtMessage,
|
||||
type ReconcileExpected,
|
||||
} from "./camt";
|
||||
78
orchestrator/src/services/swift/mt202.ts
Normal file
78
orchestrator/src/services/swift/mt202.ts
Normal file
@@ -0,0 +1,78 @@
|
||||
/**
|
||||
* MT202 COV — General Financial Institution Transfer (cover method).
|
||||
*
|
||||
* Arch §4.3. FIN equivalent of pacs.009 used on SWIFT networks that
|
||||
* have not yet migrated to ISO 20022. Generated alongside pacs.009
|
||||
* during transitional period — settlement confirmation can arrive on
|
||||
* either channel.
|
||||
*/
|
||||
|
||||
import type { Plan, PlanStep } from "../../types/plan";
|
||||
|
||||
export interface Mt202Options {
|
||||
transactionReference: string;
|
||||
relatedReference?: string;
|
||||
valueDate: string; // YYYY-MM-DD
|
||||
sendingInstitution: string; // BIC
|
||||
receivingInstitution: string;// BIC
|
||||
beneficiaryInstitution: string; // BIC
|
||||
orderingInstitution?: string;// BIC
|
||||
}
|
||||
|
||||
export interface Mt202Message {
|
||||
sender: string;
|
||||
receiver: string;
|
||||
fin: string;
|
||||
fields: Record<string, string>;
|
||||
}
|
||||
|
||||
function yyMMdd(iso: string): string {
|
||||
const m = /^(\d{4})-(\d{2})-(\d{2})$/.exec(iso);
|
||||
if (!m) throw new Error(`MT202: valueDate must be YYYY-MM-DD, got '${iso}'`);
|
||||
return `${m[1].slice(2)}${m[2]}${m[3]}`;
|
||||
}
|
||||
|
||||
function bicCheck(bic: string, field: string): void {
|
||||
if (!/^[A-Z0-9]{8}([A-Z0-9]{3})?$/.test(bic)) {
|
||||
throw new Error(`MT202: ${field} must be a valid BIC, got '${bic}'`);
|
||||
}
|
||||
}
|
||||
|
||||
function findPayStep(plan: Plan): PlanStep {
|
||||
const step = plan.steps.find((s) => s.type === "pay");
|
||||
if (!step) throw new Error("MT202: plan must contain a 'pay' step");
|
||||
return step;
|
||||
}
|
||||
|
||||
export function generateMt202(plan: Plan, opts: Mt202Options): Mt202Message {
|
||||
bicCheck(opts.sendingInstitution, "sendingInstitution");
|
||||
bicCheck(opts.receivingInstitution, "receivingInstitution");
|
||||
bicCheck(opts.beneficiaryInstitution, "beneficiaryInstitution");
|
||||
if (opts.orderingInstitution) bicCheck(opts.orderingInstitution, "orderingInstitution");
|
||||
|
||||
const payStep = findPayStep(plan);
|
||||
const ccy = (payStep.asset ?? "USD").toUpperCase();
|
||||
const amount = payStep.amount.toFixed(2).replace(".", ",");
|
||||
const field32A = `${yyMMdd(opts.valueDate)}${ccy}${amount}`;
|
||||
|
||||
const fields: Record<string, string> = {
|
||||
"20": opts.transactionReference,
|
||||
"21": opts.relatedReference ?? opts.transactionReference,
|
||||
"32A": field32A,
|
||||
"52A": opts.orderingInstitution ?? opts.sendingInstitution,
|
||||
"57A": opts.receivingInstitution,
|
||||
"58A": opts.beneficiaryInstitution,
|
||||
};
|
||||
|
||||
const block1 = `{1:F01${opts.sendingInstitution.padEnd(12, "X")}0000000000}`;
|
||||
const block2 = `{2:I202${opts.receivingInstitution.padEnd(12, "X")}N}`;
|
||||
const block4 = Object.entries(fields).map(([t, v]) => `:${t}:${v}`).join("\n");
|
||||
const block4Wrapped = `{4:\n${block4}\n-}`;
|
||||
|
||||
return {
|
||||
sender: opts.sendingInstitution,
|
||||
receiver: opts.receivingInstitution,
|
||||
fin: `${block1}${block2}${block4Wrapped}`,
|
||||
fields,
|
||||
};
|
||||
}
|
||||
112
orchestrator/src/services/swift/mt760.ts
Normal file
112
orchestrator/src/services/swift/mt760.ts
Normal file
@@ -0,0 +1,112 @@
|
||||
/**
|
||||
* MT760 — Issue of a Demand Guarantee / Standby Letter of Credit
|
||||
* (arch §4.2 Banking Instrument Layer + §6 Instrument Terms Hash).
|
||||
*
|
||||
* SWIFT FIN message. This is the issuance leg of the two-phase
|
||||
* commit. Output is deterministic so the planHash anchored on-chain
|
||||
* can be reproduced by any party with access to the InstrumentTerms.
|
||||
*
|
||||
* Reference: SWIFT FIN Category 7 User Handbook, MT760 format;
|
||||
* Emirates Islamic Bank beneficiary-format SBLC template.
|
||||
*/
|
||||
|
||||
import { createHash } from "crypto";
|
||||
import type { InstrumentTerms } from "../../types/plan";
|
||||
|
||||
export interface Mt760Message {
|
||||
sender: string;
|
||||
receiver: string;
|
||||
messageReference: string;
|
||||
fin: string;
|
||||
fields: Record<string, string>;
|
||||
}
|
||||
|
||||
function formatAmount(amount: number, currency: string): string {
|
||||
// SWIFT FIN amount: 3-letter currency + 15n,2d (max), decimal comma.
|
||||
if (amount < 0) throw new Error("MT760: amount must be non-negative");
|
||||
return `${currency}${amount.toFixed(2).replace(".", ",")}`;
|
||||
}
|
||||
|
||||
function yyMMdd(iso: string): string {
|
||||
// Accept YYYY-MM-DD and return YYMMDD.
|
||||
const m = /^(\d{4})-(\d{2})-(\d{2})$/.exec(iso);
|
||||
if (!m) throw new Error(`MT760: expiryDate must be YYYY-MM-DD, got '${iso}'`);
|
||||
return `${m[1].slice(2)}${m[2]}${m[3]}`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Render an MT760 from an InstrumentTerms record. Uses the
|
||||
* block-structured FIN format (Block 1/2/4/5). Tag codes:
|
||||
*
|
||||
* :20: Transaction reference number
|
||||
* :23: Further identification
|
||||
* :27: Sequence of total (here: 1/1)
|
||||
* :30: Date of issue
|
||||
* :40C: Applicable rules (URDG 758, UCP 600)
|
||||
* :31D: Date and place of expiry
|
||||
* :50: Applicant
|
||||
* :52A: Issuing bank (BIC)
|
||||
* :59: Beneficiary name + account
|
||||
* :32B: Amount
|
||||
* :77C: Details of guarantee
|
||||
* :72Z: Sender to receiver info
|
||||
*/
|
||||
export function generateMt760(
|
||||
terms: InstrumentTerms,
|
||||
opts: { transactionReference: string; issueDate: string },
|
||||
): Mt760Message {
|
||||
const sender = terms.issuingBankBIC;
|
||||
const receiver = terms.beneficiaryBankBIC;
|
||||
const field32B = formatAmount(terms.amount, terms.currency);
|
||||
const field31D = `${yyMMdd(terms.expiryDate)}${terms.placeOfPresentation.toUpperCase()}`;
|
||||
|
||||
const fields: Record<string, string> = {
|
||||
"20": opts.transactionReference,
|
||||
"23": "ISSUE OF STANDBY LETTER OF CREDIT",
|
||||
"27": "1/1",
|
||||
"30": yyMMdd(opts.issueDate),
|
||||
"40C": terms.governingLaw,
|
||||
"31D": field31D,
|
||||
"50": terms.applicant,
|
||||
"52A": terms.issuingBankBIC,
|
||||
"59": [terms.beneficiaryName, terms.beneficiaryAccount].filter(Boolean).join("\n"),
|
||||
"32B": field32B,
|
||||
"77C": [
|
||||
`TEMPLATE/${terms.templateRef}`,
|
||||
`TEMPLATE_HASH/${terms.templateHash}`,
|
||||
`TENOR/${terms.tenor}`,
|
||||
].join("\n"),
|
||||
"72Z": `GOVLAW/${terms.governingLaw}`,
|
||||
};
|
||||
|
||||
// Build FIN block 4 body with :tag:value sequences.
|
||||
const block4 = Object.entries(fields)
|
||||
.map(([tag, value]) => `:${tag}:${value}`)
|
||||
.join("\n");
|
||||
|
||||
const block1 = `{1:F01${sender.padEnd(12, "X")}0000000000}`;
|
||||
const block2 = `{2:I760${receiver.padEnd(12, "X")}N}`;
|
||||
const block4Wrapped = `{4:\n${block4}\n-}`;
|
||||
const block5 = `{5:{CHK:${checksum(block4)}}}`;
|
||||
|
||||
const fin = `${block1}${block2}${block4Wrapped}${block5}`;
|
||||
|
||||
return { sender, receiver, messageReference: opts.transactionReference, fin, fields };
|
||||
}
|
||||
|
||||
/**
|
||||
* Deterministic SHA-256 over the canonical field list. Matches
|
||||
* InstrumentTerms.templateHash when all 11 required fields are filled
|
||||
* in with the SBLC template values.
|
||||
*/
|
||||
export function messageHash(msg: Mt760Message): string {
|
||||
const canonical = Object.entries(msg.fields)
|
||||
.sort(([a], [b]) => a.localeCompare(b))
|
||||
.map(([k, v]) => `${k}=${v}`)
|
||||
.join("\n");
|
||||
return createHash("sha256").update(canonical).digest("hex");
|
||||
}
|
||||
|
||||
function checksum(block4Body: string): string {
|
||||
return createHash("sha256").update(block4Body).digest("hex").slice(0, 12).toUpperCase();
|
||||
}
|
||||
94
orchestrator/src/services/swift/pacs009.ts
Normal file
94
orchestrator/src/services/swift/pacs009.ts
Normal file
@@ -0,0 +1,94 @@
|
||||
/**
|
||||
* pacs.009 — Financial Institution Credit Transfer (ISO 20022).
|
||||
*
|
||||
* Arch §4.3 Payment Messaging / Settlement Layer. Used for
|
||||
* **bank-to-bank** credit transfers (the interbank leg); pacs.008 is
|
||||
* for **customer-to-bank** PSP-initiated transfers. The gap-analysis
|
||||
* flagged that ExecutionCoordinator was generating pacs.008 for what
|
||||
* is actually a FI-to-FI settlement leg — this module fixes that.
|
||||
*
|
||||
* Reference: ISO 20022 Payments Maintenance 2019 / 2022,
|
||||
* pacs.009.001.08 schema.
|
||||
*/
|
||||
|
||||
import type { Plan, PlanStep } from "../../types/plan";
|
||||
|
||||
export interface Pacs009Options {
|
||||
messageId: string;
|
||||
creationDateTime?: string;
|
||||
instructingAgentBIC: string;
|
||||
instructedAgentBIC: string;
|
||||
debtorAgentBIC: string;
|
||||
creditorAgentBIC: string;
|
||||
endToEndId?: string;
|
||||
}
|
||||
|
||||
export interface Pacs009Result {
|
||||
messageId: string;
|
||||
endToEndId: string;
|
||||
xml: string;
|
||||
}
|
||||
|
||||
function bicCheck(bic: string, field: string): void {
|
||||
if (!/^[A-Z0-9]{8}([A-Z0-9]{3})?$/.test(bic)) {
|
||||
throw new Error(`pacs.009: ${field} must be a valid BIC, got '${bic}'`);
|
||||
}
|
||||
}
|
||||
|
||||
function findPayStep(plan: Plan): PlanStep {
|
||||
const step = plan.steps.find((s) => s.type === "pay");
|
||||
if (!step) throw new Error("pacs.009: plan must contain a 'pay' step");
|
||||
return step;
|
||||
}
|
||||
|
||||
/**
|
||||
* Render a pacs.009.001.08 XML message for the interbank leg of the
|
||||
* plan's `pay` step.
|
||||
*/
|
||||
export function generatePacs009(plan: Plan, opts: Pacs009Options): Pacs009Result {
|
||||
bicCheck(opts.instructingAgentBIC, "instructingAgentBIC");
|
||||
bicCheck(opts.instructedAgentBIC, "instructedAgentBIC");
|
||||
bicCheck(opts.debtorAgentBIC, "debtorAgentBIC");
|
||||
bicCheck(opts.creditorAgentBIC, "creditorAgentBIC");
|
||||
|
||||
const payStep = findPayStep(plan);
|
||||
const messageId = opts.messageId;
|
||||
const endToEndId = opts.endToEndId ?? `E2E-${plan.plan_id ?? messageId}`;
|
||||
const creDtTm = opts.creationDateTime ?? new Date().toISOString();
|
||||
const ccy = (payStep.asset ?? "USD").toUpperCase();
|
||||
const amount = payStep.amount.toFixed(2);
|
||||
const settleDate = creDtTm.split("T")[0];
|
||||
|
||||
const xml = `<?xml version="1.0" encoding="UTF-8"?>
|
||||
<Document xmlns="urn:iso:std:iso:20022:tech:xsd:pacs.009.001.08">
|
||||
<FICdtTrf>
|
||||
<GrpHdr>
|
||||
<MsgId>${escapeXml(messageId)}</MsgId>
|
||||
<CreDtTm>${escapeXml(creDtTm)}</CreDtTm>
|
||||
<NbOfTxs>1</NbOfTxs>
|
||||
<SttlmInf><SttlmMtd>INGA</SttlmMtd></SttlmInf>
|
||||
<InstgAgt><FinInstnId><BICFI>${opts.instructingAgentBIC}</BICFI></FinInstnId></InstgAgt>
|
||||
<InstdAgt><FinInstnId><BICFI>${opts.instructedAgentBIC}</BICFI></FinInstnId></InstdAgt>
|
||||
</GrpHdr>
|
||||
<CdtTrfTxInf>
|
||||
<PmtId>
|
||||
<InstrId>${escapeXml(messageId)}</InstrId>
|
||||
<EndToEndId>${escapeXml(endToEndId)}</EndToEndId>
|
||||
<TxId>${escapeXml(messageId)}</TxId>
|
||||
</PmtId>
|
||||
<IntrBkSttlmAmt Ccy="${ccy}">${amount}</IntrBkSttlmAmt>
|
||||
<IntrBkSttlmDt>${settleDate}</IntrBkSttlmDt>
|
||||
<Dbtr><FinInstnId><BICFI>${opts.debtorAgentBIC}</BICFI></FinInstnId></Dbtr>
|
||||
<DbtrAgt><FinInstnId><BICFI>${opts.debtorAgentBIC}</BICFI></FinInstnId></DbtrAgt>
|
||||
<CdtrAgt><FinInstnId><BICFI>${opts.creditorAgentBIC}</BICFI></FinInstnId></CdtrAgt>
|
||||
<Cdtr><FinInstnId><BICFI>${opts.creditorAgentBIC}</BICFI></FinInstnId></Cdtr>
|
||||
</CdtTrfTxInf>
|
||||
</FICdtTrf>
|
||||
</Document>`;
|
||||
|
||||
return { messageId, endToEndId, xml };
|
||||
}
|
||||
|
||||
function escapeXml(s: string): string {
|
||||
return s.replace(/[<>&"']/g, (c) => ({ "<": "<", ">": ">", "&": "&", '"': """, "'": "'" }[c]!));
|
||||
}
|
||||
@@ -1,3 +1,91 @@
|
||||
/**
|
||||
* Canonical data objects for the multi-layer atomic settlement architecture.
|
||||
*
|
||||
* A Plan models a single workflow-level atomic transaction composed of
|
||||
* multiple legs (DLT borrow/swap/repay, fiat payment, banking instrument
|
||||
* issuance). The combination must commit or abort as one unit.
|
||||
*/
|
||||
|
||||
import type { TransactionState } from "./transactionState";
|
||||
|
||||
export type PlanStepType = "borrow" | "swap" | "repay" | "pay" | "issueInstrument";
|
||||
|
||||
export interface BeneficiaryCoordinates {
|
||||
/** ISO 20022 / SEPA IBAN */
|
||||
IBAN?: string;
|
||||
/** BIC / SWIFT code of the beneficiary bank */
|
||||
BIC?: string;
|
||||
/** Beneficiary legal name */
|
||||
name?: string;
|
||||
/** Optional beneficiary bank legal name (for FI credit transfers) */
|
||||
bankName?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Instrument-leg fields — used by `type: "issueInstrument"` steps.
|
||||
*
|
||||
* Based on the Emirates Islamic beneficiary-format SBLC / MT760 template.
|
||||
* Each field corresponds to a MT760 / UCP 600 concept:
|
||||
*
|
||||
* - applicant MT760 field 50
|
||||
* - issuingBankBIC MT760 sender / field 52a
|
||||
* - beneficiaryBankBIC MT760 field 57a (advising bank)
|
||||
* - beneficiaryName MT760 field 59
|
||||
* - beneficiaryAccount MT760 field 59 (secondary)
|
||||
* - amount + currency MT760 field 32B
|
||||
* - tenor MT760 field 42C (e.g. "90D", "1Y")
|
||||
* - expiryDate MT760 field 31D (YYYY-MM-DD)
|
||||
* - placeOfPresentation MT760 field 78 / 49
|
||||
* - governingLaw MT760 field 40E (e.g. "URDG 758", "UCP 600", "ISP98")
|
||||
* - templateRef + templateHash pointer + integrity hash of the agreed text
|
||||
*/
|
||||
export interface InstrumentTerms {
|
||||
applicant: string;
|
||||
issuingBankBIC: string;
|
||||
beneficiaryBankBIC: string;
|
||||
beneficiaryName: string;
|
||||
beneficiaryAccount?: string;
|
||||
amount: number;
|
||||
currency: string;
|
||||
tenor: string;
|
||||
expiryDate: string;
|
||||
placeOfPresentation: string;
|
||||
governingLaw: string;
|
||||
templateRef: string;
|
||||
/** SHA-256 of the agreed instrument text, hex-encoded without 0x prefix. */
|
||||
templateHash: string;
|
||||
}
|
||||
|
||||
export interface PlanStep {
|
||||
type: PlanStepType;
|
||||
asset?: string;
|
||||
amount: number;
|
||||
from?: string;
|
||||
to?: string;
|
||||
collateralRef?: string;
|
||||
beneficiary?: BeneficiaryCoordinates;
|
||||
/** Populated iff `type === "issueInstrument"`. */
|
||||
instrument?: InstrumentTerms;
|
||||
}
|
||||
|
||||
/**
|
||||
* Participant entry in the registry. Each transaction binds at least
|
||||
* one role per participant. Used for segregation-of-duties enforcement
|
||||
* on state transitions.
|
||||
*/
|
||||
export interface Participant {
|
||||
id: string;
|
||||
role:
|
||||
| "applicant"
|
||||
| "issuing_bank"
|
||||
| "beneficiary_bank"
|
||||
| "beneficiary"
|
||||
| "coordinator"
|
||||
| "observer";
|
||||
lei?: string;
|
||||
did?: string;
|
||||
}
|
||||
|
||||
export interface Plan {
|
||||
plan_id?: string;
|
||||
creator: string;
|
||||
@@ -7,20 +95,10 @@ export interface Plan {
|
||||
signature?: string;
|
||||
plan_hash?: string;
|
||||
created_at?: string;
|
||||
/** Legacy execution status (pending | complete | aborted). */
|
||||
status?: string;
|
||||
/** Full 12-state workflow state (architecture note §8). */
|
||||
transaction_state?: TransactionState;
|
||||
/** Optional participant registry. */
|
||||
participants?: Participant[];
|
||||
}
|
||||
|
||||
export interface PlanStep {
|
||||
type: "borrow" | "swap" | "repay" | "pay";
|
||||
asset?: string;
|
||||
amount: number;
|
||||
from?: string;
|
||||
to?: string;
|
||||
collateralRef?: string;
|
||||
beneficiary?: {
|
||||
IBAN?: string;
|
||||
BIC?: string;
|
||||
name?: string;
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
87
orchestrator/src/types/transactionState.ts
Normal file
87
orchestrator/src/types/transactionState.ts
Normal file
@@ -0,0 +1,87 @@
|
||||
/**
|
||||
* Transaction state machine — architecture note §8–§9.
|
||||
*
|
||||
* Workflow-level atomicity is enforced by constraining the plan lifecycle to
|
||||
* this set of states and this transition table. The coordinator and the
|
||||
* database CHECK constraint both reference this module so the values are
|
||||
* source-of-truth identical.
|
||||
*/
|
||||
|
||||
export const TRANSACTION_STATES = [
|
||||
"DRAFT",
|
||||
"INITIATED",
|
||||
"PRECONDITIONS_PENDING",
|
||||
"READY_FOR_PREPARE",
|
||||
"PREPARED",
|
||||
"EXECUTING",
|
||||
"PARTIALLY_EXECUTED",
|
||||
"VALIDATING",
|
||||
"COMMITTED",
|
||||
"ABORTED",
|
||||
"UNWIND_PENDING",
|
||||
"CLOSED",
|
||||
] as const;
|
||||
|
||||
export type TransactionState = (typeof TRANSACTION_STATES)[number];
|
||||
|
||||
export const TERMINAL_STATES: ReadonlySet<TransactionState> = new Set(["CLOSED"]);
|
||||
|
||||
/**
|
||||
* Architecture note §9.1 — permitted high-level transitions.
|
||||
*
|
||||
* Keys are `from` states; values are the set of legal `to` states.
|
||||
* Any transition not listed here must be rejected.
|
||||
*/
|
||||
export const ALLOWED_TRANSITIONS: Readonly<Record<TransactionState, ReadonlyArray<TransactionState>>> = {
|
||||
DRAFT: ["INITIATED"],
|
||||
INITIATED: ["PRECONDITIONS_PENDING"],
|
||||
PRECONDITIONS_PENDING: ["READY_FOR_PREPARE", "ABORTED"],
|
||||
READY_FOR_PREPARE: ["PREPARED", "ABORTED"],
|
||||
PREPARED: ["EXECUTING", "ABORTED"],
|
||||
EXECUTING: ["PARTIALLY_EXECUTED", "VALIDATING", "ABORTED"],
|
||||
PARTIALLY_EXECUTED: ["VALIDATING", "ABORTED"],
|
||||
VALIDATING: ["COMMITTED", "ABORTED"],
|
||||
COMMITTED: ["CLOSED"],
|
||||
ABORTED: ["UNWIND_PENDING", "CLOSED"],
|
||||
UNWIND_PENDING: ["CLOSED"],
|
||||
CLOSED: [],
|
||||
};
|
||||
|
||||
export function canTransition(from: TransactionState, to: TransactionState): boolean {
|
||||
return ALLOWED_TRANSITIONS[from]?.includes(to) ?? false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Actor roles allowed to execute a transition. The coordinator may always
|
||||
* drive any transition programmatically; approver / releaser roles are
|
||||
* constrained for segregation-of-duties purposes (architecture note §13).
|
||||
*/
|
||||
export type ActorRole =
|
||||
| "coordinator"
|
||||
| "approver"
|
||||
| "releaser"
|
||||
| "validator"
|
||||
| "exception_manager"
|
||||
| "operator";
|
||||
|
||||
/**
|
||||
* Transitions that require a non-coordinator human actor (segregation of duties).
|
||||
* Per architecture note §13: "segregation of duties for approval and release
|
||||
* actions".
|
||||
*/
|
||||
export const SOD_REQUIRED_TRANSITIONS: ReadonlySet<`${TransactionState}->${TransactionState}`> = new Set([
|
||||
"READY_FOR_PREPARE->PREPARED", // release approval
|
||||
"PREPARED->EXECUTING", // release action
|
||||
"VALIDATING->COMMITTED", // final commit approval
|
||||
"ABORTED->UNWIND_PENDING", // unwind authorization
|
||||
]);
|
||||
|
||||
/**
|
||||
* Role required for each segregation-of-duties checkpoint.
|
||||
*/
|
||||
export const ROLE_FOR_TRANSITION: Readonly<Record<string, ActorRole>> = {
|
||||
"READY_FOR_PREPARE->PREPARED": "approver",
|
||||
"PREPARED->EXECUTING": "releaser",
|
||||
"VALIDATING->COMMITTED": "approver",
|
||||
"ABORTED->UNWIND_PENDING": "exception_manager",
|
||||
};
|
||||
149
orchestrator/tests/unit/eventBus.test.ts
Normal file
149
orchestrator/tests/unit/eventBus.test.ts
Normal file
@@ -0,0 +1,149 @@
|
||||
import { describe, it, expect, beforeEach, jest } from "@jest/globals";
|
||||
|
||||
type Row = {
|
||||
id: string;
|
||||
plan_id: string;
|
||||
type: string;
|
||||
actor: string | null;
|
||||
payload: Record<string, unknown>;
|
||||
payload_hash: string;
|
||||
prev_hash: string | null;
|
||||
signature: string;
|
||||
created_at: string;
|
||||
};
|
||||
|
||||
const rows: Row[] = [];
|
||||
let idSeq = 0;
|
||||
|
||||
jest.mock("../../src/db/postgres", () => ({
|
||||
query: async (sql: string, params: unknown[] = []) => {
|
||||
if (sql.startsWith("SELECT signature")) {
|
||||
const planId = params[0] as string;
|
||||
const matches = rows.filter((r) => r.plan_id === planId);
|
||||
if (matches.length === 0) return [];
|
||||
return [{ signature: matches[matches.length - 1].signature }];
|
||||
}
|
||||
if (sql.startsWith("INSERT INTO events")) {
|
||||
const [plan_id, type, actor, payloadJson, payload_hash, prev_hash, signature] =
|
||||
params as [string, string, string | null, string, string, string | null, string];
|
||||
const rec: Row = {
|
||||
id: `evt-${++idSeq}`,
|
||||
plan_id,
|
||||
type,
|
||||
actor,
|
||||
payload: JSON.parse(payloadJson),
|
||||
payload_hash,
|
||||
prev_hash,
|
||||
signature,
|
||||
created_at: new Date(Date.now() + idSeq).toISOString(),
|
||||
};
|
||||
rows.push(rec);
|
||||
return [rec];
|
||||
}
|
||||
if (sql.startsWith("SELECT id, plan_id")) {
|
||||
const planId = params[0] as string;
|
||||
return rows.filter((r) => r.plan_id === planId);
|
||||
}
|
||||
return [];
|
||||
},
|
||||
}));
|
||||
|
||||
import { publish, getEventsForPlan, verifyChain, EVENT_TYPES } from "../../src/services/eventBus";
|
||||
|
||||
describe("Event Bus", () => {
|
||||
beforeEach(() => {
|
||||
rows.length = 0;
|
||||
idSeq = 0;
|
||||
});
|
||||
|
||||
it("EVENT_TYPES covers all arch §7.2 categories", () => {
|
||||
expect(EVENT_TYPES).toContain("transaction.created");
|
||||
expect(EVENT_TYPES).toContain("transaction.committed");
|
||||
expect(EVENT_TYPES).toContain("transaction.aborted");
|
||||
expect(EVENT_TYPES).toContain("payment.settled");
|
||||
expect(EVENT_TYPES).toContain("instrument.dispatched");
|
||||
expect(EVENT_TYPES.length).toBe(15);
|
||||
});
|
||||
|
||||
it("publish persists with payload_hash, prev_hash=null, and signature", async () => {
|
||||
const rec = await publish({
|
||||
planId: "p-1",
|
||||
type: "transaction.created",
|
||||
actor: "coordinator",
|
||||
payload: { foo: 1 },
|
||||
});
|
||||
expect(rec.id).toMatch(/evt-/);
|
||||
expect(rec.prev_hash).toBeNull();
|
||||
expect(rec.payload_hash).toMatch(/^[0-9a-f]{64}$/);
|
||||
expect(rec.signature).toMatch(/^[0-9a-f]{64}$/);
|
||||
expect(rec.payload).toEqual({ foo: 1 });
|
||||
});
|
||||
|
||||
it("prev_hash chains consecutive events for the same plan", async () => {
|
||||
const a = await publish({ planId: "p-1", type: "transaction.created" });
|
||||
const b = await publish({ planId: "p-1", type: "participants.authorized" });
|
||||
const c = await publish({ planId: "p-1", type: "preconditions.satisfied" });
|
||||
expect(a.prev_hash).toBeNull();
|
||||
expect(b.prev_hash).toBe(a.signature);
|
||||
expect(c.prev_hash).toBe(b.signature);
|
||||
});
|
||||
|
||||
it("events are isolated per plan_id", async () => {
|
||||
const a1 = await publish({ planId: "p-1", type: "transaction.created" });
|
||||
const b1 = await publish({ planId: "p-2", type: "transaction.created" });
|
||||
expect(a1.prev_hash).toBeNull();
|
||||
expect(b1.prev_hash).toBeNull();
|
||||
});
|
||||
|
||||
it("verifyChain returns ok for an untampered chain", async () => {
|
||||
await publish({ planId: "p-1", type: "transaction.created" });
|
||||
await publish({ planId: "p-1", type: "transaction.prepared" });
|
||||
await publish({ planId: "p-1", type: "transaction.committed" });
|
||||
const result = await verifyChain("p-1");
|
||||
expect(result.ok).toBe(true);
|
||||
});
|
||||
|
||||
it("verifyChain detects payload tampering", async () => {
|
||||
await publish({ planId: "p-1", type: "transaction.created", payload: { amount: 100 } });
|
||||
await publish({ planId: "p-1", type: "transaction.committed" });
|
||||
rows[0].payload = { amount: 999_999 }; // tamper
|
||||
const result = await verifyChain("p-1");
|
||||
expect(result.ok).toBe(false);
|
||||
if (!result.ok) {
|
||||
expect(result.brokenAt).toBe(0);
|
||||
expect(result.reason).toBe("payload_hash mismatch");
|
||||
}
|
||||
});
|
||||
|
||||
it("verifyChain detects signature tampering", async () => {
|
||||
await publish({ planId: "p-1", type: "transaction.created" });
|
||||
await publish({ planId: "p-1", type: "transaction.committed" });
|
||||
rows[1].signature = "0".repeat(64); // tamper
|
||||
const result = await verifyChain("p-1");
|
||||
expect(result.ok).toBe(false);
|
||||
if (!result.ok) {
|
||||
expect(result.brokenAt).toBe(1);
|
||||
}
|
||||
});
|
||||
|
||||
it("verifyChain detects broken prev_hash link", async () => {
|
||||
await publish({ planId: "p-1", type: "transaction.created" });
|
||||
await publish({ planId: "p-1", type: "transaction.committed" });
|
||||
rows[1].prev_hash = "0".repeat(64);
|
||||
const result = await verifyChain("p-1");
|
||||
expect(result.ok).toBe(false);
|
||||
if (!result.ok) {
|
||||
expect(result.reason).toBe("prev_hash mismatch");
|
||||
}
|
||||
});
|
||||
|
||||
it("getEventsForPlan returns events in chronological order", async () => {
|
||||
await publish({ planId: "p-1", type: "transaction.created" });
|
||||
await publish({ planId: "p-1", type: "transaction.prepared" });
|
||||
const events = await getEventsForPlan("p-1");
|
||||
expect(events.map((e) => e.type)).toEqual([
|
||||
"transaction.created",
|
||||
"transaction.prepared",
|
||||
]);
|
||||
});
|
||||
});
|
||||
69
orchestrator/tests/unit/exceptionManager.test.ts
Normal file
69
orchestrator/tests/unit/exceptionManager.test.ts
Normal file
@@ -0,0 +1,69 @@
|
||||
import { describe, it, expect } from "@jest/globals";
|
||||
import {
|
||||
Business,
|
||||
Control,
|
||||
Data,
|
||||
SettlementException,
|
||||
Timing,
|
||||
classify,
|
||||
route,
|
||||
} from "../../src/services/exceptionManager";
|
||||
|
||||
describe("ExceptionManager — architecture note §12", () => {
|
||||
describe("classification taxonomy", () => {
|
||||
it("builds the four §12 classes via factory functions", () => {
|
||||
expect(Timing.dispatch().exceptionClass).toBe("timing");
|
||||
expect(Timing.dispatch().code).toBe("dispatch_timeout");
|
||||
|
||||
expect(Data.valueMismatch().exceptionClass).toBe("data");
|
||||
expect(Data.documentHashMismatch().code).toBe("document_hash_mismatch");
|
||||
|
||||
expect(Control.unauthorized("nobody").exceptionClass).toBe("control");
|
||||
expect(Control.duplicate("ev-1").code).toBe("duplicate_event");
|
||||
|
||||
expect(Business.manualStop("operator halted").exceptionClass).toBe("business");
|
||||
expect(Business.policyViolation({ rule: "LTV" }).code).toBe("policy_rule_violation");
|
||||
});
|
||||
|
||||
it("classify() tags network/timeout errors as system/network_error", () => {
|
||||
const ex = classify(new Error("ETIMEDOUT connect"));
|
||||
expect(ex.exceptionClass).toBe("system");
|
||||
expect(ex.code).toBe("network_error");
|
||||
});
|
||||
|
||||
it("classify() tags postgres errors as system/database_error", () => {
|
||||
const ex = classify(new Error("postgres connection refused"));
|
||||
expect(ex.exceptionClass).toBe("system");
|
||||
expect(ex.code).toBe("database_error");
|
||||
});
|
||||
|
||||
it("classify() is idempotent for SettlementException inputs", () => {
|
||||
const original = Data.valueMismatch({ field: "amount" });
|
||||
expect(classify(original)).toBe(original);
|
||||
});
|
||||
});
|
||||
|
||||
describe("deterministic routing", () => {
|
||||
const cases: Array<[SettlementException, string]> = [
|
||||
[Timing.dispatch(), "retry"],
|
||||
[Timing.settlement(), "retry"],
|
||||
[Data.valueMismatch(), "abort_transaction"],
|
||||
[Data.documentHashMismatch(), "abort_transaction"],
|
||||
[Control.missingApproval(), "escalate"],
|
||||
[Control.unauthorized("x"), "escalate"],
|
||||
[Control.duplicate("ev"), "dead_letter"],
|
||||
[Business.manualStop("halt"), "abort_transaction"],
|
||||
[Business.policyViolation({ rule: "LTV" }), "escalate"],
|
||||
];
|
||||
|
||||
it.each(cases)("routes %j to %s", (ex, expected) => {
|
||||
expect(route(ex)).toBe(expected);
|
||||
});
|
||||
|
||||
it("network errors retry; non-network system errors dead-letter", () => {
|
||||
expect(route(classify(new Error("ETIMEDOUT")))).toBe("retry");
|
||||
const dbErr = classify(new Error("postgres broken"));
|
||||
expect(route(dbErr)).toBe("dead_letter");
|
||||
});
|
||||
});
|
||||
});
|
||||
62
orchestrator/tests/unit/notaryChain.test.ts
Normal file
62
orchestrator/tests/unit/notaryChain.test.ts
Normal file
@@ -0,0 +1,62 @@
|
||||
import { describe, it, expect, beforeEach } from "@jest/globals";
|
||||
import {
|
||||
__resetForTests,
|
||||
anchorPlan,
|
||||
computePlanHash,
|
||||
finalizeAnchor,
|
||||
planIdToBytes32,
|
||||
} from "../../src/services/notaryChain";
|
||||
import type { Plan } from "../../src/types/plan";
|
||||
|
||||
const FIXTURE_PLAN: Plan = {
|
||||
plan_id: "11111111-2222-3333-4444-555555555555",
|
||||
creator: "0xabc",
|
||||
steps: [{ type: "pay", amount: 100, asset: "USD" }],
|
||||
};
|
||||
|
||||
describe("NotaryChain adapter", () => {
|
||||
beforeEach(() => __resetForTests());
|
||||
|
||||
describe("helpers", () => {
|
||||
it("planIdToBytes32 is deterministic and 32 bytes", () => {
|
||||
const a = planIdToBytes32("p-1");
|
||||
const b = planIdToBytes32("p-1");
|
||||
expect(a).toBe(b);
|
||||
expect(a).toMatch(/^0x[0-9a-f]{64}$/);
|
||||
});
|
||||
|
||||
it("planIdToBytes32 collision-resistant across different ids", () => {
|
||||
expect(planIdToBytes32("a")).not.toBe(planIdToBytes32("b"));
|
||||
});
|
||||
|
||||
it("computePlanHash is deterministic and sha256", () => {
|
||||
const h1 = computePlanHash(FIXTURE_PLAN);
|
||||
const h2 = computePlanHash(FIXTURE_PLAN);
|
||||
expect(h1).toBe(h2);
|
||||
expect(h1).toMatch(/^0x[0-9a-f]{64}$/);
|
||||
});
|
||||
});
|
||||
|
||||
describe("mock fallback (envs unset)", () => {
|
||||
it("anchorPlan returns mode=mock with planHash when unconfigured", async () => {
|
||||
const result = await anchorPlan(FIXTURE_PLAN, {});
|
||||
expect(result.mode).toBe("mock");
|
||||
expect(result.planHash).toMatch(/^0x[0-9a-f]{64}$/);
|
||||
expect(result.txHash).toBeUndefined();
|
||||
});
|
||||
|
||||
it("finalizeAnchor returns mode=mock when unconfigured", async () => {
|
||||
const result = await finalizeAnchor(FIXTURE_PLAN.plan_id!, true, {});
|
||||
expect(result.mode).toBe("mock");
|
||||
expect(result.txHash).toBeUndefined();
|
||||
});
|
||||
|
||||
it("anchorPlan stays on the mock path when only some envs are set", async () => {
|
||||
const result = await anchorPlan(FIXTURE_PLAN, {
|
||||
rpcUrl: "https://rpc.d-bis.org",
|
||||
// contractAddress + privateKey missing
|
||||
});
|
||||
expect(result.mode).toBe("mock");
|
||||
});
|
||||
});
|
||||
});
|
||||
82
orchestrator/tests/unit/planValidation.instrument.test.ts
Normal file
82
orchestrator/tests/unit/planValidation.instrument.test.ts
Normal file
@@ -0,0 +1,82 @@
|
||||
import { describe, it, expect } from "@jest/globals";
|
||||
import { validatePlan } from "../../src/services/planValidation";
|
||||
import type { InstrumentTerms, Plan } from "../../src/types/plan";
|
||||
|
||||
const goodTerms: InstrumentTerms = {
|
||||
applicant: "Solace Bank Group PLC",
|
||||
issuingBankBIC: "SOLBAE22",
|
||||
beneficiaryBankBIC: "MEBLAEAD", // Emirates Islamic BIC prefix example
|
||||
beneficiaryName: "Acme Trading LLC",
|
||||
beneficiaryAccount: "AE070331234567890123456",
|
||||
amount: 1_000_000,
|
||||
currency: "USD",
|
||||
tenor: "90D",
|
||||
expiryDate: "2026-06-30",
|
||||
placeOfPresentation: "Dubai, UAE",
|
||||
governingLaw: "URDG 758",
|
||||
templateRef: "EIB-SBLC-v3.2",
|
||||
templateHash:
|
||||
"a".repeat(64), // dummy sha256
|
||||
};
|
||||
|
||||
function planWith(terms: Partial<InstrumentTerms> | null): Plan {
|
||||
return {
|
||||
creator: "solace-ops-01",
|
||||
steps: [
|
||||
{
|
||||
type: "issueInstrument",
|
||||
amount: terms?.amount ?? 1_000_000,
|
||||
instrument: terms === null ? undefined : ({ ...goodTerms, ...terms } as InstrumentTerms),
|
||||
},
|
||||
],
|
||||
};
|
||||
}
|
||||
|
||||
describe("validatePlan — issueInstrument step", () => {
|
||||
it("accepts a well-formed SBLC step", () => {
|
||||
const result = validatePlan(planWith({}));
|
||||
expect(result.valid).toBe(true);
|
||||
expect(result.errors).toHaveLength(0);
|
||||
});
|
||||
|
||||
it("rejects a step missing the instrument object", () => {
|
||||
const result = validatePlan(planWith(null));
|
||||
expect(result.valid).toBe(false);
|
||||
expect(result.errors[0]).toMatch(/missing instrument terms/);
|
||||
});
|
||||
|
||||
it("rejects an invalid BIC", () => {
|
||||
const result = validatePlan(planWith({ issuingBankBIC: "NOTABIC" }));
|
||||
expect(result.valid).toBe(false);
|
||||
expect(result.errors.join("\n")).toMatch(/issuingBankBIC is not a valid BIC/);
|
||||
});
|
||||
|
||||
it("rejects a non-ISO-4217 currency", () => {
|
||||
const result = validatePlan(planWith({ currency: "usd" }));
|
||||
expect(result.valid).toBe(false);
|
||||
expect(result.errors.join("\n")).toMatch(/currency must be ISO 4217/);
|
||||
});
|
||||
|
||||
it("rejects a non-ISO-8601 expiry date", () => {
|
||||
const result = validatePlan(planWith({ expiryDate: "30-06-2026" }));
|
||||
expect(result.valid).toBe(false);
|
||||
expect(result.errors.join("\n")).toMatch(/expiryDate must be YYYY-MM-DD/);
|
||||
});
|
||||
|
||||
it("rejects a non-sha256 template hash", () => {
|
||||
const result = validatePlan(planWith({ templateHash: "deadbeef" }));
|
||||
expect(result.valid).toBe(false);
|
||||
expect(result.errors.join("\n")).toMatch(/templateHash must be 64 hex chars/);
|
||||
});
|
||||
|
||||
it("rejects an instrument with non-positive amount", () => {
|
||||
const result = validatePlan(planWith({ amount: 0 }));
|
||||
expect(result.valid).toBe(false);
|
||||
expect(result.errors.join("\n")).toMatch(/instrument.amount must be > 0/);
|
||||
});
|
||||
|
||||
it("accepts 11-char branched BIC", () => {
|
||||
const result = validatePlan(planWith({ issuingBankBIC: "SOLBAE22XXX" }));
|
||||
expect(result.valid).toBe(true);
|
||||
});
|
||||
});
|
||||
169
orchestrator/tests/unit/swift.test.ts
Normal file
169
orchestrator/tests/unit/swift.test.ts
Normal file
@@ -0,0 +1,169 @@
|
||||
import { describe, it, expect } from "@jest/globals";
|
||||
import {
|
||||
generateMt760,
|
||||
messageHash,
|
||||
generatePacs009,
|
||||
generateMt202,
|
||||
parseCamt025,
|
||||
parseCamt054,
|
||||
parseCamt,
|
||||
reconcileCamt054,
|
||||
} from "../../src/services/swift";
|
||||
import type { InstrumentTerms, Plan } from "../../src/types/plan";
|
||||
|
||||
const TERMS: InstrumentTerms = {
|
||||
applicant: "ACME TRADING FZE",
|
||||
issuingBankBIC: "EBILAEAD",
|
||||
beneficiaryBankBIC: "EMBKAEAD",
|
||||
beneficiaryName: "BLUE OCEAN SHIPPING LLC",
|
||||
beneficiaryAccount: "AE070260001015104203701",
|
||||
amount: 1_500_000,
|
||||
currency: "USD",
|
||||
tenor: "365D",
|
||||
expiryDate: "2027-04-18",
|
||||
placeOfPresentation: "DUBAI",
|
||||
governingLaw: "URDG 758",
|
||||
templateRef: "EIB-SBLC-2024-01",
|
||||
templateHash: "a".repeat(64),
|
||||
};
|
||||
|
||||
const PLAN: Plan = {
|
||||
plan_id: "11111111-2222-3333-4444-555555555555",
|
||||
creator: "0xabc",
|
||||
steps: [{ type: "pay", asset: "USD", amount: 1_500_000 }],
|
||||
};
|
||||
|
||||
describe("SWIFT gateway — MT760", () => {
|
||||
it("renders all 12 required tags", () => {
|
||||
const msg = generateMt760(TERMS, { transactionReference: "TXN1", issueDate: "2026-04-18" });
|
||||
expect(msg.sender).toBe("EBILAEAD");
|
||||
expect(msg.receiver).toBe("EMBKAEAD");
|
||||
expect(msg.fields["20"]).toBe("TXN1");
|
||||
expect(msg.fields["30"]).toBe("260418");
|
||||
expect(msg.fields["32B"]).toBe("USD1500000,00");
|
||||
expect(msg.fields["31D"]).toBe("270418DUBAI");
|
||||
expect(msg.fin).toContain("{1:F01EBILAEADXXXX0000000000}");
|
||||
expect(msg.fin).toContain("{2:I760EMBKAEADXXXXN}");
|
||||
expect(msg.fin).toContain(":32B:USD1500000,00");
|
||||
});
|
||||
|
||||
it("rejects malformed expiry date", () => {
|
||||
expect(() =>
|
||||
generateMt760({ ...TERMS, expiryDate: "not-a-date" }, { transactionReference: "T", issueDate: "2026-04-18" }),
|
||||
).toThrow(/YYYY-MM-DD/);
|
||||
});
|
||||
|
||||
it("rejects negative amount", () => {
|
||||
expect(() =>
|
||||
generateMt760({ ...TERMS, amount: -1 }, { transactionReference: "T", issueDate: "2026-04-18" }),
|
||||
).toThrow(/non-negative/);
|
||||
});
|
||||
|
||||
it("messageHash is deterministic", () => {
|
||||
const a = generateMt760(TERMS, { transactionReference: "T", issueDate: "2026-04-18" });
|
||||
const b = generateMt760(TERMS, { transactionReference: "T", issueDate: "2026-04-18" });
|
||||
expect(messageHash(a)).toBe(messageHash(b));
|
||||
expect(messageHash(a)).toMatch(/^[0-9a-f]{64}$/);
|
||||
});
|
||||
});
|
||||
|
||||
describe("SWIFT gateway — pacs.009", () => {
|
||||
const opts = {
|
||||
messageId: "MSG-1",
|
||||
creationDateTime: "2026-04-18T10:00:00Z",
|
||||
instructingAgentBIC: "EBILAEAD",
|
||||
instructedAgentBIC: "EMBKAEAD",
|
||||
debtorAgentBIC: "EBILAEAD",
|
||||
creditorAgentBIC: "EMBKAEAD",
|
||||
};
|
||||
|
||||
it("emits well-formed pacs.009.001.08 XML", () => {
|
||||
const result = generatePacs009(PLAN, opts);
|
||||
expect(result.messageId).toBe("MSG-1");
|
||||
expect(result.xml).toContain("urn:iso:std:iso:20022:tech:xsd:pacs.009.001.08");
|
||||
expect(result.xml).toContain("<IntrBkSttlmAmt Ccy=\"USD\">1500000.00</IntrBkSttlmAmt>");
|
||||
expect(result.xml).toContain("<BICFI>EBILAEAD</BICFI>");
|
||||
expect(result.xml).toContain("<BICFI>EMBKAEAD</BICFI>");
|
||||
expect(result.endToEndId).toBe(`E2E-${PLAN.plan_id}`);
|
||||
});
|
||||
|
||||
it("rejects invalid BIC", () => {
|
||||
expect(() => generatePacs009(PLAN, { ...opts, instructingAgentBIC: "BAD" })).toThrow(/BIC/);
|
||||
});
|
||||
|
||||
it("requires a pay step", () => {
|
||||
expect(() =>
|
||||
generatePacs009({ ...PLAN, steps: [{ type: "borrow", amount: 1, asset: "USD" }] }, opts),
|
||||
).toThrow(/pay/);
|
||||
});
|
||||
});
|
||||
|
||||
describe("SWIFT gateway — MT202", () => {
|
||||
it("renders the 6 required tags", () => {
|
||||
const msg = generateMt202(PLAN, {
|
||||
transactionReference: "TXN-1",
|
||||
valueDate: "2026-04-18",
|
||||
sendingInstitution: "EBILAEAD",
|
||||
receivingInstitution: "EMBKAEAD",
|
||||
beneficiaryInstitution: "EMBKAEAD",
|
||||
});
|
||||
expect(msg.fields["20"]).toBe("TXN-1");
|
||||
expect(msg.fields["32A"]).toBe("260418USD1500000,00");
|
||||
expect(msg.fields["58A"]).toBe("EMBKAEAD");
|
||||
expect(msg.fin).toContain(":20:TXN-1");
|
||||
});
|
||||
});
|
||||
|
||||
describe("SWIFT gateway — camt parsers", () => {
|
||||
it("parseCamt025 extracts status + ids", () => {
|
||||
const xml = `<?xml version="1.0"?><Document xmlns="urn:iso:std:iso:20022:tech:xsd:camt.025.001.05"><Rct><MsgId>R1</MsgId><OrgnlMsgId>MSG-1</OrgnlMsgId><Cd>ACSC</Cd><CreDtTm>2026-04-18T10:01:00Z</CreDtTm></Rct></Document>`;
|
||||
const r = parseCamt025(xml);
|
||||
expect(r.type).toBe("camt.025");
|
||||
expect(r.originalMessageId).toBe("MSG-1");
|
||||
expect(r.status).toBe("ACSC");
|
||||
});
|
||||
|
||||
it("parseCamt054 extracts credit amount + endToEndId", () => {
|
||||
const xml = `<?xml version="1.0"?><Document xmlns="urn:iso:std:iso:20022:tech:xsd:camt.054.001.08"><BkToCstmrDbtCdtNtfctn><MsgId>N1</MsgId><Ntfctn><Ntry><Amt Ccy="USD">1500000.00</Amt><CdtDbtInd>CRDT</CdtDbtInd><BookgDt><Dt>2026-04-18</Dt></BookgDt><ValDt><Dt>2026-04-18</Dt></ValDt><NtryDtls><TxDtls><Refs><EndToEndId>E2E-plan-1</EndToEndId></Refs></TxDtls></NtryDtls></Ntry></Ntfctn></BkToCstmrDbtCdtNtfctn></Document>`;
|
||||
const r = parseCamt054(xml);
|
||||
expect(r.type).toBe("camt.054");
|
||||
expect(r.creditDebitIndicator).toBe("CRDT");
|
||||
expect(r.amount).toBe(1_500_000);
|
||||
expect(r.currency).toBe("USD");
|
||||
expect(r.endToEndId).toBe("E2E-plan-1");
|
||||
});
|
||||
|
||||
it("parseCamt dispatches on xmlns marker", () => {
|
||||
const xml025 = `<Document xmlns="urn:iso:std:iso:20022:tech:xsd:camt.025.001.05"><Rct><MsgId>R</MsgId><OrgnlMsgId>O</OrgnlMsgId><Cd>ACSC</Cd></Rct></Document>`;
|
||||
expect(parseCamt(xml025).type).toBe("camt.025");
|
||||
});
|
||||
|
||||
it("parseCamt rejects unknown xmlns", () => {
|
||||
expect(() => parseCamt('<Document xmlns="urn:other"/>')).toThrow(/unsupported/);
|
||||
});
|
||||
|
||||
it("reconcileCamt054 returns empty array when everything matches", () => {
|
||||
const msg = {
|
||||
type: "camt.054" as const,
|
||||
messageId: "N1",
|
||||
creditDebitIndicator: "CRDT" as const,
|
||||
amount: 1_500_000,
|
||||
currency: "USD",
|
||||
endToEndId: "E2E-1",
|
||||
};
|
||||
expect(reconcileCamt054(msg, { amount: 1_500_000, currency: "USD", endToEndId: "E2E-1" })).toEqual([]);
|
||||
});
|
||||
|
||||
it("reconcileCamt054 reports amount + currency + direction mismatches", () => {
|
||||
const msg = {
|
||||
type: "camt.054" as const,
|
||||
messageId: "N1",
|
||||
creditDebitIndicator: "DBIT" as const,
|
||||
amount: 1_400_000,
|
||||
currency: "EUR",
|
||||
endToEndId: "E2E-2",
|
||||
};
|
||||
const result = reconcileCamt054(msg, { amount: 1_500_000, currency: "USD", endToEndId: "E2E-1" });
|
||||
expect(result.map((m) => m.field).sort()).toEqual(["amount", "creditDebitIndicator", "currency", "endToEndId"]);
|
||||
});
|
||||
});
|
||||
85
orchestrator/tests/unit/transactionState.test.ts
Normal file
85
orchestrator/tests/unit/transactionState.test.ts
Normal file
@@ -0,0 +1,85 @@
|
||||
import { describe, it, expect } from "@jest/globals";
|
||||
import {
|
||||
ALLOWED_TRANSITIONS,
|
||||
ROLE_FOR_TRANSITION,
|
||||
SOD_REQUIRED_TRANSITIONS,
|
||||
TRANSACTION_STATES,
|
||||
canTransition,
|
||||
} from "../../src/types/transactionState";
|
||||
|
||||
describe("Transaction state machine (architecture note §8–§9)", () => {
|
||||
it("declares the 12 states from §8.1", () => {
|
||||
expect(TRANSACTION_STATES).toEqual([
|
||||
"DRAFT",
|
||||
"INITIATED",
|
||||
"PRECONDITIONS_PENDING",
|
||||
"READY_FOR_PREPARE",
|
||||
"PREPARED",
|
||||
"EXECUTING",
|
||||
"PARTIALLY_EXECUTED",
|
||||
"VALIDATING",
|
||||
"COMMITTED",
|
||||
"ABORTED",
|
||||
"UNWIND_PENDING",
|
||||
"CLOSED",
|
||||
]);
|
||||
});
|
||||
|
||||
describe("§9.1 permitted high-level transitions", () => {
|
||||
// Each of these is listed in the note; canTransition must accept them.
|
||||
const legal: Array<[string, string]> = [
|
||||
["DRAFT", "INITIATED"],
|
||||
["INITIATED", "PRECONDITIONS_PENDING"],
|
||||
["PRECONDITIONS_PENDING", "READY_FOR_PREPARE"],
|
||||
["READY_FOR_PREPARE", "PREPARED"],
|
||||
["PREPARED", "EXECUTING"],
|
||||
["EXECUTING", "PARTIALLY_EXECUTED"],
|
||||
["EXECUTING", "VALIDATING"],
|
||||
["PARTIALLY_EXECUTED", "VALIDATING"],
|
||||
["VALIDATING", "COMMITTED"],
|
||||
["VALIDATING", "ABORTED"],
|
||||
["ABORTED", "UNWIND_PENDING"],
|
||||
["COMMITTED", "CLOSED"],
|
||||
["UNWIND_PENDING", "CLOSED"],
|
||||
];
|
||||
it.each(legal)("allows %s -> %s", (from, to) => {
|
||||
expect(canTransition(from as any, to as any)).toBe(true);
|
||||
});
|
||||
|
||||
// A few illegal edges — explicitly not in §9.1.
|
||||
const illegal: Array<[string, string]> = [
|
||||
["DRAFT", "COMMITTED"],
|
||||
["INITIATED", "EXECUTING"],
|
||||
["CLOSED", "INITIATED"],
|
||||
["PREPARED", "COMMITTED"],
|
||||
["COMMITTED", "ABORTED"],
|
||||
["ABORTED", "COMMITTED"],
|
||||
];
|
||||
it.each(illegal)("rejects %s -> %s", (from, to) => {
|
||||
expect(canTransition(from as any, to as any)).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
it("CLOSED is a terminal state", () => {
|
||||
expect(ALLOWED_TRANSITIONS.CLOSED).toEqual([]);
|
||||
});
|
||||
|
||||
describe("segregation-of-duties checkpoints (§13)", () => {
|
||||
it("flags the four SoD-gated transitions", () => {
|
||||
expect([...SOD_REQUIRED_TRANSITIONS].sort()).toEqual(
|
||||
[
|
||||
"ABORTED->UNWIND_PENDING",
|
||||
"PREPARED->EXECUTING",
|
||||
"READY_FOR_PREPARE->PREPARED",
|
||||
"VALIDATING->COMMITTED",
|
||||
].sort(),
|
||||
);
|
||||
});
|
||||
|
||||
it("assigns a role to every SoD-gated transition", () => {
|
||||
for (const key of SOD_REQUIRED_TRANSITIONS) {
|
||||
expect(ROLE_FOR_TRANSITION[key]).toBeDefined();
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user