feat(token-aggregation): add historical pricing context, backfill, and indexer hardening
Some checks failed
CI/CD Pipeline / Solidity Contracts (pull_request) Failing after 1m6s
CI/CD Pipeline / Security Scanning (pull_request) Successful in 12m42s
CI/CD Pipeline / Lint and Format (pull_request) Failing after 42s
CI/CD Pipeline / Terraform Validation (pull_request) Failing after 25s
CI/CD Pipeline / Kubernetes Validation (pull_request) Successful in 27s
HYBX OMNL TypeScript & anchor / token-aggregation build + reconcile artifact (pull_request) Failing after 49s
OMNL reconcile anchor / Run omnl:reconcile and upload artifacts (pull_request) Failing after 26s
Validation / validate-genesis (pull_request) Successful in 36s
Validation / validate-terraform (pull_request) Failing after 28s
Validation / validate-kubernetes (pull_request) Failing after 12s
Validation / validate-smart-contracts (pull_request) Failing after 13s
Validation / validate-security (pull_request) Failing after 1m39s
Validation / validate-documentation (pull_request) Failing after 18s

This commit is contained in:
defiQUG
2026-04-25 23:45:07 -07:00
parent c3b1b2cebc
commit fcd55aa9c4
18 changed files with 1963 additions and 1095 deletions

View File

@@ -26,7 +26,7 @@ log_ok "Built"
# Package
log_info "Creating package..."
PACKAGE_ITEMS=(dist src package.json tsconfig.json)
PACKAGE_ITEMS=(dist src package.json package-lock.json tsconfig.json scripts)
for optional in .env.example .env; do
[ -e "$SCRIPT_DIR/$optional" ] && PACKAGE_ITEMS+=("$optional")
done
@@ -41,10 +41,10 @@ ssh "$PROXMOX_USER@$PROXMOX_HOST" "
if ! command -v node &>/dev/null; then
curl -fsSL https://deb.nodesource.com/setup_20.x | bash - && apt-get install -y nodejs postgresql-client
fi
command -v pnpm &>/dev/null || npm install -g pnpm@10
mkdir -p /opt/token-aggregation && cd /opt/token-aggregation
tar xzf /tmp/token-agg.tar.gz
pnpm install --prod
npm ci --omit=dev
npm_config_build_from_source=true npm rebuild bcrypt >/dev/null 2>&1 || true
if [ ! -f .env ] && [ -f .env.example ]; then
cp .env.example .env
fi
@@ -62,7 +62,8 @@ Type=simple
User=root
WorkingDirectory=/opt/token-aggregation
Environment=\"NODE_ENV=production\"
ExecStart=/usr/bin/pnpm start
Environment=\"PORT=$SERVICE_PORT\"
ExecStart=/usr/bin/node /opt/token-aggregation/dist/index.js
Restart=on-failure
RestartSec=5s

View File

@@ -177,6 +177,39 @@ OHLCV (candlestick) data for charts.
**Response:** `{ chainId, tokenAddress, interval, data: OHLCV[] }`
### GET /api/v1/tokens/:address/price-at
Point-in-time USD valuation for a token at or immediately before a requested timestamp. Intended for explorer transaction detail pages where the transfer-time value should stay anchored to the transaction, while wallet/address/token overview pages can continue showing current market value.
**Query:**
| Param | Type | Required | Description |
|-------|------|----------|-------------|
| chainId | number | yes | 138 or 651940 |
| timestamp | ISO date | yes | Transfer time to price against |
**Response:** `{ chainId, tokenAddress, requestedTimestamp, effectiveTimestamp?, priceUsd?, source }`
- `source` is one of `swap_event`, `ohlcv_5m`, `ohlcv_15m`, `ohlcv_1h`, `ohlcv_4h`, `ohlcv_24h`, `current_market_fallback`, `canonical_fallback`, or `unavailable`
- `effectiveTimestamp` is the candle/snapshot time actually used for valuation
### GET /api/v1/tokens/:address/pricing-context
Snap-ready combined pricing response for one token. Returns the current market valuation and, when `timestamp` is supplied, a transfer-time historical valuation in the same payload.
**Query:**
| Param | Type | Required | Description |
|-------|------|----------|-------------|
| chainId | number | yes | 138 or 651940 |
| timestamp | ISO date | no | Transfer time to price against |
**Response:** `{ chainId, tokenAddress, current, historical? }`
- `current` includes `{ chainId, tokenAddress, priceUsd?, asOf?, sourceLayer, stale, marketLastUpdated? }`
- `historical` includes `{ chainId, tokenAddress, requestedTimestamp, effectiveTimestamp?, priceUsd?, source, locked }`
- `locked` is `true` only when the historical price came from `swap_event` or an `ohlcv_*` source; fallback values are returned with `locked: false`
### GET /api/v1/tokens/:address/signals
Trending/signals (e.g. CoinGecko trending rank) for a token.

File diff suppressed because it is too large Load Diff

View File

