PR L: persistent participant registry (plans_participants + API) #16

Open
nsatoshi wants to merge 1 commits from devin/1776881508-pr-l-participants into main
6 changed files with 454 additions and 0 deletions

View File

@@ -0,0 +1,93 @@
import type { Request, Response } from "express";
import { asyncHandler } from "../services/errorHandler";
import * as participants from "../services/participants";
import { publish } from "../services/eventBus";
import { getPlanById } from "../db/plans";
import type { Participant } from "../types/plan";
/**
* GET /api/plans/:id/participants — list every participant bound to a plan.
* Arch §4 canonical data objects "Participant Registry"; gap v2 §10.9.
*/
export const listParticipants = asyncHandler(
async (req: Request, res: Response) => {
const planId = req.params.planId;
const plan = await getPlanById(planId);
if (!plan) {
res.status(404).json({ error: "plan not found", planId });
return;
}
const rows = await participants.listForPlan(planId);
res.json({ planId, participants: rows });
},
);
/**
* POST /api/plans/:id/participants — bind a participant to a plan.
* Emits the `participants.authorized` event on first successful bind
* (arch §7.2). Idempotent on `(plan_id, role, participant_id)`.
*
* Body: { id, role, lei?, did? }
*/
export const addParticipant = asyncHandler(
async (req: Request, res: Response) => {
const planId = req.params.planId;
const plan = await getPlanById(planId);
if (!plan) {
res.status(404).json({ error: "plan not found", planId });
return;
}
const body = req.body as Partial<Participant>;
if (!body?.id || !body?.role) {
res.status(400).json({ error: "id and role are required" });
return;
}
const input: Participant = {
id: String(body.id),
role: body.role,
lei: body.lei ? String(body.lei) : undefined,
did: body.did ? String(body.did) : undefined,
};
const row = await participants.add(planId, input);
await publish({
planId,
type: "participants.authorized",
actor: (req as { actor?: string }).actor,
payload: {
participantId: row.id,
role: row.role,
lei: row.lei ?? null,
did: row.did ?? null,
},
});
res.status(201).json(row);
},
);
/**
* DELETE /api/plans/:id/participants/:role/:participantId — unbind a
* participant from a plan. Returns 404 if the row doesn't exist.
*/
export const removeParticipant = asyncHandler(
async (req: Request, res: Response) => {
const { planId, role, participantId } = req.params;
const plan = await getPlanById(planId);
if (!plan) {
res.status(404).json({ error: "plan not found", planId });
return;
}
const removed = await participants.remove(
planId,
role as Participant["role"],
participantId,
);
if (removed === 0) {
res.status(404).json({ error: "participant not found" });
return;
}
res.status(204).end();
},
);

View File

@@ -0,0 +1,57 @@
import { query } from "../postgres";
/**
* Migration 005 — plans_participants registry table (arch §4 canonical
* data objects "Participant Registry" + §11 Phase 1 "register participants";
* gap-analysis v2 §4 + §10.9).
*
* Each plan binds one or more Participants by role. The registry is the
* source of truth for segregation-of-duties decisions: the coordinator
* resolves the authorised actor set for a transition by looking up
* participants with the role required by that transition
* (see ROLE_FOR_TRANSITION in types/transactionState.ts).
*
* Row semantics:
* - `(plan_id, role, participant_id)` is unique — one participant can
* hold multiple roles on a plan but not the same role twice.
* - `lei` and `did` are optional strong-identity anchors (LEI per
* ISO-17442; W3C DID). Applications that only have an internal
* identifier fill `participant_id` alone.
* - `created_at` is append-only; role changes are modelled as a new
* row + an audit event (`participants.authorized`) rather than an
* in-place update.
*/
export async function up() {
await query(
`CREATE TABLE IF NOT EXISTS plans_participants (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
plan_id UUID NOT NULL REFERENCES plans(id) ON DELETE CASCADE,
participant_id VARCHAR(255) NOT NULL,
role VARCHAR(32) NOT NULL CHECK (role IN (
'applicant',
'issuing_bank',
'beneficiary_bank',
'beneficiary',
'coordinator',
'observer'
)),
lei VARCHAR(20),
did VARCHAR(255),
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
UNIQUE (plan_id, role, participant_id)
)`,
);
await query(
`CREATE INDEX IF NOT EXISTS idx_plans_participants_plan_id
ON plans_participants(plan_id)`,
);
await query(
`CREATE INDEX IF NOT EXISTS idx_plans_participants_role
ON plans_participants(role)`,
);
}
export async function down() {
await query("DROP TABLE IF EXISTS plans_participants CASCADE");
}

View File

