package workers

import (
	"context"
	"errors"
	"sync"
	"time"

	"veza-backend-api/internal/core/education"
	"veza-backend-api/internal/services"

	"go.uber.org/zap"
	"gorm.io/gorm"
)

const (
	// videoTranscodeDefaultPollInterval is used when the caller supplies a
	// non-positive poll interval.
	videoTranscodeDefaultPollInterval = 10 * time.Second

	// videoTranscodeStatusTimeout bounds the final status write (failed or
	// complete), which runs on a context detached from the worker's context.
	videoTranscodeStatusTimeout = 10 * time.Second
)

// VideoTranscodeWorker processes video transcoding jobs for course lessons.
//
// It polls the database at a fixed interval for the oldest lesson whose
// transcoding status is pending and that has a video file path, transcodes
// it, and records the outcome through the education service.
type VideoTranscodeWorker struct {
	db               *gorm.DB
	transcodeService *services.VideoTranscodeService
	educationService *education.Service
	logger           *zap.Logger
	pollInterval     time.Duration
	stopChan         chan struct{}
	stopOnce         sync.Once // guards close(stopChan) so Stop is safe to call repeatedly
}

// NewVideoTranscodeWorker creates a new video transcoding worker.
//
// A nil logger is replaced with a no-op logger. A non-positive pollInterval
// is replaced with a 10 second default; time.NewTicker panics on
// non-positive durations, so negative values must not reach the poll loop.
func NewVideoTranscodeWorker(
	db *gorm.DB,
	transcodeService *services.VideoTranscodeService,
	educationService *education.Service,
	logger *zap.Logger,
	pollInterval time.Duration,
) *VideoTranscodeWorker {
	if logger == nil {
		logger = zap.NewNop()
	}
	if pollInterval <= 0 {
		pollInterval = videoTranscodeDefaultPollInterval
	}
	return &VideoTranscodeWorker{
		db:               db,
		transcodeService: transcodeService,
		educationService: educationService,
		logger:           logger,
		pollInterval:     pollInterval,
		stopChan:         make(chan struct{}),
	}
}

// Start begins polling for pending video transcoding jobs in a background
// goroutine. The goroutine exits when ctx is cancelled or Stop is called.
func (w *VideoTranscodeWorker) Start(ctx context.Context) {
	w.logger.Info("Starting video transcode worker", zap.Duration("poll_interval", w.pollInterval))
	go w.processLoop(ctx)
}

// Stop signals the polling loop to exit. It is safe to call more than once;
// only the first call logs and closes the stop channel. Stop does not wait
// for the loop goroutine to finish.
func (w *VideoTranscodeWorker) Stop() {
	w.stopOnce.Do(func() {
		w.logger.Info("Stopping video transcode worker")
		close(w.stopChan)
	})
}

// processLoop runs processNext on every tick until ctx is cancelled or the
// stop channel is closed.
func (w *VideoTranscodeWorker) processLoop(ctx context.Context) {
	ticker := time.NewTicker(w.pollInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-w.stopChan:
			return
		case <-ticker.C:
			w.processNext(ctx)
		}
	}
}

// processNext picks the oldest pending lesson that has a video file, marks it
// as processing, transcodes it, and records the result (complete or failed).
//
// NOTE(review): the select-then-update claim below is not atomic. If several
// worker instances poll the same database they could pick the same lesson —
// confirm that only one instance runs, or make the claim conditional.
func (w *VideoTranscodeWorker) processNext(ctx context.Context) {
	// Find a lesson with pending transcoding and a video file.
	var lesson education.Lesson
	err := w.db.WithContext(ctx).
		Where("transcoding_status = ? AND video_file_path != ''", string(education.TranscodingPending)).
		Order("created_at ASC").
		First(&lesson).Error
	if err != nil {
		// "No rows" is the normal idle case. Anything else is a real query
		// failure and must be visible, unless we are simply shutting down.
		if !errors.Is(err, gorm.ErrRecordNotFound) && ctx.Err() == nil {
			w.logger.Error("Failed to query pending video transcoding jobs", zap.Error(err))
		}
		return
	}

	lessonID := lesson.ID.String()

	w.logger.Info("Processing video transcoding",
		zap.String("lesson_id", lessonID),
		zap.String("video_path", lesson.VideoFilePath),
	)

	// Mark as processing.
	if err := w.educationService.UpdateLessonTranscoding(ctx, lesson.ID, education.TranscodingProcessing, "", 0); err != nil {
		w.logger.Error("Failed to mark lesson as processing",
			zap.String("lesson_id", lessonID),
			zap.Error(err),
		)
		return
	}

	// Transcode.
	result, err := w.transcodeService.TranscodeVideo(ctx, lesson.ID, lesson.VideoFilePath)

	// The final status write uses a context detached from ctx. If ctx was
	// cancelled while transcoding, a write on ctx would fail as well and the
	// lesson would stay in "processing", which the pending-only query above
	// never selects again.
	statusCtx, cancel := context.WithTimeout(context.Background(), videoTranscodeStatusTimeout)
	defer cancel()

	if err != nil {
		w.logger.Error("Video transcoding failed",
			zap.String("lesson_id", lessonID),
			zap.Error(err),
		)
		if updateErr := w.educationService.UpdateLessonTranscoding(statusCtx, lesson.ID, education.TranscodingFailed, "", 0); updateErr != nil {
			w.logger.Error("Failed to mark lesson as failed",
				zap.String("lesson_id", lessonID),
				zap.Error(updateErr),
			)
		}
		return
	}

	// Mark as complete.
	if err := w.educationService.UpdateLessonTranscoding(statusCtx, lesson.ID, education.TranscodingComplete, result.MasterPlaylistURL, result.DurationSeconds); err != nil {
		w.logger.Error("Failed to mark lesson as complete",
			zap.String("lesson_id", lessonID),
			zap.Error(err),
		)
		return
	}

	w.logger.Info("Video transcoding completed",
		zap.String("lesson_id", lessonID),
		zap.String("hls_url", result.MasterPlaylistURL),
		zap.Int("duration_seconds", result.DurationSeconds),
	)
}