veza/veza-backend-api/internal/services/hls_streaming_service_enhanced.go

358 lines
9.8 KiB
Go

package services
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"time"
"github.com/google/uuid"
"go.uber.org/zap"
"gorm.io/gorm"
"veza-backend-api/internal/models"
)
// HLSStreamingService provides enhanced HLS streaming capabilities
// BE-SVC-012: Implement HLS streaming service
type HLSStreamingService struct {
db *gorm.DB
outputDir string
logger *zap.Logger
}
// NewHLSStreamingService creates a new enhanced HLS streaming service
func NewHLSStreamingService(db *gorm.DB, outputDir string, logger *zap.Logger) *HLSStreamingService {
if logger == nil {
logger = zap.NewNop()
}
return &HLSStreamingService{
db: db,
outputDir: outputDir,
logger: logger,
}
}
// StreamHealth represents the health status of an HLS stream
type StreamHealth struct {
TrackID uuid.UUID `json:"track_id"`
Status models.HLSStreamStatus `json:"status"`
IsHealthy bool `json:"is_healthy"`
PlaylistExists bool `json:"playlist_exists"`
SegmentsValid bool `json:"segments_valid"`
MissingSegments []string `json:"missing_segments,omitempty"`
LastChecked time.Time `json:"last_checked"`
}
// ValidateStream validates that an HLS stream is complete and accessible
func (s *HLSStreamingService) ValidateStream(
ctx context.Context,
trackID uuid.UUID,
) (*StreamHealth, error) {
var stream models.HLSStream
if err := s.db.WithContext(ctx).
Where("track_id = ?", trackID).
First(&stream).Error; err != nil {
if err == gorm.ErrRecordNotFound {
return nil, fmt.Errorf("HLS stream not found for track %s", trackID)
}
return nil, fmt.Errorf("failed to query HLS stream: %w", err)
}
health := &StreamHealth{
TrackID: trackID,
Status: stream.Status,
LastChecked: time.Now(),
}
// Check if stream is ready
if stream.Status != models.HLSStatusReady {
health.IsHealthy = false
return health, nil
}
// Check if master playlist exists
trackDir := filepath.Join(s.outputDir, fmt.Sprintf("track_%s", trackID))
masterPlaylistPath := filepath.Join(trackDir, "master.m3u8")
if _, err := os.Stat(masterPlaylistPath); err == nil {
health.PlaylistExists = true
} else {
health.IsHealthy = false
return health, nil
}
// Validate segments for each bitrate
bitrates := stream.Bitrates
if len(bitrates) == 0 {
health.IsHealthy = false
return health, nil
}
allSegmentsValid := true
var missingSegments []string
for _, bitrate := range bitrates {
qualityDir := filepath.Join(trackDir, fmt.Sprintf("%dk", bitrate))
playlistPath := filepath.Join(qualityDir, "playlist.m3u8")
// Read playlist to get segment list
playlistContent, err := os.ReadFile(playlistPath)
if err != nil {
allSegmentsValid = false
missingSegments = append(missingSegments, fmt.Sprintf("%dk/playlist.m3u8", bitrate))
continue
}
// Parse playlist to find segment files
segments := s.parsePlaylistSegments(string(playlistContent))
for _, segment := range segments {
segmentPath := filepath.Join(qualityDir, segment)
if _, err := os.Stat(segmentPath); os.IsNotExist(err) {
allSegmentsValid = false
missingSegments = append(missingSegments, fmt.Sprintf("%dk/%s", bitrate, segment))
}
}
}
health.SegmentsValid = allSegmentsValid
health.MissingSegments = missingSegments
health.IsHealthy = health.PlaylistExists && allSegmentsValid
return health, nil
}
// parsePlaylistSegments extracts segment filenames from an HLS playlist
func (s *HLSStreamingService) parsePlaylistSegments(playlistContent string) []string {
var segments []string
lines := strings.Split(playlistContent, "\n")
for _, line := range lines {
line = strings.TrimSpace(line)
// Skip comments and empty lines
if line == "" || strings.HasPrefix(line, "#") {
continue
}
// Segment files are non-comment lines
if strings.HasSuffix(line, ".ts") {
segments = append(segments, filepath.Base(line))
}
}
return segments
}
// GetStreamURLs returns URLs for accessing an HLS stream
func (s *HLSStreamingService) GetStreamURLs(
ctx context.Context,
trackID uuid.UUID,
baseURL string,
) (map[string]string, error) {
var stream models.HLSStream
if err := s.db.WithContext(ctx).
Where("track_id = ? AND status = ?", trackID, models.HLSStatusReady).
First(&stream).Error; err != nil {
if err == gorm.ErrRecordNotFound {
return nil, fmt.Errorf("HLS stream not found for track %s", trackID)
}
return nil, fmt.Errorf("failed to query HLS stream: %w", err)
}
urls := make(map[string]string)
// Master playlist URL
urls["master_playlist"] = fmt.Sprintf("%s/tracks/%s/hls/master.m3u8", baseURL, trackID)
// Quality playlist URLs
for _, bitrate := range stream.Bitrates {
key := fmt.Sprintf("quality_%d", bitrate)
urls[key] = fmt.Sprintf("%s/tracks/%s/hls/%dk/playlist.m3u8", baseURL, trackID, bitrate)
}
return urls, nil
}
// CleanupStream removes HLS stream files and database record
func (s *HLSStreamingService) CleanupStream(
ctx context.Context,
trackID uuid.UUID,
) error {
var stream models.HLSStream
if err := s.db.WithContext(ctx).
Where("track_id = ?", trackID).
First(&stream).Error; err != nil {
if err == gorm.ErrRecordNotFound {
return fmt.Errorf("HLS stream not found for track %s", trackID)
}
return fmt.Errorf("failed to query HLS stream: %w", err)
}
// Delete files
trackDir := filepath.Join(s.outputDir, fmt.Sprintf("track_%s", trackID))
if err := os.RemoveAll(trackDir); err != nil {
s.logger.Warn("Failed to remove HLS stream directory",
zap.String("track_id", trackID.String()),
zap.String("directory", trackDir),
zap.Error(err),
)
}
// Delete database record
if err := s.db.WithContext(ctx).Delete(&stream).Error; err != nil {
return fmt.Errorf("failed to delete HLS stream record: %w", err)
}
s.logger.Info("HLS stream cleaned up",
zap.String("track_id", trackID.String()),
zap.String("stream_id", stream.ID.String()),
)
return nil
}
// GetStreamStatistics returns statistics about HLS streams
func (s *HLSStreamingService) GetStreamStatistics(
ctx context.Context,
) (map[string]interface{}, error) {
var totalStreams int64
var readyStreams int64
var processingStreams int64
var failedStreams int64
// Count total streams
if err := s.db.WithContext(ctx).
Model(&models.HLSStream{}).
Count(&totalStreams).Error; err != nil {
return nil, fmt.Errorf("failed to count total streams: %w", err)
}
// Count by status
if err := s.db.WithContext(ctx).
Model(&models.HLSStream{}).
Where("status = ?", models.HLSStatusReady).
Count(&readyStreams).Error; err != nil {
return nil, fmt.Errorf("failed to count ready streams: %w", err)
}
if err := s.db.WithContext(ctx).
Model(&models.HLSStream{}).
Where("status = ?", models.HLSStatusProcessing).
Count(&processingStreams).Error; err != nil {
return nil, fmt.Errorf("failed to count processing streams: %w", err)
}
if err := s.db.WithContext(ctx).
Model(&models.HLSStream{}).
Where("status = ?", models.HLSStatusFailed).
Count(&failedStreams).Error; err != nil {
return nil, fmt.Errorf("failed to count failed streams: %w", err)
}
stats := map[string]interface{}{
"total_streams": totalStreams,
"ready_streams": readyStreams,
"processing_streams": processingStreams,
"failed_streams": failedStreams,
"ready_percentage": 0.0,
}
if totalStreams > 0 {
stats["ready_percentage"] = float64(readyStreams) / float64(totalStreams) * 100
}
return stats, nil
}
// ListStreams lists HLS streams with optional filtering
func (s *HLSStreamingService) ListStreams(
ctx context.Context,
status *models.HLSStreamStatus,
limit, offset int,
) ([]models.HLSStream, int64, error) {
query := s.db.WithContext(ctx).Model(&models.HLSStream{})
// Filter by status if provided
if status != nil {
query = query.Where("status = ?", *status)
}
// Get total count
var total int64
if err := query.Count(&total).Error; err != nil {
return nil, 0, fmt.Errorf("failed to count streams: %w", err)
}
// Get streams with pagination
var streams []models.HLSStream
if err := query.
Order("created_at DESC").
Limit(limit).
Offset(offset).
Find(&streams).Error; err != nil {
return nil, 0, fmt.Errorf("failed to list streams: %w", err)
}
return streams, total, nil
}
// UpdateStreamStatus updates the status of an HLS stream
func (s *HLSStreamingService) UpdateStreamStatus(
ctx context.Context,
trackID uuid.UUID,
status models.HLSStreamStatus,
) error {
result := s.db.WithContext(ctx).
Model(&models.HLSStream{}).
Where("track_id = ?", trackID).
Update("status", status)
if result.Error != nil {
return fmt.Errorf("failed to update stream status: %w", result.Error)
}
if result.RowsAffected == 0 {
return fmt.Errorf("HLS stream not found for track %s", trackID)
}
s.logger.Info("HLS stream status updated",
zap.String("track_id", trackID.String()),
zap.String("status", string(status)),
)
return nil
}
// GetStreamByTrackID retrieves an HLS stream by track ID
func (s *HLSStreamingService) GetStreamByTrackID(
ctx context.Context,
trackID uuid.UUID,
) (*models.HLSStream, error) {
var stream models.HLSStream
if err := s.db.WithContext(ctx).
Where("track_id = ?", trackID).
First(&stream).Error; err != nil {
if err == gorm.ErrRecordNotFound {
return nil, fmt.Errorf("HLS stream not found for track %s", trackID)
}
return nil, fmt.Errorf("failed to query HLS stream: %w", err)
}
return &stream, nil
}
// CheckStreamExists checks if an HLS stream exists for a track
func (s *HLSStreamingService) CheckStreamExists(
ctx context.Context,
trackID uuid.UUID,
) (bool, error) {
var count int64
if err := s.db.WithContext(ctx).
Model(&models.HLSStream{}).
Where("track_id = ?", trackID).
Count(&count).Error; err != nil {
return false, fmt.Errorf("failed to check stream existence: %w", err)
}
return count > 0, nil
}