@@ -2,6 +2,7 @@ import { up as up001 } from "./001_initial_schema";
import { up as up002 } from "./002_transaction_state";
import { up as up003 } from "./003_events";
import { up as up004 } from "./004_idempotency_keys";
import { up as up005 } from "./005_plans_participants";
/**
* Run all migrations
@@ -12,6 +13,7 @@ export async function runMigration() {
await up002();
await up003();
await up004();
await up005();
console.log("All migrations completed");
} catch (error) {
console.error("Migration failed:", error);

View File

@@ -16,6 +16,7 @@ import { logger } from "./logging/logger";
import { getMetrics, httpRequestDuration, httpRequestTotal, register } from "./metrics/prometheus";
import { healthCheck, readinessCheck, livenessCheck } from "./health/health";
import { listPlansEndpoint, createPlan, getPlan, getPlanState, getPlanEvents, streamPlanEvents, addSignature, validatePlanEndpoint } from "./api/plans";
import { listParticipants, addParticipant, removeParticipant } from "./api/participants";
import { streamPlanStatus } from "./api/sse";
import { executionCoordinator } from "./services/execution";
import { runMigration } from "./db/migrations";
@@ -94,6 +95,9 @@ app.get("/api/plans/:planId/events", getPlanEvents);
app.get("/api/plans/:planId/events/stream", streamPlanEvents);
app.post("/api/plans/:planId/signature", addSignature);
app.post("/api/plans/:planId/validate", validatePlanEndpoint);
app.get("/api/plans/:planId/participants", listParticipants);
app.post("/api/plans/:planId/participants", auditLog("ADD_PARTICIPANT", "plan"), addParticipant);
app.delete("/api/plans/:planId/participants/:role/:participantId", auditLog("REMOVE_PARTICIPANT", "plan"), removeParticipant);
// Execution endpoints
import { executePlan, getExecutionStatus, abortExecution } from "./api/execution";

View File

@@ -0,0 +1,116 @@
/**
* Participant registry service (arch §4 canonical data objects;
* §11 Phase 1 "register participants"; gap v2 §4 + §10.9).
*
* The participant registry is the source of truth for segregation-of-duties
* decisions. The coordinator resolves the authorised actor set for a
* transition by calling `listByRole(planId, role)` and matching against
* the `ROLE_FOR_TRANSITION` map in `types/transactionState`.
*
* Writes are idempotent on `(plan_id, role, participant_id)` so replaying
* an event stream never creates duplicates.
*/
import { query } from "../db/postgres";
import type { Participant } from "../types/plan";
export interface ParticipantRow extends Participant {
plan_id: string;
created_at: string;
}
type ParticipantRole = Participant["role"];
const ROLES: ReadonlySet<ParticipantRole> = new Set([
"applicant",
"issuing_bank",
"beneficiary_bank",
"beneficiary",
"coordinator",
"observer",
]);
function assertRole(role: string): asserts role is ParticipantRole {
if (!ROLES.has(role as ParticipantRole)) {
throw new Error(`unknown participant role: ${role}`);
}
}
/**
* Persist a participant for a plan. Idempotent on
* (plan_id, role, participant_id) — repeated calls return the first row.
*/
export async function add(
planId: string,
participant: Participant,
): Promise<ParticipantRow> {
assertRole(participant.role);
const rows = await query<ParticipantRow>(
`INSERT INTO plans_participants (plan_id, participant_id, role, lei, did)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (plan_id, role, participant_id) DO UPDATE
SET lei = COALESCE(EXCLUDED.lei, plans_participants.lei),
did = COALESCE(EXCLUDED.did, plans_participants.did)
RETURNING plan_id, participant_id AS id, role, lei, did, created_at`,
[planId, participant.id, participant.role, participant.lei ?? null, participant.did ?? null],
);
return rows[0];
}
/**
* Convenience: insert a full participant array for a newly created plan.
* Caller is responsible for publishing the `participants.authorized`
* event once this resolves.
*/
export async function bulkAdd(
planId: string,
participants: Participant[],
): Promise<ParticipantRow[]> {
const out: ParticipantRow[] = [];
for (const p of participants) {
out.push(await add(planId, p));
}
return out;
}
/** List every participant for a plan. */
export async function listForPlan(planId: string): Promise<ParticipantRow[]> {
return query<ParticipantRow>(
`SELECT plan_id, participant_id AS id, role, lei, did, created_at
FROM plans_participants
WHERE plan_id = $1
ORDER BY created_at ASC, id ASC`,
[planId],
);
}
/** List participants on a plan with a given role (SoD lookup). */
export async function listByRole(
planId: string,
role: ParticipantRole,
): Promise<ParticipantRow[]> {
assertRole(role);
return query<ParticipantRow>(
`SELECT plan_id, participant_id AS id, role, lei, did, created_at
FROM plans_participants
WHERE plan_id = $1 AND role = $2
ORDER BY created_at ASC, id ASC`,
[planId, role],
);
}
/** Remove a participant from a plan (audit event handled by caller). */
export async function remove(
planId: string,
role: ParticipantRole,
participantId: string,
): Promise<number> {
assertRole(role);
const rows = await query<{ id: string }>(
`DELETE FROM plans_participants
WHERE plan_id = $1 AND role = $2 AND participant_id = $3
RETURNING id`,
[planId, role, participantId],
);
return rows.length;
}

View File

