package rest import ( "bytes" "context" "encoding/json" "fmt" "io" "log" "net/http" "net/url" "os" "path/filepath" "regexp" "strings" "sync" "sync/atomic" "time" ) var ( hexAddrRe = regexp.MustCompile(`(?i)^0x[0-9a-f]{40}$`) hexTxRe = regexp.MustCompile(`(?i)^0x[0-9a-f]{64}$`) ) type liquidityCacheEntry struct { body []byte until time.Time ctype string } var liquidityPoolsCache sync.Map // string -> liquidityCacheEntry var missionControlMetrics struct { liquidityCacheHits uint64 liquidityCacheMisses uint64 liquidityUpstreamFailure uint64 bridgeTraceRequests uint64 bridgeTraceFailures uint64 } func tokenAggregationBase() string { for _, k := range []string{"TOKEN_AGGREGATION_BASE_URL", "TOKEN_AGGREGATION_URL"} { if u := strings.TrimSpace(os.Getenv(k)); u != "" { return strings.TrimRight(u, "/") } } return "" } func looksLikeGenericUpstreamErrorPayload(body []byte) bool { if len(bytes.TrimSpace(body)) == 0 { return false } var payload map[string]any if err := json.Unmarshal(body, &payload); err != nil { return false } errValue, ok := payload["error"].(string) if !ok || strings.TrimSpace(errValue) == "" { return false } if _, ok := payload["pools"]; ok { return false } if _, ok := payload["tokens"]; ok { return false } if _, ok := payload["data"]; ok { return false } if _, ok := payload["chains"]; ok { return false } if _, ok := payload["tree"]; ok { return false } if _, ok := payload["quote"]; ok { return false } if status, ok := payload["status"].(string); ok && strings.EqualFold(status, "healthy") { return false } return true } func blockscoutInternalBase() string { u := strings.TrimSpace(os.Getenv("BLOCKSCOUT_INTERNAL_URL")) if u == "" { u = "http://127.0.0.1:4000" } return strings.TrimRight(u, "/") } func missionControlChainID() string { if s := strings.TrimSpace(os.Getenv("CHAIN_ID")); s != "" { return s } return "138" } func rpcURL() string { if s := strings.TrimSpace(os.Getenv("RPC_URL")); s != "" { return s } return "" } // handleMissionControlLiquidityTokenPath serves GET .../mission-control/liquidity/token/{addr}/pools (cached proxy to token-aggregation). func (s *Server) handleMissionControlLiquidityTokenPath(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { writeMethodNotAllowed(w) return } rest := strings.TrimPrefix(r.URL.Path, "/api/v1/mission-control/liquidity/token/") rest = strings.Trim(rest, "/") parts := strings.Split(rest, "/") if len(parts) < 2 || parts[1] != "pools" { writeError(w, http.StatusNotFound, "not_found", "expected /liquidity/token/{address}/pools") return } addr := strings.TrimSpace(parts[0]) if !hexAddrRe.MatchString(addr) { writeError(w, http.StatusBadRequest, "bad_request", "invalid token address") return } base := tokenAggregationBase() if base == "" { writeError(w, http.StatusServiceUnavailable, "service_unavailable", "TOKEN_AGGREGATION_BASE_URL not configured") return } chain := missionControlChainID() cacheKey := strings.ToLower(addr) + "|" + chain bypassCache := r.URL.Query().Get("refresh") == "1" || r.URL.Query().Get("noCache") == "1" || strings.Contains(strings.ToLower(r.Header.Get("Cache-Control")), "no-cache") || strings.Contains(strings.ToLower(r.Header.Get("Cache-Control")), "no-store") if ent, ok := liquidityPoolsCache.Load(cacheKey); ok && !bypassCache { e := ent.(liquidityCacheEntry) if time.Now().Before(e.until) { atomic.AddUint64(&missionControlMetrics.liquidityCacheHits, 1) w.Header().Set("X-Mission-Control-Cache", "hit") if e.ctype != "" { w.Header().Set("Content-Type", e.ctype) } else { w.Header().Set("Content-Type", "application/json") } w.WriteHeader(http.StatusOK) _, _ = w.Write(e.body) return } } atomic.AddUint64(&missionControlMetrics.liquidityCacheMisses, 1) if bypassCache { w.Header().Set("X-Mission-Control-Cache", "bypass") } else { w.Header().Set("X-Mission-Control-Cache", "miss") } up, err := url.Parse(base + "/api/v1/tokens/" + url.PathEscape(addr) + "/pools") if err != nil { writeInternalError(w, "bad upstream URL") return } q := up.Query() q.Set("chainId", chain) up.RawQuery = q.Encode() ctx, cancel := context.WithTimeout(r.Context(), 25*time.Second) defer cancel() req, err := http.NewRequestWithContext(ctx, http.MethodGet, up.String(), nil) if err != nil { writeInternalError(w, err.Error()) return } resp, err := http.DefaultClient.Do(req) if err != nil { atomic.AddUint64(&missionControlMetrics.liquidityUpstreamFailure, 1) log.Printf("mission_control liquidity_proxy addr=%s chain=%s cache=miss upstream_error=%v", strings.ToLower(addr), chain, err) writeError(w, http.StatusBadGateway, "bad_gateway", err.Error()) return } defer resp.Body.Close() body, err := io.ReadAll(io.LimitReader(resp.Body, 4<<20)) if err != nil { atomic.AddUint64(&missionControlMetrics.liquidityUpstreamFailure, 1) log.Printf("mission_control liquidity_proxy addr=%s chain=%s cache=miss read_error=%v", strings.ToLower(addr), chain, err) writeError(w, http.StatusBadGateway, "bad_gateway", "read upstream body failed") return } ctype := resp.Header.Get("Content-Type") if ctype == "" { ctype = "application/json" } isGenericSuccessError := resp.StatusCode >= 200 && resp.StatusCode < 300 && looksLikeGenericUpstreamErrorPayload(body) if isGenericSuccessError { atomic.AddUint64(&missionControlMetrics.liquidityUpstreamFailure, 1) log.Printf("mission_control liquidity_proxy addr=%s chain=%s cache=miss upstream_status=%d generic_error_envelope=true", strings.ToLower(addr), chain, resp.StatusCode) w.Header().Set("Content-Type", ctype) w.WriteHeader(http.StatusBadGateway) _, _ = w.Write(body) return } if resp.StatusCode == http.StatusOK { liquidityPoolsCache.Store(cacheKey, liquidityCacheEntry{ body: body, until: time.Now().Add(30 * time.Second), ctype: ctype, }) cacheMode := "miss" if bypassCache { cacheMode = "bypass-refresh" } log.Printf("mission_control liquidity_proxy addr=%s chain=%s cache=%s stored_ttl_sec=30", strings.ToLower(addr), chain, cacheMode) } else { atomic.AddUint64(&missionControlMetrics.liquidityUpstreamFailure, 1) log.Printf("mission_control liquidity_proxy addr=%s chain=%s cache=miss upstream_status=%d", strings.ToLower(addr), chain, resp.StatusCode) } w.Header().Set("Content-Type", ctype) w.WriteHeader(resp.StatusCode) _, _ = w.Write(body) } var ( registryOnce sync.Once registryAddrToKey map[string]string registryLoadErr error ) func firstReadableFile(paths []string) ([]byte, string, error) { for _, p := range paths { if strings.TrimSpace(p) == "" { continue } b, err := os.ReadFile(p) if err == nil && len(b) > 0 { return b, p, nil } } return nil, "", fmt.Errorf("no readable file found") } func loadAddressRegistry138() map[string]string { registryOnce.Do(func() { registryAddrToKey = make(map[string]string) var masterPaths []string if p := strings.TrimSpace(os.Getenv("SMART_CONTRACTS_MASTER_JSON")); p != "" { masterPaths = append(masterPaths, p) } masterPaths = append(masterPaths, "config/smart-contracts-master.json", "../config/smart-contracts-master.json", "../../config/smart-contracts-master.json", ) raw, masterPath, _ := firstReadableFile(masterPaths) if len(raw) == 0 { registryLoadErr = fmt.Errorf("smart-contracts-master.json not found") return } var root map[string]interface{} if err := json.Unmarshal(raw, &root); err != nil { registryLoadErr = err return } chains, _ := root["chains"].(map[string]interface{}) c138, _ := chains["138"].(map[string]interface{}) contracts, _ := c138["contracts"].(map[string]interface{}) for k, v := range contracts { s, ok := v.(string) if !ok || !hexAddrRe.MatchString(s) { continue } registryAddrToKey[strings.ToLower(s)] = k } var inventoryPaths []string if p := strings.TrimSpace(os.Getenv("EXPLORER_ADDRESS_INVENTORY_FILE")); p != "" { inventoryPaths = append(inventoryPaths, p) } if masterPath != "" { inventoryPaths = append(inventoryPaths, filepath.Join(filepath.Dir(masterPath), "address-inventory.json")) } inventoryPaths = append(inventoryPaths, "explorer-monorepo/config/address-inventory.json", "config/address-inventory.json", "../config/address-inventory.json", "../../config/address-inventory.json", ) inventoryRaw, _, invErr := firstReadableFile(inventoryPaths) if invErr != nil || len(inventoryRaw) == 0 { return } var inventoryRoot struct { Inventory map[string]string `json:"inventory"` } if err := json.Unmarshal(inventoryRaw, &inventoryRoot); err != nil { return } for k, v := range inventoryRoot.Inventory { if !hexAddrRe.MatchString(v) { continue } addr := strings.ToLower(v) if _, exists := registryAddrToKey[addr]; exists { continue } registryAddrToKey[addr] = k } }) return registryAddrToKey } func jsonStringField(m map[string]interface{}, keys ...string) string { for _, k := range keys { if v, ok := m[k].(string); ok && v != "" { return v } } return "" } func extractEthAddress(val interface{}) string { switch t := val.(type) { case string: if hexAddrRe.MatchString(strings.TrimSpace(t)) { return strings.ToLower(strings.TrimSpace(t)) } case map[string]interface{}: if h := jsonStringField(t, "hash", "address"); h != "" && hexAddrRe.MatchString(h) { return strings.ToLower(h) } } return "" } func fetchBlockscoutTransaction(ctx context.Context, tx string) ([]byte, int, error) { fetchURL := blockscoutInternalBase() + "/api/v2/transactions/" + url.PathEscape(tx) timeouts := []time.Duration{15 * time.Second, 25 * time.Second} var lastBody []byte var lastStatus int var lastErr error for idx, timeout := range timeouts { attemptCtx, cancel := context.WithTimeout(ctx, timeout) req, err := http.NewRequestWithContext(attemptCtx, http.MethodGet, fetchURL, nil) if err != nil { cancel() return nil, 0, err } resp, err := http.DefaultClient.Do(req) if err != nil { cancel() lastErr = err if idx == len(timeouts)-1 { return nil, 0, err } continue } body, readErr := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) resp.Body.Close() cancel() if readErr != nil { lastErr = readErr if idx == len(timeouts)-1 { return nil, 0, readErr } continue } lastBody = body lastStatus = resp.StatusCode if resp.StatusCode == http.StatusOK { return body, resp.StatusCode, nil } if resp.StatusCode < 500 || idx == len(timeouts)-1 { return body, resp.StatusCode, nil } } return lastBody, lastStatus, lastErr } func fetchTransactionViaRPC(ctx context.Context, tx string) (string, string, error) { base := rpcURL() if base == "" { return "", "", fmt.Errorf("RPC_URL not configured") } payload, err := json.Marshal(map[string]interface{}{ "jsonrpc": "2.0", "id": 1, "method": "eth_getTransactionByHash", "params": []interface{}{tx}, }) if err != nil { return "", "", err } req, err := http.NewRequestWithContext(ctx, http.MethodPost, base, bytes.NewReader(payload)) if err != nil { return "", "", err } req.Header.Set("Content-Type", "application/json") resp, err := http.DefaultClient.Do(req) if err != nil { return "", "", err } defer resp.Body.Close() body, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20)) if err != nil { return "", "", err } if resp.StatusCode != http.StatusOK { return "", "", fmt.Errorf("rpc HTTP %d", resp.StatusCode) } var rpcResp struct { Result map[string]interface{} `json:"result"` Error map[string]interface{} `json:"error"` } if err := json.Unmarshal(body, &rpcResp); err != nil { return "", "", err } if rpcResp.Error != nil { return "", "", fmt.Errorf("rpc error") } if rpcResp.Result == nil { return "", "", fmt.Errorf("transaction not found") } fromAddr := extractEthAddress(jsonStringField(rpcResp.Result, "from")) toAddr := extractEthAddress(jsonStringField(rpcResp.Result, "to")) if fromAddr == "" && toAddr == "" { return "", "", fmt.Errorf("transaction missing from/to") } return fromAddr, toAddr, nil } // HandleMissionControlBridgeTrace handles GET /api/v1/mission-control/bridge/trace?tx=0x... func (s *Server) HandleMissionControlBridgeTrace(w http.ResponseWriter, r *http.Request) { atomic.AddUint64(&missionControlMetrics.bridgeTraceRequests, 1) if r.Method != http.MethodGet { writeMethodNotAllowed(w) return } tx := strings.TrimSpace(r.URL.Query().Get("tx")) if tx == "" { writeError(w, http.StatusBadRequest, "bad_request", "missing tx query parameter") return } if !hexTxRe.MatchString(tx) { writeError(w, http.StatusBadRequest, "bad_request", "invalid transaction hash") return } reg := loadAddressRegistry138() publicBase := strings.TrimRight(strings.TrimSpace(os.Getenv("EXPLORER_PUBLIC_BASE")), "/") if publicBase == "" { publicBase = "https://explorer.d-bis.org" } fromAddr := "" toAddr := "" fromLabel := "" toLabel := "" source := "blockscout" body, statusCode, err := fetchBlockscoutTransaction(r.Context(), tx) if err == nil && statusCode == http.StatusOK { var txDoc map[string]interface{} if uerr := json.Unmarshal(body, &txDoc); uerr != nil { // Fall through to the RPC fallback below. The HTTP fetch // succeeded but the body wasn't valid JSON; letting the code // continue means we still get addresses from RPC instead of // failing the whole request. _ = uerr } else { fromAddr = extractEthAddress(txDoc["from"]) toAddr = extractEthAddress(txDoc["to"]) } } if fromAddr == "" && toAddr == "" { rpcFrom, rpcTo, rpcErr := fetchTransactionViaRPC(r.Context(), tx) if rpcErr == nil { fromAddr = rpcFrom toAddr = rpcTo source = "rpc_fallback" } else { atomic.AddUint64(&missionControlMetrics.bridgeTraceFailures, 1) if err != nil { log.Printf("mission_control bridge_trace tx=%s fetch_error=%v rpc_fallback_error=%v", strings.ToLower(tx), err, rpcErr) writeError(w, http.StatusBadGateway, "bad_gateway", err.Error()) return } log.Printf("mission_control bridge_trace tx=%s upstream_status=%d rpc_fallback_error=%v", strings.ToLower(tx), statusCode, rpcErr) writeError(w, http.StatusBadGateway, "blockscout_error", fmt.Sprintf("blockscout HTTP %d", statusCode)) return } } if fromAddr != "" { fromLabel = reg[fromAddr] } if toAddr != "" { toLabel = reg[toAddr] } out := map[string]interface{}{ "tx_hash": strings.ToLower(tx), "from": fromAddr, "from_registry": fromLabel, "to": toAddr, "to_registry": toLabel, "blockscout_url": publicBase + "/transactions/" + strings.ToLower(tx), "source": source, } if registryLoadErr != nil && len(reg) == 0 { out["registry_warning"] = registryLoadErr.Error() } log.Printf("mission_control bridge_trace tx=%s from=%s to=%s from_label=%s to_label=%s", strings.ToLower(tx), fromAddr, toAddr, fromLabel, toLabel) writeJSON(w, http.StatusOK, map[string]interface{}{"data": out}) }