package workers

import (
	"context"
	"fmt"
	"sync"
	"time"

	"veza-backend-api/internal/models"
	"veza-backend-api/internal/services"

	"go.uber.org/zap"
	"gorm.io/gorm"
)

// HLSTranscodeWorker polls the HLS transcode queue and runs the transcode
// service on every job it dequeues.
type HLSTranscodeWorker struct {
	db                *gorm.DB
	queueService      *services.HLSQueueService
	transcodeService  *services.HLSTranscodeService
	logger            *zap.Logger
	processingWorkers int
	pollInterval      time.Duration
	// maxRetries is set by the constructor; the retry decision in
	// handleJobError is driven by job.MaxRetries, not by this field.
	maxRetries int
	stopChan   chan struct{}
	// stopOnce guards the close of stopChan so Stop may be called repeatedly.
	stopOnce sync.Once
}

// NewHLSTranscodeWorker builds an HLS transcode worker.
//
// A nil logger is replaced by a no-op logger. A non-positive pollInterval
// falls back to 5 seconds (time.NewTicker panics on a non-positive duration),
// and a non-positive processingWorkers falls back to a single worker.
func NewHLSTranscodeWorker(
	db *gorm.DB,
	queueService *services.HLSQueueService,
	transcodeService *services.HLSTranscodeService,
	logger *zap.Logger,
	processingWorkers int,
	pollInterval time.Duration,
) *HLSTranscodeWorker {
	if logger == nil {
		logger = zap.NewNop()
	}
	if pollInterval <= 0 {
		pollInterval = 5 * time.Second
	}
	if processingWorkers <= 0 {
		processingWorkers = 1
	}

	return &HLSTranscodeWorker{
		db:                db,
		queueService:      queueService,
		transcodeService:  transcodeService,
		logger:            logger,
		processingWorkers: processingWorkers,
		pollInterval:      pollInterval,
		maxRetries:        3,
		stopChan:          make(chan struct{}),
	}
}

// Start launches processingWorkers polling goroutines and returns
// immediately. Each goroutine runs until ctx is done or Stop is called.
func (w *HLSTranscodeWorker) Start(ctx context.Context) {
	w.logger.Info("Starting HLS transcode worker",
		zap.Int("workers", w.processingWorkers),
		zap.Duration("poll_interval", w.pollInterval))

	for i := 0; i < w.processingWorkers; i++ {
		go w.processWorker(ctx, i)
	}
}

// Stop signals every polling goroutine to exit by closing the stop channel.
// It does not wait for them to finish. Calling Stop more than once is safe:
// only the first call closes the channel.
func (w *HLSTranscodeWorker) Stop() {
	w.stopOnce.Do(func() {
		w.logger.Info("Stopping HLS transcode worker")
		close(w.stopChan)
	})
}

// processWorker is the polling loop of a single worker goroutine. On every
// tick of pollInterval it tries to process one job; it returns when ctx is
// done or the stop channel is closed.
func (w *HLSTranscodeWorker) processWorker(ctx context.Context, workerID int) {
	logger := w.logger.With(zap.Int("worker_id", workerID))
	logger.Info("HLS transcode worker started")

	ticker := time.NewTicker(w.pollInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			logger.Info("HLS transcode worker stopping")
			return
		case <-w.stopChan:
			logger.Info("HLS transcode worker stopping")
			return
		case <-ticker.C:
			w.processNextJob(ctx, workerID, logger)
		}
	}
}

// processNextJob dequeues at most one job, loads its track, transcodes it and
// marks the job completed. Failures while loading or transcoding are handed
// to handleJobError; a nil job from Dequeue means there is nothing to do.
func (w *HLSTranscodeWorker) processNextJob(ctx context.Context, workerID int, logger *zap.Logger) {
	// Fetch the next job from the queue.
	job, err := w.queueService.Dequeue(ctx)
	if err != nil {
		logger.Error("Failed to dequeue job", zap.Error(err))
		return
	}
	if job == nil {
		// No job available.
		return
	}

	logger = logger.With(
		zap.String("job_id", job.ID.String()),
		zap.Any("track_id", job.TrackID), // Changed to zap.Any for uuid.UUID
		zap.Int("retry_count", job.RetryCount))
	logger.Info("Processing HLS transcode job")

	// Bound the track lookup and the transcode with a 30-minute timeout.
	// Queue bookkeeping below deliberately uses the parent ctx so that it can
	// still run after jobCtx has expired.
	jobCtx, cancel := context.WithTimeout(ctx, 30*time.Minute)
	defer cancel()

	// Load the track. The key is matched with an explicit condition: GORM
	// expands a lone array/slice inline argument into an IN list over its
	// elements, which breaks lookups by a UUID value (a byte array).
	// NOTE(review): assumes the track primary-key column is named "id".
	var track models.Track
	if err := w.db.WithContext(jobCtx).First(&track, "id = ?", job.TrackID).Error; err != nil {
		logger.Error("Failed to load track", zap.Error(err))
		w.handleJobError(ctx, job, fmt.Errorf("failed to load track: %w", err), logger)
		return
	}

	// Transcode the track.
	_, err = w.transcodeService.TranscodeTrack(jobCtx, &track)
	if err != nil {
		logger.Error("Transcode failed", zap.Error(err))
		w.handleJobError(ctx, job, err, logger)
		return
	}

	// Mark the job as completed.
	if err := w.queueService.MarkCompleted(ctx, job.ID); err != nil {
		logger.Error("Failed to mark job as completed", zap.Error(err))
		return
	}

	logger.Info("HLS transcode job completed successfully")
}

// handleJobError decides what happens to a job whose processing failed: it is
// handed back to the queue via RetryJob while job.RetryCount is below
// job.MaxRetries, and marked failed otherwise (or when RetryJob itself fails).
func (w *HLSTranscodeWorker) handleJobError(ctx context.Context, job *models.HLSTranscodeQueue, err error, logger *zap.Logger) {
	errorMsg := err.Error()

	// Retry budget exhausted: mark the job as failed.
	if job.RetryCount >= job.MaxRetries {
		logger.Error("Job failed after max retries",
			zap.Int("max_retries", job.MaxRetries))
		// NOTE(review): the result of MarkFailed is not checked here.
		w.queueService.MarkFailed(ctx, job.ID, errorMsg)
		return
	}

	logger.Info("Retrying job",
		zap.Int("retry_count", job.RetryCount+1),
		zap.Int("max_retries", job.MaxRetries))

	// Hand the job back to the queue for another attempt.
	if retryErr := w.queueService.RetryJob(ctx, job.ID); retryErr != nil {
		logger.Error("Failed to retry job", zap.Error(retryErr))
		// The job could not be re-queued, so mark it as failed instead.
		// NOTE(review): the result of MarkFailed is not checked here.
		w.queueService.MarkFailed(ctx, job.ID, fmt.Sprintf("Failed to retry: %v", retryErr))
		return
	}

	// The delay grows linearly with the retry count (30s, 60s, ...) and is
	// only logged here; this worker does not wait on it.
	// NOTE(review): confirm that RetryJob schedules the job with a matching delay.
	delay := time.Duration(job.RetryCount+1) * 30 * time.Second
	logger.Info("Job will be retried", zap.Duration("delay", delay))
}