Add full monorepo: virtual-banker, backend, frontend, docs, scripts, deployment

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
defiQUG
2026-02-10 11:32:49 -08:00
parent aafcd913c2
commit 88bc76da91
815 changed files with 125522 additions and 264 deletions

View File

@@ -0,0 +1,118 @@
package backfill
import (
"context"
"fmt"
"log"
"math/big"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/explorer/backend/indexer/processor"
"github.com/jackc/pgx/v5/pgxpool"
)
// BackfillWorker handles historical block indexing
type BackfillWorker struct {
db *pgxpool.Pool
client *ethclient.Client
processor *processor.BlockProcessor
chainID int
batchSize int
startBlock int64
endBlock int64
}
// NewBackfillWorker creates a new backfill worker
func NewBackfillWorker(db *pgxpool.Pool, client *ethclient.Client, chainID int, batchSize int) *BackfillWorker {
proc := processor.NewBlockProcessor(db, client, chainID)
return &BackfillWorker{
db: db,
client: client,
processor: proc,
chainID: chainID,
batchSize: batchSize,
}
}
// SetRange sets the block range to backfill
func (bw *BackfillWorker) SetRange(startBlock, endBlock int64) {
bw.startBlock = startBlock
bw.endBlock = endBlock
}
// Run starts the backfill process
func (bw *BackfillWorker) Run(ctx context.Context) error {
currentBlock := bw.startBlock
checkpoint := bw.getCheckpoint(ctx)
if checkpoint > currentBlock {
currentBlock = checkpoint
log.Printf("Resuming from checkpoint: block %d", currentBlock)
}
for currentBlock <= bw.endBlock {
// Process batch
endBatch := currentBlock + int64(bw.batchSize) - 1
if endBatch > bw.endBlock {
endBatch = bw.endBlock
}
if err := bw.processBatch(ctx, currentBlock, endBatch); err != nil {
return fmt.Errorf("failed to process batch %d-%d: %w", currentBlock, endBatch, err)
}
// Update checkpoint
if err := bw.saveCheckpoint(ctx, endBatch); err != nil {
log.Printf("Warning: failed to save checkpoint: %v", err)
}
log.Printf("Processed blocks %d-%d", currentBlock, endBatch)
currentBlock = endBatch + 1
// Check for cancellation
select {
case <-ctx.Done():
return ctx.Err()
default:
}
}
return nil
}
// processBatch processes a batch of blocks
func (bw *BackfillWorker) processBatch(ctx context.Context, start, end int64) error {
for blockNum := start; blockNum <= end; blockNum++ {
block, err := bw.client.BlockByNumber(ctx, big.NewInt(blockNum))
if err != nil {
return fmt.Errorf("failed to fetch block %d: %w", blockNum, err)
}
if err := bw.processor.ProcessBlock(ctx, block); err != nil {
return fmt.Errorf("failed to process block %d: %w", blockNum, err)
}
}
return nil
}
// getCheckpoint gets the last processed block from checkpoint
func (bw *BackfillWorker) getCheckpoint(ctx context.Context) int64 {
var checkpoint int64
query := `SELECT last_block FROM backfill_checkpoints WHERE chain_id = $1`
err := bw.db.QueryRow(ctx, query, bw.chainID).Scan(&checkpoint)
if err != nil {
return 0
}
return checkpoint
}
// saveCheckpoint saves the checkpoint
func (bw *BackfillWorker) saveCheckpoint(ctx context.Context, blockNum int64) error {
query := `
INSERT INTO backfill_checkpoints (chain_id, last_block, updated_at)
VALUES ($1, $2, NOW())
ON CONFLICT (chain_id) DO UPDATE SET last_block = $2, updated_at = NOW()
`
_, err := bw.db.Exec(ctx, query, bw.chainID, blockNum)
return err
}

View File

