package workers

import (
	"context"
	"fmt"
	"time"

	"veza-backend-api/internal/models"
	"veza-backend-api/internal/services"

	"go.uber.org/zap"
	"gorm.io/gorm"
)

// HLSTranscodeWorker processes the HLS transcode queue.
type HLSTranscodeWorker struct {
	db                *gorm.DB
	queueService      *services.HLSQueueService
	transcodeService  *services.HLSTranscodeService
	logger            *zap.Logger
	processingWorkers int
	pollInterval      time.Duration
	maxRetries        int // currently unused: handleJobError checks the per-job MaxRetries instead
	stopChan          chan struct{}
}
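
// Note (inferred from the call sites below, not from the service's own
// definition): the queue service is expected to expose at least
// Dequeue(ctx) (*models.HLSTranscodeQueue, error), MarkCompleted(ctx, id) error,
// RetryJob(ctx, id) error, and MarkFailed(ctx, id, reason) error.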

// NewHLSTranscodeWorker creates a new HLS transcode worker. A nil logger, a
// zero poll interval, and a zero worker count fall back to safe defaults.
func NewHLSTranscodeWorker(
	db *gorm.DB,
	queueService *services.HLSQueueService,
	transcodeService *services.HLSTranscodeService,
	logger *zap.Logger,
	processingWorkers int,
	pollInterval time.Duration,
) *HLSTranscodeWorker {
	if logger == nil {
		logger = zap.NewNop()
	}
	if pollInterval == 0 {
		pollInterval = 5 * time.Second
	}
	if processingWorkers == 0 {
		processingWorkers = 1
	}

	return &HLSTranscodeWorker{
		db:                db,
		queueService:      queueService,
		transcodeService:  transcodeService,
		logger:            logger,
		processingWorkers: processingWorkers,
		pollInterval:      pollInterval,
		maxRetries:        3,
		stopChan:          make(chan struct{}),
	}
}

// Start launches the worker goroutines.
func (w *HLSTranscodeWorker) Start(ctx context.Context) {
	w.logger.Info("Starting HLS transcode worker",
		zap.Int("workers", w.processingWorkers),
		zap.Duration("poll_interval", w.pollInterval))

	for i := 0; i < w.processingWorkers; i++ {
		go w.processWorker(ctx, i)
	}
}
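
// Note: Start returns immediately; the goroutines it spawns run until the
// provided context is cancelled or Stop is called. There is no WaitGroup, so
// a caller that must block until every worker has exited would need to add
// its own synchronization.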

// Stop signals all worker goroutines to shut down.
func (w *HLSTranscodeWorker) Stop() {
	w.logger.Info("Stopping HLS transcode worker")
	close(w.stopChan)
}
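
// Stop is one-shot: closing stopChan a second time would panic. If an
// idempotent Stop were needed, one option is a sync.Once guard (a sketch,
// assuming a stopOnce sync.Once field were added to the struct):
//
//	func (w *HLSTranscodeWorker) Stop() {
//		w.stopOnce.Do(func() {
//			w.logger.Info("Stopping HLS transcode worker")
//			close(w.stopChan)
//		})
//	}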

// processWorker polls the queue for jobs until shutdown.
func (w *HLSTranscodeWorker) processWorker(ctx context.Context, workerID int) {
	logger := w.logger.With(zap.Int("worker_id", workerID))
	logger.Info("HLS transcode worker started")

	ticker := time.NewTicker(w.pollInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			logger.Info("HLS transcode worker stopping")
			return

		case <-w.stopChan:
			logger.Info("HLS transcode worker stopping")
			return

		case <-ticker.C:
			w.processNextJob(ctx, workerID, logger)
		}
	}
}
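
// Note on pacing: time.Ticker drops ticks for slow receivers, so a job that
// takes longer than pollInterval does not build up a backlog of poll
// attempts; the worker simply polls again on the first tick after it
// finishes.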

// processNextJob fetches and processes the next available job.
func (w *HLSTranscodeWorker) processNextJob(ctx context.Context, _ int, logger *zap.Logger) {
	// Fetch the next job from the queue.
	job, err := w.queueService.Dequeue(ctx)
	if err != nil {
		logger.Error("Failed to dequeue job", zap.Error(err))
		return
	}

	if job == nil {
		// No job available right now.
		return
	}

	logger = logger.With(
		zap.String("job_id", job.ID.String()),
		zap.Any("track_id", job.TrackID), // TrackID is a uuid.UUID, hence zap.Any
		zap.Int("retry_count", job.RetryCount))

	logger.Info("Processing HLS transcode job")

	// Bound the transcode with a timeout so a stuck job cannot block the
	// worker indefinitely.
	jobCtx, cancel := context.WithTimeout(ctx, 30*time.Minute)
	defer cancel()

	// Load the track referenced by the job.
	var track models.Track
	if err := w.db.WithContext(jobCtx).First(&track, job.TrackID).Error; err != nil {
		logger.Error("Failed to load track", zap.Error(err))
		w.handleJobError(ctx, job, fmt.Errorf("failed to load track: %w", err), logger)
		return
	}

	// Transcode the track.
	_, err = w.transcodeService.TranscodeTrack(jobCtx, &track)
	if err != nil {
		logger.Error("Transcode failed", zap.Error(err))
		w.handleJobError(ctx, job, err, logger)
		return
	}

	// Mark the job as completed.
	if err := w.queueService.MarkCompleted(ctx, job.ID); err != nil {
		logger.Error("Failed to mark job as completed", zap.Error(err))
		return
	}

	logger.Info("HLS transcode job completed successfully")
}

// handleJobError handles a processing failure: retry if the job has attempts
// left, otherwise mark it as failed.
func (w *HLSTranscodeWorker) handleJobError(ctx context.Context, job *models.HLSTranscodeQueue, err error, logger *zap.Logger) {
	errorMsg := err.Error()

	// Check whether the job can still be retried.
	if job.RetryCount < job.MaxRetries {
		logger.Info("Retrying job",
			zap.Int("retry_count", job.RetryCount+1),
			zap.Int("max_retries", job.MaxRetries))

		// Hand the job back to the queue service for rescheduling.
		retryErr := w.queueService.RetryJob(ctx, job.ID)
		if retryErr != nil {
			logger.Error("Failed to retry job", zap.Error(retryErr))
			// If the job cannot be re-queued, mark it as failed.
			if failErr := w.queueService.MarkFailed(ctx, job.ID, fmt.Sprintf("Failed to retry: %v", retryErr)); failErr != nil {
				logger.Error("Failed to mark job as failed", zap.Error(failErr))
			}
		} else {
			// Linear backoff: 30s per attempt already made. The delay is
			// informational here; the actual scheduling is up to RetryJob.
			delay := time.Duration(job.RetryCount+1) * 30 * time.Second
			logger.Info("Job will be retried", zap.Duration("delay", delay))
		}
	} else {
		// Max retries reached: mark the job as failed.
		logger.Error("Job failed after max retries",
			zap.Int("max_retries", job.MaxRetries))
		if failErr := w.queueService.MarkFailed(ctx, job.ID, errorMsg); failErr != nil {
			logger.Error("Failed to mark job as failed", zap.Error(failErr))
		}
	}
}
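
// Example wiring (an illustrative sketch, not part of this file): starting
// the worker pool from application bootstrap code, assuming db, queueService,
// transcodeService, and logger have been constructed elsewhere.
//
//	worker := NewHLSTranscodeWorker(db, queueService, transcodeService, logger,
//		4,             // polling goroutines
//		5*time.Second, // poll interval
//	)
//	ctx, cancel := context.WithCancel(context.Background())
//	worker.Start(ctx)
//	// ...on shutdown:
//	worker.Stop()
//	cancel()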