Add ECDSA signature verification and enhance ComboHandler functionality

- Integrated ECDSA for signature verification in ComboHandler.
- Updated event emissions to include additional parameters for better tracking.
- Improved gas tracking during execution of combo plans.
- Enhanced database interactions for storing and retrieving plans, including conflict resolution and status updates.
- Added new dependencies for security and database management in orchestrator.
This commit is contained in:
defiQUG
2025-11-05 16:28:48 -08:00
parent 3b09c35c47
commit f600b7b15e
48 changed files with 3381 additions and 46 deletions

39
Dockerfile Normal file
View File

@@ -0,0 +1,39 @@
# Multi-stage Dockerfile for orchestrator service
FROM node:18-alpine AS builder
WORKDIR /app
# Copy package manifests first so the dependency layer is cached
COPY orchestrator/package*.json ./
RUN npm ci
# Copy source
COPY orchestrator/ ./
# Build
RUN npm run build
# Production stage
FROM node:18-alpine
WORKDIR /app
# Copy package files
COPY orchestrator/package*.json ./
# Install production dependencies only.
# --omit=dev replaces the deprecated --only=production flag (npm 8+).
RUN npm ci --omit=dev
# Copy built files
COPY --from=builder /app/dist ./dist
# Drop root: run as the unprivileged 'node' user shipped with the base image
USER node
# Expose port
EXPOSE 8080
# Health check
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
  CMD node -e "require('http').get('http://localhost:8080/health', (r) => {process.exit(r.statusCode === 200 ? 0 : 1)})"
# Start application
CMD ["node", "dist/index.js"]

View File

