feat: explorer API, wallet, CCIP scripts, and config refresh

- Backend REST/gateway/track routes, analytics, Blockscout proxy paths.
- Frontend wallet and liquidity surfaces; MetaMask token list alignment.
- Deployment docs, verification scripts, address inventory updates.

Check: go build ./... under backend/ (pass).
Made-with: Cursor
This commit is contained in:
defiQUG
2026-04-07 23:22:12 -07:00
parent d931be8e19
commit 6eef6b07f6
224 changed files with 19671 additions and 3291 deletions

View File

@@ -0,0 +1,146 @@
package track1
import (
"context"
"os"
"strings"
"time"
)
func relaySnapshotStatus(relay map[string]interface{}) string {
if relay == nil {
return ""
}
if probe, ok := relay["url_probe"].(map[string]interface{}); ok {
if okValue, exists := probe["ok"].(bool); exists && !okValue {
return "down"
}
if body, ok := probe["body"].(map[string]interface{}); ok {
if status, ok := body["status"].(string); ok {
return strings.ToLower(strings.TrimSpace(status))
}
}
}
if _, ok := relay["file_snapshot_error"].(string); ok {
return "down"
}
if snapshot, ok := relay["file_snapshot"].(map[string]interface{}); ok {
if status, ok := snapshot["status"].(string); ok {
return strings.ToLower(strings.TrimSpace(status))
}
}
return ""
}
func relayNeedsAttention(relay map[string]interface{}) bool {
status := relaySnapshotStatus(relay)
switch status {
case "degraded", "stale", "stopped", "down":
return true
default:
return false
}
}
// BuildBridgeStatusData builds the inner `data` object for bridge/status and SSE payloads.
func (s *Server) BuildBridgeStatusData(ctx context.Context) map[string]interface{} {
rpc138 := strings.TrimSpace(os.Getenv("RPC_URL"))
if rpc138 == "" {
rpc138 = "http://localhost:8545"
}
var probes []RPCProbeResult
p138 := ProbeEVMJSONRPC(ctx, "chain-138", "138", rpc138)
probes = append(probes, p138)
if eth := strings.TrimSpace(os.Getenv("ETH_MAINNET_RPC_URL")); eth != "" {
probes = append(probes, ProbeEVMJSONRPC(ctx, "ethereum-mainnet", "1", eth))
}
for _, row := range ParseExtraRPCProbes() {
name, u, ck := row[0], row[1], row[2]
probes = append(probes, ProbeEVMJSONRPC(ctx, name, ck, u))
}
overall := "operational"
if !p138.OK {
overall = "degraded"
} else {
for _, p := range probes {
if !p.OK {
overall = "degraded"
break
}
}
}
now := time.Now().UTC().Format(time.RFC3339)
chains := map[string]interface{}{
"138": map[string]interface{}{
"name": "Defi Oracle Meta Mainnet",
"status": chainStatusFromProbe(p138),
"last_sync": now,
"latency_ms": p138.LatencyMs,
"head_age_sec": p138.HeadAgeSeconds,
"block_number": p138.BlockNumberDec,
"endpoint": p138.Endpoint,
"probe_error": p138.Error,
},
}
for _, p := range probes {
if p.ChainKey != "1" && p.Name != "ethereum-mainnet" {
continue
}
chains["1"] = map[string]interface{}{
"name": "Ethereum Mainnet",
"status": chainStatusFromProbe(p),
"last_sync": now,
"latency_ms": p.LatencyMs,
"head_age_sec": p.HeadAgeSeconds,
"block_number": p.BlockNumberDec,
"endpoint": p.Endpoint,
"probe_error": p.Error,
}
break
}
probeJSON := make([]map[string]interface{}, 0, len(probes))
for _, p := range probes {
probeJSON = append(probeJSON, map[string]interface{}{
"name": p.Name,
"chainKey": p.ChainKey,
"endpoint": p.Endpoint,
"ok": p.OK,
"latencyMs": p.LatencyMs,
"blockNumber": p.BlockNumber,
"blockNumberDec": p.BlockNumberDec,
"headAgeSeconds": p.HeadAgeSeconds,
"error": p.Error,
})
}
data := map[string]interface{}{
"status": overall,
"chains": chains,
"rpc_probe": probeJSON,
"checked_at": now,
}
if ov := readOptionalVerifyJSON(); ov != nil {
data["operator_verify"] = ov
}
if relays := FetchCCIPRelayHealths(ctx); relays != nil {
data["ccip_relays"] = relays
if ccip := primaryRelayHealth(relays); ccip != nil {
data["ccip_relay"] = ccip
}
for _, value := range relays {
relay, ok := value.(map[string]interface{})
if ok && relayNeedsAttention(relay) {
data["status"] = "degraded"
break
}
}
}
return data
}

