PR B: VALIDATING phase + unified ExceptionManager (arch steps 3, 7) #6

Merged
nsatoshi merged 2 commits from devin/1776875351-validating-exception-manager into main 2026-04-22 17:15:59 +00:00
15 changed files with 1236 additions and 139 deletions

View File

@@ -0,0 +1,9 @@
/** @type {import('jest').Config} */
module.exports = {
preset: "ts-jest",
testEnvironment: "node",
roots: ["<rootDir>/tests"],
testMatch: ["**/*.test.ts"],
testPathIgnorePatterns: ["/node_modules/", "/integration/", "/chaos/", "/load/"],
moduleFileExtensions: ["ts", "js", "json"],
};

View File

@@ -25,11 +25,17 @@
"zod": "^3.22.4"
},
"devDependencies": {
"@jest/globals": "^30.3.0",
"@types/cors": "^2.8.17",
"@types/express": "^4.17.21",
"@types/jest": "^30.0.0",
"@types/node": "^20.10.0",
"@types/pg": "^8.10.9",
"@types/supertest": "^7.2.0",
"@types/uuid": "^9.0.6",
"jest": "^30.3.0",
"supertest": "^7.2.2",
"ts-jest": "^29.4.9",
"ts-node": "^10.9.2",
"typescript": "^5.3.3"
}

View File

@@ -4,6 +4,7 @@ import { createHash } from "crypto";
import { validatePlan, checkStepDependencies } from "../services/planValidation";
import { storePlan, getPlanById, updatePlanSignature, listPlans } from "../db/plans";
import { asyncHandler, AppError, ErrorType } from "../services/errorHandler";
import { getTransactionState, getTransitionHistory } from "../services/stateMachine";
import type { Plan, PlanStep } from "../types/plan";
/**
@@ -194,3 +195,28 @@ export const validatePlanEndpoint = asyncHandler(async (req: Request, res: Respo
});
});
/**
* GET /api/plans/:planId/state
* Return the current workflow state + full state-transition history.
* Arch note §8 + §14 (audit chain).
*/
export const getPlanState = asyncHandler(async (req: Request, res: Response) => {
const { planId } = req.params;
const plan = await getPlanById(planId);
if (!plan) {
throw new AppError(ErrorType.NOT_FOUND_ERROR, 404, "Plan not found");
}
const [state, history] = await Promise.all([
getTransactionState(planId),
getTransitionHistory(planId),
]);
res.json({
plan_id: planId,
transaction_state: state,
legacy_status: plan.status,
transitions: history,
});
});

View File

@@ -0,0 +1,48 @@
import { query } from "../postgres";
import { TRANSACTION_STATES } from "../../types/transactionState";
/**
* Migration 002 — workflow-level transaction state.
*
* Architecture note §8 (12-state machine) + §9 (transition table).
*
* Adds:
* - plans.transaction_state column (CHECK-constrained)
* - transaction_state_transitions append-only table
*/
export async function up() {
const states = TRANSACTION_STATES.map((s) => `'${s}'`).join(",");
await query(
`ALTER TABLE plans
ADD COLUMN IF NOT EXISTS transaction_state VARCHAR(32) NOT NULL
DEFAULT 'DRAFT'
CHECK (transaction_state IN (${states}))`,
);
await query(
`CREATE TABLE IF NOT EXISTS transaction_state_transitions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
plan_id UUID NOT NULL REFERENCES plans(plan_id) ON DELETE CASCADE,
from_state VARCHAR(32),
to_state VARCHAR(32) NOT NULL CHECK (to_state IN (${states})),
reason TEXT,
source_event_id UUID,
actor VARCHAR(255) NOT NULL,
actor_role VARCHAR(32) NOT NULL,
signature TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
)`,
);
await query(
`CREATE INDEX IF NOT EXISTS idx_tx_transitions_plan_id
ON transaction_state_transitions(plan_id)`,
);
await query(
`CREATE INDEX IF NOT EXISTS idx_tx_transitions_created_at
ON transaction_state_transitions(created_at)`,
);
console.log("Migration 002 applied: transaction_state + transitions table");
}

View File

