PR F: Idempotency-Key + replay protection on POST /plans and /execute (#10)
Some checks failed
CI / Frontend Lint (push) Has been cancelled
CI / Frontend Type Check (push) Has been cancelled
CI / Frontend Build (push) Has been cancelled
CI / Frontend E2E Tests (push) Has been cancelled
CI / Orchestrator Build (push) Has been cancelled
CI / Contracts Compile (push) Has been cancelled
CI / Contracts Test (push) Has been cancelled
Security Scan / Dependency Vulnerability Scan (push) Has been cancelled
Security Scan / OWASP ZAP Scan (push) Has been cancelled
Some checks failed
CI / Frontend Lint (push) Has been cancelled
CI / Frontend Type Check (push) Has been cancelled
CI / Frontend Build (push) Has been cancelled
CI / Frontend E2E Tests (push) Has been cancelled
CI / Orchestrator Build (push) Has been cancelled
CI / Contracts Compile (push) Has been cancelled
CI / Contracts Test (push) Has been cancelled
Security Scan / Dependency Vulnerability Scan (push) Has been cancelled
Security Scan / OWASP ZAP Scan (push) Has been cancelled
This commit was merged in pull request #10.
This commit is contained in:
44
orchestrator/src/db/migrations/004_idempotency_keys.ts
Normal file
44
orchestrator/src/db/migrations/004_idempotency_keys.ts
Normal file
@@ -0,0 +1,44 @@
|
||||
import { query } from "../postgres";
|
||||
|
||||
/**
|
||||
* Migration 004 — idempotency keys + replay protection (arch §13,
|
||||
* §15: deterministic state transitions, idempotent event handling,
|
||||
* resilience to duplicate messages).
|
||||
*
|
||||
* A caller supplies an `Idempotency-Key` header on POST requests.
|
||||
* The server records `{ key, request_hash, response_body, status_code }`
|
||||
* on first success and replays the cached response on subsequent
|
||||
* requests with the same key. If the request body changes while the
|
||||
* key is reused the server returns 422 with `key_reused_with_different_payload`.
|
||||
*
|
||||
* Scoped by `(method, path, key)` so the same key can safely appear
|
||||
* across unrelated endpoints.
|
||||
*
|
||||
* Rows expire after 24h — enough to cover retry windows, short enough
|
||||
* to keep the table bounded.
|
||||
*/
|
||||
export async function up() {
|
||||
await query(
|
||||
`CREATE TABLE IF NOT EXISTS idempotency_keys (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
method VARCHAR(8) NOT NULL,
|
||||
path VARCHAR(512) NOT NULL,
|
||||
key VARCHAR(255) NOT NULL,
|
||||
request_hash CHAR(64) NOT NULL,
|
||||
status_code INTEGER NOT NULL,
|
||||
response_body JSONB NOT NULL,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
expires_at TIMESTAMPTZ NOT NULL DEFAULT (CURRENT_TIMESTAMP + INTERVAL '24 hours'),
|
||||
UNIQUE (method, path, key)
|
||||
)`,
|
||||
);
|
||||
|
||||
await query(
|
||||
`CREATE INDEX IF NOT EXISTS idx_idempotency_expires_at
|
||||
ON idempotency_keys(expires_at)`,
|
||||
);
|
||||
}
|
||||
|
||||
export async function down() {
|
||||
await query("DROP TABLE IF EXISTS idempotency_keys CASCADE");
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
import { up as up001 } from "./001_initial_schema";
|
||||
import { up as up002 } from "./002_transaction_state";
|
||||
import { up as up003 } from "./003_events";
|
||||
import { up as up004 } from "./004_idempotency_keys";
|
||||
|
||||
/**
|
||||
* Run all migrations
|
||||
@@ -10,6 +11,7 @@ export async function runMigration() {
|
||||
await up001();
|
||||
await up002();
|
||||
await up003();
|
||||
await up004();
|
||||
console.log("All migrations completed");
|
||||
} catch (error) {
|
||||
console.error("Migration failed:", error);
|
||||
|
||||
@@ -9,6 +9,7 @@ import {
|
||||
requestId,
|
||||
apiKeyAuth,
|
||||
auditLog,
|
||||
idempotencyMiddleware,
|
||||
} from "./middleware";
|
||||
import { requestTimeout } from "./middleware/timeout";
|
||||
import { logger } from "./logging/logger";
|
||||
@@ -86,7 +87,7 @@ app.use("/api", apiLimiter);
|
||||
|
||||
// Plan management endpoints
|
||||
app.get("/api/plans", listPlansEndpoint);
|
||||
app.post("/api/plans", auditLog("CREATE_PLAN", "plan"), createPlan);
|
||||
app.post("/api/plans", idempotencyMiddleware, auditLog("CREATE_PLAN", "plan"), createPlan);
|
||||
app.get("/api/plans/:planId", getPlan);
|
||||
app.get("/api/plans/:planId/state", getPlanState);
|
||||
app.get("/api/plans/:planId/events", getPlanEvents);
|
||||
@@ -97,7 +98,7 @@ app.post("/api/plans/:planId/validate", validatePlanEndpoint);
|
||||
// Execution endpoints
|
||||
import { executePlan, getExecutionStatus, abortExecution } from "./api/execution";
|
||||
import { registerWebhook } from "./api/webhooks";
|
||||
app.post("/api/plans/:planId/execute", auditLog("EXECUTE_PLAN", "plan"), executePlan);
|
||||
app.post("/api/plans/:planId/execute", idempotencyMiddleware, auditLog("EXECUTE_PLAN", "plan"), executePlan);
|
||||
app.get("/api/plans/:planId/status", getExecutionStatus);
|
||||
app.post("/api/plans/:planId/abort", auditLog("ABORT_PLAN", "plan"), abortExecution);
|
||||
app.post("/api/webhooks", registerWebhook);
|
||||
|
||||
120
orchestrator/src/middleware/idempotency.ts
Normal file
120
orchestrator/src/middleware/idempotency.ts
Normal file
@@ -0,0 +1,120 @@
|
||||
/**
|
||||
* Idempotency-Key middleware (arch §13 security requirements,
|
||||
* §15 non-functional: idempotent event handling, replay protection).
|
||||
*
|
||||
* Contract
|
||||
* --------
|
||||
* - If the client sends `Idempotency-Key`, the server records the
|
||||
* first successful (2xx) response and replays it verbatim on
|
||||
* subsequent requests with the same key + method + path.
|
||||
* - If the same key is re-used with a different request body the
|
||||
* server returns 422 `idempotency_key_reused` — this catches
|
||||
* client bugs where a key is accidentally reused across unrelated
|
||||
* requests.
|
||||
* - Keys are scoped by `(method, path, key)` and expire after 24h.
|
||||
* - Responses are captured by shimming `res.json()` — no deep
|
||||
* integration with route handlers required.
|
||||
* - Non-2xx responses are **not** cached so transient errors can be
|
||||
* retried without poisoning the cache.
|
||||
*
|
||||
* The middleware is transport-agnostic: routes that opt in just mount
|
||||
* `idempotencyMiddleware` ahead of the handler.
|
||||
*/
|
||||
|
||||
import type { NextFunction, Request, Response } from "express";
|
||||
import { createHash } from "crypto";
|
||||
import { query } from "../db/postgres";
|
||||
import { logger } from "../logging/logger";
|
||||
|
||||
export const IDEMPOTENCY_HEADER = "idempotency-key";
|
||||
const KEY_PATTERN = /^[A-Za-z0-9_\-:.]{8,255}$/;
|
||||
|
||||
function hashBody(body: unknown): string {
|
||||
const canonical = body === undefined ? "" : JSON.stringify(body);
|
||||
return createHash("sha256").update(canonical).digest("hex");
|
||||
}
|
||||
|
||||
interface CachedRow {
|
||||
request_hash: string;
|
||||
status_code: number;
|
||||
response_body: unknown;
|
||||
}
|
||||
|
||||
export async function idempotencyMiddleware(
|
||||
req: Request,
|
||||
res: Response,
|
||||
next: NextFunction,
|
||||
): Promise<void> {
|
||||
const rawKey = req.header(IDEMPOTENCY_HEADER);
|
||||
if (!rawKey) {
|
||||
next();
|
||||
return;
|
||||
}
|
||||
if (!KEY_PATTERN.test(rawKey)) {
|
||||
res.status(400).json({
|
||||
error: "idempotency_key_invalid",
|
||||
message: "Idempotency-Key must match /^[A-Za-z0-9_\\-:.]{8,255}$/",
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const key = rawKey;
|
||||
const method = req.method;
|
||||
const path = req.baseUrl + req.path;
|
||||
const requestHash = hashBody(req.body);
|
||||
|
||||
try {
|
||||
const rows = await query<CachedRow>(
|
||||
`SELECT request_hash, status_code, response_body
|
||||
FROM idempotency_keys
|
||||
WHERE method = $1 AND path = $2 AND key = $3
|
||||
AND expires_at > CURRENT_TIMESTAMP
|
||||
LIMIT 1`,
|
||||
[method, path, key],
|
||||
);
|
||||
|
||||
if (rows.length > 0) {
|
||||
const cached = rows[0];
|
||||
if (cached.request_hash !== requestHash) {
|
||||
res.status(422).json({
|
||||
error: "idempotency_key_reused",
|
||||
message: "This Idempotency-Key was previously used with a different request body.",
|
||||
});
|
||||
return;
|
||||
}
|
||||
res.setHeader("Idempotent-Replayed", "true");
|
||||
res.status(cached.status_code).json(cached.response_body);
|
||||
return;
|
||||
}
|
||||
} catch (err) {
|
||||
// Fail open: if the lookup fails we still process the request so
|
||||
// the caller never sees a hard 500 because the dedup table is
|
||||
// unavailable. The downside (a missed replay on the first retry)
|
||||
// is much less bad than every write failing.
|
||||
logger.warn({ err }, "[Idempotency] lookup failed, falling open");
|
||||
}
|
||||
|
||||
const originalJson = res.json.bind(res);
|
||||
res.json = (body: unknown): Response => {
|
||||
const statusCode = res.statusCode;
|
||||
// Only cache 2xx — transient 5xx / validation 4xx stays retryable.
|
||||
if (statusCode >= 200 && statusCode < 300) {
|
||||
// Fire-and-forget; response is already known and can be sent.
|
||||
query(
|
||||
`INSERT INTO idempotency_keys
|
||||
(method, path, key, request_hash, status_code, response_body)
|
||||
VALUES ($1, $2, $3, $4, $5, $6::jsonb)
|
||||
ON CONFLICT (method, path, key) DO NOTHING`,
|
||||
[method, path, key, requestHash, statusCode, JSON.stringify(body)],
|
||||
).catch((err) => {
|
||||
logger.warn({ err, key, method, path }, "[Idempotency] write failed");
|
||||
});
|
||||
}
|
||||
return originalJson(body);
|
||||
};
|
||||
|
||||
next();
|
||||
}
|
||||
|
||||
/** exposed for tests */
|
||||
export const __testing = { hashBody, KEY_PATTERN };
|
||||
@@ -5,4 +5,5 @@ export { validate, sanitizeInput } from "./validation";
|
||||
export { ipWhitelist, getClientIP } from "./ipWhitelist";
|
||||
export { auditLog } from "./auditLog";
|
||||
export { sessionManager } from "./session";
|
||||
export { idempotencyMiddleware, IDEMPOTENCY_HEADER } from "./idempotency";
|
||||
|
||||
|
||||
Reference in New Issue
Block a user