View File

@@ -0,0 +1,182 @@
package track1
import (
"context"
"encoding/json"
"io"
"net/http"
"os"
"sort"
"strconv"
"strings"
"time"
)
type relayHealthTarget struct {
Name string
URL string
}
func fetchRelayHealthURL(ctx context.Context, u string) map[string]interface{} {
out := make(map[string]interface{})
c := &http.Client{Timeout: 4 * time.Second}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
if err != nil {
out["url_probe"] = map[string]interface{}{"ok": false, "error": err.Error()}
} else {
resp, err := c.Do(req)
if err != nil {
out["url_probe"] = map[string]interface{}{"ok": false, "error": err.Error()}
} else {
func() {
defer resp.Body.Close()
b, _ := io.ReadAll(io.LimitReader(resp.Body, 256*1024))
ok := resp.StatusCode >= 200 && resp.StatusCode < 300
var j interface{}
if json.Unmarshal(b, &j) == nil {
out["url_probe"] = map[string]interface{}{"ok": ok, "status": resp.StatusCode, "body": j}
} else {
out["url_probe"] = map[string]interface{}{"ok": ok, "status": resp.StatusCode, "raw": string(b)}
}
}()
}
}
return out
}
func fetchRelayHealthFileSnapshot(p string) map[string]interface{} {
out := make(map[string]interface{})
if p != "" {
b, err := os.ReadFile(p)
if err != nil {
out["file_snapshot_error"] = err.Error()
} else if len(b) > 512*1024 {
out["file_snapshot_error"] = "file too large"
} else {
var j interface{}
if err := json.Unmarshal(b, &j); err != nil {
out["file_snapshot_error"] = err.Error()
} else {
out["file_snapshot"] = j
}
}
}
return out
}
func buildRelayHealthSignal(ctx context.Context, url, filePath string) map[string]interface{} {
out := make(map[string]interface{})
if strings.TrimSpace(url) != "" {
for key, value := range fetchRelayHealthURL(ctx, url) {
out[key] = value
}
}
if strings.TrimSpace(filePath) != "" {
for key, value := range fetchRelayHealthFileSnapshot(filePath) {
out[key] = value
}
}
if len(out) == 0 {
return nil
}
return out
}
func normalizeRelayHealthName(raw string, index int) string {
name := strings.TrimSpace(strings.ToLower(raw))
if name == "" {
return "relay_" + strconv.Itoa(index)
}
replacer := strings.NewReplacer(" ", "_", "-", "_", "/", "_")
name = replacer.Replace(name)
return name
}
func parseRelayHealthTargets() []relayHealthTarget {
raw := strings.TrimSpace(os.Getenv("CCIP_RELAY_HEALTH_URLS"))
if raw == "" {
return nil
}
normalized := strings.NewReplacer("\n", ",", ";", ",").Replace(raw)
parts := strings.Split(normalized, ",")
targets := make([]relayHealthTarget, 0, len(parts))
for idx, part := range parts {
part = strings.TrimSpace(part)
if part == "" {
continue
}
name := ""
url := part
if strings.Contains(part, "=") {
chunks := strings.SplitN(part, "=", 2)
name = normalizeRelayHealthName(chunks[0], idx+1)
url = strings.TrimSpace(chunks[1])
} else {
name = normalizeRelayHealthName("", idx+1)
}
if url == "" {
continue
}
targets = append(targets, relayHealthTarget{Name: name, URL: url})
}
return targets
}
// FetchCCIPRelayHealths returns optional named CCIP / relay signals from URL probes and/or operator JSON files.
// Safe defaults: short timeouts, small body cap. Omit from payload when nothing is configured.
func FetchCCIPRelayHealths(ctx context.Context) map[string]interface{} {
relays := make(map[string]interface{})
if legacy := buildRelayHealthSignal(
ctx,
strings.TrimSpace(os.Getenv("CCIP_RELAY_HEALTH_URL")),
strings.TrimSpace(os.Getenv("MISSION_CONTROL_CCIP_JSON")),
); legacy != nil {
relays["mainnet"] = legacy
}
for _, target := range parseRelayHealthTargets() {
if _, exists := relays[target.Name]; exists {
continue
}
if relay := buildRelayHealthSignal(ctx, target.URL, ""); relay != nil {
relays[target.Name] = relay
}
}
if len(relays) == 0 {
return nil
}
return relays
}
func primaryRelayHealth(relays map[string]interface{}) map[string]interface{} {
if len(relays) == 0 {
return nil
}
preferred := []string{"mainnet_cw", "mainnet_weth", "mainnet"}
for _, key := range preferred {
if relay, ok := relays[key].(map[string]interface{}); ok {
return relay
}
}
keys := make([]string, 0, len(relays))
for key := range relays {
keys = append(keys, key)
}
sort.Strings(keys)
for _, key := range keys {
if relay, ok := relays[key].(map[string]interface{}); ok {
return relay
}
}
return nil
}
// FetchCCIPRelayHealth returns the primary relay signal for legacy callers.
func FetchCCIPRelayHealth(ctx context.Context) map[string]interface{} {
return primaryRelayHealth(FetchCCIPRelayHealths(ctx))
}