@@ -0,0 +1,157 @@
package listener
import (
"context"
"fmt"
"math/big"
"time"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
)
// BlockListener listens for new blocks on the blockchain
type BlockListener struct {
client *ethclient.Client
wsClient *ethclient.Client
chainID int64
useWebSocket bool
queue chan *types.Block
ctx context.Context
cancel context.CancelFunc
}
// NewBlockListener creates a new block listener
func NewBlockListener(rpcURL, wsURL string, chainID int64) (*BlockListener, error) {
ctx, cancel := context.WithCancel(context.Background())
// Connect to RPC
client, err := ethclient.Dial(rpcURL)
if err != nil {
cancel()
return nil, fmt.Errorf("failed to connect to RPC: %w", err)
}
// Try to connect to WebSocket (optional)
var wsClient *ethclient.Client
useWebSocket := false
if wsURL != "" {
wsClient, err = ethclient.Dial(wsURL)
if err == nil {
useWebSocket = true
}
}
return &BlockListener{
client: client,
wsClient: wsClient,
chainID: chainID,
useWebSocket: useWebSocket,
queue: make(chan *types.Block, 100),
ctx: ctx,
cancel: cancel,
}, nil
}
// Start starts listening for new blocks
func (bl *BlockListener) Start() error {
if bl.useWebSocket {
return bl.startWebSocketListener()
}
return bl.startPollingListener()
}
// startWebSocketListener listens via WebSocket subscription
func (bl *BlockListener) startWebSocketListener() error {
headers := make(chan *types.Header)
sub, err := bl.wsClient.SubscribeNewHead(bl.ctx, headers)
if err != nil {
return fmt.Errorf("failed to subscribe to new heads: %w", err)
}
go func() {
defer sub.Unsubscribe()
for {
select {
case err := <-sub.Err():
if err != nil {
// Fallback to polling on WebSocket error
bl.useWebSocket = false
go bl.startPollingListener()
return
}
case header := <-headers:
bl.fetchAndQueueBlock(header.Number.Int64())
case <-bl.ctx.Done():
return
}
}
}()
return nil
}
// startPollingListener polls for new blocks
func (bl *BlockListener) startPollingListener() error {
var lastBlock int64 = -1
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
blockNumber, err := bl.client.BlockNumber(bl.ctx)
if err != nil {
continue
}
currentBlock := int64(blockNumber)
if lastBlock == -1 {
lastBlock = currentBlock
continue
}
// Process all blocks from lastBlock+1 to currentBlock
for i := lastBlock + 1; i <= currentBlock; i++ {
bl.fetchAndQueueBlock(i)
}
lastBlock = currentBlock
case <-bl.ctx.Done():
return nil
}
}
}
// fetchAndQueueBlock fetches a block and queues it for processing
func (bl *BlockListener) fetchAndQueueBlock(blockNumber int64) {
block, err := bl.client.BlockByNumber(bl.ctx, big.NewInt(blockNumber))
if err != nil {
return
}
select {
case bl.queue <- block:
case <-bl.ctx.Done():
return
}
}
// GetBlockChannel returns the channel for receiving blocks
func (bl *BlockListener) GetBlockChannel() <-chan *types.Block {
return bl.queue
}
// Stop stops the listener
func (bl *BlockListener) Stop() {
bl.cancel()
if bl.client != nil {
bl.client.Close()
}
if bl.wsClient != nil {
bl.wsClient.Close()
}
close(bl.queue)
}

82
backend/indexer/main.go Normal file
View File

@@ -0,0 +1,82 @@
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/explorer/backend/database/config"
"github.com/explorer/backend/indexer/listener"
"github.com/explorer/backend/indexer/processor"
"github.com/jackc/pgx/v5/pgxpool"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Load configuration
dbConfig := config.LoadDatabaseConfig()
poolConfig, err := dbConfig.PoolConfig()
if err != nil {
log.Fatalf("Failed to create pool config: %v", err)
}
// Connect to database
db, err := pgxpool.NewWithConfig(ctx, poolConfig)
if err != nil {
log.Fatalf("Failed to connect to database: %v", err)
}
defer db.Close()
// Connect to RPC
rpcURL := os.Getenv("RPC_URL")
if rpcURL == "" {
rpcURL = "http://localhost:8545"
}
wsURL := os.Getenv("WS_URL")
chainID := 138 // ChainID 138
client, err := ethclient.Dial(rpcURL)
if err != nil {
log.Fatalf("Failed to connect to RPC: %v", err)
}
defer client.Close()
// Create block listener
blockListener, err := listener.NewBlockListener(rpcURL, wsURL, int64(chainID))
if err != nil {
log.Fatalf("Failed to create block listener: %v", err)
}
defer blockListener.Stop()
// Create block processor
blockProcessor := processor.NewBlockProcessor(db, client, chainID)
// Start listening
if err := blockListener.Start(); err != nil {
log.Fatalf("Failed to start block listener: %v", err)
}
// Process blocks
go func() {
for block := range blockListener.GetBlockChannel() {
if err := blockProcessor.ProcessBlock(ctx, block); err != nil {
log.Printf("Failed to process block %d: %v", block.Number().Int64(), err)
} else {
log.Printf("Processed block %d", block.Number().Int64())
}
}
}()
// Wait for interrupt
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
<-sigChan
log.Println("Shutting down...")
}

View File

