package services

import (
	"context"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"time"

	"github.com/google/uuid"
	"go.uber.org/zap"
	"gorm.io/gorm"

	"veza-backend-api/internal/models"
)

// HLSStreamingService provides enhanced HLS streaming capabilities: on-disk
// stream validation, URL generation, cleanup, and database-backed lookups.
// BE-SVC-012: Implement HLS streaming service
type HLSStreamingService struct {
	db        *gorm.DB    // database handle used for every HLSStream query
	outputDir string      // root directory holding one "track_<id>" directory per track
	logger    *zap.Logger // never nil when built through NewHLSStreamingService
}

// NewHLSStreamingService creates a new enhanced HLS streaming service.
// A nil logger is replaced with a no-op logger so methods can log
// unconditionally.
func NewHLSStreamingService(db *gorm.DB, outputDir string, logger *zap.Logger) *HLSStreamingService {
	if logger == nil {
		logger = zap.NewNop()
	}
	return &HLSStreamingService{
		db:        db,
		outputDir: outputDir,
		logger:    logger,
	}
}

// StreamHealth represents the health status of an HLS stream as computed by
// ValidateStream.
type StreamHealth struct {
	TrackID         uuid.UUID              `json:"track_id"`
	Status          models.HLSStreamStatus `json:"status"`
	IsHealthy       bool                   `json:"is_healthy"`
	PlaylistExists  bool                   `json:"playlist_exists"`
	SegmentsValid   bool                   `json:"segments_valid"`
	MissingSegments []string               `json:"missing_segments,omitempty"`
	LastChecked     time.Time              `json:"last_checked"`
}

// ValidateStream validates that an HLS stream is complete and accessible.
//
// It loads the stream record for trackID and, when the record is in the ready
// state, checks the files under <outputDir>/track_<trackID>: the master
// playlist must exist, and for every bitrate of the stream the quality
// playlist must be readable and each ".ts" segment it lists must be
// stat-able. Problems found on disk are reported through the returned
// StreamHealth (IsHealthy=false, MissingSegments), not as an error.
//
// An error is returned only when the stream record is not found or the
// database query fails.
func (s *HLSStreamingService) ValidateStream(
	ctx context.Context,
	trackID uuid.UUID,
) (*StreamHealth, error) {
	var stream models.HLSStream
	if err := s.db.WithContext(ctx).
		Where("track_id = ?", trackID).
		First(&stream).Error; err != nil {
		// errors.Is also matches the sentinel when it has been wrapped.
		if errors.Is(err, gorm.ErrRecordNotFound) {
			return nil, fmt.Errorf("HLS stream not found for track %s", trackID)
		}
		return nil, fmt.Errorf("failed to query HLS stream: %w", err)
	}

	// IsHealthy, PlaylistExists and SegmentsValid start out false and are only
	// set once the corresponding check has passed.
	health := &StreamHealth{
		TrackID:     trackID,
		Status:      stream.Status,
		LastChecked: time.Now(),
	}

	// A stream that is not ready is reported unhealthy without touching disk.
	if stream.Status != models.HLSStatusReady {
		return health, nil
	}

	// Check if master playlist exists.
	trackDir := filepath.Join(s.outputDir, fmt.Sprintf("track_%s", trackID))
	masterPlaylistPath := filepath.Join(trackDir, "master.m3u8")
	if _, err := os.Stat(masterPlaylistPath); err != nil {
		return health, nil
	}
	health.PlaylistExists = true

	// A ready stream without any bitrate has nothing to serve.
	bitrates := stream.Bitrates
	if len(bitrates) == 0 {
		return health, nil
	}

	// Validate segments for each bitrate.
	allSegmentsValid := true
	var missingSegments []string
	for _, bitrate := range bitrates {
		qualityDir := filepath.Join(trackDir, fmt.Sprintf("%dk", bitrate))
		playlistPath := filepath.Join(qualityDir, "playlist.m3u8")

		// Read playlist to get segment list; an unreadable playlist is
		// reported as a missing entry and its segments cannot be checked.
		playlistContent, err := os.ReadFile(playlistPath)
		if err != nil {
			allSegmentsValid = false
			missingSegments = append(missingSegments, fmt.Sprintf("%dk/playlist.m3u8", bitrate))
			continue
		}

		for _, segment := range s.parsePlaylistSegments(string(playlistContent)) {
			segmentPath := filepath.Join(qualityDir, segment)
			// Any stat failure (not only a missing file) means the segment
			// cannot be confirmed accessible, so it is reported as invalid.
			if _, err := os.Stat(segmentPath); err != nil {
				allSegmentsValid = false
				missingSegments = append(missingSegments, fmt.Sprintf("%dk/%s", bitrate, segment))
			}
		}
	}

	health.SegmentsValid = allSegmentsValid
	health.MissingSegments = missingSegments
	health.IsHealthy = health.PlaylistExists && allSegmentsValid
	return health, nil
}