@@ -2,7 +2,7 @@
"name": "token-aggregation-service",
"version": "1.0.0",
"description": "Token aggregation service for ChainID 138 and 651940 with external API enrichment",
"packageManager": "pnpm@10.0.0",
"packageManager": "npm@10.8.2",
"main": "dist/index.js",
"scripts": {
"build": "tsc",
@@ -12,38 +12,48 @@
"test:ci": "jest --runInBand",
"test:omnl": "jest --runInBand --testPathPattern=omnl",
"lint": "eslint src --ext .ts",
"backfill:historical-pricing": "node dist/backfill-historical-pricing.js",
"generate:route-matrix:v2": "ts-node scripts/generate-route-matrix-v2.ts",
"migrate": "node -r dotenv/config dist/database/migrations.js",
"example:partner-payloads": "node scripts/resolve-partner-payloads-example.mjs",
"omnl:reconcile": "node scripts/omnl-reconcile-report.mjs"
},
"dependencies": {
"axios": "^1.13.5",
"bcrypt": "^5.1.1",
"axios": "^1.15.2",
"bcrypt": "^6.0.0",
"compression": "^1.8.1",
"cors": "^2.8.6",
"dotenv": "^16.6.1",
"ethers": "^6.16.0",
"express": "^4.22.1",
"express-rate-limit": "^7.5.1",
"express": "^5.1.0",
"express-rate-limit": "^8.4.1",
"jsonwebtoken": "^9.0.3",
"node-cron": "^3.0.3",
"node-cron": "^4.2.1",
"pg": "^8.18.0",
"winston": "^3.19.0"
},
"overrides": {
"ajv": "^6.14.0",
"brace-expansion": "^2.0.3",
"flatted": "^3.4.1",
"follow-redirects": "1.16.0",
"handlebars": "^4.7.9",
"minimatch": "^9.0.7",
"picomatch": "^2.3.2"
},
"devDependencies": {
"@types/bcrypt": "^5.0.2",
"@types/bcrypt": "^6.0.0",
"@types/compression": "^1.8.1",
"@types/cookie-parser": "^1.4.10",
"@types/cors": "^2.8.19",
"@types/express": "^4.17.25",
"@types/express": "^5.0.6",
"@types/jest": "^29.5.14",
"@types/jsonwebtoken": "^9.0.10",
"@types/node": "^20.19.33",
"@types/node-cron": "^3.0.11",
"@types/pg": "^8.16.0",
"@typescript-eslint/eslint-plugin": "^6.21.0",
"@typescript-eslint/parser": "^6.21.0",
"@typescript-eslint/eslint-plugin": "^8.59.0",
"@typescript-eslint/parser": "^8.59.0",
"eslint": "^8.57.1",
"jest": "^29.7.0",
"ts-jest": "^29.4.6",

View File

@@ -0,0 +1,49 @@
import * as dotenv from 'dotenv';
import path from 'path';
import { existsSync } from 'fs';
import { HistoricalPricingBackfillService } from '../src/services/historical-pricing-backfill';
const rootEnvCandidates = [
path.resolve(__dirname, '../../../.env'),
path.resolve(__dirname, '../../../../.env'),
];
for (const candidate of rootEnvCandidates) {
if (existsSync(candidate)) {
dotenv.config({ path: candidate });
break;
}
}
dotenv.config();
function readInt(name: string, fallback: number): number {
const raw = String(process.env[name] || '').trim();
if (!raw) return fallback;
const parsed = Number(raw);
return Number.isFinite(parsed) ? parsed : fallback;
}
async function main(): Promise<void> {
const chainId = readInt('BACKFILL_CHAIN_ID', 138);
const days = readInt('BACKFILL_DAYS', 30);
const chunkSize = readInt('BACKFILL_CHUNK_SIZE', 2500);
const poolLimit = readInt('BACKFILL_POOL_LIMIT', 500);
const service = new HistoricalPricingBackfillService();
const summary = await service.backfillChain({
chainId,
days,
chunkSize,
poolLimit,
});
// eslint-disable-next-line no-console
console.log(JSON.stringify(summary, null, 2));
}
main().catch((error) => {
// eslint-disable-next-line no-console
console.error(error);
process.exit(1);
});

View File

@@ -91,13 +91,21 @@ CREATE TABLE IF NOT EXISTS swap_events (
id BIGSERIAL PRIMARY KEY,
chain_id INTEGER NOT NULL,
pool_address TEXT NOT NULL,
transaction_hash TEXT,
block_number BIGINT,
log_index INTEGER,
token0_address TEXT NOT NULL,
token1_address TEXT NOT NULL,
amount0_in NUMERIC(78, 0) NOT NULL DEFAULT 0,
amount1_in NUMERIC(78, 0) NOT NULL DEFAULT 0,
amount0_out NUMERIC(78, 0) NOT NULL DEFAULT 0,
amount1_out NUMERIC(78, 0) NOT NULL DEFAULT 0,
amount_usd NUMERIC(38, 18) NOT NULL DEFAULT 0,
price_usd NUMERIC(38, 18),
transaction_hash TEXT,
log_index INTEGER,
block_number BIGINT,
token0_price_usd NUMERIC(38, 18),
token1_price_usd NUMERIC(38, 18),
sender TEXT,
to_address TEXT,
timestamp TIMESTAMPTZ NOT NULL
);
@@ -105,13 +113,9 @@ CREATE INDEX IF NOT EXISTS idx_swap_events_pool_time
ON swap_events (chain_id, pool_address, timestamp DESC);
CREATE INDEX IF NOT EXISTS idx_swap_events_token_time
ON swap_events (chain_id, token0_address, token1_address, timestamp DESC);
CREATE UNIQUE INDEX IF NOT EXISTS idx_swap_events_unique_log
ON swap_events (
chain_id,
pool_address,
COALESCE(transaction_hash, ''),
COALESCE(log_index, -1)
);
DROP INDEX IF EXISTS idx_swap_events_unique_log;
CREATE UNIQUE INDEX IF NOT EXISTS idx_swap_events_chain_tx_log
ON swap_events (chain_id, transaction_hash, log_index);
CREATE TABLE IF NOT EXISTS token_ohlcv (
id BIGSERIAL PRIMARY KEY,

View File

@@ -67,6 +67,10 @@ interface CoinGeckoTrending {
}>;
}
interface CoinGeckoMarketChartRangeResponse {
prices?: Array<[number, number]>;
}
// Chain ID to CoinGecko platform ID mapping
const CHAIN_TO_PLATFORM: Record<number, string> = {
1: 'ethereum',
@@ -79,6 +83,17 @@ const CHAIN_TO_PLATFORM: Record<number, string> = {
// Note: 138 and 651940 are likely not supported, will return null gracefully
};
const REFERENCE_SYMBOL_TO_COIN_ID: Record<string, string> = {
ETH: 'ethereum',
BTC: 'bitcoin',
BNB: 'binancecoin',
POL: 'matic-network',
AVAX: 'avalanche-2',
CELO: 'celo',
CRO: 'crypto-com-chain',
XDAI: 'xdai',
};
export class CoinGeckoAdapter implements ExternalApiAdapter {
private api: AxiosInstance;
private apiKey?: string;
@@ -322,4 +337,73 @@ export class CoinGeckoAdapter implements ExternalApiAdapter {
return [];
}
}
async getHistoricalReferencePrice(
referenceSymbol: string,
timestamp: Date
): Promise<{ priceUsd: number; effectiveTimestamp: Date } | null> {
const symbol = referenceSymbol.trim().toUpperCase();
const coinId = REFERENCE_SYMBOL_TO_COIN_ID[symbol];
if (!coinId) {
return null;
}
const from = Math.floor((timestamp.getTime() - (6 * 60 * 60 * 1000)) / 1000);
const to = Math.floor((timestamp.getTime() + (6 * 60 * 60 * 1000)) / 1000);
const cacheKey = `history_${coinId}_${from}_${to}`;
const cached = this.cache.get(cacheKey);
if (cached && cached.expiresAt > new Date()) {
return cached.data as { priceUsd: number; effectiveTimestamp: Date } | null;
}
try {
const response = await this.api.get<CoinGeckoMarketChartRangeResponse>(
`/coins/${coinId}/market_chart/range`,
{
params: {
vs_currency: 'usd',
from,
to,
},
}
);
const prices = response.data.prices || [];
if (prices.length === 0) {
return null;
}
const targetMs = timestamp.getTime();
let bestPoint: [number, number] | null = null;
let bestDistance = Number.POSITIVE_INFINITY;
for (const point of prices) {
if (point[0] > targetMs) {
continue;
}
const distance = Math.abs(point[0] - targetMs);
if (distance < bestDistance) {
bestDistance = distance;
bestPoint = point;
}
}
if (!bestPoint || !Number.isFinite(bestPoint[1])) {
return null;
}
const resolved = {
priceUsd: bestPoint[1],
effectiveTimestamp: new Date(bestPoint[0]),
};
this.cache.set(cacheKey, {
data: resolved,
expiresAt: new Date(Date.now() + 60 * 60 * 1000),
});
return resolved;
} catch (error) {
logger.error(`Error fetching CoinGecko historical reference price for ${referenceSymbol}:`, error);
return null;
}
}
}

View File

@@ -8,6 +8,10 @@ import { appendCentralAudit } from '../central-audit';
const router: Router = Router();
const adminRepo = new AdminRepository();
function firstString(value: string | string[] | undefined): string | undefined {
return Array.isArray(value) ? value[0] : value;
}
// Authentication routes (public)
router.post('/auth/login', async (req: Request, res: Response) => {
try {
@@ -110,7 +114,7 @@ router.post('/api-keys', requireRole('admin', 'super_admin'), async (req: AuthRe
router.put('/api-keys/:id', requireRole('admin', 'super_admin'), async (req: AuthRequest, res: Response) => {
try {
const id = parseInt(req.params.id, 10);
const id = parseInt(firstString(req.params.id) || '', 10);
const updates: { isActive?: boolean; rateLimitPerMinute?: number; expiresAt?: Date } = {};
if (req.body.isActive !== undefined) updates.isActive = req.body.isActive;
@@ -142,7 +146,7 @@ router.put('/api-keys/:id', requireRole('admin', 'super_admin'), async (req: Aut
router.delete('/api-keys/:id', requireRole('admin', 'super_admin'), async (req: AuthRequest, res: Response) => {
try {
const id = parseInt(req.params.id, 10);
const id = parseInt(firstString(req.params.id) || '', 10);
const oldKey = await adminRepo.getApiKey(id);
await adminRepo.deleteApiKey(id);
@@ -237,7 +241,7 @@ router.post('/endpoints', requireRole('admin', 'super_admin'), async (req: AuthR
router.put('/endpoints/:id', requireRole('admin', 'super_admin'), async (req: AuthRequest, res: Response) => {
try {
const id = parseInt(req.params.id, 10);
const id = parseInt(firstString(req.params.id) || '', 10);
const updates: { endpointUrl?: string; isActive?: boolean; isPrimary?: boolean } = {};
if (req.body.endpointUrl !== undefined) updates.endpointUrl = req.body.endpointUrl;

View File

@@ -166,7 +166,7 @@ router.get('/omnl/ipsas/fineract-compare', omnlSensitiveRouteGuard, async (_req:
*/
router.get('/omnl/ipsas/layer/:layer', (req: Request, res: Response) => {
try {
const layer = req.params.layer;
const layer = Array.isArray(req.params.layer) ? req.params.layer[0] : req.params.layer;
const registry = loadIpsasRegistry();
const hint = registry.monetaryLayerHints[layer];
if (!hint) {

View File

@@ -12,6 +12,8 @@ const mockGetLiveDodoPools = jest.fn();
const mockResolveTokenDisplay = jest.fn();
const mockResolvePoolTokenDisplays = jest.fn();
const mockGetTokenByContract = jest.fn();
const mockGetOHLCV = jest.fn();
const mockGetHistoricalReferencePrice = jest.fn();
jest.mock('../../database/repositories/token-repo', () => ({
TokenRepository: jest.fn().mockImplementation(() => ({
@@ -36,7 +38,7 @@ jest.mock('../../database/repositories/pool-repo', () => ({
jest.mock('../../indexer/ohlcv-generator', () => ({
OHLCVGenerator: jest.fn().mockImplementation(() => ({
getOHLCV: jest.fn().mockResolvedValue([]),
getOHLCV: mockGetOHLCV,
})),
}));
@@ -46,6 +48,7 @@ jest.mock('../../adapters/coingecko-adapter', () => ({
CoinGeckoAdapter: jest.fn().mockImplementation(() => ({
getTokenByContract: mockGetTokenByContract,
getMarketData: mockGetMarketDataAdapter,
getHistoricalReferencePrice: mockGetHistoricalReferencePrice,
getTrending: jest.fn().mockResolvedValue([]),
})),
}));
@@ -110,6 +113,7 @@ describe('Tokens API', () => {
mockGetMarketData.mockResolvedValue(null);
mockGetPoolsByToken.mockResolvedValue([]);
mockGetPool.mockResolvedValue(null);
mockGetOHLCV.mockResolvedValue([]);
mockGetLiveDodoPools.mockResolvedValue([]);
mockResolveTokenDisplay.mockResolvedValue({
address: '',
@@ -123,6 +127,7 @@ describe('Tokens API', () => {
token1: { address: '', symbol: 'UNKNOWN', name: 'Unknown Token', source: 'fallback' },
});
mockGetTokenByContract.mockResolvedValue(null);
mockGetHistoricalReferencePrice.mockResolvedValue(null);
});
afterAll(async () => {
@@ -216,4 +221,170 @@ describe('Tokens API', () => {
});
expect(body.token.canonicalLiquidity).toBeUndefined();
});
it('returns historical price snapshots for a token at a requested timestamp', async () => {
const weth = getCanonicalTokenBySymbol(138, 'WETH');
expect(weth?.addresses[138]).toBeTruthy();
const wethAddress = String(weth?.addresses[138]).toLowerCase();
mockGetOHLCV
.mockResolvedValueOnce([
{
timestamp: new Date('2026-04-26T01:30:00.000Z'),
open: 2488,
high: 2491,
low: 2487,
close: 2490,
volume: 10,
volumeUsd: 24900,
},
]);
const res = await fetch(
`${baseUrl}/api/v1/tokens/${wethAddress}/price-at?chainId=138&timestamp=${encodeURIComponent('2026-04-26T01:33:02.000Z')}`
);
expect(res.status).toBe(200);
const body = (await res.json()) as Record<string, any>;
expect(body).toMatchObject({
chainId: 138,
tokenAddress: wethAddress,
requestedTimestamp: '2026-04-26T01:33:02.000Z',
effectiveTimestamp: '2026-04-26T01:30:00.000Z',
priceUsd: 2490,
source: 'ohlcv_5m',
});
});
it('returns pricing context with current and locked historical snapshots', async () => {
const weth = getCanonicalTokenBySymbol(138, 'WETH');
expect(weth?.addresses[138]).toBeTruthy();
const wethAddress = String(weth?.addresses[138]).toLowerCase();
mockGetOHLCV.mockResolvedValueOnce([
{
timestamp: new Date('2026-04-26T01:30:00.000Z'),
open: 2488,
high: 2491,
low: 2487,
close: 2490,
volume: 10,
volumeUsd: 24900,
},
]);
mockGetMarketData.mockResolvedValue({
chainId: 138,
tokenAddress: wethAddress,
priceUsd: 2490,
volume24h: 1000,
volume7d: 2000,
volume30d: 3000,
liquidityUsd: 4000,
holdersCount: 5,
transfers24h: 6,
lastUpdated: new Date('2026-04-26T03:31:01.988Z'),
});
const res = await fetch(
`${baseUrl}/api/v1/tokens/${wethAddress}/pricing-context?chainId=138&timestamp=${encodeURIComponent('2026-04-26T01:33:02.000Z')}`
);
expect(res.status).toBe(200);
const body = (await res.json()) as Record<string, any>;
expect(body).toMatchObject({
chainId: 138,
tokenAddress: wethAddress,
current: {
chainId: 138,
tokenAddress: wethAddress,
priceUsd: 2490,
stale: false,
},
historical: {
chainId: 138,
tokenAddress: wethAddress,
requestedTimestamp: '2026-04-26T01:33:02.000Z',
effectiveTimestamp: '2026-04-26T01:30:00.000Z',
priceUsd: 2490,
source: 'ohlcv_5m',
locked: true,
},
});
expect(body.current.asOf).toEqual(expect.any(String));
expect(body.current.sourceLayer).toEqual(expect.any(String));
});
it('falls back to canonical CoinGecko history for native reference assets when local OHLCV is unavailable', async () => {
const weth = getCanonicalTokenBySymbol(138, 'WETH');
expect(weth?.addresses[138]).toBeTruthy();
const wethAddress = String(weth?.addresses[138]).toLowerCase();
mockGetHistoricalReferencePrice.mockResolvedValue({
priceUsd: 2484.12,
effectiveTimestamp: new Date('2026-04-26T01:31:00.000Z'),
});
const res = await fetch(
`${baseUrl}/api/v1/tokens/${wethAddress}/pricing-context?chainId=138&timestamp=${encodeURIComponent('2026-04-26T01:33:02.000Z')}`
);
expect(res.status).toBe(200);
const body = (await res.json()) as Record<string, any>;
expect(body.historical).toMatchObject({
chainId: 138,
tokenAddress: wethAddress,
requestedTimestamp: '2026-04-26T01:33:02.000Z',
effectiveTimestamp: '2026-04-26T01:31:00.000Z',
priceUsd: 2484.12,
source: 'coingecko_history',
locked: true,
});
});
it('does not lock stale OHLCV candles that are too far before the requested timestamp', async () => {
const usdt = getCanonicalTokenBySymbol(138, 'USDT');
expect(usdt?.addresses[138]).toBeTruthy();
const usdtAddress = String(usdt?.addresses[138]).toLowerCase();
mockGetOHLCV.mockResolvedValue([
{
timestamp: new Date('2026-04-11T00:00:00.000Z'),
open: 1,
high: 1,
low: 1,
close: 1,
volume: 15,
volumeUsd: 15,
},
]);
mockGetHistoricalReferencePrice.mockResolvedValue(null);
mockGetMarketData.mockResolvedValue({
chainId: 138,
tokenAddress: usdtAddress,
priceUsd: 1,
volume24h: 0,
volume7d: 0,
volume30d: 0,
liquidityUsd: 0,
holdersCount: 0,
transfers24h: 0,
lastUpdated: new Date('2026-04-26T03:31:01.988Z'),
});
const res = await fetch(
`${baseUrl}/api/v1/tokens/${usdtAddress}/pricing-context?chainId=138&timestamp=${encodeURIComponent('2026-04-26T01:33:02.000Z')}`
);
expect(res.status).toBe(200);
const body = (await res.json()) as Record<string, any>;
expect(body.historical).toMatchObject({
chainId: 138,
tokenAddress: usdtAddress,
requestedTimestamp: '2026-04-26T01:33:02.000Z',
source: 'current_market_fallback',
locked: false,
});
});
});

View File

@@ -15,6 +15,7 @@ import {
getCanonicalTokensByChain,
resolveCanonicalQuoteAddress,
} from '../../config/canonical-tokens';
import { resolveCanonicalPriceUsd } from '../../services/canonical-price-oracle';
import { getLiveDodoPools } from '../../services/live-dodo-fallback';
import {
buildExplorerLinks,
@@ -27,10 +28,64 @@ const tokenRepo = new TokenRepository();
const marketDataRepo = new MarketDataRepository();
const poolRepo = new PoolRepository();
const ohlcvGenerator = new OHLCVGenerator();
type HistoricalPriceSource =
| 'swap_event'
| 'ohlcv_5m'
| 'ohlcv_15m'
| 'ohlcv_1h'
| 'ohlcv_4h'
| 'ohlcv_24h'
| 'coingecko_history'
| 'current_market_fallback'
| 'canonical_fallback'
| 'unavailable';
const coingeckoAdapter = new CoinGeckoAdapter();
const cmcAdapter = new CoinMarketCapAdapter();
const dexscreenerAdapter = new DexScreenerAdapter();
function firstString(value: string | string[] | undefined): string | undefined {
return Array.isArray(value) ? value[0] : value;
}
type HistoricalPriceSnapshot = {
chainId: number;
tokenAddress: string;
requestedTimestamp: string;
effectiveTimestamp?: string;
priceUsd?: number;
source: HistoricalPriceSource;
};
type CurrentPriceSnapshot = {
chainId: number;
tokenAddress: string;
priceUsd?: number;
asOf?: string;
sourceLayer: string;
stale: boolean;
marketLastUpdated?: string;
};
const HISTORICAL_INTERVAL_MAX_AGE_MS: Record<'5m' | '15m' | '1h' | '4h' | '24h', number> = {
'5m': 15 * 60 * 1000,
'15m': 45 * 60 * 1000,
'1h': 3 * 60 * 60 * 1000,
'4h': 12 * 60 * 60 * 1000,
'24h': 36 * 60 * 60 * 1000,
};
const COINGECKO_HISTORY_MAX_SKEW_MS = 6 * 60 * 60 * 1000;
function isAcceptableHistoricalTimestamp(
candidateTimestamp: Date,
requestedTimestamp: Date,
maxAgeMs: number
): boolean {
const deltaMs = requestedTimestamp.getTime() - candidateTimestamp.getTime();
return deltaMs >= 0 && deltaMs <= maxAgeMs;
}
function buildMarketPricingExplorer(
chainId: number,
displayAddress: string,
@@ -126,6 +181,189 @@ async function getTokenWithFallback(chainId: number, address: string): Promise<T
};
}
async function resolveHistoricalPriceAt(
chainId: number,
address: string,
timestamp: Date
): Promise<HistoricalPriceSnapshot> {
const normalizedAddress = address.toLowerCase();
const resolution = resolveCanonicalQuoteAddress(chainId, normalizedAddress);
const requestedTimestamp = timestamp.toISOString();
try {
const direct5m = await ohlcvGenerator.getOHLCV(
chainId,
resolution.lookupAddress,
'5m',
new Date(timestamp.getTime() - 10 * 60 * 1000),
timestamp
);
const directCandidate = direct5m[direct5m.length - 1];
if (
directCandidate?.close != null &&
isAcceptableHistoricalTimestamp(
directCandidate.timestamp,
timestamp,
HISTORICAL_INTERVAL_MAX_AGE_MS['5m']
)
) {
return {
chainId,
tokenAddress: normalizedAddress,
requestedTimestamp,
effectiveTimestamp: directCandidate.timestamp.toISOString(),
priceUsd: directCandidate.close,
source: 'ohlcv_5m',
};
}
const intervalWindows: Array<{ interval: '15m' | '1h' | '4h' | '24h'; windowMs: number }> = [
{ interval: '15m', windowMs: 2 * 60 * 60 * 1000 },
{ interval: '1h', windowMs: 24 * 60 * 60 * 1000 },
{ interval: '4h', windowMs: 7 * 24 * 60 * 60 * 1000 },
{ interval: '24h', windowMs: 60 * 24 * 60 * 60 * 1000 },
];
for (const { interval, windowMs } of intervalWindows) {
const candles = await ohlcvGenerator.getOHLCV(
chainId,
resolution.lookupAddress,
interval,
new Date(timestamp.getTime() - windowMs),
timestamp
);
const candidate = candles[candles.length - 1];
if (
candidate?.close != null &&
isAcceptableHistoricalTimestamp(
candidate.timestamp,
timestamp,
HISTORICAL_INTERVAL_MAX_AGE_MS[interval]
)
) {
return {
chainId,
tokenAddress: normalizedAddress,
requestedTimestamp,
effectiveTimestamp: candidate.timestamp.toISOString(),
priceUsd: candidate.close,
source: `ohlcv_${interval}`,
};
}
}
} catch (error) {
logger.warn('Historical OHLCV lookup failed; falling back to current layers', {
chainId,
address: resolution.lookupAddress,
requestedTimestamp,
error,
});
}
const canonicalResolution = resolveCanonicalPriceUsd(chainId, resolution.lookupAddress);
if (canonicalResolution.referenceSymbol) {
const coingeckoHistorical = await coingeckoAdapter.getHistoricalReferencePrice(
canonicalResolution.referenceSymbol,
timestamp
);
if (
coingeckoHistorical?.priceUsd != null &&
isAcceptableHistoricalTimestamp(
coingeckoHistorical.effectiveTimestamp,
timestamp,
COINGECKO_HISTORY_MAX_SKEW_MS
)
) {
return {
chainId,
tokenAddress: normalizedAddress,
requestedTimestamp,
effectiveTimestamp: coingeckoHistorical.effectiveTimestamp.toISOString(),
priceUsd: coingeckoHistorical.priceUsd,
source: 'coingecko_history',
};
}
}
const marketData = await marketDataRepo.getMarketData(chainId, resolution.lookupAddress);
if (marketData?.priceUsd != null) {
return {
chainId,
tokenAddress: normalizedAddress,
requestedTimestamp,
effectiveTimestamp: marketData.lastUpdated instanceof Date ? marketData.lastUpdated.toISOString() : new Date(marketData.lastUpdated).toISOString(),
priceUsd: marketData.priceUsd,
source: 'current_market_fallback',
};
}
const canonicalPricing = resolveUsdValuation({
chainId,
normalizedAddress: resolution.lookupAddress,
indexer: null,
coingecko: undefined,
cmc: undefined,
dexscreener: undefined,
});
if (canonicalPricing.priceUsd != null) {
return {
chainId,
tokenAddress: normalizedAddress,
requestedTimestamp,
effectiveTimestamp: canonicalPricing.asOf,
priceUsd: canonicalPricing.priceUsd,
source: 'canonical_fallback',
};
}
return {
chainId,
tokenAddress: normalizedAddress,
requestedTimestamp,
source: 'unavailable',
};
}
async function resolveCurrentPriceSnapshot(
chainId: number,
address: string
): Promise<CurrentPriceSnapshot> {
const normalizedAddress = address.toLowerCase();
const resolution = resolveCanonicalQuoteAddress(chainId, normalizedAddress);
const marketData = await marketDataRepo.getMarketData(chainId, resolution.lookupAddress);
const pricing = resolveUsdValuation({
chainId,
normalizedAddress: resolution.lookupAddress,
indexer: marketData,
coingecko: undefined,
cmc: undefined,
dexscreener: undefined,
});
const asOf = pricing.asOf
|| (marketData?.lastUpdated instanceof Date
? marketData.lastUpdated.toISOString()
: marketData?.lastUpdated
? new Date(marketData.lastUpdated).toISOString()
: undefined);
return {
chainId,
tokenAddress: normalizedAddress,
priceUsd: pricing.priceUsd,
asOf,
sourceLayer: pricing.sourceLayer,
stale: pricing.stale,
marketLastUpdated:
marketData?.lastUpdated instanceof Date
? marketData.lastUpdated.toISOString()
: marketData?.lastUpdated
? new Date(marketData.lastUpdated).toISOString()
: undefined,
};
}
async function getTokensWithFallback(
chainId: number,
limit: number,
@@ -250,7 +488,7 @@ router.get('/tokens', cacheMiddleware(60 * 1000), async (req: Request, res: Resp
router.get('/tokens/:address', cacheMiddleware(60 * 1000), async (req: Request, res: Response) => {
try {
const chainId = parseInt(req.query.chainId as string, 10);
const address = req.params.address;
const address = firstString(req.params.address) || '';
if (!chainId) {
return res.status(400).json({ error: 'chainId is required' });
@@ -338,7 +576,7 @@ router.get('/tokens/:address', cacheMiddleware(60 * 1000), async (req: Request,
router.get('/tokens/:address/pools', cacheMiddleware(60 * 1000), async (req: Request, res: Response) => {
try {
const chainId = parseInt(req.query.chainId as string, 10);
const address = req.params.address;
const address = firstString(req.params.address) || '';
if (!chainId) {
return res.status(400).json({ error: 'chainId is required' });
@@ -404,11 +642,13 @@ router.get('/tokens/:address/pools', cacheMiddleware(60 * 1000), async (req: Req
router.get('/tokens/:address/ohlcv', cacheMiddleware(5 * 60 * 1000), async (req: Request, res: Response) => {
try {
const chainId = parseInt(req.query.chainId as string, 10);
const address = req.params.address;
const interval = (req.query.interval as string) || '1h';
const from = req.query.from ? new Date(req.query.from as string) : new Date(Date.now() - 7 * 24 * 60 * 60 * 1000);
const to = req.query.to ? new Date(req.query.to as string) : new Date();
const poolAddress = req.query.poolAddress as string | undefined;
const address = firstString(req.params.address) || '';
const interval = firstString(req.query.interval as string | string[] | undefined) || '1h';
const fromRaw = firstString(req.query.from as string | string[] | undefined);
const toRaw = firstString(req.query.to as string | string[] | undefined);
const from = fromRaw ? new Date(fromRaw) : new Date(Date.now() - 7 * 24 * 60 * 60 * 1000);
const to = toRaw ? new Date(toRaw) : new Date();
const poolAddress = firstString(req.query.poolAddress as string | string[] | undefined);
if (!chainId) {
return res.status(400).json({ error: 'chainId is required' });
@@ -439,10 +679,83 @@ router.get('/tokens/:address/ohlcv', cacheMiddleware(5 * 60 * 1000), async (req:
}
});
router.get('/tokens/:address/price-at', cacheMiddleware(60 * 1000), async (req: Request, res: Response) => {
try {
const chainId = parseInt(req.query.chainId as string, 10);
const address = firstString(req.params.address) || '';
const rawTimestamp = String(firstString(req.query.timestamp as string | string[] | undefined) || '').trim();
if (!chainId) {
return res.status(400).json({ error: 'chainId is required' });
}
if (!rawTimestamp) {
return res.status(400).json({ error: 'timestamp is required' });
}
const timestamp = new Date(rawTimestamp);
if (Number.isNaN(timestamp.getTime())) {
return res.status(400).json({ error: 'timestamp must be a valid ISO-8601 datetime' });
}
const historicalPrice = await resolveHistoricalPriceAt(chainId, address, timestamp);
res.json(historicalPrice);
} catch (error) {
logger.error('Error fetching historical token price:', error);
res.status(500).json({ error: 'Internal server error' });
}
});
router.get('/tokens/:address/pricing-context', cacheMiddleware(60 * 1000), async (req: Request, res: Response) => {
try {
const chainId = parseInt(req.query.chainId as string, 10);
const address = firstString(req.params.address) || '';
const rawTimestamp = String(firstString(req.query.timestamp as string | string[] | undefined) || '').trim();
if (!chainId) {
return res.status(400).json({ error: 'chainId is required' });
}
const token = await getTokenWithFallback(chainId, address.toLowerCase());
if (!token) {
return res.status(404).json({ error: 'Token not found' });
}
const current = await resolveCurrentPriceSnapshot(chainId, address);
let historical: (HistoricalPriceSnapshot & { locked: boolean }) | undefined;
if (rawTimestamp) {
const timestamp = new Date(rawTimestamp);
if (Number.isNaN(timestamp.getTime())) {
return res.status(400).json({ error: 'timestamp must be a valid ISO-8601 datetime' });
}
const historicalPrice = await resolveHistoricalPriceAt(chainId, address, timestamp);
historical = {
...historicalPrice,
locked:
historicalPrice.source.startsWith('ohlcv_') ||
historicalPrice.source === 'swap_event' ||
historicalPrice.source === 'coingecko_history',
};
}
return res.json({
chainId,
tokenAddress: address.toLowerCase(),
current,
historical,
});
} catch (error) {
logger.error('Error fetching pricing context:', error);
return res.status(500).json({ error: 'Internal server error' });
}
});
router.get('/tokens/:address/signals', cacheMiddleware(5 * 60 * 1000), async (req: Request, res: Response) => {
try {
const chainId = parseInt(req.query.chainId as string, 10);
const address = req.params.address;
const address = firstString(req.params.address) || '';
if (!chainId) {
return res.status(400).json({ error: 'chainId is required' });
@@ -466,7 +779,7 @@ router.get('/tokens/:address/signals', cacheMiddleware(5 * 60 * 1000), async (re
router.get('/search', cacheMiddleware(60 * 1000), async (req: Request, res: Response) => {
try {
const chainId = parseInt(req.query.chainId as string, 10);
const query = req.query.q as string;
const query = firstString(req.query.q as string | string[] | undefined) || '';
if (!chainId || !query) {
return res.status(400).json({ error: 'chainId and q (query) are required' });
@@ -488,7 +801,7 @@ router.get('/search', cacheMiddleware(60 * 1000), async (req: Request, res: Resp
router.get('/pools/:poolAddress', cacheMiddleware(60 * 1000), async (req: Request, res: Response) => {
try {
const chainId = parseInt(req.query.chainId as string, 10);
const poolAddress = req.params.poolAddress;
const poolAddress = firstString(req.params.poolAddress) || '';
if (!chainId) {
return res.status(400).json({ error: 'chainId is required' });

View File

@@ -0,0 +1,49 @@
import * as dotenv from 'dotenv';
import path from 'path';
import { existsSync } from 'fs';
import { HistoricalPricingBackfillService } from './services/historical-pricing-backfill';
const rootEnvCandidates = [
path.resolve(__dirname, '../../.env'),
path.resolve(__dirname, '../../../.env'),
];
for (const candidate of rootEnvCandidates) {
if (existsSync(candidate)) {
dotenv.config({ path: candidate });
break;
}
}
dotenv.config();
function readInt(name: string, fallback: number): number {
const raw = String(process.env[name] || '').trim();
if (!raw) return fallback;
const parsed = Number(raw);
return Number.isFinite(parsed) ? parsed : fallback;
}
async function main(): Promise<void> {
const chainId = readInt('BACKFILL_CHAIN_ID', 138);
const days = readInt('BACKFILL_DAYS', 30);
const chunkSize = readInt('BACKFILL_CHUNK_SIZE', 2500);
const poolLimit = readInt('BACKFILL_POOL_LIMIT', 500);
const service = new HistoricalPricingBackfillService();
const summary = await service.backfillChain({
chainId,
days,
chunkSize,
poolLimit,
});
// eslint-disable-next-line no-console
console.log(JSON.stringify(summary, null, 2));
}
main().catch((error) => {
// eslint-disable-next-line no-console
console.error(error);
process.exit(1);
});

View File

@@ -0,0 +1,99 @@
import { Pool } from 'pg';
import { getDatabasePool } from '../client';
export interface SwapEventRecord {
chainId: number;
poolAddress: string;
transactionHash: string;
blockNumber: number;
logIndex: number;
token0Address: string;
token1Address: string;
amount0In: string;
amount1In: string;
amount0Out: string;
amount1Out: string;
amountUsd?: number;
priceUsd?: number;
token0PriceUsd?: number;
token1PriceUsd?: number;
sender?: string;
toAddress?: string;
timestamp: Date;
}
export class SwapEventRepository {
private pool: Pool;
constructor() {
this.pool = getDatabasePool();
}
private isMissingRelationError(error: unknown): boolean {
if (!error || typeof error !== 'object') {
return false;
}
const code = (error as { code?: string }).code;
const message = (error as { message?: string }).message || '';
return code === '42P01' || (message.includes('relation "') && message.includes('" does not exist'));
}
async upsertSwapEvent(event: SwapEventRecord): Promise<void> {
try {
await this.pool.query(
`INSERT INTO swap_events (
chain_id, pool_address, transaction_hash, block_number, log_index,
token0_address, token1_address, amount0_in, amount1_in, amount0_out, amount1_out,
amount_usd, price_usd, token0_price_usd, token1_price_usd, sender, to_address, timestamp
)
VALUES (
$1, $2, $3, $4, $5,
$6, $7, $8, $9, $10, $11,
$12, $13, $14, $15, $16, $17, $18
)
ON CONFLICT (chain_id, transaction_hash, log_index) DO UPDATE SET
pool_address = EXCLUDED.pool_address,
block_number = EXCLUDED.block_number,
token0_address = EXCLUDED.token0_address,
token1_address = EXCLUDED.token1_address,
amount0_in = EXCLUDED.amount0_in,
amount1_in = EXCLUDED.amount1_in,
amount0_out = EXCLUDED.amount0_out,
amount1_out = EXCLUDED.amount1_out,
amount_usd = EXCLUDED.amount_usd,
price_usd = EXCLUDED.price_usd,
token0_price_usd = EXCLUDED.token0_price_usd,
token1_price_usd = EXCLUDED.token1_price_usd,
sender = EXCLUDED.sender,
to_address = EXCLUDED.to_address,
timestamp = EXCLUDED.timestamp`,
[
event.chainId,
event.poolAddress.toLowerCase(),
event.transactionHash.toLowerCase(),
event.blockNumber,
event.logIndex,
event.token0Address.toLowerCase(),
event.token1Address.toLowerCase(),
event.amount0In,
event.amount1In,
event.amount0Out,
event.amount1Out,
event.amountUsd,
event.priceUsd,
event.token0PriceUsd,
event.token1PriceUsd,
event.sender?.toLowerCase(),
event.toAddress?.toLowerCase(),
event.timestamp,
]
);
} catch (error) {
if (this.isMissingRelationError(error)) {
return;
}
throw error;
}
}
}

View File

@@ -26,6 +26,7 @@ export class ChainIndexer {
dexscreener: DexScreenerAdapter;
};
private isRunning: boolean = false;
private isIndexing: boolean = false;
private indexingInterval?: NodeJS.Timeout;
constructor(chainId: number) {
@@ -94,6 +95,12 @@ export class ChainIndexer {
* Index all data (pools, tokens, market data)
*/
private async indexAll(): Promise<void> {
if (this.isIndexing) {
logger.info(`Skipping overlapping index cycle for chain ${this.chainId}`);
return;
}
this.isIndexing = true;
try {
// 1. Index pools
logger.info(`Indexing pools for chain ${this.chainId}...`);
@@ -134,6 +141,8 @@ export class ChainIndexer {
} catch (error) {
logger.error(`Error in indexAll for chain ${this.chainId}:`, error);
throw error;
} finally {
this.isIndexing = false;
}
}

View File

@@ -124,13 +124,18 @@ async function queryFilterWithRangeFallback(
}
const logs: ethers.EventLog[] = [];
for (
let start = fromBlock;
start <= toBlock;
start += CROSS_CHAIN_QUERY_FALLBACK_BLOCK_SPAN
) {
const end = Math.min(start + CROSS_CHAIN_QUERY_FALLBACK_BLOCK_SPAN - 1, toBlock);
const chunk = (await contract.queryFilter(filter as never, start, end)) as ethers.EventLog[];
const totalSpan = toBlock - fromBlock + 1;
const chunkSpan = Math.min(CROSS_CHAIN_QUERY_FALLBACK_BLOCK_SPAN, totalSpan);
for (let start = fromBlock; start <= toBlock; start += chunkSpan) {
const end = Math.min(start + chunkSpan - 1, toBlock);
if (start === fromBlock && end === toBlock) {
const mid = Math.floor((start + end) / 2);
logs.push(...await queryFilterWithRangeFallback(contract, filter, start, mid));
logs.push(...await queryFilterWithRangeFallback(contract, filter, mid + 1, end));
continue;
}
const chunk = await queryFilterWithRangeFallback(contract, filter, start, end);
logs.push(...chunk);
}
return logs;

View File

@@ -73,8 +73,14 @@ export class OHLCVGenerator {
const intervalMs = this.getIntervalMs(interval);
// Get swap events for the time range
const priceColumn = `CASE
WHEN token0_address = $2 THEN COALESCE(token0_price_usd, price_usd)
WHEN token1_address = $2 THEN COALESCE(token1_price_usd, price_usd)
ELSE price_usd
END AS token_price_usd`;
let query = `
SELECT timestamp, amount_usd, price_usd
SELECT timestamp, amount_usd, ${priceColumn}
FROM swap_events
WHERE chain_id = $1
AND (token0_address = $2 OR token1_address = $2)
@@ -102,7 +108,7 @@ export class OHLCVGenerator {
result.rows.forEach((row) => {
const timestamp = new Date(row.timestamp);
const intervalStart = Math.floor(timestamp.getTime() / intervalMs) * intervalMs;
const price = parseFloat(row.price_usd || '0');
const price = parseFloat((row.token_price_usd ?? row.price_usd ?? '0').toString());
const volume = parseFloat(row.amount_usd || '0');
if (!intervals.has(intervalStart)) {

View File

@@ -50,6 +50,25 @@ const DODO_PMM_INTEGRATION_ABI = [
'function getPoolPriceOrOracle(address) view returns (uint256 price)',
];
const POOL_QUERY_FALLBACK_BLOCK_SPAN = Math.max(
1,
Number(process.env.POOL_QUERY_FALLBACK_BLOCK_SPAN || 5000)
);
function isRpcRangeLimitError(error: unknown): boolean {
const parts = [
typeof error === 'object' && error ? (error as { message?: string }).message : '',
typeof error === 'object' && error ? (error as { shortMessage?: string }).shortMessage : '',
typeof error === 'object' && error
? ((error as { error?: { message?: string } }).error?.message ?? '')
: '',
];
return parts.some((part) =>
typeof part === 'string' && /range limit|exceeds maximum rpc range/i.test(part)
);
}
export class PoolIndexer {
private static missingDexConfigLogged = new Set<number>();
private static staleDodoPoolsLogged = new Set<string>();
@@ -75,6 +94,37 @@ export class PoolIndexer {
pools.push(pool);
}
private async queryFilterWithRangeFallback(
contract: ethers.Contract,
filter: ReturnType<ethers.Contract['filters'][string]>,
fromBlock: number,
toBlock: number
): Promise<ethers.EventLog[]> {
try {
return (await contract.queryFilter(filter as never, fromBlock, toBlock)) as ethers.EventLog[];
} catch (error) {
if (!isRpcRangeLimitError(error) || fromBlock >= toBlock) {
throw error;
}
}
const logs: ethers.EventLog[] = [];
const totalSpan = toBlock - fromBlock + 1;
const chunkSpan = Math.min(POOL_QUERY_FALLBACK_BLOCK_SPAN, totalSpan);
for (let start = fromBlock; start <= toBlock; start += chunkSpan) {
const end = Math.min(start + chunkSpan - 1, toBlock);
if (start === fromBlock && end === toBlock) {
const mid = Math.floor((start + end) / 2);
logs.push(...await this.queryFilterWithRangeFallback(contract, filter, start, mid));
logs.push(...await this.queryFilterWithRangeFallback(contract, filter, mid + 1, end));
continue;
}
logs.push(...await this.queryFilterWithRangeFallback(contract, filter, start, end));
}
return logs;
}
/**
* Index all pools for configured DEX types
*/
@@ -230,7 +280,7 @@ export class PoolIndexer {
// Listen for PairCreated events
const filter = factory.filters.PairCreated();
const events = await factory.queryFilter(filter, fromBlock, currentBlock);
const events = await this.queryFilterWithRangeFallback(factory, filter, fromBlock, currentBlock);
for (const event of events) {
const ev = event as ethers.EventLog;
@@ -284,7 +334,7 @@ export class PoolIndexer {
const fromBlock = config.startBlock || Math.max(0, currentBlock - 10000);
const filter = factory.filters.PoolCreated();
const events = await factory.queryFilter(filter, fromBlock, currentBlock);
const events = await this.queryFilterWithRangeFallback(factory, filter, fromBlock, currentBlock);
for (const event of events) {
const ev = event as ethers.EventLog;

View File

@@ -0,0 +1,489 @@
import { ethers } from 'ethers';
import { getChainConfig } from '../config/chains';
import { getCanonicalPriceUsd } from './canonical-price-oracle';
import { resolveCanonicalQuoteAddress } from '../config/canonical-tokens';
import { TokenRepository } from '../database/repositories/token-repo';
import { PoolRepository, type LiquidityPool } from '../database/repositories/pool-repo';
import { MarketDataRepository } from '../database/repositories/market-data-repo';
import { SwapEventRepository, type SwapEventRecord } from '../database/repositories/swap-event-repo';
import { OHLCVGenerator, type OHLCVInterval } from '../indexer/ohlcv-generator';
import { logger } from '../utils/logger';
const ERC20_METADATA_ABI = [
'function decimals() view returns (uint8)',
'function symbol() view returns (string)',
] as const;
const UNISWAP_V2_SWAP_IFACE = new ethers.Interface([
'event Swap(address indexed sender, uint256 amount0In, uint256 amount1In, uint256 amount0Out, uint256 amount1Out, address indexed to)',
]);
const UNISWAP_V3_SWAP_IFACE = new ethers.Interface([
'event Swap(address indexed sender, address indexed recipient, int256 amount0, int256 amount1, uint160 sqrtPriceX96, uint128 liquidity, int24 tick)',
]);
const DODO_SWAP_IFACE = new ethers.Interface([
'event DODOSwap(address fromToken, address toToken, uint256 fromAmount, uint256 toAmount, address trader, address receiver)',
]);
type BackfillOptions = {
chainId: number;
days?: number;
chunkSize?: number;
poolLimit?: number;
};
type TokenMetadata = {
address: string;
decimals: number;
symbol?: string;
};
type BackfillSummary = {
chainId: number;
poolsScanned: number;
poolsProcessed: number;
swapsUpserted: number;
ohlcvTokensGenerated: number;
fromBlock: number;
toBlock: number;
days: number;
};
export class HistoricalPricingBackfillService {
private tokenRepo = new TokenRepository();
private poolRepo = new PoolRepository();
private marketDataRepo = new MarketDataRepository();
private swapEventRepo = new SwapEventRepository();
private ohlcvGenerator = new OHLCVGenerator();
private providerCache = new Map<number, ethers.JsonRpcProvider>();
private tokenMetadataCache = new Map<string, TokenMetadata>();
private anchorPriceCache = new Map<string, number | undefined>();
private blockTimestampCache = new Map<string, Date>();
async backfillChain(options: BackfillOptions): Promise<BackfillSummary> {
const chainId = options.chainId;
const days = Math.max(1, options.days ?? 30);
const chunkSize = Math.max(100, options.chunkSize ?? 2500);
const config = getChainConfig(chainId);
if (!config) {
throw new Error(`Chain ${chainId} is not configured`);
}
const provider = this.getProvider(chainId);
const currentBlock = await provider.getBlockNumber();
const blocksPerDay = Math.max(1, Math.ceil((24 * 60 * 60) / Math.max(config.blockTime, 1)));
const defaultFromBlock = Math.max(0, currentBlock - (blocksPerDay * days));
const pools = await this.poolRepo.getPoolsByChain(chainId, options.poolLimit ?? 500);
const tokenAddresses = new Set<string>();
let poolsProcessed = 0;
let swapsUpserted = 0;
for (const pool of pools) {
tokenAddresses.add(pool.token0Address.toLowerCase());
tokenAddresses.add(pool.token1Address.toLowerCase());
if (!this.isSupportedDexType(pool.dexType)) {
continue;
}
const fromBlock = Math.max(defaultFromBlock, Number(pool.createdAtBlock || 0));
const count = await this.backfillPool(pool, fromBlock, currentBlock, chunkSize);
if (count > 0) {
poolsProcessed += 1;
swapsUpserted += count;
}
}
const now = new Date();
const from = new Date(now.getTime() - days * 24 * 60 * 60 * 1000);
let ohlcvTokensGenerated = 0;
const intervals: OHLCVInterval[] = ['5m', '15m', '1h', '4h', '24h'];
for (const tokenAddress of tokenAddresses) {
for (const interval of intervals) {
await this.ohlcvGenerator.generateAndStore(chainId, tokenAddress, interval, from, now);
}
ohlcvTokensGenerated += 1;
}
return {
chainId,
poolsScanned: pools.length,
poolsProcessed,
swapsUpserted,
ohlcvTokensGenerated,
fromBlock: defaultFromBlock,
toBlock: currentBlock,
days,
};
}
private isSupportedDexType(dexType: string): boolean {
return ['uniswap_v2', 'sushiswap', 'uniswap_v3', 'dodo'].includes(dexType);
}
private getProvider(chainId: number): ethers.JsonRpcProvider {
const cached = this.providerCache.get(chainId);
if (cached) {
return cached;
}
const config = getChainConfig(chainId);
if (!config) {
throw new Error(`Chain ${chainId} is not configured`);
}
const provider = new ethers.JsonRpcProvider(config.rpcUrl);
this.providerCache.set(chainId, provider);
return provider;
}
private async backfillPool(
pool: LiquidityPool,
fromBlock: number,
toBlock: number,
chunkSize: number
): Promise<number> {
let count = 0;
const provider = this.getProvider(pool.chainId);
for (let start = fromBlock; start <= toBlock; start += chunkSize) {
const end = Math.min(toBlock, start + chunkSize - 1);
const logs = await this.fetchPoolLogs(pool, provider, start, end);
for (const log of logs) {
const event = await this.toSwapEventRecord(pool, log, provider);
if (!event) {
continue;
}
await this.swapEventRepo.upsertSwapEvent(event);
count += 1;
}
}
logger.info('Historical pricing pool backfill complete', {
chainId: pool.chainId,
poolAddress: pool.poolAddress,
dexType: pool.dexType,
fromBlock,
toBlock,
swapsUpserted: count,
});
return count;
}
private async fetchPoolLogs(
pool: LiquidityPool,
provider: ethers.JsonRpcProvider,
fromBlock: number,
toBlock: number
): Promise<ethers.Log[]> {
let iface: ethers.Interface;
let topic: string;
switch (pool.dexType) {
case 'uniswap_v2':
case 'sushiswap':
iface = UNISWAP_V2_SWAP_IFACE;
topic = UNISWAP_V2_SWAP_IFACE.getEvent('Swap')!.topicHash;
break;
case 'uniswap_v3':
iface = UNISWAP_V3_SWAP_IFACE;
topic = UNISWAP_V3_SWAP_IFACE.getEvent('Swap')!.topicHash;
break;
case 'dodo':
iface = DODO_SWAP_IFACE;
topic = DODO_SWAP_IFACE.getEvent('DODOSwap')!.topicHash;
break;
default:
return [];
}
try {
return await provider.getLogs({
address: pool.poolAddress,
fromBlock,
toBlock,
topics: [topic],
});
} catch (error) {
logger.warn('Historical pricing log fetch failed for pool chunk', {
chainId: pool.chainId,
poolAddress: pool.poolAddress,
dexType: pool.dexType,
fromBlock,
toBlock,
error,
});
return [];
}
}
private async toSwapEventRecord(
pool: LiquidityPool,
log: ethers.Log,
provider: ethers.JsonRpcProvider
): Promise<SwapEventRecord | null> {
try {
if (pool.dexType === 'uniswap_v2' || pool.dexType === 'sushiswap') {
const parsed = UNISWAP_V2_SWAP_IFACE.parseLog(log)!;
const amount0In = BigInt(parsed.args.amount0In.toString());
const amount1In = BigInt(parsed.args.amount1In.toString());
const amount0Out = BigInt(parsed.args.amount0Out.toString());
const amount1Out = BigInt(parsed.args.amount1Out.toString());
return await this.buildSwapEventRecord(pool, log, provider, {
amount0In,
amount1In,
amount0Out,
amount1Out,
sender: String(parsed.args.sender),
toAddress: String(parsed.args.to),
});
}
if (pool.dexType === 'uniswap_v3') {
const parsed = UNISWAP_V3_SWAP_IFACE.parseLog(log)!;
const amount0Signed = BigInt(parsed.args.amount0.toString());
const amount1Signed = BigInt(parsed.args.amount1.toString());
return await this.buildSwapEventRecord(pool, log, provider, {
amount0In: amount0Signed > 0n ? amount0Signed : 0n,
amount1In: amount1Signed > 0n ? amount1Signed : 0n,
amount0Out: amount0Signed < 0n ? -amount0Signed : 0n,
amount1Out: amount1Signed < 0n ? -amount1Signed : 0n,
sender: String(parsed.args.sender),
toAddress: String(parsed.args.recipient),
});
}
if (pool.dexType === 'dodo') {
const parsed = DODO_SWAP_IFACE.parseLog(log)!;
const fromToken = String(parsed.args.fromToken).toLowerCase();
const toToken = String(parsed.args.toToken).toLowerCase();
const fromAmount = BigInt(parsed.args.fromAmount.toString());
const toAmount = BigInt(parsed.args.toAmount.toString());
if (fromToken === pool.token0Address.toLowerCase() && toToken === pool.token1Address.toLowerCase()) {
return await this.buildSwapEventRecord(pool, log, provider, {
amount0In: fromAmount,
amount1In: 0n,
amount0Out: 0n,
amount1Out: toAmount,
sender: String(parsed.args.trader),
toAddress: String(parsed.args.receiver),
});
}
if (fromToken === pool.token1Address.toLowerCase() && toToken === pool.token0Address.toLowerCase()) {
return await this.buildSwapEventRecord(pool, log, provider, {
amount0In: 0n,
amount1In: fromAmount,
amount0Out: toAmount,
amount1Out: 0n,
sender: String(parsed.args.trader),
toAddress: String(parsed.args.receiver),
});
}
}
} catch (error) {
logger.warn('Historical pricing log parse failed', {
chainId: pool.chainId,
poolAddress: pool.poolAddress,
dexType: pool.dexType,
txHash: log.transactionHash,
error,
});
}
return null;
}
private async buildSwapEventRecord(
pool: LiquidityPool,
log: ethers.Log,
provider: ethers.JsonRpcProvider,
amounts: {
amount0In: bigint;
amount1In: bigint;
amount0Out: bigint;
amount1Out: bigint;
sender?: string;
toAddress?: string;
}
): Promise<SwapEventRecord> {
const moved0 = amounts.amount0In > 0n ? amounts.amount0In : amounts.amount0Out;
const moved1 = amounts.amount1In > 0n ? amounts.amount1In : amounts.amount1Out;
const valuation = await this.deriveSwapValuation(pool.chainId, pool.token0Address, pool.token1Address, moved0, moved1);
return {
chainId: pool.chainId,
poolAddress: pool.poolAddress,
transactionHash: log.transactionHash,
blockNumber: Number(log.blockNumber),
logIndex: log.index,
token0Address: pool.token0Address,
token1Address: pool.token1Address,
amount0In: amounts.amount0In.toString(),
amount1In: amounts.amount1In.toString(),
amount0Out: amounts.amount0Out.toString(),
amount1Out: amounts.amount1Out.toString(),
amountUsd: valuation.amountUsd,
priceUsd: valuation.priceUsd,
token0PriceUsd: valuation.token0PriceUsd,
token1PriceUsd: valuation.token1PriceUsd,
sender: amounts.sender,
toAddress: amounts.toAddress,
timestamp: await this.getBlockTimestamp(provider, pool.chainId, Number(log.blockNumber)),
};
}
private async deriveSwapValuation(
chainId: number,
token0Address: string,
token1Address: string,
amount0Raw: bigint,
amount1Raw: bigint
): Promise<{
amountUsd?: number;
priceUsd?: number;
token0PriceUsd?: number;
token1PriceUsd?: number;
}> {
if (amount0Raw <= 0n || amount1Raw <= 0n) {
return {};
}
const [token0Meta, token1Meta] = await Promise.all([
this.getTokenMetadata(chainId, token0Address),
this.getTokenMetadata(chainId, token1Address),
]);
const amount0 = Number(ethers.formatUnits(amount0Raw, token0Meta.decimals));
const amount1 = Number(ethers.formatUnits(amount1Raw, token1Meta.decimals));
if (!Number.isFinite(amount0) || !Number.isFinite(amount1) || amount0 <= 0 || amount1 <= 0) {
return {};
}
const [anchor0, anchor1] = await Promise.all([
this.resolveAnchorUsdPrice(chainId, token0Address),
this.resolveAnchorUsdPrice(chainId, token1Address),
]);
let token0PriceUsd = anchor0;
let token1PriceUsd = anchor1;
if (anchor1 != null && anchor1 > 0) {
token0PriceUsd = (amount1 * anchor1) / amount0;
}
if (anchor0 != null && anchor0 > 0) {
token1PriceUsd = (amount0 * anchor0) / amount1;
}
if ((token0PriceUsd == null || token0PriceUsd <= 0) && token1PriceUsd != null && token1PriceUsd > 0) {
token0PriceUsd = (amount1 * token1PriceUsd) / amount0;
}
if ((token1PriceUsd == null || token1PriceUsd <= 0) && token0PriceUsd != null && token0PriceUsd > 0) {
token1PriceUsd = (amount0 * token0PriceUsd) / amount1;
}
const amountUsd =
token0PriceUsd != null && token0PriceUsd > 0
? amount0 * token0PriceUsd
: token1PriceUsd != null && token1PriceUsd > 0
? amount1 * token1PriceUsd
: undefined;
return {
amountUsd,
priceUsd: token0PriceUsd ?? token1PriceUsd,
token0PriceUsd,
token1PriceUsd,
};
}
private async resolveAnchorUsdPrice(chainId: number, address: string): Promise<number | undefined> {
const normalized = `${chainId}:${address.toLowerCase()}`;
if (this.anchorPriceCache.has(normalized)) {
return this.anchorPriceCache.get(normalized);
}
const canonical = getCanonicalPriceUsd(chainId, address);
if (canonical != null && canonical > 0) {
this.anchorPriceCache.set(normalized, canonical);
return canonical;
}
const resolution = resolveCanonicalQuoteAddress(chainId, address.toLowerCase());
const marketData = await this.marketDataRepo.getMarketData(chainId, resolution.lookupAddress);
const price = marketData?.priceUsd;
this.anchorPriceCache.set(normalized, price);
return price;
}
private async getTokenMetadata(chainId: number, address: string): Promise<TokenMetadata> {
const normalized = `${chainId}:${address.toLowerCase()}`;
const cached = this.tokenMetadataCache.get(normalized);
if (cached) {
return cached;
}
const token = await this.tokenRepo.getToken(chainId, address.toLowerCase());
if (token?.decimals != null) {
const metadata = {
address: address.toLowerCase(),
decimals: token.decimals,
symbol: token.symbol,
};
this.tokenMetadataCache.set(normalized, metadata);
return metadata;
}
const provider = this.getProvider(chainId);
const contract = new ethers.Contract(address, ERC20_METADATA_ABI, provider);
try {
const [decimals, symbol] = await Promise.all([
contract.decimals().catch(() => 18),
contract.symbol().catch(() => undefined),
]);
const metadata = {
address: address.toLowerCase(),
decimals: Number(decimals),
symbol: typeof symbol === 'string' ? symbol : undefined,
};
this.tokenMetadataCache.set(normalized, metadata);
return metadata;
} catch {
const metadata = {
address: address.toLowerCase(),
decimals: 18,
};
this.tokenMetadataCache.set(normalized, metadata);
return metadata;
}
}
private async getBlockTimestamp(
provider: ethers.JsonRpcProvider,
chainId: number,
blockNumber: number
): Promise<Date> {
const key = `${chainId}:${blockNumber}`;
const cached = this.blockTimestampCache.get(key);
if (cached) {
return cached;
}
const block = await provider.getBlock(blockNumber);
const timestamp = new Date(Number(block?.timestamp || 0) * 1000);
this.blockTimestampCache.set(key, timestamp);
return timestamp;
}
}