Add full monorepo: virtual-banker, backend, frontend, docs, scripts, deployment
Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
198
backend/realtime/gateway.go
Normal file
198
backend/realtime/gateway.go
Normal file
@@ -0,0 +1,198 @@
|
||||
package realtime
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
var upgrader = websocket.Upgrader{
|
||||
CheckOrigin: func(r *http.Request) bool {
|
||||
// In production, validate origin properly
|
||||
return true
|
||||
},
|
||||
}
|
||||
|
||||
// Gateway handles WebRTC signaling and WebSocket connections
|
||||
type Gateway struct {
|
||||
connections map[string]*Connection
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
// NewGateway creates a new WebRTC gateway
|
||||
func NewGateway() *Gateway {
|
||||
return &Gateway{
|
||||
connections: make(map[string]*Connection),
|
||||
}
|
||||
}
|
||||
|
||||
// Connection represents a WebSocket connection for signaling
|
||||
type Connection struct {
|
||||
sessionID string
|
||||
ws *websocket.Conn
|
||||
send chan []byte
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
// HandleWebSocket handles WebSocket upgrade for signaling
|
||||
func (g *Gateway) HandleWebSocket(w http.ResponseWriter, r *http.Request, sessionID string) error {
|
||||
ws, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to upgrade connection: %w", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(r.Context())
|
||||
conn := &Connection{
|
||||
sessionID: sessionID,
|
||||
ws: ws,
|
||||
send: make(chan []byte, 256),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
|
||||
g.mu.Lock()
|
||||
g.connections[sessionID] = conn
|
||||
g.mu.Unlock()
|
||||
|
||||
// Start goroutines
|
||||
go conn.writePump()
|
||||
go conn.readPump(g)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// SendMessage sends a message to a specific session
|
||||
func (g *Gateway) SendMessage(sessionID string, message interface{}) error {
|
||||
g.mu.RLock()
|
||||
conn, ok := g.connections[sessionID]
|
||||
g.mu.RUnlock()
|
||||
|
||||
if !ok {
|
||||
return fmt.Errorf("connection not found for session: %s", sessionID)
|
||||
}
|
||||
|
||||
data, err := json.Marshal(message)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal message: %w", err)
|
||||
}
|
||||
|
||||
select {
|
||||
case conn.send <- data:
|
||||
return nil
|
||||
case <-conn.ctx.Done():
|
||||
return fmt.Errorf("connection closed")
|
||||
}
|
||||
}
|
||||
|
||||
// CloseConnection closes a connection
|
||||
func (g *Gateway) CloseConnection(sessionID string) {
|
||||
g.mu.Lock()
|
||||
defer g.mu.Unlock()
|
||||
|
||||
if conn, ok := g.connections[sessionID]; ok {
|
||||
conn.cancel()
|
||||
conn.ws.Close()
|
||||
delete(g.connections, sessionID)
|
||||
}
|
||||
}
|
||||
|
||||
// readPump reads messages from the WebSocket
|
||||
func (c *Connection) readPump(gateway *Gateway) {
|
||||
defer func() {
|
||||
gateway.CloseConnection(c.sessionID)
|
||||
c.ws.Close()
|
||||
}()
|
||||
|
||||
c.ws.SetReadDeadline(time.Now().Add(60 * time.Second))
|
||||
c.ws.SetPongHandler(func(string) error {
|
||||
c.ws.SetReadDeadline(time.Now().Add(60 * time.Second))
|
||||
return nil
|
||||
})
|
||||
|
||||
for {
|
||||
_, message, err := c.ws.ReadMessage()
|
||||
if err != nil {
|
||||
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
|
||||
log.Printf("WebSocket error: %v", err)
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
// Handle incoming message (ICE candidates, SDP offers/answers, etc.)
|
||||
var msg map[string]interface{}
|
||||
if err := json.Unmarshal(message, &msg); err != nil {
|
||||
log.Printf("Failed to unmarshal message: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Route message based on type
|
||||
msgType, ok := msg["type"].(string)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
switch msgType {
|
||||
case "ice-candidate":
|
||||
// Handle ICE candidate
|
||||
case "offer":
|
||||
// Handle SDP offer
|
||||
case "answer":
|
||||
// Handle SDP answer
|
||||
default:
|
||||
log.Printf("Unknown message type: %s", msgType)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// writePump writes messages to the WebSocket
|
||||
func (c *Connection) writePump() {
|
||||
ticker := time.NewTicker(54 * time.Second)
|
||||
defer func() {
|
||||
ticker.Stop()
|
||||
c.ws.Close()
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case message, ok := <-c.send:
|
||||
c.ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
||||
if !ok {
|
||||
c.ws.WriteMessage(websocket.CloseMessage, []byte{})
|
||||
return
|
||||
}
|
||||
|
||||
w, err := c.ws.NextWriter(websocket.TextMessage)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
w.Write(message)
|
||||
|
||||
// Add queued messages
|
||||
n := len(c.send)
|
||||
for i := 0; i < n; i++ {
|
||||
w.Write([]byte{'\n'})
|
||||
w.Write(<-c.send)
|
||||
}
|
||||
|
||||
if err := w.Close(); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
case <-ticker.C:
|
||||
c.ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
||||
if err := c.ws.WriteMessage(websocket.PingMessage, nil); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
case <-c.ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user