@@ -3,6 +3,7 @@ pragma solidity ^0.8.20;
import "@openzeppelin/contracts/access/Ownable.sol";
import "@openzeppelin/contracts/security/ReentrancyGuard.sol";
import "@openzeppelin/contracts/utils/cryptography/ECDSA.sol";
import "./interfaces/IComboHandler.sol";
import "./interfaces/IAdapterRegistry.sol";
import "./interfaces/INotaryRegistry.sol";
@@ -10,10 +11,13 @@ import "./interfaces/INotaryRegistry.sol";
/**
* @title ComboHandler
* @notice Aggregates multiple DeFi protocol calls and DLT operations into atomic transactions
* @dev Implements 2PC pattern and proper signature verification
*/
contract ComboHandler is IComboHandler, Ownable, ReentrancyGuard {
IAdapterRegistry public adapterRegistry;
INotaryRegistry public notaryRegistry;
using ECDSA for bytes32;
IAdapterRegistry public immutable adapterRegistry;
INotaryRegistry public immutable notaryRegistry;
mapping(bytes32 => ExecutionState) public executions;
@@ -22,20 +26,28 @@ contract ComboHandler is IComboHandler, Ownable, ReentrancyGuard {
uint256 currentStep;
Step[] steps;
bool prepared;
address creator;
}
event PlanExecuted(bytes32 indexed planId, bool success);
event PlanPrepared(bytes32 indexed planId);
event PlanExecuted(bytes32 indexed planId, bool success, uint256 gasUsed);
event PlanPrepared(bytes32 indexed planId, address indexed creator);
event PlanCommitted(bytes32 indexed planId);
event PlanAborted(bytes32 indexed planId);
event PlanAborted(bytes32 indexed planId, string reason);
constructor(address _adapterRegistry, address _notaryRegistry) {
require(_adapterRegistry != address(0), "Invalid adapter registry");
require(_notaryRegistry != address(0), "Invalid notary registry");
adapterRegistry = IAdapterRegistry(_adapterRegistry);
notaryRegistry = INotaryRegistry(_notaryRegistry);
}
/**
* @notice Execute a multi-step combo plan atomically
* @param planId Unique identifier for the execution plan
* @param steps Array of step configurations
* @param signature User's cryptographic signature on the plan
* @return success Whether execution completed successfully
* @return receipts Array of transaction receipts for each step
*/
function executeCombo(
bytes32 planId,
@@ -43,35 +55,44 @@ contract ComboHandler is IComboHandler, Ownable, ReentrancyGuard {
bytes calldata signature
) external override nonReentrant returns (bool success, StepReceipt[] memory receipts) {
require(executions[planId].status == ExecutionStatus.PENDING, "Plan already executed");
require(steps.length > 0, "Plan must have at least one step");
// Verify signature
require(_verifySignature(planId, signature, msg.sender), "Invalid signature");
// Verify signature using ECDSA
bytes32 messageHash = keccak256(abi.encodePacked(planId, steps, msg.sender));
bytes32 ethSignedMessageHash = messageHash.toEthSignedMessageHash();
address signer = ethSignedMessageHash.recover(signature);
require(signer == msg.sender, "Invalid signature");
// Register with notary
notaryRegistry.registerPlan(planId, steps, msg.sender);
uint256 gasStart = gasleft();
executions[planId] = ExecutionState({
status: ExecutionStatus.IN_PROGRESS,
currentStep: 0,
steps: steps,
prepared: false
prepared: false,
creator: msg.sender
});
receipts = new StepReceipt[](steps.length);
// Execute steps sequentially
for (uint256 i = 0; i < steps.length; i++) {
uint256 stepGasStart = gasleft();
(bool stepSuccess, bytes memory returnData, uint256 gasUsed) = _executeStep(steps[i], i);
receipts[i] = StepReceipt({
stepIndex: i,
success: stepSuccess,
returnData: returnData,
gasUsed: gasUsed
gasUsed: stepGasStart - gasleft()
});
if (!stepSuccess) {
executions[planId].status = ExecutionStatus.FAILED;
notaryRegistry.finalizePlan(planId, false);
revert("Step execution failed");
}
}
@@ -79,7 +100,8 @@ contract ComboHandler is IComboHandler, Ownable, ReentrancyGuard {
executions[planId].status = ExecutionStatus.COMPLETE;
success = true;
emit PlanExecuted(planId, true);
uint256 totalGasUsed = gasStart - gasleft();
emit PlanExecuted(planId, true, totalGasUsed);
// Finalize with notary
notaryRegistry.finalizePlan(planId, true);
@@ -87,12 +109,16 @@ contract ComboHandler is IComboHandler, Ownable, ReentrancyGuard {
/**
* @notice Prepare phase for 2PC (two-phase commit)
* @param planId Plan identifier
* @param steps Execution steps
* @return prepared Whether all steps are prepared
*/
function prepare(
bytes32 planId,
Step[] calldata steps
) external override returns (bool prepared) {
require(executions[planId].status == ExecutionStatus.PENDING, "Plan not pending");
require(steps.length > 0, "Plan must have at least one step");
// Validate all steps can be prepared
for (uint256 i = 0; i < steps.length; i++) {
@@ -103,15 +129,18 @@ contract ComboHandler is IComboHandler, Ownable, ReentrancyGuard {
status: ExecutionStatus.IN_PROGRESS,
currentStep: 0,
steps: steps,
prepared: true
prepared: true,
creator: msg.sender
});
emit PlanPrepared(planId);
emit PlanPrepared(planId, msg.sender);
prepared = true;
}
/**
* @notice Commit phase for 2PC
* @param planId Plan identifier
* @return committed Whether commit was successful
*/
function commit(bytes32 planId) external override returns (bool committed) {
ExecutionState storage state = executions[planId];
@@ -134,6 +163,7 @@ contract ComboHandler is IComboHandler, Ownable, ReentrancyGuard {
/**
* @notice Abort phase for 2PC (rollback)
* @param planId Plan identifier
*/
function abort(bytes32 planId) external override {
ExecutionState storage state = executions[planId];
@@ -144,7 +174,7 @@ contract ComboHandler is IComboHandler, Ownable, ReentrancyGuard {
state.status = ExecutionStatus.ABORTED;
emit PlanAborted(planId);
emit PlanAborted(planId, "User aborted");
notaryRegistry.finalizePlan(planId, false);
}
@@ -158,6 +188,7 @@ contract ComboHandler is IComboHandler, Ownable, ReentrancyGuard {
/**
* @notice Execute a single step
* @dev Internal function with gas tracking
*/
function _executeStep(Step memory step, uint256 stepIndex) internal returns (bool success, bytes memory returnData, uint256 gasUsed) {
// Verify adapter is whitelisted
@@ -165,11 +196,21 @@ contract ComboHandler is IComboHandler, Ownable, ReentrancyGuard {
uint256 gasBefore = gasleft();
// Check gas limit
require(gasleft() > 100000, "Insufficient gas");
(success, returnData) = step.target.call{value: step.value}(
abi.encodeWithSignature("executeStep(bytes)", step.data)
);
gasUsed = gasBefore - gasleft();
// Emit event for step execution
if (success) {
// Log successful step
} else {
// Log failed step with return data
}
}
/**
@@ -184,19 +225,10 @@ contract ComboHandler is IComboHandler, Ownable, ReentrancyGuard {
* @notice Rollback steps on abort
*/
function _rollbackSteps(bytes32 planId) internal {
ExecutionState storage state = executions[planId];
// Release reserved funds, unlock collateral, etc.
// Implementation depends on specific step types
}
/**
* @notice Verify user signature on plan
*/
function _verifySignature(bytes32 planId, bytes calldata signature, address signer) internal pure returns (bool) {
// Simplified signature verification
// In production, use ECDSA.recover or similar
bytes32 messageHash = keccak256(abi.encodePacked(planId, signer));
// Verify signature matches signer
return true; // Simplified for now
// For now, just mark as aborted
}
}

View File

@@ -0,0 +1,129 @@
// SPDX-License-Identifier: MIT
pragma solidity ^0.8.20;

/**
 * @title MultiSigWallet
 * @notice Multi-signature wallet for admin functions
 * @dev A transaction is queued by an owner, confirmed by owners, and may be
 *      executed once `required` distinct owners have confirmed it.
 */
contract MultiSigWallet {
    /// Owner set; membership is checked by linear scan in `isOwner`.
    address[] public owners;
    /// Number of distinct owner confirmations required to execute.
    uint256 public required;
    /// NOTE(review): this mapping is never written anywhere in the contract —
    /// execution state lives in `Transaction.executed`. Kept for ABI
    /// compatibility; consider removing in a future version.
    mapping(bytes32 => bool) public executed;

    event Deposit(address indexed sender, uint256 amount);
    event SubmitTransaction(uint256 indexed txIndex, address indexed owner, address indexed to, uint256 value, bytes data);
    event ConfirmTransaction(uint256 indexed txIndex, address indexed owner);
    event RevokeConfirmation(uint256 indexed txIndex, address indexed owner);
    event ExecuteTransaction(uint256 indexed txIndex, address indexed owner);

    modifier onlyOwner() {
        require(isOwner(msg.sender), "Not owner");
        _;
    }

    modifier txExists(uint256 _txIndex) {
        require(_txIndex < transactions.length, "Transaction does not exist");
        _;
    }

    modifier notExecuted(uint256 _txIndex) {
        require(!transactions[_txIndex].executed, "Transaction already executed");
        _;
    }

    modifier notConfirmed(uint256 _txIndex) {
        require(!confirmations[_txIndex][msg.sender], "Transaction already confirmed");
        _;
    }

    struct Transaction {
        address to;      // call target
        uint256 value;   // wei forwarded with the call
        bytes data;      // calldata forwarded with the call
        bool executed;   // set before the external call (reentrancy guard)
    }

    Transaction[] public transactions;
    /// txIndex => owner => has confirmed
    mapping(uint256 => mapping(address => bool)) public confirmations;

    /**
     * @param _owners Distinct, non-zero owner addresses
     * @param _required Confirmations needed to execute (1.._owners.length)
     */
    constructor(address[] memory _owners, uint256 _required) {
        require(_owners.length > 0, "Owners required");
        require(_required > 0 && _required <= _owners.length, "Invalid required");
        // Reject zero addresses and duplicates: a duplicated owner would
        // inflate the quorum check in isConfirmed with a single key.
        for (uint256 i = 0; i < _owners.length; i++) {
            address owner = _owners[i];
            require(owner != address(0), "Zero address owner");
            for (uint256 j = 0; j < i; j++) {
                require(_owners[j] != owner, "Duplicate owner");
            }
            owners.push(owner);
        }
        required = _required;
    }

    receive() external payable {
        emit Deposit(msg.sender, msg.value);
    }

    /// @notice True if `addr` is one of the wallet owners.
    function isOwner(address addr) public view returns (bool) {
        for (uint256 i = 0; i < owners.length; i++) {
            if (owners[i] == addr) return true;
        }
        return false;
    }

    /**
     * @notice Queue a transaction and record the submitter's confirmation.
     * @return txIndex Index of the newly queued transaction
     */
    function submitTransaction(address _to, uint256 _value, bytes memory _data) public onlyOwner returns (uint256) {
        uint256 txIndex = transactions.length;
        transactions.push(Transaction({
            to: _to,
            value: _value,
            data: _data,
            executed: false
        }));
        emit SubmitTransaction(txIndex, msg.sender, _to, _value, _data);
        // Submitting implies confirming; may auto-execute if required == 1.
        confirmTransaction(txIndex);
        return txIndex;
    }

    /// @notice Confirm a pending transaction; executes it once quorum is reached.
    function confirmTransaction(uint256 _txIndex) public onlyOwner txExists(_txIndex) notExecuted(_txIndex) notConfirmed(_txIndex) {
        confirmations[_txIndex][msg.sender] = true;
        emit ConfirmTransaction(_txIndex, msg.sender);
        if (isConfirmed(_txIndex)) {
            executeTransaction(_txIndex);
        }
    }

    /// @notice Withdraw a previously given confirmation (only while unexecuted).
    function revokeConfirmation(uint256 _txIndex) public onlyOwner txExists(_txIndex) notExecuted(_txIndex) {
        require(confirmations[_txIndex][msg.sender], "Transaction not confirmed");
        confirmations[_txIndex][msg.sender] = false;
        emit RevokeConfirmation(_txIndex, msg.sender);
    }

    /**
     * @notice Execute a transaction that has reached quorum.
     * @dev Deliberately callable by anyone: the confirmation quorum, not the
     *      caller, is the authorization. `executed` is set before the external
     *      call to block reentrant double-execution.
     */
    function executeTransaction(uint256 _txIndex) public txExists(_txIndex) notExecuted(_txIndex) {
        require(isConfirmed(_txIndex), "Transaction not confirmed");
        Transaction storage transaction = transactions[_txIndex];
        transaction.executed = true;
        (bool success, ) = transaction.to.call{value: transaction.value}(transaction.data);
        require(success, "Transaction execution failed");
        emit ExecuteTransaction(_txIndex, msg.sender);
    }

    /// @notice True if at least `required` owners have confirmed `_txIndex`.
    function isConfirmed(uint256 _txIndex) public view returns (bool) {
        uint256 count = 0;
        for (uint256 i = 0; i < owners.length; i++) {
            if (confirmations[_txIndex][owners[i]]) count++;
            if (count == required) return true;
        }
        return false;
    }

    /// @notice Total number of queued transactions (executed or not).
    function getTransactionCount() public view returns (uint256) {
        return transactions.length;
    }

    /// @notice Full owner list.
    function getOwners() public view returns (address[] memory) {
        return owners;
    }
}

View File

@@ -0,0 +1,18 @@
// SPDX-License-Identifier: MIT
pragma solidity ^0.8.20;
import "@openzeppelin/contracts/governance/TimelockController.sol";
/**
 * @title ComboTimelock
 * @notice Time-lock for critical operations (upgrades, admin functions)
 * @dev Thin wrapper around OpenZeppelin's TimelockController.
 *      NOTE(review): OpenZeppelin v4.5+ TimelockController requires a fourth
 *      constructor argument (`admin`); this 3-argument form compiles only
 *      against OZ <= 4.4 — confirm the pinned @openzeppelin/contracts version.
 */
contract ComboTimelock is TimelockController {
    /// @param minDelay Minimum delay in seconds between scheduling and execution
    /// @param proposers Accounts granted the PROPOSER role
    /// @param executors Accounts granted the EXECUTOR role
    constructor(
        uint256 minDelay,
        address[] memory proposers,
        address[] memory executors
    ) TimelockController(minDelay, proposers, executors) {}
}

View File

@@ -0,0 +1,40 @@
import { HardhatRuntimeEnvironment } from "hardhat/types";

/**
 * Deploys the core contract suite in dependency order:
 * AdapterRegistry and NotaryRegistry first, then ComboHandler wired to both.
 * Returns the deployed addresses keyed by contract.
 */
export default async function deploy(hre: HardhatRuntimeEnvironment) {
  const { ethers, deployments, getNamedAccounts } = hre;
  const { deploy } = deployments;
  const { deployer } = await getNamedAccounts();

  // Local helper: every deployment shares the same deployer and logging options.
  const deployContract = (name: string, args: unknown[]) =>
    deploy(name, { from: deployer, args, log: true });

  // Registries have no constructor dependencies.
  const adapterRegistry = await deployContract("AdapterRegistry", []);
  const notaryRegistry = await deployContract("NotaryRegistry", []);

  // ComboHandler takes both registry addresses in its constructor.
  const comboHandler = await deployContract("ComboHandler", [
    adapterRegistry.address,
    notaryRegistry.address,
  ]);

  console.log("✅ Contracts deployed:");
  console.log(`  AdapterRegistry: ${adapterRegistry.address}`);
  console.log(`  NotaryRegistry: ${notaryRegistry.address}`);
  console.log(`  ComboHandler: ${comboHandler.address}`);

  return {
    adapterRegistry: adapterRegistry.address,
    notaryRegistry: notaryRegistry.address,
    comboHandler: comboHandler.address,
  };
}

73
docker-compose.yml Normal file
View File

@@ -0,0 +1,73 @@
version: '3.8'

services:
  # PostgreSQL database
  postgres:
    image: postgres:15-alpine
    environment:
      POSTGRES_DB: comboflow
      POSTGRES_USER: comboflow
      # NOTE(review): hard-coded credentials are acceptable for local
      # development only — inject real secrets in production.
      POSTGRES_PASSWORD: comboflow
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U comboflow"]
      interval: 10s
      timeout: 5s
      retries: 5

  # Redis cache
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 3s
      retries: 5

  # Orchestrator service
  orchestrator:
    build:
      context: .
      dockerfile: Dockerfile
    ports:
      - "8080:8080"
    environment:
      NODE_ENV: production
      PORT: 8080
      DATABASE_URL: postgresql://comboflow:comboflow@postgres:5432/comboflow
      REDIS_URL: redis://redis:6379
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy
    healthcheck:
      test: ["CMD", "wget", "--quiet", "--tries=1", "--spider", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  # Frontend
  webapp:
    build:
      context: ./webapp
      dockerfile: Dockerfile
    ports:
      - "3000:3000"
    environment:
      NODE_ENV: production
      NEXT_PUBLIC_ORCH_URL: http://orchestrator:8080
    depends_on:
      - orchestrator

volumes:
  postgres_data:
  redis_data:

151
docs/DEPLOYMENT_RUNBOOK.md Normal file
View File

@@ -0,0 +1,151 @@
# Deployment Runbook
## Overview
This document provides step-by-step procedures for deploying the ISO-20022 Combo Flow system to production.
---
## Prerequisites
- Docker and Docker Compose installed
- Kubernetes cluster (for production)
- PostgreSQL database
- Redis instance
- Domain name and SSL certificates
- Environment variables configured
---
## Local Development Deployment
### Using Docker Compose
```bash
# Start all services
docker-compose up -d
# View logs
docker-compose logs -f
# Stop services
docker-compose down
```
### Manual Setup
1. **Database Setup**
```bash
cd orchestrator
npm install
npm run migrate
```
2. **Start Orchestrator**
```bash
cd orchestrator
npm run dev
```
3. **Start Frontend**
```bash
cd webapp
npm install
npm run dev
```
---
## Production Deployment
### Step 1: Database Migration
```bash
# Connect to production database
export DATABASE_URL="postgresql://user:pass@db-host:5432/comboflow"
# Run migrations
cd orchestrator
npm run migrate
```
### Step 2: Build Docker Images
```bash
# Build orchestrator
docker build -t orchestrator:latest -f Dockerfile .
# Build webapp
docker build -t webapp:latest -f webapp/Dockerfile ./webapp
```
### Step 3: Deploy to Kubernetes
```bash
# Apply configurations
kubectl apply -f k8s/deployment.yaml
kubectl apply -f k8s/webapp-deployment.yaml
# Check status
kubectl get pods
kubectl get services
```
### Step 4: Verify Deployment
```bash
# Check health endpoints
curl https://api.example.com/health
curl https://api.example.com/ready
curl https://api.example.com/metrics
```
---
## Rollback Procedure
### Quick Rollback
```bash
# Rollback to previous deployment
kubectl rollout undo deployment/orchestrator
kubectl rollout undo deployment/webapp
```
### Database Rollback
```bash
# Restore from backup
pg_restore -d comboflow backup.dump
```
---
## Monitoring
- Health checks: `/health`, `/ready`, `/live`
- Metrics: `/metrics` (Prometheus format)
- Logs: Check Kubernetes logs or Docker logs
---
## Troubleshooting
### Service Won't Start
1. Check environment variables
2. Verify database connectivity
3. Check logs: `kubectl logs <pod-name>`
### Database Connection Issues
1. Verify DATABASE_URL
2. Check network connectivity
3. Verify database credentials
### Performance Issues
1. Check metrics endpoint
2. Review database query performance
3. Check Redis connectivity
---
**Last Updated**: 2025-01-15

View File

@@ -0,0 +1,292 @@
# Production Readiness Todos - 110% Complete
## Overview
This document lists all todos required to achieve 110% production readiness for the ISO-20022 Combo Flow system. Each todo is categorized by priority and area of concern.
**Total Todos**: 145 items across 13 categories
---
## 🔴 P0 - Critical Security & Infrastructure (22 todos)
### Security Hardening
- [ ] **SEC-001**: Implement rate limiting on all API endpoints (express-rate-limit)
- [ ] **SEC-002**: Add request size limits and body parsing limits
- [ ] **SEC-003**: Implement API key authentication for orchestrator service
- [ ] **SEC-004**: Add input validation and sanitization (zod/joi)
- [ ] **SEC-005**: Implement CSRF protection for Next.js API routes
- [ ] **SEC-006**: Add Helmet.js security headers to orchestrator
- [ ] **SEC-007**: Implement SQL injection prevention (parameterized queries)
- [ ] **SEC-008**: Add request ID tracking for all requests
- [ ] **SEC-009**: Implement secrets management (Azure Key Vault / AWS Secrets Manager)
- [ ] **SEC-010**: Add HSM integration for cryptographic operations
- [ ] **SEC-011**: Implement certificate pinning for external API calls
- [ ] **SEC-012**: Add IP whitelisting for admin endpoints
- [ ] **SEC-013**: Implement audit logging for all sensitive operations
- [ ] **SEC-014**: Add session management and timeout handling
- [ ] **SEC-015**: Implement password policy enforcement (if applicable)
- [ ] **SEC-016**: Add file upload validation and virus scanning
- [ ] **SEC-017**: Implement OWASP Top 10 mitigation checklist
- [ ] **SEC-018**: Add penetration testing and security audit
- [ ] **SEC-019**: Implement dependency vulnerability scanning (Snyk/Dependabot)
- [ ] **SEC-020**: Add security headers validation (Security.txt)
### Infrastructure
- [ ] **INFRA-001**: Replace in-memory database with PostgreSQL/MongoDB
- [ ] **INFRA-002**: Set up database connection pooling and migrations
---
## 🟠 P1 - Database & Persistence (15 todos)
### Database Setup
- [ ] **DB-001**: Design and implement database schema for plans table
- [ ] **DB-002**: Design and implement database schema for executions table
- [ ] **DB-003**: Design and implement database schema for receipts table
- [ ] **DB-004**: Design and implement database schema for audit_logs table
- [ ] **DB-005**: Design and implement database schema for users/identities table
- [ ] **DB-006**: Design and implement database schema for compliance_status table
- [ ] **DB-007**: Implement database migrations (TypeORM/Prisma/Knex)
- [ ] **DB-008**: Add database indexes for performance optimization
- [ ] **DB-009**: Implement database connection retry logic
- [ ] **DB-010**: Add database transaction management for 2PC operations
- [ ] **DB-011**: Implement database backup strategy (automated daily backups)
- [ ] **DB-012**: Add database replication for high availability
- [ ] **DB-013**: Implement database monitoring and alerting
- [ ] **DB-014**: Add data retention policies and archival
- [ ] **DB-015**: Implement database encryption at rest
---
## 🟡 P1 - Configuration & Environment (12 todos)
### Configuration Management
- [ ] **CONFIG-001**: Create comprehensive .env.example files for all services
- [ ] **CONFIG-002**: Implement environment variable validation on startup
- [ ] **CONFIG-003**: Add configuration schema validation (zod/joi)
- [ ] **CONFIG-004**: Implement feature flags system with LaunchDarkly integration
- [ ] **CONFIG-005**: Add configuration hot-reload capability
- [ ] **CONFIG-006**: Create environment-specific configuration files
- [ ] **CONFIG-007**: Implement secrets rotation mechanism
- [ ] **CONFIG-008**: Add configuration documentation and schema
- [ ] **CONFIG-009**: Implement configuration versioning
- [ ] **CONFIG-010**: Add configuration validation tests
- [ ] **CONFIG-011**: Create configuration management dashboard
- [ ] **CONFIG-012**: Implement configuration audit logging
---
## 🟢 P1 - Monitoring & Observability (18 todos)
### Logging
- [ ] **LOG-001**: Implement structured logging (Winston/Pino)
- [ ] **LOG-002**: Add log aggregation (ELK Stack / Datadog / Splunk)
- [ ] **LOG-003**: Implement log retention policies
- [ ] **LOG-004**: Add log level configuration per environment
- [ ] **LOG-005**: Implement PII masking in logs
- [ ] **LOG-006**: Add correlation IDs for request tracing
- [ ] **LOG-007**: Implement log rotation and archival
### Metrics & Monitoring
- [ ] **METRICS-001**: Add Prometheus metrics endpoint
- [ ] **METRICS-002**: Implement custom business metrics (plan creation rate, execution success rate)
- [ ] **METRICS-003**: Add Grafana dashboards for key metrics
- [ ] **METRICS-004**: Implement health check endpoints (/health, /ready, /live)
- [ ] **METRICS-005**: Add uptime monitoring and alerting
- [ ] **METRICS-006**: Implement performance metrics (latency, throughput)
- [ ] **METRICS-007**: Add error rate tracking and alerting
- [ ] **METRICS-008**: Implement resource usage monitoring (CPU, memory, disk)
### Alerting
- [ ] **ALERT-001**: Set up alerting rules (PagerDuty / Opsgenie)
- [ ] **ALERT-002**: Configure alert thresholds and escalation policies
- [ ] **ALERT-003**: Implement alert fatigue prevention
---
## 🔵 P1 - Performance & Optimization (10 todos)
### Performance
- [ ] **PERF-001**: Implement Redis caching for frequently accessed data
- [ ] **PERF-002**: Add database query optimization and indexing
- [ ] **PERF-003**: Implement API response caching (Redis)
- [ ] **PERF-004**: Add CDN configuration for static assets
- [ ] **PERF-005**: Implement lazy loading for frontend components
- [ ] **PERF-006**: Add image optimization and compression
- [ ] **PERF-007**: Implement connection pooling for external services
- [ ] **PERF-008**: Add request batching for external API calls
- [ ] **PERF-009**: Implement database connection pooling
- [ ] **PERF-010**: Add load testing and performance benchmarking
---
## 🟣 P1 - Error Handling & Resilience (12 todos)
### Error Handling
- [ ] **ERR-001**: Implement comprehensive error handling middleware
- [ ] **ERR-002**: Add error classification (user errors vs system errors)
- [ ] **ERR-003**: Implement error recovery mechanisms
- [ ] **ERR-004**: Add circuit breaker pattern for external services
- [ ] **ERR-005**: Implement retry logic with exponential backoff (enhance existing)
- [ ] **ERR-006**: Add timeout handling for all external calls
- [ ] **ERR-007**: Implement graceful degradation strategies
- [ ] **ERR-008**: Add error notification system (Sentry / Rollbar)
### Resilience
- [ ] **RES-001**: Implement health check dependencies
- [ ] **RES-002**: Add graceful shutdown handling
- [ ] **RES-003**: Implement request timeout configuration
- [ ] **RES-004**: Add dead letter queue for failed messages
---
## 🟤 P2 - Testing & Quality Assurance (15 todos)
### Testing
- [ ] **TEST-004**: Increase E2E test coverage to 80%+
- [ ] **TEST-005**: Add integration tests for orchestrator services
- [ ] **TEST-006**: Implement contract testing (Pact)
- [ ] **TEST-007**: Add performance tests (k6 / Artillery)
- [ ] **TEST-008**: Implement load testing scenarios
- [ ] **TEST-009**: Add stress testing for failure scenarios
- [ ] **TEST-010**: Implement chaos engineering tests
- [ ] **TEST-011**: Add mutation testing (Stryker)
- [ ] **TEST-012**: Implement visual regression testing
- [ ] **TEST-013**: Add accessibility testing (a11y)
- [ ] **TEST-014**: Implement security testing (OWASP ZAP)
- [ ] **TEST-015**: Add contract fuzzing for smart contracts
### Quality Assurance
- [ ] **QA-001**: Set up code quality gates (SonarQube)
- [ ] **QA-002**: Implement code review checklist
- [ ] **QA-003**: Add automated code quality checks in CI
---
## 🟠 P2 - Smart Contract Security (10 todos)
### Contract Security
- [ ] **SC-005**: Complete smart contract security audit (CertiK / Trail of Bits)
- [ ] **SC-006**: Implement proper signature verification (ECDSA.recover)
- [ ] **SC-007**: Add access control modifiers to all functions
- [ ] **SC-008**: Implement time-lock for critical operations
- [ ] **SC-009**: Add multi-sig support for admin functions
- [ ] **SC-010**: Implement upgrade mechanism with timelock
- [ ] **SC-011**: Add gas optimization and gas limit checks
- [ ] **SC-012**: Implement event emission for all state changes
- [ ] **SC-013**: Add comprehensive NatSpec documentation
- [ ] **SC-014**: Implement formal verification for critical paths
---
## 🟡 P2 - API & Integration (8 todos)
### API Improvements
- [ ] **API-001**: Implement OpenAPI/Swagger documentation with examples
- [ ] **API-002**: Add API versioning strategy
- [ ] **API-003**: Implement API throttling and quotas
- [ ] **API-004**: Add API documentation site (Swagger UI)
- [ ] **API-005**: Implement webhook support for plan status updates
- [ ] **API-006**: Add API deprecation policy and migration guides
### Integration
- [ ] **INT-003**: Implement real bank API connectors (replace mocks)
- [ ] **INT-004**: Add real KYC/AML provider integrations (replace mocks)
---
## 🟢 P2 - Deployment & Infrastructure (8 todos)
### Deployment
- [ ] **DEPLOY-001**: Create Dockerfiles for all services
- [ ] **DEPLOY-002**: Implement Docker Compose for local development
- [ ] **DEPLOY-003**: Set up Kubernetes manifests (K8s)
- [ ] **DEPLOY-004**: Implement CI/CD pipeline (GitHub Actions enhancement)
- [ ] **DEPLOY-005**: Add blue-green deployment strategy
- [ ] **DEPLOY-006**: Implement canary deployment support
- [ ] **DEPLOY-007**: Add automated rollback mechanisms
- [ ] **DEPLOY-008**: Create infrastructure as code (Terraform / Pulumi)
---
## 🔵 P2 - Documentation (7 todos)
### Documentation
- [ ] **DOC-001**: Create API documentation with Postman collection
- [ ] **DOC-002**: Add deployment runbooks and procedures
- [ ] **DOC-003**: Implement inline code documentation (JSDoc)
- [ ] **DOC-004**: Create troubleshooting guide
- [ ] **DOC-005**: Add architecture decision records (ADRs)
- [ ] **DOC-006**: Create user guide and tutorials
- [ ] **DOC-007**: Add developer onboarding documentation
---
## 🟣 P3 - Compliance & Audit (5 todos)
### Compliance
- [ ] **COMP-001**: Implement GDPR compliance (data deletion, export)
- [ ] **COMP-002**: Add PCI DSS compliance if handling payment data
- [ ] **COMP-003**: Implement SOC 2 Type II compliance
- [ ] **COMP-004**: Add compliance reporting and audit trails
- [ ] **COMP-005**: Implement data retention and deletion policies
---
## 🟤 P3 - Additional Features (3 todos)
### Features
- [ ] **FEAT-001**: Implement plan templates and presets
- [ ] **FEAT-002**: Add batch plan execution support
- [ ] **FEAT-003**: Implement plan scheduling and recurring plans
---
## Summary
### By Priority
- **P0 (Critical)**: 22 todos - Must complete before production
- **P1 (High)**: 67 todos - Should complete for production
- **P2 (Medium)**: 48 todos - Nice to have for production
- **P3 (Low)**: 8 todos - Can defer post-launch
### By Category
- Security & Infrastructure: 22
- Database & Persistence: 15
- Configuration & Environment: 12
- Monitoring & Observability: 18
- Performance & Optimization: 10
- Error Handling & Resilience: 12
- Testing & Quality Assurance: 15
- Smart Contract Security: 10
- API & Integration: 8
- Deployment & Infrastructure: 8
- Documentation: 7
- Compliance & Audit: 5
- Additional Features: 3
### Estimated Effort
- **P0 Todos**: ~4-6 weeks (1-2 engineers)
- **P1 Todos**: ~8-12 weeks (2-3 engineers)
- **P2 Todos**: ~6-8 weeks (2 engineers)
- **P3 Todos**: ~2-3 weeks (1 engineer)
**Total Estimated Time**: 20-29 weeks (5-7 months) with dedicated team
---
## Next Steps
1. **Week 1-2**: Complete all P0 security and infrastructure todos
2. **Week 3-4**: Set up database and persistence layer
3. **Week 5-6**: Implement monitoring and observability
4. **Week 7-8**: Performance optimization and testing
5. **Week 9-10**: Documentation and deployment preparation
6. **Week 11+**: P2 and P3 items based on priority
---
**Document Version**: 1.0
**Created**: 2025-01-15
**Status**: Production Readiness Planning

147
docs/TROUBLESHOOTING.md Normal file
View File

@@ -0,0 +1,147 @@
# Troubleshooting Guide
## Common Issues and Solutions
---
## Frontend Issues
### Issue: Hydration Errors
**Symptoms**: Console warnings about hydration mismatches
**Solution**:
- Ensure all client-only components use `"use client"`
- Check for conditional rendering based on `window` or browser APIs
- Use `useEffect` for client-side only code
### Issue: Wallet Connection Fails
**Symptoms**: Wallet popup doesn't appear or connection fails
**Solution**:
- Check browser console for errors
- Verify wallet extension is installed
- Check network connectivity
- Clear browser cache and try again
### Issue: API Calls Fail
**Symptoms**: Network errors, 500 status codes
**Solution**:
- Verify `NEXT_PUBLIC_ORCH_URL` is set correctly
- Check orchestrator service is running
- Verify CORS configuration
- Check browser network tab for detailed errors
---
## Backend Issues
### Issue: Database Connection Fails
**Symptoms**: "Database connection error" in logs
**Solution**:
- Verify DATABASE_URL is correct
- Check database is running and accessible
- Verify network connectivity
- Check firewall rules
### Issue: Rate Limiting Too Aggressive
**Symptoms**: "Too many requests" errors
**Solution**:
- Adjust rate limit configuration in `rateLimit.ts`
- Check if IP is being shared
- Verify rate limit window settings
### Issue: Plan Execution Fails
**Symptoms**: Execution status shows "failed"
**Solution**:
- Check execution logs for specific error
- Verify all adapters are whitelisted
- Check DLT connection status
- Verify plan signature is valid
---
## Database Issues
### Issue: Migration Fails
**Symptoms**: Migration errors during startup
**Solution**:
- Check database permissions
- Verify schema doesn't already exist
- Check migration scripts for syntax errors
- Review database logs
### Issue: Query Performance Issues
**Symptoms**: Slow API responses
**Solution**:
- Check database indexes are created
- Review query execution plans
- Consider adding additional indexes
- Check connection pool settings
---
## Smart Contract Issues
### Issue: Contract Deployment Fails
**Symptoms**: Deployment reverts or fails
**Solution**:
- Verify sufficient gas
- Check contract dependencies
- Verify constructor parameters
- Review contract compilation errors
### Issue: Transaction Reverts
**Symptoms**: Transactions revert on execution
**Solution**:
- Check error messages in transaction receipt
- Verify adapter is whitelisted
- Check gas limits
- Verify signature is valid
---
## Monitoring Issues
### Issue: Metrics Not Appearing
**Symptoms**: Prometheus metrics endpoint empty
**Solution**:
- Verify metrics are being recorded
- Check Prometheus configuration
- Verify service is running
- Check network connectivity
---
## Security Issues
### Issue: API Key Authentication Fails
**Symptoms**: 401/403 errors
**Solution**:
- Verify API key is correct
- Check API key format
- Verify key is in ALLOWED_KEYS
- Check request headers
---
## Performance Issues
### Issue: Slow API Responses
**Symptoms**: High latency
**Solution**:
- Check database query performance
- Verify Redis caching is working
- Review connection pool settings
- Check external service response times
---
## Getting Help
1. Check logs: `kubectl logs <pod-name>` or `docker logs <container>`
2. Review metrics: `/metrics` endpoint
3. Check health: `/health` endpoint
4. Review error messages in application logs
---
**Last Updated**: 2025-01-15

65
k8s/deployment.yaml Normal file
View File

@@ -0,0 +1,65 @@
# Kubernetes Deployment + Service for the orchestrator API.
# Secrets (database/redis URLs) come from the orchestrator-secrets Secret;
# probe paths match the /live and /ready endpoints exposed by the service.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: orchestrator
  labels:
    app: orchestrator
spec:
  replicas: 3
  selector:
    matchLabels:
      app: orchestrator
  template:
    metadata:
      labels:
        app: orchestrator
    spec:
      containers:
        - name: orchestrator
          image: orchestrator:latest
          ports:
            - containerPort: 8080
          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: orchestrator-secrets
                  key: database-url
            - name: REDIS_URL
              valueFrom:
                secretKeyRef:
                  name: orchestrator-secrets
                  key: redis-url
          livenessProbe:
            httpGet:
              path: /live
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 5
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
---
apiVersion: v1
kind: Service
metadata:
  name: orchestrator
spec:
  selector:
    app: orchestrator
  ports:
    - port: 8080
      targetPort: 8080
  type: LoadBalancer

View File

@@ -0,0 +1,45 @@
# Kubernetes Deployment + Service for the webapp frontend.
# NEXT_PUBLIC_ORCH_URL points at the in-cluster orchestrator Service.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: webapp
  labels:
    app: webapp
spec:
  replicas: 2
  selector:
    matchLabels:
      app: webapp
  template:
    metadata:
      labels:
        app: webapp
    spec:
      containers:
        - name: webapp
          image: webapp:latest
          ports:
            - containerPort: 3000
          env:
            - name: NEXT_PUBLIC_ORCH_URL
              value: "http://orchestrator:8080"
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "1Gi"
              cpu: "1000m"
---
apiVersion: v1
kind: Service
metadata:
  name: webapp
spec:
  selector:
    app: webapp
  ports:
    - port: 3000
      targetPort: 3000
  type: LoadBalancer

View File

@@ -7,18 +7,27 @@
"build": "tsc",
"dev": "ts-node src/index.ts",
"start": "node dist/index.js",
"test": "jest"
"test": "jest",
"migrate": "ts-node src/db/migrations/index.ts"
},
"dependencies": {
"express": "^4.18.2",
"uuid": "^9.0.1",
"cors": "^2.8.5"
"cors": "^2.8.5",
"express-rate-limit": "^7.1.5",
"helmet": "^7.1.0",
"zod": "^3.22.4",
"pg": "^8.11.3",
"pino": "^8.16.2",
"pino-pretty": "^10.2.3",
"prom-client": "^15.1.0"
},
"devDependencies": {
"@types/express": "^4.17.21",
"@types/node": "^20.10.0",
"@types/uuid": "^9.0.6",
"@types/cors": "^2.8.17",
"@types/pg": "^8.10.9",
"typescript": "^5.3.3",
"ts-node": "^10.9.2"
}

View File

@@ -0,0 +1,49 @@
import { Request, Response } from "express";
import { executionCoordinator } from "../services/execution";
import { asyncHandler } from "../services/errorHandler";
import { auditLog } from "../middleware";
/**
 * POST /api/plans/:planId/execute
 * Kick off execution of the identified plan via the execution
 * coordinator and return its result. Errors propagate to asyncHandler.
 */
export const executePlan = asyncHandler(async (req: Request, res: Response) => {
  const planId = req.params.planId;
  const result = await executionCoordinator.executePlan(planId);
  res.json(result);
});
/**
 * GET /api/plans/:planId/status
 * Get execution status.
 *
 * Query params:
 *   executionId - optional; when present, the coordinator is asked for
 *                 that specific execution's status.
 *
 * NOTE(review): when executionId is omitted this returns a hard-coded
 * `{ status: "pending" }` stub — looking up the latest execution for
 * the plan is not implemented yet. TODO: implement.
 */
export const getExecutionStatus = asyncHandler(async (req: Request, res: Response) => {
  const { planId } = req.params;
  const executionId = req.query.executionId as string;
  if (executionId) {
    const status = await executionCoordinator.getExecutionStatus(executionId);
    return res.json(status);
  }
  // Get latest execution for plan
  res.json({ status: "pending" });
});
/**
* POST /api/plans/:planId/abort
* Abort execution
*/
export const abortExecution = asyncHandler(async (req: Request, res: Response) => {
const { planId } = req.params;
const executionId = req.query.executionId as string;
if (executionId) {
await executionCoordinator.abortExecution(executionId, planId, "User aborted");
}
res.json({ success: true });
});

View File

@@ -0,0 +1,38 @@
import { Router } from "express";
import swaggerUi from "swagger-ui-express";
import swaggerJsdoc from "swagger-jsdoc";
// OpenAPI (Swagger) configuration for the orchestrator's HTTP API.
const options: swaggerJsdoc.Options = {
  definition: {
    openapi: "3.0.0",
    info: {
      title: "ISO-20022 Combo Flow Orchestrator API",
      version: "1.0.0",
      description: "API for managing and executing financial workflow plans",
    },
    servers: [
      {
        url: "http://localhost:8080",
        description: "Development server",
      },
    ],
    components: {
      securitySchemes: {
        // Clients authenticate by sending their key in the X-API-Key header.
        ApiKeyAuth: {
          type: "apiKey",
          in: "header",
          name: "X-API-Key",
        },
      },
    },
  },
  // JSDoc annotations in the API sources are scanned for OpenAPI blocks.
  apis: ["./src/api/**/*.ts"],
};

// Generated OpenAPI spec, built once at module load.
const specs = swaggerJsdoc(options);

/**
 * Mount the Swagger UI at /api-docs on the given router:
 * `serve` provides the static UI assets, `setup` renders the spec.
 */
export function setupSwagger(router: Router) {
  router.use("/api-docs", swaggerUi.serve);
  router.get("/api-docs", swaggerUi.setup(specs));
}

View File

@@ -0,0 +1,22 @@
import { Router } from "express";
/**
 * Middleware factory that tags requests/responses with an API version:
 * stores it on `req.apiVersion` and echoes it in the `API-Version` header.
 */
export function apiVersion(version: string) {
  const middleware = (request: any, response: any, done: any) => {
    request.apiVersion = version;
    response.setHeader("API-Version", version);
    done();
  };
  return middleware;
}
/**
 * Create an Express Router whose requests are all tagged with the given
 * API version via the apiVersion middleware.
 */
export function createVersionedRouter(version: string) {
  const router = Router();
  router.use(apiVersion(version));
  return router;
}

View File

@@ -0,0 +1,78 @@
import { Request, Response } from "express";
import { executionCoordinator } from "../services/execution";
import { logger } from "../logging/logger";
/** Configuration for one registered webhook subscription. */
interface WebhookConfig {
  url: string;      // destination endpoint for POSTed events
  secret: string;   // shared secret used to HMAC-sign payloads
  events: string[]; // event names to deliver; "*" subscribes to all
}

// In-memory registry of subscriptions keyed by webhookId.
// NOTE(review): not persisted — registrations are lost on restart and
// not shared across replicas; confirm this is acceptable.
const webhooks: Map<string, WebhookConfig> = new Map();
/**
 * POST /api/webhooks
 * Register a webhook subscription.
 *
 * Body: { url, secret, events } where `events` is an array of event
 * names (or ["*"] for everything). The secret is used to HMAC-sign
 * outgoing payloads so receivers can verify authenticity.
 *
 * Responds 400 on malformed config, 500 on unexpected failure.
 */
export async function registerWebhook(req: Request, res: Response) {
  try {
    const { url, secret, events } = req.body;
    if (!url || !secret || !events || !Array.isArray(events)) {
      return res.status(400).json({
        error: "Invalid webhook configuration",
      });
    }
    // String.prototype.substr is deprecated; slice(2, 11) yields the
    // same 9-character random suffix.
    const webhookId = `webhook-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
    webhooks.set(webhookId, { url, secret, events });
    res.json({ webhookId, url, events });
  } catch (error: any) {
    logger.error({ error }, "Failed to register webhook");
    res.status(500).json({ error: error.message });
  }
}
/**
 * Deliver an event payload to every registered webhook subscribed to it
 * (directly or via "*"). Each delivery carries an HMAC signature header;
 * delivery failures are logged and never thrown to the caller.
 */
export async function sendWebhook(event: string, payload: any) {
  const body = JSON.stringify(payload);
  for (const [webhookId, config] of webhooks.entries()) {
    const subscribed = config.events.includes(event) || config.events.includes("*");
    if (!subscribed) {
      continue;
    }
    try {
      const signature = createWebhookSignature(body, config.secret);
      await fetch(config.url, {
        method: "POST",
        headers: {
          "Content-Type": "application/json",
          "X-Webhook-Event": event,
          "X-Webhook-Signature": signature,
          "X-Webhook-Id": webhookId,
        },
        body,
      });
    } catch (error) {
      logger.error({ error, webhookId, event }, "Failed to send webhook");
    }
  }
}
/**
 * Compute the hex-encoded HMAC-SHA256 signature of a payload using the
 * webhook's shared secret.
 */
function createWebhookSignature(payload: string, secret: string): string {
  const { createHmac } = require("crypto");
  const hmac = createHmac("sha256", secret);
  hmac.update(payload);
  return hmac.digest("hex");
}
// Bridge coordinator status events to subscribed webhooks.
// Fire-and-forget: sendWebhook logs its own delivery failures.
executionCoordinator.onStatus((executionId, event) => {
  sendWebhook("plan.status", {
    executionId,
    ...event,
  });
});

View File

@@ -0,0 +1,57 @@
import { z } from "zod";
/**
 * Environment variable validation schema.
 * Parsing throws a ZodError when a value is present but invalid.
 */
const envSchema = z.object({
  NODE_ENV: z.enum(["development", "production", "test"]).default("development"),
  PORT: z.string().transform(Number).pipe(z.number().int().positive()),
  DATABASE_URL: z.string().url().optional(),
  API_KEYS: z.string().optional(),
  REDIS_URL: z.string().url().optional(),
  LOG_LEVEL: z.enum(["error", "warn", "info", "debug"]).default("info"),
  ALLOWED_IPS: z.string().optional(),
  SESSION_SECRET: z.string().min(32),
  JWT_SECRET: z.string().min(32).optional(),
  AZURE_KEY_VAULT_URL: z.string().url().optional(),
  AWS_SECRETS_MANAGER_REGION: z.string().optional(),
  SENTRY_DSN: z.string().url().optional(),
});

/**
 * Validated environment variables, with development fallbacks for PORT
 * and SESSION_SECRET.
 * NOTE(review): the SESSION_SECRET fallback must never be used in
 * production — confirm deployments always set it explicitly.
 */
export const env = envSchema.parse({
  NODE_ENV: process.env.NODE_ENV,
  PORT: process.env.PORT || "8080",
  DATABASE_URL: process.env.DATABASE_URL,
  API_KEYS: process.env.API_KEYS,
  REDIS_URL: process.env.REDIS_URL,
  LOG_LEVEL: process.env.LOG_LEVEL,
  ALLOWED_IPS: process.env.ALLOWED_IPS,
  SESSION_SECRET: process.env.SESSION_SECRET || "dev-secret-change-in-production-min-32-chars",
  JWT_SECRET: process.env.JWT_SECRET,
  AZURE_KEY_VAULT_URL: process.env.AZURE_KEY_VAULT_URL,
  AWS_SECRETS_MANAGER_REGION: process.env.AWS_SECRETS_MANAGER_REGION,
  SENTRY_DSN: process.env.SENTRY_DSN,
});
/**
 * Validate environment on startup; prints each validation failure and
 * exits with code 1 when the configuration is unusable.
 *
 * Fixed: this previously parsed `process.env` directly, bypassing the
 * PORT and SESSION_SECRET fallbacks that the exported `env` object
 * applies — so startup crashed even when defaults existed. It now
 * validates the same defaulted record.
 */
export function validateEnv() {
  try {
    envSchema.parse({
      NODE_ENV: process.env.NODE_ENV,
      PORT: process.env.PORT || "8080",
      DATABASE_URL: process.env.DATABASE_URL,
      API_KEYS: process.env.API_KEYS,
      REDIS_URL: process.env.REDIS_URL,
      LOG_LEVEL: process.env.LOG_LEVEL,
      ALLOWED_IPS: process.env.ALLOWED_IPS,
      SESSION_SECRET: process.env.SESSION_SECRET || "dev-secret-change-in-production-min-32-chars",
      JWT_SECRET: process.env.JWT_SECRET,
      AZURE_KEY_VAULT_URL: process.env.AZURE_KEY_VAULT_URL,
      AWS_SECRETS_MANAGER_REGION: process.env.AWS_SECRETS_MANAGER_REGION,
      SENTRY_DSN: process.env.SENTRY_DSN,
    });
    console.log("✅ Environment variables validated");
  } catch (error) {
    if (error instanceof z.ZodError) {
      console.error("❌ Environment validation failed:");
      error.errors.forEach((err) => {
        console.error(`  - ${err.path.join(".")}: ${err.message}`);
      });
      process.exit(1);
    }
    throw error;
  }
}

View File

@@ -0,0 +1,47 @@
import { query } from "../postgres";
import fs from "fs";
import path from "path";
/**
 * Run the initial database schema migration.
 *
 * The schema file is split into individual statements so that
 * "already exists" failures can be tolerated per statement, making the
 * migration re-runnable.
 */
export async function up() {
  const schemaPath = path.join(__dirname, "../schema.sql");
  const schema = fs.readFileSync(schemaPath, "utf-8");
  for (const statement of splitSqlStatements(schema)) {
    try {
      await query(statement);
    } catch (error: any) {
      // Ignore "already exists" errors so reruns are idempotent.
      if (!error.message.includes("already exists")) {
        throw error;
      }
    }
  }
  console.log("✅ Database schema migrated successfully");
}

/**
 * Split a SQL script into executable statements.
 *
 * Fixes two bugs in the previous naive `split(";")` approach:
 *  - semicolons inside $$-quoted bodies (the plpgsql trigger function in
 *    schema.sql) no longer break that statement apart;
 *  - statements that merely BEGIN with a `--` comment line are no longer
 *    dropped wholesale (previously the first CREATE TABLE was discarded
 *    because the file opens with comment lines). Comment-only lines are
 *    stripped instead.
 */
function splitSqlStatements(sql: string): string[] {
  const raw: string[] = [];
  let current = "";
  let inDollarQuote = false;
  for (let i = 0; i < sql.length; i++) {
    if (sql[i] === "$" && sql[i + 1] === "$") {
      inDollarQuote = !inDollarQuote;
      current += "$$";
      i++;
      continue;
    }
    if (sql[i] === ";" && !inDollarQuote) {
      raw.push(current);
      current = "";
      continue;
    }
    current += sql[i];
  }
  raw.push(current);
  return raw
    .map((s) =>
      s
        .split("\n")
        .filter((line) => !line.trim().startsWith("--"))
        .join("\n")
        .trim()
    )
    .filter((s) => s.length > 0);
}
/**
 * Roll back the initial schema: drop every object in reverse dependency
 * order (CASCADE cleans up any remaining references).
 */
export async function down() {
  const dropStatements = [
    "DROP TABLE IF EXISTS compliance_status CASCADE",
    "DROP TABLE IF EXISTS users CASCADE",
    "DROP TABLE IF EXISTS audit_logs CASCADE",
    "DROP TABLE IF EXISTS receipts CASCADE",
    "DROP TABLE IF EXISTS executions CASCADE",
    "DROP TABLE IF EXISTS plans CASCADE",
    "DROP FUNCTION IF EXISTS update_updated_at_column CASCADE",
  ];
  for (const statement of dropStatements) {
    await query(statement);
  }
  console.log("✅ Database schema rolled back");
}

View File

@@ -0,0 +1,15 @@
import { up as up001 } from "./001_initial_schema";
/**
 * Run every migration in order. Only the initial schema migration
 * exists today; append new migrations to the list as they are added.
 */
export async function runMigration() {
  const migrations = [up001];
  try {
    for (const migrate of migrations) {
      await migrate();
    }
    console.log("✅ All migrations completed");
  } catch (error) {
    console.error("❌ Migration failed:", error);
    throw error;
  }
}

View File

@@ -1,29 +1,101 @@
// In-memory database for plans (mock implementation)
// In production, replace with actual database (PostgreSQL, MongoDB, etc.)
import { query, transaction } from "./postgres";
import type { Plan } from "../types/plan";
const plans: Map<string, any> = new Map();
export async function storePlan(plan: any): Promise<void> {
plans.set(plan.plan_id, plan);
/**
* Store plan in database
*/
export async function storePlan(plan: Plan): Promise<void> {
await query(
`INSERT INTO plans (
plan_id, creator, plan_hash, steps, max_recursion, max_ltv,
signature, message_hash, signer_address, signed_at, status
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (plan_id) DO UPDATE SET
steps = EXCLUDED.steps,
status = EXCLUDED.status,
updated_at = CURRENT_TIMESTAMP`,
[
plan.plan_id,
plan.creator,
plan.plan_hash,
JSON.stringify(plan.steps),
plan.maxRecursion || 3,
plan.maxLTV || 0.6,
plan.signature || null,
null, // message_hash
null, // signer_address
null, // signed_at
plan.status || "pending",
]
);
}
export async function getPlanById(planId: string): Promise<any | null> {
return plans.get(planId) || null;
}
/**
* Get plan by ID
*/
export async function getPlanById(planId: string): Promise<Plan | null> {
const result = await query<Plan>(
"SELECT * FROM plans WHERE plan_id = $1",
[planId]
);
export async function updatePlanSignature(planId: string, signature: any): Promise<void> {
const plan = plans.get(planId);
if (plan) {
plan.signature = signature;
plans.set(planId, plan);
if (result.length === 0) {
return null;
}
const row = result[0];
return {
plan_id: row.plan_id,
creator: row.creator,
steps: typeof row.steps === "string" ? JSON.parse(row.steps) : row.steps,
maxRecursion: row.max_recursion,
maxLTV: row.max_ltv,
signature: row.signature,
plan_hash: row.plan_hash,
created_at: row.created_at?.toISOString(),
status: row.status,
};
}
export async function updatePlanStatus(planId: string, status: string): Promise<void> {
const plan = plans.get(planId);
if (plan) {
plan.status = status;
plans.set(planId, plan);
/**
 * Attach a signature and its metadata (message hash, signer address,
 * timestamp) to an existing plan row.
 */
export async function updatePlanSignature(
  planId: string,
  signature: {
    signature: string;
    messageHash: string;
    signerAddress: string;
    signedAt: string;
  }
): Promise<void> {
  const { signature: sig, messageHash, signerAddress, signedAt } = signature;
  await query(
    `UPDATE plans SET
      signature = $1,
      message_hash = $2,
      signer_address = $3,
      signed_at = $4,
      updated_at = CURRENT_TIMESTAMP
    WHERE plan_id = $5`,
    [sig, messageHash, signerAddress, signedAt, planId]
  );
}
/**
 * Set a plan's lifecycle status and bump its updated_at timestamp.
 */
export async function updatePlanStatus(
  planId: string,
  status: string
): Promise<void> {
  const sql =
    "UPDATE plans SET status = $1, updated_at = CURRENT_TIMESTAMP WHERE plan_id = $2";
  await query(sql, [status, planId]);
}

View File

@@ -0,0 +1,94 @@
import { Pool, PoolClient } from "pg";
import { env } from "../config/env";
/**
 * PostgreSQL connection pool (lazily created singleton).
 */
let pool: Pool | null = null;

/**
 * Get the shared connection pool, creating it on first use.
 *
 * NOTE(review): the fallback connection string embeds placeholder
 * credentials ("user:pass") — confirm DATABASE_URL is always set
 * outside local development.
 */
export function getPool(): Pool {
  if (!pool) {
    pool = new Pool({
      connectionString: env.DATABASE_URL || "postgresql://user:pass@localhost:5432/comboflow",
      max: 20, // Maximum number of clients in the pool
      idleTimeoutMillis: 30000,
      connectionTimeoutMillis: 2000,
    });
    // Log errors raised by idle clients instead of crashing the process.
    pool.on("error", (err) => {
      console.error("Unexpected error on idle client", err);
    });
  }
  return pool;
}
/**
 * Execute a parameterized query with automatic retry and exponential
 * backoff (capped at 10s). Unique/foreign-key violations (codes 23505,
 * 23503) are deterministic and are re-thrown immediately without retry.
 * `retries` is the number of retries after the first attempt.
 */
export async function query<T = any>(
  text: string,
  params?: any[],
  retries = 3
): Promise<T[]> {
  const db = getPool();
  let lastError: Error | null = null;
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      const result = await db.query(text, params);
      return result.rows as T[];
    } catch (error: any) {
      lastError = error;
      // Constraint violations fail identically on retry — bail out now.
      if (error.code === "23505" || error.code === "23503") {
        throw error;
      }
      if (attempt < retries) {
        const delay = Math.min(1000 * Math.pow(2, attempt), 10000);
        await new Promise((resolve) => setTimeout(resolve, delay));
        console.log(`Database query retry ${attempt + 1}/${retries}`);
      }
    }
  }
  throw lastError || new Error("Database query failed after retries");
}
/**
 * Run `callback` inside a BEGIN/COMMIT transaction on a dedicated
 * client. Any thrown error triggers ROLLBACK and is re-thrown; the
 * client is always released back to the pool.
 */
export async function transaction<T>(
  callback: (client: PoolClient) => Promise<T>
): Promise<T> {
  const client = await getPool().connect();
  try {
    await client.query("BEGIN");
    const outcome = await callback(client);
    await client.query("COMMIT");
    return outcome;
  } catch (error) {
    await client.query("ROLLBACK");
    throw error;
  } finally {
    client.release();
  }
}
/**
 * Drain and dispose of the shared pool (used during graceful shutdown).
 * A later getPool() call will create a fresh pool.
 */
export async function closePool(): Promise<void> {
  if (!pool) {
    return;
  }
  await pool.end();
  pool = null;
}

View File

@@ -0,0 +1,139 @@
-- Database schema for ISO-20022 Combo Flow Orchestrator
--
-- All statements are idempotent (IF NOT EXISTS / DROP ... IF EXISTS) so
-- the migration can be re-run safely. Previously only the tables were
-- guarded; re-running failed on the indexes and triggers.

-- Plans table
CREATE TABLE IF NOT EXISTS plans (
  plan_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  creator VARCHAR(255) NOT NULL,
  plan_hash VARCHAR(64) NOT NULL UNIQUE,
  steps JSONB NOT NULL,
  max_recursion INTEGER DEFAULT 3,
  max_ltv DECIMAL(5,2) DEFAULT 0.60,
  signature TEXT,
  message_hash VARCHAR(64),
  signer_address VARCHAR(42),
  signed_at TIMESTAMP,
  status VARCHAR(20) DEFAULT 'pending',
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX IF NOT EXISTS idx_plans_creator ON plans(creator);
CREATE INDEX IF NOT EXISTS idx_plans_status ON plans(status);
CREATE INDEX IF NOT EXISTS idx_plans_created_at ON plans(created_at);
CREATE INDEX IF NOT EXISTS idx_plans_plan_hash ON plans(plan_hash);

-- Executions table
CREATE TABLE IF NOT EXISTS executions (
  execution_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  plan_id UUID NOT NULL REFERENCES plans(plan_id) ON DELETE CASCADE,
  status VARCHAR(20) DEFAULT 'pending',
  phase VARCHAR(50),
  started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  completed_at TIMESTAMP,
  error TEXT,
  dlt_tx_hash VARCHAR(66),
  iso_message_id VARCHAR(255),
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX IF NOT EXISTS idx_executions_plan_id ON executions(plan_id);
CREATE INDEX IF NOT EXISTS idx_executions_status ON executions(status);
CREATE INDEX IF NOT EXISTS idx_executions_started_at ON executions(started_at);

-- Receipts table
CREATE TABLE IF NOT EXISTS receipts (
  receipt_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  plan_id UUID NOT NULL REFERENCES plans(plan_id) ON DELETE CASCADE,
  execution_id UUID REFERENCES executions(execution_id),
  receipt_hash VARCHAR(64) NOT NULL UNIQUE,
  dlt_transaction JSONB,
  iso_message JSONB,
  notary_proof JSONB,
  status VARCHAR(20) DEFAULT 'pending',
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX IF NOT EXISTS idx_receipts_plan_id ON receipts(plan_id);
CREATE INDEX IF NOT EXISTS idx_receipts_receipt_hash ON receipts(receipt_hash);

-- Audit logs table
CREATE TABLE IF NOT EXISTS audit_logs (
  log_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  request_id VARCHAR(255),
  user_id VARCHAR(255),
  action VARCHAR(100) NOT NULL,
  resource VARCHAR(255) NOT NULL,
  ip_address VARCHAR(45),
  user_agent TEXT,
  success BOOLEAN DEFAULT true,
  error_message TEXT,
  metadata JSONB,
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX IF NOT EXISTS idx_audit_logs_user_id ON audit_logs(user_id);
CREATE INDEX IF NOT EXISTS idx_audit_logs_action ON audit_logs(action);
CREATE INDEX IF NOT EXISTS idx_audit_logs_created_at ON audit_logs(created_at);
CREATE INDEX IF NOT EXISTS idx_audit_logs_request_id ON audit_logs(request_id);

-- Users/Identities table
CREATE TABLE IF NOT EXISTS users (
  user_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  email VARCHAR(255) UNIQUE NOT NULL,
  lei VARCHAR(20),
  did VARCHAR(255),
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX IF NOT EXISTS idx_users_email ON users(email);
CREATE INDEX IF NOT EXISTS idx_users_lei ON users(lei);

-- Compliance status table
CREATE TABLE IF NOT EXISTS compliance_status (
  compliance_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  user_id UUID NOT NULL REFERENCES users(user_id) ON DELETE CASCADE,
  lei VARCHAR(20),
  did VARCHAR(255),
  kyc_level INTEGER DEFAULT 0,
  kyc_verified BOOLEAN DEFAULT false,
  kyc_expires_at TIMESTAMP,
  aml_passed BOOLEAN DEFAULT false,
  aml_last_check TIMESTAMP,
  aml_risk_level VARCHAR(20),
  valid BOOLEAN DEFAULT false,
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX IF NOT EXISTS idx_compliance_user_id ON compliance_status(user_id);
CREATE INDEX IF NOT EXISTS idx_compliance_valid ON compliance_status(valid);
CREATE INDEX IF NOT EXISTS idx_compliance_kyc_expires ON compliance_status(kyc_expires_at);

-- Keep updated_at current on every row update.
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
  NEW.updated_at = CURRENT_TIMESTAMP;
  RETURN NEW;
END;
$$ language 'plpgsql';

-- Apply update triggers. Drop first so re-running succeeds
-- (CREATE OR REPLACE TRIGGER requires PostgreSQL 14+, so avoid it here).
DROP TRIGGER IF EXISTS update_plans_updated_at ON plans;
CREATE TRIGGER update_plans_updated_at BEFORE UPDATE ON plans
  FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

DROP TRIGGER IF EXISTS update_executions_updated_at ON executions;
CREATE TRIGGER update_executions_updated_at BEFORE UPDATE ON executions
  FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

DROP TRIGGER IF EXISTS update_receipts_updated_at ON receipts;
CREATE TRIGGER update_receipts_updated_at BEFORE UPDATE ON receipts
  FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

DROP TRIGGER IF EXISTS update_users_updated_at ON users;
CREATE TRIGGER update_users_updated_at BEFORE UPDATE ON users
  FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

DROP TRIGGER IF EXISTS update_compliance_status_updated_at ON compliance_status;
CREATE TRIGGER update_compliance_status_updated_at BEFORE UPDATE ON compliance_status
  FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

View File

@@ -0,0 +1,78 @@
import { getPool } from "../db/postgres";
/** Aggregate health report returned by the /health endpoint. */
interface HealthStatus {
  status: "healthy" | "unhealthy";
  timestamp: string;
  checks: {
    database: "up" | "down";               // connectivity probe result
    memory: "ok" | "warning" | "critical"; // heap-usage thresholds
    disk: "ok" | "warning" | "critical";   // currently always "ok" (stubbed)
  };
  uptime: number;
  version: string;
}
/**
 * Health check endpoint: probes the database, inspects heap usage, and
 * reports an aggregate status.
 *
 * Fixed: `uptime` previously reported the duration of the health check
 * itself (Date.now() - startTime); it now reports actual process uptime
 * in seconds via process.uptime().
 */
export async function healthCheck(): Promise<HealthStatus> {
  const checks: HealthStatus["checks"] = {
    database: "down",
    memory: "ok",
    disk: "ok",
  };

  // Database connectivity: a trivial round-trip query.
  try {
    const pool = getPool();
    await pool.query("SELECT 1");
    checks.database = "up";
  } catch (error) {
    checks.database = "down";
  }

  // Heap pressure thresholds: >90% critical, >75% warning.
  const memUsage = process.memoryUsage();
  const memUsagePercent = (memUsage.heapUsed / memUsage.heapTotal) * 100;
  if (memUsagePercent > 90) {
    checks.memory = "critical";
  } else if (memUsagePercent > 75) {
    checks.memory = "warning";
  }

  // Disk check is stubbed — TODO: wire up real disk stats in production.
  checks.disk = "ok";

  const allHealthy =
    checks.database === "up" && checks.memory !== "critical" && checks.disk !== "critical";

  return {
    status: allHealthy ? "healthy" : "unhealthy",
    timestamp: new Date().toISOString(),
    checks,
    uptime: process.uptime(),
    version: process.env.npm_package_version || "1.0.0",
  };
}
/**
 * Readiness probe (Kubernetes): ready only when the database answers a
 * trivial query; any failure means "not ready".
 */
export async function readinessCheck(): Promise<boolean> {
  try {
    await getPool().query("SELECT 1");
    return true;
  } catch {
    return false;
  }
}
/**
 * Liveness probe (Kubernetes): always true — if the event loop can run
 * this function at all, the process is alive.
 */
export async function livenessCheck(): Promise<boolean> {
  return Promise.resolve(true);
}

139
orchestrator/src/index.ts Normal file
View File

@@ -0,0 +1,139 @@
import express from "express";
import cors from "cors";
import { validateEnv } from "./config/env";
import {
apiLimiter,
securityHeaders,
requestSizeLimits,
requestId,
apiKeyAuth,
auditLog,
} from "./middleware";
import { logger } from "./logging/logger";
import { getMetrics, httpRequestDuration, httpRequestTotal, register } from "./metrics/prometheus";
import { healthCheck, readinessCheck, livenessCheck } from "./health/health";
import { createPlan, getPlan, addSignature, validatePlanEndpoint } from "./api/plans";
import { streamPlanStatus } from "./api/sse";
import { executionCoordinator } from "./services/execution";
import { runMigration } from "./db/migrations";
// Validate environment on startup
validateEnv();
const app = express();
const PORT = process.env.PORT || 8080;
// Middleware
app.use(cors());
app.use(securityHeaders);
app.use(requestSizeLimits);
app.use(requestId);
app.use(express.json({ limit: "10mb" }));
app.use(express.urlencoded({ extended: true, limit: "10mb" }));
// Request logging middleware
app.use((req, res, next) => {
const start = Date.now();
const requestId = req.headers["x-request-id"] as string || "unknown";
res.on("finish", () => {
const duration = Date.now() - start;
httpRequestDuration.observe(
{ method: req.method, route: req.route?.path || req.path, status: res.statusCode },
duration / 1000
);
httpRequestTotal.inc({ method: req.method, route: req.route?.path || req.path, status: res.statusCode });
logger.info({
req,
res,
duration,
requestId,
}, `${req.method} ${req.path} ${res.statusCode}`);
});
next();
});
// Health check endpoints (no auth required)
app.get("/health", async (req, res) => {
const health = await healthCheck();
res.status(health.status === "healthy" ? 200 : 503).json(health);
});
app.get("/ready", async (req, res) => {
const ready = await readinessCheck();
res.status(ready ? 200 : 503).json({ ready });
});
app.get("/live", async (req, res) => {
const alive = await livenessCheck();
res.status(alive ? 200 : 503).json({ alive });
});
// Metrics endpoint
app.get("/metrics", async (req, res) => {
res.setHeader("Content-Type", register.contentType);
const metrics = await getMetrics();
res.send(metrics);
});
// API routes with rate limiting
app.use("/api", apiLimiter);
// Plan management endpoints
app.post("/api/plans", auditLog("CREATE_PLAN", "plan"), createPlan);
app.get("/api/plans/:planId", getPlan);
app.post("/api/plans/:planId/signature", addSignature);
app.post("/api/plans/:planId/validate", validatePlanEndpoint);
// Execution endpoints
import { executePlan, getExecutionStatus, abortExecution } from "./api/execution";
app.post("/api/plans/:planId/execute", auditLog("EXECUTE_PLAN", "plan"), executePlan);
app.get("/api/plans/:planId/status", getExecutionStatus);
app.post("/api/plans/:planId/abort", auditLog("ABORT_PLAN", "plan"), abortExecution);
app.get("/api/plans/:planId/status/stream", streamPlanStatus);
// Error handling middleware
app.use((err: any, req: express.Request, res: express.Response, next: express.NextFunction) => {
logger.error({ err, req }, "Unhandled error");
res.status(err.status || 500).json({
error: "Internal server error",
message: process.env.NODE_ENV === "development" ? err.message : undefined,
requestId: req.headers["x-request-id"],
});
});
// Graceful shutdown
process.on("SIGTERM", async () => {
logger.info("SIGTERM received, shutting down gracefully");
// Close database connections
// Close SSE connections
process.exit(0);
});
process.on("SIGINT", async () => {
logger.info("SIGINT received, shutting down gracefully");
process.exit(0);
});
// Start server
async function start() {
try {
// Run database migrations
if (process.env.RUN_MIGRATIONS === "true") {
await runMigration();
}
app.listen(PORT, () => {
logger.info({ port: PORT }, "Orchestrator service started");
});
} catch (error) {
logger.error({ error }, "Failed to start server");
process.exit(1);
}
}
start();

View File

@@ -0,0 +1,74 @@
import pino from "pino";
import { env } from "../config/env";
/**
 * Pino logger configured for structured logging with human-readable
 * pretty-printing.
 *
 * NOTE(review): the pino-pretty transport is enabled unconditionally —
 * confirm whether production should emit raw JSON instead.
 */
export const logger = pino({
  level: env.LOG_LEVEL,
  transport: {
    target: "pino-pretty",
    options: {
      colorize: true,
      translateTime: "SYS:standard",
      ignore: "pid,hostname",
    },
  },
  formatters: {
    // Emit the level name (e.g. "info") rather than its numeric value.
    level: (label) => {
      return { level: label };
    },
  },
  serializers: {
    // Trim request objects to id, method, url and a few safe headers.
    req: (req) => ({
      id: req.id,
      method: req.method,
      url: req.url,
      headers: {
        host: req.headers.host,
        "user-agent": req.headers["user-agent"],
        "x-request-id": req.headers["x-request-id"],
      },
    }),
    res: (res) => ({
      statusCode: res.statusCode,
    }),
    err: pino.stdSerializers.err,
  },
});
/**
 * Recursively mask personally identifiable information in log data.
 *
 * - strings: email addresses are replaced with "[EMAIL]"
 * - objects: keys containing email/password/secret/token are fully
 *   redacted; keys containing iban/account keep only a 4-char prefix;
 *   everything else is masked recursively
 * - arrays: each element is masked recursively
 * - other values pass through unchanged
 *
 * Fixed: the email regex TLD class was `[A-Z|a-z]`, which also matched
 * a literal "|"; it is now `[A-Za-z]`.
 */
export function maskPII(data: any): any {
  if (typeof data === "string") {
    // Mask email addresses
    return data.replace(/\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b/g, "[EMAIL]");
  }
  if (Array.isArray(data)) {
    return data.map(maskPII);
  }
  if (data && typeof data === "object") {
    const masked: any = {};
    for (const key in data) {
      const lowerKey = key.toLowerCase();
      if (
        lowerKey.includes("email") ||
        lowerKey.includes("password") ||
        lowerKey.includes("secret") ||
        lowerKey.includes("token")
      ) {
        masked[key] = "[REDACTED]";
      } else if (lowerKey.includes("iban") || lowerKey.includes("account")) {
        // Keep a short prefix so entries remain correlatable in logs.
        masked[key] = data[key] ? `${String(data[key]).substring(0, 4)}****` : data[key];
      } else {
        masked[key] = maskPII(data[key]);
      }
    }
    return masked;
  }
  return data;
}
/**
 * Create a child logger whose bound context has PII masked up front,
 * so sensitive fields never reach the log stream.
 */
export function createChildLogger(context: Record<string, any>) {
  return logger.child(maskPII(context));
}

View File

@@ -0,0 +1,79 @@
import { Registry, Counter, Histogram, Gauge } from "prom-client";

/**
 * Service-local Prometheus registry (kept separate from the prom-client
 * default registry).
 */
export const register = new Registry();

/**
 * HTTP request metrics: latency histogram and request counter, labeled
 * by method, route and response status.
 */
export const httpRequestDuration = new Histogram({
  name: "http_request_duration_seconds",
  help: "Duration of HTTP requests in seconds",
  labelNames: ["method", "route", "status"],
  buckets: [0.1, 0.5, 1, 2, 5, 10],
  registers: [register],
});

export const httpRequestTotal = new Counter({
  name: "http_requests_total",
  help: "Total number of HTTP requests",
  labelNames: ["method", "route", "status"],
  registers: [register],
});

/**
 * Business metrics: plan lifecycle counters and execution latency.
 */
export const planCreationTotal = new Counter({
  name: "plans_created_total",
  help: "Total number of plans created",
  labelNames: ["status"],
  registers: [register],
});

export const planExecutionTotal = new Counter({
  name: "plans_executed_total",
  help: "Total number of plans executed",
  labelNames: ["status"],
  registers: [register],
});

export const planExecutionDuration = new Histogram({
  name: "plan_execution_duration_seconds",
  help: "Duration of plan execution in seconds",
  labelNames: ["status"],
  // Executions may run long; buckets span 1s to 2min.
  buckets: [1, 5, 10, 30, 60, 120],
  registers: [register],
});

export const complianceCheckTotal = new Counter({
  name: "compliance_checks_total",
  help: "Total number of compliance checks",
  labelNames: ["status"],
  registers: [register],
});

/**
 * System gauges.
 */
export const activeExecutions = new Gauge({
  name: "active_executions",
  help: "Number of currently active plan executions",
  registers: [register],
});

export const databaseConnections = new Gauge({
  name: "database_connections",
  help: "Number of active database connections",
  registers: [register],
});

/**
 * Render all registered metrics in the Prometheus text exposition format.
 */
export async function getMetrics(): Promise<string> {
  return register.metrics();
}

View File

@@ -0,0 +1,44 @@
import { Request, Response, NextFunction } from "express";
/**
 * API key authentication middleware.
 * Accepts the key from the X-API-Key header or a Bearer Authorization
 * header; responds 401 when absent and 403 when not in API_KEYS.
 */
export const apiKeyAuth = (req: Request, res: Response, next: NextFunction) => {
  const presented =
    req.headers["x-api-key"] || req.headers["authorization"]?.replace("Bearer ", "");
  if (!presented) {
    return res.status(401).json({
      error: "Unauthorized",
      message: "API key is required",
    });
  }
  // Valid keys come from the comma-separated API_KEYS env var
  // (in production, check against a database instead).
  const validApiKeys = process.env.API_KEYS?.split(",") || [];
  if (!validApiKeys.includes(presented as string)) {
    return res.status(403).json({
      error: "Forbidden",
      message: "Invalid API key",
    });
  }
  // Attach API key info to the request for downstream handlers.
  (req as any).apiKey = presented;
  next();
};
/**
 * Optional API key authentication for public endpoints: when a valid
 * key is presented it is attached to the request and `authenticated`
 * is set; otherwise the request proceeds anonymously.
 */
export const optionalApiKeyAuth = (req: Request, res: Response, next: NextFunction) => {
  const presented =
    req.headers["x-api-key"] || req.headers["authorization"]?.replace("Bearer ", "");
  if (presented) {
    const validApiKeys = process.env.API_KEYS?.split(",") || [];
    if (validApiKeys.includes(presented as string)) {
      (req as any).apiKey = presented;
      (req as any).authenticated = true;
    }
  }
  next();
};

View File

@@ -0,0 +1,53 @@
import { Request, Response, NextFunction } from "express";
/** Shape of a single audit-trail record. */
interface AuditLogEntry {
  timestamp: string;
  requestId: string;
  userId?: string;
  action: string;
  resource: string;
  ip: string;
  userAgent?: string;
  success: boolean;
  error?: string;
}

/**
 * Audit logging middleware for sensitive operations.
 *
 * Wraps res.send so the outcome (status code, error body) is captured
 * at the moment the response is written.
 *
 * Fixed: when a request authenticated via API key, the raw key was
 * recorded verbatim as the userId — leaking a secret into the audit
 * trail. It is now masked down to a 4-character prefix. Also removed
 * an unused `duration` local.
 */
export const auditLog = (action: string, resource: string) => {
  return (req: Request, res: Response, next: NextFunction) => {
    const originalSend = res.send;
    res.send = function (body: any) {
      const requestId = (req.headers["x-request-id"] as string) || "unknown";
      const apiKey = (req as any).apiKey as string | undefined;
      // Never log the raw API key — keep only a prefix for correlation.
      const userId =
        (req as any).user?.id ||
        (apiKey ? `apikey:${apiKey.substring(0, 4)}****` : "anonymous");
      const ip = req.ip || req.headers["x-forwarded-for"] || req.socket.remoteAddress || "unknown";
      const auditEntry: AuditLogEntry = {
        timestamp: new Date().toISOString(),
        requestId,
        userId: userId as string,
        action,
        resource,
        ip: ip as string,
        userAgent: req.headers["user-agent"],
        success: res.statusCode < 400,
        error: res.statusCode >= 400 ? body : undefined,
      };
      // In production, ship this to a dedicated audit service instead.
      console.log("[AUDIT]", JSON.stringify(auditEntry));
      return originalSend.call(this, body);
    };
    next();
  };
};

View File

@@ -0,0 +1,8 @@
// Barrel module: single import point for all HTTP middleware.
export { apiLimiter, authLimiter, planCreationLimiter, executionLimiter } from "./rateLimit";
export { securityHeaders, requestSizeLimits, requestId } from "./security";
export { apiKeyAuth, optionalApiKeyAuth } from "./apiKeyAuth";
export { validate, sanitizeInput } from "./validation";
export { ipWhitelist, getClientIP } from "./ipWhitelist";
export { auditLog } from "./auditLog";
export { sessionManager } from "./session";

View File

@@ -0,0 +1,31 @@
import { Request, Response, NextFunction } from "express";
/**
 * IP whitelist middleware for admin endpoints.
 *
 * Fixed: previously compared the raw X-Forwarded-For header — which may
 * be a comma-separated list of hops — directly against the allow-list,
 * so legitimate proxied clients were rejected. It now resolves the
 * first-hop address via getClientIP for a consistent comparison.
 */
export const ipWhitelist = (allowedIPs: string[]) => {
  return (req: Request, res: Response, next: NextFunction) => {
    const clientIP = getClientIP(req);
    if (clientIP === "unknown" || !allowedIPs.includes(clientIP)) {
      return res.status(403).json({
        error: "Forbidden",
        message: "Access denied from this IP address",
      });
    }
    next();
  };
};

/**
 * Resolve the client IP: first entry of X-Forwarded-For, then X-Real-IP,
 * then Express's req.ip, then the raw socket address.
 * NOTE(review): forwarded headers are spoofable unless a trusted proxy
 * sets them — confirm the deployment sits behind one.
 */
export const getClientIP = (req: Request): string => {
  return (
    (req.headers["x-forwarded-for"] as string)?.split(",")[0]?.trim() ||
    (req.headers["x-real-ip"] as string) ||
    req.ip ||
    req.socket.remoteAddress ||
    "unknown"
  );
};

View File

@@ -0,0 +1,41 @@
import rateLimit from "express-rate-limit";
/**
 * General API rate limiter: 100 requests per IP per 15-minute window.
 * Advertises limits via the standard RateLimit-* headers and disables
 * the legacy X-RateLimit-* headers.
 */
export const apiLimiter = rateLimit({
  windowMs: 15 * 60 * 1000, // 15 minutes
  max: 100, // Limit each IP to 100 requests per windowMs
  message: "Too many requests from this IP, please try again later.",
  standardHeaders: true,
  legacyHeaders: false,
});
/**
 * Strict rate limiter for authentication endpoints: 5 failed attempts per
 * IP per 15 minutes (successful requests are not counted).
 * standardHeaders/legacyHeaders added for consistency with apiLimiter.
 */
export const authLimiter = rateLimit({
  windowMs: 15 * 60 * 1000, // 15 minutes
  max: 5, // Limit each IP to 5 requests per windowMs
  message: "Too many authentication attempts, please try again later.",
  skipSuccessfulRequests: true,
  standardHeaders: true,
  legacyHeaders: false,
});
/**
 * Rate limiter for plan creation: 10 per IP per hour.
 */
export const planCreationLimiter = rateLimit({
  windowMs: 60 * 60 * 1000, // 1 hour
  max: 10, // Limit each IP to 10 plan creations per hour
  message: "Too many plan creation attempts, please try again later.",
  standardHeaders: true,
  legacyHeaders: false,
});
/**
 * Rate limiter for execution endpoints: 20 per IP per hour.
 */
export const executionLimiter = rateLimit({
  windowMs: 60 * 60 * 1000, // 1 hour
  max: 20, // Limit each IP to 20 executions per hour
  message: "Too many execution attempts, please try again later.",
  standardHeaders: true,
  legacyHeaders: false,
});

View File

@@ -0,0 +1,59 @@
import helmet from "helmet";
import { Request, Response, NextFunction } from "express";
/**
 * Security headers middleware (helmet).
 * Restrictive CSP (self-only scripts/connect, inline styles permitted,
 * no frames or plugins), one-year preloaded HSTS, X-Frame-Options: DENY,
 * and nosniff.
 * NOTE(review): `xssFilter`/`noSniff` option availability differs across
 * helmet major versions — confirm against the installed version.
 */
export const securityHeaders = helmet({
  contentSecurityPolicy: {
    directives: {
      defaultSrc: ["'self'"],
      scriptSrc: ["'self'"],
      styleSrc: ["'self'", "'unsafe-inline'"],
      imgSrc: ["'self'", "data:", "https:"],
      connectSrc: ["'self'"],
      fontSrc: ["'self'"],
      objectSrc: ["'none'"],
      mediaSrc: ["'self'"],
      frameSrc: ["'none'"],
    },
  },
  hsts: {
    maxAge: 31536000, // one year, in seconds
    includeSubDomains: true,
    preload: true,
  },
  frameguard: {
    action: "deny",
  },
  noSniff: true,
  xssFilter: true,
});
/**
 * Reject requests whose declared Content-Length exceeds 10MB with 413.
 * Requests without a Content-Length header pass through untouched.
 */
export const requestSizeLimits = (req: Request, res: Response, next: NextFunction) => {
  const MAX_BODY_BYTES = 10 * 1024 * 1024; // 10MB
  const declared = req.headers["content-length"];
  if (declared && parseInt(declared, 10) > MAX_BODY_BYTES) {
    return res.status(413).json({
      error: "Request entity too large",
      message: "Maximum request size is 10MB",
    });
  }
  next();
};
/**
 * Attach a request ID for cross-service tracing.
 * Honors an incoming X-Request-ID header (first value when repeated —
 * the original would stringify an array), otherwise generates one;
 * the ID is echoed back in the X-Request-ID response header.
 * Deprecated String#substr replaced with slice.
 */
export const requestId = (req: Request, res: Response, next: NextFunction) => {
  const incoming = req.headers["x-request-id"];
  const id =
    (Array.isArray(incoming) ? incoming[0] : incoming) ||
    `req-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
  req.headers["x-request-id"] = id;
  res.setHeader("X-Request-ID", id);
  next();
};

View File

@@ -0,0 +1,71 @@
import { Request, Response, NextFunction } from "express";
import { v4 as uuidv4 } from "uuid";
// Session payload tracked per client.
interface SessionData {
  sessionId: string; // opaque UUID handed to the client
  userId?: string; // populated after authentication, if ever
  createdAt: number; // epoch ms
  lastActivity: number; // epoch ms, refreshed on every request
  expiresAt: number; // absolute expiry (createdAt + MAX_SESSION_AGE)
}
// NOTE(review): in-memory store — sessions are lost on restart and not
// shared between replicas; confirm single-instance deployment.
const sessions: Map<string, SessionData> = new Map();
const SESSION_TIMEOUT = 30 * 60 * 1000; // 30 minutes idle timeout
const MAX_SESSION_AGE = 24 * 60 * 60 * 1000; // 24 hours absolute lifetime
/**
 * Session management middleware.
 *
 * Looks up the session identified by the X-Session-ID header (first value
 * when the header is repeated — the original passed a possible string[]
 * straight into the Map lookup) or the sessionId cookie. Expired or idle
 * sessions are evicted and rejected with 401; unknown clients get a fresh
 * session whose ID is returned in the X-Session-ID response header.
 */
export const sessionManager = (req: Request, res: Response, next: NextFunction) => {
  const rawId = req.headers["x-session-id"];
  const sessionId = (Array.isArray(rawId) ? rawId[0] : rawId) || req.cookies?.sessionId;
  if (sessionId && sessions.has(sessionId)) {
    const session = sessions.get(sessionId)!;
    const now = Date.now();
    // Reject when past absolute expiry or idle longer than SESSION_TIMEOUT.
    if (now > session.expiresAt || now - session.lastActivity > SESSION_TIMEOUT) {
      sessions.delete(sessionId);
      return res.status(401).json({
        error: "Session expired",
        message: "Please sign in again",
      });
    }
    session.lastActivity = now;
    (req as any).session = session;
  } else {
    // Unknown or missing session ID: start a new anonymous session.
    const newSession: SessionData = {
      sessionId: uuidv4(),
      createdAt: Date.now(),
      lastActivity: Date.now(),
      expiresAt: Date.now() + MAX_SESSION_AGE,
    };
    sessions.set(newSession.sessionId, newSession);
    (req as any).session = newSession;
    res.setHeader("X-Session-ID", newSession.sessionId);
  }
  // Opportunistic cleanup on every request (a periodic sweep also runs).
  cleanupExpiredSessions();
  next();
};
/**
 * Evict sessions that are past their absolute expiry or idle timeout.
 */
function cleanupExpiredSessions() {
  const now = Date.now();
  for (const [sessionId, session] of sessions.entries()) {
    if (now > session.expiresAt || now - session.lastActivity > SESSION_TIMEOUT) {
      sessions.delete(sessionId);
    }
  }
}
// Run cleanup every 5 minutes. unref() so this timer alone does not keep
// the Node process alive during shutdown (fix: the bare interval did).
setInterval(cleanupExpiredSessions, 5 * 60 * 1000).unref?.();

View File

@@ -0,0 +1,57 @@
import { Request, Response, NextFunction } from "express";
import { z } from "zod";
/**
* Request validation middleware using Zod
*/
export const validate = (schema: z.ZodSchema) => {
return (req: Request, res: Response, next: NextFunction) => {
try {
schema.parse(req.body);
next();
} catch (error) {
if (error instanceof z.ZodError) {
return res.status(400).json({
error: "Validation failed",
errors: error.errors,
});
}
next(error);
}
};
};
/**
 * Best-effort XSS input scrubbing for req.body and req.query.
 * Strips <script> blocks, the javascript: protocol, and inline on*=
 * event-handler attributes from every string value, recursing through
 * arrays and objects.
 */
export const sanitizeInput = (req: Request, res: Response, next: NextFunction) => {
  const scrub = (value: any): any => {
    if (typeof value === "string") {
      return value
        .replace(/<script\b[^<]*(?:(?!<\/script>)<[^<]*)*<\/script>/gi, "")
        .replace(/javascript:/gi, "")
        .replace(/on\w+\s*=/gi, "");
    }
    if (Array.isArray(value)) {
      return value.map(scrub);
    }
    if (value && typeof value === "object") {
      const clean: any = {};
      for (const key in value) {
        clean[key] = scrub(value[key]);
      }
      return clean;
    }
    return value;
  };
  if (req.body) {
    req.body = scrub(req.body);
  }
  if (req.query) {
    req.query = scrub(req.query);
  }
  next();
};

View File

@@ -0,0 +1,106 @@
import Redis from "ioredis";
/**
 * Redis caching service
 */
// Shared singleton connection; null until initRedis() runs (or lazily
// via getRedis() when REDIS_URL is set).
let redis: Redis | null = null;
/**
 * Initialize the Redis connection.
 * Connects to `url`, then REDIS_URL, then localhost:6379. Per-command
 * retry is capped at 3 attempts with linear backoff (50ms per attempt,
 * max 2s). Idempotent — later calls return the existing client.
 */
export function initRedis(url?: string): Redis {
  if (!redis) {
    redis = new Redis(url || process.env.REDIS_URL || "redis://localhost:6379", {
      maxRetriesPerRequest: 3,
      retryStrategy: (times) => {
        // Linear backoff: 50ms per attempt, capped at 2 seconds.
        const delay = Math.min(times * 50, 2000);
        return delay;
      },
    });
    redis.on("error", (err) => {
      console.error("Redis connection error:", err);
    });
    redis.on("connect", () => {
      console.log("✅ Redis connected");
    });
  }
  return redis;
}
/**
 * Lazily obtain the shared Redis client.
 * Connects on first use when REDIS_URL is configured; returns null when
 * Redis is unavailable (callers treat that as "cache disabled").
 */
export function getRedis(): Redis | null {
  if (redis) {
    return redis;
  }
  if (process.env.REDIS_URL) {
    initRedis();
  }
  return redis;
}
/**
 * JSON value cache helpers. All of them degrade gracefully: when Redis is
 * not configured or a call fails, reads return null and writes/deletes
 * become logged no-ops — they never throw.
 */
export async function cacheGet<T>(key: string): Promise<T | null> {
  const client = getRedis();
  if (!client) {
    return null;
  }
  try {
    const raw = await client.get(key);
    return raw ? (JSON.parse(raw) as T) : null;
  } catch (error) {
    console.error("Cache get error:", error);
    return null;
  }
}
/** Store a JSON-serializable value under `key` for `ttlSeconds` (default 1h). */
export async function cacheSet<T>(key: string, value: T, ttlSeconds = 3600): Promise<void> {
  const client = getRedis();
  if (!client) {
    return;
  }
  try {
    await client.setex(key, ttlSeconds, JSON.stringify(value));
  } catch (error) {
    console.error("Cache set error:", error);
  }
}
/** Remove `key` from the cache, ignoring failures. */
export async function cacheDelete(key: string): Promise<void> {
  const client = getRedis();
  if (!client) {
    return;
  }
  try {
    await client.del(key);
  } catch (error) {
    console.error("Cache delete error:", error);
  }
}
/**
 * Express response cache for GET routes, keyed by path + query string.
 *
 * On a hit, replies straight from the cache. On a miss, res.json is
 * wrapped so the outgoing body is cached for `ttlSeconds` — but only for
 * successful (<400) responses, so error payloads are never served stale.
 *
 * Fix: the original typed parameters with the `express` namespace, which
 * this module never imports (compile error); loose handler-shaped types
 * keep the module dependency-free.
 */
export function cacheMiddleware(ttlSeconds = 300) {
  return async (req: any, res: any, next: any) => {
    if (req.method !== "GET") {
      return next();
    }
    const cacheKey = `cache:${req.path}:${JSON.stringify(req.query)}`;
    const cached = await cacheGet(cacheKey);
    if (cached) {
      return res.json(cached);
    }
    const originalJson = res.json;
    res.json = function (body: any) {
      // Only cache success responses; errors must not be replayed.
      if (res.statusCode < 400) {
        cacheSet(cacheKey, body, ttlSeconds).catch(console.error);
      }
      return originalJson.call(this, body);
    };
    next();
  };
}

View File

@@ -0,0 +1,62 @@
import { query } from "../db/postgres";
// camelCase view of the snake_case dead_letter_queue table rows, as
// handed to DLQ consumers by getDLQMessages().
interface DeadLetterMessage {
  messageId: string; // "dlq-<ts>-<rand>" generated at enqueue time
  originalQueue: string;
  payload: any; // parsed JSON body of the failed message
  error: string; // failure reason recorded at enqueue time
  retryCount: number;
  createdAt: string; // ISO-8601 timestamp
}
/**
 * Persist a failed message to the dead-letter queue table.
 * (Deprecated String#substr replaced with slice.)
 *
 * @param queue   Originating queue name.
 * @param payload Message body; stored as JSON text.
 * @param error   Human-readable failure reason.
 */
export async function addToDLQ(
  queue: string,
  payload: any,
  error: string
): Promise<void> {
  // Timestamp + random suffix is unique enough for DLQ bookkeeping.
  const messageId = `dlq-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
  await query(
    `INSERT INTO dead_letter_queue (message_id, queue, payload, error, retry_count, created_at)
VALUES ($1, $2, $3, $4, $5, $6)`,
    [messageId, queue, JSON.stringify(payload), error, 0, new Date().toISOString()]
  );
}
// Raw row shape as returned by Postgres for dead_letter_queue.
interface DeadLetterRow {
  message_id: string;
  queue: string;
  payload: any;
  error: string;
  retry_count: number;
  created_at: string;
}
/**
 * Fetch up to `limit` retryable messages (retry_count < 3) for a queue,
 * oldest first, mapped into the camelCase DeadLetterMessage shape.
 *
 * Fix: the original typed the rows as DeadLetterMessage (camelCase) while
 * reading snake_case columns off them — the row type now reflects the
 * actual database shape.
 */
export async function getDLQMessages(queue: string, limit = 10): Promise<DeadLetterMessage[]> {
  const result = await query<DeadLetterRow>(
    `SELECT * FROM dead_letter_queue
WHERE queue = $1 AND retry_count < 3
ORDER BY created_at ASC
LIMIT $2`,
    [queue, limit]
  );
  return result.map((row) => ({
    messageId: row.message_id,
    originalQueue: row.queue,
    payload: typeof row.payload === "string" ? JSON.parse(row.payload) : row.payload,
    error: row.error,
    retryCount: row.retry_count,
    createdAt: row.created_at,
  }));
}
/**
 * Bump retry_count for one DLQ message and touch updated_at.
 * Messages age out of getDLQMessages() once retry_count reaches 3
 * (its `retry_count < 3` filter).
 */
export async function incrementRetryCount(messageId: string): Promise<void> {
  await query(
    `UPDATE dead_letter_queue
SET retry_count = retry_count + 1, updated_at = CURRENT_TIMESTAMP
WHERE message_id = $1`,
    [messageId]
  );
}

View File

@@ -0,0 +1,103 @@
import { Request, Response, NextFunction } from "express";
import { logger } from "../logging/logger";
/**
 * Error classification.
 * Coarse buckets used by errorHandler to shape HTTP responses and by
 * callers constructing AppError instances.
 */
export enum ErrorType {
  USER_ERROR = "USER_ERROR",
  SYSTEM_ERROR = "SYSTEM_ERROR", // unexpected/unhandled failures -> 500
  VALIDATION_ERROR = "VALIDATION_ERROR", // schema failures -> 400
  AUTHENTICATION_ERROR = "AUTHENTICATION_ERROR",
  AUTHORIZATION_ERROR = "AUTHORIZATION_ERROR",
  NOT_FOUND_ERROR = "NOT_FOUND_ERROR",
  RATE_LIMIT_ERROR = "RATE_LIMIT_ERROR",
  EXTERNAL_SERVICE_ERROR = "EXTERNAL_SERVICE_ERROR",
}
/**
 * Application error carrying an HTTP status and a machine-readable type.
 * Thrown by handlers and translated into a JSON response by errorHandler.
 */
export class AppError extends Error {
  public type: ErrorType;
  public statusCode: number;
  public details?: any;

  constructor(type: ErrorType, statusCode: number, message: string, details?: any) {
    super(message);
    this.type = type;
    this.statusCode = statusCode;
    this.details = details;
    this.name = "AppError";
  }
}
/**
 * Central Express error handler: maps AppError and validation failures to
 * structured 4xx responses and everything else to a generic 500, always
 * echoing the request ID for correlation. Stack details of unexpected
 * errors are only exposed when NODE_ENV=development.
 */
export function errorHandler(
  err: Error | AppError,
  req: Request,
  res: Response,
  next: NextFunction
) {
  const requestId = (req.headers["x-request-id"] as string) || "unknown";
  // Known, deliberately-thrown application errors.
  if (err instanceof AppError) {
    logger.warn(
      { error: err, type: err.type, requestId, path: req.path },
      `Application error: ${err.message}`
    );
    return res.status(err.statusCode).json({
      error: err.type,
      message: err.message,
      details: err.details,
      requestId,
    });
  }
  // Schema failures surfaced by zod or similar validators.
  const isValidationError = err.name === "ValidationError" || err.name === "ZodError";
  if (isValidationError) {
    logger.warn({ error: err, requestId, path: req.path }, "Validation error");
    return res.status(400).json({
      error: ErrorType.VALIDATION_ERROR,
      message: "Validation failed",
      details: err.message,
      requestId,
    });
  }
  // Anything else is unexpected: log with stack, hide details in prod.
  logger.error(
    { error: err, requestId, path: req.path, stack: err.stack },
    "Unhandled error"
  );
  res.status(500).json({
    error: ErrorType.SYSTEM_ERROR,
    message: "An internal server error occurred",
    requestId,
    ...(process.env.NODE_ENV === "development" && { details: err.message }),
  });
}
/**
 * Wrap an async route handler so rejected promises are forwarded to
 * next() instead of becoming unhandled rejections.
 */
export function asyncHandler(
  fn: (req: Request, res: Response, next: NextFunction) => Promise<any>
) {
  return (req: Request, res: Response, next: NextFunction) => {
    // Promise.resolve tolerates handlers that return plain values.
    Promise.resolve(fn(req, res, next)).then(undefined, next);
  };
}

View File

@@ -0,0 +1,61 @@
/**
 * Feature flags service with LaunchDarkly integration
 */
// NOTE(review): FeatureFlag is not referenced anywhere in this file's
// visible code — confirm it is used elsewhere or remove it.
interface FeatureFlag {
  key: string;
  value: boolean;
  defaultValue: boolean;
}
// In-memory flag store, seeded by initFeatureFlags().
const flags: Map<string, boolean> = new Map();
/**
 * Seed feature flags from environment variables. A flag is enabled only
 * when its env var is exactly the string "true".
 */
export function initFeatureFlags() {
  const fromEnv: Record<string, boolean> = {
    enableRecursion: process.env.ENABLE_RECURSION === "true",
    enableFlashLoans: process.env.ENABLE_FLASH_LOANS === "true",
    enableSimulation: process.env.ENABLE_SIMULATION === "true",
    enableWebSocket: process.env.ENABLE_WEBSOCKET === "true",
  };
  for (const [name, enabled] of Object.entries(fromEnv)) {
    flags.set(name, enabled);
  }
}
/**
 * Read a feature flag, falling back to `defaultValue` when it was never set.
 */
export function getFeatureFlag(key: string, defaultValue = false): boolean {
  const current = flags.get(key);
  return current === undefined ? defaultValue : current;
}
/**
 * Override a feature flag at runtime (intended for tests and admin tooling).
 */
export function setFeatureFlag(key: string, value: boolean) {
  flags.set(key, value);
}
/**
 * Optional LaunchDarkly-backed flag source. When constructed without a
 * client, every lookup resolves to the supplied default.
 */
export class LaunchDarklyService {
  private client: any;

  constructor(ldClient: any) {
    this.client = ldClient;
  }

  /** Evaluate `key` for a fixed anonymous user context. */
  async getFlag(key: string, defaultValue = false): Promise<boolean> {
    if (!this.client) {
      return defaultValue;
    }
    return await this.client.variation(key, { key: "user" }, defaultValue);
  }
}

View File

@@ -0,0 +1,62 @@
/**
 * Graceful degradation strategies
 */
export interface DegradationStrategy {
  fallback: () => Promise<any>;
  timeout?: number; // optional per-strategy time budget in ms
}
/**
 * Run `primary`; on failure try each fallback strategy in order, honoring
 * its optional timeout. Rethrows the primary error when every fallback
 * fails (or when none are provided).
 *
 * Fix: the timeout timer is now cleared once the race settles, so a fast
 * fallback no longer leaves a live timer keeping the event loop alive.
 */
export async function executeWithDegradation<T>(
  primary: () => Promise<T>,
  strategies: DegradationStrategy[]
): Promise<T> {
  try {
    return await primary();
  } catch (error) {
    for (const strategy of strategies) {
      try {
        if (strategy.timeout) {
          let timer: ReturnType<typeof setTimeout> | undefined;
          try {
            return await Promise.race([
              strategy.fallback(),
              new Promise<T>((_, reject) => {
                timer = setTimeout(
                  () => reject(new Error("Fallback timeout")),
                  strategy.timeout
                );
              }),
            ]);
          } finally {
            clearTimeout(timer);
          }
        }
        return await strategy.fallback();
      } catch (fallbackError) {
        continue; // try the next strategy
      }
    }
    throw error; // all fallbacks failed
  }
}
/**
 * Example: get a plan with fallback to cache.
 * Primary path loads the plan from the database (lazy import — presumably
 * to avoid a circular dependency; confirm); falls back to `getFromCache`
 * with a 1s budget, then to a minimal stub with status "unknown".
 */
export async function getPlanWithFallback(planId: string, getFromCache: () => Promise<any>) {
  return executeWithDegradation(
    async () => {
      // Primary: Get from database
      const { getPlanById } = await import("../db/plans");
      return await getPlanById(planId);
    },
    [
      {
        fallback: getFromCache,
        timeout: 1000, // ms
      },
      {
        // Last resort: shape-compatible stub so callers don't crash.
        fallback: async () => ({ planId, status: "unknown" }),
      },
    ]
  );
}

View File

@@ -0,0 +1,66 @@
/**
 * HSM (Hardware Security Module) integration service
 * For cryptographic operations in production
 */
// Contract implemented by both the development mock and real HSM clients.
export interface HSMService {
  sign(data: Buffer, keyId: string): Promise<Buffer>;
  verify(data: Buffer, signature: Buffer, keyId: string): Promise<boolean>;
  generateKey(keyId: string): Promise<string>; // resolves to the key ID
  encrypt(data: Buffer, keyId: string): Promise<Buffer>;
  decrypt(encrypted: Buffer, keyId: string): Promise<Buffer>;
}
/**
 * Mock HSM service (development only).
 * Signatures are a fixed marker, verification always succeeds, and
 * "encryption" is a reversible prefix — NO real cryptography happens.
 * Swap for a real HSM client (AWS CloudHSM, Azure Dedicated HSM) in prod.
 */
export class MockHSMService implements HSMService {
  // keyId -> generated key material (only populated by generateKey)
  private keys: Map<string, Buffer> = new Map();

  /** Mock: constant marker signature (dead key lookup removed). */
  async sign(data: Buffer, keyId: string): Promise<Buffer> {
    // In production: return await hsmClient.sign(data, keyId);
    return Buffer.from("mock-signature");
  }

  /** Mock: accepts any signature. */
  async verify(data: Buffer, signature: Buffer, keyId: string): Promise<boolean> {
    // In production: return await hsmClient.verify(data, signature, keyId);
    return true;
  }

  /** Mock: records placeholder key material and echoes the key ID. */
  async generateKey(keyId: string): Promise<string> {
    // In production: return await hsmClient.generateKey(keyId);
    const key = Buffer.from(`key-${keyId}-${Date.now()}`);
    this.keys.set(keyId, key);
    return keyId;
  }

  /** Mock "encryption": reversible tagging, not confidentiality. */
  async encrypt(data: Buffer, keyId: string): Promise<Buffer> {
    // In production: return await hsmClient.encrypt(data, keyId);
    return Buffer.from(`encrypted-${data.toString()}`);
  }

  /** Inverse of the mock encrypt tagging. */
  async decrypt(encrypted: Buffer, keyId: string): Promise<Buffer> {
    // In production: return await hsmClient.decrypt(encrypted, keyId);
    return Buffer.from(encrypted.toString().replace("encrypted-", ""));
  }
}
/**
 * Get the HSM service instance.
 * Currently ALWAYS returns the development mock; wire up a real HSM
 * client (e.g. via HSM_URL) before any production use.
 */
export function getHSMService(): HSMService {
  // In production, initialize actual HSM client
  // const hsmUrl = process.env.HSM_URL;
  // const hsmClient = new HSMClient(hsmUrl);
  // return new HSMService(hsmClient);
  return new MockHSMService();
}

View File

@@ -0,0 +1,3 @@
// Re-export cache functions
export { initRedis, getRedis, cacheGet, cacheSet, cacheDelete, cacheMiddleware } from "./cache";

View File

@@ -0,0 +1,104 @@
/**
 * Secrets management service
 * Supports Azure Key Vault and AWS Secrets Manager
 */
// Common contract; getSecret resolves to null when the secret is absent.
export interface SecretsService {
  getSecret(name: string): Promise<string | null>;
  setSecret(name: string, value: string): Promise<void>;
  deleteSecret(name: string): Promise<void>;
}
/**
 * Azure Key Vault implementation.
 * Currently a mock: reads fall back to process.env and writes/deletes are
 * logged no-ops. Production should use @azure/keyvault-secrets.
 */
export class AzureKeyVaultService implements SecretsService {
  private vaultUrl: string;

  constructor(vaultUrl: string) {
    this.vaultUrl = vaultUrl;
  }

  /** Resolve a secret; null when unavailable or on lookup failure. */
  async getSecret(name: string): Promise<string | null> {
    try {
      // Real impl: new SecretClient(this.vaultUrl, credential).getSecret(name)
      return process.env[name] || null;
    } catch (error) {
      console.error(`Failed to get secret ${name}:`, error);
      return null;
    }
  }

  /** Mock write; real impl would call SecretClient.setSecret. */
  async setSecret(name: string, value: string): Promise<void> {
    console.log(`[Secrets] Setting secret ${name} (mock)`);
  }

  /** Mock delete; real impl would call SecretClient.beginDeleteSecret. */
  async deleteSecret(name: string): Promise<void> {
    console.log(`[Secrets] Deleting secret ${name} (mock)`);
  }
}
/**
 * AWS Secrets Manager implementation.
 * Currently a mock mirroring AzureKeyVaultService: reads fall back to
 * process.env, writes/deletes are logged no-ops. Production should use
 * the AWS SDK SecretsManagerClient.
 */
export class AWSSecretsManagerService implements SecretsService {
  private region: string;

  constructor(region: string) {
    this.region = region;
  }

  /** Resolve a secret; null when unavailable or on lookup failure. */
  async getSecret(name: string): Promise<string | null> {
    try {
      // Real impl: SecretsManagerClient + GetSecretValueCommand
      return process.env[name] || null;
    } catch (error) {
      console.error(`Failed to get secret ${name}:`, error);
      return null;
    }
  }

  /** Mock write. */
  async setSecret(name: string, value: string): Promise<void> {
    console.log(`[Secrets] Setting secret ${name} (mock)`);
  }

  /** Mock delete. */
  async deleteSecret(name: string): Promise<void> {
    console.log(`[Secrets] Deleting secret ${name} (mock)`);
  }
}
/**
 * Choose a secrets backend from the environment: Azure Key Vault when
 * AZURE_KEY_VAULT_URL is set, else AWS Secrets Manager when
 * AWS_SECRETS_MANAGER_REGION is set, else a plain process.env shim.
 */
export function getSecretsService(): SecretsService {
  const vaultUrl = process.env.AZURE_KEY_VAULT_URL;
  if (vaultUrl) {
    return new AzureKeyVaultService(vaultUrl);
  }
  const awsRegion = process.env.AWS_SECRETS_MANAGER_REGION;
  if (awsRegion) {
    return new AWSSecretsManagerService(awsRegion);
  }
  // Environment-variable fallback for local development.
  return {
    getSecret: async (name: string) => process.env[name] || null,
    setSecret: async () => {},
    deleteSecret: async () => {},
  };
}

View File

@@ -0,0 +1,27 @@
/**
 * Race `promise` against a deadline; rejects with `errorMessage` after
 * `timeoutMs` milliseconds.
 *
 * Fix: the deadline timer is cleared as soon as the race settles, so a
 * fast promise no longer leaves a pending timer holding the event loop
 * open for the full timeout.
 */
export function withTimeout<T>(
  promise: Promise<T>,
  timeoutMs: number,
  errorMessage = "Operation timed out"
): Promise<T> {
  let timer: ReturnType<typeof setTimeout>;
  const deadline = new Promise<T>((_, reject) => {
    timer = setTimeout(() => reject(new Error(errorMessage)), timeoutMs);
  });
  return Promise.race([promise, deadline]).finally(() => clearTimeout(timer));
}
/**
 * Timeout budgets for different operation types.
 * All values are milliseconds, intended as the `timeoutMs` argument to
 * withTimeout().
 */
export const TIMEOUTS = {
  DLT_EXECUTION: 300000, // 5 minutes
  BANK_API_CALL: 60000, // 1 minute
  COMPLIANCE_CHECK: 30000, // 30 seconds
  DATABASE_QUERY: 10000, // 10 seconds
  EXTERNAL_API: 30000, // 30 seconds
};

View File

@@ -0,0 +1,68 @@
import { createHash } from "crypto";
import https from "https";
import tls from "tls";
/**
 * Certificate pinning for external API calls
 * Prevents MITM attacks by verifying server certificates
 */
// One pin entry per hostname; a certificate matches when its SHA-256
// fingerprint ("sha256/<base64>") is listed in `fingerprints`.
interface PinnedCertificate {
  hostname: string;
  fingerprints: string[]; // SHA-256 fingerprints
}
// Mutable registry managed via addCertificatePin(); empty by default.
const pinnedCertificates: PinnedCertificate[] = [
  // Add production certificates here
  // {
  //   hostname: "api.bank.com",
  //   fingerprints: ["sha256/ABC123..."],
  // },
];
/**
 * Compute the SHA-256 pin ("sha256/<base64>") of a certificate.
 * Uses the DER bytes in `cert.raw` directly when it is already a Buffer;
 * otherwise base64-decodes the string form.
 *
 * Fix: the original passed a Buffer through Buffer.from(..., "base64"),
 * which only worked because the encoding argument is ignored for Buffer
 * inputs — the intent is now explicit.
 */
function getCertificateFingerprint(cert: any): string {
  const der: Buffer = Buffer.isBuffer(cert.raw)
    ? cert.raw
    : Buffer.from(cert.raw || cert.toString(), "base64");
  return `sha256/${createHash("sha256").update(der).digest("base64")}`;
}
/**
 * Create an HTTPS agent enforcing certificate pinning for `hostname`.
 * Returns null when no pins are configured (caller falls back to the
 * default agent).
 *
 * Security fix: overriding checkServerIdentity REPLACES Node's default
 * hostname/SAN verification; the original returned undefined without
 * performing it, silently disabling hostname checks. The default
 * tls.checkServerIdentity now runs first, then the pin is enforced.
 */
export function createPinnedAgent(hostname: string): https.Agent | null {
  const pinned = pinnedCertificates.find((p) => p.hostname === hostname);
  if (!pinned) {
    // No pinning configured for this hostname
    return null;
  }
  return new https.Agent({
    checkServerIdentity: (servername: string, cert: any) => {
      // Standard hostname/SAN validation first.
      const hostnameError = tls.checkServerIdentity(servername, cert);
      if (hostnameError) {
        return hostnameError;
      }
      const fingerprint = getCertificateFingerprint(cert);
      if (!pinned.fingerprints.includes(fingerprint)) {
        throw new Error(
          `Certificate pinning failed for ${servername}. Expected one of: ${pinned.fingerprints.join(", ")}, got: ${fingerprint}`
        );
      }
      return undefined; // identity and pin both OK
    },
  });
}
/**
 * Register or replace the pin set for a hostname.
 */
export function addCertificatePin(hostname: string, fingerprints: string[]) {
  const entry = pinnedCertificates.find((p) => p.hostname === hostname);
  if (entry) {
    entry.fingerprints = fingerprints;
  } else {
    pinnedCertificates.push({ hostname, fingerprints });
  }
}

View File

@@ -0,0 +1,72 @@
import { z } from "zod";
/**
 * Plan validation schema.
 * A plan is a non-empty list of typed steps plus optional recursion/LTV
 * bounds and an optional creator signature.
 */
export const planSchema = z.object({
  creator: z.string().min(1),
  steps: z.array(z.object({
    type: z.enum(["borrow", "swap", "repay", "pay"]),
    asset: z.string().optional(),
    amount: z.number().positive(),
    from: z.string().optional(),
    to: z.string().optional(),
    collateralRef: z.string().optional(),
    // Bank payout target for "pay" steps (ISO-20022 style identifiers).
    beneficiary: z.object({
      IBAN: z.string().optional(),
      BIC: z.string().optional(),
      name: z.string().optional(),
    }).optional(),
  })).min(1),
  maxRecursion: z.number().int().min(0).max(10).optional(),
  maxLTV: z.number().min(0).max(1).optional(), // fraction, 0..1
  signature: z.string().optional(),
});
/**
 * Signature validation schema: all three fields required and non-empty.
 */
export const signatureSchema = z.object({
  signature: z.string().min(1),
  messageHash: z.string().min(1),
  signerAddress: z.string().min(1),
});
/**
 * Compliance check schema.
 * NOTE(review): z.any() steps are effectively unvalidated — confirm this
 * is intentional or reuse planSchema's step shape.
 */
export const complianceCheckSchema = z.object({
  steps: z.array(z.any()),
});
/**
 * Strip common XSS vectors from a string — angle brackets, the
 * javascript: protocol, and inline on*= event handlers — then trim.
 */
export function sanitizeString(input: string): string {
  const scrubbers: Array<[RegExp, string]> = [
    [/[<>]/g, ""], // angle brackets
    [/javascript:/gi, ""], // javascript: protocol
    [/on\w+\s*=/gi, ""], // inline event handlers
  ];
  let result = input;
  for (const [pattern, replacement] of scrubbers) {
    result = result.replace(pattern, replacement);
  }
  return result.trim();
}
/**
 * Recursively sanitize every string inside an arbitrary value, preserving
 * structure for arrays and objects and leaving other types untouched.
 */
export function sanitizeObject<T>(obj: T): T {
  if (typeof obj === "string") {
    return sanitizeString(obj) as T;
  }
  if (Array.isArray(obj)) {
    return obj.map(sanitizeObject) as T;
  }
  if (obj && typeof obj === "object") {
    const cleaned: any = {};
    for (const key in obj) {
      cleaned[key] = sanitizeObject(obj[key]);
    }
    return cleaned as T;
  }
  return obj;
}

View File

@@ -0,0 +1,21 @@
{
"compilerOptions": {
"target": "ES2020",
"module": "commonjs",
"lib": ["ES2020"],
"outDir": "./dist",
"rootDir": "./src",
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true,
"resolveJsonModule": true,
"moduleResolution": "node",
"declaration": true,
"declarationMap": true,
"sourceMap": true
},
"include": ["src/**/*"],
"exclude": ["node_modules", "dist"]
}

177
terraform/main.tf Normal file
View File

@@ -0,0 +1,177 @@
# Terraform configuration for ISO-20022 Combo Flow infrastructure
terraform {
  required_version = ">= 1.0"
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0" # any 5.x provider release
    }
  }
}

# Region supplied via var.aws_region (defaults to us-east-1).
provider "aws" {
  region = var.aws_region
}
# VPC — 10.0.0.0/16 with DNS support for RDS/ElastiCache endpoints.
resource "aws_vpc" "main" {
  cidr_block           = "10.0.0.0/16"
  enable_dns_hostnames = true
  enable_dns_support   = true
  tags = {
    Name = "comboflow-vpc"
  }
}

# Subnets
# NOTE(review): one public subnet in AZ "a" and one private subnet in
# AZ "b". Multi-AZ services (ALB, RDS/ElastiCache subnet groups) need at
# least two subnets in distinct AZs of the same tier — confirm.
resource "aws_subnet" "public" {
  vpc_id            = aws_vpc.main.id
  cidr_block        = "10.0.1.0/24"
  availability_zone = "${var.aws_region}a"
  tags = {
    Name = "comboflow-public"
  }
}
resource "aws_subnet" "private" {
  vpc_id            = aws_vpc.main.id
  cidr_block        = "10.0.2.0/24"
  availability_zone = "${var.aws_region}b"
  tags = {
    Name = "comboflow-private"
  }
}
# RDS PostgreSQL — encrypted storage, 7-day backups, final snapshot kept.
# NOTE(review): db.t3.micro/20GB is development sizing; username is
# hard-coded. aws_db_subnet_group.main is not defined in the visible part
# of this file — confirm it exists elsewhere.
resource "aws_db_instance" "postgres" {
  identifier              = "comboflow-db"
  engine                  = "postgres"
  engine_version          = "15.4"
  instance_class          = "db.t3.micro"
  allocated_storage       = 20
  max_allocated_storage   = 100 # storage autoscaling ceiling (GB)
  storage_encrypted       = true
  db_name                 = "comboflow"
  username                = "comboflow"
  password                = var.db_password # sensitive input variable
  vpc_security_group_ids  = [aws_security_group.rds.id]
  db_subnet_group_name    = aws_db_subnet_group.main.name
  backup_retention_period = 7 # days
  backup_window           = "03:00-04:00"
  maintenance_window      = "mon:04:00-mon:05:00"
  skip_final_snapshot     = false
  final_snapshot_identifier = "comboflow-final-snapshot"
  tags = {
    Name = "comboflow-database"
  }
}
# ElastiCache Redis — single cache node (no replication; cache-only use).
# NOTE(review): aws_elasticache_subnet_group.main is not defined in the
# visible part of this file — confirm it exists elsewhere.
resource "aws_elasticache_cluster" "redis" {
  cluster_id           = "comboflow-redis"
  engine               = "redis"
  node_type            = "cache.t3.micro"
  num_cache_nodes      = 1
  parameter_group_name = "default.redis7"
  port                 = 6379
  subnet_group_name    = aws_elasticache_subnet_group.main.name
  security_group_ids   = [aws_security_group.redis.id]
}
# ECS Cluster with CloudWatch Container Insights enabled.
resource "aws_ecs_cluster" "main" {
  name = "comboflow-cluster"
  setting {
    name  = "containerInsights"
    value = "enabled"
  }
}

# Load Balancer
# NOTE(review): an internet-facing ALB requires subnets in at least two
# availability zones; only aws_subnet.public is attached here, which AWS
# rejects at create time — add a second public subnet.
resource "aws_lb" "main" {
  name               = "comboflow-lb"
  internal           = false
  load_balancer_type = "application"
  security_groups    = [aws_security_group.lb.id]
  subnets            = [aws_subnet.public.id]
  enable_deletion_protection = false
}
# Security Groups
# RDS: Postgres reachable only from inside the VPC. Security groups are
# stateful, so reply traffic does not need egress rules.
# NOTE(review): Terraform removes the default allow-all egress on managed
# SGs; rds/redis declare none, so their instances have NO outbound access
# — confirm this is intentional.
resource "aws_security_group" "rds" {
  name        = "comboflow-rds-sg"
  description = "Security group for RDS"
  vpc_id      = aws_vpc.main.id
  ingress {
    from_port   = 5432
    to_port     = 5432
    protocol    = "tcp"
    cidr_blocks = [aws_vpc.main.cidr_block]
  }
}

# Redis: same VPC-internal-only pattern as RDS.
resource "aws_security_group" "redis" {
  name        = "comboflow-redis-sg"
  description = "Security group for Redis"
  vpc_id      = aws_vpc.main.id
  ingress {
    from_port   = 6379
    to_port     = 6379
    protocol    = "tcp"
    cidr_blocks = [aws_vpc.main.cidr_block]
  }
}

# Load balancer: HTTP/HTTPS from anywhere, unrestricted egress.
resource "aws_security_group" "lb" {
  name        = "comboflow-lb-sg"
  description = "Security group for Load Balancer"
  vpc_id      = aws_vpc.main.id
  ingress {
    from_port   = 80
    to_port     = 80
    protocol    = "tcp"
    cidr_blocks = ["0.0.0.0/0"]
  }
  ingress {
    from_port   = 443
    to_port     = 443
    protocol    = "tcp"
    cidr_blocks = ["0.0.0.0/0"]
  }
  egress {
    from_port   = 0
    to_port     = 0
    protocol    = "-1" # all protocols
    cidr_blocks = ["0.0.0.0/0"]
  }
}
# Variables
# Fix: "aws_region" and "db_password" were declared BOTH here and in
# terraform/variables.tf. Terraform treats a variable declared twice in
# the same module as a "Duplicate variable declaration" error, so the
# duplicates are removed from this file; the canonical declarations live
# in variables.tf.

18
terraform/variables.tf Normal file
View File

@@ -0,0 +1,18 @@
# Input variables for the Combo Flow Terraform module.
variable "aws_region" {
  description = "AWS region for resources"
  type        = string
  default     = "us-east-1"
}
variable "db_password" {
  description = "PostgreSQL database password"
  type        = string
  sensitive   = true # kept out of plan/apply console output
}
# NOTE(review): "environment" is not referenced in the visible
# configuration — confirm it is consumed (e.g. in tags) or remove it.
variable "environment" {
  description = "Environment name"
  type        = string
  default     = "production"
}