// parsePlaylistSegments extracts segment filenames from an HLS playlist.
//
// Blank lines and lines starting with "#" are skipped. Of the remaining
// lines, only those ending in ".ts" are kept, reduced to their base name so
// the result can be joined onto the quality directory.
func (s *HLSStreamingService) parsePlaylistSegments(playlistContent string) []string {
	var segments []string
	for _, line := range strings.Split(playlistContent, "\n") {
		// TrimSpace also removes the "\r" left over from CRLF line endings.
		line = strings.TrimSpace(line)

		// Skip comments/tags and empty lines.
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}

		// Segment files are non-comment lines ending in ".ts".
		if strings.HasSuffix(line, ".ts") {
			segments = append(segments, filepath.Base(line))
		}
	}
	return segments
}

// GetStreamURLs returns URLs for accessing an HLS stream.
//
// Only a stream in the ready state is considered. The returned map holds the
// master playlist under "master_playlist" and one "quality_<bitrate>" entry
// per bitrate of the stream. Trailing slashes on baseURL are ignored so the
// generated URLs never contain "//" before "tracks".
//
// An error is returned when no ready stream exists for trackID or the
// database query fails.
func (s *HLSStreamingService) GetStreamURLs(
	ctx context.Context,
	trackID uuid.UUID,
	baseURL string,
) (map[string]string, error) {
	var stream models.HLSStream
	if err := s.db.WithContext(ctx).
		Where("track_id = ? AND status = ?", trackID, models.HLSStatusReady).
		First(&stream).Error; err != nil {
		if errors.Is(err, gorm.ErrRecordNotFound) {
			return nil, fmt.Errorf("HLS stream not found for track %s", trackID)
		}
		return nil, fmt.Errorf("failed to query HLS stream: %w", err)
	}

	// Normalize the base so "https://host/" and "https://host" give the same URLs.
	base := strings.TrimRight(baseURL, "/")

	urls := make(map[string]string, len(stream.Bitrates)+1)

	// Master playlist URL.
	urls["master_playlist"] = fmt.Sprintf("%s/tracks/%s/hls/master.m3u8", base, trackID)

	// Quality playlist URLs.
	for _, bitrate := range stream.Bitrates {
		key := fmt.Sprintf("quality_%d", bitrate)
		urls[key] = fmt.Sprintf("%s/tracks/%s/hls/%dk/playlist.m3u8", base, trackID, bitrate)
	}

	return urls, nil
}

// CleanupStream removes HLS stream files and database record.
//
// The track directory is removed first. A failure to remove it is logged as
// a warning and does not abort the cleanup; a failure to delete the database
// record is returned as an error.
func (s *HLSStreamingService) CleanupStream(
	ctx context.Context,
	trackID uuid.UUID,
) error {
	var stream models.HLSStream
	if err := s.db.WithContext(ctx).
		Where("track_id = ?", trackID).
		First(&stream).Error; err != nil {
		if errors.Is(err, gorm.ErrRecordNotFound) {
			return fmt.Errorf("HLS stream not found for track %s", trackID)
		}
		return fmt.Errorf("failed to query HLS stream: %w", err)
	}

	// Delete files (best effort).
	trackDir := filepath.Join(s.outputDir, fmt.Sprintf("track_%s", trackID))
	if err := os.RemoveAll(trackDir); err != nil {
		s.logger.Warn("Failed to remove HLS stream directory",
			zap.String("track_id", trackID.String()),
			zap.String("directory", trackDir),
			zap.Error(err),
		)
	}

	// Delete database record.
	if err := s.db.WithContext(ctx).Delete(&stream).Error; err != nil {
		return fmt.Errorf("failed to delete HLS stream record: %w", err)
	}

	s.logger.Info("HLS stream cleaned up",
		zap.String("track_id", trackID.String()),
		zap.String("stream_id", stream.ID.String()),
	)

	return nil
}

