From 8ec6af94d514405b403e7c8bfba1d39f0a0456f0 Mon Sep 17 00:00:00 2001 From: defiQUG Date: Sun, 12 Apr 2026 11:35:18 -0700 Subject: [PATCH] fix(relay): defer on inventory probe errors; optional retry without budget - MessageQueue: resetRetryCount and retry({ increment: false }) for shedder/inventory paths - RelayService: treat bridge_inventory_probe like other soft-failure scopes; wrap inventory check in try/catch - Token aggregation: catch DB pool lookup errors and fall back to live DODO path - Mainnet WETH profile: START_BLOCK=latest; extend RELAY_SKIP_MESSAGE_IDS for backlog hygiene - Extend relay test.js for deferred requeue behavior Made-with: Cursor --- services/relay/.env.mainnet-weth | 9 ++-- services/relay/src/MessageQueue.js | 17 +++++-- services/relay/src/RelayService.js | 47 +++++++++++++++---- services/relay/test.js | 3 ++ .../src/api/routes/tokens.ts | 13 +++-- 5 files changed, 67 insertions(+), 22 deletions(-) diff --git a/services/relay/.env.mainnet-weth b/services/relay/.env.mainnet-weth index 2911af2..7907851 100644 --- a/services/relay/.env.mainnet-weth +++ b/services/relay/.env.mainnet-weth @@ -17,7 +17,7 @@ DEST_RELAY_BRIDGE_ALLOWLIST=0xF9A32F37099c582D28b4dE7Fca6eaC1e5259f939 RELAYER_PRIVATE_KEY=${PRIVATE_KEY} RELAYER_ADDRESS=0x4A666F96fC8764181194447A7dFdb7d471b301C8 -START_BLOCK=2706088 +START_BLOCK=latest POLL_INTERVAL=5000 CONFIRMATION_BLOCKS=1 MAX_RETRIES=3 @@ -28,9 +28,8 @@ RELAY_SHEDDING=0 RELAY_DELIVERY_ENABLED=1 RELAY_ENFORCE_BRIDGE_TOKEN_BALANCE=1 -# Park the known oversized WETH release message until the Mainnet bridge inventory is restored. -# 2026-04-05 purge: drop the historical WETH backlog that reloaded into the paused queue so -# the worker reflects current forward-only monitoring instead of retaining undeliverable legacy sends. -RELAY_SKIP_MESSAGE_IDS=0xf718c9895c0a5442349996383184d017d2fa041af7aaeb9f0c0675d3ceed756b,0x19656fe758fc0e36ce5ce16ad9101e76c9eae19e5ed6bea08335dfb664215edc,0x249042e74fc322b2a8dc9fabe63b18094df11aaaed86b149287a6feea1b94157,0x263fa601b709c1c71a78004936eb195b43ed9da4dce23cf12dcfd24d40880375,0x300d38035aebd97bfbfa13737dc60ed23dba91991348259fd01ea1bc3109b260,0x3d3e6978c9e796b23fb8709fff4102131648825728ad0dd4197b98c6ba7a46cc,0x42fad60f851a43c6a52a216d211679d6fb786130f34dc5f26e7ddad350e7c83e,0x47b36fc517e7055efbc7408b17fca08f5fa41dbeea24d72e02f4995e22a4601f,0x4a4cab9082800ddb10ac60cae94dc2c5a6509134e6c8f915dc0ff636752b449d,0x523f8a202f069644488747dbd2a221cafdfac3f0a0fc7271685f5b23736fe8eb,0x63e56db9e3d6f2864e284b32c84ffa7118c65bee0559567cddf3288d812ef3cc,0x6ae76d1ec258666a1e6a95e63d911f4178f34ec312dcb88e3f237ba1288e6f79,0x75881681e8b2c793a8386f471cad44768c4e6f125e3f888978cc4c14d74049cf,0x770e246987c22c32fd2c9627c37e28316ec3390b33fb9cb9e9c3f21670af5ba3,0x779894438af9602eee92bc6c9c02475d6659ab9ed4bdd7250e6d0d331e628366,0x781eb1072c501efc10f92be6dc3355cf95d2f6f0c992468275d69fc5ded52a30,0xa447bdb1962882920ca8e966d7e8ba0cc016b80252bff5d5741317f0484a74fd,0xb11ca230b35d706eb0a43dc99c8806647aaeef29cdfa14762fa2a397bcbe82ae,0xc076289b0120a9b010e7851c4b00566ecdc8f46f2108d87ebebd042a005fb250,0xcc5ec02070b51ff927e540c62b2aa0c4b4f237efc8b34bbd6a5e8827f57f0a0b,0xd606e745392b1385870bcf5c7a1177833d2872a4e1a0beb33319e5b645be5b12 +# Park the known oversized WETH release messages until Mainnet bridge inventory policy changes. +# Keep the worker on forward-only monitoring so restarts do not reload old underfunded backlog. +RELAY_SKIP_MESSAGE_IDS=0xf718c9895c0a5442349996383184d017d2fa041af7aaeb9f0c0675d3ceed756b,0x19656fe758fc0e36ce5ce16ad9101e76c9eae19e5ed6bea08335dfb664215edc,0x249042e74fc322b2a8dc9fabe63b18094df11aaaed86b149287a6feea1b94157,0x263fa601b709c1c71a78004936eb195b43ed9da4dce23cf12dcfd24d40880375,0x300d38035aebd97bfbfa13737dc60ed23dba91991348259fd01ea1bc3109b260,0x3d3e6978c9e796b23fb8709fff4102131648825728ad0dd4197b98c6ba7a46cc,0x42fad60f851a43c6a52a216d211679d6fb786130f34dc5f26e7ddad350e7c83e,0x47b36fc517e7055efbc7408b17fca08f5fa41dbeea24d72e02f4995e22a4601f,0x4a4cab9082800ddb10ac60cae94dc2c5a6509134e6c8f915dc0ff636752b449d,0x523f8a202f069644488747dbd2a221cafdfac3f0a0fc7271685f5b23736fe8eb,0x5571002da2d2202280b11df4c772978decd007c18e0eacc5f80839f4be95dc65,0x63e56db9e3d6f2864e284b32c84ffa7118c65bee0559567cddf3288d812ef3cc,0x6ae76d1ec258666a1e6a95e63d911f4178f34ec312dcb88e3f237ba1288e6f79,0x75881681e8b2c793a8386f471cad44768c4e6f125e3f888978cc4c14d74049cf,0x770e246987c22c32fd2c9627c37e28316ec3390b33fb9cb9e9c3f21670af5ba3,0x779894438af9602eee92bc6c9c02475d6659ab9ed4bdd7250e6d0d331e628366,0x781eb1072c501efc10f92be6dc3355cf95d2f6f0c992468275d69fc5ded52a30,0x7a49b584a1966c9c568036169b227a2293b74132a21bcfbd253b2e8d621f1dde,0xa447bdb1962882920ca8e966d7e8ba0cc016b80252bff5d5741317f0484a74fd,0xb11ca230b35d706eb0a43dc99c8806647aaeef29cdfa14762fa2a397bcbe82ae,0xc076289b0120a9b010e7851c4b00566ecdc8f46f2108d87ebebd042a005fb250,0xcc5ec02070b51ff927e540c62b2aa0c4b4f237efc8b34bbd6a5e8827f57f0a0b,0xd606e745392b1385870bcf5c7a1177833d2872a4e1a0beb33319e5b645be5b12 LOG_LEVEL=info diff --git a/services/relay/src/MessageQueue.js b/services/relay/src/MessageQueue.js index 7718e69..43cdf21 100644 --- a/services/relay/src/MessageQueue.js +++ b/services/relay/src/MessageQueue.js @@ -65,14 +65,22 @@ export class MessageQueue { async getRetryCount(messageId) { return this.retryCounts.get(messageId) || 0; } + + async resetRetryCount(messageId) { + this.retryCounts.delete(messageId); + } - async retry(messageId) { + async retry(messageId, options = {}) { + const { increment = true } = options; const count = this.retryCounts.get(messageId) || 0; - this.retryCounts.set(messageId, count + 1); + if (increment) { + this.retryCounts.set(messageId, count + 1); + } const existingIndex = this.queue.findIndex(m => m.messageId === messageId); if (existingIndex >= 0) { - this.logger.info(`Message ${messageId} retry count: ${count + 1}`); + const nextCount = increment ? count + 1 : count; + this.logger.info(`Message ${messageId} retry count: ${nextCount}`); return; } @@ -84,7 +92,8 @@ export class MessageQueue { this.inFlight.delete(messageId); this.queue.push(messageData); - this.logger.info(`Message ${messageId} requeued. Retry count: ${count + 1}. Queue size: ${this.queue.length}`); + const nextCount = increment ? count + 1 : count; + this.logger.info(`Message ${messageId} requeued. Retry count: ${nextCount}. Queue size: ${this.queue.length}`); } getStats() { diff --git a/services/relay/src/RelayService.js b/services/relay/src/RelayService.js index 4647b11..8758b51 100644 --- a/services/relay/src/RelayService.js +++ b/services/relay/src/RelayService.js @@ -161,7 +161,11 @@ export class RelayService { const lastSuccessMs = this.lastRelaySuccess && this.lastRelaySuccess.at ? Date.parse(this.lastRelaySuccess.at) : 0; if ( this.lastError && - (this.lastError.scope === 'relay_message' || this.lastError.scope === 'bridge_inventory') && + ( + this.lastError.scope === 'relay_message' || + this.lastError.scope === 'bridge_inventory' || + this.lastError.scope === 'bridge_inventory_probe' + ) && Number.isFinite(lastErrorMs) && lastErrorMs > 0 && lastErrorMs >= lastSuccessMs @@ -731,7 +735,8 @@ export class RelayService { this.logger.warn( `Destination relay router is paused; deferring ${messageId} (enable RELAY_SHEDDING=1 to pause off-chain too)` ); - await this.messageQueue.retry(messageId); + await this.messageQueue.resetRetryCount(messageId); + await this.messageQueue.retry(messageId, { increment: false }); await new Promise((r) => setTimeout(r, 15000)); return null; } @@ -807,11 +812,27 @@ export class RelayService { }; }); - const inventoryCheck = await this.ensureTargetBridgeInventory( - messageId, - targetBridge, - mappedTokenAmounts - ); + let inventoryCheck; + try { + inventoryCheck = await this.ensureTargetBridgeInventory( + messageId, + targetBridge, + mappedTokenAmounts + ); + } catch (inventoryProbeError) { + this.logger.warn( + `Bridge inventory probe failed for ${messageId}; deferring message until the next cycle`, + inventoryProbeError + ); + this.recordError('bridge_inventory_probe', inventoryProbeError, { + message_id: messageId, + target_bridge: targetBridge + }); + await this.messageQueue.resetRetryCount(messageId); + await this.messageQueue.retry(messageId, { increment: false }); + await new Promise(resolve => setTimeout(resolve, this.config.retry.retryDelay)); + return null; + } if (!inventoryCheck.ok) { const inventoryError = new Error(inventoryCheck.message); this.logger.warn(inventoryCheck.message); @@ -823,7 +844,8 @@ export class RelayService { required_amount: inventoryCheck.requiredAmount.toString(), shortfall: inventoryCheck.shortfall.toString() }); - await this.messageQueue.retry(messageId); + await this.messageQueue.resetRetryCount(messageId); + await this.messageQueue.retry(messageId, { increment: false }); await new Promise(resolve => setTimeout(resolve, this.config.retry.retryDelay)); return null; } @@ -896,7 +918,14 @@ export class RelayService { target_bridge: targetBridge, tx_hash: receipt.hash }; - if (this.lastError && (this.lastError.scope === 'relay_message' || this.lastError.scope === 'bridge_inventory')) { + if ( + this.lastError && + ( + this.lastError.scope === 'relay_message' || + this.lastError.scope === 'bridge_inventory' || + this.lastError.scope === 'bridge_inventory_probe' + ) + ) { this.lastError = null; } diff --git a/services/relay/test.js b/services/relay/test.js index 625b27b..3c21902 100644 --- a/services/relay/test.js +++ b/services/relay/test.js @@ -107,6 +107,9 @@ assert(inFlightMessage?.messageId === queuedMessage.messageId, 'getNext should r await queue.retry(queuedMessage.messageId); const retriedMessage = await queue.getNext(); assert(retriedMessage?.messageId === queuedMessage.messageId, 'retry should requeue the original message payload'); +await queue.resetRetryCount(queuedMessage.messageId); +await queue.retry(queuedMessage.messageId, { increment: false }); +assert((await queue.getRetryCount(queuedMessage.messageId)) === 0, 'deferred requeue should not consume retry budget'); console.log('OK: relay service structure valid'); process.exit(0); diff --git a/services/token-aggregation/src/api/routes/tokens.ts b/services/token-aggregation/src/api/routes/tokens.ts index de7dde5..c3f76a7 100644 --- a/services/token-aggregation/src/api/routes/tokens.ts +++ b/services/token-aggregation/src/api/routes/tokens.ts @@ -45,10 +45,15 @@ function tokenFromCanonical(chainId: number, address: string): Token | null { async function getPoolsByTokenWithFallback(chainId: number, address: string): Promise { const normalized = address.toLowerCase(); const resolution = resolveCanonicalQuoteAddress(chainId, normalized); - const dbPools = filterPoolsForExposure( - chainId, - await poolRepo.getPoolsByToken(chainId, resolution.lookupAddress) - ); + let dbPools: LiquidityPool[] = []; + try { + dbPools = filterPoolsForExposure( + chainId, + await poolRepo.getPoolsByToken(chainId, resolution.lookupAddress) + ); + } catch (error) { + logger.warn('DB pool lookup failed; using live DODO fallback', { chainId, address: resolution.lookupAddress, error }); + } if (dbPools.length > 0) { return dbPools; }