From 5741fd5d9b205c3b4e64de34648c144238f247e6 Mon Sep 17 00:00:00 2001 From: Devin Date: Wed, 22 Apr 2026 18:15:26 +0000 Subject: [PATCH] Persistent participant registry (plans_participants + API) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes gap-analysis v2 §4 + §10.9. - Migration 005 adds plans_participants (plan_id, participant_id, role, lei, did, created_at) with a composite UNIQUE(plan_id, role, participant_id) and CHECK on role vocabulary matching types/plan.ts. - services/participants.ts: idempotent add() / bulkAdd() / listForPlan() / listByRole() / remove() — the SoD layer reads listByRole() to resolve the authorised actor set for a transition. - api/participants.ts + wiring in index.ts: GET /api/plans/:planId/participants POST /api/plans/:planId/participants -> participants.authorized DELETE /api/plans/:planId/participants/:role/:participantId - 7 unit tests against a Map-backed query stub; full suite 87/87 green. --- orchestrator/src/api/participants.ts | 93 +++++++++ .../db/migrations/005_plans_participants.ts | 57 ++++++ orchestrator/src/db/migrations/index.ts | 2 + orchestrator/src/index.ts | 4 + orchestrator/src/services/participants.ts | 116 +++++++++++ orchestrator/tests/unit/participants.test.ts | 182 ++++++++++++++++++ 6 files changed, 454 insertions(+) create mode 100644 orchestrator/src/api/participants.ts create mode 100644 orchestrator/src/db/migrations/005_plans_participants.ts create mode 100644 orchestrator/src/services/participants.ts create mode 100644 orchestrator/tests/unit/participants.test.ts diff --git a/orchestrator/src/api/participants.ts b/orchestrator/src/api/participants.ts new file mode 100644 index 0000000..07265e5 --- /dev/null +++ b/orchestrator/src/api/participants.ts @@ -0,0 +1,93 @@ +import type { Request, Response } from "express"; +import { asyncHandler } from "../services/errorHandler"; +import * as participants from "../services/participants"; +import { publish } from "../services/eventBus"; +import { getPlanById } from "../db/plans"; +import type { Participant } from "../types/plan"; + +/** + * GET /api/plans/:id/participants — list every participant bound to a plan. + * Arch §4 canonical data objects "Participant Registry"; gap v2 §10.9. + */ +export const listParticipants = asyncHandler( + async (req: Request, res: Response) => { + const planId = req.params.planId; + const plan = await getPlanById(planId); + if (!plan) { + res.status(404).json({ error: "plan not found", planId }); + return; + } + const rows = await participants.listForPlan(planId); + res.json({ planId, participants: rows }); + }, +); + +/** + * POST /api/plans/:id/participants — bind a participant to a plan. + * Emits the `participants.authorized` event on first successful bind + * (arch §7.2). Idempotent on `(plan_id, role, participant_id)`. + * + * Body: { id, role, lei?, did? } + */ +export const addParticipant = asyncHandler( + async (req: Request, res: Response) => { + const planId = req.params.planId; + const plan = await getPlanById(planId); + if (!plan) { + res.status(404).json({ error: "plan not found", planId }); + return; + } + + const body = req.body as Partial; + if (!body?.id || !body?.role) { + res.status(400).json({ error: "id and role are required" }); + return; + } + + const input: Participant = { + id: String(body.id), + role: body.role, + lei: body.lei ? String(body.lei) : undefined, + did: body.did ? String(body.did) : undefined, + }; + + const row = await participants.add(planId, input); + await publish({ + planId, + type: "participants.authorized", + actor: (req as { actor?: string }).actor, + payload: { + participantId: row.id, + role: row.role, + lei: row.lei ?? null, + did: row.did ?? null, + }, + }); + res.status(201).json(row); + }, +); + +/** + * DELETE /api/plans/:id/participants/:role/:participantId — unbind a + * participant from a plan. Returns 404 if the row doesn't exist. + */ +export const removeParticipant = asyncHandler( + async (req: Request, res: Response) => { + const { planId, role, participantId } = req.params; + const plan = await getPlanById(planId); + if (!plan) { + res.status(404).json({ error: "plan not found", planId }); + return; + } + const removed = await participants.remove( + planId, + role as Participant["role"], + participantId, + ); + if (removed === 0) { + res.status(404).json({ error: "participant not found" }); + return; + } + res.status(204).end(); + }, +); diff --git a/orchestrator/src/db/migrations/005_plans_participants.ts b/orchestrator/src/db/migrations/005_plans_participants.ts new file mode 100644 index 0000000..ea128fc --- /dev/null +++ b/orchestrator/src/db/migrations/005_plans_participants.ts @@ -0,0 +1,57 @@ +import { query } from "../postgres"; + +/** + * Migration 005 — plans_participants registry table (arch §4 canonical + * data objects "Participant Registry" + §11 Phase 1 "register participants"; + * gap-analysis v2 §4 + §10.9). + * + * Each plan binds one or more Participants by role. The registry is the + * source of truth for segregation-of-duties decisions: the coordinator + * resolves the authorised actor set for a transition by looking up + * participants with the role required by that transition + * (see ROLE_FOR_TRANSITION in types/transactionState.ts). + * + * Row semantics: + * - `(plan_id, role, participant_id)` is unique — one participant can + * hold multiple roles on a plan but not the same role twice. + * - `lei` and `did` are optional strong-identity anchors (LEI per + * ISO-17442; W3C DID). Applications that only have an internal + * identifier fill `participant_id` alone. + * - `created_at` is append-only; role changes are modelled as a new + * row + an audit event (`participants.authorized`) rather than an + * in-place update. + */ +export async function up() { + await query( + `CREATE TABLE IF NOT EXISTS plans_participants ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + plan_id UUID NOT NULL REFERENCES plans(id) ON DELETE CASCADE, + participant_id VARCHAR(255) NOT NULL, + role VARCHAR(32) NOT NULL CHECK (role IN ( + 'applicant', + 'issuing_bank', + 'beneficiary_bank', + 'beneficiary', + 'coordinator', + 'observer' + )), + lei VARCHAR(20), + did VARCHAR(255), + created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, + UNIQUE (plan_id, role, participant_id) + )`, + ); + + await query( + `CREATE INDEX IF NOT EXISTS idx_plans_participants_plan_id + ON plans_participants(plan_id)`, + ); + await query( + `CREATE INDEX IF NOT EXISTS idx_plans_participants_role + ON plans_participants(role)`, + ); +} + +export async function down() { + await query("DROP TABLE IF EXISTS plans_participants CASCADE"); +} diff --git a/orchestrator/src/db/migrations/index.ts b/orchestrator/src/db/migrations/index.ts index 2158e8a..42024f6 100644 --- a/orchestrator/src/db/migrations/index.ts +++ b/orchestrator/src/db/migrations/index.ts @@ -2,6 +2,7 @@ import { up as up001 } from "./001_initial_schema"; import { up as up002 } from "./002_transaction_state"; import { up as up003 } from "./003_events"; import { up as up004 } from "./004_idempotency_keys"; +import { up as up005 } from "./005_plans_participants"; /** * Run all migrations @@ -12,6 +13,7 @@ export async function runMigration() { await up002(); await up003(); await up004(); + await up005(); console.log("All migrations completed"); } catch (error) { console.error("Migration failed:", error); diff --git a/orchestrator/src/index.ts b/orchestrator/src/index.ts index c7ccdbd..18f3a0d 100644 --- a/orchestrator/src/index.ts +++ b/orchestrator/src/index.ts @@ -16,6 +16,7 @@ import { logger } from "./logging/logger"; import { getMetrics, httpRequestDuration, httpRequestTotal, register } from "./metrics/prometheus"; import { healthCheck, readinessCheck, livenessCheck } from "./health/health"; import { listPlansEndpoint, createPlan, getPlan, getPlanState, getPlanEvents, streamPlanEvents, addSignature, validatePlanEndpoint } from "./api/plans"; +import { listParticipants, addParticipant, removeParticipant } from "./api/participants"; import { streamPlanStatus } from "./api/sse"; import { executionCoordinator } from "./services/execution"; import { runMigration } from "./db/migrations"; @@ -94,6 +95,9 @@ app.get("/api/plans/:planId/events", getPlanEvents); app.get("/api/plans/:planId/events/stream", streamPlanEvents); app.post("/api/plans/:planId/signature", addSignature); app.post("/api/plans/:planId/validate", validatePlanEndpoint); +app.get("/api/plans/:planId/participants", listParticipants); +app.post("/api/plans/:planId/participants", auditLog("ADD_PARTICIPANT", "plan"), addParticipant); +app.delete("/api/plans/:planId/participants/:role/:participantId", auditLog("REMOVE_PARTICIPANT", "plan"), removeParticipant); // Execution endpoints import { executePlan, getExecutionStatus, abortExecution } from "./api/execution"; diff --git a/orchestrator/src/services/participants.ts b/orchestrator/src/services/participants.ts new file mode 100644 index 0000000..8cb4109 --- /dev/null +++ b/orchestrator/src/services/participants.ts @@ -0,0 +1,116 @@ +/** + * Participant registry service (arch §4 canonical data objects; + * §11 Phase 1 "register participants"; gap v2 §4 + §10.9). + * + * The participant registry is the source of truth for segregation-of-duties + * decisions. The coordinator resolves the authorised actor set for a + * transition by calling `listByRole(planId, role)` and matching against + * the `ROLE_FOR_TRANSITION` map in `types/transactionState`. + * + * Writes are idempotent on `(plan_id, role, participant_id)` so replaying + * an event stream never creates duplicates. + */ + +import { query } from "../db/postgres"; +import type { Participant } from "../types/plan"; + +export interface ParticipantRow extends Participant { + plan_id: string; + created_at: string; +} + +type ParticipantRole = Participant["role"]; + +const ROLES: ReadonlySet = new Set([ + "applicant", + "issuing_bank", + "beneficiary_bank", + "beneficiary", + "coordinator", + "observer", +]); + +function assertRole(role: string): asserts role is ParticipantRole { + if (!ROLES.has(role as ParticipantRole)) { + throw new Error(`unknown participant role: ${role}`); + } +} + +/** + * Persist a participant for a plan. Idempotent on + * (plan_id, role, participant_id) — repeated calls return the first row. + */ +export async function add( + planId: string, + participant: Participant, +): Promise { + assertRole(participant.role); + const rows = await query( + `INSERT INTO plans_participants (plan_id, participant_id, role, lei, did) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (plan_id, role, participant_id) DO UPDATE + SET lei = COALESCE(EXCLUDED.lei, plans_participants.lei), + did = COALESCE(EXCLUDED.did, plans_participants.did) + RETURNING plan_id, participant_id AS id, role, lei, did, created_at`, + [planId, participant.id, participant.role, participant.lei ?? null, participant.did ?? null], + ); + return rows[0]; +} + +/** + * Convenience: insert a full participant array for a newly created plan. + * Caller is responsible for publishing the `participants.authorized` + * event once this resolves. + */ +export async function bulkAdd( + planId: string, + participants: Participant[], +): Promise { + const out: ParticipantRow[] = []; + for (const p of participants) { + out.push(await add(planId, p)); + } + return out; +} + +/** List every participant for a plan. */ +export async function listForPlan(planId: string): Promise { + return query( + `SELECT plan_id, participant_id AS id, role, lei, did, created_at + FROM plans_participants + WHERE plan_id = $1 + ORDER BY created_at ASC, id ASC`, + [planId], + ); +} + +/** List participants on a plan with a given role (SoD lookup). */ +export async function listByRole( + planId: string, + role: ParticipantRole, +): Promise { + assertRole(role); + return query( + `SELECT plan_id, participant_id AS id, role, lei, did, created_at + FROM plans_participants + WHERE plan_id = $1 AND role = $2 + ORDER BY created_at ASC, id ASC`, + [planId, role], + ); +} + +/** Remove a participant from a plan (audit event handled by caller). */ +export async function remove( + planId: string, + role: ParticipantRole, + participantId: string, +): Promise { + assertRole(role); + const rows = await query<{ id: string }>( + `DELETE FROM plans_participants + WHERE plan_id = $1 AND role = $2 AND participant_id = $3 + RETURNING id`, + [planId, role, participantId], + ); + return rows.length; +} diff --git a/orchestrator/tests/unit/participants.test.ts b/orchestrator/tests/unit/participants.test.ts new file mode 100644 index 0000000..1d63e97 --- /dev/null +++ b/orchestrator/tests/unit/participants.test.ts @@ -0,0 +1,182 @@ +/** + * Tests for the plans_participants registry service (arch §4 + gap v2 §10.9). + * Stubs the Postgres `query` to a Map-backed fake so we can exercise the + * service contract without standing up a real database. + */ + +import { describe, it, expect, beforeEach, jest } from "@jest/globals"; + +interface Row { + id: string; + plan_id: string; + participant_id: string; + role: string; + lei: string | null; + did: string | null; + created_at: string; +} + +const rows: Row[] = []; +let rowCounter = 1; + +jest.mock("../../src/db/postgres", () => ({ + query: async (sql: string, params: unknown[] = []) => { + if (sql.startsWith("INSERT INTO plans_participants")) { + const [plan_id, participant_id, role, lei, did] = params as [ + string, string, string, string | null, string | null, + ]; + const existing = rows.find( + (r) => + r.plan_id === plan_id && + r.role === role && + r.participant_id === participant_id, + ); + if (existing) { + if (lei !== null) existing.lei = lei; + if (did !== null) existing.did = did; + return [ + { + plan_id: existing.plan_id, + id: existing.participant_id, + role: existing.role, + lei: existing.lei, + did: existing.did, + created_at: existing.created_at, + }, + ]; + } + const row: Row = { + id: `row-${rowCounter++}`, + plan_id, + participant_id, + role, + lei, + did, + created_at: new Date().toISOString(), + }; + rows.push(row); + return [ + { + plan_id: row.plan_id, + id: row.participant_id, + role: row.role, + lei: row.lei, + did: row.did, + created_at: row.created_at, + }, + ]; + } + if (sql.startsWith("DELETE FROM plans_participants")) { + const [plan_id, role, participant_id] = params as [string, string, string]; + const idx = rows.findIndex( + (r) => + r.plan_id === plan_id && + r.role === role && + r.participant_id === participant_id, + ); + if (idx === -1) return []; + const [removed] = rows.splice(idx, 1); + return [{ id: removed.id }]; + } + if (sql.startsWith("SELECT") && sql.includes("FROM plans_participants") && sql.includes("WHERE plan_id = $1 AND role = $2")) { + const [plan_id, role] = params as [string, string]; + return rows + .filter((r) => r.plan_id === plan_id && r.role === role) + .map((r) => ({ + plan_id: r.plan_id, + id: r.participant_id, + role: r.role, + lei: r.lei, + did: r.did, + created_at: r.created_at, + })); + } + if (sql.includes("FROM plans_participants") && sql.includes("WHERE plan_id = $1")) { + const [plan_id] = params as [string]; + return rows + .filter((r) => r.plan_id === plan_id) + .map((r) => ({ + plan_id: r.plan_id, + id: r.participant_id, + role: r.role, + lei: r.lei, + did: r.did, + created_at: r.created_at, + })); + } + return []; + }, +})); + +import * as participants from "../../src/services/participants"; + +describe("participants service", () => { + beforeEach(() => { + rows.splice(0, rows.length); + rowCounter = 1; + }); + + it("persists a participant and returns the row", async () => { + const r = await participants.add("plan-1", { + id: "p-42", + role: "applicant", + lei: "HWUPKR0MPOU8FGXBT394", + }); + expect(r.id).toBe("p-42"); + expect(r.role).toBe("applicant"); + expect(r.lei).toBe("HWUPKR0MPOU8FGXBT394"); + }); + + it("is idempotent on (plan_id, role, participant_id)", async () => { + await participants.add("plan-1", { id: "p-1", role: "applicant" }); + await participants.add("plan-1", { id: "p-1", role: "applicant" }); + const all = await participants.listForPlan("plan-1"); + expect(all).toHaveLength(1); + }); + + it("allows the same id in a different role on the same plan", async () => { + await participants.add("plan-1", { id: "p-1", role: "applicant" }); + await participants.add("plan-1", { id: "p-1", role: "coordinator" }); + const all = await participants.listForPlan("plan-1"); + expect(all.map((r) => r.role).sort()).toEqual([ + "applicant", + "coordinator", + ]); + }); + + it("listByRole filters correctly for SoD lookups", async () => { + await participants.add("plan-1", { id: "bank-A", role: "issuing_bank" }); + await participants.add("plan-1", { id: "bank-B", role: "beneficiary_bank" }); + const issuers = await participants.listByRole("plan-1", "issuing_bank"); + expect(issuers).toHaveLength(1); + expect(issuers[0].id).toBe("bank-A"); + }); + + it("rejects unknown roles", async () => { + await expect( + participants.add("plan-1", { + id: "p-1", + role: "intruder" as unknown as "applicant", + }), + ).rejects.toThrow(/unknown participant role/); + }); + + it("bulkAdd persists every participant in order", async () => { + const out = await participants.bulkAdd("plan-1", [ + { id: "a", role: "applicant" }, + { id: "b", role: "issuing_bank" }, + { id: "c", role: "beneficiary" }, + ]); + expect(out).toHaveLength(3); + expect(out.map((r) => r.id)).toEqual(["a", "b", "c"]); + }); + + it("remove() returns 0 for missing row and 1 on success", async () => { + expect( + await participants.remove("plan-1", "applicant", "nobody"), + ).toBe(0); + await participants.add("plan-1", { id: "p-1", role: "applicant" }); + expect(await participants.remove("plan-1", "applicant", "p-1")).toBe(1); + expect(await participants.listForPlan("plan-1")).toHaveLength(0); + }); +}); -- 2.34.1