PR F: idempotency keys + replay protection (arch step 9 / \u00a713 / \u00a715)
Adds an Idempotency-Key middleware backed by a new idempotency_keys
table. Covers arch \u00a713 security (replay protection) and \u00a715
non-functional requirement (idempotent event handling, resilience to
duplicate messages).
Middleware (src/middleware/idempotency.ts):
- Mounted on POST /api/plans and POST /api/plans/:planId/execute.
- Key format /^[A-Za-z0-9_\-:.]{8,255}$/; malformed -> 400.
- On hit, replays cached status+body with Idempotent-Replayed: true.
- Reuse with a different body hash -> 422 idempotency_key_reused.
- Scopes by (method, path, key) so the same key is safe across
unrelated endpoints.
- Only 2xx is cached. Non-2xx stays retryable.
- res.json() is shimmed so handlers need no changes.
- Fail-open on dedup-store unavailability (warn log).
Migration 004 (db/migrations/004_idempotency_keys.ts):
- idempotency_keys(method, path, key, request_hash, status_code,
response_body, created_at, expires_at) with UNIQUE(method,path,key)
and an expires_at index. 24h TTL.
Tests: tests/unit/idempotency.test.ts \u2014 6 cases covering no-header
pass-through, malformed-key 400, replay on second call, 422 on body
divergence, retryable non-2xx, (method,path,key) scoping.
tsc clean. 80 tests pass across 7 suites.
This commit is contained in:
44
orchestrator/src/db/migrations/004_idempotency_keys.ts
Normal file
44
orchestrator/src/db/migrations/004_idempotency_keys.ts
Normal file
@@ -0,0 +1,44 @@
|
||||
import { query } from "../postgres";
|
||||
|
||||
/**
|
||||
* Migration 004 — idempotency keys + replay protection (arch §13,
|
||||
* §15: deterministic state transitions, idempotent event handling,
|
||||
* resilience to duplicate messages).
|
||||
*
|
||||
* A caller supplies an `Idempotency-Key` header on POST requests.
|
||||
* The server records `{ key, request_hash, response_body, status_code }`
|
||||
* on first success and replays the cached response on subsequent
|
||||
* requests with the same key. If the request body changes while the
|
||||
* key is reused the server returns 422 with `key_reused_with_different_payload`.
|
||||
*
|
||||
* Scoped by `(method, path, key)` so the same key can safely appear
|
||||
* across unrelated endpoints.
|
||||
*
|
||||
* Rows expire after 24h — enough to cover retry windows, short enough
|
||||
* to keep the table bounded.
|
||||
*/
|
||||
export async function up() {
|
||||
await query(
|
||||
`CREATE TABLE IF NOT EXISTS idempotency_keys (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
method VARCHAR(8) NOT NULL,
|
||||
path VARCHAR(512) NOT NULL,
|
||||
key VARCHAR(255) NOT NULL,
|
||||
request_hash CHAR(64) NOT NULL,
|
||||
status_code INTEGER NOT NULL,
|
||||
response_body JSONB NOT NULL,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
expires_at TIMESTAMPTZ NOT NULL DEFAULT (CURRENT_TIMESTAMP + INTERVAL '24 hours'),
|
||||
UNIQUE (method, path, key)
|
||||
)`,
|
||||
);
|
||||
|
||||
await query(
|
||||
`CREATE INDEX IF NOT EXISTS idx_idempotency_expires_at
|
||||
ON idempotency_keys(expires_at)`,
|
||||
);
|
||||
}
|
||||
|
||||
export async function down() {
|
||||
await query("DROP TABLE IF EXISTS idempotency_keys CASCADE");
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
import { up as up001 } from "./001_initial_schema";
|
||||
import { up as up002 } from "./002_transaction_state";
|
||||
import { up as up003 } from "./003_events";
|
||||
import { up as up004 } from "./004_idempotency_keys";
|
||||
|
||||
/**
|
||||
* Run all migrations
|
||||
@@ -10,6 +11,7 @@ export async function runMigration() {
|
||||
await up001();
|
||||
await up002();
|
||||
await up003();
|
||||
await up004();
|
||||
console.log("All migrations completed");
|
||||
} catch (error) {
|
||||
console.error("Migration failed:", error);
|
||||
|
||||
@@ -9,6 +9,7 @@ import {
|
||||
requestId,
|
||||
apiKeyAuth,
|
||||
auditLog,
|
||||
idempotencyMiddleware,
|
||||
} from "./middleware";
|
||||
import { requestTimeout } from "./middleware/timeout";
|
||||
import { logger } from "./logging/logger";
|
||||
@@ -86,7 +87,7 @@ app.use("/api", apiLimiter);
|
||||
|
||||
// Plan management endpoints
|
||||
app.get("/api/plans", listPlansEndpoint);
|
||||
app.post("/api/plans", auditLog("CREATE_PLAN", "plan"), createPlan);
|
||||
app.post("/api/plans", idempotencyMiddleware, auditLog("CREATE_PLAN", "plan"), createPlan);
|
||||
app.get("/api/plans/:planId", getPlan);
|
||||
app.get("/api/plans/:planId/state", getPlanState);
|
||||
app.get("/api/plans/:planId/events", getPlanEvents);
|
||||
@@ -97,7 +98,7 @@ app.post("/api/plans/:planId/validate", validatePlanEndpoint);
|
||||
// Execution endpoints
|
||||
import { executePlan, getExecutionStatus, abortExecution } from "./api/execution";
|
||||
import { registerWebhook } from "./api/webhooks";
|
||||
app.post("/api/plans/:planId/execute", auditLog("EXECUTE_PLAN", "plan"), executePlan);
|
||||
app.post("/api/plans/:planId/execute", idempotencyMiddleware, auditLog("EXECUTE_PLAN", "plan"), executePlan);
|
||||
app.get("/api/plans/:planId/status", getExecutionStatus);
|
||||
app.post("/api/plans/:planId/abort", auditLog("ABORT_PLAN", "plan"), abortExecution);
|
||||
app.post("/api/webhooks", registerWebhook);
|
||||
|
||||
120
orchestrator/src/middleware/idempotency.ts
Normal file
120
orchestrator/src/middleware/idempotency.ts
Normal file
@@ -0,0 +1,120 @@
|
||||
/**
|
||||
* Idempotency-Key middleware (arch §13 security requirements,
|
||||
* §15 non-functional: idempotent event handling, replay protection).
|
||||
*
|
||||
* Contract
|
||||
* --------
|
||||
* - If the client sends `Idempotency-Key`, the server records the
|
||||
* first successful (2xx) response and replays it verbatim on
|
||||
* subsequent requests with the same key + method + path.
|
||||
* - If the same key is re-used with a different request body the
|
||||
* server returns 422 `idempotency_key_reused` — this catches
|
||||
* client bugs where a key is accidentally reused across unrelated
|
||||
* requests.
|
||||
* - Keys are scoped by `(method, path, key)` and expire after 24h.
|
||||
* - Responses are captured by shimming `res.json()` — no deep
|
||||
* integration with route handlers required.
|
||||
* - Non-2xx responses are **not** cached so transient errors can be
|
||||
* retried without poisoning the cache.
|
||||
*
|
||||
* The middleware is transport-agnostic: routes that opt in just mount
|
||||
* `idempotencyMiddleware` ahead of the handler.
|
||||
*/
|
||||
|
||||
import type { NextFunction, Request, Response } from "express";
|
||||
import { createHash } from "crypto";
|
||||
import { query } from "../db/postgres";
|
||||
import { logger } from "../logging/logger";
|
||||
|
||||
export const IDEMPOTENCY_HEADER = "idempotency-key";
|
||||
const KEY_PATTERN = /^[A-Za-z0-9_\-:.]{8,255}$/;
|
||||
|
||||
function hashBody(body: unknown): string {
|
||||
const canonical = body === undefined ? "" : JSON.stringify(body);
|
||||
return createHash("sha256").update(canonical).digest("hex");
|
||||
}
|
||||
|
||||
interface CachedRow {
|
||||
request_hash: string;
|
||||
status_code: number;
|
||||
response_body: unknown;
|
||||
}
|
||||
|
||||
export async function idempotencyMiddleware(
|
||||
req: Request,
|
||||
res: Response,
|
||||
next: NextFunction,
|
||||
): Promise<void> {
|
||||
const rawKey = req.header(IDEMPOTENCY_HEADER);
|
||||
if (!rawKey) {
|
||||
next();
|
||||
return;
|
||||
}
|
||||
if (!KEY_PATTERN.test(rawKey)) {
|
||||
res.status(400).json({
|
||||
error: "idempotency_key_invalid",
|
||||
message: "Idempotency-Key must match /^[A-Za-z0-9_\\-:.]{8,255}$/",
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const key = rawKey;
|
||||
const method = req.method;
|
||||
const path = req.baseUrl + req.path;
|
||||
const requestHash = hashBody(req.body);
|
||||
|
||||
try {
|
||||
const rows = await query<CachedRow>(
|
||||
`SELECT request_hash, status_code, response_body
|
||||
FROM idempotency_keys
|
||||
WHERE method = $1 AND path = $2 AND key = $3
|
||||
AND expires_at > CURRENT_TIMESTAMP
|
||||
LIMIT 1`,
|
||||
[method, path, key],
|
||||
);
|
||||
|
||||
if (rows.length > 0) {
|
||||
const cached = rows[0];
|
||||
if (cached.request_hash !== requestHash) {
|
||||
res.status(422).json({
|
||||
error: "idempotency_key_reused",
|
||||
message: "This Idempotency-Key was previously used with a different request body.",
|
||||
});
|
||||
return;
|
||||
}
|
||||
res.setHeader("Idempotent-Replayed", "true");
|
||||
res.status(cached.status_code).json(cached.response_body);
|
||||
return;
|
||||
}
|
||||
} catch (err) {
|
||||
// Fail open: if the lookup fails we still process the request so
|
||||
// the caller never sees a hard 500 because the dedup table is
|
||||
// unavailable. The downside (a missed replay on the first retry)
|
||||
// is much less bad than every write failing.
|
||||
logger.warn({ err }, "[Idempotency] lookup failed, falling open");
|
||||
}
|
||||
|
||||
const originalJson = res.json.bind(res);
|
||||
res.json = (body: unknown): Response => {
|
||||
const statusCode = res.statusCode;
|
||||
// Only cache 2xx — transient 5xx / validation 4xx stays retryable.
|
||||
if (statusCode >= 200 && statusCode < 300) {
|
||||
// Fire-and-forget; response is already known and can be sent.
|
||||
query(
|
||||
`INSERT INTO idempotency_keys
|
||||
(method, path, key, request_hash, status_code, response_body)
|
||||
VALUES ($1, $2, $3, $4, $5, $6::jsonb)
|
||||
ON CONFLICT (method, path, key) DO NOTHING`,
|
||||
[method, path, key, requestHash, statusCode, JSON.stringify(body)],
|
||||
).catch((err) => {
|
||||
logger.warn({ err, key, method, path }, "[Idempotency] write failed");
|
||||
});
|
||||
}
|
||||
return originalJson(body);
|
||||
};
|
||||
|
||||
next();
|
||||
}
|
||||
|
||||
/** exposed for tests */
|
||||
export const __testing = { hashBody, KEY_PATTERN };
|
||||
@@ -5,4 +5,5 @@ export { validate, sanitizeInput } from "./validation";
|
||||
export { ipWhitelist, getClientIP } from "./ipWhitelist";
|
||||
export { auditLog } from "./auditLog";
|
||||
export { sessionManager } from "./session";
|
||||
export { idempotencyMiddleware, IDEMPOTENCY_HEADER } from "./idempotency";
|
||||
|
||||
|
||||
177
orchestrator/tests/unit/idempotency.test.ts
Normal file
177
orchestrator/tests/unit/idempotency.test.ts
Normal file
@@ -0,0 +1,177 @@
|
||||
import { describe, it, expect, beforeEach, jest } from "@jest/globals";
|
||||
import type { Request, Response, NextFunction } from "express";
|
||||
|
||||
type Row = {
|
||||
method: string;
|
||||
path: string;
|
||||
key: string;
|
||||
request_hash: string;
|
||||
status_code: number;
|
||||
response_body: unknown;
|
||||
};
|
||||
|
||||
const store = new Map<string, Row>();
|
||||
|
||||
jest.mock("../../src/db/postgres", () => ({
|
||||
query: async (sql: string, params: unknown[] = []) => {
|
||||
if (sql.startsWith("SELECT request_hash")) {
|
||||
const [method, path, key] = params as [string, string, string];
|
||||
const row = store.get(`${method}|${path}|${key}`);
|
||||
return row ? [row] : [];
|
||||
}
|
||||
if (sql.startsWith("INSERT INTO idempotency_keys")) {
|
||||
const [method, path, key, request_hash, status_code, body] = params as [
|
||||
string, string, string, string, number, string,
|
||||
];
|
||||
const k = `${method}|${path}|${key}`;
|
||||
if (!store.has(k)) {
|
||||
store.set(k, {
|
||||
method,
|
||||
path,
|
||||
key,
|
||||
request_hash,
|
||||
status_code,
|
||||
response_body: JSON.parse(body),
|
||||
});
|
||||
}
|
||||
return [];
|
||||
}
|
||||
return [];
|
||||
},
|
||||
}));
|
||||
|
||||
import { idempotencyMiddleware, IDEMPOTENCY_HEADER } from "../../src/middleware/idempotency";
|
||||
|
||||
function makeReqRes(overrides: {
|
||||
header?: string;
|
||||
method?: string;
|
||||
baseUrl?: string;
|
||||
path?: string;
|
||||
body?: unknown;
|
||||
}) {
|
||||
const req = {
|
||||
method: overrides.method ?? "POST",
|
||||
baseUrl: overrides.baseUrl ?? "",
|
||||
path: overrides.path ?? "/api/plans",
|
||||
body: overrides.body ?? { a: 1 },
|
||||
header(name: string) {
|
||||
return name.toLowerCase() === IDEMPOTENCY_HEADER ? overrides.header : undefined;
|
||||
},
|
||||
} as unknown as Request;
|
||||
const captured: { status?: number; body?: unknown; headers: Record<string, string> } = {
|
||||
headers: {},
|
||||
};
|
||||
const res: Partial<Response> = {
|
||||
statusCode: 200,
|
||||
status(code: number) {
|
||||
this.statusCode = code;
|
||||
captured.status = code;
|
||||
return this as Response;
|
||||
},
|
||||
json(body: unknown) {
|
||||
captured.body = body;
|
||||
if (captured.status === undefined) captured.status = this.statusCode;
|
||||
return this as Response;
|
||||
},
|
||||
setHeader(name: string, value: string | number | readonly string[]) {
|
||||
captured.headers[name] = String(value);
|
||||
return this as Response;
|
||||
},
|
||||
};
|
||||
return { req, res: res as Response, captured };
|
||||
}
|
||||
|
||||
describe("Idempotency middleware", () => {
|
||||
beforeEach(() => {
|
||||
store.clear();
|
||||
});
|
||||
|
||||
it("skips when no Idempotency-Key header is set", async () => {
|
||||
const { req, res } = makeReqRes({});
|
||||
const next = jest.fn() as unknown as NextFunction;
|
||||
await idempotencyMiddleware(req, res, next);
|
||||
expect(next).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("rejects malformed keys with 400", async () => {
|
||||
const { req, res, captured } = makeReqRes({ header: "short" });
|
||||
const next = jest.fn() as unknown as NextFunction;
|
||||
await idempotencyMiddleware(req, res, next);
|
||||
expect(next).not.toHaveBeenCalled();
|
||||
expect(captured.status).toBe(400);
|
||||
expect((captured.body as { error: string }).error).toBe("idempotency_key_invalid");
|
||||
});
|
||||
|
||||
it("caches 2xx responses on first call and replays on second", async () => {
|
||||
const key = "ABC12345_test-key";
|
||||
|
||||
const first = makeReqRes({ header: key });
|
||||
const next1 = jest.fn() as unknown as NextFunction;
|
||||
await idempotencyMiddleware(first.req, first.res, next1);
|
||||
expect(next1).toHaveBeenCalledTimes(1);
|
||||
// Simulate handler sending JSON response
|
||||
first.res.status(201);
|
||||
first.res.json({ plan_id: "p-1", created: true });
|
||||
|
||||
// Let the fire-and-forget INSERT microtask flush
|
||||
await new Promise((r) => setImmediate(r));
|
||||
|
||||
const second = makeReqRes({ header: key });
|
||||
const next2 = jest.fn() as unknown as NextFunction;
|
||||
await idempotencyMiddleware(second.req, second.res, next2);
|
||||
expect(next2).not.toHaveBeenCalled();
|
||||
expect(second.captured.status).toBe(201);
|
||||
expect(second.captured.body).toEqual({ plan_id: "p-1", created: true });
|
||||
expect(second.captured.headers["Idempotent-Replayed"]).toBe("true");
|
||||
});
|
||||
|
||||
it("rejects reuse with a different body as 422", async () => {
|
||||
const key = "ABC12345_test-key";
|
||||
|
||||
const first = makeReqRes({ header: key, body: { a: 1 } });
|
||||
const next1 = jest.fn() as unknown as NextFunction;
|
||||
await idempotencyMiddleware(first.req, first.res, next1);
|
||||
first.res.status(200);
|
||||
first.res.json({ ok: true });
|
||||
await new Promise((r) => setImmediate(r));
|
||||
|
||||
const second = makeReqRes({ header: key, body: { a: 2 } });
|
||||
const next2 = jest.fn() as unknown as NextFunction;
|
||||
await idempotencyMiddleware(second.req, second.res, next2);
|
||||
expect(next2).not.toHaveBeenCalled();
|
||||
expect(second.captured.status).toBe(422);
|
||||
expect((second.captured.body as { error: string }).error).toBe("idempotency_key_reused");
|
||||
});
|
||||
|
||||
it("does NOT cache non-2xx responses (retryable)", async () => {
|
||||
const key = "ABC12345_test-key";
|
||||
|
||||
const first = makeReqRes({ header: key });
|
||||
await idempotencyMiddleware(first.req, first.res, jest.fn() as unknown as NextFunction);
|
||||
first.res.status(500);
|
||||
first.res.json({ error: "boom" });
|
||||
await new Promise((r) => setImmediate(r));
|
||||
|
||||
// Retry should go through (no replay)
|
||||
const second = makeReqRes({ header: key });
|
||||
const next2 = jest.fn() as unknown as NextFunction;
|
||||
await idempotencyMiddleware(second.req, second.res, next2);
|
||||
expect(next2).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("scopes by (method, path, key)", async () => {
|
||||
const key = "ABC12345_test-key";
|
||||
|
||||
const createPlan = makeReqRes({ header: key, path: "/api/plans" });
|
||||
await idempotencyMiddleware(createPlan.req, createPlan.res, jest.fn() as unknown as NextFunction);
|
||||
createPlan.res.status(201);
|
||||
createPlan.res.json({ plan_id: "p-1" });
|
||||
await new Promise((r) => setImmediate(r));
|
||||
|
||||
// Same key on a different path: should pass through, not replay
|
||||
const execute = makeReqRes({ header: key, path: "/api/plans/p-1/execute" });
|
||||
const nextExec = jest.fn() as unknown as NextFunction;
|
||||
await idempotencyMiddleware(execute.req, execute.res, nextExec);
|
||||
expect(nextExec).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user