veza/veza-backend-api/internal/workers/hls_transcode_worker.go
2025-12-12 21:34:34 -05:00

176 lines
4.8 KiB
Go

package workers
import (
"context"
"fmt"
"time"
"veza-backend-api/internal/models"
"veza-backend-api/internal/services"
"go.uber.org/zap"
"gorm.io/gorm"
)
// HLSTranscodeWorker gère le traitement de la queue de transcodage HLS
type HLSTranscodeWorker struct {
db *gorm.DB
queueService *services.HLSQueueService
transcodeService *services.HLSTranscodeService
logger *zap.Logger
processingWorkers int
pollInterval time.Duration
maxRetries int
stopChan chan struct{}
}
// NewHLSTranscodeWorker crée un nouveau worker de transcodage HLS
func NewHLSTranscodeWorker(
db *gorm.DB,
queueService *services.HLSQueueService,
transcodeService *services.HLSTranscodeService,
logger *zap.Logger,
processingWorkers int,
pollInterval time.Duration,
) *HLSTranscodeWorker {
if logger == nil {
logger = zap.NewNop()
}
if pollInterval == 0 {
pollInterval = 5 * time.Second
}
if processingWorkers == 0 {
processingWorkers = 1
}
return &HLSTranscodeWorker{
db: db,
queueService: queueService,
transcodeService: transcodeService,
logger: logger,
processingWorkers: processingWorkers,
pollInterval: pollInterval,
maxRetries: 3,
stopChan: make(chan struct{}),
}
}
// Start démarre le worker
func (w *HLSTranscodeWorker) Start(ctx context.Context) {
w.logger.Info("Starting HLS transcode worker",
zap.Int("workers", w.processingWorkers),
zap.Duration("poll_interval", w.pollInterval))
for i := 0; i < w.processingWorkers; i++ {
go w.processWorker(ctx, i)
}
}
// Stop arrête le worker
func (w *HLSTranscodeWorker) Stop() {
w.logger.Info("Stopping HLS transcode worker")
close(w.stopChan)
}
// processWorker traite les jobs de la queue
func (w *HLSTranscodeWorker) processWorker(ctx context.Context, workerID int) {
logger := w.logger.With(zap.Int("worker_id", workerID))
logger.Info("HLS transcode worker started")
ticker := time.NewTicker(w.pollInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
logger.Info("HLS transcode worker stopping")
return
case <-w.stopChan:
logger.Info("HLS transcode worker stopping")
return
case <-ticker.C:
w.processNextJob(ctx, workerID, logger)
}
}
}
// processNextJob traite le prochain job disponible
func (w *HLSTranscodeWorker) processNextJob(ctx context.Context, _ int, logger *zap.Logger) {
// Récupérer le prochain job
job, err := w.queueService.Dequeue(ctx)
if err != nil {
logger.Error("Failed to dequeue job", zap.Error(err))
return
}
if job == nil {
// Pas de job disponible
return
}
logger = logger.With(
zap.String("job_id", job.ID.String()),
zap.Any("track_id", job.TrackID), // Changed to zap.Any for uuid.UUID
zap.Int("retry_count", job.RetryCount))
logger.Info("Processing HLS transcode job")
// Créer un contexte avec timeout pour le transcodage
jobCtx, cancel := context.WithTimeout(ctx, 30*time.Minute)
defer cancel()
// Récupérer le track
var track models.Track
if err := w.db.WithContext(jobCtx).First(&track, job.TrackID).Error; err != nil {
logger.Error("Failed to load track", zap.Error(err))
w.handleJobError(ctx, job, fmt.Errorf("failed to load track: %w", err), logger)
return
}
// Transcoder le track
_, err = w.transcodeService.TranscodeTrack(jobCtx, &track)
if err != nil {
logger.Error("Transcode failed", zap.Error(err))
w.handleJobError(ctx, job, err, logger)
return
}
// Marquer le job comme terminé
if err := w.queueService.MarkCompleted(ctx, job.ID); err != nil {
logger.Error("Failed to mark job as completed", zap.Error(err))
return
}
logger.Info("HLS transcode job completed successfully")
}
// handleJobError gère les erreurs de traitement
func (w *HLSTranscodeWorker) handleJobError(ctx context.Context, job *models.HLSTranscodeQueue, err error, logger *zap.Logger) {
errorMsg := err.Error()
// Vérifier si on peut réessayer
if job.RetryCount < job.MaxRetries {
logger.Info("Retrying job",
zap.Int("retry_count", job.RetryCount+1),
zap.Int("max_retries", job.MaxRetries))
// Réessayer le job avec exponential backoff
retryErr := w.queueService.RetryJob(ctx, job.ID)
if retryErr != nil {
logger.Error("Failed to retry job", zap.Error(retryErr))
// Si on ne peut pas réessayer, marquer comme échoué
w.queueService.MarkFailed(ctx, job.ID, fmt.Sprintf("Failed to retry: %v", retryErr))
} else {
// Attendre avant de réessayer (exponential backoff)
delay := time.Duration(job.RetryCount+1) * 30 * time.Second
logger.Info("Job will be retried", zap.Duration("delay", delay))
}
} else {
// Max retries atteint, marquer comme échoué
logger.Error("Job failed after max retries",
zap.Int("max_retries", job.MaxRetries))
w.queueService.MarkFailed(ctx, job.ID, errorMsg)
}
}