fix(relay): defer on inventory probe errors; optional retry without budget

- MessageQueue: resetRetryCount and retry({ increment: false }) for shedder/inventory paths
- RelayService: treat bridge_inventory_probe like other soft-failure scopes; wrap inventory check in try/catch
- Token aggregation: catch DB pool lookup errors and fall back to live DODO path
- Mainnet WETH profile: START_BLOCK=latest; extend RELAY_SKIP_MESSAGE_IDS for backlog hygiene
- Extend relay test.js for deferred requeue behavior

Made-with: Cursor
This commit is contained in:
defiQUG
2026-04-12 11:35:18 -07:00
parent 06c4bebcb7
commit 8ec6af94d5
5 changed files with 67 additions and 22 deletions

View File

@@ -17,7 +17,7 @@ DEST_RELAY_BRIDGE_ALLOWLIST=0xF9A32F37099c582D28b4dE7Fca6eaC1e5259f939
RELAYER_PRIVATE_KEY=${PRIVATE_KEY}
RELAYER_ADDRESS=0x4A666F96fC8764181194447A7dFdb7d471b301C8
START_BLOCK=2706088
START_BLOCK=latest
POLL_INTERVAL=5000
CONFIRMATION_BLOCKS=1
MAX_RETRIES=3
@@ -28,9 +28,8 @@ RELAY_SHEDDING=0
RELAY_DELIVERY_ENABLED=1
RELAY_ENFORCE_BRIDGE_TOKEN_BALANCE=1
# Park the known oversized WETH release message until the Mainnet bridge inventory is restored.
# 2026-04-05 purge: drop the historical WETH backlog that reloaded into the paused queue so
# the worker reflects current forward-only monitoring instead of retaining undeliverable legacy sends.
RELAY_SKIP_MESSAGE_IDS=0xf718c9895c0a5442349996383184d017d2fa041af7aaeb9f0c0675d3ceed756b,0x19656fe758fc0e36ce5ce16ad9101e76c9eae19e5ed6bea08335dfb664215edc,0x249042e74fc322b2a8dc9fabe63b18094df11aaaed86b149287a6feea1b94157,0x263fa601b709c1c71a78004936eb195b43ed9da4dce23cf12dcfd24d40880375,0x300d38035aebd97bfbfa13737dc60ed23dba91991348259fd01ea1bc3109b260,0x3d3e6978c9e796b23fb8709fff4102131648825728ad0dd4197b98c6ba7a46cc,0x42fad60f851a43c6a52a216d211679d6fb786130f34dc5f26e7ddad350e7c83e,0x47b36fc517e7055efbc7408b17fca08f5fa41dbeea24d72e02f4995e22a4601f,0x4a4cab9082800ddb10ac60cae94dc2c5a6509134e6c8f915dc0ff636752b449d,0x523f8a202f069644488747dbd2a221cafdfac3f0a0fc7271685f5b23736fe8eb,0x63e56db9e3d6f2864e284b32c84ffa7118c65bee0559567cddf3288d812ef3cc,0x6ae76d1ec258666a1e6a95e63d911f4178f34ec312dcb88e3f237ba1288e6f79,0x75881681e8b2c793a8386f471cad44768c4e6f125e3f888978cc4c14d74049cf,0x770e246987c22c32fd2c9627c37e28316ec3390b33fb9cb9e9c3f21670af5ba3,0x779894438af9602eee92bc6c9c02475d6659ab9ed4bdd7250e6d0d331e628366,0x781eb1072c501efc10f92be6dc3355cf95d2f6f0c992468275d69fc5ded52a30,0xa447bdb1962882920ca8e966d7e8ba0cc016b80252bff5d5741317f0484a74fd,0xb11ca230b35d706eb0a43dc99c8806647aaeef29cdfa14762fa2a397bcbe82ae,0xc076289b0120a9b010e7851c4b00566ecdc8f46f2108d87ebebd042a005fb250,0xcc5ec02070b51ff927e540c62b2aa0c4b4f237efc8b34bbd6a5e8827f57f0a0b,0xd606e745392b1385870bcf5c7a1177833d2872a4e1a0beb33319e5b645be5b12
# Park the known oversized WETH release messages until Mainnet bridge inventory policy changes.
# Keep the worker on forward-only monitoring so restarts do not reload old underfunded backlog.
RELAY_SKIP_MESSAGE_IDS=0xf718c9895c0a5442349996383184d017d2fa041af7aaeb9f0c0675d3ceed756b,0x19656fe758fc0e36ce5ce16ad9101e76c9eae19e5ed6bea08335dfb664215edc,0x249042e74fc322b2a8dc9fabe63b18094df11aaaed86b149287a6feea1b94157,0x263fa601b709c1c71a78004936eb195b43ed9da4dce23cf12dcfd24d40880375,0x300d38035aebd97bfbfa13737dc60ed23dba91991348259fd01ea1bc3109b260,0x3d3e6978c9e796b23fb8709fff4102131648825728ad0dd4197b98c6ba7a46cc,0x42fad60f851a43c6a52a216d211679d6fb786130f34dc5f26e7ddad350e7c83e,0x47b36fc517e7055efbc7408b17fca08f5fa41dbeea24d72e02f4995e22a4601f,0x4a4cab9082800ddb10ac60cae94dc2c5a6509134e6c8f915dc0ff636752b449d,0x523f8a202f069644488747dbd2a221cafdfac3f0a0fc7271685f5b23736fe8eb,0x5571002da2d2202280b11df4c772978decd007c18e0eacc5f80839f4be95dc65,0x63e56db9e3d6f2864e284b32c84ffa7118c65bee0559567cddf3288d812ef3cc,0x6ae76d1ec258666a1e6a95e63d911f4178f34ec312dcb88e3f237ba1288e6f79,0x75881681e8b2c793a8386f471cad44768c4e6f125e3f888978cc4c14d74049cf,0x770e246987c22c32fd2c9627c37e28316ec3390b33fb9cb9e9c3f21670af5ba3,0x779894438af9602eee92bc6c9c02475d6659ab9ed4bdd7250e6d0d331e628366,0x781eb1072c501efc10f92be6dc3355cf95d2f6f0c992468275d69fc5ded52a30,0x7a49b584a1966c9c568036169b227a2293b74132a21bcfbd253b2e8d621f1dde,0xa447bdb1962882920ca8e966d7e8ba0cc016b80252bff5d5741317f0484a74fd,0xb11ca230b35d706eb0a43dc99c8806647aaeef29cdfa14762fa2a397bcbe82ae,0xc076289b0120a9b010e7851c4b00566ecdc8f46f2108d87ebebd042a005fb250,0xcc5ec02070b51ff927e540c62b2aa0c4b4f237efc8b34bbd6a5e8827f57f0a0b,0xd606e745392b1385870bcf5c7a1177833d2872a4e1a0beb33319e5b645be5b12
LOG_LEVEL=info

