veza/veza-backend-api/internal/handlers/playback_websocket_handler.go
senke 9cd0da0046 fix(v0.12.6): apply all pentest remediations — 36 findings across 36 files
CRITICAL fixes:
- Race condition (TOCTOU) in payout/refund with SELECT FOR UPDATE (CRITICAL-001/002)
- IDOR on analytics endpoint — ownership check enforced (CRITICAL-003)
- CSWSH on all WebSocket endpoints — origin whitelist (CRITICAL-004)
- Mass assignment on user self-update — strip privileged fields (CRITICAL-005)

HIGH fixes:
- Path traversal in marketplace upload — UUID filenames (HIGH-001)
- IP spoofing — use Gin trusted proxy c.ClientIP() (HIGH-002)
- Popularity metrics (followers, likes) set to json:"-" (HIGH-003)
- bcrypt cost hardened to 12 everywhere (HIGH-004)
- Refresh token lock made mandatory (HIGH-005)
- Stream token replay prevention with access_count (HIGH-006)
- Subscription trial race condition fixed (HIGH-007)
- License download expiration check (HIGH-008)
- Webhook amount validation (HIGH-009)
- pprof endpoint removed from production (HIGH-010)

MEDIUM fixes:
- WebSocket message size limit 64KB (MEDIUM-010)
- HSTS header in nginx production (MEDIUM-001)
- CORS origin restricted in nginx-rtmp (MEDIUM-002)
- Docker alpine pinned to 3.21 (MEDIUM-003/004)
- Redis authentication enforced (MEDIUM-005)
- GDPR account deletion expanded (MEDIUM-006)
- .gitignore hardened (MEDIUM-007)

LOW/INFO fixes:
- GitHub Actions SHA pinning on all workflows (LOW-001)
- .env.example security documentation (INFO-001)
- Production CORS set to HTTPS (LOW-002)

All tests pass. Go and Rust compile clean.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-14 00:44:46 +01:00

474 lines
13 KiB
Go