@@ -0,0 +1,252 @@
package processor
import (
"context"
"database/sql"
"fmt"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
// BlockProcessor processes blocks and extracts data
type BlockProcessor struct {
db *pgxpool.Pool
client *ethclient.Client
chainID int
}
// NewBlockProcessor creates a new block processor
func NewBlockProcessor(db *pgxpool.Pool, client *ethclient.Client, chainID int) *BlockProcessor {
return &BlockProcessor{
db: db,
client: client,
chainID: chainID,
}
}
// ProcessBlock processes a block and stores it in the database
func (bp *BlockProcessor) ProcessBlock(ctx context.Context, block *types.Block) error {
tx, err := bp.db.Begin(ctx)
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
defer tx.Rollback(ctx)
// Insert block
if err := bp.insertBlock(ctx, tx, block); err != nil {
return fmt.Errorf("failed to insert block: %w", err)
}
// Process transactions
for i, txData := range block.Transactions() {
if err := bp.processTransaction(ctx, tx, block, txData, i); err != nil {
return fmt.Errorf("failed to process transaction: %w", err)
}
}
return tx.Commit(ctx)
}
// insertBlock inserts a block into the database
func (bp *BlockProcessor) insertBlock(ctx context.Context, tx pgx.Tx, block *types.Block) error {
query := `
INSERT INTO blocks (
chain_id, number, hash, parent_hash, nonce, sha3_uncles,
logs_bloom, transactions_root, state_root, receipts_root,
miner, difficulty, total_difficulty, size, extra_data,
gas_limit, gas_used, timestamp, transaction_count, base_fee_per_gas
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20)
ON CONFLICT (chain_id, number) DO NOTHING
`
var nonce, sha3Uncles, difficulty, totalDifficulty sql.NullString
if block.Header().Nonce.Uint64() > 0 {
nonce.String = fmt.Sprintf("0x%x", block.Header().Nonce.Uint64())
nonce.Valid = true
}
if len(block.Header().UncleHash.Bytes()) > 0 {
sha3Uncles.String = block.Header().UncleHash.Hex()
sha3Uncles.Valid = true
}
if block.Header().Difficulty != nil {
difficulty.String = block.Header().Difficulty.String()
difficulty.Valid = true
totalDifficulty.String = block.Header().Difficulty.String() // Simplified
totalDifficulty.Valid = true
}
var baseFeePerGas sql.NullInt64
if block.Header().BaseFee != nil {
baseFeePerGas.Int64 = block.Header().BaseFee.Int64()
baseFeePerGas.Valid = true
}
_, err := tx.Exec(ctx, query,
bp.chainID,
block.Number().Int64(),
block.Hash().Hex(),
block.ParentHash().Hex(),
nonce,
sha3Uncles,
block.Header().Bloom.Big().String(),
block.Header().TxHash.Hex(),
block.Header().Root.Hex(),
block.Header().ReceiptHash.Hex(),
block.Coinbase().Hex(),
difficulty,
totalDifficulty,
int64(block.Size()),
fmt.Sprintf("0x%x", block.Header().Extra),
block.Header().GasLimit,
block.Header().GasUsed,
time.Unix(int64(block.Header().Time), 0),
len(block.Transactions()),
baseFeePerGas,
)
return err
}
// processTransaction processes a transaction and stores it
func (bp *BlockProcessor) processTransaction(ctx context.Context, tx pgx.Tx, block *types.Block, txData *types.Transaction, index int) error {
// Get receipt
receipt, err := bp.getReceipt(ctx, txData.Hash())
if err != nil {
return fmt.Errorf("failed to get receipt: %w", err)
}
query := `
INSERT INTO transactions (
chain_id, hash, block_number, block_hash, transaction_index,
from_address, to_address, value, gas_price, max_fee_per_gas,
max_priority_fee_per_gas, gas_limit, gas_used, nonce,
input_data, status, contract_address, cumulative_gas_used,
effective_gas_price
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19)
ON CONFLICT (chain_id, hash) DO NOTHING
`
from, _ := types.Sender(types.LatestSignerForChainID(txData.ChainId()), txData)
var toAddress sql.NullString
if txData.To() != nil {
toAddress.String = txData.To().Hex()
toAddress.Valid = true
}
var maxFeePerGas, maxPriorityFeePerGas sql.NullInt64
if txData.Type() == types.DynamicFeeTxType {
if txData.GasFeeCap() != nil {
maxFeePerGas.Int64 = txData.GasFeeCap().Int64()
maxFeePerGas.Valid = true
}
if txData.GasTipCap() != nil {
maxPriorityFeePerGas.Int64 = txData.GasTipCap().Int64()
maxPriorityFeePerGas.Valid = true
}
}
var contractAddress sql.NullString
if receipt != nil && receipt.ContractAddress != (common.Address{}) {
contractAddress.String = receipt.ContractAddress.Hex()
contractAddress.Valid = true
}
var status sql.NullInt64
if receipt != nil {
status.Int64 = int64(receipt.Status)
status.Valid = true
}
var effectiveGasPrice sql.NullInt64
if receipt != nil && receipt.EffectiveGasPrice != nil {
effectiveGasPrice.Int64 = receipt.EffectiveGasPrice.Int64()
effectiveGasPrice.Valid = true
}
_, err = tx.Exec(ctx, query,
bp.chainID,
txData.Hash().Hex(),
block.Number().Int64(),
block.Hash().Hex(),
index,
from.Hex(),
toAddress,
txData.Value().String(),
txData.GasPrice().Int64(),
maxFeePerGas,
maxPriorityFeePerGas,
txData.Gas(),
receipt.GasUsed,
txData.Nonce(),
fmt.Sprintf("0x%x", txData.Data()),
status,
contractAddress,
receipt.CumulativeGasUsed,
effectiveGasPrice,
)
if err != nil {
return err
}
// Process logs
return bp.processLogs(ctx, tx, block, txData, receipt)
}
// processLogs processes transaction logs
func (bp *BlockProcessor) processLogs(ctx context.Context, tx pgx.Tx, block *types.Block, txData *types.Transaction, receipt *types.Receipt) error {
if receipt == nil {
return nil
}
for i, log := range receipt.Logs {
query := `
INSERT INTO logs (
chain_id, transaction_hash, block_number, block_hash,
log_index, address, topic0, topic1, topic2, topic3, data
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
`
var topics [4]sql.NullString
for j, topic := range log.Topics {
if j < 4 {
topics[j].String = topic.Hex()
topics[j].Valid = true
}
}
_, err := tx.Exec(ctx, query,
bp.chainID,
txData.Hash().Hex(),
block.Number().Int64(),
block.Hash().Hex(),
i,
log.Address.Hex(),
topics[0],
topics[1],
topics[2],
topics[3],
fmt.Sprintf("0x%x", log.Data),
)
if err != nil {
return err
}
}
return nil
}
// getReceipt gets a transaction receipt
func (bp *BlockProcessor) getReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) {
if bp.client == nil {
return nil, fmt.Errorf("RPC client not configured")
}
return bp.client.TransactionReceipt(ctx, txHash)
}

