veza/veza-backend-api/internal/workers/video_transcode_worker.go

124 lines
3.4 KiB
Go
Raw Normal View History

package workers
import (
"context"
"time"
"veza-backend-api/internal/core/education"
"veza-backend-api/internal/services"
"go.uber.org/zap"
"gorm.io/gorm"
)
// VideoTranscodeWorker processes video transcoding jobs for course lessons
type VideoTranscodeWorker struct {
db *gorm.DB
transcodeService *services.VideoTranscodeService
educationService *education.Service
logger *zap.Logger
pollInterval time.Duration
stopChan chan struct{}
}
// NewVideoTranscodeWorker creates a new video transcoding worker
func NewVideoTranscodeWorker(
db *gorm.DB,
transcodeService *services.VideoTranscodeService,
educationService *education.Service,
logger *zap.Logger,
pollInterval time.Duration,
) *VideoTranscodeWorker {
if logger == nil {
logger = zap.NewNop()
}
if pollInterval == 0 {
pollInterval = 10 * time.Second
}
return &VideoTranscodeWorker{
db: db,
transcodeService: transcodeService,
educationService: educationService,
logger: logger,
pollInterval: pollInterval,
stopChan: make(chan struct{}),
}
}
// Start begins polling for pending video transcoding jobs
func (w *VideoTranscodeWorker) Start(ctx context.Context) {
w.logger.Info("Starting video transcode worker", zap.Duration("poll_interval", w.pollInterval))
go w.processLoop(ctx)
}
// Stop stops the worker
func (w *VideoTranscodeWorker) Stop() {
w.logger.Info("Stopping video transcode worker")
close(w.stopChan)
}
func (w *VideoTranscodeWorker) processLoop(ctx context.Context) {
ticker := time.NewTicker(w.pollInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-w.stopChan:
return
case <-ticker.C:
w.processNext(ctx)
}
}
}
func (w *VideoTranscodeWorker) processNext(ctx context.Context) {
// Find a lesson with pending transcoding and a video file
var lesson education.Lesson
err := w.db.WithContext(ctx).
Where("transcoding_status = ? AND video_file_path != ''", string(education.TranscodingPending)).
Order("created_at ASC").
First(&lesson).Error
if err != nil {
return // no pending jobs or error
}
w.logger.Info("Processing video transcoding",
zap.String("lesson_id", lesson.ID.String()),
zap.String("video_path", lesson.VideoFilePath),
)
// Mark as processing
if err := w.educationService.UpdateLessonTranscoding(ctx, lesson.ID, education.TranscodingProcessing, "", 0); err != nil {
w.logger.Error("Failed to mark lesson as processing", zap.Error(err))
return
}
// Transcode
result, err := w.transcodeService.TranscodeVideo(ctx, lesson.ID, lesson.VideoFilePath)
if err != nil {
w.logger.Error("Video transcoding failed",
zap.String("lesson_id", lesson.ID.String()),
zap.Error(err),
)
if updateErr := w.educationService.UpdateLessonTranscoding(ctx, lesson.ID, education.TranscodingFailed, "", 0); updateErr != nil {
w.logger.Error("Failed to mark lesson as failed", zap.Error(updateErr))
}
return
}
// Mark as complete
if err := w.educationService.UpdateLessonTranscoding(ctx, lesson.ID, education.TranscodingComplete, result.MasterPlaylistURL, result.DurationSeconds); err != nil {
w.logger.Error("Failed to mark lesson as complete", zap.Error(err))
return
}
w.logger.Info("Video transcoding completed",
zap.String("lesson_id", lesson.ID.String()),
zap.String("hls_url", result.MasterPlaylistURL),
zap.Int("duration_seconds", result.DurationSeconds),
)
}