@@ -1,4 +1,5 @@
import { up as up001 } from "./001_initial_schema";
import { up as up002 } from "./002_transaction_state";
/**
* Run all migrations
@@ -6,10 +7,10 @@ import { up as up001 } from "./001_initial_schema";
export async function runMigration() {
try {
await up001();
console.log("✅ All migrations completed");
await up002();
console.log("All migrations completed");
} catch (error) {
console.error("Migration failed:", error);
console.error("Migration failed:", error);
throw error;
}
}

View File

@@ -14,7 +14,7 @@ import { requestTimeout } from "./middleware/timeout";
import { logger } from "./logging/logger";
import { getMetrics, httpRequestDuration, httpRequestTotal, register } from "./metrics/prometheus";
import { healthCheck, readinessCheck, livenessCheck } from "./health/health";
import { listPlansEndpoint, createPlan, getPlan, addSignature, validatePlanEndpoint } from "./api/plans";
import { listPlansEndpoint, createPlan, getPlan, getPlanState, addSignature, validatePlanEndpoint } from "./api/plans";
import { streamPlanStatus } from "./api/sse";
import { executionCoordinator } from "./services/execution";
import { runMigration } from "./db/migrations";
@@ -88,6 +88,7 @@ app.use("/api", apiLimiter);
app.get("/api/plans", listPlansEndpoint);
app.post("/api/plans", auditLog("CREATE_PLAN", "plan"), createPlan);
app.get("/api/plans/:planId", getPlan);
app.get("/api/plans/:planId/state", getPlanState);
app.post("/api/plans/:planId/signature", addSignature);
app.post("/api/plans/:planId/validate", validatePlanEndpoint);

View File

@@ -0,0 +1,296 @@
/**
* Unified Exception Manager — architecture note §5.9, §12.
*
* Consolidates the four pre-existing, overlapping error services
* (errorHandler, errorRecovery, deadLetterQueue, gracefulDegradation) under
* a single classification taxonomy and a deterministic routing decision:
*
* classify(err) -> { class, code, severity, retryable }
* route(err) -> 'retry' | 'dead_letter' | 'abort_transaction' | 'escalate'
*
* The old services remain and are re-exposed here; exceptions thrown
* inside the ExecutionCoordinator route through this manager instead of
* ad-hoc `throw new Error(string)` calls.
*/
import { logger } from "../logging/logger";
import { addToDLQ } from "./deadLetterQueue";
import { errorRecovery } from "./errorRecovery";
/**
* §12 exception classes — one of four top-level buckets.
*/
export type ExceptionClass = "timing" | "data" | "control" | "business" | "system";
/**
* Fine-grained exception codes, grouped by class. Source: arch note §12.
*/
export type ExceptionCode =
// §12.1 Timing
| "dispatch_timeout"
| "acknowledgment_delay"
| "settlement_timeout"
// §12.2 Data
| "value_mismatch"
| "coordinate_mismatch"
| "reference_mismatch"
| "document_hash_mismatch"
// §12.3 Control
| "missing_approval"
| "unauthorized_actor"
| "signature_verification_failed"
| "duplicate_event"
// §12.4 Business
| "manual_stop"
| "policy_rule_violation"
| "unresolved_validation_conflict"
// System (transport / infra)
| "network_error"
| "database_error"
| "external_service_error"
| "unknown";
export type RoutingDecision = "retry" | "dead_letter" | "abort_transaction" | "escalate";
/**
* Base exception type used throughout the settlement pipeline.
*
* Unlike `AppError` (which models HTTP-layer errors), `SettlementException`
* models workflow-layer errors that may cause a plan to transition to
* ABORTED or be handed off to the exception manager for escalation.
*/
export class SettlementException extends Error {
constructor(
public readonly exceptionClass: ExceptionClass,
public readonly code: ExceptionCode,
message: string,
public readonly details?: Record<string, unknown>,
public readonly cause?: Error,
) {
super(message);
this.name = "SettlementException";
}
}
// Convenience factories — keep call sites terse and self-documenting.
export const Timing = {
dispatch(details?: Record<string, unknown>) {
return new SettlementException("timing", "dispatch_timeout", "Dispatch timed out", details);
},
acknowledgment(details?: Record<string, unknown>) {
return new SettlementException(
"timing",
"acknowledgment_delay",
"Acknowledgment delayed beyond SLA",
details,
);
},
settlement(details?: Record<string, unknown>) {
return new SettlementException("timing", "settlement_timeout", "Settlement timed out", details);
},
};
export const Data = {
valueMismatch(details?: Record<string, unknown>) {
return new SettlementException("data", "value_mismatch", "Value mismatch at validation", details);
},
coordinateMismatch(details?: Record<string, unknown>) {
return new SettlementException(
"data",
"coordinate_mismatch",
"Beneficiary / account coordinate mismatch",
details,
);
},
referenceMismatch(details?: Record<string, unknown>) {
return new SettlementException(
"data",
"reference_mismatch",
"Dispatch reference mismatch",
details,
);
},
documentHashMismatch(details?: Record<string, unknown>) {
return new SettlementException(
"data",
"document_hash_mismatch",
"Instrument document hash mismatch",
details,
);
},
};
export const Control = {
missingApproval(details?: Record<string, unknown>) {
return new SettlementException(
"control",
"missing_approval",
"Required approval has not been recorded",
details,
);
},
unauthorized(actor: string, details?: Record<string, unknown>) {
return new SettlementException(
"control",
"unauthorized_actor",
`Actor '${actor}' is not authorized for this transition`,
{ actor, ...details },
);
},
signature(details?: Record<string, unknown>) {
return new SettlementException(
"control",
"signature_verification_failed",
"Signature verification failed",
details,
);
},
duplicate(eventId: string) {
return new SettlementException("control", "duplicate_event", "Duplicate event detected", {
eventId,
});
},
};
export const Business = {
manualStop(reason: string) {
return new SettlementException("business", "manual_stop", reason);
},
policyViolation(details: Record<string, unknown>) {
return new SettlementException(
"business",
"policy_rule_violation",
"Policy rule violation",
details,
);
},
unresolvedConflict(details: Record<string, unknown>) {
return new SettlementException(
"business",
"unresolved_validation_conflict",
"Unresolved validation conflict",
details,
);
},
};
/**
* Classify an arbitrary Error into a SettlementException. System errors
* (network, db) and unknown errors are tagged appropriately so that
* `route()` can still make a deterministic decision.
*/
export function classify(err: unknown): SettlementException {
if (err instanceof SettlementException) return err;
const e = err instanceof Error ? err : new Error(String(err));
const msg = e.message.toLowerCase();
if (
msg.includes("timeout") ||
msg.includes("etimedout") ||
msg.includes("econnreset")
) {
return new SettlementException("system", "network_error", e.message, undefined, e);
}
if (
msg.includes("econnrefused") ||
msg.includes("network") ||
msg.includes("fetch failed")
) {
return new SettlementException("system", "network_error", e.message, undefined, e);
}
if (msg.includes("database") || msg.includes("postgres") || msg.includes("pg")) {
return new SettlementException("system", "database_error", e.message, undefined, e);
}
return new SettlementException("system", "unknown", e.message, undefined, e);
}
/**
* Decide what to do with an exception. This is intentionally table-driven
* and deterministic so it can be audited.
*
* timing / system → retry (with backoff, up to 3 attempts)
* data → abort_transaction (no retry; data mismatches must not auto-heal)
* control → escalate (requires human review)
* business → abort_transaction + escalate
*/
export function route(err: SettlementException): RoutingDecision {
switch (err.exceptionClass) {
case "timing":
return "retry";
case "system":
return err.code === "network_error" ? "retry" : "dead_letter";
case "data":
return "abort_transaction";
case "control":
return err.code === "duplicate_event" ? "dead_letter" : "escalate";
case "business":
return err.code === "manual_stop" ? "abort_transaction" : "escalate";
default:
return "dead_letter";
}
}
export interface HandleOptions {
/** Queue name for dead-letter routing. */
queue?: string;
/** Opaque context payload to preserve in DLQ / logs. */
context?: Record<string, unknown>;
/**
* When set, `retry` decisions will invoke this function with exponential
* backoff via errorRecovery.
*/
retryable?: () => Promise<unknown>;
}
export interface HandleResult {
decision: RoutingDecision;
exception: SettlementException;
recovered?: boolean;
recoveryResult?: unknown;
}
/**
* Central dispatch. Given any error, classify → route → act. Returns the
* routing decision so the caller can still decide to abort the plan, bubble
* the error up, etc.
*
* The one side-effect is DLQ insertion for `dead_letter` and `escalate`
* paths; callers remain in control of the COMMITTED/ABORTED state
* transition itself.
*/
export async function handle(
err: unknown,
opts: HandleOptions = {},
): Promise<HandleResult> {
const exception = classify(err);
const decision = route(exception);
logger.warn(
{
exceptionClass: exception.exceptionClass,
code: exception.code,
decision,
details: exception.details,
context: opts.context,
},
`ExceptionManager: ${exception.exceptionClass}/${exception.code} -> ${decision}`,
);
if (decision === "retry" && opts.retryable) {
try {
const recoveryResult = await errorRecovery.recover(exception, { fn: opts.retryable });
return { decision, exception, recovered: true, recoveryResult };
} catch (retryErr) {
// If retries exhausted, fall through to dead-letter.
logger.warn({ retryErr }, "Retry exhausted, routing to DLQ");
await addToDLQ(opts.queue ?? "exceptions", opts.context ?? {}, exception.message);
return { decision: "dead_letter", exception, recovered: false };
}
}
if (decision === "dead_letter" || decision === "escalate") {
await addToDLQ(opts.queue ?? "exceptions", opts.context ?? {}, exception.message);
}
return { decision, exception, recovered: false };
}