View File

@@ -0,0 +1,127 @@
package reorg
import (
"context"
"fmt"
"log"
"math/big"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/jackc/pgx/v5/pgxpool"
)
// ReorgHandler handles blockchain reorganizations
type ReorgHandler struct {
db *pgxpool.Pool
client *ethclient.Client
chainID int
}
// NewReorgHandler creates a new reorg handler
func NewReorgHandler(db *pgxpool.Pool, client *ethclient.Client, chainID int) *ReorgHandler {
return &ReorgHandler{
db: db,
client: client,
chainID: chainID,
}
}
// DetectReorg detects if a reorg has occurred
func (rh *ReorgHandler) DetectReorg(ctx context.Context, blockNumber int64) (bool, int64, error) {
// Get stored block hash
var storedHash string
query := `SELECT hash FROM blocks WHERE chain_id = $1 AND number = $2`
err := rh.db.QueryRow(ctx, query, rh.chainID, blockNumber).Scan(&storedHash)
if err != nil {
return false, 0, err
}
// Get current block hash from chain
block, err := rh.client.BlockByNumber(ctx, big.NewInt(blockNumber))
if err != nil {
return false, 0, err
}
currentHash := block.Hash().Hex()
// Compare hashes
if storedHash != currentHash {
// Reorg detected, find common ancestor
commonAncestor, err := rh.findCommonAncestor(ctx, blockNumber)
if err != nil {
return false, 0, err
}
return true, commonAncestor, nil
}
return false, 0, nil
}
// findCommonAncestor finds the common ancestor block
func (rh *ReorgHandler) findCommonAncestor(ctx context.Context, startBlock int64) (int64, error) {
// Binary search to find common ancestor
low := int64(0)
high := startBlock
for low <= high {
mid := (low + high) / 2
var storedHash string
err := rh.db.QueryRow(ctx,
`SELECT hash FROM blocks WHERE chain_id = $1 AND number = $2`,
rh.chainID, mid,
).Scan(&storedHash)
if err != nil {
high = mid - 1
continue
}
block, err := rh.client.BlockByNumber(ctx, big.NewInt(mid))
if err != nil {
high = mid - 1
continue
}
if storedHash == block.Hash().Hex() {
low = mid + 1
} else {
high = mid - 1
}
}
return high, nil
}
// HandleReorg handles a detected reorg
func (rh *ReorgHandler) HandleReorg(ctx context.Context, commonAncestor int64) error {
log.Printf("Handling reorg: common ancestor at block %d", commonAncestor)
tx, err := rh.db.Begin(ctx)
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
defer tx.Rollback(ctx)
// Mark blocks as orphaned
_, err = tx.Exec(ctx, `
UPDATE blocks
SET orphaned = true, orphaned_at = NOW()
WHERE chain_id = $1 AND number > $2 AND orphaned = false
`, rh.chainID, commonAncestor)
if err != nil {
return fmt.Errorf("failed to mark blocks as orphaned: %w", err)
}
// Delete orphaned data (cascade will handle related records)
_, err = tx.Exec(ctx, `
DELETE FROM blocks
WHERE chain_id = $1 AND number > $2 AND orphaned = true
`, rh.chainID, commonAncestor)
if err != nil {
return fmt.Errorf("failed to delete orphaned blocks: %w", err)
}
return tx.Commit(ctx)
}

View File

