Consolidate webapp structure by merging nested components into the main repository
This commit is contained in:
202
orchestrator/src/services/execution.ts
Normal file
202
orchestrator/src/services/execution.ts
Normal file
@@ -0,0 +1,202 @@
|
||||
import { EventEmitter } from "events";
|
||||
import { getPlanById, updatePlanStatus } from "../db/plans";
|
||||
import { prepareDLTExecution, commitDLTExecution, abortDLTExecution } from "./dlt";
|
||||
import { prepareBankInstruction, commitBankInstruction, abortBankInstruction } from "./bank";
|
||||
import { registerPlan, finalizePlan } from "./notary";
|
||||
import type { PlanStatusEvent } from "../types/execution";
|
||||
|
||||
export class ExecutionCoordinator extends EventEmitter {
|
||||
private executions: Map<string, {
|
||||
planId: string;
|
||||
status: string;
|
||||
phase: string;
|
||||
startedAt: Date;
|
||||
error?: string;
|
||||
}> = new Map();
|
||||
|
||||
/**
|
||||
* Execute a plan using 2PC (two-phase commit) pattern
|
||||
*/
|
||||
async executePlan(planId: string): Promise<{ executionId: string }> {
|
||||
const executionId = `exec-${Date.now()}`;
|
||||
|
||||
this.executions.set(executionId, {
|
||||
planId,
|
||||
status: "pending",
|
||||
phase: "prepare",
|
||||
startedAt: new Date(),
|
||||
});
|
||||
|
||||
this.emitStatus(executionId, {
|
||||
phase: "prepare",
|
||||
status: "in_progress",
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
|
||||
try {
|
||||
// Get plan
|
||||
const plan = await getPlanById(planId);
|
||||
if (!plan) {
|
||||
throw new Error("Plan not found");
|
||||
}
|
||||
|
||||
// PHASE 1: PREPARE
|
||||
await this.preparePhase(executionId, plan);
|
||||
|
||||
// PHASE 2: EXECUTE DLT
|
||||
await this.executeDLTPhase(executionId, plan);
|
||||
|
||||
// PHASE 3: BANK INSTRUCTION
|
||||
await this.bankInstructionPhase(executionId, plan);
|
||||
|
||||
// PHASE 4: COMMIT
|
||||
await this.commitPhase(executionId, plan);
|
||||
|
||||
this.emitStatus(executionId, {
|
||||
phase: "complete",
|
||||
status: "complete",
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
|
||||
await updatePlanStatus(planId, "complete");
|
||||
|
||||
return { executionId };
|
||||
} catch (error: any) {
|
||||
// Rollback on error
|
||||
await this.abortExecution(executionId, planId, error.message);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
private async preparePhase(executionId: string, plan: any) {
|
||||
this.emitStatus(executionId, {
|
||||
phase: "prepare",
|
||||
status: "in_progress",
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
|
||||
// Prepare DLT execution
|
||||
const dltPrepared = await prepareDLTExecution(plan);
|
||||
if (!dltPrepared) {
|
||||
throw new Error("DLT preparation failed");
|
||||
}
|
||||
|
||||
// Prepare bank instruction (provisional)
|
||||
const bankPrepared = await prepareBankInstruction(plan);
|
||||
if (!bankPrepared) {
|
||||
await abortDLTExecution(plan.plan_id);
|
||||
throw new Error("Bank preparation failed");
|
||||
}
|
||||
|
||||
// Register plan with notary
|
||||
await registerPlan(plan);
|
||||
|
||||
this.emitStatus(executionId, {
|
||||
phase: "prepare",
|
||||
status: "complete",
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
}
|
||||
|
||||
private async executeDLTPhase(executionId: string, plan: any) {
|
||||
this.emitStatus(executionId, {
|
||||
phase: "execute_dlt",
|
||||
status: "in_progress",
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
|
||||
const result = await commitDLTExecution(plan);
|
||||
if (!result.success) {
|
||||
await abortDLTExecution(plan.plan_id);
|
||||
await abortBankInstruction(plan.plan_id);
|
||||
throw new Error("DLT execution failed: " + result.error);
|
||||
}
|
||||
|
||||
this.emitStatus(executionId, {
|
||||
phase: "execute_dlt",
|
||||
status: "complete",
|
||||
dltTxHash: result.txHash,
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
}
|
||||
|
||||
private async bankInstructionPhase(executionId: string, plan: any) {
|
||||
this.emitStatus(executionId, {
|
||||
phase: "bank_instruction",
|
||||
status: "in_progress",
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
|
||||
const result = await commitBankInstruction(plan);
|
||||
if (!result.success) {
|
||||
// DLT already committed, need to handle rollback
|
||||
throw new Error("Bank instruction failed: " + result.error);
|
||||
}
|
||||
|
||||
this.emitStatus(executionId, {
|
||||
phase: "bank_instruction",
|
||||
status: "complete",
|
||||
isoMessageId: result.isoMessageId,
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
}
|
||||
|
||||
private async commitPhase(executionId: string, plan: any) {
|
||||
this.emitStatus(executionId, {
|
||||
phase: "commit",
|
||||
status: "in_progress",
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
|
||||
// Finalize with notary
|
||||
await finalizePlan(plan.plan_id, {
|
||||
dltTxHash: "mock-tx-hash",
|
||||
isoMessageId: "mock-iso-id",
|
||||
});
|
||||
|
||||
this.emitStatus(executionId, {
|
||||
phase: "commit",
|
||||
status: "complete",
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
}
|
||||
|
||||
async abortExecution(executionId: string, planId: string, error: string) {
|
||||
const execution = this.executions.get(executionId);
|
||||
if (!execution) return;
|
||||
|
||||
try {
|
||||
// Abort DLT
|
||||
await abortDLTExecution(planId);
|
||||
|
||||
// Abort bank
|
||||
await abortBankInstruction(planId);
|
||||
|
||||
await updatePlanStatus(planId, "aborted");
|
||||
|
||||
this.emitStatus(executionId, {
|
||||
phase: "aborted",
|
||||
status: "failed",
|
||||
error,
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
} catch (abortError: any) {
|
||||
console.error("Abort failed:", abortError);
|
||||
}
|
||||
}
|
||||
|
||||
async getExecutionStatus(executionId: string) {
|
||||
return this.executions.get(executionId);
|
||||
}
|
||||
|
||||
private emitStatus(executionId: string, event: PlanStatusEvent) {
|
||||
this.emit("status", executionId, event);
|
||||
}
|
||||
|
||||
onStatus(callback: (executionId: string, event: PlanStatusEvent) => void) {
|
||||
this.on("status", callback);
|
||||
}
|
||||
}
|
||||
|
||||
export const executionCoordinator = new ExecutionCoordinator();
|
||||
|
||||
Reference in New Issue
Block a user