refactor(archive): move historical contracts and adapters to archive directory
- Archived multiple non-EVM adapters (Algorand, Hedera, Tron, TON, Cosmos, Solana) and compliance contracts (IndyVerifier) to `archive/solidity/contracts/`. - Updated documentation to reflect the historical status of archived components. - Adjusted `foundry.toml` and `README.md` for clarity on historical dependencies and configurations. - Enhanced Makefile and package.json scripts for improved contract testing and building processes. - Removed obsolete contracts (AlltraCustomBridge, CommodityCCIPBridge, ISO4217WCCIPBridge, VaultBridgeAdapter) from the main directory. - Updated implementation reports to indicate archived status for various components.
This commit is contained in:
@@ -71,7 +71,7 @@ When **no** 138→Mainnet (or configured destination) relay deliveries are neede
|
||||
| `RELAY_SHEDDING_SOURCE_POLL_INTERVAL_MS` | Source router log poll interval while shedding (default **60000** ms, min 5000). Reduces Chain 138 RPC usage. |
|
||||
| `RELAY_SHEDDING_QUEUE_POLL_MS` | Idle interval for the queue loop while shedding (default **5000** ms, min 1000). |
|
||||
|
||||
**Behavior:** Source `MessageSent` logs are still ingested and messages **queue in memory**. When you set `RELAY_SHEDDING=0` (and `RELAY_DELIVERY_ENABLED=1`) and **restart** the service, pending messages are delivered as usual. For production, plan shedding around low bridge traffic so the queue stays small (in-memory queue is lost on process crash).
|
||||
**Behavior:** Source `MessageSent` logs are still ingested and messages queue locally. Pending queue state is now persisted to `services/relay/data/queue-state.json` by default (override with `RELAY_QUEUE_STATE_PATH`), so a restart no longer drops queued work. For production, still plan shedding around low bridge traffic so the persisted backlog stays small and intentional.
|
||||
|
||||
## Skip specific message IDs
|
||||
|
||||
@@ -90,6 +90,10 @@ RELAY_SKIP_MESSAGE_IDS=0xf718c9895c0a5442349996383184d017d2fa041af7aaeb9f0c0675d
|
||||
|
||||
The relay checks this list during live event ingestion, historical replay, and queue processing.
|
||||
|
||||
For the current Mainnet WETH backlog policy, see:
|
||||
|
||||
- [`docs/03-deployment/MAINNET_WETH_RELAY_BACKLOG_POLICY.md`](../../../docs/03-deployment/MAINNET_WETH_RELAY_BACKLOG_POLICY.md)
|
||||
|
||||
### On-chain pause (`CCIPRelayRouter`)
|
||||
|
||||
The destination **CCIPRelayRouter** inherits OpenZeppelin **`Pausable`**: admins with `DEFAULT_ADMIN_ROLE` may call **`pause()`** / **`unpause()`**. While paused, **`relayMessage` reverts** (no delivery through the router).
|
||||
|
||||
@@ -2,15 +2,115 @@
|
||||
* Message Queue for managing pending relay messages
|
||||
*/
|
||||
|
||||
import { mkdir, readFile, rename, writeFile } from 'fs/promises';
|
||||
import path from 'path';
|
||||
|
||||
export class MessageQueue {
|
||||
constructor(logger) {
|
||||
constructor(logger, options = {}) {
|
||||
this.logger = logger;
|
||||
this.persistencePath = options.persistencePath || '';
|
||||
this.queue = [];
|
||||
this.processed = new Set();
|
||||
this.failed = new Set();
|
||||
this.retryCounts = new Map();
|
||||
this.messageStore = new Map();
|
||||
this.inFlight = new Map();
|
||||
this.lastPersistedAt = null;
|
||||
this.persistenceEnabled = Boolean(this.persistencePath);
|
||||
}
|
||||
|
||||
async init() {
|
||||
await this.loadSnapshot();
|
||||
}
|
||||
|
||||
serialize() {
|
||||
return {
|
||||
version: 1,
|
||||
queue: this.queue,
|
||||
retryCounts: Object.fromEntries(this.retryCounts.entries()),
|
||||
savedAt: new Date().toISOString()
|
||||
};
|
||||
}
|
||||
|
||||
static encodeForPersistence(value) {
|
||||
if (typeof value === 'bigint') {
|
||||
return { __type: 'bigint', value: value.toString() };
|
||||
}
|
||||
if (Array.isArray(value)) {
|
||||
return value.map((item) => MessageQueue.encodeForPersistence(item));
|
||||
}
|
||||
if (value && typeof value === 'object') {
|
||||
return Object.fromEntries(
|
||||
Object.entries(value).map(([key, entry]) => [key, MessageQueue.encodeForPersistence(entry)])
|
||||
);
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
static decodeFromPersistence(value) {
|
||||
if (Array.isArray(value)) {
|
||||
return value.map((item) => MessageQueue.decodeFromPersistence(item));
|
||||
}
|
||||
if (value && typeof value === 'object') {
|
||||
if (value.__type === 'bigint' && typeof value.value === 'string') {
|
||||
return BigInt(value.value);
|
||||
}
|
||||
return Object.fromEntries(
|
||||
Object.entries(value).map(([key, entry]) => [key, MessageQueue.decodeFromPersistence(entry)])
|
||||
);
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
async persistSnapshot() {
|
||||
if (!this.persistenceEnabled) return;
|
||||
|
||||
const directory = path.dirname(this.persistencePath);
|
||||
const tmpPath = `${this.persistencePath}.tmp`;
|
||||
await mkdir(directory, { recursive: true });
|
||||
await writeFile(
|
||||
tmpPath,
|
||||
JSON.stringify(MessageQueue.encodeForPersistence(this.serialize()), null, 2)
|
||||
);
|
||||
await rename(tmpPath, this.persistencePath);
|
||||
this.lastPersistedAt = new Date().toISOString();
|
||||
}
|
||||
|
||||
async loadSnapshot() {
|
||||
if (!this.persistenceEnabled) return;
|
||||
|
||||
try {
|
||||
const raw = await readFile(this.persistencePath, 'utf8');
|
||||
const parsed = JSON.parse(raw);
|
||||
const restoredQueueRaw = Array.isArray(parsed.queue) ? parsed.queue : [];
|
||||
const restoredQueue = MessageQueue.decodeFromPersistence(restoredQueueRaw);
|
||||
const restoredRetryCounts = parsed.retryCounts && typeof parsed.retryCounts === 'object'
|
||||
? Object.entries(parsed.retryCounts)
|
||||
: [];
|
||||
|
||||
this.queue = restoredQueue;
|
||||
this.messageStore.clear();
|
||||
for (const message of restoredQueue) {
|
||||
if (message?.messageId) {
|
||||
this.messageStore.set(message.messageId, message);
|
||||
}
|
||||
}
|
||||
this.retryCounts = new Map(
|
||||
restoredRetryCounts.map(([messageId, count]) => [messageId, Number(count) || 0])
|
||||
);
|
||||
this.lastPersistedAt = parsed.savedAt || null;
|
||||
|
||||
if (restoredQueue.length > 0) {
|
||||
this.logger.info(
|
||||
`Loaded ${restoredQueue.length} queued relay message(s) from ${this.persistencePath}`
|
||||
);
|
||||
}
|
||||
} catch (error) {
|
||||
if (error?.code === 'ENOENT') {
|
||||
return;
|
||||
}
|
||||
this.logger.warn(`Failed to load relay queue snapshot from ${this.persistencePath}`, error);
|
||||
}
|
||||
}
|
||||
|
||||
async add(messageData) {
|
||||
@@ -32,6 +132,7 @@ export class MessageQueue {
|
||||
this.messageStore.set(messageId, messageData);
|
||||
// Add to queue
|
||||
this.queue.push(messageData);
|
||||
await this.persistSnapshot();
|
||||
this.logger.info(`Added message ${messageId} to queue. Queue size: ${this.queue.length}`);
|
||||
}
|
||||
|
||||
@@ -43,6 +144,7 @@ export class MessageQueue {
|
||||
const messageData = this.queue.shift();
|
||||
if (messageData && messageData.messageId) {
|
||||
this.inFlight.set(messageData.messageId, messageData);
|
||||
await this.persistSnapshot();
|
||||
}
|
||||
return messageData;
|
||||
}
|
||||
@@ -52,6 +154,7 @@ export class MessageQueue {
|
||||
this.retryCounts.delete(messageId);
|
||||
this.inFlight.delete(messageId);
|
||||
this.messageStore.delete(messageId);
|
||||
await this.persistSnapshot();
|
||||
this.logger.info(`Message ${messageId} marked as processed`);
|
||||
}
|
||||
|
||||
@@ -59,6 +162,7 @@ export class MessageQueue {
|
||||
this.failed.add(messageId);
|
||||
this.retryCounts.delete(messageId);
|
||||
this.inFlight.delete(messageId);
|
||||
await this.persistSnapshot();
|
||||
this.logger.error(`Message ${messageId} marked as failed`);
|
||||
}
|
||||
|
||||
@@ -68,6 +172,7 @@ export class MessageQueue {
|
||||
|
||||
async resetRetryCount(messageId) {
|
||||
this.retryCounts.delete(messageId);
|
||||
await this.persistSnapshot();
|
||||
}
|
||||
|
||||
async retry(messageId, options = {}) {
|
||||
@@ -75,6 +180,7 @@ export class MessageQueue {
|
||||
const count = this.retryCounts.get(messageId) || 0;
|
||||
if (increment) {
|
||||
this.retryCounts.set(messageId, count + 1);
|
||||
await this.persistSnapshot();
|
||||
}
|
||||
|
||||
const existingIndex = this.queue.findIndex(m => m.messageId === messageId);
|
||||
@@ -92,6 +198,7 @@ export class MessageQueue {
|
||||
|
||||
this.inFlight.delete(messageId);
|
||||
this.queue.push(messageData);
|
||||
await this.persistSnapshot();
|
||||
const nextCount = increment ? count + 1 : count;
|
||||
this.logger.info(`Message ${messageId} requeued. Retry count: ${nextCount}. Queue size: ${this.queue.length}`);
|
||||
}
|
||||
@@ -100,7 +207,9 @@ export class MessageQueue {
|
||||
return {
|
||||
queueSize: this.queue.length,
|
||||
processed: this.processed.size,
|
||||
failed: this.failed.size
|
||||
failed: this.failed.size,
|
||||
persistenceEnabled: this.persistenceEnabled,
|
||||
lastPersistedAt: this.lastPersistedAt
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,7 +21,9 @@ export class RelayService {
|
||||
this.destinationProvider = null;
|
||||
this.sourceSigner = null;
|
||||
this.destinationSigner = null;
|
||||
this.messageQueue = new MessageQueue(this.logger);
|
||||
this.messageQueue = new MessageQueue(this.logger, {
|
||||
persistencePath: config.queuePersistence?.path || ''
|
||||
});
|
||||
|
||||
// Contract instances
|
||||
this.sourceRouter = null;
|
||||
@@ -260,7 +262,9 @@ export class RelayService {
|
||||
queue: {
|
||||
size: queueStats.queueSize,
|
||||
processed: queueStats.processed,
|
||||
failed: queueStats.failed
|
||||
failed: queueStats.failed,
|
||||
persistence_enabled: queueStats.persistenceEnabled,
|
||||
last_persisted_at: queueStats.lastPersistedAt
|
||||
},
|
||||
last_source_poll: this.lastSourcePoll,
|
||||
last_seen_message: this.lastSeenMessage,
|
||||
@@ -333,6 +337,7 @@ export class RelayService {
|
||||
this.sourceSigner = new ethers.Wallet(this.config.relayer.privateKey, this.sourceProvider);
|
||||
this.destinationSigner = new ethers.Wallet(this.config.relayer.privateKey, this.destinationProvider);
|
||||
this.relayerAddress = String(this.destinationSigner.address);
|
||||
await this.messageQueue.init();
|
||||
|
||||
this.logger.info('Relayer address: %s', this.relayerAddress);
|
||||
|
||||
@@ -375,6 +380,7 @@ export class RelayService {
|
||||
async stop() {
|
||||
this.logger.info('Stopping relay service...');
|
||||
this.isRunning = false;
|
||||
await this.messageQueue.persistSnapshot();
|
||||
// Additional cleanup if needed
|
||||
}
|
||||
|
||||
|
||||
@@ -235,6 +235,12 @@ export const config = {
|
||||
retryDelay: process.env.RETRY_DELAY ? parseInt(process.env.RETRY_DELAY) : 5000 // 5 seconds
|
||||
},
|
||||
|
||||
queuePersistence: {
|
||||
path:
|
||||
process.env.RELAY_QUEUE_STATE_PATH ||
|
||||
path.resolve(__dirname, '../data/queue-state.json')
|
||||
},
|
||||
|
||||
skipMessageIds: getSkipMessageIds()
|
||||
};
|
||||
|
||||
|
||||
@@ -8,7 +8,9 @@
|
||||
*/
|
||||
|
||||
import { existsSync } from 'fs';
|
||||
import { mkdtemp, rm } from 'fs/promises';
|
||||
import { join, dirname } from 'path';
|
||||
import { tmpdir } from 'os';
|
||||
import { fileURLToPath } from 'url';
|
||||
import { ethers } from 'ethers';
|
||||
import { MessageQueue } from './src/MessageQueue.js';
|
||||
@@ -93,7 +95,10 @@ const rejected = relay.evaluateMessageScope({
|
||||
});
|
||||
assert(rejected.inScope === false, 'WETH message should be rejected by the cW worker scope');
|
||||
|
||||
const queue = new MessageQueue(logger);
|
||||
const tempDir = await mkdtemp(join(tmpdir(), 'relay-queue-test-'));
|
||||
const queueStatePath = join(tempDir, 'queue-state.json');
|
||||
const queue = new MessageQueue(logger, { persistencePath: queueStatePath });
|
||||
await queue.init();
|
||||
const queuedMessage = {
|
||||
messageId: '0xtest-message',
|
||||
sender: '0x152ed3e9912161b76bdfd368d0c84b7c31c10de7',
|
||||
@@ -111,5 +116,76 @@ await queue.resetRetryCount(queuedMessage.messageId);
|
||||
await queue.retry(queuedMessage.messageId, { increment: false });
|
||||
assert((await queue.getRetryCount(queuedMessage.messageId)) === 0, 'deferred requeue should not consume retry budget');
|
||||
|
||||
const persistentQueue = new MessageQueue(logger, { persistencePath: queueStatePath });
|
||||
await persistentQueue.init();
|
||||
assert(
|
||||
persistentQueue.getStats().queueSize === 1,
|
||||
'persisted queue snapshot should reload queued messages after restart'
|
||||
);
|
||||
|
||||
const probeRelay = new RelayService({
|
||||
sourceChain: {
|
||||
name: 'Chain 138',
|
||||
chainId: 138,
|
||||
rpcUrl: 'http://example.invalid',
|
||||
routerAddress: '0x42DAb7b888Dd382bD5Adcf9E038dBF1fD03b4817',
|
||||
bridgeAddress: '0x152ed3e9912161b76bdfd368d0c84b7c31c10de7',
|
||||
},
|
||||
destinationChain: {
|
||||
name: 'Ethereum Mainnet',
|
||||
chainId: 1,
|
||||
rpcUrl: 'http://example.invalid',
|
||||
relayRouterAddress: '0x416564Ab73Ad5710855E98dC7bC7Bff7387285BA',
|
||||
relayBridgeAddress: '0x2bF74583206A49Be07E0E8A94197C12987AbD7B5',
|
||||
relayBridgeAllowlist: ['0x2bF74583206A49Be07E0E8A94197C12987AbD7B5'],
|
||||
chainSelector: 5009297550715157269n,
|
||||
deliveryMode: 'router',
|
||||
},
|
||||
tokenMapping: {},
|
||||
sourceChainSelector: 138n,
|
||||
relayer: { privateKey: '0x' + '11'.repeat(32), address: '' },
|
||||
monitoring: {
|
||||
startBlock: 'latest',
|
||||
pollInterval: 5000,
|
||||
confirmationBlocks: 1,
|
||||
finalityDelayBlocks: 2,
|
||||
replayWindowBlocks: 32,
|
||||
},
|
||||
retry: { maxRetries: 3, retryDelay: 1 },
|
||||
queuePersistence: { path: join(tempDir, 'probe-queue-state.json') },
|
||||
skipMessageIds: new Set(),
|
||||
}, logger);
|
||||
await probeRelay.messageQueue.init();
|
||||
await probeRelay.messageQueue.add({
|
||||
messageId: '0xprobe-fail',
|
||||
destinationChainSelector: 5009297550715157269n,
|
||||
sender: '0x152ed3e9912161b76bdfd368d0c84b7c31c10de7',
|
||||
receiver: cwReceiver,
|
||||
data: '0x',
|
||||
tokenAmounts: [{ token: '0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2', amount: 1n, amountType: 0 }],
|
||||
});
|
||||
probeRelay.destinationRelayRouter = { paused: async () => false };
|
||||
probeRelay.destinationProvider = {};
|
||||
probeRelay.getDestinationTxOptions = async () => ({ gasLimit: 1000000n });
|
||||
probeRelay.ensureTargetBridgeInventory = async () => {
|
||||
throw new Error('temporary eth_call failure');
|
||||
};
|
||||
const deferredMessage = await probeRelay.messageQueue.getNext();
|
||||
await probeRelay.relayMessage(deferredMessage);
|
||||
assert(
|
||||
probeRelay.lastError?.scope === 'bridge_inventory_probe',
|
||||
'inventory probe failures should be tracked under bridge_inventory_probe'
|
||||
);
|
||||
assert(
|
||||
(await probeRelay.messageQueue.getRetryCount('0xprobe-fail')) === 0,
|
||||
'inventory probe deferrals should not consume retry budget'
|
||||
);
|
||||
assert(
|
||||
probeRelay.messageQueue.getStats().queueSize === 1,
|
||||
'inventory probe deferrals should requeue the original message'
|
||||
);
|
||||
|
||||
await rm(tempDir, { recursive: true, force: true });
|
||||
|
||||
console.log('OK: relay service structure valid');
|
||||
process.exit(0);
|
||||
|
||||
Reference in New Issue
Block a user