@@ -0,0 +1,180 @@
package tokens
import (
"context"
"fmt"
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/jackc/pgx/v5/pgxpool"
)
// Extractor extracts token transfers from transaction logs
type Extractor struct {
db *pgxpool.Pool
chainID int
}
// NewExtractor creates a new token extractor
func NewExtractor(db *pgxpool.Pool, chainID int) *Extractor {
return &Extractor{
db: db,
chainID: chainID,
}
}
// ERC20 Transfer event signature: Transfer(address,address,uint256)
var ERC20TransferSignature = common.HexToHash("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef")
// ERC721 Transfer event signature: Transfer(address,address,uint256)
var ERC721TransferSignature = ERC20TransferSignature
// ERC1155 TransferSingle event signature: TransferSingle(address,address,address,uint256,uint256)
var ERC1155TransferSingleSignature = common.HexToHash("0xc3d58168c5ae7397731d063d5bbf3d657854427343f4c083240f7aacaa2d0f62")
// ExtractTokenTransfers extracts token transfers from logs
func (e *Extractor) ExtractTokenTransfers(ctx context.Context, txHash common.Hash, blockNumber int64, logs []types.Log) error {
for i, log := range logs {
// Check for ERC20/ERC721 Transfer
if len(log.Topics) == 3 && log.Topics[0] == ERC20TransferSignature {
if err := e.extractERC20Transfer(ctx, txHash, blockNumber, i, log); err != nil {
return fmt.Errorf("failed to extract ERC20 transfer: %w", err)
}
}
// Check for ERC1155 TransferSingle
if len(log.Topics) == 4 && log.Topics[0] == ERC1155TransferSingleSignature {
if err := e.extractERC1155Transfer(ctx, txHash, blockNumber, i, log); err != nil {
return fmt.Errorf("failed to extract ERC1155 transfer: %w", err)
}
}
}
return nil
}
// extractERC20Transfer extracts ERC20 transfer
func (e *Extractor) extractERC20Transfer(ctx context.Context, txHash common.Hash, blockNumber int64, logIndex int, log types.Log) error {
if len(log.Topics) != 3 || len(log.Data) != 32 {
return fmt.Errorf("invalid ERC20 transfer log")
}
from := common.BytesToAddress(log.Topics[1].Bytes())
to := common.BytesToAddress(log.Topics[2].Bytes())
amount := new(big.Int).SetBytes(log.Data)
// Determine token type (ERC20 or ERC721)
tokenType := "ERC20"
if len(log.Data) == 0 {
tokenType = "ERC721"
}
query := `
INSERT INTO token_transfers (
chain_id, transaction_hash, block_number, log_index,
token_address, token_type, from_address, to_address, amount
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
`
_, err := e.db.Exec(ctx, query,
e.chainID,
txHash.Hex(),
blockNumber,
logIndex,
log.Address.Hex(),
tokenType,
from.Hex(),
to.Hex(),
amount.String(),
)
if err != nil {
return err
}
// Update token holder count
return e.updateTokenStats(ctx, log.Address)
}
// extractERC1155Transfer extracts ERC1155 transfer
func (e *Extractor) extractERC1155Transfer(ctx context.Context, txHash common.Hash, blockNumber int64, logIndex int, log types.Log) error {
if len(log.Topics) != 4 || len(log.Data) != 64 {
return fmt.Errorf("invalid ERC1155 transfer log")
}
operator := common.BytesToAddress(log.Topics[1].Bytes())
from := common.BytesToAddress(log.Topics[2].Bytes())
to := common.BytesToAddress(log.Topics[3].Bytes())
tokenID := new(big.Int).SetBytes(log.Data[:32])
amount := new(big.Int).SetBytes(log.Data[32:])
query := `
INSERT INTO token_transfers (
chain_id, transaction_hash, block_number, log_index,
token_address, token_type, from_address, to_address, amount, token_id, operator
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
`
_, err := e.db.Exec(ctx, query,
e.chainID,
txHash.Hex(),
blockNumber,
logIndex,
log.Address.Hex(),
"ERC1155",
from.Hex(),
to.Hex(),
amount.String(),
tokenID.String(),
operator.Hex(),
)
if err != nil {
return err
}
return e.updateTokenStats(ctx, log.Address)
}
// updateTokenStats updates token statistics
func (e *Extractor) updateTokenStats(ctx context.Context, tokenAddress common.Address) error {
// Count holders
var holderCount int
err := e.db.QueryRow(ctx, `
SELECT COUNT(DISTINCT to_address)
FROM token_transfers
WHERE chain_id = $1 AND token_address = $2
`, e.chainID, tokenAddress.Hex()).Scan(&holderCount)
if err != nil {
holderCount = 0
}
// Count transfers
var transferCount int
err = e.db.QueryRow(ctx, `
SELECT COUNT(*)
FROM token_transfers
WHERE chain_id = $1 AND token_address = $2
`, e.chainID, tokenAddress.Hex()).Scan(&transferCount)
if err != nil {
transferCount = 0
}
// Update token
query := `
INSERT INTO tokens (chain_id, address, type, holder_count, transfer_count)
VALUES ($1, $2, 'ERC20', $3, $4)
ON CONFLICT (chain_id, address) DO UPDATE SET
holder_count = $3,
transfer_count = $4,
updated_at = NOW()
`
_, err = e.db.Exec(ctx, query, e.chainID, tokenAddress.Hex(), holderCount, transferCount)
return err
}

View File