View File

@@ -0,0 +1,203 @@
package track1
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strconv"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestFetchCCIPRelayHealthFromURL(t *testing.T) {
relay := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"ok":true,"status":"operational","destination":{"chain_name":"Ethereum Mainnet"},"queue":{"size":0}}`))
}))
defer relay.Close()
t.Setenv("CCIP_RELAY_HEALTH_URL", relay.URL+"/healthz")
t.Setenv("CCIP_RELAY_HEALTH_URLS", "")
t.Setenv("MISSION_CONTROL_CCIP_JSON", "")
got := FetchCCIPRelayHealth(context.Background())
require.NotNil(t, got)
probe, ok := got["url_probe"].(map[string]interface{})
require.True(t, ok)
require.Equal(t, true, probe["ok"])
body, ok := probe["body"].(map[string]interface{})
require.True(t, ok)
require.Equal(t, "operational", body["status"])
dest, ok := body["destination"].(map[string]interface{})
require.True(t, ok)
require.Equal(t, "Ethereum Mainnet", dest["chain_name"])
}
func TestFetchCCIPRelayHealthsFromNamedURLs(t *testing.T) {
mainnet := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"status":"operational","destination":{"chain_name":"Ethereum Mainnet"},"queue":{"size":0}}`))
}))
defer mainnet.Close()
bsc := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"status":"operational","destination":{"chain_name":"BSC"},"queue":{"size":1}}`))
}))
defer bsc.Close()
t.Setenv("CCIP_RELAY_HEALTH_URL", "")
t.Setenv("MISSION_CONTROL_CCIP_JSON", "")
t.Setenv("CCIP_RELAY_HEALTH_URLS", "mainnet="+mainnet.URL+"/healthz,bsc="+bsc.URL+"/healthz")
got := FetchCCIPRelayHealths(context.Background())
require.NotNil(t, got)
mainnetRelay, ok := got["mainnet"].(map[string]interface{})
require.True(t, ok)
mainnetProbe, ok := mainnetRelay["url_probe"].(map[string]interface{})
require.True(t, ok)
require.Equal(t, true, mainnetProbe["ok"])
bscRelay, ok := got["bsc"].(map[string]interface{})
require.True(t, ok)
bscProbe, ok := bscRelay["url_probe"].(map[string]interface{})
require.True(t, ok)
body, ok := bscProbe["body"].(map[string]interface{})
require.True(t, ok)
dest, ok := body["destination"].(map[string]interface{})
require.True(t, ok)
require.Equal(t, "BSC", dest["chain_name"])
}
func TestFetchCCIPRelayHealthPrefersMainnetCW(t *testing.T) {
relays := map[string]interface{}{
"mainnet_weth": map[string]interface{}{"url_probe": map[string]interface{}{"ok": true}},
"mainnet_cw": map[string]interface{}{"url_probe": map[string]interface{}{"ok": true, "body": map[string]interface{}{"status": "operational"}}},
"bsc": map[string]interface{}{"url_probe": map[string]interface{}{"ok": true}},
}
got := primaryRelayHealth(relays)
require.NotNil(t, got)
require.Equal(t, relays["mainnet_cw"], got)
}
func TestFetchCCIPRelayHealthFromFileSnapshot(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "relay-health.json")
require.NoError(t, os.WriteFile(path, []byte(`{"status":"paused","queue":{"size":3}}`), 0o644))
t.Setenv("CCIP_RELAY_HEALTH_URL", "")
t.Setenv("CCIP_RELAY_HEALTH_URLS", "")
t.Setenv("MISSION_CONTROL_CCIP_JSON", path)
got := FetchCCIPRelayHealth(context.Background())
require.NotNil(t, got)
snapshot, ok := got["file_snapshot"].(map[string]interface{})
require.True(t, ok)
require.Equal(t, "paused", snapshot["status"])
queue, ok := snapshot["queue"].(map[string]interface{})
require.True(t, ok)
require.Equal(t, float64(3), queue["size"])
}
func TestBuildBridgeStatusDataIncludesCCIPRelay(t *testing.T) {
rpc := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var req struct {
Method string `json:"method"`
}
require.NoError(t, json.NewDecoder(r.Body).Decode(&req))
w.Header().Set("Content-Type", "application/json")
switch req.Method {
case "eth_blockNumber":
_, _ = w.Write([]byte(`{"jsonrpc":"2.0","id":1,"result":"0x10"}`))
case "eth_getBlockByNumber":
ts := strconv.FormatInt(time.Now().Add(-2*time.Second).Unix(), 16)
_, _ = w.Write([]byte(`{"jsonrpc":"2.0","id":1,"result":{"timestamp":"0x` + ts + `"}}`))
default:
http.Error(w, `{"jsonrpc":"2.0","id":1,"error":{"message":"unsupported"}}`, http.StatusBadRequest)
}
}))
defer rpc.Close()
relay := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"ok":true,"status":"operational","queue":{"size":0}}`))
}))
defer relay.Close()
t.Setenv("RPC_URL", rpc.URL)
t.Setenv("ETH_MAINNET_RPC_URL", "")
t.Setenv("MISSION_CONTROL_EXTRA_RPCS", "")
t.Setenv("MISSION_CONTROL_VERIFY_JSON", "")
t.Setenv("CCIP_RELAY_HEALTH_URL", relay.URL+"/healthz")
t.Setenv("CCIP_RELAY_HEALTH_URLS", "")
t.Setenv("MISSION_CONTROL_CCIP_JSON", "")
s := &Server{}
got := s.BuildBridgeStatusData(context.Background())
ccip, ok := got["ccip_relay"].(map[string]interface{})
require.True(t, ok)
relays, ok := got["ccip_relays"].(map[string]interface{})
require.True(t, ok)
require.Contains(t, relays, "mainnet")
probe, ok := ccip["url_probe"].(map[string]interface{})
require.True(t, ok)
require.Equal(t, true, probe["ok"])
}
func TestBuildBridgeStatusDataDegradesWhenNamedRelayFails(t *testing.T) {
rpc := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var req struct {
Method string `json:"method"`
}
require.NoError(t, json.NewDecoder(r.Body).Decode(&req))
w.Header().Set("Content-Type", "application/json")
switch req.Method {
case "eth_blockNumber":
_, _ = w.Write([]byte(`{"jsonrpc":"2.0","id":1,"result":"0x10"}`))
case "eth_getBlockByNumber":
ts := strconv.FormatInt(time.Now().Add(-2*time.Second).Unix(), 16)
_, _ = w.Write([]byte(`{"jsonrpc":"2.0","id":1,"result":{"timestamp":"0x` + ts + `"}}`))
default:
http.Error(w, `{"jsonrpc":"2.0","id":1,"error":{"message":"unsupported"}}`, http.StatusBadRequest)
}
}))
defer rpc.Close()
mainnet := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"status":"operational","queue":{"size":0}}`))
}))
defer mainnet.Close()
bad := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
http.Error(w, `{"status":"degraded"}`, http.StatusBadGateway)
}))
defer bad.Close()
t.Setenv("RPC_URL", rpc.URL)
t.Setenv("ETH_MAINNET_RPC_URL", "")
t.Setenv("MISSION_CONTROL_EXTRA_RPCS", "")
t.Setenv("MISSION_CONTROL_VERIFY_JSON", "")
t.Setenv("CCIP_RELAY_HEALTH_URL", "")
t.Setenv("MISSION_CONTROL_CCIP_JSON", "")
t.Setenv("CCIP_RELAY_HEALTH_URLS", "mainnet="+mainnet.URL+"/healthz,bsc="+bad.URL+"/healthz")
s := &Server{}
got := s.BuildBridgeStatusData(context.Background())
require.Equal(t, "degraded", got["status"])
}

View File

@@ -1,17 +1,22 @@
package track1
import (
"context"
"encoding/json"
"fmt"
"math/big"
"net/http"
"regexp"
"strconv"
"strings"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/explorer/backend/libs/go-rpc-gateway"
)
var track1HashPattern = regexp.MustCompile(`^0x[a-fA-F0-9]{64}$`)
// Server handles Track 1 endpoints (uses RPC gateway from lib)
type Server struct {
rpcGateway *gateway.RPCGateway
@@ -173,7 +178,12 @@ func (s *Server) HandleBlockDetail(w http.ResponseWriter, r *http.Request) {
}
path := strings.TrimPrefix(r.URL.Path, "/api/v1/track1/block/")
blockNumStr := fmt.Sprintf("0x%x", parseBlockNumber(path))
blockNumber, err := strconv.ParseInt(strings.TrimSpace(path), 10, 64)
if err != nil || blockNumber < 0 {
writeError(w, http.StatusBadRequest, "bad_request", "Invalid block number")
return
}
blockNumStr := fmt.Sprintf("0x%x", blockNumber)
blockResp, err := s.rpcGateway.GetBlockByNumber(r.Context(), blockNumStr, false)
if err != nil {
@@ -203,7 +213,11 @@ func (s *Server) HandleTransactionDetail(w http.ResponseWriter, r *http.Request)
}
path := strings.TrimPrefix(r.URL.Path, "/api/v1/track1/tx/")
txHash := path
txHash := strings.TrimSpace(path)
if !track1HashPattern.MatchString(txHash) {
writeError(w, http.StatusBadRequest, "bad_request", "Invalid transaction hash")
return
}
txResp, err := s.rpcGateway.GetTransactionByHash(r.Context(), txHash)
if err != nil {
@@ -239,7 +253,11 @@ func (s *Server) HandleAddressBalance(w http.ResponseWriter, r *http.Request) {
return
}
address := parts[0]
address := strings.TrimSpace(parts[0])
if !common.IsHexAddress(address) {
writeError(w, http.StatusBadRequest, "bad_request", "Invalid address")
return
}
balanceResp, err := s.rpcGateway.GetBalance(r.Context(), address, "latest")
if err != nil {
writeError(w, http.StatusInternalServerError, "rpc_error", err.Error())
@@ -278,31 +296,25 @@ func (s *Server) HandleBridgeStatus(w http.ResponseWriter, r *http.Request) {
return
}
// Return bridge status (simplified - in production, query bridge contracts)
ctx, cancel := context.WithTimeout(r.Context(), 12*time.Second)
defer cancel()
data := s.BuildBridgeStatusData(ctx)
response := map[string]interface{}{
"data": map[string]interface{}{
"status": "operational",
"chains": map[string]interface{}{
"138": map[string]interface{}{
"name": "Defi Oracle Meta Mainnet",
"status": "operational",
"last_sync": time.Now().UTC().Format(time.RFC3339),
},
"1": map[string]interface{}{
"name": "Ethereum Mainnet",
"status": "operational",
"last_sync": time.Now().UTC().Format(time.RFC3339),
},
},
"total_transfers_24h": 150,
"total_volume_24h": "5000000000000000000000",
},
"data": data,
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
func chainStatusFromProbe(p RPCProbeResult) string {
if p.OK {
return "operational"
}
return "unreachable"
}
// Helper functions
func writeError(w http.ResponseWriter, statusCode int, code, message string) {
w.Header().Set("Content-Type", "application/json")
@@ -320,14 +332,6 @@ func hexToInt(hex string) (int64, error) {
return strconv.ParseInt(hex, 16, 64)
}
func parseBlockNumber(s string) int64 {
num, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return 0
}
return num
}
func transformBlock(blockData map[string]interface{}) map[string]interface{} {
return map[string]interface{}{
"number": parseHexField(blockData["number"]),

View File

@@ -0,0 +1,43 @@
package track1
import (
"net/http"
"net/http/httptest"
"testing"
)
func TestHandleBlockDetailRejectsInvalidBlockNumber(t *testing.T) {
server := &Server{}
req := httptest.NewRequest(http.MethodGet, "/api/v1/track1/block/not-a-number", nil)
w := httptest.NewRecorder()
server.HandleBlockDetail(w, req)
if w.Code != http.StatusBadRequest {
t.Fatalf("expected 400 for invalid block number, got %d", w.Code)
}
}
func TestHandleTransactionDetailRejectsInvalidHash(t *testing.T) {
server := &Server{}
req := httptest.NewRequest(http.MethodGet, "/api/v1/track1/tx/not-a-hash", nil)
w := httptest.NewRecorder()
server.HandleTransactionDetail(w, req)
if w.Code != http.StatusBadRequest {
t.Fatalf("expected 400 for invalid tx hash, got %d", w.Code)
}
}
func TestHandleAddressBalanceRejectsInvalidAddress(t *testing.T) {
server := &Server{}
req := httptest.NewRequest(http.MethodGet, "/api/v1/track1/address/not-an-address/balance", nil)
w := httptest.NewRecorder()
server.HandleAddressBalance(w, req)
if w.Code != http.StatusBadRequest {
t.Fatalf("expected 400 for invalid address, got %d", w.Code)
}
}

View File

@@ -0,0 +1,54 @@
package track1
import (
"context"
"encoding/json"
"fmt"
"net/http"
"time"
)
// HandleMissionControlStream sends periodic text/event-stream payloads with full bridge/status data (for SPA or tooling).
func (s *Server) HandleMissionControlStream(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
writeError(w, http.StatusMethodNotAllowed, "method_not_allowed", "Method not allowed")
return
}
controller := http.NewResponseController(w)
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Accel-Buffering", "no")
tick := time.NewTicker(20 * time.Second)
defer tick.Stop()
send := func() bool {
ctx, cancel := context.WithTimeout(r.Context(), 12*time.Second)
defer cancel()
data := s.BuildBridgeStatusData(ctx)
payload, err := json.Marshal(map[string]interface{}{"data": data})
if err != nil {
return false
}
_, _ = fmt.Fprintf(w, "event: mission-control\ndata: %s\n\n", payload)
return controller.Flush() == nil
}
if !send() {
return
}
for {
select {
case <-r.Context().Done():
return
case <-tick.C:
if !send() {
return
}
}
}
}

View File

@@ -0,0 +1,72 @@
package track1
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestHandleMissionControlStreamSendsInitialEvent(t *testing.T) {
rpc := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var req struct {
Method string `json:"method"`
}
require.NoError(t, json.NewDecoder(r.Body).Decode(&req))
w.Header().Set("Content-Type", "application/json")
switch req.Method {
case "eth_blockNumber":
_, _ = w.Write([]byte(`{"jsonrpc":"2.0","id":1,"result":"0x10"}`))
case "eth_getBlockByNumber":
ts := strconv.FormatInt(time.Now().Add(-2*time.Second).Unix(), 16)
_, _ = w.Write([]byte(`{"jsonrpc":"2.0","id":1,"result":{"timestamp":"0x` + ts + `"}}`))
default:
http.Error(w, `{"jsonrpc":"2.0","id":1,"error":{"message":"unsupported"}}`, http.StatusBadRequest)
}
}))
defer rpc.Close()
t.Setenv("RPC_URL", rpc.URL)
t.Setenv("ETH_MAINNET_RPC_URL", "")
t.Setenv("MISSION_CONTROL_EXTRA_RPCS", "")
t.Setenv("MISSION_CONTROL_VERIFY_JSON", "")
t.Setenv("CCIP_RELAY_HEALTH_URL", "")
t.Setenv("CCIP_RELAY_HEALTH_URLS", "")
t.Setenv("MISSION_CONTROL_CCIP_JSON", "")
s := &Server{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
req := httptest.NewRequest(http.MethodGet, "/api/v1/mission-control/stream", nil).WithContext(ctx)
w := httptest.NewRecorder()
done := make(chan struct{})
go func() {
s.HandleMissionControlStream(w, req)
close(done)
}()
deadline := time.Now().Add(500 * time.Millisecond)
for time.Now().Before(deadline) {
if strings.Contains(w.Body.String(), "event: mission-control") {
break
}
time.Sleep(10 * time.Millisecond)
}
cancel()
<-done
require.Contains(t, w.Header().Get("Content-Type"), "text/event-stream")
require.Contains(t, w.Body.String(), "event: mission-control")
require.Contains(t, w.Body.String(), `"status":"operational"`)
require.Contains(t, w.Body.String(), `"chain-138"`)
}

View File

@@ -0,0 +1,204 @@
package track1
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"time"
)
// RPCProbeResult is one JSON-RPC health check (URLs are redacted to origin only in JSON).
type RPCProbeResult struct {
Name string `json:"name"`
ChainKey string `json:"chainKey,omitempty"`
Endpoint string `json:"endpoint"`
OK bool `json:"ok"`
LatencyMs int64 `json:"latencyMs"`
BlockNumber string `json:"blockNumber,omitempty"`
BlockNumberDec string `json:"blockNumberDec,omitempty"`
HeadAgeSeconds float64 `json:"headAgeSeconds,omitempty"`
Error string `json:"error,omitempty"`
}
type jsonRPCReq struct {
JSONRPC string `json:"jsonrpc"`
Method string `json:"method"`
Params []interface{} `json:"params"`
ID int `json:"id"`
}
type jsonRPCResp struct {
Result json.RawMessage `json:"result"`
Error *struct {
Message string `json:"message"`
} `json:"error"`
}
func redactRPCOrigin(raw string) string {
raw = strings.TrimSpace(raw)
if raw == "" {
return ""
}
u, err := url.Parse(raw)
if err != nil || u.Host == "" {
return "hidden"
}
if u.Scheme == "" {
return u.Host
}
return u.Scheme + "://" + u.Host
}
func postJSONRPC(ctx context.Context, client *http.Client, rpcURL string, method string, params []interface{}) (json.RawMessage, int64, error) {
if client == nil {
client = http.DefaultClient
}
body, err := json.Marshal(jsonRPCReq{JSONRPC: "2.0", Method: method, Params: params, ID: 1})
if err != nil {
return nil, 0, err
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, rpcURL, bytes.NewReader(body))
if err != nil {
return nil, 0, err
}
req.Header.Set("Content-Type", "application/json")
start := time.Now()
resp, err := client.Do(req)
latency := time.Since(start).Milliseconds()
if err != nil {
return nil, latency, err
}
defer resp.Body.Close()
b, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20))
if err != nil {
return nil, latency, err
}
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return nil, latency, fmt.Errorf("http %d", resp.StatusCode)
}
var out jsonRPCResp
if err := json.Unmarshal(b, &out); err != nil {
return nil, latency, err
}
if out.Error != nil && out.Error.Message != "" {
return nil, latency, fmt.Errorf("rpc error: %s", out.Error.Message)
}
return out.Result, latency, nil
}
// ProbeEVMJSONRPC runs eth_blockNumber and eth_getBlockByNumber(latest) for head age.
func ProbeEVMJSONRPC(ctx context.Context, name, chainKey, rpcURL string) RPCProbeResult {
rpcURL = strings.TrimSpace(rpcURL)
res := RPCProbeResult{
Name: name,
ChainKey: chainKey,
Endpoint: redactRPCOrigin(rpcURL),
}
if rpcURL == "" {
res.Error = "empty rpc url"
return res
}
client := &http.Client{Timeout: 6 * time.Second}
numRaw, lat1, err := postJSONRPC(ctx, client, rpcURL, "eth_blockNumber", []interface{}{})
if err != nil {
res.LatencyMs = lat1
res.Error = err.Error()
return res
}
var numHex string
if err := json.Unmarshal(numRaw, &numHex); err != nil {
res.LatencyMs = lat1
res.Error = "blockNumber decode: " + err.Error()
return res
}
res.BlockNumber = numHex
if n, err := strconv.ParseInt(strings.TrimPrefix(strings.TrimSpace(numHex), "0x"), 16, 64); err == nil {
res.BlockNumberDec = strconv.FormatInt(n, 10)
}
blockRaw, lat2, err := postJSONRPC(ctx, client, rpcURL, "eth_getBlockByNumber", []interface{}{"latest", false})
res.LatencyMs = lat1 + lat2
if err != nil {
res.OK = true
res.Error = "head block timestamp unavailable: " + err.Error()
return res
}
var block struct {
Timestamp string `json:"timestamp"`
}
if err := json.Unmarshal(blockRaw, &block); err != nil || block.Timestamp == "" {
res.OK = true
if err != nil {
res.Error = "block decode: " + err.Error()
}
return res
}
tsHex := strings.TrimSpace(block.Timestamp)
ts, err := strconv.ParseInt(strings.TrimPrefix(tsHex, "0x"), 16, 64)
if err != nil {
res.OK = true
res.Error = "timestamp parse: " + err.Error()
return res
}
bt := time.Unix(ts, 0)
res.HeadAgeSeconds = time.Since(bt).Seconds()
res.OK = true
return res
}
func readOptionalVerifyJSON() map[string]interface{} {
path := strings.TrimSpace(os.Getenv("MISSION_CONTROL_VERIFY_JSON"))
if path == "" {
return nil
}
b, err := os.ReadFile(path)
if err != nil || len(b) == 0 {
return map[string]interface{}{"error": "unreadable or empty", "path": path}
}
if len(b) > 512*1024 {
return map[string]interface{}{"error": "file too large", "path": path}
}
var v map[string]interface{}
if err := json.Unmarshal(b, &v); err != nil {
return map[string]interface{}{"error": err.Error(), "path": path}
}
return v
}
// ParseExtraRPCProbes reads MISSION_CONTROL_EXTRA_RPCS lines "name|url" or "name|url|chainKey".
func ParseExtraRPCProbes() [][3]string {
raw := strings.TrimSpace(os.Getenv("MISSION_CONTROL_EXTRA_RPCS"))
if raw == "" {
return nil
}
var out [][3]string
for _, line := range strings.Split(raw, "\n") {
line = strings.TrimSpace(line)
if line == "" || strings.HasPrefix(line, "#") {
continue
}
parts := strings.Split(line, "|")
if len(parts) < 2 {
continue
}
name := strings.TrimSpace(parts[0])
u := strings.TrimSpace(parts[1])
ck := ""
if len(parts) > 2 {
ck = strings.TrimSpace(parts[2])
}
if name != "" && u != "" {
out = append(out, [3]string{name, u, ck})
}
}
return out
}