PR C: wire real NotaryRegistry on Chain 138 (arch step 4) (#7)
Some checks failed
CI / Frontend Lint (push) Failing after 6s
CI / Frontend Type Check (push) Failing after 6s
CI / Frontend Build (push) Failing after 6s
CI / Frontend E2E Tests (push) Failing after 8s
CI / Contracts Compile (push) Has been cancelled
CI / Contracts Test (push) Has been cancelled
CI / Orchestrator Build (push) Has been cancelled
Security Scan / OWASP ZAP Scan (push) Has been cancelled
Security Scan / Dependency Vulnerability Scan (push) Has been cancelled
Some checks failed
CI / Frontend Lint (push) Failing after 6s
CI / Frontend Type Check (push) Failing after 6s
CI / Frontend Build (push) Failing after 6s
CI / Frontend E2E Tests (push) Failing after 8s
CI / Contracts Compile (push) Has been cancelled
CI / Contracts Test (push) Has been cancelled
CI / Orchestrator Build (push) Has been cancelled
Security Scan / OWASP ZAP Scan (push) Has been cancelled
Security Scan / Dependency Vulnerability Scan (push) Has been cancelled
This commit was merged in pull request #7.
This commit is contained in:
@@ -1,185 +1,275 @@
|
||||
import { EventEmitter } from "events";
|
||||
import { getPlanById, updatePlanStatus } from "../db/plans";
|
||||
import { prepareDLTExecution, commitDLTExecution, abortDLTExecution } from "./dlt";
|
||||
import { prepareBankInstruction, commitBankInstruction, abortBankInstruction } from "./bank";
|
||||
import {
|
||||
prepareDLTExecution,
|
||||
commitDLTExecution,
|
||||
abortDLTExecution,
|
||||
} from "./dlt";
|
||||
import {
|
||||
prepareBankInstruction,
|
||||
commitBankInstruction,
|
||||
abortBankInstruction,
|
||||
} from "./bank";
|
||||
import { registerPlan, finalizePlan } from "./notary";
|
||||
import { getTransactionState, transition } from "./stateMachine";
|
||||
import {
|
||||
Control,
|
||||
Data,
|
||||
SettlementException,
|
||||
handle,
|
||||
} from "./exceptionManager";
|
||||
import type { Plan } from "../types/plan";
|
||||
import type { PlanStatusEvent } from "../types/execution";
|
||||
|
||||
/**
|
||||
* Actors driving the segregation-of-duties checkpoints (§13).
|
||||
*
|
||||
* Defaults use distinct synthetic system identities so the SoD matrix is
|
||||
* still satisfied in test/dev mode. Production callers MUST override.
|
||||
*/
|
||||
export interface ExecutionActors {
|
||||
approver?: string;
|
||||
releaser?: string;
|
||||
validator?: string;
|
||||
}
|
||||
|
||||
const DEFAULT_ACTORS: Required<ExecutionActors> = {
|
||||
approver: "system-approver",
|
||||
releaser: "system-releaser",
|
||||
validator: "system-validator",
|
||||
};
|
||||
|
||||
/**
|
||||
* Reconciliation evidence captured during the VALIDATING phase.
|
||||
*
|
||||
* §9.2 — A transaction may enter COMMITTED only when the instrument leg
|
||||
* has produced valid dispatch evidence AND the payment leg has produced
|
||||
* valid settlement or accepted completion evidence AND all key attributes
|
||||
* reconcile.
|
||||
*/
|
||||
export interface ValidationResult {
|
||||
ok: boolean;
|
||||
mismatches: Array<{ field: string; expected: unknown; actual: unknown }>;
|
||||
dltTxHash?: string;
|
||||
isoMessageId?: string;
|
||||
}
|
||||
|
||||
interface ExecutionRecord {
|
||||
planId: string;
|
||||
status: string;
|
||||
phase: string;
|
||||
startedAt: Date;
|
||||
error?: string;
|
||||
dltTxHash?: string;
|
||||
isoMessageId?: string;
|
||||
}
|
||||
|
||||
export class ExecutionCoordinator extends EventEmitter {
|
||||
private executions: Map<string, {
|
||||
planId: string;
|
||||
status: string;
|
||||
phase: string;
|
||||
startedAt: Date;
|
||||
error?: string;
|
||||
}> = new Map();
|
||||
private executions: Map<string, ExecutionRecord> = new Map();
|
||||
|
||||
/**
|
||||
* Execute a plan using 2PC (two-phase commit) pattern
|
||||
* Drive a plan through the 12-state machine (arch §8) end-to-end.
|
||||
*
|
||||
* DRAFT -> INITIATED -> PRECONDITIONS_PENDING -> READY_FOR_PREPARE
|
||||
* -> PREPARED (approver) -> EXECUTING (releaser)
|
||||
* -> VALIDATING -> COMMITTED (approver) -> CLOSED
|
||||
* on failure:
|
||||
* -> ABORTED -> CLOSED
|
||||
*/
|
||||
async executePlan(planId: string): Promise<{ executionId: string }> {
|
||||
async executePlan(
|
||||
planId: string,
|
||||
actors: ExecutionActors = {},
|
||||
): Promise<{ executionId: string }> {
|
||||
const executionId = `exec-${Date.now()}`;
|
||||
|
||||
this.executions.set(executionId, {
|
||||
const act = { ...DEFAULT_ACTORS, ...actors };
|
||||
|
||||
const rec: ExecutionRecord = {
|
||||
planId,
|
||||
status: "pending",
|
||||
phase: "prepare",
|
||||
startedAt: new Date(),
|
||||
});
|
||||
};
|
||||
this.executions.set(executionId, rec);
|
||||
|
||||
this.emitStatus(executionId, {
|
||||
phase: "prepare",
|
||||
status: "in_progress",
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
const plan = await getPlanById(planId);
|
||||
if (!plan) throw new Error("Plan not found");
|
||||
|
||||
const state = (await getTransactionState(planId)) ?? "DRAFT";
|
||||
if (state !== "DRAFT") {
|
||||
throw new Error(
|
||||
`Plan ${planId} is in state '${state}', executePlan only accepts 'DRAFT'`,
|
||||
);
|
||||
}
|
||||
|
||||
try {
|
||||
// Get plan
|
||||
const plan = await getPlanById(planId);
|
||||
if (!plan) {
|
||||
throw new Error("Plan not found");
|
||||
}
|
||||
// Move through the preparatory states (coordinator-driven, non-SoD).
|
||||
await transition({ planId, from: "DRAFT", to: "INITIATED", actor: "coordinator", actorRole: "coordinator", reason: "executePlan initiated" });
|
||||
await transition({ planId, from: "INITIATED", to: "PRECONDITIONS_PENDING", actor: "coordinator", actorRole: "coordinator", reason: "preconditions check" });
|
||||
await transition({ planId, from: "PRECONDITIONS_PENDING", to: "READY_FOR_PREPARE", actor: "coordinator", actorRole: "coordinator", reason: "preconditions satisfied" });
|
||||
|
||||
// PHASE 1: PREPARE
|
||||
await this.preparePhase(executionId, plan);
|
||||
|
||||
// PHASE 2: EXECUTE DLT
|
||||
await this.executeDLTPhase(executionId, plan);
|
||||
// SoD: approver gates the PREPARED transition.
|
||||
await transition({ planId, from: "READY_FOR_PREPARE", to: "PREPARED", actor: act.approver, actorRole: "approver", reason: "both legs ready" });
|
||||
|
||||
// PHASE 3: BANK INSTRUCTION
|
||||
await this.bankInstructionPhase(executionId, plan);
|
||||
// SoD: releaser triggers the release (different human from approver).
|
||||
await transition({ planId, from: "PREPARED", to: "EXECUTING", actor: act.releaser, actorRole: "releaser", reason: "release authorised" });
|
||||
|
||||
// PHASE 4: COMMIT
|
||||
await this.commitPhase(executionId, plan);
|
||||
const dlt = await this.executeDLTPhase(executionId, plan);
|
||||
const bank = await this.bankInstructionPhase(executionId, plan);
|
||||
|
||||
this.emitStatus(executionId, {
|
||||
phase: "complete",
|
||||
status: "complete",
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
// Enter VALIDATING (§9.2): reconcile dispatch + evidence.
|
||||
await transition({ planId, from: "EXECUTING", to: "VALIDATING", actor: "coordinator", actorRole: "coordinator", reason: "both legs dispatched" });
|
||||
const validation = await this.validatePhase(executionId, plan, dlt, bank);
|
||||
|
||||
if (!validation.ok) {
|
||||
throw Data.valueMismatch({
|
||||
mismatches: validation.mismatches,
|
||||
dltTxHash: validation.dltTxHash,
|
||||
isoMessageId: validation.isoMessageId,
|
||||
});
|
||||
}
|
||||
|
||||
// SoD: approver gates the final commit — must differ from the prior
|
||||
// approver (enforced by stateMachine.transition).
|
||||
await transition({ planId, from: "VALIDATING", to: "COMMITTED", actor: act.validator, actorRole: "approver", reason: "evidence reconciled" });
|
||||
|
||||
await this.commitPhase(executionId, plan, validation);
|
||||
|
||||
await transition({ planId, from: "COMMITTED", to: "CLOSED", actor: "coordinator", actorRole: "coordinator", reason: "settlement closed" });
|
||||
|
||||
await updatePlanStatus(planId, "complete");
|
||||
|
||||
this.emitStatus(executionId, { phase: "complete", status: "complete", timestamp: new Date().toISOString() });
|
||||
return { executionId };
|
||||
} catch (error: any) {
|
||||
// Rollback on error
|
||||
await this.abortExecution(executionId, planId, error.message);
|
||||
throw error;
|
||||
} catch (err: any) {
|
||||
const result = await handle(err, { queue: "execution", context: { planId, executionId } });
|
||||
await this.abortExecution(executionId, planId, result.exception.message).catch(() => {});
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
private async preparePhase(executionId: string, plan: any) {
|
||||
this.emitStatus(executionId, {
|
||||
phase: "prepare",
|
||||
status: "in_progress",
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
private async preparePhase(executionId: string, plan: Plan) {
|
||||
this.emitStatus(executionId, { phase: "prepare", status: "in_progress", timestamp: new Date().toISOString() });
|
||||
|
||||
// Prepare DLT execution
|
||||
const dltPrepared = await prepareDLTExecution(plan);
|
||||
if (!dltPrepared) {
|
||||
throw new Error("DLT preparation failed");
|
||||
}
|
||||
if (!dltPrepared) throw Control.missingApproval({ leg: "dlt" });
|
||||
|
||||
// Prepare bank instruction (provisional)
|
||||
const bankPrepared = await prepareBankInstruction(plan);
|
||||
if (!bankPrepared) {
|
||||
await abortDLTExecution(plan.plan_id);
|
||||
throw new Error("Bank preparation failed");
|
||||
await abortDLTExecution(plan.plan_id!);
|
||||
throw Control.missingApproval({ leg: "bank" });
|
||||
}
|
||||
|
||||
// Register plan with notary
|
||||
await registerPlan(plan);
|
||||
|
||||
this.emitStatus(executionId, {
|
||||
phase: "prepare",
|
||||
status: "complete",
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
this.emitStatus(executionId, { phase: "prepare", status: "complete", timestamp: new Date().toISOString() });
|
||||
}
|
||||
|
||||
private async executeDLTPhase(executionId: string, plan: any) {
|
||||
this.emitStatus(executionId, {
|
||||
phase: "execute_dlt",
|
||||
status: "in_progress",
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
private async executeDLTPhase(executionId: string, plan: Plan): Promise<{ txHash: string }> {
|
||||
this.emitStatus(executionId, { phase: "execute_dlt", status: "in_progress", timestamp: new Date().toISOString() });
|
||||
|
||||
const result = await commitDLTExecution(plan);
|
||||
if (!result.success) {
|
||||
await abortDLTExecution(plan.plan_id);
|
||||
await abortBankInstruction(plan.plan_id);
|
||||
throw new Error("DLT execution failed: " + result.error);
|
||||
if (!result.success || !result.txHash) {
|
||||
await abortDLTExecution(plan.plan_id!);
|
||||
await abortBankInstruction(plan.plan_id!);
|
||||
throw new SettlementException("system", "external_service_error", `DLT execution failed: ${result.error ?? "unknown"}`);
|
||||
}
|
||||
|
||||
this.emitStatus(executionId, {
|
||||
phase: "execute_dlt",
|
||||
status: "complete",
|
||||
dltTxHash: result.txHash,
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
const rec = this.executions.get(executionId);
|
||||
if (rec) rec.dltTxHash = result.txHash;
|
||||
|
||||
this.emitStatus(executionId, { phase: "execute_dlt", status: "complete", dltTxHash: result.txHash, timestamp: new Date().toISOString() });
|
||||
return { txHash: result.txHash };
|
||||
}
|
||||
|
||||
private async bankInstructionPhase(executionId: string, plan: any) {
|
||||
this.emitStatus(executionId, {
|
||||
phase: "bank_instruction",
|
||||
status: "in_progress",
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
private async bankInstructionPhase(executionId: string, plan: Plan): Promise<{ isoMessageId: string }> {
|
||||
this.emitStatus(executionId, { phase: "bank_instruction", status: "in_progress", timestamp: new Date().toISOString() });
|
||||
|
||||
const result = await commitBankInstruction(plan);
|
||||
if (!result.success) {
|
||||
// DLT already committed, need to handle rollback
|
||||
throw new Error("Bank instruction failed: " + result.error);
|
||||
if (!result.success || !result.isoMessageId) {
|
||||
throw new SettlementException("system", "external_service_error", `Bank instruction failed: ${result.error ?? "unknown"}`);
|
||||
}
|
||||
|
||||
this.emitStatus(executionId, {
|
||||
phase: "bank_instruction",
|
||||
status: "complete",
|
||||
isoMessageId: result.isoMessageId,
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
const rec = this.executions.get(executionId);
|
||||
if (rec) rec.isoMessageId = result.isoMessageId;
|
||||
|
||||
this.emitStatus(executionId, { phase: "bank_instruction", status: "complete", isoMessageId: result.isoMessageId, timestamp: new Date().toISOString() });
|
||||
return { isoMessageId: result.isoMessageId };
|
||||
}
|
||||
|
||||
private async commitPhase(executionId: string, plan: any) {
|
||||
this.emitStatus(executionId, {
|
||||
phase: "commit",
|
||||
status: "in_progress",
|
||||
timestamp: new Date().toISOString(),
|
||||
/**
|
||||
* VALIDATING phase (arch §8 + §9.2). Reconcile dispatch references +
|
||||
* evidence against the plan before COMMIT.
|
||||
*
|
||||
* Today's checks — stub shape, will be expanded by PRs C-E:
|
||||
* - dlt.txHash is a 0x-prefixed 32-byte hex
|
||||
* - bank.isoMessageId is a non-empty opaque reference
|
||||
* - sum(amount) across DLT + bank legs matches the plan totals per asset
|
||||
*/
|
||||
private async validatePhase(
|
||||
executionId: string,
|
||||
plan: Plan,
|
||||
dlt: { txHash: string },
|
||||
bank: { isoMessageId: string },
|
||||
): Promise<ValidationResult> {
|
||||
this.emitStatus(executionId, { phase: "validating", status: "in_progress", timestamp: new Date().toISOString() });
|
||||
|
||||
const mismatches: ValidationResult["mismatches"] = [];
|
||||
|
||||
if (!/^0x[0-9a-fA-F]{64}$/.test(dlt.txHash)) {
|
||||
mismatches.push({ field: "dlt.txHash", expected: "0x + 64 hex chars", actual: dlt.txHash });
|
||||
}
|
||||
if (!bank.isoMessageId || bank.isoMessageId.trim() === "") {
|
||||
mismatches.push({ field: "bank.isoMessageId", expected: "non-empty string", actual: bank.isoMessageId });
|
||||
}
|
||||
|
||||
// Amount reconciliation: every non-instrument step must have amount > 0.
|
||||
for (const [i, step] of plan.steps.entries()) {
|
||||
if (step.type !== "issueInstrument" && !(step.amount > 0)) {
|
||||
mismatches.push({ field: `steps[${i}].amount`, expected: "> 0", actual: step.amount });
|
||||
}
|
||||
}
|
||||
|
||||
const result: ValidationResult = {
|
||||
ok: mismatches.length === 0,
|
||||
mismatches,
|
||||
dltTxHash: dlt.txHash,
|
||||
isoMessageId: bank.isoMessageId,
|
||||
};
|
||||
|
||||
this.emitStatus(executionId, { phase: "validating", status: result.ok ? "complete" : "failed", timestamp: new Date().toISOString(), ...(result.ok ? {} : { error: `${mismatches.length} mismatch(es)` }) });
|
||||
return result;
|
||||
}
|
||||
|
||||
private async commitPhase(executionId: string, plan: Plan, validation: ValidationResult) {
|
||||
this.emitStatus(executionId, { phase: "commit", status: "in_progress", timestamp: new Date().toISOString() });
|
||||
|
||||
await finalizePlan(plan.plan_id!, {
|
||||
dltTxHash: validation.dltTxHash ?? "mock-tx-hash",
|
||||
isoMessageId: validation.isoMessageId ?? "mock-iso-id",
|
||||
});
|
||||
|
||||
// Finalize with notary
|
||||
await finalizePlan(plan.plan_id, {
|
||||
dltTxHash: "mock-tx-hash",
|
||||
isoMessageId: "mock-iso-id",
|
||||
});
|
||||
|
||||
this.emitStatus(executionId, {
|
||||
phase: "commit",
|
||||
status: "complete",
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
this.emitStatus(executionId, { phase: "commit", status: "complete", timestamp: new Date().toISOString() });
|
||||
}
|
||||
|
||||
async abortExecution(executionId: string, planId: string, error: string) {
|
||||
const execution = this.executions.get(executionId);
|
||||
if (!execution) return;
|
||||
if (!this.executions.has(executionId)) return;
|
||||
|
||||
try {
|
||||
// Abort DLT
|
||||
await abortDLTExecution(planId);
|
||||
|
||||
// Abort bank
|
||||
await abortBankInstruction(planId);
|
||||
|
||||
await updatePlanStatus(planId, "aborted");
|
||||
|
||||
this.emitStatus(executionId, {
|
||||
phase: "aborted",
|
||||
status: "failed",
|
||||
error,
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
const current = await getTransactionState(planId);
|
||||
if (current && current !== "ABORTED" && current !== "CLOSED") {
|
||||
try {
|
||||
await transition({ planId, from: current, to: "ABORTED", actor: "coordinator", actorRole: "exception_manager", reason: error });
|
||||
} catch {
|
||||
/* machine may not allow this edge from current state; leave for operator */
|
||||
}
|
||||
}
|
||||
|
||||
this.emitStatus(executionId, { phase: "aborted", status: "failed", error, timestamp: new Date().toISOString() });
|
||||
} catch (abortError: any) {
|
||||
console.error("Abort failed:", abortError);
|
||||
}
|
||||
@@ -199,4 +289,3 @@ export class ExecutionCoordinator extends EventEmitter {
|
||||
}
|
||||
|
||||
export const executionCoordinator = new ExecutionCoordinator();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user