@@ -0,0 +1,182 @@
/**
* Tests for the plans_participants registry service (arch §4 + gap v2 §10.9).
* Stubs the Postgres `query` to a Map-backed fake so we can exercise the
* service contract without standing up a real database.
*/
import { describe, it, expect, beforeEach, jest } from "@jest/globals";
interface Row {
id: string;
plan_id: string;
participant_id: string;
role: string;
lei: string | null;
did: string | null;
created_at: string;
}
const rows: Row[] = [];
let rowCounter = 1;
jest.mock("../../src/db/postgres", () => ({
query: async (sql: string, params: unknown[] = []) => {
if (sql.startsWith("INSERT INTO plans_participants")) {
const [plan_id, participant_id, role, lei, did] = params as [
string, string, string, string | null, string | null,
];
const existing = rows.find(
(r) =>
r.plan_id === plan_id &&
r.role === role &&
r.participant_id === participant_id,
);
if (existing) {
if (lei !== null) existing.lei = lei;
if (did !== null) existing.did = did;
return [
{
plan_id: existing.plan_id,
id: existing.participant_id,
role: existing.role,
lei: existing.lei,
did: existing.did,
created_at: existing.created_at,
},
];
}
const row: Row = {
id: `row-${rowCounter++}`,
plan_id,
participant_id,
role,
lei,
did,
created_at: new Date().toISOString(),
};
rows.push(row);
return [
{
plan_id: row.plan_id,
id: row.participant_id,
role: row.role,
lei: row.lei,
did: row.did,
created_at: row.created_at,
},
];
}
if (sql.startsWith("DELETE FROM plans_participants")) {
const [plan_id, role, participant_id] = params as [string, string, string];
const idx = rows.findIndex(
(r) =>
r.plan_id === plan_id &&
r.role === role &&
r.participant_id === participant_id,
);
if (idx === -1) return [];
const [removed] = rows.splice(idx, 1);
return [{ id: removed.id }];
}
if (sql.startsWith("SELECT") && sql.includes("FROM plans_participants") && sql.includes("WHERE plan_id = $1 AND role = $2")) {
const [plan_id, role] = params as [string, string];
return rows
.filter((r) => r.plan_id === plan_id && r.role === role)
.map((r) => ({
plan_id: r.plan_id,
id: r.participant_id,
role: r.role,
lei: r.lei,
did: r.did,
created_at: r.created_at,
}));
}
if (sql.includes("FROM plans_participants") && sql.includes("WHERE plan_id = $1")) {
const [plan_id] = params as [string];
return rows
.filter((r) => r.plan_id === plan_id)
.map((r) => ({
plan_id: r.plan_id,
id: r.participant_id,
role: r.role,
lei: r.lei,
did: r.did,
created_at: r.created_at,
}));
}
return [];
},
}));
import * as participants from "../../src/services/participants";
describe("participants service", () => {
beforeEach(() => {
rows.splice(0, rows.length);
rowCounter = 1;
});
it("persists a participant and returns the row", async () => {
const r = await participants.add("plan-1", {
id: "p-42",
role: "applicant",
lei: "HWUPKR0MPOU8FGXBT394",
});
expect(r.id).toBe("p-42");
expect(r.role).toBe("applicant");
expect(r.lei).toBe("HWUPKR0MPOU8FGXBT394");
});
it("is idempotent on (plan_id, role, participant_id)", async () => {
await participants.add("plan-1", { id: "p-1", role: "applicant" });
await participants.add("plan-1", { id: "p-1", role: "applicant" });
const all = await participants.listForPlan("plan-1");
expect(all).toHaveLength(1);
});
it("allows the same id in a different role on the same plan", async () => {
await participants.add("plan-1", { id: "p-1", role: "applicant" });
await participants.add("plan-1", { id: "p-1", role: "coordinator" });
const all = await participants.listForPlan("plan-1");
expect(all.map((r) => r.role).sort()).toEqual([
"applicant",
"coordinator",
]);
});
it("listByRole filters correctly for SoD lookups", async () => {
await participants.add("plan-1", { id: "bank-A", role: "issuing_bank" });
await participants.add("plan-1", { id: "bank-B", role: "beneficiary_bank" });
const issuers = await participants.listByRole("plan-1", "issuing_bank");
expect(issuers).toHaveLength(1);
expect(issuers[0].id).toBe("bank-A");
});
it("rejects unknown roles", async () => {
await expect(
participants.add("plan-1", {
id: "p-1",
role: "intruder" as unknown as "applicant",
}),
).rejects.toThrow(/unknown participant role/);
});
it("bulkAdd persists every participant in order", async () => {
const out = await participants.bulkAdd("plan-1", [
{ id: "a", role: "applicant" },
{ id: "b", role: "issuing_bank" },
{ id: "c", role: "beneficiary" },
]);
expect(out).toHaveLength(3);
expect(out.map((r) => r.id)).toEqual(["a", "b", "c"]);
});
it("remove() returns 0 for missing row and 1 on success", async () => {
expect(
await participants.remove("plan-1", "applicant", "nobody"),
).toBe(0);
await participants.add("plan-1", { id: "p-1", role: "applicant" });
expect(await participants.remove("plan-1", "applicant", "p-1")).toBe(1);
expect(await participants.listForPlan("plan-1")).toHaveLength(0);
});
});