veza/veza-backend-api/internal/services/hls_queue_service.go
2026-03-05 23:03:43 +01:00

167 lines
4.9 KiB
Go

package services
import (
"context"
"time"
"veza-backend-api/internal/models"
"github.com/google/uuid"
"go.uber.org/zap"
"gorm.io/gorm"
)
// HLSQueueService gère la queue de transcodage HLS
type HLSQueueService struct {
db *gorm.DB
logger *zap.Logger
}
// NewHLSQueueService crée un nouveau service de queue HLS
func NewHLSQueueService(db *gorm.DB, logger *zap.Logger) *HLSQueueService {
if logger == nil {
logger = zap.NewNop()
}
return &HLSQueueService{
db: db,
logger: logger,
}
}
// Enqueue ajoute un job de transcodage à la queue
func (s *HLSQueueService) Enqueue(ctx context.Context, trackID uuid.UUID, priority int) error {
_, err := s.EnqueueWithID(ctx, trackID, priority)
return err
}
// EnqueueWithID ajoute un job de transcodage à la queue et retourne le job ID
// T0343: Retourne le job ID pour l'endpoint de déclenchement
func (s *HLSQueueService) EnqueueWithID(ctx context.Context, trackID uuid.UUID, priority int) (uuid.UUID, error) {
// Vérifier si un job existe déjà pour ce track avec statut pending ou processing
var existingJob models.HLSTranscodeQueue
err := s.db.WithContext(ctx).
Where("track_id = ? AND status IN ?", trackID, []models.QueueStatus{models.QueueStatusPending, models.QueueStatusProcessing}).
First(&existingJob).Error
if err == nil {
// Un job existe déjà, retourner son ID
s.logger.Info("Job already exists for track", zap.String("track_id", trackID.String()), zap.String("job_id", existingJob.ID.String()))
return existingJob.ID, nil
}
if err != gorm.ErrRecordNotFound {
return uuid.Nil, err
}
job := &models.HLSTranscodeQueue{
TrackID: trackID,
Priority: priority,
Status: models.QueueStatusPending,
RetryCount: 0,
MaxRetries: 3,
}
if err := s.db.WithContext(ctx).Create(job).Error; err != nil {
return uuid.Nil, err
}
s.logger.Info("Job enqueued", zap.String("job_id", job.ID.String()), zap.String("track_id", trackID.String()), zap.Int("priority", priority))
return job.ID, nil
}
// Dequeue récupère le prochain job à traiter (par priorité puis date de création)
func (s *HLSQueueService) Dequeue(ctx context.Context) (*models.HLSTranscodeQueue, error) {
var job models.HLSTranscodeQueue
// Utiliser une transaction pour éviter les race conditions
err := s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
// Récupérer le job avec la plus haute priorité et la plus ancienne date de création
err := tx.Where("status = ?", models.QueueStatusPending).
Order("priority DESC, created_at ASC").
First(&job).Error
if err != nil {
return err
}
// Mettre à jour le statut et la date de début
now := time.Now()
job.Status = models.QueueStatusProcessing
job.StartedAt = &now
return tx.Save(&job).Error
})
if err != nil {
if err == gorm.ErrRecordNotFound {
return nil, nil // Pas de job disponible
}
return nil, err
}
return &job, nil
}
// MarkCompleted marque un job comme terminé
func (s *HLSQueueService) MarkCompleted(ctx context.Context, jobID uuid.UUID) error {
now := time.Now()
return s.db.WithContext(ctx).Model(&models.HLSTranscodeQueue{}).
Where("id = ?", jobID).
Updates(map[string]interface{}{
"status": models.QueueStatusCompleted,
"completed_at": &now,
}).Error
}
// MarkFailed marque un job comme échoué
func (s *HLSQueueService) MarkFailed(ctx context.Context, jobID uuid.UUID, errorMessage string) error {
return s.db.WithContext(ctx).Model(&models.HLSTranscodeQueue{}).
Where("id = ?", jobID).
Updates(map[string]interface{}{
"status": models.QueueStatusFailed,
"error_message": errorMessage,
"completed_at": time.Now(),
}).Error
}
// RetryJob réessaie un job qui a échoué
func (s *HLSQueueService) RetryJob(ctx context.Context, jobID uuid.UUID) error {
var job models.HLSTranscodeQueue
if err := s.db.WithContext(ctx).First(&job, jobID).Error; err != nil {
return err
}
// Vérifier si on peut encore réessayer
if job.RetryCount >= job.MaxRetries {
return s.MarkFailed(ctx, jobID, "Max retries exceeded")
}
// Réinitialiser le job pour un nouvel essai
return s.db.WithContext(ctx).Model(&job).
Updates(map[string]interface{}{
"status": models.QueueStatusPending,
"retry_count": job.RetryCount + 1,
"error_message": nil,
"started_at": nil,
}).Error
}
// GetJob récupère un job par son ID
func (s *HLSQueueService) GetJob(ctx context.Context, jobID uuid.UUID) (*models.HLSTranscodeQueue, error) {
var job models.HLSTranscodeQueue
err := s.db.WithContext(ctx).Preload("Track").First(&job, jobID).Error
if err != nil {
return nil, err
}
return &job, nil
}
// GetPendingJobsCount retourne le nombre de jobs en attente
func (s *HLSQueueService) GetPendingJobsCount(ctx context.Context) (int64, error) {
var count int64
err := s.db.WithContext(ctx).Model(&models.HLSTranscodeQueue{}).
Where("status = ?", models.QueueStatusPending).
Count(&count).Error
return count, err
}