veza/veza-backend-api/internal/workers/playback_analytics_worker.go

package workers

import (
	"context"
	"fmt"
	"sync"
	"time"

	"veza-backend-api/internal/models"
	"veza-backend-api/internal/services"

	"github.com/google/uuid"
	"go.uber.org/zap"
	"gorm.io/gorm"
)

// PlaybackAnalyticsWorker handles batch processing of playback analytics.
// T0387: Create Playback Analytics Batch Processing
type PlaybackAnalyticsWorker struct {
	db                *gorm.DB
	analyticsService  *services.PlaybackAnalyticsService
	logger            *zap.Logger
	queue             chan AnalyticsJob
	maxRetries        int
	processingWorkers int
	batchSize         int
	batchTimeout      time.Duration
	stopChan          chan struct{}
	running           bool
	mu                sync.RWMutex
}

// AnalyticsJob represents a single analytics job to process.
type AnalyticsJob struct {
	ID        uuid.UUID
	Analytics *models.PlaybackAnalytics
	Retries   int
	CreatedAt time.Time
	Priority  int // 1 = high, 2 = medium, 3 = low
}

// Batch represents a set of analytics jobs processed together.
type Batch struct {
	Jobs      []AnalyticsJob
	CreatedAt time.Time
}

// NewPlaybackAnalyticsWorker creates a new analytics worker.
// T0387: Create Playback Analytics Batch Processing
func NewPlaybackAnalyticsWorker(
	db *gorm.DB,
	analyticsService *services.PlaybackAnalyticsService,
	logger *zap.Logger,
	queueSize int,
	workers int,
	maxRetries int,
	batchSize int,
	batchTimeout time.Duration,
) *PlaybackAnalyticsWorker {
	if logger == nil {
		logger = zap.NewNop()
	}
	if queueSize <= 0 {
		queueSize = 1000 // default queue capacity
	}
	if workers <= 0 {
		workers = 3 // default number of workers
	}
	if maxRetries <= 0 {
		maxRetries = 3 // default number of retries
	}
	if batchSize <= 0 {
		batchSize = 100 // default batch size
	}
	if batchTimeout <= 0 {
		batchTimeout = 5 * time.Second // default timeout for forming a batch
	}
	return &PlaybackAnalyticsWorker{
		db:                db,
		analyticsService:  analyticsService,
		logger:            logger,
		queue:             make(chan AnalyticsJob, queueSize),
		maxRetries:        maxRetries,
		processingWorkers: workers,
		batchSize:         batchSize,
		batchTimeout:      batchTimeout,
		stopChan:          make(chan struct{}),
		running:           false,
	}
}
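
// runWorkerSketch is a minimal usage sketch, not part of the original file:
// it assumes the caller has already wired up db, svc, logger, and an event,
// and walks the construct / start / enqueue / stop lifecycle.
func runWorkerSketch(ctx context.Context, db *gorm.DB, svc *services.PlaybackAnalyticsService, logger *zap.Logger, event *models.PlaybackAnalytics) {
	worker := NewPlaybackAnalyticsWorker(db, svc, logger,
		1000,          // queueSize
		3,             // workers
		3,             // maxRetries
		100,           // batchSize
		5*time.Second, // batchTimeout
	)
	worker.Start(ctx)
	defer worker.Stop()

	// Priority 1 = high. A full queue surfaces as an error instead of blocking.
	if err := worker.Enqueue(event, 1); err != nil {
		logger.Warn("enqueue failed", zap.Error(err))
	}
}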

// Enqueue adds an analytics job to the queue. It never blocks: if the queue
// is full, the job is dropped and an error is returned.
// T0387: Create Playback Analytics Batch Processing
func (w *PlaybackAnalyticsWorker) Enqueue(analytics *models.PlaybackAnalytics, priority int) error {
	if analytics == nil {
		return fmt.Errorf("analytics cannot be nil")
	}
	job := AnalyticsJob{
		ID:        uuid.New(),
		Analytics: analytics,
		Retries:   0,
		CreatedAt: time.Now(),
		Priority:  priority,
	}
	select {
	case w.queue <- job:
		w.logger.Debug("Analytics job enqueued",
			zap.String("job_id", job.ID.String()),
			zap.String("track_id", analytics.TrackID.String()),
			zap.String("user_id", analytics.UserID.String()),
			zap.Int("priority", priority))
		return nil
	default:
		w.logger.Warn("Analytics queue full, dropping job",
			zap.String("track_id", analytics.TrackID.String()),
			zap.String("user_id", analytics.UserID.String()))
		return fmt.Errorf("queue is full")
	}
}

// EnqueueBatch adds several analytics entries to the queue. Individual
// enqueue failures are logged and counted but not returned; the call itself
// only fails on an empty input.
// T0387: Create Playback Analytics Batch Processing
func (w *PlaybackAnalyticsWorker) EnqueueBatch(analyticsList []*models.PlaybackAnalytics, priority int) error {
	if len(analyticsList) == 0 {
		return fmt.Errorf("analytics list cannot be empty")
	}
	enqueued := 0
	for _, analytics := range analyticsList {
		if err := w.Enqueue(analytics, priority); err != nil {
			w.logger.Warn("Failed to enqueue analytics",
				zap.Error(err),
				zap.String("track_id", analytics.TrackID.String()))
			continue
		}
		enqueued++
	}
	w.logger.Info("Batch enqueued",
		zap.Int("total", len(analyticsList)),
		zap.Int("enqueued", enqueued),
		zap.Int("failed", len(analyticsList)-enqueued))
	return nil
}

// Start launches the processing workers.
// T0387: Create Playback Analytics Batch Processing
func (w *PlaybackAnalyticsWorker) Start(ctx context.Context) {
	w.mu.Lock()
	if w.running {
		w.mu.Unlock()
		w.logger.Warn("Playback analytics worker is already running")
		return
	}
	// Recreate the stop channel so the worker can be restarted after Stop
	// (Stop closes it, and a closed channel would halt new workers at once).
	w.stopChan = make(chan struct{})
	w.running = true
	w.mu.Unlock()
	w.logger.Info("Starting playback analytics worker",
		zap.Int("workers", w.processingWorkers),
		zap.Int("batch_size", w.batchSize),
		zap.Duration("batch_timeout", w.batchTimeout))
	for i := 0; i < w.processingWorkers; i++ {
		go w.processWorker(ctx, i)
	}
}

// Stop signals all workers to stop.
// T0387: Create Playback Analytics Batch Processing
func (w *PlaybackAnalyticsWorker) Stop() {
	w.mu.Lock()
	defer w.mu.Unlock()
	if !w.running {
		return
	}
	w.logger.Info("Stopping playback analytics worker")
	close(w.stopChan)
	w.running = false
}

// IsRunning reports whether the worker is currently running.
func (w *PlaybackAnalyticsWorker) IsRunning() bool {
	w.mu.RLock()
	defer w.mu.RUnlock()
	return w.running
}

// processWorker drains the queue and processes jobs in batches.
// T0387: Create Playback Analytics Batch Processing
func (w *PlaybackAnalyticsWorker) processWorker(ctx context.Context, workerID int) {
	logger := w.logger.With(zap.Int("worker_id", workerID))
	logger.Info("Playback analytics worker started")
	for {
		select {
		case <-ctx.Done():
			logger.Info("Playback analytics worker stopping")
			return
		case <-w.stopChan:
			logger.Info("Playback analytics worker stopping (stop requested)")
			return
		default:
			// Collect jobs until a full batch forms or the batch timeout fires.
			batch := w.collectBatch(ctx, workerID)
			if len(batch.Jobs) > 0 {
				w.processBatch(ctx, batch, workerID)
			}
		}
	}
}

// collectBatch gathers jobs from the queue until the batch is full or the
// batch timeout elapses with at least one job collected.
// T0387: Create Playback Analytics Batch Processing
func (w *PlaybackAnalyticsWorker) collectBatch(ctx context.Context, _ int) Batch {
	batch := Batch{
		Jobs:      make([]AnalyticsJob, 0, w.batchSize),
		CreatedAt: time.Now(),
	}
	timeout := time.NewTimer(w.batchTimeout)
	defer timeout.Stop()
	for len(batch.Jobs) < w.batchSize {
		select {
		case <-ctx.Done():
			return batch
		case <-w.stopChan:
			return batch
		case <-timeout.C:
			// Timeout reached: process the batch even if it is not full.
			if len(batch.Jobs) > 0 {
				return batch
			}
			// The batch is still empty; reset the timer and keep waiting.
			timeout.Reset(w.batchTimeout)
		case job := <-w.queue:
			batch.Jobs = append(batch.Jobs, job)
			// If the batch is full, process it immediately.
			if len(batch.Jobs) >= w.batchSize {
				return batch
			}
		}
	}
	return batch
}
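
// To make the flush conditions concrete, assuming the defaults (batchSize
// 100, batchTimeout 5s): a burst of 250 jobs yields two full batches of 100
// immediately and then a batch of 50 once the timer fires, while a trickle
// of one job per second yields roughly 5 jobs per batch. An idle queue
// produces no batches at all, since the timer only flushes a non-empty one.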

// processBatch processes one batch of analytics.
// T0387: Create Playback Analytics Batch Processing
func (w *PlaybackAnalyticsWorker) processBatch(ctx context.Context, batch Batch, workerID int) {
	logger := w.logger.With(
		zap.Int("worker_id", workerID),
		zap.Int("batch_size", len(batch.Jobs)))
	logger.Info("Processing analytics batch")
	// Convert the jobs back into analytics records.
	analyticsList := make([]*models.PlaybackAnalytics, 0, len(batch.Jobs))
	for _, job := range batch.Jobs {
		analyticsList = append(analyticsList, job.Analytics)
	}
	// Bound the batch write with its own timeout.
	batchCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()
	// Process the batch.
	startTime := time.Now()
	err := w.analyticsService.RecordPlaybackBatch(batchCtx, analyticsList)
	duration := time.Since(startTime)
	if err != nil {
		logger.Error("Batch processing failed",
			zap.Error(err),
			zap.Duration("duration", duration))
		// Re-enqueue the jobs individually when the whole batch fails.
		w.retryFailedJobs(ctx, batch.Jobs, err, workerID)
	} else {
		logger.Info("Batch processed successfully",
			zap.Int("count", len(batch.Jobs)),
			zap.Duration("duration", duration))
	}
}

// retryFailedJobs re-enqueues jobs from a failed batch.
// T0387: Create Playback Analytics Batch Processing
func (w *PlaybackAnalyticsWorker) retryFailedJobs(_ context.Context, jobs []AnalyticsJob, batchError error, workerID int) {
	logger := w.logger.With(
		zap.Int("worker_id", workerID),
		zap.Int("failed_jobs", len(jobs)))
	logger.Warn("Retrying failed analytics jobs",
		zap.Error(batchError))
	for _, job := range jobs {
		// Drop the job once it has exhausted its retries.
		if job.Retries >= w.maxRetries {
			logger.Error("Job exceeded max retries, dropping",
				zap.String("job_id", job.ID.String()),
				zap.Int("retries", job.Retries))
			continue
		}
		// Increment the retry counter.
		job.Retries++
		// Linear backoff (1s, 2s, 3s, ...), scheduled via time.AfterFunc so
		// the worker loop is never blocked.
		delay := time.Duration(job.Retries) * time.Second
		// Capture the loop variable for use in the closure.
		jobToRetry := job
		time.AfterFunc(delay, func() {
			// Re-enqueue the job without blocking.
			select {
			case w.queue <- jobToRetry:
				logger.Debug("Job re-enqueued for retry",
					zap.String("job_id", jobToRetry.ID.String()),
					zap.Int("retries", jobToRetry.Retries))
			default:
				logger.Warn("Queue full, cannot retry job",
					zap.String("job_id", jobToRetry.ID.String()))
			}
		})
	}
}
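
// With the default maxRetries of 3, a persistently failing job is re-enqueued
// after 1s, 2s, and 3s before being dropped for good. A retried job can also
// be lost if the queue is still full when its timer fires; that drop is
// logged but the job is not persisted anywhere.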

// GetQueueSize returns the current number of jobs waiting in the queue.
func (w *PlaybackAnalyticsWorker) GetQueueSize() int {
	return len(w.queue)
}

// WorkerStats is a snapshot of the worker's state and configuration.
type WorkerStats struct {
	Running      bool          `json:"running"`
	QueueSize    int           `json:"queue_size"`
	Workers      int           `json:"workers"`
	MaxRetries   int           `json:"max_retries"`
	BatchSize    int           `json:"batch_size"`
	BatchTimeout time.Duration `json:"batch_timeout"`
}

// GetStats returns the worker's statistics.
func (w *PlaybackAnalyticsWorker) GetStats() WorkerStats {
	w.mu.RLock()
	defer w.mu.RUnlock()
	return WorkerStats{
		Running:      w.running,
		QueueSize:    len(w.queue),
		Workers:      w.processingWorkers,
		MaxRetries:   w.maxRetries,
		BatchSize:    w.batchSize,
		BatchTimeout: w.batchTimeout,
	}
}
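
// logStatsSketch is a hedged sketch, not part of the original file, showing
// one way to surface GetStats periodically; the 30-second interval is an
// assumption.
func logStatsSketch(ctx context.Context, w *PlaybackAnalyticsWorker) {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			stats := w.GetStats()
			w.logger.Info("Worker stats",
				zap.Bool("running", stats.Running),
				zap.Int("queue_size", stats.QueueSize),
				zap.Int("workers", stats.Workers),
				zap.Duration("batch_timeout", stats.BatchTimeout))
		}
	}
}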