Cleanup of dead code marked // DEPRECATED in veza-backend-api/internal/handlers. Each symbol was verified to have zero callers across the codebase before deletion (go build ./... + go vet ./... + go test ./internal/... pass). Deleted: - UploadResponse type (upload.go) — callers use upload.StandardUploadResponse - BindJSON method on CommonHandler (common.go) — callers use BindAndValidateJSON - sendMessage method on *Client (playback_websocket_handler.go) — internal WS broadcast now goes through sendStandardizedMessage Kept as tech debt (still actively used, refactor out of J3 scope): - UploadRequest type (upload.go:23) — used by upload handler, refactor requires migrating to upload.StandardUploadRequest with multipart binding - BroadcastMessage type (playback_websocket_handler.go:53) — still the channel type for legacy playback broadcasts and referenced in tests Also in this day (already committed in parallel): - veza-backend-api/internal/api/handlers/two_factor_handlers.go deletion (had //go:build ignore, zero callers) — bundled into 7fa314866 by concurrent work on .github/workflows/*.yml seed-v2 investigation: - No Go source for seed-v2 found — it was only a compiled binary already purged in J1 (0e7097ed1). No code action needed. Refs: AUDIT_REPORT.md §8.1, §12 item 1-2
453 lines
13 KiB
Go
453 lines
13 KiB
Go
package handlers
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"net"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
|
|
apperrors "veza-backend-api/internal/errors"
|
|
"veza-backend-api/internal/models"
|
|
"veza-backend-api/internal/services"
|
|
wsmsg "veza-backend-api/internal/websocket"
|
|
|
|
"github.com/coder/websocket"
|
|
"github.com/gin-gonic/gin"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// PlaybackAnalyticsServiceInterface defines methods needed for playback analytics
|
|
type PlaybackAnalyticsServiceInterface interface {
|
|
RecordPlayback(ctx context.Context, analytics *models.PlaybackAnalytics) error
|
|
GetTrackStats(ctx context.Context, trackID uuid.UUID) (*services.PlaybackStats, error)
|
|
}
|
|
|
|
// PlaybackWebSocketHandler gère les connexions WebSocket pour les analytics de lecture en temps réel
|
|
// T0368: Create Playback Analytics Real-time Updates
|
|
type PlaybackWebSocketHandler struct {
|
|
analyticsService PlaybackAnalyticsServiceInterface
|
|
logger *zap.Logger
|
|
clients map[int64]map[*websocket.Conn]*Client // trackID -> conn -> client
|
|
mu sync.RWMutex
|
|
broadcast chan *BroadcastMessage
|
|
}
|
|
|
|
// Client représente un client WebSocket connecté
|
|
type Client struct {
|
|
conn *websocket.Conn
|
|
trackID int64
|
|
userID uuid.UUID // Changed to UUID
|
|
send chan []byte
|
|
handler *PlaybackWebSocketHandler
|
|
mu sync.Mutex
|
|
}
|
|
|
|
// BroadcastMessage is the payload queued on the handler's broadcast channel
// and serialized as-is to every subscriber of TrackID.
// DEPRECATED: Use wsmsg.WebSocketMessage instead
// INT-014: Kept for backward compatibility during migration
type BroadcastMessage struct {
	// TrackID selects which subscribers receive the message.
	TrackID int64 `json:"track_id"`
	// Type is the message discriminator (e.g. "analytics_update", "stats_update").
	Type string `json:"type"`
	// Data is the type-specific payload.
	Data any `json:"data"`
	// Timestamp records when the message was created.
	Timestamp time.Time `json:"timestamp"`
}
|
|
|
|
// IncomingWebSocketMessage is the envelope decoded from each frame a client
// sends.
// INT-014: Standardized format for incoming messages
type IncomingWebSocketMessage struct {
	// Type selects the action (readPump handles "subscribe", "unsubscribe" and "ping").
	Type string `json:"type"`
	// TrackID is carried as a string for UUID compatibility; a numeric JSON
	// value does not decode into it, which is what routes old clients to the
	// legacy fallback in readPump.
	TrackID string `json:"track_id,omitempty"`
	// Data is the undecoded, type-specific payload.
	Data json.RawMessage `json:"data,omitempty"`
}
|
|
|
|
// NewPlaybackWebSocketHandler crée un nouveau handler WebSocket pour les analytics
|
|
func NewPlaybackWebSocketHandler(analyticsService *services.PlaybackAnalyticsService, logger *zap.Logger) *PlaybackWebSocketHandler {
|
|
return NewPlaybackWebSocketHandlerWithInterface(analyticsService, logger)
|
|
}
|
|
|
|
// NewPlaybackWebSocketHandlerWithInterface crée un nouveau handler WebSocket avec une interface (pour tests)
|
|
func NewPlaybackWebSocketHandlerWithInterface(analyticsService PlaybackAnalyticsServiceInterface, logger *zap.Logger) *PlaybackWebSocketHandler {
|
|
if logger == nil {
|
|
logger = zap.NewNop()
|
|
}
|
|
handler := &PlaybackWebSocketHandler{
|
|
analyticsService: analyticsService,
|
|
logger: logger,
|
|
clients: make(map[int64]map[*websocket.Conn]*Client),
|
|
broadcast: make(chan *BroadcastMessage, 256),
|
|
}
|
|
|
|
// Démarrer la goroutine de diffusion
|
|
go handler.broadcastMessages()
|
|
|
|
return handler
|
|
}
|
|
|
|
// WebSocketHandler gère les connexions WebSocket pour les analytics de lecture
|
|
// T0368: Create Playback Analytics Real-time Updates
|
|
func (h *PlaybackWebSocketHandler) WebSocketHandler(c *gin.Context) {
|
|
// Récupérer l'ID de l'utilisateur depuis le contexte
|
|
userID, ok := GetUserIDUUID(c)
|
|
if !ok {
|
|
return // Erreur déjà envoyée par GetUserIDUUID
|
|
}
|
|
if userID == uuid.Nil {
|
|
RespondWithAppError(c, apperrors.NewUnauthorizedError("unauthorized"))
|
|
return
|
|
}
|
|
|
|
// Mettre à niveau la connexion HTTP vers WebSocket (INT-06: coder/websocket)
|
|
conn, err := websocket.Accept(c.Writer, c.Request, &websocket.AcceptOptions{
|
|
OriginPatterns: GetAllowedWebSocketOrigins(),
|
|
})
|
|
if err != nil {
|
|
h.logger.Error("Failed to upgrade connection to WebSocket",
|
|
zap.Error(err),
|
|
zap.String("user_id", userID.String()))
|
|
return
|
|
}
|
|
|
|
// Créer un nouveau client
|
|
client := &Client{
|
|
conn: conn,
|
|
userID: userID,
|
|
send: make(chan []byte, 256),
|
|
handler: h,
|
|
}
|
|
|
|
// Gérer la connexion dans une goroutine séparée
|
|
go client.writePump()
|
|
go client.readPump()
|
|
|
|
h.logger.Info("WebSocket client connected",
|
|
zap.String("user_id", userID.String()))
|
|
}
|
|
|
|
// readPump lit les messages du client
|
|
func (c *Client) readPump() {
|
|
defer func() {
|
|
c.handler.unregisterClient(c)
|
|
_ = c.conn.Close(websocket.StatusNormalClosure, "")
|
|
}()
|
|
|
|
// SECURITY(REM-025): Limit WebSocket message size to prevent memory exhaustion
|
|
c.conn.SetReadLimit(64 * 1024) // 64KB max message size
|
|
|
|
for {
|
|
readCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
|
|
_, message, err := c.conn.Read(readCtx)
|
|
cancel()
|
|
if err != nil {
|
|
if !errors.Is(err, net.ErrClosed) {
|
|
status := websocket.CloseStatus(err)
|
|
if status != websocket.StatusGoingAway && status != websocket.StatusAbnormalClosure {
|
|
c.handler.logger.Error("WebSocket read error",
|
|
zap.Error(err),
|
|
zap.String("user_id", c.userID.String()))
|
|
}
|
|
}
|
|
break
|
|
}
|
|
|
|
// INT-014: Parse standardized WebSocket message format
|
|
var wsMsg IncomingWebSocketMessage
|
|
if err := json.Unmarshal(message, &wsMsg); err != nil {
|
|
// Try to parse as legacy format for backward compatibility
|
|
var legacyMsg struct {
|
|
Type string `json:"type"`
|
|
TrackID int64 `json:"track_id,omitempty"`
|
|
}
|
|
if err2 := json.Unmarshal(message, &legacyMsg); err2 != nil {
|
|
c.handler.logger.Warn("Failed to unmarshal WebSocket message",
|
|
zap.Error(err),
|
|
zap.String("user_id", c.userID.String()))
|
|
// Send error message
|
|
errorMsg := wsmsg.NewErrorMessage(400, "Invalid message format", nil)
|
|
c.sendStandardizedMessage(errorMsg)
|
|
continue
|
|
}
|
|
// Convert legacy format
|
|
wsMsg.Type = legacyMsg.Type
|
|
if legacyMsg.TrackID > 0 {
|
|
wsMsg.TrackID = strconv.FormatInt(legacyMsg.TrackID, 10)
|
|
}
|
|
}
|
|
|
|
// Gérer différents types de messages
|
|
switch wsMsg.Type {
|
|
case "subscribe":
|
|
// S'abonner à un track
|
|
if wsMsg.TrackID != "" {
|
|
trackIDInt, err := strconv.ParseInt(wsMsg.TrackID, 10, 64)
|
|
if err == nil && trackIDInt > 0 {
|
|
c.handler.subscribeClient(c, trackIDInt)
|
|
}
|
|
}
|
|
case "unsubscribe":
|
|
// Se désabonner d'un track
|
|
if wsMsg.TrackID != "" {
|
|
trackIDInt, err := strconv.ParseInt(wsMsg.TrackID, 10, 64)
|
|
if err == nil && trackIDInt > 0 {
|
|
c.handler.unsubscribeClient(c, trackIDInt)
|
|
}
|
|
}
|
|
case "ping":
|
|
// Répondre au ping avec format standardisé
|
|
pongMsg := wsmsg.NewWebSocketMessage(wsmsg.MessageTypePong, nil)
|
|
c.sendStandardizedMessage(pongMsg)
|
|
}
|
|
}
|
|
}
|
|
|
|
// writePump envoie les messages au client
|
|
func (c *Client) writePump() {
|
|
ticker := time.NewTicker(54 * time.Second)
|
|
defer func() {
|
|
ticker.Stop()
|
|
_ = c.conn.Close(websocket.StatusNormalClosure, "")
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case message, ok := <-c.send:
|
|
writeCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
if !ok {
|
|
cancel()
|
|
_ = c.conn.Close(websocket.StatusNormalClosure, "")
|
|
return
|
|
}
|
|
|
|
w, err := c.conn.Writer(writeCtx, websocket.MessageText)
|
|
if err != nil {
|
|
cancel()
|
|
return
|
|
}
|
|
_, _ = w.Write(message)
|
|
|
|
// Envoyer les messages en attente
|
|
n := len(c.send)
|
|
for i := 0; i < n; i++ {
|
|
_, _ = w.Write([]byte("\n"))
|
|
_, _ = w.Write(<-c.send)
|
|
}
|
|
|
|
if err := w.Close(); err != nil {
|
|
cancel()
|
|
return
|
|
}
|
|
cancel()
|
|
case <-ticker.C:
|
|
pingCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
if err := c.conn.Ping(pingCtx); err != nil {
|
|
cancel()
|
|
return
|
|
}
|
|
cancel()
|
|
}
|
|
}
|
|
}
|
|
|
|
// sendStandardizedMessage envoie un message au client avec format standardisé
|
|
// INT-014: Standardized WebSocket message format
|
|
func (c *Client) sendStandardizedMessage(msg *wsmsg.WebSocketMessage) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
// Set track ID if available
|
|
if c.trackID > 0 {
|
|
msg.WithTrackID(strconv.FormatInt(c.trackID, 10))
|
|
}
|
|
|
|
// Set user ID
|
|
msg.WithUserID(c.userID.String())
|
|
|
|
data, err := msg.ToJSON()
|
|
if err != nil {
|
|
c.handler.logger.Error("Failed to marshal standardized message",
|
|
zap.Error(err),
|
|
zap.String("user_id", c.userID.String()))
|
|
return
|
|
}
|
|
|
|
select {
|
|
case c.send <- data:
|
|
default:
|
|
close(c.send)
|
|
}
|
|
}
|
|
|
|
// subscribeClient abonne un client à un track
|
|
func (h *PlaybackWebSocketHandler) subscribeClient(client *Client, trackID int64) {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
|
|
if h.clients[trackID] == nil {
|
|
h.clients[trackID] = make(map[*websocket.Conn]*Client)
|
|
}
|
|
|
|
client.trackID = trackID
|
|
h.clients[trackID][client.conn] = client
|
|
|
|
h.logger.Info("Client subscribed to track",
|
|
zap.String("user_id", client.userID.String()),
|
|
zap.Int64("track_id", trackID))
|
|
|
|
// INT-014: Envoyer un message de confirmation avec format standardisé
|
|
subscribedMsg := wsmsg.NewWebSocketMessage(
|
|
wsmsg.MessageTypeSubscribed,
|
|
gin.H{"track_id": trackID},
|
|
).WithTrackID(strconv.FormatInt(trackID, 10))
|
|
client.sendStandardizedMessage(subscribedMsg)
|
|
}
|
|
|
|
// unsubscribeClient désabonne un client d'un track
|
|
func (h *PlaybackWebSocketHandler) unsubscribeClient(client *Client, trackID int64) {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
|
|
if clients, ok := h.clients[trackID]; ok {
|
|
delete(clients, client.conn)
|
|
if len(clients) == 0 {
|
|
delete(h.clients, trackID)
|
|
}
|
|
}
|
|
|
|
h.logger.Info("Client unsubscribed from track",
|
|
zap.String("user_id", client.userID.String()),
|
|
zap.Int64("track_id", trackID))
|
|
|
|
// INT-014: Envoyer un message de confirmation avec format standardisé
|
|
unsubscribedMsg := wsmsg.NewWebSocketMessage(
|
|
wsmsg.MessageTypeUnsubscribed,
|
|
gin.H{"track_id": trackID},
|
|
).WithTrackID(strconv.FormatInt(trackID, 10))
|
|
client.sendStandardizedMessage(unsubscribedMsg)
|
|
}
|
|
|
|
// unregisterClient retire un client de tous les tracks
|
|
func (h *PlaybackWebSocketHandler) unregisterClient(client *Client) {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
|
|
if client.trackID > 0 {
|
|
if clients, ok := h.clients[client.trackID]; ok {
|
|
delete(clients, client.conn)
|
|
if len(clients) == 0 {
|
|
delete(h.clients, client.trackID)
|
|
}
|
|
}
|
|
}
|
|
|
|
h.logger.Info("Client disconnected",
|
|
zap.String("user_id", client.userID.String()),
|
|
zap.Int64("track_id", client.trackID))
|
|
}
|
|
|
|
// broadcastMessages diffuse les messages à tous les clients abonnés
|
|
func (h *PlaybackWebSocketHandler) broadcastMessages() {
|
|
for message := range h.broadcast {
|
|
h.mu.RLock()
|
|
clients, ok := h.clients[message.TrackID]
|
|
if !ok {
|
|
h.mu.RUnlock()
|
|
continue
|
|
}
|
|
|
|
data, err := json.Marshal(message)
|
|
if err != nil {
|
|
h.mu.RUnlock()
|
|
h.logger.Error("Failed to marshal broadcast message",
|
|
zap.Error(err))
|
|
continue
|
|
}
|
|
|
|
// Envoyer le message à tous les clients abonnés
|
|
for _, client := range clients {
|
|
select {
|
|
case client.send <- data:
|
|
default:
|
|
close(client.send)
|
|
delete(clients, client.conn)
|
|
}
|
|
}
|
|
h.mu.RUnlock()
|
|
}
|
|
}
|
|
|
|
// BroadcastAnalyticsUpdate diffuse une mise à jour d'analytics à tous les clients abonnés
|
|
// T0368: Create Playback Analytics Real-time Updates
|
|
// INT-014: Updated to use standardized message format
|
|
func (h *PlaybackWebSocketHandler) BroadcastAnalyticsUpdate(trackID int64, analytics *models.PlaybackAnalytics) {
|
|
if analytics == nil {
|
|
return
|
|
}
|
|
|
|
// INT-014: Convert to legacy format for broadcast channel (backward compatibility)
|
|
// Note: Standardized format conversion happens in broadcastMessages()
|
|
legacyMsg := &BroadcastMessage{
|
|
TrackID: trackID,
|
|
Type: "analytics_update",
|
|
Data: analytics,
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
select {
|
|
case h.broadcast <- legacyMsg:
|
|
default:
|
|
h.logger.Warn("Broadcast channel full, dropping message",
|
|
zap.Int64("track_id", trackID))
|
|
}
|
|
}
|
|
|
|
// BroadcastStatsUpdate diffuse une mise à jour de statistiques à tous les clients abonnés
|
|
// T0368: Create Playback Analytics Real-time Updates
|
|
// INT-014: Updated to use standardized message format
|
|
func (h *PlaybackWebSocketHandler) BroadcastStatsUpdate(trackID int64, stats *services.PlaybackStats) {
|
|
if stats == nil {
|
|
return
|
|
}
|
|
|
|
// INT-014: Convert to legacy format for broadcast channel (backward compatibility)
|
|
// Note: Standardized format conversion happens in broadcastMessages()
|
|
legacyMsg := &BroadcastMessage{
|
|
TrackID: trackID,
|
|
Type: "stats_update",
|
|
Data: stats,
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
select {
|
|
case h.broadcast <- legacyMsg:
|
|
default:
|
|
h.logger.Warn("Broadcast channel full, dropping message",
|
|
zap.Int64("track_id", trackID))
|
|
}
|
|
}
|
|
|
|
// GetConnectedClientsCount retourne le nombre de clients connectés pour un track
|
|
func (h *PlaybackWebSocketHandler) GetConnectedClientsCount(trackID int64) int {
|
|
h.mu.RLock()
|
|
defer h.mu.RUnlock()
|
|
|
|
if clients, ok := h.clients[trackID]; ok {
|
|
return len(clients)
|
|
}
|
|
return 0
|
|
}
|
|
|
|
// GetTotalConnectedClientsCount retourne le nombre total de clients connectés
|
|
func (h *PlaybackWebSocketHandler) GetTotalConnectedClientsCount() int {
|
|
h.mu.RLock()
|
|
defer h.mu.RUnlock()
|
|
|
|
total := 0
|
|
for _, clients := range h.clients {
|
|
total += len(clients)
|
|
}
|
|
return total
|
|
}
|