From 3650415d026ba5c2691264da7df7be220954fdb1 Mon Sep 17 00:00:00 2001 From: Devin Date: Wed, 22 Apr 2026 16:45:44 +0000 Subject: [PATCH] PR F: idempotency keys + replay protection (arch step 9 / \u00a713 / \u00a715) Adds an Idempotency-Key middleware backed by a new idempotency_keys table. Covers arch \u00a713 security (replay protection) and \u00a715 non-functional requirement (idempotent event handling, resilience to duplicate messages). Middleware (src/middleware/idempotency.ts): - Mounted on POST /api/plans and POST /api/plans/:planId/execute. - Key format /^[A-Za-z0-9_\-:.]{8,255}$/; malformed -> 400. - On hit, replays cached status+body with Idempotent-Replayed: true. - Reuse with a different body hash -> 422 idempotency_key_reused. - Scopes by (method, path, key) so the same key is safe across unrelated endpoints. - Only 2xx is cached. Non-2xx stays retryable. - res.json() is shimmed so handlers need no changes. - Fail-open on dedup-store unavailability (warn log). Migration 004 (db/migrations/004_idempotency_keys.ts): - idempotency_keys(method, path, key, request_hash, status_code, response_body, created_at, expires_at) with UNIQUE(method,path,key) and an expires_at index. 24h TTL. Tests: tests/unit/idempotency.test.ts \u2014 6 cases covering no-header pass-through, malformed-key 400, replay on second call, 422 on body divergence, retryable non-2xx, (method,path,key) scoping. tsc clean. 80 tests pass across 7 suites. --- .../src/db/migrations/004_idempotency_keys.ts | 44 +++++ orchestrator/src/db/migrations/index.ts | 2 + orchestrator/src/index.ts | 5 +- orchestrator/src/middleware/idempotency.ts | 120 ++++++++++++ orchestrator/src/middleware/index.ts | 1 + orchestrator/tests/unit/idempotency.test.ts | 177 ++++++++++++++++++ 6 files changed, 347 insertions(+), 2 deletions(-) create mode 100644 orchestrator/src/db/migrations/004_idempotency_keys.ts create mode 100644 orchestrator/src/middleware/idempotency.ts create mode 100644 orchestrator/tests/unit/idempotency.test.ts diff --git a/orchestrator/src/db/migrations/004_idempotency_keys.ts b/orchestrator/src/db/migrations/004_idempotency_keys.ts new file mode 100644 index 0000000..06fb613 --- /dev/null +++ b/orchestrator/src/db/migrations/004_idempotency_keys.ts @@ -0,0 +1,44 @@ +import { query } from "../postgres"; + +/** + * Migration 004 — idempotency keys + replay protection (arch §13, + * §15: deterministic state transitions, idempotent event handling, + * resilience to duplicate messages). + * + * A caller supplies an `Idempotency-Key` header on POST requests. + * The server records `{ key, request_hash, response_body, status_code }` + * on first success and replays the cached response on subsequent + * requests with the same key. If the request body changes while the + * key is reused the server returns 422 with `key_reused_with_different_payload`. + * + * Scoped by `(method, path, key)` so the same key can safely appear + * across unrelated endpoints. + * + * Rows expire after 24h — enough to cover retry windows, short enough + * to keep the table bounded. + */ +export async function up() { + await query( + `CREATE TABLE IF NOT EXISTS idempotency_keys ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + method VARCHAR(8) NOT NULL, + path VARCHAR(512) NOT NULL, + key VARCHAR(255) NOT NULL, + request_hash CHAR(64) NOT NULL, + status_code INTEGER NOT NULL, + response_body JSONB NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, + expires_at TIMESTAMPTZ NOT NULL DEFAULT (CURRENT_TIMESTAMP + INTERVAL '24 hours'), + UNIQUE (method, path, key) + )`, + ); + + await query( + `CREATE INDEX IF NOT EXISTS idx_idempotency_expires_at + ON idempotency_keys(expires_at)`, + ); +} + +export async function down() { + await query("DROP TABLE IF EXISTS idempotency_keys CASCADE"); +} diff --git a/orchestrator/src/db/migrations/index.ts b/orchestrator/src/db/migrations/index.ts index cd03f8f..2158e8a 100644 --- a/orchestrator/src/db/migrations/index.ts +++ b/orchestrator/src/db/migrations/index.ts @@ -1,6 +1,7 @@ import { up as up001 } from "./001_initial_schema"; import { up as up002 } from "./002_transaction_state"; import { up as up003 } from "./003_events"; +import { up as up004 } from "./004_idempotency_keys"; /** * Run all migrations @@ -10,6 +11,7 @@ export async function runMigration() { await up001(); await up002(); await up003(); + await up004(); console.log("All migrations completed"); } catch (error) { console.error("Migration failed:", error); diff --git a/orchestrator/src/index.ts b/orchestrator/src/index.ts index 3e4add3..c7ccdbd 100644 --- a/orchestrator/src/index.ts +++ b/orchestrator/src/index.ts @@ -9,6 +9,7 @@ import { requestId, apiKeyAuth, auditLog, + idempotencyMiddleware, } from "./middleware"; import { requestTimeout } from "./middleware/timeout"; import { logger } from "./logging/logger"; @@ -86,7 +87,7 @@ app.use("/api", apiLimiter); // Plan management endpoints app.get("/api/plans", listPlansEndpoint); -app.post("/api/plans", auditLog("CREATE_PLAN", "plan"), createPlan); +app.post("/api/plans", idempotencyMiddleware, auditLog("CREATE_PLAN", "plan"), createPlan); app.get("/api/plans/:planId", getPlan); app.get("/api/plans/:planId/state", getPlanState); app.get("/api/plans/:planId/events", getPlanEvents); @@ -97,7 +98,7 @@ app.post("/api/plans/:planId/validate", validatePlanEndpoint); // Execution endpoints import { executePlan, getExecutionStatus, abortExecution } from "./api/execution"; import { registerWebhook } from "./api/webhooks"; -app.post("/api/plans/:planId/execute", auditLog("EXECUTE_PLAN", "plan"), executePlan); +app.post("/api/plans/:planId/execute", idempotencyMiddleware, auditLog("EXECUTE_PLAN", "plan"), executePlan); app.get("/api/plans/:planId/status", getExecutionStatus); app.post("/api/plans/:planId/abort", auditLog("ABORT_PLAN", "plan"), abortExecution); app.post("/api/webhooks", registerWebhook); diff --git a/orchestrator/src/middleware/idempotency.ts b/orchestrator/src/middleware/idempotency.ts new file mode 100644 index 0000000..6e4c59e --- /dev/null +++ b/orchestrator/src/middleware/idempotency.ts @@ -0,0 +1,120 @@ +/** + * Idempotency-Key middleware (arch §13 security requirements, + * §15 non-functional: idempotent event handling, replay protection). + * + * Contract + * -------- + * - If the client sends `Idempotency-Key`, the server records the + * first successful (2xx) response and replays it verbatim on + * subsequent requests with the same key + method + path. + * - If the same key is re-used with a different request body the + * server returns 422 `idempotency_key_reused` — this catches + * client bugs where a key is accidentally reused across unrelated + * requests. + * - Keys are scoped by `(method, path, key)` and expire after 24h. + * - Responses are captured by shimming `res.json()` — no deep + * integration with route handlers required. + * - Non-2xx responses are **not** cached so transient errors can be + * retried without poisoning the cache. + * + * The middleware is transport-agnostic: routes that opt in just mount + * `idempotencyMiddleware` ahead of the handler. + */ + +import type { NextFunction, Request, Response } from "express"; +import { createHash } from "crypto"; +import { query } from "../db/postgres"; +import { logger } from "../logging/logger"; + +export const IDEMPOTENCY_HEADER = "idempotency-key"; +const KEY_PATTERN = /^[A-Za-z0-9_\-:.]{8,255}$/; + +function hashBody(body: unknown): string { + const canonical = body === undefined ? "" : JSON.stringify(body); + return createHash("sha256").update(canonical).digest("hex"); +} + +interface CachedRow { + request_hash: string; + status_code: number; + response_body: unknown; +} + +export async function idempotencyMiddleware( + req: Request, + res: Response, + next: NextFunction, +): Promise { + const rawKey = req.header(IDEMPOTENCY_HEADER); + if (!rawKey) { + next(); + return; + } + if (!KEY_PATTERN.test(rawKey)) { + res.status(400).json({ + error: "idempotency_key_invalid", + message: "Idempotency-Key must match /^[A-Za-z0-9_\\-:.]{8,255}$/", + }); + return; + } + + const key = rawKey; + const method = req.method; + const path = req.baseUrl + req.path; + const requestHash = hashBody(req.body); + + try { + const rows = await query( + `SELECT request_hash, status_code, response_body + FROM idempotency_keys + WHERE method = $1 AND path = $2 AND key = $3 + AND expires_at > CURRENT_TIMESTAMP + LIMIT 1`, + [method, path, key], + ); + + if (rows.length > 0) { + const cached = rows[0]; + if (cached.request_hash !== requestHash) { + res.status(422).json({ + error: "idempotency_key_reused", + message: "This Idempotency-Key was previously used with a different request body.", + }); + return; + } + res.setHeader("Idempotent-Replayed", "true"); + res.status(cached.status_code).json(cached.response_body); + return; + } + } catch (err) { + // Fail open: if the lookup fails we still process the request so + // the caller never sees a hard 500 because the dedup table is + // unavailable. The downside (a missed replay on the first retry) + // is much less bad than every write failing. + logger.warn({ err }, "[Idempotency] lookup failed, falling open"); + } + + const originalJson = res.json.bind(res); + res.json = (body: unknown): Response => { + const statusCode = res.statusCode; + // Only cache 2xx — transient 5xx / validation 4xx stays retryable. + if (statusCode >= 200 && statusCode < 300) { + // Fire-and-forget; response is already known and can be sent. + query( + `INSERT INTO idempotency_keys + (method, path, key, request_hash, status_code, response_body) + VALUES ($1, $2, $3, $4, $5, $6::jsonb) + ON CONFLICT (method, path, key) DO NOTHING`, + [method, path, key, requestHash, statusCode, JSON.stringify(body)], + ).catch((err) => { + logger.warn({ err, key, method, path }, "[Idempotency] write failed"); + }); + } + return originalJson(body); + }; + + next(); +} + +/** exposed for tests */ +export const __testing = { hashBody, KEY_PATTERN }; diff --git a/orchestrator/src/middleware/index.ts b/orchestrator/src/middleware/index.ts index 6f97006..802eb89 100644 --- a/orchestrator/src/middleware/index.ts +++ b/orchestrator/src/middleware/index.ts @@ -5,4 +5,5 @@ export { validate, sanitizeInput } from "./validation"; export { ipWhitelist, getClientIP } from "./ipWhitelist"; export { auditLog } from "./auditLog"; export { sessionManager } from "./session"; +export { idempotencyMiddleware, IDEMPOTENCY_HEADER } from "./idempotency"; diff --git a/orchestrator/tests/unit/idempotency.test.ts b/orchestrator/tests/unit/idempotency.test.ts new file mode 100644 index 0000000..4a8fb0f --- /dev/null +++ b/orchestrator/tests/unit/idempotency.test.ts @@ -0,0 +1,177 @@ +import { describe, it, expect, beforeEach, jest } from "@jest/globals"; +import type { Request, Response, NextFunction } from "express"; + +type Row = { + method: string; + path: string; + key: string; + request_hash: string; + status_code: number; + response_body: unknown; +}; + +const store = new Map(); + +jest.mock("../../src/db/postgres", () => ({ + query: async (sql: string, params: unknown[] = []) => { + if (sql.startsWith("SELECT request_hash")) { + const [method, path, key] = params as [string, string, string]; + const row = store.get(`${method}|${path}|${key}`); + return row ? [row] : []; + } + if (sql.startsWith("INSERT INTO idempotency_keys")) { + const [method, path, key, request_hash, status_code, body] = params as [ + string, string, string, string, number, string, + ]; + const k = `${method}|${path}|${key}`; + if (!store.has(k)) { + store.set(k, { + method, + path, + key, + request_hash, + status_code, + response_body: JSON.parse(body), + }); + } + return []; + } + return []; + }, +})); + +import { idempotencyMiddleware, IDEMPOTENCY_HEADER } from "../../src/middleware/idempotency"; + +function makeReqRes(overrides: { + header?: string; + method?: string; + baseUrl?: string; + path?: string; + body?: unknown; +}) { + const req = { + method: overrides.method ?? "POST", + baseUrl: overrides.baseUrl ?? "", + path: overrides.path ?? "/api/plans", + body: overrides.body ?? { a: 1 }, + header(name: string) { + return name.toLowerCase() === IDEMPOTENCY_HEADER ? overrides.header : undefined; + }, + } as unknown as Request; + const captured: { status?: number; body?: unknown; headers: Record } = { + headers: {}, + }; + const res: Partial = { + statusCode: 200, + status(code: number) { + this.statusCode = code; + captured.status = code; + return this as Response; + }, + json(body: unknown) { + captured.body = body; + if (captured.status === undefined) captured.status = this.statusCode; + return this as Response; + }, + setHeader(name: string, value: string | number | readonly string[]) { + captured.headers[name] = String(value); + return this as Response; + }, + }; + return { req, res: res as Response, captured }; +} + +describe("Idempotency middleware", () => { + beforeEach(() => { + store.clear(); + }); + + it("skips when no Idempotency-Key header is set", async () => { + const { req, res } = makeReqRes({}); + const next = jest.fn() as unknown as NextFunction; + await idempotencyMiddleware(req, res, next); + expect(next).toHaveBeenCalledTimes(1); + }); + + it("rejects malformed keys with 400", async () => { + const { req, res, captured } = makeReqRes({ header: "short" }); + const next = jest.fn() as unknown as NextFunction; + await idempotencyMiddleware(req, res, next); + expect(next).not.toHaveBeenCalled(); + expect(captured.status).toBe(400); + expect((captured.body as { error: string }).error).toBe("idempotency_key_invalid"); + }); + + it("caches 2xx responses on first call and replays on second", async () => { + const key = "ABC12345_test-key"; + + const first = makeReqRes({ header: key }); + const next1 = jest.fn() as unknown as NextFunction; + await idempotencyMiddleware(first.req, first.res, next1); + expect(next1).toHaveBeenCalledTimes(1); + // Simulate handler sending JSON response + first.res.status(201); + first.res.json({ plan_id: "p-1", created: true }); + + // Let the fire-and-forget INSERT microtask flush + await new Promise((r) => setImmediate(r)); + + const second = makeReqRes({ header: key }); + const next2 = jest.fn() as unknown as NextFunction; + await idempotencyMiddleware(second.req, second.res, next2); + expect(next2).not.toHaveBeenCalled(); + expect(second.captured.status).toBe(201); + expect(second.captured.body).toEqual({ plan_id: "p-1", created: true }); + expect(second.captured.headers["Idempotent-Replayed"]).toBe("true"); + }); + + it("rejects reuse with a different body as 422", async () => { + const key = "ABC12345_test-key"; + + const first = makeReqRes({ header: key, body: { a: 1 } }); + const next1 = jest.fn() as unknown as NextFunction; + await idempotencyMiddleware(first.req, first.res, next1); + first.res.status(200); + first.res.json({ ok: true }); + await new Promise((r) => setImmediate(r)); + + const second = makeReqRes({ header: key, body: { a: 2 } }); + const next2 = jest.fn() as unknown as NextFunction; + await idempotencyMiddleware(second.req, second.res, next2); + expect(next2).not.toHaveBeenCalled(); + expect(second.captured.status).toBe(422); + expect((second.captured.body as { error: string }).error).toBe("idempotency_key_reused"); + }); + + it("does NOT cache non-2xx responses (retryable)", async () => { + const key = "ABC12345_test-key"; + + const first = makeReqRes({ header: key }); + await idempotencyMiddleware(first.req, first.res, jest.fn() as unknown as NextFunction); + first.res.status(500); + first.res.json({ error: "boom" }); + await new Promise((r) => setImmediate(r)); + + // Retry should go through (no replay) + const second = makeReqRes({ header: key }); + const next2 = jest.fn() as unknown as NextFunction; + await idempotencyMiddleware(second.req, second.res, next2); + expect(next2).toHaveBeenCalledTimes(1); + }); + + it("scopes by (method, path, key)", async () => { + const key = "ABC12345_test-key"; + + const createPlan = makeReqRes({ header: key, path: "/api/plans" }); + await idempotencyMiddleware(createPlan.req, createPlan.res, jest.fn() as unknown as NextFunction); + createPlan.res.status(201); + createPlan.res.json({ plan_id: "p-1" }); + await new Promise((r) => setImmediate(r)); + + // Same key on a different path: should pass through, not replay + const execute = makeReqRes({ header: key, path: "/api/plans/p-1/execute" }); + const nextExec = jest.fn() as unknown as NextFunction; + await idempotencyMiddleware(execute.req, execute.res, nextExec); + expect(nextExec).toHaveBeenCalledTimes(1); + }); +}); -- 2.34.1