View File

@@ -65,14 +65,22 @@ export class MessageQueue {
async getRetryCount(messageId) {
return this.retryCounts.get(messageId) || 0;
}
async resetRetryCount(messageId) {
this.retryCounts.delete(messageId);
}
async retry(messageId) {
async retry(messageId, options = {}) {
const { increment = true } = options;
const count = this.retryCounts.get(messageId) || 0;
this.retryCounts.set(messageId, count + 1);
if (increment) {
this.retryCounts.set(messageId, count + 1);
}
const existingIndex = this.queue.findIndex(m => m.messageId === messageId);
if (existingIndex >= 0) {
this.logger.info(`Message ${messageId} retry count: ${count + 1}`);
const nextCount = increment ? count + 1 : count;
this.logger.info(`Message ${messageId} retry count: ${nextCount}`);
return;
}
@@ -84,7 +92,8 @@ export class MessageQueue {
this.inFlight.delete(messageId);
this.queue.push(messageData);
this.logger.info(`Message ${messageId} requeued. Retry count: ${count + 1}. Queue size: ${this.queue.length}`);
const nextCount = increment ? count + 1 : count;
this.logger.info(`Message ${messageId} requeued. Retry count: ${nextCount}. Queue size: ${this.queue.length}`);
}
getStats() {

View File

@@ -161,7 +161,11 @@ export class RelayService {
const lastSuccessMs = this.lastRelaySuccess && this.lastRelaySuccess.at ? Date.parse(this.lastRelaySuccess.at) : 0;
if (
this.lastError &&
(this.lastError.scope === 'relay_message' || this.lastError.scope === 'bridge_inventory') &&
(
this.lastError.scope === 'relay_message' ||
this.lastError.scope === 'bridge_inventory' ||
this.lastError.scope === 'bridge_inventory_probe'
) &&
Number.isFinite(lastErrorMs) &&
lastErrorMs > 0 &&
lastErrorMs >= lastSuccessMs
@@ -731,7 +735,8 @@ export class RelayService {
this.logger.warn(
`Destination relay router is paused; deferring ${messageId} (enable RELAY_SHEDDING=1 to pause off-chain too)`
);
await this.messageQueue.retry(messageId);
await this.messageQueue.resetRetryCount(messageId);
await this.messageQueue.retry(messageId, { increment: false });
await new Promise((r) => setTimeout(r, 15000));
return null;
}
@@ -807,11 +812,27 @@ export class RelayService {
};
});
const inventoryCheck = await this.ensureTargetBridgeInventory(
messageId,
targetBridge,
mappedTokenAmounts
);
let inventoryCheck;
try {
inventoryCheck = await this.ensureTargetBridgeInventory(
messageId,
targetBridge,
mappedTokenAmounts
);
} catch (inventoryProbeError) {
this.logger.warn(
`Bridge inventory probe failed for ${messageId}; deferring message until the next cycle`,
inventoryProbeError
);
this.recordError('bridge_inventory_probe', inventoryProbeError, {
message_id: messageId,
target_bridge: targetBridge
});
await this.messageQueue.resetRetryCount(messageId);
await this.messageQueue.retry(messageId, { increment: false });
await new Promise(resolve => setTimeout(resolve, this.config.retry.retryDelay));
return null;
}
if (!inventoryCheck.ok) {
const inventoryError = new Error(inventoryCheck.message);
this.logger.warn(inventoryCheck.message);
@@ -823,7 +844,8 @@ export class RelayService {
required_amount: inventoryCheck.requiredAmount.toString(),
shortfall: inventoryCheck.shortfall.toString()
});
await this.messageQueue.retry(messageId);
await this.messageQueue.resetRetryCount(messageId);
await this.messageQueue.retry(messageId, { increment: false });
await new Promise(resolve => setTimeout(resolve, this.config.retry.retryDelay));
return null;
}
@@ -896,7 +918,14 @@ export class RelayService {
target_bridge: targetBridge,
tx_hash: receipt.hash
};
if (this.lastError && (this.lastError.scope === 'relay_message' || this.lastError.scope === 'bridge_inventory')) {
if (
this.lastError &&
(
this.lastError.scope === 'relay_message' ||
this.lastError.scope === 'bridge_inventory' ||
this.lastError.scope === 'bridge_inventory_probe'
)
) {
this.lastError = null;
}

View File

@@ -107,6 +107,9 @@ assert(inFlightMessage?.messageId === queuedMessage.messageId, 'getNext should r
await queue.retry(queuedMessage.messageId);
const retriedMessage = await queue.getNext();
assert(retriedMessage?.messageId === queuedMessage.messageId, 'retry should requeue the original message payload');
await queue.resetRetryCount(queuedMessage.messageId);
await queue.retry(queuedMessage.messageId, { increment: false });
assert((await queue.getRetryCount(queuedMessage.messageId)) === 0, 'deferred requeue should not consume retry budget');
console.log('OK: relay service structure valid');
process.exit(0);

View File

@@ -45,10 +45,15 @@ function tokenFromCanonical(chainId: number, address: string): Token | null {
async function getPoolsByTokenWithFallback(chainId: number, address: string): Promise<LiquidityPool[]> {
const normalized = address.toLowerCase();
const resolution = resolveCanonicalQuoteAddress(chainId, normalized);
const dbPools = filterPoolsForExposure(
chainId,
await poolRepo.getPoolsByToken(chainId, resolution.lookupAddress)
);
let dbPools: LiquidityPool[] = [];
try {
dbPools = filterPoolsForExposure(
chainId,
await poolRepo.getPoolsByToken(chainId, resolution.lookupAddress)
);
} catch (error) {
logger.warn('DB pool lookup failed; using live DODO fallback', { chainId, address: resolution.lookupAddress, error });
}
if (dbPools.length > 0) {
return dbPools;
}