363 lines
9.4 KiB
Go
363 lines
9.4 KiB
Go
package workers
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"veza-backend-api/internal/models"
|
|
"veza-backend-api/internal/services"
|
|
|
|
"github.com/google/uuid"
|
|
"go.uber.org/zap"
|
|
"gorm.io/gorm"
|
|
)
|
|
|
|
// PlaybackAnalyticsWorker gère le traitement par lots des analytics de playback
|
|
// T0387: Create Playback Analytics Batch Processing
|
|
type PlaybackAnalyticsWorker struct {
|
|
db *gorm.DB
|
|
analyticsService *services.PlaybackAnalyticsService
|
|
logger *zap.Logger
|
|
queue chan AnalyticsJob
|
|
maxRetries int
|
|
processingWorkers int
|
|
batchSize int
|
|
batchTimeout time.Duration
|
|
stopChan chan struct{}
|
|
running bool
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
// AnalyticsJob représente un job d'analytics à traiter
|
|
type AnalyticsJob struct {
|
|
ID uuid.UUID
|
|
Analytics *models.PlaybackAnalytics
|
|
Retries int
|
|
CreatedAt time.Time
|
|
Priority int // 1 = haut, 2 = moyen, 3 = bas
|
|
}
|
|
|
|
// Batch représente un lot d'analytics à traiter
|
|
type Batch struct {
|
|
Jobs []AnalyticsJob
|
|
CreatedAt time.Time
|
|
}
|
|
|
|
// NewPlaybackAnalyticsWorker crée un nouveau worker d'analytics
|
|
// T0387: Create Playback Analytics Batch Processing
|
|
func NewPlaybackAnalyticsWorker(
|
|
db *gorm.DB,
|
|
analyticsService *services.PlaybackAnalyticsService,
|
|
logger *zap.Logger,
|
|
queueSize int,
|
|
workers int,
|
|
maxRetries int,
|
|
batchSize int,
|
|
batchTimeout time.Duration,
|
|
) *PlaybackAnalyticsWorker {
|
|
if logger == nil {
|
|
logger = zap.NewNop()
|
|
}
|
|
if queueSize <= 0 {
|
|
queueSize = 1000 // Taille par défaut de la queue
|
|
}
|
|
if workers <= 0 {
|
|
workers = 3 // Nombre par défaut de workers
|
|
}
|
|
if maxRetries <= 0 {
|
|
maxRetries = 3 // Nombre par défaut de retries
|
|
}
|
|
if batchSize <= 0 {
|
|
batchSize = 100 // Taille par défaut du batch
|
|
}
|
|
if batchTimeout <= 0 {
|
|
batchTimeout = 5 * time.Second // Timeout par défaut pour former un batch
|
|
}
|
|
|
|
return &PlaybackAnalyticsWorker{
|
|
db: db,
|
|
analyticsService: analyticsService,
|
|
logger: logger,
|
|
queue: make(chan AnalyticsJob, queueSize),
|
|
maxRetries: maxRetries,
|
|
processingWorkers: workers,
|
|
batchSize: batchSize,
|
|
batchTimeout: batchTimeout,
|
|
stopChan: make(chan struct{}),
|
|
running: false,
|
|
}
|
|
}
|
|
|
|
// Enqueue ajoute un job d'analytics à la queue
|
|
// T0387: Create Playback Analytics Batch Processing
|
|
func (w *PlaybackAnalyticsWorker) Enqueue(analytics *models.PlaybackAnalytics, priority int) error {
|
|
if analytics == nil {
|
|
return fmt.Errorf("analytics cannot be nil")
|
|
}
|
|
|
|
job := AnalyticsJob{
|
|
ID: uuid.New(),
|
|
Analytics: analytics,
|
|
Retries: 0,
|
|
CreatedAt: time.Now(),
|
|
Priority: priority,
|
|
}
|
|
|
|
select {
|
|
case w.queue <- job:
|
|
w.logger.Debug("Analytics job enqueued",
|
|
zap.String("job_id", job.ID.String()),
|
|
zap.String("track_id", analytics.TrackID.String()),
|
|
zap.String("user_id", analytics.UserID.String()),
|
|
zap.Int("priority", priority))
|
|
return nil
|
|
default:
|
|
w.logger.Warn("Analytics queue full, dropping job",
|
|
zap.String("track_id", analytics.TrackID.String()),
|
|
zap.String("user_id", analytics.UserID.String()))
|
|
return fmt.Errorf("queue is full")
|
|
}
|
|
}
|
|
|
|
// EnqueueBatch ajoute plusieurs analytics à la queue
|
|
// T0387: Create Playback Analytics Batch Processing
|
|
func (w *PlaybackAnalyticsWorker) EnqueueBatch(analyticsList []*models.PlaybackAnalytics, priority int) error {
|
|
if len(analyticsList) == 0 {
|
|
return fmt.Errorf("analytics list cannot be empty")
|
|
}
|
|
|
|
enqueued := 0
|
|
for _, analytics := range analyticsList {
|
|
if err := w.Enqueue(analytics, priority); err != nil {
|
|
w.logger.Warn("Failed to enqueue analytics",
|
|
zap.Error(err),
|
|
zap.String("track_id", analytics.TrackID.String()))
|
|
continue
|
|
}
|
|
enqueued++
|
|
}
|
|
|
|
w.logger.Info("Batch enqueued",
|
|
zap.Int("total", len(analyticsList)),
|
|
zap.Int("enqueued", enqueued),
|
|
zap.Int("failed", len(analyticsList)-enqueued))
|
|
|
|
return nil
|
|
}
|
|
|
|
// Start démarre le worker
|
|
// T0387: Create Playback Analytics Batch Processing
|
|
func (w *PlaybackAnalyticsWorker) Start(ctx context.Context) {
|
|
w.mu.Lock()
|
|
if w.running {
|
|
w.mu.Unlock()
|
|
w.logger.Warn("Playback analytics worker is already running")
|
|
return
|
|
}
|
|
w.running = true
|
|
w.mu.Unlock()
|
|
|
|
w.logger.Info("Starting playback analytics worker",
|
|
zap.Int("workers", w.processingWorkers),
|
|
zap.Int("batch_size", w.batchSize),
|
|
zap.Duration("batch_timeout", w.batchTimeout))
|
|
|
|
for i := 0; i < w.processingWorkers; i++ {
|
|
go w.processWorker(ctx, i)
|
|
}
|
|
}
|
|
|
|
// Stop arrête le worker
|
|
// T0387: Create Playback Analytics Batch Processing
|
|
func (w *PlaybackAnalyticsWorker) Stop() {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
|
|
if !w.running {
|
|
return
|
|
}
|
|
|
|
w.logger.Info("Stopping playback analytics worker")
|
|
close(w.stopChan)
|
|
w.running = false
|
|
}
|
|
|
|
// IsRunning retourne si le worker est en cours d'exécution
|
|
func (w *PlaybackAnalyticsWorker) IsRunning() bool {
|
|
w.mu.RLock()
|
|
defer w.mu.RUnlock()
|
|
return w.running
|
|
}
|
|
|
|
// processWorker traite les jobs de la queue par lots
|
|
// T0387: Create Playback Analytics Batch Processing
|
|
func (w *PlaybackAnalyticsWorker) processWorker(ctx context.Context, workerID int) {
|
|
logger := w.logger.With(zap.Int("worker_id", workerID))
|
|
logger.Info("Playback analytics worker started")
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
logger.Info("Playback analytics worker stopping")
|
|
return
|
|
|
|
case <-w.stopChan:
|
|
logger.Info("Playback analytics worker stopping (stop requested)")
|
|
return
|
|
|
|
default:
|
|
// Collecter les jobs pour former un batch
|
|
batch := w.collectBatch(ctx, workerID)
|
|
if len(batch.Jobs) > 0 {
|
|
w.processBatch(ctx, batch, workerID)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// collectBatch collecte les jobs pour former un batch
|
|
// T0387: Create Playback Analytics Batch Processing
|
|
func (w *PlaybackAnalyticsWorker) collectBatch(ctx context.Context, workerID int) Batch {
|
|
batch := Batch{
|
|
Jobs: make([]AnalyticsJob, 0, w.batchSize),
|
|
CreatedAt: time.Now(),
|
|
}
|
|
|
|
timeout := time.NewTimer(w.batchTimeout)
|
|
defer timeout.Stop()
|
|
|
|
for len(batch.Jobs) < w.batchSize {
|
|
select {
|
|
case <-ctx.Done():
|
|
return batch
|
|
|
|
case <-w.stopChan:
|
|
return batch
|
|
|
|
case <-timeout.C:
|
|
// Timeout atteint, traiter le batch même s'il n'est pas plein
|
|
if len(batch.Jobs) > 0 {
|
|
return batch
|
|
}
|
|
// Réinitialiser le timeout si le batch est vide
|
|
timeout.Reset(w.batchTimeout)
|
|
|
|
case job := <-w.queue:
|
|
batch.Jobs = append(batch.Jobs, job)
|
|
// Si le batch est plein, le traiter immédiatement
|
|
if len(batch.Jobs) >= w.batchSize {
|
|
return batch
|
|
}
|
|
}
|
|
}
|
|
|
|
return batch
|
|
}
|
|
|
|
// processBatch traite un lot d'analytics
|
|
// T0387: Create Playback Analytics Batch Processing
|
|
func (w *PlaybackAnalyticsWorker) processBatch(ctx context.Context, batch Batch, workerID int) {
|
|
logger := w.logger.With(
|
|
zap.Int("worker_id", workerID),
|
|
zap.Int("batch_size", len(batch.Jobs)))
|
|
|
|
logger.Info("Processing analytics batch")
|
|
|
|
// Convertir les jobs en analytics
|
|
analyticsList := make([]*models.PlaybackAnalytics, 0, len(batch.Jobs))
|
|
for _, job := range batch.Jobs {
|
|
analyticsList = append(analyticsList, job.Analytics)
|
|
}
|
|
|
|
// Créer un contexte avec timeout pour le traitement du batch
|
|
batchCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
|
defer cancel()
|
|
|
|
// Traiter le batch
|
|
startTime := time.Now()
|
|
err := w.analyticsService.RecordPlaybackBatch(batchCtx, analyticsList)
|
|
duration := time.Since(startTime)
|
|
|
|
if err != nil {
|
|
logger.Error("Batch processing failed",
|
|
zap.Error(err),
|
|
zap.Duration("duration", duration))
|
|
|
|
// Retry les jobs individuellement si le batch échoue
|
|
w.retryFailedJobs(ctx, batch.Jobs, err, workerID)
|
|
} else {
|
|
logger.Info("Batch processed successfully",
|
|
zap.Int("count", len(batch.Jobs)),
|
|
zap.Duration("duration", duration))
|
|
}
|
|
}
|
|
|
|
// retryFailedJobs réessaie les jobs qui ont échoué
|
|
// T0387: Create Playback Analytics Batch Processing
|
|
func (w *PlaybackAnalyticsWorker) retryFailedJobs(ctx context.Context, jobs []AnalyticsJob, batchError error, workerID int) {
|
|
logger := w.logger.With(
|
|
zap.Int("worker_id", workerID),
|
|
zap.Int("failed_jobs", len(jobs)))
|
|
|
|
logger.Warn("Retrying failed analytics jobs",
|
|
zap.Error(batchError))
|
|
|
|
for _, job := range jobs {
|
|
// Vérifier si on peut encore retry
|
|
if job.Retries >= w.maxRetries {
|
|
logger.Error("Job exceeded max retries, dropping",
|
|
zap.String("job_id", job.ID.String()),
|
|
zap.Int("retries", job.Retries))
|
|
continue
|
|
}
|
|
|
|
// Incrémenter le compteur de retries
|
|
job.Retries++
|
|
|
|
// Exponential backoff
|
|
delay := time.Duration(job.Retries) * time.Second
|
|
time.Sleep(delay)
|
|
|
|
// Ré-enqueue le job
|
|
select {
|
|
case w.queue <- job:
|
|
logger.Debug("Job re-enqueued for retry",
|
|
zap.String("job_id", job.ID.String()),
|
|
zap.Int("retries", job.Retries))
|
|
default:
|
|
logger.Warn("Queue full, cannot retry job",
|
|
zap.String("job_id", job.ID.String()))
|
|
}
|
|
}
|
|
}
|
|
|
|
// GetQueueSize retourne la taille actuelle de la queue
|
|
func (w *PlaybackAnalyticsWorker) GetQueueSize() int {
|
|
return len(w.queue)
|
|
}
|
|
|
|
// WorkerStats is a snapshot of the worker's state and configuration, as
// returned by GetStats.
type WorkerStats struct {
	// Running is the worker's running flag.
	Running bool `json:"running"`
	// QueueSize is the number of jobs waiting in the queue.
	QueueSize int `json:"queue_size"`
	// Workers is the configured number of processing goroutines.
	Workers int `json:"workers"`
	// MaxRetries is the configured retry limit per job.
	MaxRetries int `json:"max_retries"`
	// BatchSize is the configured maximum number of jobs per batch.
	BatchSize int `json:"batch_size"`
	// BatchTimeout is the configured wait before a partial batch is flushed.
	BatchTimeout time.Duration `json:"batch_timeout"`
}
|
|
|
|
func (w *PlaybackAnalyticsWorker) GetStats() WorkerStats {
|
|
w.mu.RLock()
|
|
defer w.mu.RUnlock()
|
|
|
|
return WorkerStats{
|
|
Running: w.running,
|
|
QueueSize: len(w.queue),
|
|
Workers: w.processingWorkers,
|
|
MaxRetries: w.maxRetries,
|
|
BatchSize: w.batchSize,
|
|
BatchTimeout: w.batchTimeout,
|
|
}
|
|
}
|