@@ -0,0 +1,112 @@
package traces
import (
"context"
"encoding/json"
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/jackc/pgx/v5/pgxpool"
)
// Tracer extracts and stores transaction traces
type Tracer struct {
db *pgxpool.Pool
client *ethclient.Client
chainID int
}
// NewTracer creates a new tracer
func NewTracer(db *pgxpool.Pool, client *ethclient.Client, chainID int) *Tracer {
return &Tracer{
db: db,
client: client,
chainID: chainID,
}
}
// Trace represents a transaction trace
type Trace struct {
TransactionHash common.Hash
BlockNumber int64
Traces []CallTrace
}
// CallTrace represents a single call in a trace
type CallTrace struct {
Type string
From common.Address
To common.Address
Value string
Gas uint64
GasUsed uint64
Input string
Output string
Error string
Calls []CallTrace
}
// ExtractTrace extracts trace for a transaction
func (t *Tracer) ExtractTrace(ctx context.Context, txHash common.Hash, blockNumber int64) error {
// Call trace_block RPC method
var result json.RawMessage
err := t.client.Client().CallContext(ctx, &result, "debug_traceTransaction", txHash.Hex(), map[string]interface{}{
"tracer": "callTracer",
"tracerConfig": map[string]interface{}{
"withLog": true,
},
})
if err != nil {
return fmt.Errorf("failed to trace transaction: %w", err)
}
// Parse trace
var trace CallTrace
if err := json.Unmarshal(result, &trace); err != nil {
return fmt.Errorf("failed to parse trace: %w", err)
}
// Store trace
return t.storeTrace(ctx, txHash, blockNumber, trace)
}
// storeTrace stores trace in database
func (t *Tracer) storeTrace(ctx context.Context, txHash common.Hash, blockNumber int64, trace CallTrace) error {
// Create traces table if it doesn't exist
// For now, store as JSONB
query := `
CREATE TABLE IF NOT EXISTS traces (
chain_id INTEGER NOT NULL,
transaction_hash VARCHAR(66) NOT NULL,
block_number BIGINT NOT NULL,
trace_data JSONB NOT NULL,
created_at TIMESTAMP DEFAULT NOW(),
PRIMARY KEY (chain_id, transaction_hash)
) PARTITION BY LIST (chain_id)
`
_, err := t.db.Exec(ctx, query)
if err != nil {
// Table might already exist
}
// Insert trace
insertQuery := `
INSERT INTO traces (chain_id, transaction_hash, block_number, trace_data)
VALUES ($1, $2, $3, $4)
ON CONFLICT (chain_id, transaction_hash) DO UPDATE SET
trace_data = $4,
created_at = NOW()
`
traceJSON, err := json.Marshal(trace)
if err != nil {
return fmt.Errorf("failed to marshal trace: %w", err)
}
_, err = t.db.Exec(ctx, insertQuery, t.chainID, txHash.Hex(), blockNumber, traceJSON)
return err
}

View File

@@ -0,0 +1,98 @@
package track2
import (
"context"
"fmt"
"math/big"
"time"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/jackc/pgx/v5/pgxpool"
)
// BlockIndexer indexes blocks for Track 2
type BlockIndexer struct {
db *pgxpool.Pool
client *ethclient.Client
chainID int
}
// NewBlockIndexer creates a new block indexer
func NewBlockIndexer(db *pgxpool.Pool, client *ethclient.Client, chainID int) *BlockIndexer {
return &BlockIndexer{
db: db,
client: client,
chainID: chainID,
}
}
// IndexBlock indexes a single block
func (bi *BlockIndexer) IndexBlock(ctx context.Context, blockNumber uint64) error {
block, err := bi.client.BlockByNumber(ctx, big.NewInt(int64(blockNumber)))
if err != nil {
return fmt.Errorf("failed to get block: %w", err)
}
// Check if block already indexed
var exists bool
checkQuery := `SELECT EXISTS(SELECT 1 FROM blocks WHERE chain_id = $1 AND number = $2)`
err = bi.db.QueryRow(ctx, checkQuery, bi.chainID, blockNumber).Scan(&exists)
if err != nil {
return fmt.Errorf("failed to check block existence: %w", err)
}
if exists {
return nil // Already indexed
}
// Insert block
insertQuery := `
INSERT INTO blocks (
chain_id, number, hash, parent_hash, miner, difficulty, total_difficulty,
size, gas_limit, gas_used, timestamp, transaction_count, base_fee_per_gas
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
ON CONFLICT (chain_id, number) DO NOTHING
`
_, err = bi.db.Exec(ctx, insertQuery,
bi.chainID,
block.Number().Int64(),
block.Hash().Hex(),
block.ParentHash().Hex(),
block.Coinbase().Hex(),
block.Difficulty().String(),
"0", // total_difficulty
block.Size(),
block.GasLimit(),
block.GasUsed(),
time.Unix(int64(block.Time()), 0),
len(block.Transactions()),
block.BaseFee(),
)
if err != nil {
return fmt.Errorf("failed to insert block: %w", err)
}
return nil
}
// IndexLatestBlocks indexes the latest N blocks
func (bi *BlockIndexer) IndexLatestBlocks(ctx context.Context, count int) error {
header, err := bi.client.HeaderByNumber(ctx, nil)
if err != nil {
return fmt.Errorf("failed to get latest header: %w", err)
}
latestBlock := header.Number.Uint64()
for i := 0; i < count && latestBlock-uint64(i) >= 0; i++ {
blockNum := latestBlock - uint64(i)
if err := bi.IndexBlock(ctx, blockNum); err != nil {
// Log error but continue
fmt.Printf("Failed to index block %d: %v\n", blockNum, err)
}
}
return nil
}

