PR L: persistent participant registry (plans_participants + API) #16
93
orchestrator/src/api/participants.ts
Normal file
93
orchestrator/src/api/participants.ts
Normal file
@@ -0,0 +1,93 @@
|
||||
import type { Request, Response } from "express";
|
||||
import { asyncHandler } from "../services/errorHandler";
|
||||
import * as participants from "../services/participants";
|
||||
import { publish } from "../services/eventBus";
|
||||
import { getPlanById } from "../db/plans";
|
||||
import type { Participant } from "../types/plan";
|
||||
|
||||
/**
|
||||
* GET /api/plans/:id/participants — list every participant bound to a plan.
|
||||
* Arch §4 canonical data objects "Participant Registry"; gap v2 §10.9.
|
||||
*/
|
||||
export const listParticipants = asyncHandler(
|
||||
async (req: Request, res: Response) => {
|
||||
const planId = req.params.planId;
|
||||
const plan = await getPlanById(planId);
|
||||
if (!plan) {
|
||||
res.status(404).json({ error: "plan not found", planId });
|
||||
return;
|
||||
}
|
||||
const rows = await participants.listForPlan(planId);
|
||||
res.json({ planId, participants: rows });
|
||||
},
|
||||
);
|
||||
|
||||
/**
|
||||
* POST /api/plans/:id/participants — bind a participant to a plan.
|
||||
* Emits the `participants.authorized` event on first successful bind
|
||||
* (arch §7.2). Idempotent on `(plan_id, role, participant_id)`.
|
||||
*
|
||||
* Body: { id, role, lei?, did? }
|
||||
*/
|
||||
export const addParticipant = asyncHandler(
|
||||
async (req: Request, res: Response) => {
|
||||
const planId = req.params.planId;
|
||||
const plan = await getPlanById(planId);
|
||||
if (!plan) {
|
||||
res.status(404).json({ error: "plan not found", planId });
|
||||
return;
|
||||
}
|
||||
|
||||
const body = req.body as Partial<Participant>;
|
||||
if (!body?.id || !body?.role) {
|
||||
res.status(400).json({ error: "id and role are required" });
|
||||
return;
|
||||
}
|
||||
|
||||
const input: Participant = {
|
||||
id: String(body.id),
|
||||
role: body.role,
|
||||
lei: body.lei ? String(body.lei) : undefined,
|
||||
did: body.did ? String(body.did) : undefined,
|
||||
};
|
||||
|
||||
const row = await participants.add(planId, input);
|
||||
await publish({
|
||||
planId,
|
||||
type: "participants.authorized",
|
||||
actor: (req as { actor?: string }).actor,
|
||||
payload: {
|
||||
participantId: row.id,
|
||||
role: row.role,
|
||||
lei: row.lei ?? null,
|
||||
did: row.did ?? null,
|
||||
},
|
||||
});
|
||||
res.status(201).json(row);
|
||||
},
|
||||
);
|
||||
|
||||
/**
|
||||
* DELETE /api/plans/:id/participants/:role/:participantId — unbind a
|
||||
* participant from a plan. Returns 404 if the row doesn't exist.
|
||||
*/
|
||||
export const removeParticipant = asyncHandler(
|
||||
async (req: Request, res: Response) => {
|
||||
const { planId, role, participantId } = req.params;
|
||||
const plan = await getPlanById(planId);
|
||||
if (!plan) {
|
||||
res.status(404).json({ error: "plan not found", planId });
|
||||
return;
|
||||
}
|
||||
const removed = await participants.remove(
|
||||
planId,
|
||||
role as Participant["role"],
|
||||
participantId,
|
||||
);
|
||||
if (removed === 0) {
|
||||
res.status(404).json({ error: "participant not found" });
|
||||
return;
|
||||
}
|
||||
res.status(204).end();
|
||||
},
|
||||
);
|
||||
57
orchestrator/src/db/migrations/005_plans_participants.ts
Normal file
57
orchestrator/src/db/migrations/005_plans_participants.ts
Normal file
@@ -0,0 +1,57 @@
|
||||
import { query } from "../postgres";
|
||||
|
||||
/**
|
||||
* Migration 005 — plans_participants registry table (arch §4 canonical
|
||||
* data objects "Participant Registry" + §11 Phase 1 "register participants";
|
||||
* gap-analysis v2 §4 + §10.9).
|
||||
*
|
||||
* Each plan binds one or more Participants by role. The registry is the
|
||||
* source of truth for segregation-of-duties decisions: the coordinator
|
||||
* resolves the authorised actor set for a transition by looking up
|
||||
* participants with the role required by that transition
|
||||
* (see ROLE_FOR_TRANSITION in types/transactionState.ts).
|
||||
*
|
||||
* Row semantics:
|
||||
* - `(plan_id, role, participant_id)` is unique — one participant can
|
||||
* hold multiple roles on a plan but not the same role twice.
|
||||
* - `lei` and `did` are optional strong-identity anchors (LEI per
|
||||
* ISO-17442; W3C DID). Applications that only have an internal
|
||||
* identifier fill `participant_id` alone.
|
||||
* - `created_at` is append-only; role changes are modelled as a new
|
||||
* row + an audit event (`participants.authorized`) rather than an
|
||||
* in-place update.
|
||||
*/
|
||||
export async function up() {
|
||||
await query(
|
||||
`CREATE TABLE IF NOT EXISTS plans_participants (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
plan_id UUID NOT NULL REFERENCES plans(id) ON DELETE CASCADE,
|
||||
participant_id VARCHAR(255) NOT NULL,
|
||||
role VARCHAR(32) NOT NULL CHECK (role IN (
|
||||
'applicant',
|
||||
'issuing_bank',
|
||||
'beneficiary_bank',
|
||||
'beneficiary',
|
||||
'coordinator',
|
||||
'observer'
|
||||
)),
|
||||
lei VARCHAR(20),
|
||||
did VARCHAR(255),
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
UNIQUE (plan_id, role, participant_id)
|
||||
)`,
|
||||
);
|
||||
|
||||
await query(
|
||||
`CREATE INDEX IF NOT EXISTS idx_plans_participants_plan_id
|
||||
ON plans_participants(plan_id)`,
|
||||
);
|
||||
await query(
|
||||
`CREATE INDEX IF NOT EXISTS idx_plans_participants_role
|
||||
ON plans_participants(role)`,
|
||||
);
|
||||
}
|
||||
|
||||
export async function down() {
|
||||
await query("DROP TABLE IF EXISTS plans_participants CASCADE");
|
||||
}
|
||||
@@ -2,6 +2,7 @@ import { up as up001 } from "./001_initial_schema";
|
||||
import { up as up002 } from "./002_transaction_state";
|
||||
import { up as up003 } from "./003_events";
|
||||
import { up as up004 } from "./004_idempotency_keys";
|
||||
import { up as up005 } from "./005_plans_participants";
|
||||
|
||||
/**
|
||||
* Run all migrations
|
||||
@@ -12,6 +13,7 @@ export async function runMigration() {
|
||||
await up002();
|
||||
await up003();
|
||||
await up004();
|
||||
await up005();
|
||||
console.log("All migrations completed");
|
||||
} catch (error) {
|
||||
console.error("Migration failed:", error);
|
||||
|
||||
@@ -16,6 +16,7 @@ import { logger } from "./logging/logger";
|
||||
import { getMetrics, httpRequestDuration, httpRequestTotal, register } from "./metrics/prometheus";
|
||||
import { healthCheck, readinessCheck, livenessCheck } from "./health/health";
|
||||
import { listPlansEndpoint, createPlan, getPlan, getPlanState, getPlanEvents, streamPlanEvents, addSignature, validatePlanEndpoint } from "./api/plans";
|
||||
import { listParticipants, addParticipant, removeParticipant } from "./api/participants";
|
||||
import { streamPlanStatus } from "./api/sse";
|
||||
import { executionCoordinator } from "./services/execution";
|
||||
import { runMigration } from "./db/migrations";
|
||||
@@ -94,6 +95,9 @@ app.get("/api/plans/:planId/events", getPlanEvents);
|
||||
app.get("/api/plans/:planId/events/stream", streamPlanEvents);
|
||||
app.post("/api/plans/:planId/signature", addSignature);
|
||||
app.post("/api/plans/:planId/validate", validatePlanEndpoint);
|
||||
app.get("/api/plans/:planId/participants", listParticipants);
|
||||
app.post("/api/plans/:planId/participants", auditLog("ADD_PARTICIPANT", "plan"), addParticipant);
|
||||
app.delete("/api/plans/:planId/participants/:role/:participantId", auditLog("REMOVE_PARTICIPANT", "plan"), removeParticipant);
|
||||
|
||||
// Execution endpoints
|
||||
import { executePlan, getExecutionStatus, abortExecution } from "./api/execution";
|
||||
|
||||
116
orchestrator/src/services/participants.ts
Normal file
116
orchestrator/src/services/participants.ts
Normal file
@@ -0,0 +1,116 @@
|
||||
/**
|
||||
* Participant registry service (arch §4 canonical data objects;
|
||||
* §11 Phase 1 "register participants"; gap v2 §4 + §10.9).
|
||||
*
|
||||
* The participant registry is the source of truth for segregation-of-duties
|
||||
* decisions. The coordinator resolves the authorised actor set for a
|
||||
* transition by calling `listByRole(planId, role)` and matching against
|
||||
* the `ROLE_FOR_TRANSITION` map in `types/transactionState`.
|
||||
*
|
||||
* Writes are idempotent on `(plan_id, role, participant_id)` so replaying
|
||||
* an event stream never creates duplicates.
|
||||
*/
|
||||
|
||||
import { query } from "../db/postgres";
|
||||
import type { Participant } from "../types/plan";
|
||||
|
||||
export interface ParticipantRow extends Participant {
|
||||
plan_id: string;
|
||||
created_at: string;
|
||||
}
|
||||
|
||||
type ParticipantRole = Participant["role"];
|
||||
|
||||
const ROLES: ReadonlySet<ParticipantRole> = new Set([
|
||||
"applicant",
|
||||
"issuing_bank",
|
||||
"beneficiary_bank",
|
||||
"beneficiary",
|
||||
"coordinator",
|
||||
"observer",
|
||||
]);
|
||||
|
||||
function assertRole(role: string): asserts role is ParticipantRole {
|
||||
if (!ROLES.has(role as ParticipantRole)) {
|
||||
throw new Error(`unknown participant role: ${role}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Persist a participant for a plan. Idempotent on
|
||||
* (plan_id, role, participant_id) — repeated calls return the first row.
|
||||
*/
|
||||
export async function add(
|
||||
planId: string,
|
||||
participant: Participant,
|
||||
): Promise<ParticipantRow> {
|
||||
assertRole(participant.role);
|
||||
const rows = await query<ParticipantRow>(
|
||||
`INSERT INTO plans_participants (plan_id, participant_id, role, lei, did)
|
||||
VALUES ($1, $2, $3, $4, $5)
|
||||
ON CONFLICT (plan_id, role, participant_id) DO UPDATE
|
||||
SET lei = COALESCE(EXCLUDED.lei, plans_participants.lei),
|
||||
did = COALESCE(EXCLUDED.did, plans_participants.did)
|
||||
RETURNING plan_id, participant_id AS id, role, lei, did, created_at`,
|
||||
[planId, participant.id, participant.role, participant.lei ?? null, participant.did ?? null],
|
||||
);
|
||||
return rows[0];
|
||||
}
|
||||
|
||||
/**
|
||||
* Convenience: insert a full participant array for a newly created plan.
|
||||
* Caller is responsible for publishing the `participants.authorized`
|
||||
* event once this resolves.
|
||||
*/
|
||||
export async function bulkAdd(
|
||||
planId: string,
|
||||
participants: Participant[],
|
||||
): Promise<ParticipantRow[]> {
|
||||
const out: ParticipantRow[] = [];
|
||||
for (const p of participants) {
|
||||
out.push(await add(planId, p));
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
/** List every participant for a plan. */
|
||||
export async function listForPlan(planId: string): Promise<ParticipantRow[]> {
|
||||
return query<ParticipantRow>(
|
||||
`SELECT plan_id, participant_id AS id, role, lei, did, created_at
|
||||
FROM plans_participants
|
||||
WHERE plan_id = $1
|
||||
ORDER BY created_at ASC, id ASC`,
|
||||
[planId],
|
||||
);
|
||||
}
|
||||
|
||||
/** List participants on a plan with a given role (SoD lookup). */
|
||||
export async function listByRole(
|
||||
planId: string,
|
||||
role: ParticipantRole,
|
||||
): Promise<ParticipantRow[]> {
|
||||
assertRole(role);
|
||||
return query<ParticipantRow>(
|
||||
`SELECT plan_id, participant_id AS id, role, lei, did, created_at
|
||||
FROM plans_participants
|
||||
WHERE plan_id = $1 AND role = $2
|
||||
ORDER BY created_at ASC, id ASC`,
|
||||
[planId, role],
|
||||
);
|
||||
}
|
||||
|
||||
/** Remove a participant from a plan (audit event handled by caller). */
|
||||
export async function remove(
|
||||
planId: string,
|
||||
role: ParticipantRole,
|
||||
participantId: string,
|
||||
): Promise<number> {
|
||||
assertRole(role);
|
||||
const rows = await query<{ id: string }>(
|
||||
`DELETE FROM plans_participants
|
||||
WHERE plan_id = $1 AND role = $2 AND participant_id = $3
|
||||
RETURNING id`,
|
||||
[planId, role, participantId],
|
||||
);
|
||||
return rows.length;
|
||||
}
|
||||
182
orchestrator/tests/unit/participants.test.ts
Normal file
182
orchestrator/tests/unit/participants.test.ts
Normal file
@@ -0,0 +1,182 @@
|
||||
/**
|
||||
* Tests for the plans_participants registry service (arch §4 + gap v2 §10.9).
|
||||
* Stubs the Postgres `query` to a Map-backed fake so we can exercise the
|
||||
* service contract without standing up a real database.
|
||||
*/
|
||||
|
||||
import { describe, it, expect, beforeEach, jest } from "@jest/globals";
|
||||
|
||||
interface Row {
|
||||
id: string;
|
||||
plan_id: string;
|
||||
participant_id: string;
|
||||
role: string;
|
||||
lei: string | null;
|
||||
did: string | null;
|
||||
created_at: string;
|
||||
}
|
||||
|
||||
const rows: Row[] = [];
|
||||
let rowCounter = 1;
|
||||
|
||||
jest.mock("../../src/db/postgres", () => ({
|
||||
query: async (sql: string, params: unknown[] = []) => {
|
||||
if (sql.startsWith("INSERT INTO plans_participants")) {
|
||||
const [plan_id, participant_id, role, lei, did] = params as [
|
||||
string, string, string, string | null, string | null,
|
||||
];
|
||||
const existing = rows.find(
|
||||
(r) =>
|
||||
r.plan_id === plan_id &&
|
||||
r.role === role &&
|
||||
r.participant_id === participant_id,
|
||||
);
|
||||
if (existing) {
|
||||
if (lei !== null) existing.lei = lei;
|
||||
if (did !== null) existing.did = did;
|
||||
return [
|
||||
{
|
||||
plan_id: existing.plan_id,
|
||||
id: existing.participant_id,
|
||||
role: existing.role,
|
||||
lei: existing.lei,
|
||||
did: existing.did,
|
||||
created_at: existing.created_at,
|
||||
},
|
||||
];
|
||||
}
|
||||
const row: Row = {
|
||||
id: `row-${rowCounter++}`,
|
||||
plan_id,
|
||||
participant_id,
|
||||
role,
|
||||
lei,
|
||||
did,
|
||||
created_at: new Date().toISOString(),
|
||||
};
|
||||
rows.push(row);
|
||||
return [
|
||||
{
|
||||
plan_id: row.plan_id,
|
||||
id: row.participant_id,
|
||||
role: row.role,
|
||||
lei: row.lei,
|
||||
did: row.did,
|
||||
created_at: row.created_at,
|
||||
},
|
||||
];
|
||||
}
|
||||
if (sql.startsWith("DELETE FROM plans_participants")) {
|
||||
const [plan_id, role, participant_id] = params as [string, string, string];
|
||||
const idx = rows.findIndex(
|
||||
(r) =>
|
||||
r.plan_id === plan_id &&
|
||||
r.role === role &&
|
||||
r.participant_id === participant_id,
|
||||
);
|
||||
if (idx === -1) return [];
|
||||
const [removed] = rows.splice(idx, 1);
|
||||
return [{ id: removed.id }];
|
||||
}
|
||||
if (sql.startsWith("SELECT") && sql.includes("FROM plans_participants") && sql.includes("WHERE plan_id = $1 AND role = $2")) {
|
||||
const [plan_id, role] = params as [string, string];
|
||||
return rows
|
||||
.filter((r) => r.plan_id === plan_id && r.role === role)
|
||||
.map((r) => ({
|
||||
plan_id: r.plan_id,
|
||||
id: r.participant_id,
|
||||
role: r.role,
|
||||
lei: r.lei,
|
||||
did: r.did,
|
||||
created_at: r.created_at,
|
||||
}));
|
||||
}
|
||||
if (sql.includes("FROM plans_participants") && sql.includes("WHERE plan_id = $1")) {
|
||||
const [plan_id] = params as [string];
|
||||
return rows
|
||||
.filter((r) => r.plan_id === plan_id)
|
||||
.map((r) => ({
|
||||
plan_id: r.plan_id,
|
||||
id: r.participant_id,
|
||||
role: r.role,
|
||||
lei: r.lei,
|
||||
did: r.did,
|
||||
created_at: r.created_at,
|
||||
}));
|
||||
}
|
||||
return [];
|
||||
},
|
||||
}));
|
||||
|
||||
import * as participants from "../../src/services/participants";
|
||||
|
||||
describe("participants service", () => {
|
||||
beforeEach(() => {
|
||||
rows.splice(0, rows.length);
|
||||
rowCounter = 1;
|
||||
});
|
||||
|
||||
it("persists a participant and returns the row", async () => {
|
||||
const r = await participants.add("plan-1", {
|
||||
id: "p-42",
|
||||
role: "applicant",
|
||||
lei: "HWUPKR0MPOU8FGXBT394",
|
||||
});
|
||||
expect(r.id).toBe("p-42");
|
||||
expect(r.role).toBe("applicant");
|
||||
expect(r.lei).toBe("HWUPKR0MPOU8FGXBT394");
|
||||
});
|
||||
|
||||
it("is idempotent on (plan_id, role, participant_id)", async () => {
|
||||
await participants.add("plan-1", { id: "p-1", role: "applicant" });
|
||||
await participants.add("plan-1", { id: "p-1", role: "applicant" });
|
||||
const all = await participants.listForPlan("plan-1");
|
||||
expect(all).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("allows the same id in a different role on the same plan", async () => {
|
||||
await participants.add("plan-1", { id: "p-1", role: "applicant" });
|
||||
await participants.add("plan-1", { id: "p-1", role: "coordinator" });
|
||||
const all = await participants.listForPlan("plan-1");
|
||||
expect(all.map((r) => r.role).sort()).toEqual([
|
||||
"applicant",
|
||||
"coordinator",
|
||||
]);
|
||||
});
|
||||
|
||||
it("listByRole filters correctly for SoD lookups", async () => {
|
||||
await participants.add("plan-1", { id: "bank-A", role: "issuing_bank" });
|
||||
await participants.add("plan-1", { id: "bank-B", role: "beneficiary_bank" });
|
||||
const issuers = await participants.listByRole("plan-1", "issuing_bank");
|
||||
expect(issuers).toHaveLength(1);
|
||||
expect(issuers[0].id).toBe("bank-A");
|
||||
});
|
||||
|
||||
it("rejects unknown roles", async () => {
|
||||
await expect(
|
||||
participants.add("plan-1", {
|
||||
id: "p-1",
|
||||
role: "intruder" as unknown as "applicant",
|
||||
}),
|
||||
).rejects.toThrow(/unknown participant role/);
|
||||
});
|
||||
|
||||
it("bulkAdd persists every participant in order", async () => {
|
||||
const out = await participants.bulkAdd("plan-1", [
|
||||
{ id: "a", role: "applicant" },
|
||||
{ id: "b", role: "issuing_bank" },
|
||||
{ id: "c", role: "beneficiary" },
|
||||
]);
|
||||
expect(out).toHaveLength(3);
|
||||
expect(out.map((r) => r.id)).toEqual(["a", "b", "c"]);
|
||||
});
|
||||
|
||||
it("remove() returns 0 for missing row and 1 on success", async () => {
|
||||
expect(
|
||||
await participants.remove("plan-1", "applicant", "nobody"),
|
||||
).toBe(0);
|
||||
await participants.add("plan-1", { id: "p-1", role: "applicant" });
|
||||
expect(await participants.remove("plan-1", "applicant", "p-1")).toBe(1);
|
||||
expect(await participants.listForPlan("plan-1")).toHaveLength(0);
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user