120 lines
2.8 KiB
Go
120 lines
2.8 KiB
Go
package analytics
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
)
|
|
|
|
// FlowTracker tracks address-to-address flows
|
|
type FlowTracker struct {
|
|
db *pgxpool.Pool
|
|
chainID int
|
|
}
|
|
|
|
// NewFlowTracker creates a new flow tracker
|
|
func NewFlowTracker(db *pgxpool.Pool, chainID int) *FlowTracker {
|
|
return &FlowTracker{
|
|
db: db,
|
|
chainID: chainID,
|
|
}
|
|
}
|
|
|
|
// Flow represents a flow between addresses
|
|
type Flow struct {
|
|
From string
|
|
To string
|
|
Token string
|
|
Amount string
|
|
Count int
|
|
FirstSeen time.Time
|
|
LastSeen time.Time
|
|
}
|
|
|
|
// TrackFlow tracks a flow between addresses
|
|
func (ft *FlowTracker) TrackFlow(ctx context.Context, from, to, token string, amount string) error {
|
|
query := `
|
|
INSERT INTO analytics_flows (
|
|
chain_id, from_address, to_address, token_contract,
|
|
total_amount, transfer_count, first_seen, last_seen
|
|
) VALUES ($1, $2, $3, $4, $5, 1, NOW(), NOW())
|
|
ON CONFLICT (chain_id, from_address, to_address, token_contract) DO UPDATE SET
|
|
total_amount = analytics_flows.total_amount + $5::numeric,
|
|
transfer_count = analytics_flows.transfer_count + 1,
|
|
last_seen = NOW(),
|
|
updated_at = NOW()
|
|
`
|
|
|
|
_, err := ft.db.Exec(ctx, query, ft.chainID, from, to, token, amount)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to track flow: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetFlows gets flows matching criteria
|
|
func (ft *FlowTracker) GetFlows(ctx context.Context, from, to, token string, startDate, endDate *time.Time, limit int) ([]Flow, error) {
|
|
query := `
|
|
SELECT from_address, to_address, token_contract, total_amount, transfer_count, first_seen, last_seen
|
|
FROM analytics_flows
|
|
WHERE chain_id = $1
|
|
`
|
|
|
|
args := []interface{}{ft.chainID}
|
|
argIndex := 2
|
|
|
|
if from != "" {
|
|
query += fmt.Sprintf(" AND from_address = $%d", argIndex)
|
|
args = append(args, from)
|
|
argIndex++
|
|
}
|
|
|
|
if to != "" {
|
|
query += fmt.Sprintf(" AND to_address = $%d", argIndex)
|
|
args = append(args, to)
|
|
argIndex++
|
|
}
|
|
|
|
if token != "" {
|
|
query += fmt.Sprintf(" AND token_contract = $%d", argIndex)
|
|
args = append(args, token)
|
|
argIndex++
|
|
}
|
|
|
|
if startDate != nil {
|
|
query += fmt.Sprintf(" AND last_seen >= $%d", argIndex)
|
|
args = append(args, *startDate)
|
|
argIndex++
|
|
}
|
|
|
|
if endDate != nil {
|
|
query += fmt.Sprintf(" AND last_seen <= $%d", argIndex)
|
|
args = append(args, *endDate)
|
|
argIndex++
|
|
}
|
|
|
|
query += " ORDER BY last_seen DESC LIMIT $" + fmt.Sprintf("%d", argIndex)
|
|
args = append(args, limit)
|
|
|
|
rows, err := ft.db.Query(ctx, query, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to query flows: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
flows := []Flow{}
|
|
for rows.Next() {
|
|
var f Flow
|
|
if err := rows.Scan(&f.From, &f.To, &f.Token, &f.Amount, &f.Count, &f.FirstSeen, &f.LastSeen); err != nil {
|
|
continue
|
|
}
|
|
flows = append(flows, f)
|
|
}
|
|
|
|
return flows, nil
|
|
}
|
|
|