View File

@@ -0,0 +1,136 @@
package track2
import (
"context"
"fmt"
"math/big"
"strings"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/jackc/pgx/v5/pgxpool"
)
// TokenIndexer indexes ERC-20 token transfers for Track 2
type TokenIndexer struct {
db *pgxpool.Pool
client *ethclient.Client
chainID int
}
// NewTokenIndexer creates a new token indexer
func NewTokenIndexer(db *pgxpool.Pool, client *ethclient.Client, chainID int) *TokenIndexer {
return &TokenIndexer{
db: db,
client: client,
chainID: chainID,
}
}
// ERC20TransferEventSignature is the signature for ERC-20 Transfer event
const ERC20TransferEventSignature = "Transfer(address,address,uint256)"
// IndexTokenTransfers indexes token transfers from a transaction receipt
func (ti *TokenIndexer) IndexTokenTransfers(ctx context.Context, receipt *types.Receipt, blockNumber uint64, blockHash common.Hash, timestamp time.Time) error {
// Parse Transfer event signature
transferEventSig := common.HexToHash("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef") // keccak256("Transfer(address,address,uint256)")
for _, log := range receipt.Logs {
// Check if this is a Transfer event
if len(log.Topics) != 3 || log.Topics[0] != transferEventSig {
continue
}
// Extract token contract, from, to, and value
tokenContract := log.Address.Hex()
from := common.BytesToAddress(log.Topics[1].Bytes()).Hex()
to := common.BytesToAddress(log.Topics[2].Bytes()).Hex()
// Decode value from data
value := new(big.Int).SetBytes(log.Data)
// Insert token transfer
insertQuery := `
INSERT INTO token_transfers (
chain_id, transaction_hash, log_index, block_number, block_hash,
timestamp, token_contract, from_address, to_address, value
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (chain_id, transaction_hash, log_index) DO NOTHING
`
_, err := ti.db.Exec(ctx, insertQuery,
ti.chainID,
receipt.TxHash.Hex(),
log.Index,
blockNumber,
blockHash.Hex(),
timestamp,
tokenContract,
from,
to,
value.String(),
)
if err != nil {
return fmt.Errorf("failed to insert token transfer: %w", err)
}
// Update token balances
ti.updateTokenBalances(ctx, tokenContract, from, to, value)
}
return nil
}
// updateTokenBalances updates token balances for addresses
func (ti *TokenIndexer) updateTokenBalances(ctx context.Context, tokenContract, from, to string, value *big.Int) {
// Decrease from balance
if from != "" && from != "0x0000000000000000000000000000000000000000" {
updateFromQuery := `
INSERT INTO token_balances (address, token_contract, chain_id, balance, last_updated_timestamp)
VALUES ($1, $2, $3, 0, NOW())
ON CONFLICT (address, token_contract, chain_id) DO UPDATE SET
balance = GREATEST(0, token_balances.balance - $4::numeric),
last_updated_timestamp = NOW(),
updated_at = NOW()
`
ti.db.Exec(ctx, updateFromQuery, strings.ToLower(from), strings.ToLower(tokenContract), ti.chainID, value.String())
}
// Increase to balance
if to != "" && to != "0x0000000000000000000000000000000000000000" {
updateToQuery := `
INSERT INTO token_balances (address, token_contract, chain_id, balance, last_updated_timestamp)
VALUES ($1, $2, $3, $4, NOW())
ON CONFLICT (address, token_contract, chain_id) DO UPDATE SET
balance = token_balances.balance + $4::numeric,
last_updated_timestamp = NOW(),
updated_at = NOW()
`
ti.db.Exec(ctx, updateToQuery, strings.ToLower(to), strings.ToLower(tokenContract), ti.chainID, value.String())
}
}
// IndexBlockTokenTransfers indexes all token transfers in a block
func (ti *TokenIndexer) IndexBlockTokenTransfers(ctx context.Context, blockNumber uint64) error {
block, err := ti.client.BlockByNumber(ctx, big.NewInt(int64(blockNumber)))
if err != nil {
return fmt.Errorf("failed to get block: %w", err)
}
for _, tx := range block.Transactions() {
receipt, err := ti.client.TransactionReceipt(ctx, tx.Hash())
if err != nil {
continue
}
if err := ti.IndexTokenTransfers(ctx, receipt, blockNumber, block.Hash(), time.Unix(int64(block.Time()), 0)); err != nil {
fmt.Printf("Failed to index token transfers for tx %s: %v\n", tx.Hash().Hex(), err)
}
}
return nil
}

View File

