package services import ( "context" "time" "github.com/google/uuid" "veza-backend-api/internal/models" "go.uber.org/zap" "gorm.io/gorm" ) // HLSQueueService gère la queue de transcodage HLS type HLSQueueService struct { db *gorm.DB logger *zap.Logger } // NewHLSQueueService crée un nouveau service de queue HLS func NewHLSQueueService(db *gorm.DB, logger *zap.Logger) *HLSQueueService { if logger == nil { logger = zap.NewNop() } return &HLSQueueService{ db: db, logger: logger, } } // Enqueue ajoute un job de transcodage à la queue func (s *HLSQueueService) Enqueue(ctx context.Context, trackID uuid.UUID, priority int) error { _, err := s.EnqueueWithID(ctx, trackID, priority) return err } // EnqueueWithID ajoute un job de transcodage à la queue et retourne le job ID // T0343: Retourne le job ID pour l'endpoint de déclenchement func (s *HLSQueueService) EnqueueWithID(ctx context.Context, trackID uuid.UUID, priority int) (uuid.UUID, error) { // Vérifier si un job existe déjà pour ce track avec statut pending ou processing var existingJob models.HLSTranscodeQueue err := s.db.WithContext(ctx). Where("track_id = ? AND status IN ?", trackID, []models.QueueStatus{models.QueueStatusPending, models.QueueStatusProcessing}). First(&existingJob).Error if err == nil { // Un job existe déjà, retourner son ID s.logger.Info("Job already exists for track", zap.String("track_id", trackID.String()), zap.String("job_id", existingJob.ID.String())) return existingJob.ID, nil } if err != gorm.ErrRecordNotFound { return uuid.Nil, err } job := &models.HLSTranscodeQueue{ TrackID: trackID, Priority: priority, Status: models.QueueStatusPending, RetryCount: 0, MaxRetries: 3, } if err := s.db.WithContext(ctx).Create(job).Error; err != nil { return uuid.Nil, err } s.logger.Info("Job enqueued", zap.String("job_id", job.ID.String()), zap.String("track_id", trackID.String()), zap.Int("priority", priority)) return job.ID, nil } // Dequeue récupère le prochain job à traiter (par priorité puis date de création) func (s *HLSQueueService) Dequeue(ctx context.Context) (*models.HLSTranscodeQueue, error) { var job models.HLSTranscodeQueue // Utiliser une transaction pour éviter les race conditions err := s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { // Récupérer le job avec la plus haute priorité et la plus ancienne date de création err := tx.Where("status = ?", models.QueueStatusPending). Order("priority DESC, created_at ASC"). First(&job).Error if err != nil { return err } // Mettre à jour le statut et la date de début now := time.Now() job.Status = models.QueueStatusProcessing job.StartedAt = &now return tx.Save(&job).Error }) if err != nil { if err == gorm.ErrRecordNotFound { return nil, nil // Pas de job disponible } return nil, err } return &job, nil } // MarkCompleted marque un job comme terminé func (s *HLSQueueService) MarkCompleted(ctx context.Context, jobID uuid.UUID) error { now := time.Now() return s.db.WithContext(ctx).Model(&models.HLSTranscodeQueue{}). Where("id = ?", jobID). Updates(map[string]interface{}{ "status": models.QueueStatusCompleted, "completed_at": &now, }).Error } // MarkFailed marque un job comme échoué func (s *HLSQueueService) MarkFailed(ctx context.Context, jobID uuid.UUID, errorMessage string) error { return s.db.WithContext(ctx).Model(&models.HLSTranscodeQueue{}). Where("id = ?", jobID). Updates(map[string]interface{}{ "status": models.QueueStatusFailed, "error_message": errorMessage, "completed_at": time.Now(), }).Error } // RetryJob réessaie un job qui a échoué func (s *HLSQueueService) RetryJob(ctx context.Context, jobID uuid.UUID) error { var job models.HLSTranscodeQueue if err := s.db.WithContext(ctx).First(&job, jobID).Error; err != nil { return err } // Vérifier si on peut encore réessayer if job.RetryCount >= job.MaxRetries { return s.MarkFailed(ctx, jobID, "Max retries exceeded") } // Réinitialiser le job pour un nouvel essai return s.db.WithContext(ctx).Model(&job). Updates(map[string]interface{}{ "status": models.QueueStatusPending, "retry_count": job.RetryCount + 1, "error_message": nil, "started_at": nil, }).Error } // GetJob récupère un job par son ID func (s *HLSQueueService) GetJob(ctx context.Context, jobID uuid.UUID) (*models.HLSTranscodeQueue, error) { var job models.HLSTranscodeQueue err := s.db.WithContext(ctx).Preload("Track").First(&job, jobID).Error if err != nil { return nil, err } return &job, nil } // GetPendingJobsCount retourne le nombre de jobs en attente func (s *HLSQueueService) GetPendingJobsCount(ctx context.Context) (int64, error) { var count int64 err := s.db.WithContext(ctx).Model(&models.HLSTranscodeQueue{}). Where("status = ?", models.QueueStatusPending). Count(&count).Error return count, err }