import { EventEmitter } from "events"; import { getPlanById, updatePlanStatus } from "../db/plans"; import { prepareDLTExecution, commitDLTExecution, abortDLTExecution, } from "./dlt"; import { prepareBankInstruction, commitBankInstruction, abortBankInstruction, } from "./bank"; import { registerPlan, finalizePlan } from "./notary"; import { getTransactionState, transition } from "./stateMachine"; import { Control, Data, SettlementException, handle, } from "./exceptionManager"; import type { Plan } from "../types/plan"; import type { PlanStatusEvent } from "../types/execution"; /** * Actors driving the segregation-of-duties checkpoints (§13). * * Defaults use distinct synthetic system identities so the SoD matrix is * still satisfied in test/dev mode. Production callers MUST override. */ export interface ExecutionActors { approver?: string; releaser?: string; validator?: string; } const DEFAULT_ACTORS: Required = { approver: "system-approver", releaser: "system-releaser", validator: "system-validator", }; /** * Reconciliation evidence captured during the VALIDATING phase. * * §9.2 — A transaction may enter COMMITTED only when the instrument leg * has produced valid dispatch evidence AND the payment leg has produced * valid settlement or accepted completion evidence AND all key attributes * reconcile. */ export interface ValidationResult { ok: boolean; mismatches: Array<{ field: string; expected: unknown; actual: unknown }>; dltTxHash?: string; isoMessageId?: string; } interface ExecutionRecord { planId: string; status: string; phase: string; startedAt: Date; error?: string; dltTxHash?: string; isoMessageId?: string; } export class ExecutionCoordinator extends EventEmitter { private executions: Map = new Map(); /** * Drive a plan through the 12-state machine (arch §8) end-to-end. * * DRAFT -> INITIATED -> PRECONDITIONS_PENDING -> READY_FOR_PREPARE * -> PREPARED (approver) -> EXECUTING (releaser) * -> VALIDATING -> COMMITTED (approver) -> CLOSED * on failure: * -> ABORTED -> CLOSED */ async executePlan( planId: string, actors: ExecutionActors = {}, ): Promise<{ executionId: string }> { const executionId = `exec-${Date.now()}`; const act = { ...DEFAULT_ACTORS, ...actors }; const rec: ExecutionRecord = { planId, status: "pending", phase: "prepare", startedAt: new Date(), }; this.executions.set(executionId, rec); const plan = await getPlanById(planId); if (!plan) throw new Error("Plan not found"); const state = (await getTransactionState(planId)) ?? "DRAFT"; if (state !== "DRAFT") { throw new Error( `Plan ${planId} is in state '${state}', executePlan only accepts 'DRAFT'`, ); } try { // Move through the preparatory states (coordinator-driven, non-SoD). await transition({ planId, from: "DRAFT", to: "INITIATED", actor: "coordinator", actorRole: "coordinator", reason: "executePlan initiated" }); await transition({ planId, from: "INITIATED", to: "PRECONDITIONS_PENDING", actor: "coordinator", actorRole: "coordinator", reason: "preconditions check" }); await transition({ planId, from: "PRECONDITIONS_PENDING", to: "READY_FOR_PREPARE", actor: "coordinator", actorRole: "coordinator", reason: "preconditions satisfied" }); await this.preparePhase(executionId, plan); // SoD: approver gates the PREPARED transition. await transition({ planId, from: "READY_FOR_PREPARE", to: "PREPARED", actor: act.approver, actorRole: "approver", reason: "both legs ready" }); // SoD: releaser triggers the release (different human from approver). await transition({ planId, from: "PREPARED", to: "EXECUTING", actor: act.releaser, actorRole: "releaser", reason: "release authorised" }); const dlt = await this.executeDLTPhase(executionId, plan); const bank = await this.bankInstructionPhase(executionId, plan); // Enter VALIDATING (§9.2): reconcile dispatch + evidence. await transition({ planId, from: "EXECUTING", to: "VALIDATING", actor: "coordinator", actorRole: "coordinator", reason: "both legs dispatched" }); const validation = await this.validatePhase(executionId, plan, dlt, bank); if (!validation.ok) { throw Data.valueMismatch({ mismatches: validation.mismatches, dltTxHash: validation.dltTxHash, isoMessageId: validation.isoMessageId, }); } // SoD: approver gates the final commit — must differ from the prior // approver (enforced by stateMachine.transition). await transition({ planId, from: "VALIDATING", to: "COMMITTED", actor: act.validator, actorRole: "approver", reason: "evidence reconciled" }); await this.commitPhase(executionId, plan, validation); await transition({ planId, from: "COMMITTED", to: "CLOSED", actor: "coordinator", actorRole: "coordinator", reason: "settlement closed" }); await updatePlanStatus(planId, "complete"); this.emitStatus(executionId, { phase: "complete", status: "complete", timestamp: new Date().toISOString() }); return { executionId }; } catch (err: any) { const result = await handle(err, { queue: "execution", context: { planId, executionId } }); await this.abortExecution(executionId, planId, result.exception.message).catch(() => {}); throw err; } } private async preparePhase(executionId: string, plan: Plan) { this.emitStatus(executionId, { phase: "prepare", status: "in_progress", timestamp: new Date().toISOString() }); const dltPrepared = await prepareDLTExecution(plan); if (!dltPrepared) throw Control.missingApproval({ leg: "dlt" }); const bankPrepared = await prepareBankInstruction(plan); if (!bankPrepared) { await abortDLTExecution(plan.plan_id!); throw Control.missingApproval({ leg: "bank" }); } await registerPlan(plan); this.emitStatus(executionId, { phase: "prepare", status: "complete", timestamp: new Date().toISOString() }); } private async executeDLTPhase(executionId: string, plan: Plan): Promise<{ txHash: string }> { this.emitStatus(executionId, { phase: "execute_dlt", status: "in_progress", timestamp: new Date().toISOString() }); const result = await commitDLTExecution(plan); if (!result.success || !result.txHash) { await abortDLTExecution(plan.plan_id!); await abortBankInstruction(plan.plan_id!); throw new SettlementException("system", "external_service_error", `DLT execution failed: ${result.error ?? "unknown"}`); } const rec = this.executions.get(executionId); if (rec) rec.dltTxHash = result.txHash; this.emitStatus(executionId, { phase: "execute_dlt", status: "complete", dltTxHash: result.txHash, timestamp: new Date().toISOString() }); return { txHash: result.txHash }; } private async bankInstructionPhase(executionId: string, plan: Plan): Promise<{ isoMessageId: string }> { this.emitStatus(executionId, { phase: "bank_instruction", status: "in_progress", timestamp: new Date().toISOString() }); const result = await commitBankInstruction(plan); if (!result.success || !result.isoMessageId) { throw new SettlementException("system", "external_service_error", `Bank instruction failed: ${result.error ?? "unknown"}`); } const rec = this.executions.get(executionId); if (rec) rec.isoMessageId = result.isoMessageId; this.emitStatus(executionId, { phase: "bank_instruction", status: "complete", isoMessageId: result.isoMessageId, timestamp: new Date().toISOString() }); return { isoMessageId: result.isoMessageId }; } /** * VALIDATING phase (arch §8 + §9.2). Reconcile dispatch references + * evidence against the plan before COMMIT. * * Today's checks — stub shape, will be expanded by PRs C-E: * - dlt.txHash is a 0x-prefixed 32-byte hex * - bank.isoMessageId is a non-empty opaque reference * - sum(amount) across DLT + bank legs matches the plan totals per asset */ private async validatePhase( executionId: string, plan: Plan, dlt: { txHash: string }, bank: { isoMessageId: string }, ): Promise { this.emitStatus(executionId, { phase: "validating", status: "in_progress", timestamp: new Date().toISOString() }); const mismatches: ValidationResult["mismatches"] = []; if (!/^0x[0-9a-fA-F]{64}$/.test(dlt.txHash)) { mismatches.push({ field: "dlt.txHash", expected: "0x + 64 hex chars", actual: dlt.txHash }); } if (!bank.isoMessageId || bank.isoMessageId.trim() === "") { mismatches.push({ field: "bank.isoMessageId", expected: "non-empty string", actual: bank.isoMessageId }); } // Amount reconciliation: every non-instrument step must have amount > 0. for (const [i, step] of plan.steps.entries()) { if (step.type !== "issueInstrument" && !(step.amount > 0)) { mismatches.push({ field: `steps[${i}].amount`, expected: "> 0", actual: step.amount }); } } const result: ValidationResult = { ok: mismatches.length === 0, mismatches, dltTxHash: dlt.txHash, isoMessageId: bank.isoMessageId, }; this.emitStatus(executionId, { phase: "validating", status: result.ok ? "complete" : "failed", timestamp: new Date().toISOString(), ...(result.ok ? {} : { error: `${mismatches.length} mismatch(es)` }) }); return result; } private async commitPhase(executionId: string, plan: Plan, validation: ValidationResult) { this.emitStatus(executionId, { phase: "commit", status: "in_progress", timestamp: new Date().toISOString() }); await finalizePlan(plan.plan_id!, { dltTxHash: validation.dltTxHash ?? "mock-tx-hash", isoMessageId: validation.isoMessageId ?? "mock-iso-id", }); this.emitStatus(executionId, { phase: "commit", status: "complete", timestamp: new Date().toISOString() }); } async abortExecution(executionId: string, planId: string, error: string) { if (!this.executions.has(executionId)) return; try { await abortDLTExecution(planId); await abortBankInstruction(planId); await updatePlanStatus(planId, "aborted"); const current = await getTransactionState(planId); if (current && current !== "ABORTED" && current !== "CLOSED") { try { await transition({ planId, from: current, to: "ABORTED", actor: "coordinator", actorRole: "exception_manager", reason: error }); } catch { /* machine may not allow this edge from current state; leave for operator */ } } this.emitStatus(executionId, { phase: "aborted", status: "failed", error, timestamp: new Date().toISOString() }); } catch (abortError: any) { console.error("Abort failed:", abortError); } } async getExecutionStatus(executionId: string) { return this.executions.get(executionId); } private emitStatus(executionId: string, event: PlanStatusEvent) { this.emit("status", executionId, event); } onStatus(callback: (executionId: string, event: PlanStatusEvent) => void) { this.on("status", callback); } } export const executionCoordinator = new ExecutionCoordinator();