@@ -0,0 +1,164 @@
package track2
import (
"context"
"fmt"
"math/big"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/jackc/pgx/v5/pgxpool"
)
// TransactionIndexer indexes transactions for Track 2
type TransactionIndexer struct {
db *pgxpool.Pool
client *ethclient.Client
chainID int
}
// NewTransactionIndexer creates a new transaction indexer
func NewTransactionIndexer(db *pgxpool.Pool, client *ethclient.Client, chainID int) *TransactionIndexer {
return &TransactionIndexer{
db: db,
client: client,
chainID: chainID,
}
}
// IndexTransaction indexes a single transaction
func (ti *TransactionIndexer) IndexTransaction(ctx context.Context, txHash common.Hash, blockNumber uint64, txIndex uint) error {
// Check if transaction already indexed
var exists bool
checkQuery := `SELECT EXISTS(SELECT 1 FROM transactions WHERE chain_id = $1 AND hash = $2)`
err := ti.db.QueryRow(ctx, checkQuery, ti.chainID, txHash.Hex()).Scan(&exists)
if err != nil {
return fmt.Errorf("failed to check transaction existence: %w", err)
}
if exists {
return nil // Already indexed
}
// Get transaction receipt
receipt, err := ti.client.TransactionReceipt(ctx, txHash)
if err != nil {
return fmt.Errorf("failed to get transaction receipt: %w", err)
}
// Get transaction
tx, _, err := ti.client.TransactionByHash(ctx, txHash)
if err != nil {
return fmt.Errorf("failed to get transaction: %w", err)
}
// Get block for timestamp
block, err := ti.client.BlockByNumber(ctx, big.NewInt(int64(blockNumber)))
if err != nil {
return fmt.Errorf("failed to get block: %w", err)
}
// Determine status
status := "success"
if receipt.Status == 0 {
status = "failed"
}
// Get sender address
signer := types.NewEIP155Signer(big.NewInt(int64(ti.chainID)))
fromAddr, err := types.Sender(signer, tx)
if err != nil {
return fmt.Errorf("failed to get sender address: %w", err)
}
// Insert transaction
insertQuery := `
INSERT INTO transactions (
chain_id, hash, block_number, block_hash, transaction_index,
from_address, to_address, value, gas, gas_price, gas_used,
cumulative_gas_used, status, timestamp
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
ON CONFLICT (chain_id, hash) DO NOTHING
`
toAddr := ""
if tx.To() != nil {
toAddr = tx.To().Hex()
}
_, err = ti.db.Exec(ctx, insertQuery,
ti.chainID,
txHash.Hex(),
blockNumber,
block.Hash().Hex(),
txIndex,
fromAddr.Hex(),
toAddr,
tx.Value().String(),
tx.Gas(),
tx.GasPrice().String(),
receipt.GasUsed,
receipt.CumulativeGasUsed,
status,
time.Unix(int64(block.Time()), 0),
)
if err != nil {
return fmt.Errorf("failed to insert transaction: %w", err)
}
// Update address statistics
ti.updateAddressStats(ctx, fromAddr.Hex(), toAddr, tx.Value())
return nil
}
// IndexBlockTransactions indexes all transactions in a block
func (ti *TransactionIndexer) IndexBlockTransactions(ctx context.Context, blockNumber uint64) error {
block, err := ti.client.BlockByNumber(ctx, big.NewInt(int64(blockNumber)))
if err != nil {
return fmt.Errorf("failed to get block: %w", err)
}
for i, tx := range block.Transactions() {
if err := ti.IndexTransaction(ctx, tx.Hash(), blockNumber, uint(i)); err != nil {
fmt.Printf("Failed to index transaction %s: %v\n", tx.Hash().Hex(), err)
}
}
return nil
}
// updateAddressStats updates address statistics
func (ti *TransactionIndexer) updateAddressStats(ctx context.Context, from, to string, value *big.Int) {
// Update from address
if from != "" {
updateQuery := `
INSERT INTO addresses (address, chain_id, tx_count_sent, total_sent_wei, first_seen_timestamp, last_seen_timestamp)
VALUES ($1, $2, 1, $3, NOW(), NOW())
ON CONFLICT (address) DO UPDATE SET
tx_count_sent = addresses.tx_count_sent + 1,
total_sent_wei = addresses.total_sent_wei + $3::numeric,
last_seen_timestamp = NOW(),
updated_at = NOW()
`
ti.db.Exec(ctx, updateQuery, from, ti.chainID, value.String())
}
// Update to address
if to != "" {
updateQuery := `
INSERT INTO addresses (address, chain_id, tx_count_received, total_received_wei, first_seen_timestamp, last_seen_timestamp)
VALUES ($1, $2, 1, $3, NOW(), NOW())
ON CONFLICT (address) DO UPDATE SET
tx_count_received = addresses.tx_count_received + 1,
total_received_wei = addresses.total_received_wei + $3::numeric,
last_seen_timestamp = NOW(),
updated_at = NOW()
`
ti.db.Exec(ctx, updateQuery, to, ti.chainID, value.String())
}
}