package handlers
import (
"context"
"encoding/json"
"errors"
"net"
"strconv"
"sync"
"time"
"github.com/google/uuid"
apperrors "veza-backend-api/internal/errors"
"veza-backend-api/internal/models"
"veza-backend-api/internal/services"
wsmsg "veza-backend-api/internal/websocket"
"github.com/coder/websocket"
"github.com/gin-gonic/gin"
"go.uber.org/zap"
)
// PlaybackAnalyticsServiceInterface is the subset of the playback analytics
// service that PlaybackWebSocketHandler holds a reference to. Depending on an
// interface lets NewPlaybackWebSocketHandlerWithInterface accept a substitute
// implementation (for tests).
type PlaybackAnalyticsServiceInterface interface {
// RecordPlayback takes one playback analytics record and reports failure
// through the returned error (presumably it persists the record; confirm
// against the implementation).
RecordPlayback(ctx context.Context, analytics *models.PlaybackAnalytics) error
// GetTrackStats returns playback statistics for the track identified by
// trackID, or an error.
GetTrackStats(ctx context.Context, trackID uuid.UUID) (*services.PlaybackStats, error)
}
// PlaybackWebSocketHandler manages WebSocket connections used to push
// real-time playback analytics to clients. It keeps a registry of clients per
// track and a queue of messages waiting to be fanned out to them.
// T0368: Create Playback Analytics Real-time Updates
type PlaybackWebSocketHandler struct {
// analyticsService is stored at construction time; none of the methods in
// this file call it.
analyticsService PlaybackAnalyticsServiceInterface
// logger is never nil: the constructor substitutes a no-op logger.
logger *zap.Logger
// clients is the subscription registry. Access is guarded by mu.
clients map[int64]map[*websocket.Conn]*Client // trackID -> conn -> client
// mu guards clients.
mu sync.RWMutex
// broadcast is the buffered queue consumed by broadcastMessages.
broadcast chan *BroadcastMessage
}
// Client represents one connected WebSocket client.
type Client struct {
// conn is the underlying WebSocket connection.
conn *websocket.Conn
// trackID is the track most recently passed to subscribeClient for this
// client. Values <= 0 are treated as "no track" by the code that reads it.
trackID int64
// userID is the authenticated user that opened the connection.
userID uuid.UUID // Changed to UUID
// send is the outbound queue drained by writePump.
send chan []byte
// handler is the owning hub, used for logging and (un)registration.
handler *PlaybackWebSocketHandler
// mu serializes sendMessage and sendStandardizedMessage.
mu sync.Mutex
}
// BroadcastMessage is the legacy envelope for a message that is fanned out to
// every client subscribed to TrackID. It is what travels on the handler's
// broadcast channel and what broadcastMessages serializes to JSON.
//
// Deprecated: use wsmsg.WebSocketMessage instead.
// INT-014: Kept for backward compatibility during migration.
type BroadcastMessage struct {
// TrackID selects the subscriber set that receives the message.
TrackID int64 `json:"track_id"`
// Type is the message discriminator, e.g. "analytics_update" or "stats_update".
Type string `json:"type"`
// Data is the payload; it is marshaled as-is.
Data interface{} `json:"data"`
// Timestamp is set by the producer when the message is created.
Timestamp time.Time `json:"timestamp"`
}
// IncomingWebSocketMessage is the shape of a message received from a client.
// INT-014: Standardized format for incoming messages
type IncomingWebSocketMessage struct {
// Type selects the action; readPump handles "subscribe", "unsubscribe" and "ping".
Type string `json:"type"`
// TrackID is carried as a string on the wire; readPump parses it as a
// base-10 int64 before using it.
TrackID string `json:"track_id,omitempty"` // Changed to string for UUID compatibility
// Data is kept undecoded; nothing in this file reads it.
Data json.RawMessage `json:"data,omitempty"`
}
// NewPlaybackWebSocketHandler builds a playback analytics WebSocket handler
// backed by the concrete analytics service. It simply forwards to
// NewPlaybackWebSocketHandlerWithInterface.
func NewPlaybackWebSocketHandler(analyticsService *services.PlaybackAnalyticsService, logger *zap.Logger) *PlaybackWebSocketHandler {
	var svc PlaybackAnalyticsServiceInterface = analyticsService
	return NewPlaybackWebSocketHandlerWithInterface(svc, logger)
}
// NewPlaybackWebSocketHandlerWithInterface builds a handler from any
// PlaybackAnalyticsServiceInterface implementation (useful in tests). A nil
// logger is replaced with a no-op logger. The fan-out goroutine is started
// before the handler is returned.
func NewPlaybackWebSocketHandlerWithInterface(analyticsService PlaybackAnalyticsServiceInterface, logger *zap.Logger) *PlaybackWebSocketHandler {
	lg := logger
	if lg == nil {
		lg = zap.NewNop()
	}

	h := new(PlaybackWebSocketHandler)
	h.analyticsService = analyticsService
	h.logger = lg
	h.clients = make(map[int64]map[*websocket.Conn]*Client)
	h.broadcast = make(chan *BroadcastMessage, 256)

	// Start the goroutine that drains the broadcast queue.
	go h.broadcastMessages()

	return h
}
// WebSocketHandler upgrades an authenticated HTTP request to a WebSocket
// connection and starts the read and write pumps for it.
// T0368: Create Playback Analytics Real-time Updates
func (h *PlaybackWebSocketHandler) WebSocketHandler(c *gin.Context) {
	// Resolve the caller's identity from the request context.
	userID, authenticated := GetUserIDUUID(c)
	switch {
	case !authenticated:
		// GetUserIDUUID has already written the error response.
		return
	case userID == uuid.Nil:
		RespondWithAppError(c, apperrors.NewUnauthorizedError("unauthorized"))
		return
	}
	userField := zap.String("user_id", userID.String())

	// Upgrade to WebSocket (INT-06: coder/websocket), restricted to allowed origins.
	acceptOpts := websocket.AcceptOptions{
		OriginPatterns: GetAllowedWebSocketOrigins(),
	}
	conn, err := websocket.Accept(c.Writer, c.Request, &acceptOpts)
	if err != nil {
		h.logger.Error("Failed to upgrade connection to WebSocket",
			zap.Error(err),
			userField)
		return
	}

	peer := &Client{
		conn:    conn,
		userID:  userID,
		send:    make(chan []byte, 256),
		handler: h,
	}

	// Each direction of the connection runs in its own goroutine.
	go peer.writePump()
	go peer.readPump()

	h.logger.Info("WebSocket client connected", userField)
}
// readPump reads client messages until a read fails, then unregisters the
// client and closes the connection. Each read is given a 60-second deadline.
// Messages are decoded in the standardized format first and in the legacy
// format (numeric track_id) as a fallback; undecodable messages are answered
// with a 400 error message. Recognized types are "subscribe", "unsubscribe"
// and "ping"; anything else is ignored.
func (c *Client) readPump() {
	defer func() {
		c.handler.unregisterClient(c)
		_ = c.conn.Close(websocket.StatusNormalClosure, "")
	}()

	// SECURITY(REM-025): Limit WebSocket message size to prevent memory exhaustion
	c.conn.SetReadLimit(64 * 1024) // 64KB max message size

	// parseTrackID accepts only non-empty, strictly positive base-10 IDs.
	parseTrackID := func(raw string) (int64, bool) {
		if raw == "" {
			return 0, false
		}
		id, convErr := strconv.ParseInt(raw, 10, 64)
		if convErr != nil || id <= 0 {
			return 0, false
		}
		return id, true
	}

	for {
		readCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
		_, payload, readErr := c.conn.Read(readCtx)
		cancel()
		if readErr != nil {
			if errors.Is(readErr, net.ErrClosed) {
				return
			}
			// Going-away and abnormal closures are expected; only log the rest.
			switch websocket.CloseStatus(readErr) {
			case websocket.StatusGoingAway, websocket.StatusAbnormalClosure:
			default:
				c.handler.logger.Error("WebSocket read error",
					zap.Error(readErr),
					zap.String("user_id", c.userID.String()))
			}
			return
		}

		// INT-014: Parse standardized WebSocket message format
		var incoming IncomingWebSocketMessage
		if parseErr := json.Unmarshal(payload, &incoming); parseErr != nil {
			// Fall back to the legacy format for backward compatibility.
			var legacy struct {
				Type    string `json:"type"`
				TrackID int64  `json:"track_id,omitempty"`
			}
			if legacyErr := json.Unmarshal(payload, &legacy); legacyErr != nil {
				c.handler.logger.Warn("Failed to unmarshal WebSocket message",
					zap.Error(parseErr),
					zap.String("user_id", c.userID.String()))
				c.sendStandardizedMessage(wsmsg.NewErrorMessage(400, "Invalid message format", nil))
				continue
			}
			// Carry the legacy fields over to the standardized structure.
			incoming.Type = legacy.Type
			if legacy.TrackID > 0 {
				incoming.TrackID = strconv.FormatInt(legacy.TrackID, 10)
			}
		}

		switch incoming.Type {
		case "subscribe":
			if id, valid := parseTrackID(incoming.TrackID); valid {
				c.handler.subscribeClient(c, id)
			}
		case "unsubscribe":
			if id, valid := parseTrackID(incoming.TrackID); valid {
				c.handler.unsubscribeClient(c, id)
			}
		case "ping":
			// Answer with a standardized pong.
			c.sendStandardizedMessage(wsmsg.NewWebSocketMessage(wsmsg.MessageTypePong, nil))
		}
	}
}
// writePump delivers queued messages to the client and pings it every
// 54 seconds. Messages already waiting in the queue when a write starts are
// appended to the same text frame, separated by newlines. The pump stops, and
// the connection is closed, when the queue is closed or when a write or ping
// fails.
func (c *Client) writePump() {
	heartbeat := time.NewTicker(54 * time.Second)
	defer func() {
		heartbeat.Stop()
		_ = c.conn.Close(websocket.StatusNormalClosure, "")
	}()

	// flush writes first plus everything currently queued as one text frame.
	flush := func(first []byte) error {
		writeCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()

		frame, err := c.conn.Writer(writeCtx, websocket.MessageText)
		if err != nil {
			return err
		}
		_, _ = frame.Write(first)
		// Drain only what was queued at this instant.
		for pending := len(c.send); pending > 0; pending-- {
			_, _ = frame.Write([]byte("\n"))
			_, _ = frame.Write(<-c.send)
		}
		return frame.Close()
	}

	// ping sends one keep-alive ping with a 10-second deadline.
	ping := func() error {
		pingCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		return c.conn.Ping(pingCtx)
	}

	for {
		select {
		case first, open := <-c.send:
			if !open {
				_ = c.conn.Close(websocket.StatusNormalClosure, "")
				return
			}
			if flush(first) != nil {
				return
			}
		case <-heartbeat.C:
			if ping() != nil {
				return
			}
		}
	}
}
// sendMessage queues msg, serialized in the legacy JSON format, for delivery
// to the client. The call never blocks.
//
// If the client's outbound queue is full the message is dropped and the
// connection is closed so the slow consumer is disconnected. The queue itself
// is never closed here: other goroutines send on the same channel, and a send
// on (or a second close of) a closed channel panics.
//
// Deprecated: use sendStandardizedMessage instead.
func (c *Client) sendMessage(msg *BroadcastMessage) {
	data, err := json.Marshal(msg)
	if err != nil {
		c.handler.logger.Error("Failed to marshal message",
			zap.Error(err),
			zap.String("user_id", c.userID.String()))
		return
	}

	c.mu.Lock()
	queued := false
	select {
	case c.send <- data:
		queued = true
	default:
	}
	c.mu.Unlock()

	if queued {
		return
	}

	c.handler.logger.Warn("WebSocket send buffer full, closing connection",
		zap.String("user_id", c.userID.String()))
	// Closing the connection makes the read pump fail and unregister the client.
	_ = c.conn.CloseNow()
}
// sendStandardizedMessage stamps msg with the client's user ID (and its
// current track ID when one is set), serializes it and queues it for delivery.
// The call never blocks. A nil msg is ignored.
// INT-014: Standardized WebSocket message format
//
// If the client's outbound queue is full the message is dropped and the
// connection is closed so the slow consumer is disconnected. The queue itself
// is never closed here: other goroutines send on the same channel, and a send
// on (or a second close of) a closed channel panics.
func (c *Client) sendStandardizedMessage(msg *wsmsg.WebSocketMessage) {
	if msg == nil {
		return
	}

	c.mu.Lock()
	// Set track ID if available
	if c.trackID > 0 {
		msg.WithTrackID(strconv.FormatInt(c.trackID, 10))
	}
	// Set user ID
	msg.WithUserID(c.userID.String())

	data, err := msg.ToJSON()
	if err != nil {
		c.mu.Unlock()
		c.handler.logger.Error("Failed to marshal standardized message",
			zap.Error(err),
			zap.String("user_id", c.userID.String()))
		return
	}

	queued := false
	select {
	case c.send <- data:
		queued = true
	default:
	}
	c.mu.Unlock()

	if queued {
		return
	}

	c.handler.logger.Warn("WebSocket send buffer full, closing connection",
		zap.String("user_id", c.userID.String()))
	// Closing the connection makes the read pump fail and unregister the client.
	_ = c.conn.CloseNow()
}
// subscribeClient registers client as a subscriber of trackID, records
// trackID as the client's current track, and sends the client a "subscribed"
// confirmation in the standardized format.
func (h *PlaybackWebSocketHandler) subscribeClient(client *Client, trackID int64) {
	h.mu.Lock()
	defer h.mu.Unlock()

	// Create the per-track subscriber set on first use.
	subscribers := h.clients[trackID]
	if subscribers == nil {
		subscribers = make(map[*websocket.Conn]*Client)
		h.clients[trackID] = subscribers
	}
	client.trackID = trackID
	subscribers[client.conn] = client

	h.logger.Info("Client subscribed to track",
		zap.String("user_id", client.userID.String()),
		zap.Int64("track_id", trackID))

	// INT-014: confirm the subscription using the standardized format.
	trackIDText := strconv.FormatInt(trackID, 10)
	ack := wsmsg.NewWebSocketMessage(wsmsg.MessageTypeSubscribed, gin.H{"track_id": trackID})
	client.sendStandardizedMessage(ack.WithTrackID(trackIDText))
}
// unsubscribeClient removes client from the subscribers of trackID, dropping
// the track's subscriber set once it is empty, and sends the client an
// "unsubscribed" confirmation in the standardized format.
func (h *PlaybackWebSocketHandler) unsubscribeClient(client *Client, trackID int64) {
	h.mu.Lock()
	defer h.mu.Unlock()

	subscribers, found := h.clients[trackID]
	if found {
		delete(subscribers, client.conn)
	}
	if found && len(subscribers) == 0 {
		delete(h.clients, trackID)
	}

	h.logger.Info("Client unsubscribed from track",
		zap.String("user_id", client.userID.String()),
		zap.Int64("track_id", trackID))

	// INT-014: confirm the removal using the standardized format.
	trackIDText := strconv.FormatInt(trackID, 10)
	ack := wsmsg.NewWebSocketMessage(wsmsg.MessageTypeUnsubscribed, gin.H{"track_id": trackID})
	client.sendStandardizedMessage(ack.WithTrackID(trackIDText))
}
// unregisterClient removes client from every track it is subscribed to and
// drops any track whose subscriber set becomes empty.
//
// Every track is scanned rather than only client.trackID: that field holds
// just the most recent subscription, so a client that subscribed to several
// tracks would otherwise leave stale entries in the registry after it
// disconnects.
func (h *PlaybackWebSocketHandler) unregisterClient(client *Client) {
	h.mu.Lock()
	defer h.mu.Unlock()

	// Deleting entries while ranging over a map is safe in Go.
	for trackID, subscribers := range h.clients {
		if _, subscribed := subscribers[client.conn]; !subscribed {
			continue
		}
		delete(subscribers, client.conn)
		if len(subscribers) == 0 {
			delete(h.clients, trackID)
		}
	}

	h.logger.Info("Client disconnected",
		zap.String("user_id", client.userID.String()),
		zap.Int64("track_id", client.trackID))
}
// broadcastMessages drains the broadcast queue and delivers each message, as
// legacy-format JSON, to every client subscribed to the message's track. It
// runs until the broadcast channel is closed.
//
// Locking: the subscriber set is only snapshotted under the read lock.
// Delivery happens outside the lock, and slow clients are removed from the
// registry under the write lock, so the map is never mutated while only a
// read lock is held.
//
// Slow clients: a client whose outbound queue is full is unsubscribed from
// the track and its connection is closed. Its queue is never closed here,
// because other goroutines send on it and a send on (or a second close of) a
// closed channel panics.
func (h *PlaybackWebSocketHandler) broadcastMessages() {
	for message := range h.broadcast {
		// Snapshot the recipients so no lock is held while sending.
		h.mu.RLock()
		subscribers := h.clients[message.TrackID]
		recipients := make([]*Client, 0, len(subscribers))
		for _, client := range subscribers {
			recipients = append(recipients, client)
		}
		h.mu.RUnlock()

		if len(recipients) == 0 {
			continue
		}

		data, err := json.Marshal(message)
		if err != nil {
			h.logger.Error("Failed to marshal broadcast message",
				zap.Error(err))
			continue
		}

		// Non-blocking delivery; remember the clients that cannot keep up.
		var stalled []*Client
		for _, client := range recipients {
			select {
			case client.send <- data:
			default:
				stalled = append(stalled, client)
			}
		}
		if len(stalled) == 0 {
			continue
		}

		// Registry mutation requires the write lock.
		h.mu.Lock()
		if current, ok := h.clients[message.TrackID]; ok {
			for _, client := range stalled {
				delete(current, client.conn)
			}
			if len(current) == 0 {
				delete(h.clients, message.TrackID)
			}
		}
		h.mu.Unlock()

		// Disconnect slow consumers outside the lock; their read pumps then
		// fail and finish the cleanup.
		for _, client := range stalled {
			h.logger.Warn("WebSocket send buffer full, closing connection",
				zap.String("user_id", client.userID.String()),
				zap.Int64("track_id", message.TrackID))
			_ = client.conn.CloseNow()
		}
	}
}
// BroadcastAnalyticsUpdate queues an "analytics_update" message carrying
// analytics for delivery to the subscribers of trackID. A nil analytics value
// is ignored. The call never blocks: when the broadcast queue is full the
// message is dropped and a warning is logged.
// T0368: Create Playback Analytics Real-time Updates
// INT-014: the message is queued in the legacy BroadcastMessage envelope.
func (h *PlaybackWebSocketHandler) BroadcastAnalyticsUpdate(trackID int64, analytics *models.PlaybackAnalytics) {
	if analytics == nil {
		return
	}

	var update BroadcastMessage
	update.TrackID = trackID
	update.Type = "analytics_update"
	update.Data = analytics
	update.Timestamp = time.Now()

	select {
	case h.broadcast <- &update:
		return
	default:
	}

	h.logger.Warn("Broadcast channel full, dropping message",
		zap.Int64("track_id", trackID))
}
// BroadcastStatsUpdate queues a "stats_update" message carrying stats for
// delivery to the subscribers of trackID. A nil stats value is ignored. The
// call never blocks: when the broadcast queue is full the message is dropped
// and a warning is logged.
// T0368: Create Playback Analytics Real-time Updates
// INT-014: the message is queued in the legacy BroadcastMessage envelope.
func (h *PlaybackWebSocketHandler) BroadcastStatsUpdate(trackID int64, stats *services.PlaybackStats) {
	if stats == nil {
		return
	}

	var update BroadcastMessage
	update.TrackID = trackID
	update.Type = "stats_update"
	update.Data = stats
	update.Timestamp = time.Now()

	select {
	case h.broadcast <- &update:
		return
	default:
	}

	h.logger.Warn("Broadcast channel full, dropping message",
		zap.Int64("track_id", trackID))
}
// GetConnectedClientsCount reports how many clients are currently subscribed
// to trackID; it is 0 when the track has no subscribers.
func (h *PlaybackWebSocketHandler) GetConnectedClientsCount(trackID int64) int {
	h.mu.RLock()
	defer h.mu.RUnlock()

	// len of a missing (nil) subscriber set is 0.
	return len(h.clients[trackID])
}
// GetTotalConnectedClientsCount reports the number of subscriptions summed
// over all tracks. A client registered under several tracks is counted once
// per track.
func (h *PlaybackWebSocketHandler) GetTotalConnectedClientsCount() int {
	h.mu.RLock()
	defer h.mu.RUnlock()

	var sum int
	for trackID := range h.clients {
		sum += len(h.clients[trackID])
	}
	return sum
}