veza/veza-backend-api/internal/handlers/playback_websocket_handler.go
2025-12-16 11:23:49 -05:00

404 lines
9.9 KiB
Go

package handlers
import (
"encoding/json"
"net/http"
"sync"
"time"
"github.com/google/uuid"
"veza-backend-api/internal/models"
"veza-backend-api/internal/services"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"go.uber.org/zap"
)
var (
// upgrader est utilisé pour mettre à niveau les connexions HTTP vers WebSocket
upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
// En production, vérifier l'origine de la requête
return true
},
}
)
// PlaybackWebSocketHandler gère les connexions WebSocket pour les analytics de lecture en temps réel
// T0368: Create Playback Analytics Real-time Updates
type PlaybackWebSocketHandler struct {
analyticsService *services.PlaybackAnalyticsService
logger *zap.Logger
clients map[int64]map[*websocket.Conn]*Client // trackID -> conn -> client
mu sync.RWMutex
broadcast chan *BroadcastMessage
}
// Client représente un client WebSocket connecté
type Client struct {
conn *websocket.Conn
trackID int64
userID uuid.UUID // Changed to UUID
send chan []byte
handler *PlaybackWebSocketHandler
mu sync.Mutex
}
// BroadcastMessage représente un message à diffuser
type BroadcastMessage struct {
TrackID int64 `json:"track_id"`
Type string `json:"type"`
Data interface{} `json:"data"`
Timestamp time.Time `json:"timestamp"`
}
// WebSocketMessage représente un message reçu du client
type WebSocketMessage struct {
Type string `json:"type"`
TrackID int64 `json:"track_id,omitempty"`
Data json.RawMessage `json:"data,omitempty"`
}
// NewPlaybackWebSocketHandler crée un nouveau handler WebSocket pour les analytics
func NewPlaybackWebSocketHandler(analyticsService *services.PlaybackAnalyticsService, logger *zap.Logger) *PlaybackWebSocketHandler {
if logger == nil {
logger = zap.NewNop()
}
handler := &PlaybackWebSocketHandler{
analyticsService: analyticsService,
logger: logger,
clients: make(map[int64]map[*websocket.Conn]*Client),
broadcast: make(chan *BroadcastMessage, 256),
}
// Démarrer la goroutine de diffusion
go handler.broadcastMessages()
return handler
}
// WebSocketHandler gère les connexions WebSocket pour les analytics de lecture
// T0368: Create Playback Analytics Real-time Updates
func (h *PlaybackWebSocketHandler) WebSocketHandler(c *gin.Context) {
// Récupérer l'ID de l'utilisateur depuis le contexte
userID, ok := GetUserIDUUID(c)
if !ok {
return // Erreur déjà envoyée par GetUserIDUUID
}
if userID == uuid.Nil {
c.JSON(http.StatusUnauthorized, gin.H{"error": "unauthorized"})
return
}
// Mettre à niveau la connexion HTTP vers WebSocket
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
h.logger.Error("Failed to upgrade connection to WebSocket",
zap.Error(err),
zap.String("user_id", userID.String()))
return
}
// Créer un nouveau client
client := &Client{
conn: conn,
userID: userID,
send: make(chan []byte, 256),
handler: h,
}
// Gérer la connexion dans une goroutine séparée
go client.writePump()
go client.readPump()
h.logger.Info("WebSocket client connected",
zap.String("user_id", userID.String()))
}
// readPump lit les messages du client
func (c *Client) readPump() {
defer func() {
c.handler.unregisterClient(c)
c.conn.Close()
}()
c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
c.conn.SetPongHandler(func(string) error {
c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
return nil
})
for {
_, message, err := c.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
c.handler.logger.Error("WebSocket read error",
zap.Error(err),
zap.String("user_id", c.userID.String()))
}
break
}
// Traiter le message
var wsMsg WebSocketMessage
if err := json.Unmarshal(message, &wsMsg); err != nil {
c.handler.logger.Warn("Failed to unmarshal WebSocket message",
zap.Error(err),
zap.String("user_id", c.userID.String()))
continue
}
// Gérer différents types de messages
switch wsMsg.Type {
case "subscribe":
// S'abonner à un track
if wsMsg.TrackID > 0 {
c.handler.subscribeClient(c, wsMsg.TrackID)
}
case "unsubscribe":
// Se désabonner d'un track
if wsMsg.TrackID > 0 {
c.handler.unsubscribeClient(c, wsMsg.TrackID)
}
case "ping":
// Répondre au ping
c.sendMessage(&BroadcastMessage{
Type: "pong",
Timestamp: time.Now(),
})
}
}
}
// writePump envoie les messages au client
func (c *Client) writePump() {
ticker := time.NewTicker(54 * time.Second)
defer func() {
ticker.Stop()
c.conn.Close()
}()
for {
select {
case message, ok := <-c.send:
c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if !ok {
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
w, err := c.conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
w.Write(message)
// Envoyer les messages en attente
n := len(c.send)
for i := 0; i < n; i++ {
w.Write([]byte("\n"))
w.Write(<-c.send)
}
if err := w.Close(); err != nil {
return
}
case <-ticker.C:
c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}
// sendMessage envoie un message au client
func (c *Client) sendMessage(msg *BroadcastMessage) {
c.mu.Lock()
defer c.mu.Unlock()
data, err := json.Marshal(msg)
if err != nil {
c.handler.logger.Error("Failed to marshal message",
zap.Error(err),
zap.String("user_id", c.userID.String()))
return
}
select {
case c.send <- data:
default:
close(c.send)
}
}
// subscribeClient abonne un client à un track
func (h *PlaybackWebSocketHandler) subscribeClient(client *Client, trackID int64) {
h.mu.Lock()
defer h.mu.Unlock()
if h.clients[trackID] == nil {
h.clients[trackID] = make(map[*websocket.Conn]*Client)
}
client.trackID = trackID
h.clients[trackID][client.conn] = client
h.logger.Info("Client subscribed to track",
zap.String("user_id", client.userID.String()),
zap.Int64("track_id", trackID))
// Envoyer un message de confirmation
client.sendMessage(&BroadcastMessage{
TrackID: trackID,
Type: "subscribed",
Data: gin.H{"track_id": trackID},
Timestamp: time.Now(),
})
}
// unsubscribeClient désabonne un client d'un track
func (h *PlaybackWebSocketHandler) unsubscribeClient(client *Client, trackID int64) {
h.mu.Lock()
defer h.mu.Unlock()
if clients, ok := h.clients[trackID]; ok {
delete(clients, client.conn)
if len(clients) == 0 {
delete(h.clients, trackID)
}
}
h.logger.Info("Client unsubscribed from track",
zap.String("user_id", client.userID.String()),
zap.Int64("track_id", trackID))
// Envoyer un message de confirmation
client.sendMessage(&BroadcastMessage{
TrackID: trackID,
Type: "unsubscribed",
Data: gin.H{"track_id": trackID},
Timestamp: time.Now(),
})
}
// unregisterClient retire un client de tous les tracks
func (h *PlaybackWebSocketHandler) unregisterClient(client *Client) {
h.mu.Lock()
defer h.mu.Unlock()
if client.trackID > 0 {
if clients, ok := h.clients[client.trackID]; ok {
delete(clients, client.conn)
if len(clients) == 0 {
delete(h.clients, client.trackID)
}
}
}
h.logger.Info("Client disconnected",
zap.String("user_id", client.userID.String()),
zap.Int64("track_id", client.trackID))
}
// broadcastMessages diffuse les messages à tous les clients abonnés
func (h *PlaybackWebSocketHandler) broadcastMessages() {
for message := range h.broadcast {
h.mu.RLock()
clients, ok := h.clients[message.TrackID]
if !ok {
h.mu.RUnlock()
continue
}
data, err := json.Marshal(message)
if err != nil {
h.mu.RUnlock()
h.logger.Error("Failed to marshal broadcast message",
zap.Error(err))
continue
}
// Envoyer le message à tous les clients abonnés
for _, client := range clients {
select {
case client.send <- data:
default:
close(client.send)
delete(clients, client.conn)
}
}
h.mu.RUnlock()
}
}
// BroadcastAnalyticsUpdate diffuse une mise à jour d'analytics à tous les clients abonnés
// T0368: Create Playback Analytics Real-time Updates
func (h *PlaybackWebSocketHandler) BroadcastAnalyticsUpdate(trackID int64, analytics *models.PlaybackAnalytics) {
if analytics == nil {
return
}
message := &BroadcastMessage{
TrackID: trackID,
Type: "analytics_update",
Data: analytics,
Timestamp: time.Now(),
}
select {
case h.broadcast <- message:
default:
h.logger.Warn("Broadcast channel full, dropping message",
zap.Int64("track_id", trackID))
}
}
// BroadcastStatsUpdate diffuse une mise à jour de statistiques à tous les clients abonnés
// T0368: Create Playback Analytics Real-time Updates
func (h *PlaybackWebSocketHandler) BroadcastStatsUpdate(trackID int64, stats *services.PlaybackStats) {
if stats == nil {
return
}
message := &BroadcastMessage{
TrackID: trackID,
Type: "stats_update",
Data: stats,
Timestamp: time.Now(),
}
select {
case h.broadcast <- message:
default:
h.logger.Warn("Broadcast channel full, dropping message",
zap.Int64("track_id", trackID))
}
}
// GetConnectedClientsCount retourne le nombre de clients connectés pour un track
func (h *PlaybackWebSocketHandler) GetConnectedClientsCount(trackID int64) int {
h.mu.RLock()
defer h.mu.RUnlock()
if clients, ok := h.clients[trackID]; ok {
return len(clients)
}
return 0
}
// GetTotalConnectedClientsCount retourne le nombre total de clients connectés
func (h *PlaybackWebSocketHandler) GetTotalConnectedClientsCount() int {
h.mu.RLock()
defer h.mu.RUnlock()
total := 0
for _, clients := range h.clients {
total += len(clients)
}
return total
}