Files
explorer-monorepo/backend/analytics/flow_tracker.go

120 lines
2.8 KiB
Go

package analytics
import (
"context"
"fmt"
"time"
"github.com/jackc/pgx/v5/pgxpool"
)
// FlowTracker tracks address-to-address flows
type FlowTracker struct {
db *pgxpool.Pool
chainID int
}
// NewFlowTracker creates a new flow tracker
func NewFlowTracker(db *pgxpool.Pool, chainID int) *FlowTracker {
return &FlowTracker{
db: db,
chainID: chainID,
}
}
// Flow represents a flow between addresses
type Flow struct {
From string
To string
Token string
Amount string
Count int
FirstSeen time.Time
LastSeen time.Time
}
// TrackFlow tracks a flow between addresses
func (ft *FlowTracker) TrackFlow(ctx context.Context, from, to, token string, amount string) error {
query := `
INSERT INTO analytics_flows (
chain_id, from_address, to_address, token_contract,
total_amount, transfer_count, first_seen, last_seen
) VALUES ($1, $2, $3, $4, $5, 1, NOW(), NOW())
ON CONFLICT (chain_id, from_address, to_address, token_contract) DO UPDATE SET
total_amount = analytics_flows.total_amount + $5::numeric,
transfer_count = analytics_flows.transfer_count + 1,
last_seen = NOW(),
updated_at = NOW()
`
_, err := ft.db.Exec(ctx, query, ft.chainID, from, to, token, amount)
if err != nil {
return fmt.Errorf("failed to track flow: %w", err)
}
return nil
}
// GetFlows gets flows matching criteria
func (ft *FlowTracker) GetFlows(ctx context.Context, from, to, token string, startDate, endDate *time.Time, limit int) ([]Flow, error) {
query := `
SELECT from_address, to_address, token_contract, total_amount, transfer_count, first_seen, last_seen
FROM analytics_flows
WHERE chain_id = $1
`
args := []interface{}{ft.chainID}
argIndex := 2
if from != "" {
query += fmt.Sprintf(" AND from_address = $%d", argIndex)
args = append(args, from)
argIndex++
}
if to != "" {
query += fmt.Sprintf(" AND to_address = $%d", argIndex)
args = append(args, to)
argIndex++
}
if token != "" {
query += fmt.Sprintf(" AND token_contract = $%d", argIndex)
args = append(args, token)
argIndex++
}
if startDate != nil {
query += fmt.Sprintf(" AND last_seen >= $%d", argIndex)
args = append(args, *startDate)
argIndex++
}
if endDate != nil {
query += fmt.Sprintf(" AND last_seen <= $%d", argIndex)
args = append(args, *endDate)
argIndex++
}
query += " ORDER BY last_seen DESC LIMIT $" + fmt.Sprintf("%d", argIndex)
args = append(args, limit)
rows, err := ft.db.Query(ctx, query, args...)
if err != nil {
return nil, fmt.Errorf("failed to query flows: %w", err)
}
defer rows.Close()
flows := []Flow{}
for rows.Next() {
var f Flow
if err := rows.Scan(&f.From, &f.To, &f.Token, &f.Amount, &f.Count, &f.FirstSeen, &f.LastSeen); err != nil {
continue
}
flows = append(flows, f)
}
return flows, nil
}