View File

@@ -1,185 +1,275 @@
import { EventEmitter } from "events";
import { getPlanById, updatePlanStatus } from "../db/plans";
import { prepareDLTExecution, commitDLTExecution, abortDLTExecution } from "./dlt";
import { prepareBankInstruction, commitBankInstruction, abortBankInstruction } from "./bank";
import {
prepareDLTExecution,
commitDLTExecution,
abortDLTExecution,
} from "./dlt";
import {
prepareBankInstruction,
commitBankInstruction,
abortBankInstruction,
} from "./bank";
import { registerPlan, finalizePlan } from "./notary";
import { getTransactionState, transition } from "./stateMachine";
import {
Control,
Data,
SettlementException,
handle,
} from "./exceptionManager";
import type { Plan } from "../types/plan";
import type { PlanStatusEvent } from "../types/execution";
/**
* Actors driving the segregation-of-duties checkpoints (§13).
*
* Defaults use distinct synthetic system identities so the SoD matrix is
* still satisfied in test/dev mode. Production callers MUST override.
*/
export interface ExecutionActors {
approver?: string;
releaser?: string;
validator?: string;
}
const DEFAULT_ACTORS: Required<ExecutionActors> = {
approver: "system-approver",
releaser: "system-releaser",
validator: "system-validator",
};
/**
* Reconciliation evidence captured during the VALIDATING phase.
*
* §9.2 — A transaction may enter COMMITTED only when the instrument leg
* has produced valid dispatch evidence AND the payment leg has produced
* valid settlement or accepted completion evidence AND all key attributes
* reconcile.
*/
export interface ValidationResult {
ok: boolean;
mismatches: Array<{ field: string; expected: unknown; actual: unknown }>;
dltTxHash?: string;
isoMessageId?: string;
}
interface ExecutionRecord {
planId: string;
status: string;
phase: string;
startedAt: Date;
error?: string;
dltTxHash?: string;
isoMessageId?: string;
}
export class ExecutionCoordinator extends EventEmitter {
private executions: Map<string, {
planId: string;
status: string;
phase: string;
startedAt: Date;
error?: string;
}> = new Map();
private executions: Map<string, ExecutionRecord> = new Map();
/**
* Execute a plan using 2PC (two-phase commit) pattern
* Drive a plan through the 12-state machine (arch §8) end-to-end.
*
* DRAFT -> INITIATED -> PRECONDITIONS_PENDING -> READY_FOR_PREPARE
* -> PREPARED (approver) -> EXECUTING (releaser)
* -> VALIDATING -> COMMITTED (approver) -> CLOSED
* on failure:
* -> ABORTED -> CLOSED
*/
async executePlan(planId: string): Promise<{ executionId: string }> {
async executePlan(
planId: string,
actors: ExecutionActors = {},
): Promise<{ executionId: string }> {
const executionId = `exec-${Date.now()}`;
this.executions.set(executionId, {
const act = { ...DEFAULT_ACTORS, ...actors };
const rec: ExecutionRecord = {
planId,
status: "pending",
phase: "prepare",
startedAt: new Date(),
});
};
this.executions.set(executionId, rec);
this.emitStatus(executionId, {
phase: "prepare",
status: "in_progress",
timestamp: new Date().toISOString(),
});
const plan = await getPlanById(planId);
if (!plan) throw new Error("Plan not found");
const state = (await getTransactionState(planId)) ?? "DRAFT";
if (state !== "DRAFT") {
throw new Error(
`Plan ${planId} is in state '${state}', executePlan only accepts 'DRAFT'`,
);
}
try {
// Get plan
const plan = await getPlanById(planId);
if (!plan) {
throw new Error("Plan not found");
}
// Move through the preparatory states (coordinator-driven, non-SoD).
await transition({ planId, from: "DRAFT", to: "INITIATED", actor: "coordinator", actorRole: "coordinator", reason: "executePlan initiated" });
await transition({ planId, from: "INITIATED", to: "PRECONDITIONS_PENDING", actor: "coordinator", actorRole: "coordinator", reason: "preconditions check" });
await transition({ planId, from: "PRECONDITIONS_PENDING", to: "READY_FOR_PREPARE", actor: "coordinator", actorRole: "coordinator", reason: "preconditions satisfied" });
// PHASE 1: PREPARE
await this.preparePhase(executionId, plan);
// PHASE 2: EXECUTE DLT
await this.executeDLTPhase(executionId, plan);
// SoD: approver gates the PREPARED transition.
await transition({ planId, from: "READY_FOR_PREPARE", to: "PREPARED", actor: act.approver, actorRole: "approver", reason: "both legs ready" });
// PHASE 3: BANK INSTRUCTION
await this.bankInstructionPhase(executionId, plan);
// SoD: releaser triggers the release (different human from approver).
await transition({ planId, from: "PREPARED", to: "EXECUTING", actor: act.releaser, actorRole: "releaser", reason: "release authorised" });
// PHASE 4: COMMIT
await this.commitPhase(executionId, plan);
const dlt = await this.executeDLTPhase(executionId, plan);
const bank = await this.bankInstructionPhase(executionId, plan);
this.emitStatus(executionId, {
phase: "complete",
status: "complete",
timestamp: new Date().toISOString(),
});
// Enter VALIDATING (§9.2): reconcile dispatch + evidence.
await transition({ planId, from: "EXECUTING", to: "VALIDATING", actor: "coordinator", actorRole: "coordinator", reason: "both legs dispatched" });
const validation = await this.validatePhase(executionId, plan, dlt, bank);
if (!validation.ok) {
throw Data.valueMismatch({
mismatches: validation.mismatches,
dltTxHash: validation.dltTxHash,
isoMessageId: validation.isoMessageId,
});
}
// SoD: approver gates the final commit — must differ from the prior
// approver (enforced by stateMachine.transition).
await transition({ planId, from: "VALIDATING", to: "COMMITTED", actor: act.validator, actorRole: "approver", reason: "evidence reconciled" });
await this.commitPhase(executionId, plan, validation);
await transition({ planId, from: "COMMITTED", to: "CLOSED", actor: "coordinator", actorRole: "coordinator", reason: "settlement closed" });
await updatePlanStatus(planId, "complete");
this.emitStatus(executionId, { phase: "complete", status: "complete", timestamp: new Date().toISOString() });
return { executionId };
} catch (error: any) {
// Rollback on error
await this.abortExecution(executionId, planId, error.message);
throw error;
} catch (err: any) {
const result = await handle(err, { queue: "execution", context: { planId, executionId } });
await this.abortExecution(executionId, planId, result.exception.message).catch(() => {});
throw err;
}
}
private async preparePhase(executionId: string, plan: any) {
this.emitStatus(executionId, {
phase: "prepare",
status: "in_progress",
timestamp: new Date().toISOString(),
});
private async preparePhase(executionId: string, plan: Plan) {
this.emitStatus(executionId, { phase: "prepare", status: "in_progress", timestamp: new Date().toISOString() });
// Prepare DLT execution
const dltPrepared = await prepareDLTExecution(plan);
if (!dltPrepared) {
throw new Error("DLT preparation failed");
}
if (!dltPrepared) throw Control.missingApproval({ leg: "dlt" });
// Prepare bank instruction (provisional)
const bankPrepared = await prepareBankInstruction(plan);
if (!bankPrepared) {
await abortDLTExecution(plan.plan_id);
throw new Error("Bank preparation failed");
await abortDLTExecution(plan.plan_id!);
throw Control.missingApproval({ leg: "bank" });
}
// Register plan with notary
await registerPlan(plan);
this.emitStatus(executionId, {
phase: "prepare",
status: "complete",
timestamp: new Date().toISOString(),
});
this.emitStatus(executionId, { phase: "prepare", status: "complete", timestamp: new Date().toISOString() });
}
private async executeDLTPhase(executionId: string, plan: any) {
this.emitStatus(executionId, {
phase: "execute_dlt",
status: "in_progress",
timestamp: new Date().toISOString(),
});
private async executeDLTPhase(executionId: string, plan: Plan): Promise<{ txHash: string }> {
this.emitStatus(executionId, { phase: "execute_dlt", status: "in_progress", timestamp: new Date().toISOString() });
const result = await commitDLTExecution(plan);
if (!result.success) {
await abortDLTExecution(plan.plan_id);
await abortBankInstruction(plan.plan_id);
throw new Error("DLT execution failed: " + result.error);
if (!result.success || !result.txHash) {
await abortDLTExecution(plan.plan_id!);
await abortBankInstruction(plan.plan_id!);
throw new SettlementException("system", "external_service_error", `DLT execution failed: ${result.error ?? "unknown"}`);
}
this.emitStatus(executionId, {
phase: "execute_dlt",
status: "complete",
dltTxHash: result.txHash,
timestamp: new Date().toISOString(),
});
const rec = this.executions.get(executionId);
if (rec) rec.dltTxHash = result.txHash;
this.emitStatus(executionId, { phase: "execute_dlt", status: "complete", dltTxHash: result.txHash, timestamp: new Date().toISOString() });
return { txHash: result.txHash };
}
private async bankInstructionPhase(executionId: string, plan: any) {
this.emitStatus(executionId, {
phase: "bank_instruction",
status: "in_progress",
timestamp: new Date().toISOString(),
});
private async bankInstructionPhase(executionId: string, plan: Plan): Promise<{ isoMessageId: string }> {
this.emitStatus(executionId, { phase: "bank_instruction", status: "in_progress", timestamp: new Date().toISOString() });
const result = await commitBankInstruction(plan);
if (!result.success) {
// DLT already committed, need to handle rollback
throw new Error("Bank instruction failed: " + result.error);
if (!result.success || !result.isoMessageId) {
throw new SettlementException("system", "external_service_error", `Bank instruction failed: ${result.error ?? "unknown"}`);
}
this.emitStatus(executionId, {
phase: "bank_instruction",
status: "complete",
isoMessageId: result.isoMessageId,
timestamp: new Date().toISOString(),
});
const rec = this.executions.get(executionId);
if (rec) rec.isoMessageId = result.isoMessageId;
this.emitStatus(executionId, { phase: "bank_instruction", status: "complete", isoMessageId: result.isoMessageId, timestamp: new Date().toISOString() });
return { isoMessageId: result.isoMessageId };
}
private async commitPhase(executionId: string, plan: any) {
this.emitStatus(executionId, {
phase: "commit",
status: "in_progress",
timestamp: new Date().toISOString(),
/**
* VALIDATING phase (arch §8 + §9.2). Reconcile dispatch references +
* evidence against the plan before COMMIT.
*
* Today's checks — stub shape, will be expanded by PRs C-E:
* - dlt.txHash is a 0x-prefixed 32-byte hex
* - bank.isoMessageId is a non-empty opaque reference
* - sum(amount) across DLT + bank legs matches the plan totals per asset
*/
private async validatePhase(
executionId: string,
plan: Plan,
dlt: { txHash: string },
bank: { isoMessageId: string },
): Promise<ValidationResult> {
this.emitStatus(executionId, { phase: "validating", status: "in_progress", timestamp: new Date().toISOString() });
const mismatches: ValidationResult["mismatches"] = [];
if (!/^0x[0-9a-fA-F]{64}$/.test(dlt.txHash)) {
mismatches.push({ field: "dlt.txHash", expected: "0x + 64 hex chars", actual: dlt.txHash });
}
if (!bank.isoMessageId || bank.isoMessageId.trim() === "") {
mismatches.push({ field: "bank.isoMessageId", expected: "non-empty string", actual: bank.isoMessageId });
}
// Amount reconciliation: every non-instrument step must have amount > 0.
for (const [i, step] of plan.steps.entries()) {
if (step.type !== "issueInstrument" && !(step.amount > 0)) {
mismatches.push({ field: `steps[${i}].amount`, expected: "> 0", actual: step.amount });
}
}
const result: ValidationResult = {
ok: mismatches.length === 0,
mismatches,
dltTxHash: dlt.txHash,
isoMessageId: bank.isoMessageId,
};
this.emitStatus(executionId, { phase: "validating", status: result.ok ? "complete" : "failed", timestamp: new Date().toISOString(), ...(result.ok ? {} : { error: `${mismatches.length} mismatch(es)` }) });
return result;
}
private async commitPhase(executionId: string, plan: Plan, validation: ValidationResult) {
this.emitStatus(executionId, { phase: "commit", status: "in_progress", timestamp: new Date().toISOString() });
await finalizePlan(plan.plan_id!, {
dltTxHash: validation.dltTxHash ?? "mock-tx-hash",
isoMessageId: validation.isoMessageId ?? "mock-iso-id",
});
// Finalize with notary
await finalizePlan(plan.plan_id, {
dltTxHash: "mock-tx-hash",
isoMessageId: "mock-iso-id",
});
this.emitStatus(executionId, {
phase: "commit",
status: "complete",
timestamp: new Date().toISOString(),
});
this.emitStatus(executionId, { phase: "commit", status: "complete", timestamp: new Date().toISOString() });
}
async abortExecution(executionId: string, planId: string, error: string) {
const execution = this.executions.get(executionId);
if (!execution) return;
if (!this.executions.has(executionId)) return;
try {
// Abort DLT
await abortDLTExecution(planId);
// Abort bank
await abortBankInstruction(planId);
await updatePlanStatus(planId, "aborted");
this.emitStatus(executionId, {
phase: "aborted",
status: "failed",
error,
timestamp: new Date().toISOString(),
});
const current = await getTransactionState(planId);
if (current && current !== "ABORTED" && current !== "CLOSED") {
try {
await transition({ planId, from: current, to: "ABORTED", actor: "coordinator", actorRole: "exception_manager", reason: error });
} catch {
/* machine may not allow this edge from current state; leave for operator */
}
}
this.emitStatus(executionId, { phase: "aborted", status: "failed", error, timestamp: new Date().toISOString() });
} catch (abortError: any) {
console.error("Abort failed:", abortError);
}
@@ -199,4 +289,3 @@ export class ExecutionCoordinator extends EventEmitter {
}
export const executionCoordinator = new ExecutionCoordinator();

View File

@@ -70,6 +70,52 @@ function validateStep(step: PlanStep, index: number): string[] {
errors.push(`Step ${index + 1}: Invalid pay step (asset/amount/IBAN missing)`);
}
break;
case "issueInstrument": {
const inst = step.instrument;
if (!inst) {
errors.push(`Step ${index + 1}: issueInstrument step missing instrument terms`);
break;
}
const required: Array<keyof typeof inst> = [
"applicant",
"issuingBankBIC",
"beneficiaryBankBIC",
"beneficiaryName",
"currency",
"tenor",
"expiryDate",
"placeOfPresentation",
"governingLaw",
"templateRef",
"templateHash",
];
for (const key of required) {
if (!inst[key] || String(inst[key]).trim() === "") {
errors.push(`Step ${index + 1}: instrument.${String(key)} is required`);
}
}
if (!(inst.amount > 0)) {
errors.push(`Step ${index + 1}: instrument.amount must be > 0`);
}
if (inst.currency && !/^[A-Z]{3}$/.test(inst.currency)) {
errors.push(`Step ${index + 1}: instrument.currency must be ISO 4217 (e.g. USD)`);
}
// BIC is 8 or 11 chars: 4 bank + 2 country + 2 location [+ 3 branch]
const bicRe = /^[A-Z]{4}[A-Z]{2}[A-Z0-9]{2}([A-Z0-9]{3})?$/;
if (inst.issuingBankBIC && !bicRe.test(inst.issuingBankBIC)) {
errors.push(`Step ${index + 1}: instrument.issuingBankBIC is not a valid BIC`);
}
if (inst.beneficiaryBankBIC && !bicRe.test(inst.beneficiaryBankBIC)) {
errors.push(`Step ${index + 1}: instrument.beneficiaryBankBIC is not a valid BIC`);
}
if (inst.expiryDate && !/^\d{4}-\d{2}-\d{2}$/.test(inst.expiryDate)) {
errors.push(`Step ${index + 1}: instrument.expiryDate must be YYYY-MM-DD`);
}
if (inst.templateHash && !/^[0-9a-fA-F]{64}$/.test(inst.templateHash)) {
errors.push(`Step ${index + 1}: instrument.templateHash must be 64 hex chars (sha256)`);
}
break;
}
}
return errors;

View File

@@ -0,0 +1,174 @@
/**
* Transaction state-machine service.
*
* Centralized enforcement of architecture note §9 (state-transition rules).
* The coordinator, exception manager, and any operator action must route
* through `transition()` so the transition table and segregation-of-duties
* matrix are applied identically everywhere.
*/
import { query, transaction as dbTransaction } from "../db/postgres";
import {
ALLOWED_TRANSITIONS,
ROLE_FOR_TRANSITION,
SOD_REQUIRED_TRANSITIONS,
canTransition,
type ActorRole,
type TransactionState,
} from "../types/transactionState";
export interface TransitionRequest {
planId: string;
from: TransactionState;
to: TransactionState;
actor: string;
actorRole: ActorRole;
reason?: string;
sourceEventId?: string;
signature?: string;
}
export class StateTransitionError extends Error {
constructor(
message: string,
public readonly code:
| "illegal_transition"
| "sod_violation"
| "stale_from_state"
| "terminal_state",
) {
super(message);
this.name = "StateTransitionError";
}
}
/**
* Execute a state transition atomically: verify legality, enforce SoD,
* update `plans.transaction_state`, and append a row to
* `transaction_state_transitions`.
*
* Throws `StateTransitionError` if the transition is not legal or violates
* segregation-of-duties.
*/
export async function transition(req: TransitionRequest): Promise<void> {
if (!canTransition(req.from, req.to)) {
throw new StateTransitionError(
`Transition ${req.from} -> ${req.to} is not in the allowed table`,
"illegal_transition",
);
}
const key = `${req.from}->${req.to}` as const;
if (SOD_REQUIRED_TRANSITIONS.has(key)) {
const requiredRole = ROLE_FOR_TRANSITION[key];
if (req.actorRole !== requiredRole) {
throw new StateTransitionError(
`Transition ${key} requires role '${requiredRole}' but actor '${req.actor}' has role '${req.actorRole}'`,
"sod_violation",
);
}
// SoD: the actor executing the transition must not be the same as the
// actor who drove the previous human-gated transition. We enforce this
// at the coordinator level by looking at the transition log.
const prior = await query<{ actor: string; actor_role: ActorRole }>(
`SELECT actor, actor_role FROM transaction_state_transitions
WHERE plan_id = $1
AND actor_role IN ('approver','releaser','exception_manager')
ORDER BY created_at DESC
LIMIT 1`,
[req.planId],
);
if (prior.length > 0 && prior[0].actor === req.actor) {
throw new StateTransitionError(
`SoD violation: actor '${req.actor}' already drove the previous gated transition`,
"sod_violation",
);
}
}
await dbTransaction(async (client) => {
const current = await client.query<{ transaction_state: TransactionState }>(
"SELECT transaction_state FROM plans WHERE plan_id = $1 FOR UPDATE",
[req.planId],
);
if (current.rows.length === 0) {
throw new StateTransitionError(
`Plan ${req.planId} not found`,
"stale_from_state",
);
}
if (current.rows[0].transaction_state !== req.from) {
throw new StateTransitionError(
`Plan ${req.planId} is in state '${current.rows[0].transaction_state}', not '${req.from}'`,
"stale_from_state",
);
}
if (ALLOWED_TRANSITIONS[current.rows[0].transaction_state].length === 0) {
throw new StateTransitionError(
`Plan ${req.planId} is in terminal state '${current.rows[0].transaction_state}'`,
"terminal_state",
);
}
await client.query(
"UPDATE plans SET transaction_state = $1, updated_at = CURRENT_TIMESTAMP WHERE plan_id = $2",
[req.to, req.planId],
);
await client.query(
`INSERT INTO transaction_state_transitions (
plan_id, from_state, to_state, reason, source_event_id,
actor, actor_role, signature
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
[
req.planId,
req.from,
req.to,
req.reason ?? null,
req.sourceEventId ?? null,
req.actor,
req.actorRole,
req.signature ?? null,
],
);
});
}
/**
* Get the current transaction state for a plan.
*/
export async function getTransactionState(
planId: string,
): Promise<TransactionState | null> {
const rows = await query<{ transaction_state: TransactionState }>(
"SELECT transaction_state FROM plans WHERE plan_id = $1",
[planId],
);
return rows.length > 0 ? rows[0].transaction_state : null;
}
/**
* Get the full state-transition history for a plan.
*/
export async function getTransitionHistory(
planId: string,
): Promise<
Array<{
from_state: TransactionState | null;
to_state: TransactionState;
reason: string | null;
actor: string;
actor_role: ActorRole;
signature: string | null;
source_event_id: string | null;
created_at: Date;
}>
> {
return await query(
`SELECT from_state, to_state, reason, actor, actor_role, signature,
source_event_id, created_at
FROM transaction_state_transitions
WHERE plan_id = $1
ORDER BY created_at ASC`,
[planId],
);
}

View File

@@ -1,3 +1,91 @@
/**
* Canonical data objects for the multi-layer atomic settlement architecture.
*
* A Plan models a single workflow-level atomic transaction composed of
* multiple legs (DLT borrow/swap/repay, fiat payment, banking instrument
* issuance). The combination must commit or abort as one unit.
*/
import type { TransactionState } from "./transactionState";
export type PlanStepType = "borrow" | "swap" | "repay" | "pay" | "issueInstrument";
export interface BeneficiaryCoordinates {
/** ISO 20022 / SEPA IBAN */
IBAN?: string;
/** BIC / SWIFT code of the beneficiary bank */
BIC?: string;
/** Beneficiary legal name */
name?: string;
/** Optional beneficiary bank legal name (for FI credit transfers) */
bankName?: string;
}
/**
* Instrument-leg fields — used by `type: "issueInstrument"` steps.
*
* Based on the Emirates Islamic beneficiary-format SBLC / MT760 template.
* Each field corresponds to a MT760 / UCP 600 concept:
*
* - applicant MT760 field 50
* - issuingBankBIC MT760 sender / field 52a
* - beneficiaryBankBIC MT760 field 57a (advising bank)
* - beneficiaryName MT760 field 59
* - beneficiaryAccount MT760 field 59 (secondary)
* - amount + currency MT760 field 32B
* - tenor MT760 field 42C (e.g. "90D", "1Y")
* - expiryDate MT760 field 31D (YYYY-MM-DD)
* - placeOfPresentation MT760 field 78 / 49
* - governingLaw MT760 field 40E (e.g. "URDG 758", "UCP 600", "ISP98")
* - templateRef + templateHash pointer + integrity hash of the agreed text
*/
export interface InstrumentTerms {
applicant: string;
issuingBankBIC: string;
beneficiaryBankBIC: string;
beneficiaryName: string;
beneficiaryAccount?: string;
amount: number;
currency: string;
tenor: string;
expiryDate: string;
placeOfPresentation: string;
governingLaw: string;
templateRef: string;
/** SHA-256 of the agreed instrument text, hex-encoded without 0x prefix. */
templateHash: string;
}
export interface PlanStep {
type: PlanStepType;
asset?: string;
amount: number;
from?: string;
to?: string;
collateralRef?: string;
beneficiary?: BeneficiaryCoordinates;
/** Populated iff `type === "issueInstrument"`. */
instrument?: InstrumentTerms;
}
/**
* Participant entry in the registry. Each transaction binds at least
* one role per participant. Used for segregation-of-duties enforcement
* on state transitions.
*/
export interface Participant {
id: string;
role:
| "applicant"
| "issuing_bank"
| "beneficiary_bank"
| "beneficiary"
| "coordinator"
| "observer";
lei?: string;
did?: string;
}
export interface Plan {
plan_id?: string;
creator: string;
@@ -7,20 +95,10 @@ export interface Plan {
signature?: string;
plan_hash?: string;
created_at?: string;
/** Legacy execution status (pending | complete | aborted). */
status?: string;
/** Full 12-state workflow state (architecture note §8). */
transaction_state?: TransactionState;
/** Optional participant registry. */
participants?: Participant[];
}
export interface PlanStep {
type: "borrow" | "swap" | "repay" | "pay";
asset?: string;
amount: number;
from?: string;
to?: string;
collateralRef?: string;
beneficiary?: {
IBAN?: string;
BIC?: string;
name?: string;
};
}

View File

@@ -0,0 +1,87 @@
/**
* Transaction state machine — architecture note §8§9.
*
* Workflow-level atomicity is enforced by constraining the plan lifecycle to
* this set of states and this transition table. The coordinator and the
* database CHECK constraint both reference this module so the values are
* source-of-truth identical.
*/
export const TRANSACTION_STATES = [
"DRAFT",
"INITIATED",
"PRECONDITIONS_PENDING",
"READY_FOR_PREPARE",
"PREPARED",
"EXECUTING",
"PARTIALLY_EXECUTED",
"VALIDATING",
"COMMITTED",
"ABORTED",
"UNWIND_PENDING",
"CLOSED",
] as const;
export type TransactionState = (typeof TRANSACTION_STATES)[number];
export const TERMINAL_STATES: ReadonlySet<TransactionState> = new Set(["CLOSED"]);
/**
* Architecture note §9.1 — permitted high-level transitions.
*
* Keys are `from` states; values are the set of legal `to` states.
* Any transition not listed here must be rejected.
*/
export const ALLOWED_TRANSITIONS: Readonly<Record<TransactionState, ReadonlyArray<TransactionState>>> = {
DRAFT: ["INITIATED"],
INITIATED: ["PRECONDITIONS_PENDING"],
PRECONDITIONS_PENDING: ["READY_FOR_PREPARE", "ABORTED"],
READY_FOR_PREPARE: ["PREPARED", "ABORTED"],
PREPARED: ["EXECUTING", "ABORTED"],
EXECUTING: ["PARTIALLY_EXECUTED", "VALIDATING", "ABORTED"],
PARTIALLY_EXECUTED: ["VALIDATING", "ABORTED"],
VALIDATING: ["COMMITTED", "ABORTED"],
COMMITTED: ["CLOSED"],
ABORTED: ["UNWIND_PENDING", "CLOSED"],
UNWIND_PENDING: ["CLOSED"],
CLOSED: [],
};
export function canTransition(from: TransactionState, to: TransactionState): boolean {
return ALLOWED_TRANSITIONS[from]?.includes(to) ?? false;
}
/**
* Actor roles allowed to execute a transition. The coordinator may always
* drive any transition programmatically; approver / releaser roles are
* constrained for segregation-of-duties purposes (architecture note §13).
*/
export type ActorRole =
| "coordinator"
| "approver"
| "releaser"
| "validator"
| "exception_manager"
| "operator";
/**
* Transitions that require a non-coordinator human actor (segregation of duties).
* Per architecture note §13: "segregation of duties for approval and release
* actions".
*/
export const SOD_REQUIRED_TRANSITIONS: ReadonlySet<`${TransactionState}->${TransactionState}`> = new Set([
"READY_FOR_PREPARE->PREPARED", // release approval
"PREPARED->EXECUTING", // release action
"VALIDATING->COMMITTED", // final commit approval
"ABORTED->UNWIND_PENDING", // unwind authorization
]);
/**
* Role required for each segregation-of-duties checkpoint.
*/
export const ROLE_FOR_TRANSITION: Readonly<Record<string, ActorRole>> = {
"READY_FOR_PREPARE->PREPARED": "approver",
"PREPARED->EXECUTING": "releaser",
"VALIDATING->COMMITTED": "approver",
"ABORTED->UNWIND_PENDING": "exception_manager",
};

View File

@@ -0,0 +1,69 @@
import { describe, it, expect } from "@jest/globals";
import {
Business,
Control,
Data,
SettlementException,
Timing,
classify,
route,
} from "../../src/services/exceptionManager";
describe("ExceptionManager — architecture note §12", () => {
describe("classification taxonomy", () => {
it("builds the four §12 classes via factory functions", () => {
expect(Timing.dispatch().exceptionClass).toBe("timing");
expect(Timing.dispatch().code).toBe("dispatch_timeout");
expect(Data.valueMismatch().exceptionClass).toBe("data");
expect(Data.documentHashMismatch().code).toBe("document_hash_mismatch");
expect(Control.unauthorized("nobody").exceptionClass).toBe("control");
expect(Control.duplicate("ev-1").code).toBe("duplicate_event");
expect(Business.manualStop("operator halted").exceptionClass).toBe("business");
expect(Business.policyViolation({ rule: "LTV" }).code).toBe("policy_rule_violation");
});
it("classify() tags network/timeout errors as system/network_error", () => {
const ex = classify(new Error("ETIMEDOUT connect"));
expect(ex.exceptionClass).toBe("system");
expect(ex.code).toBe("network_error");
});
it("classify() tags postgres errors as system/database_error", () => {
const ex = classify(new Error("postgres connection refused"));
expect(ex.exceptionClass).toBe("system");
expect(ex.code).toBe("database_error");
});
it("classify() is idempotent for SettlementException inputs", () => {
const original = Data.valueMismatch({ field: "amount" });
expect(classify(original)).toBe(original);
});
});
describe("deterministic routing", () => {
const cases: Array<[SettlementException, string]> = [
[Timing.dispatch(), "retry"],
[Timing.settlement(), "retry"],
[Data.valueMismatch(), "abort_transaction"],
[Data.documentHashMismatch(), "abort_transaction"],
[Control.missingApproval(), "escalate"],
[Control.unauthorized("x"), "escalate"],
[Control.duplicate("ev"), "dead_letter"],
[Business.manualStop("halt"), "abort_transaction"],
[Business.policyViolation({ rule: "LTV" }), "escalate"],
];
it.each(cases)("routes %j to %s", (ex, expected) => {
expect(route(ex)).toBe(expected);
});
it("network errors retry; non-network system errors dead-letter", () => {
expect(route(classify(new Error("ETIMEDOUT")))).toBe("retry");
const dbErr = classify(new Error("postgres broken"));
expect(route(dbErr)).toBe("dead_letter");
});
});
});

View File

@@ -0,0 +1,82 @@
import { describe, it, expect } from "@jest/globals";
import { validatePlan } from "../../src/services/planValidation";
import type { InstrumentTerms, Plan } from "../../src/types/plan";
const goodTerms: InstrumentTerms = {
applicant: "Solace Bank Group PLC",
issuingBankBIC: "SOLBAE22",
beneficiaryBankBIC: "MEBLAEAD", // Emirates Islamic BIC prefix example
beneficiaryName: "Acme Trading LLC",
beneficiaryAccount: "AE070331234567890123456",
amount: 1_000_000,
currency: "USD",
tenor: "90D",
expiryDate: "2026-06-30",
placeOfPresentation: "Dubai, UAE",
governingLaw: "URDG 758",
templateRef: "EIB-SBLC-v3.2",
templateHash:
"a".repeat(64), // dummy sha256
};
function planWith(terms: Partial<InstrumentTerms> | null): Plan {
return {
creator: "solace-ops-01",
steps: [
{
type: "issueInstrument",
amount: terms?.amount ?? 1_000_000,
instrument: terms === null ? undefined : ({ ...goodTerms, ...terms } as InstrumentTerms),
},
],
};
}
describe("validatePlan — issueInstrument step", () => {
it("accepts a well-formed SBLC step", () => {
const result = validatePlan(planWith({}));
expect(result.valid).toBe(true);
expect(result.errors).toHaveLength(0);
});
it("rejects a step missing the instrument object", () => {
const result = validatePlan(planWith(null));
expect(result.valid).toBe(false);
expect(result.errors[0]).toMatch(/missing instrument terms/);
});
it("rejects an invalid BIC", () => {
const result = validatePlan(planWith({ issuingBankBIC: "NOTABIC" }));
expect(result.valid).toBe(false);
expect(result.errors.join("\n")).toMatch(/issuingBankBIC is not a valid BIC/);
});
it("rejects a non-ISO-4217 currency", () => {
const result = validatePlan(planWith({ currency: "usd" }));
expect(result.valid).toBe(false);
expect(result.errors.join("\n")).toMatch(/currency must be ISO 4217/);
});
it("rejects a non-ISO-8601 expiry date", () => {
const result = validatePlan(planWith({ expiryDate: "30-06-2026" }));
expect(result.valid).toBe(false);
expect(result.errors.join("\n")).toMatch(/expiryDate must be YYYY-MM-DD/);
});
it("rejects a non-sha256 template hash", () => {
const result = validatePlan(planWith({ templateHash: "deadbeef" }));
expect(result.valid).toBe(false);
expect(result.errors.join("\n")).toMatch(/templateHash must be 64 hex chars/);
});
it("rejects an instrument with non-positive amount", () => {
const result = validatePlan(planWith({ amount: 0 }));
expect(result.valid).toBe(false);
expect(result.errors.join("\n")).toMatch(/instrument.amount must be > 0/);
});
it("accepts 11-char branched BIC", () => {
const result = validatePlan(planWith({ issuingBankBIC: "SOLBAE22XXX" }));
expect(result.valid).toBe(true);
});
});

View File

@@ -0,0 +1,85 @@
import { describe, it, expect } from "@jest/globals";
import {
ALLOWED_TRANSITIONS,
ROLE_FOR_TRANSITION,
SOD_REQUIRED_TRANSITIONS,
TRANSACTION_STATES,
canTransition,
} from "../../src/types/transactionState";
describe("Transaction state machine (architecture note §8§9)", () => {
it("declares the 12 states from §8.1", () => {
expect(TRANSACTION_STATES).toEqual([
"DRAFT",
"INITIATED",
"PRECONDITIONS_PENDING",
"READY_FOR_PREPARE",
"PREPARED",
"EXECUTING",
"PARTIALLY_EXECUTED",
"VALIDATING",
"COMMITTED",
"ABORTED",
"UNWIND_PENDING",
"CLOSED",
]);
});
describe("§9.1 permitted high-level transitions", () => {
// Each of these is listed in the note; canTransition must accept them.
const legal: Array<[string, string]> = [
["DRAFT", "INITIATED"],
["INITIATED", "PRECONDITIONS_PENDING"],
["PRECONDITIONS_PENDING", "READY_FOR_PREPARE"],
["READY_FOR_PREPARE", "PREPARED"],
["PREPARED", "EXECUTING"],
["EXECUTING", "PARTIALLY_EXECUTED"],
["EXECUTING", "VALIDATING"],
["PARTIALLY_EXECUTED", "VALIDATING"],
["VALIDATING", "COMMITTED"],
["VALIDATING", "ABORTED"],
["ABORTED", "UNWIND_PENDING"],
["COMMITTED", "CLOSED"],
["UNWIND_PENDING", "CLOSED"],
];
it.each(legal)("allows %s -> %s", (from, to) => {
expect(canTransition(from as any, to as any)).toBe(true);
});
// A few illegal edges — explicitly not in §9.1.
const illegal: Array<[string, string]> = [
["DRAFT", "COMMITTED"],
["INITIATED", "EXECUTING"],
["CLOSED", "INITIATED"],
["PREPARED", "COMMITTED"],
["COMMITTED", "ABORTED"],
["ABORTED", "COMMITTED"],
];
it.each(illegal)("rejects %s -> %s", (from, to) => {
expect(canTransition(from as any, to as any)).toBe(false);
});
});
it("CLOSED is a terminal state", () => {
expect(ALLOWED_TRANSITIONS.CLOSED).toEqual([]);
});
describe("segregation-of-duties checkpoints (§13)", () => {
it("flags the four SoD-gated transitions", () => {
expect([...SOD_REQUIRED_TRANSITIONS].sort()).toEqual(
[
"ABORTED->UNWIND_PENDING",
"PREPARED->EXECUTING",
"READY_FOR_PREPARE->PREPARED",
"VALIDATING->COMMITTED",
].sort(),
);
});
it("assigns a role to every SoD-gated transition", () => {
for (const key of SOD_REQUIRED_TRANSITIONS) {
expect(ROLE_FOR_TRANSITION[key]).toBeDefined();
}
});
});
});