// GetStreamStatistics returns statistics about HLS streams.
//
// The returned map contains "total_streams", "ready_streams",
// "processing_streams" and "failed_streams" as int64 counts, plus
// "ready_percentage" as a float64 (0.0 when there are no streams). Each count
// is a separate query, so the figures are not taken from a single snapshot.
func (s *HLSStreamingService) GetStreamStatistics(
	ctx context.Context,
) (map[string]interface{}, error) {
	var (
		totalStreams      int64
		readyStreams      int64
		processingStreams int64
		failedStreams     int64
	)

	// Count total streams.
	if err := s.db.WithContext(ctx).
		Model(&models.HLSStream{}).
		Count(&totalStreams).Error; err != nil {
		return nil, fmt.Errorf("failed to count total streams: %w", err)
	}

	// Count by status.
	if err := s.db.WithContext(ctx).
		Model(&models.HLSStream{}).
		Where("status = ?", models.HLSStatusReady).
		Count(&readyStreams).Error; err != nil {
		return nil, fmt.Errorf("failed to count ready streams: %w", err)
	}

	if err := s.db.WithContext(ctx).
		Model(&models.HLSStream{}).
		Where("status = ?", models.HLSStatusProcessing).
		Count(&processingStreams).Error; err != nil {
		return nil, fmt.Errorf("failed to count processing streams: %w", err)
	}

	if err := s.db.WithContext(ctx).
		Model(&models.HLSStream{}).
		Where("status = ?", models.HLSStatusFailed).
		Count(&failedStreams).Error; err != nil {
		return nil, fmt.Errorf("failed to count failed streams: %w", err)
	}

	stats := map[string]interface{}{
		"total_streams":      totalStreams,
		"ready_streams":      readyStreams,
		"processing_streams": processingStreams,
		"failed_streams":     failedStreams,
		"ready_percentage":   0.0,
	}

	// Guard against division by zero when the table is empty.
	if totalStreams > 0 {
		stats["ready_percentage"] = float64(readyStreams) / float64(totalStreams) * 100
	}

	return stats, nil
}

// ListStreams lists HLS streams with optional filtering.
//
// When status is non-nil only streams with that status are returned. Results
// are ordered by created_at descending and paginated with limit and offset.
// The second return value is the number of streams matching the filter,
// counted before pagination is applied.
func (s *HLSStreamingService) ListStreams(
	ctx context.Context,
	status *models.HLSStreamStatus,
	limit, offset int,
) ([]models.HLSStream, int64, error) {
	query := s.db.WithContext(ctx).Model(&models.HLSStream{})

	// Filter by status if provided.
	if status != nil {
		query = query.Where("status = ?", *status)
	}

	// Get total count.
	var total int64
	if err := query.Count(&total).Error; err != nil {
		return nil, 0, fmt.Errorf("failed to count streams: %w", err)
	}

	// Get streams with pagination.
	var streams []models.HLSStream
	if err := query.
		Order("created_at DESC").
		Limit(limit).
		Offset(offset).
		Find(&streams).Error; err != nil {
		return nil, 0, fmt.Errorf("failed to list streams: %w", err)
	}

	return streams, total, nil
}

// UpdateStreamStatus updates the status of an HLS stream.
//
// An error is returned when the update fails or when no row matched trackID.
func (s *HLSStreamingService) UpdateStreamStatus(
	ctx context.Context,
	trackID uuid.UUID,
	status models.HLSStreamStatus,
) error {
	result := s.db.WithContext(ctx).
		Model(&models.HLSStream{}).
		Where("track_id = ?", trackID).
		Update("status", status)

	if result.Error != nil {
		return fmt.Errorf("failed to update stream status: %w", result.Error)
	}

	// Zero affected rows is reported as "not found".
	if result.RowsAffected == 0 {
		return fmt.Errorf("HLS stream not found for track %s", trackID)
	}

	s.logger.Info("HLS stream status updated",
		zap.String("track_id", trackID.String()),
		zap.String("status", string(status)),
	)

	return nil
}

// GetStreamByTrackID retrieves an HLS stream by track ID.
//
// An error is returned when no stream exists for trackID or the database
// query fails.
func (s *HLSStreamingService) GetStreamByTrackID(
	ctx context.Context,
	trackID uuid.UUID,
) (*models.HLSStream, error) {
	var stream models.HLSStream
	if err := s.db.WithContext(ctx).
		Where("track_id = ?", trackID).
		First(&stream).Error; err != nil {
		if errors.Is(err, gorm.ErrRecordNotFound) {
			return nil, fmt.Errorf("HLS stream not found for track %s", trackID)
		}
		return nil, fmt.Errorf("failed to query HLS stream: %w", err)
	}

	return &stream, nil
}

// CheckStreamExists checks if an HLS stream exists for a track.
//
// It reports true when at least one stream record matches trackID, whatever
// its status.
func (s *HLSStreamingService) CheckStreamExists(
	ctx context.Context,
	trackID uuid.UUID,
) (bool, error) {
	var count int64
	if err := s.db.WithContext(ctx).
		Model(&models.HLSStream{}).
		Where("track_id = ?", trackID).
		Count(&count).Error; err != nil {
		return false, fmt.Errorf("failed to check stream existence: %w", err)
	}

	return count > 0, nil
}