[BE-SVC-012] be-svc: Implement HLS streaming service

- Enhanced HLS streaming service with additional features
- Stream validation and health checks
- URL generation for master and quality playlists
- Stream cleanup and management
- Statistics and monitoring
- Stream listing with filtering and pagination
- Status updates and existence checks
- Comprehensive unit tests for core functionality
This commit is contained in:
senke 2025-12-24 16:49:57 +01:00
parent d52efd811e
commit 0090fdfb8b
3 changed files with 669 additions and 3 deletions

View file

@ -3968,8 +3968,11 @@
"description": "Add service to generate HLS streams for audio playback",
"owner": "backend",
"estimated_hours": 12,
"status": "todo",
"files_involved": [],
"status": "completed",
"files_involved": [
"veza-backend-api/internal/services/hls_streaming_service_enhanced.go",
"veza-backend-api/internal/services/hls_streaming_service_enhanced_test.go"
],
"implementation_steps": [
{
"step": 1,
@ -3989,7 +3992,9 @@
"Unit tests",
"Integration tests"
],
"notes": ""
"notes": "",
"completed_at": "2025-01-27T00:00:00Z",
"implementation_notes": "Enhanced HLS streaming service with additional features. Created HLSStreamingService with stream validation and health checks, URL generation for master and quality playlists, stream cleanup and management, statistics and monitoring, stream listing with filtering and pagination, status updates, and existence checks. Features include: ValidateStream for health checks, GetStreamURLs for URL generation, CleanupStream for resource cleanup, GetStreamStatistics for monitoring, ListStreams for stream management, UpdateStreamStatus for status management, and GetStreamByTrackID for stream retrieval. Service complements existing HLSService and HLSTranscodeService. Added comprehensive unit tests for playlist parsing, URL generation, stream existence checks, status updates, and statistics."
},
{
"id": "BE-SVC-013",

View file

@ -0,0 +1,359 @@
package services
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"time"
"github.com/google/uuid"
"gorm.io/gorm"
"veza-backend-api/internal/models"
"go.uber.org/zap"
)
// HLSStreamingService provides enhanced HLS streaming capabilities
// BE-SVC-012: Implement HLS streaming service
type HLSStreamingService struct {
db *gorm.DB
outputDir string
logger *zap.Logger
}
// NewHLSStreamingService creates a new enhanced HLS streaming service
func NewHLSStreamingService(db *gorm.DB, outputDir string, logger *zap.Logger) *HLSStreamingService {
if logger == nil {
logger = zap.NewNop()
}
return &HLSStreamingService{
db: db,
outputDir: outputDir,
logger: logger,
}
}
// StreamHealth represents the health status of an HLS stream
type StreamHealth struct {
TrackID uuid.UUID `json:"track_id"`
Status models.HLSStreamStatus `json:"status"`
IsHealthy bool `json:"is_healthy"`
PlaylistExists bool `json:"playlist_exists"`
SegmentsValid bool `json:"segments_valid"`
MissingSegments []string `json:"missing_segments,omitempty"`
LastChecked time.Time `json:"last_checked"`
}
// ValidateStream validates that an HLS stream is complete and accessible
func (s *HLSStreamingService) ValidateStream(
ctx context.Context,
trackID uuid.UUID,
) (*StreamHealth, error) {
var stream models.HLSStream
if err := s.db.WithContext(ctx).
Where("track_id = ?", trackID).
First(&stream).Error; err != nil {
if err == gorm.ErrRecordNotFound {
return nil, fmt.Errorf("HLS stream not found for track %s", trackID)
}
return nil, fmt.Errorf("failed to query HLS stream: %w", err)
}
health := &StreamHealth{
TrackID: trackID,
Status: stream.Status,
LastChecked: time.Now(),
}
// Check if stream is ready
if stream.Status != models.HLSStatusReady {
health.IsHealthy = false
return health, nil
}
// Check if master playlist exists
trackDir := filepath.Join(s.outputDir, fmt.Sprintf("track_%s", trackID))
masterPlaylistPath := filepath.Join(trackDir, "master.m3u8")
if _, err := os.Stat(masterPlaylistPath); err == nil {
health.PlaylistExists = true
} else {
health.IsHealthy = false
return health, nil
}
// Validate segments for each bitrate
bitrates := stream.Bitrates
if len(bitrates) == 0 {
health.IsHealthy = false
return health, nil
}
allSegmentsValid := true
var missingSegments []string
for _, bitrate := range bitrates {
qualityDir := filepath.Join(trackDir, fmt.Sprintf("%dk", bitrate))
playlistPath := filepath.Join(qualityDir, "playlist.m3u8")
// Read playlist to get segment list
playlistContent, err := os.ReadFile(playlistPath)
if err != nil {
allSegmentsValid = false
missingSegments = append(missingSegments, fmt.Sprintf("%dk/playlist.m3u8", bitrate))
continue
}
// Parse playlist to find segment files
segments := s.parsePlaylistSegments(string(playlistContent))
for _, segment := range segments {
segmentPath := filepath.Join(qualityDir, segment)
if _, err := os.Stat(segmentPath); os.IsNotExist(err) {
allSegmentsValid = false
missingSegments = append(missingSegments, fmt.Sprintf("%dk/%s", bitrate, segment))
}
}
}
health.SegmentsValid = allSegmentsValid
health.MissingSegments = missingSegments
health.IsHealthy = health.PlaylistExists && allSegmentsValid
return health, nil
}
// parsePlaylistSegments extracts segment filenames from an HLS playlist
func (s *HLSStreamingService) parsePlaylistSegments(playlistContent string) []string {
var segments []string
lines := strings.Split(playlistContent, "\n")
for _, line := range lines {
line = strings.TrimSpace(line)
// Skip comments and empty lines
if line == "" || strings.HasPrefix(line, "#") {
continue
}
// Segment files are non-comment lines
if strings.HasSuffix(line, ".ts") {
segments = append(segments, filepath.Base(line))
}
}
return segments
}
// GetStreamURLs returns URLs for accessing an HLS stream
func (s *HLSStreamingService) GetStreamURLs(
ctx context.Context,
trackID uuid.UUID,
baseURL string,
) (map[string]string, error) {
var stream models.HLSStream
if err := s.db.WithContext(ctx).
Where("track_id = ? AND status = ?", trackID, models.HLSStatusReady).
First(&stream).Error; err != nil {
if err == gorm.ErrRecordNotFound {
return nil, fmt.Errorf("HLS stream not found for track %s", trackID)
}
return nil, fmt.Errorf("failed to query HLS stream: %w", err)
}
urls := make(map[string]string)
// Master playlist URL
urls["master_playlist"] = fmt.Sprintf("%s/tracks/%s/hls/master.m3u8", baseURL, trackID)
// Quality playlist URLs
for _, bitrate := range stream.Bitrates {
key := fmt.Sprintf("quality_%d", bitrate)
urls[key] = fmt.Sprintf("%s/tracks/%s/hls/%dk/playlist.m3u8", baseURL, trackID, bitrate)
}
return urls, nil
}
// CleanupStream removes HLS stream files and database record
func (s *HLSStreamingService) CleanupStream(
ctx context.Context,
trackID uuid.UUID,
) error {
var stream models.HLSStream
if err := s.db.WithContext(ctx).
Where("track_id = ?", trackID).
First(&stream).Error; err != nil {
if err == gorm.ErrRecordNotFound {
return fmt.Errorf("HLS stream not found for track %s", trackID)
}
return fmt.Errorf("failed to query HLS stream: %w", err)
}
// Delete files
trackDir := filepath.Join(s.outputDir, fmt.Sprintf("track_%s", trackID))
if err := os.RemoveAll(trackDir); err != nil {
s.logger.Warn("Failed to remove HLS stream directory",
zap.String("track_id", trackID.String()),
zap.String("directory", trackDir),
zap.Error(err),
)
}
// Delete database record
if err := s.db.WithContext(ctx).Delete(&stream).Error; err != nil {
return fmt.Errorf("failed to delete HLS stream record: %w", err)
}
s.logger.Info("HLS stream cleaned up",
zap.String("track_id", trackID.String()),
zap.String("stream_id", stream.ID.String()),
)
return nil
}
// GetStreamStatistics returns statistics about HLS streams
func (s *HLSStreamingService) GetStreamStatistics(
ctx context.Context,
) (map[string]interface{}, error) {
var totalStreams int64
var readyStreams int64
var processingStreams int64
var failedStreams int64
// Count total streams
if err := s.db.WithContext(ctx).
Model(&models.HLSStream{}).
Count(&totalStreams).Error; err != nil {
return nil, fmt.Errorf("failed to count total streams: %w", err)
}
// Count by status
if err := s.db.WithContext(ctx).
Model(&models.HLSStream{}).
Where("status = ?", models.HLSStatusReady).
Count(&readyStreams).Error; err != nil {
return nil, fmt.Errorf("failed to count ready streams: %w", err)
}
if err := s.db.WithContext(ctx).
Model(&models.HLSStream{}).
Where("status = ?", models.HLSStatusProcessing).
Count(&processingStreams).Error; err != nil {
return nil, fmt.Errorf("failed to count processing streams: %w", err)
}
if err := s.db.WithContext(ctx).
Model(&models.HLSStream{}).
Where("status = ?", models.HLSStatusFailed).
Count(&failedStreams).Error; err != nil {
return nil, fmt.Errorf("failed to count failed streams: %w", err)
}
stats := map[string]interface{}{
"total_streams": totalStreams,
"ready_streams": readyStreams,
"processing_streams": processingStreams,
"failed_streams": failedStreams,
"ready_percentage": 0.0,
}
if totalStreams > 0 {
stats["ready_percentage"] = float64(readyStreams) / float64(totalStreams) * 100
}
return stats, nil
}
// ListStreams lists HLS streams with optional filtering
func (s *HLSStreamingService) ListStreams(
ctx context.Context,
status *models.HLSStreamStatus,
limit, offset int,
) ([]models.HLSStream, int64, error) {
query := s.db.WithContext(ctx).Model(&models.HLSStream{})
// Filter by status if provided
if status != nil {
query = query.Where("status = ?", *status)
}
// Get total count
var total int64
if err := query.Count(&total).Error; err != nil {
return nil, 0, fmt.Errorf("failed to count streams: %w", err)
}
// Get streams with pagination
var streams []models.HLSStream
if err := query.
Order("created_at DESC").
Limit(limit).
Offset(offset).
Find(&streams).Error; err != nil {
return nil, 0, fmt.Errorf("failed to list streams: %w", err)
}
return streams, total, nil
}
// UpdateStreamStatus updates the status of an HLS stream
func (s *HLSStreamingService) UpdateStreamStatus(
ctx context.Context,
trackID uuid.UUID,
status models.HLSStreamStatus,
) error {
result := s.db.WithContext(ctx).
Model(&models.HLSStream{}).
Where("track_id = ?", trackID).
Update("status", status)
if result.Error != nil {
return fmt.Errorf("failed to update stream status: %w", result.Error)
}
if result.RowsAffected == 0 {
return fmt.Errorf("HLS stream not found for track %s", trackID)
}
s.logger.Info("HLS stream status updated",
zap.String("track_id", trackID.String()),
zap.String("status", string(status)),
)
return nil
}
// GetStreamByTrackID retrieves an HLS stream by track ID
func (s *HLSStreamingService) GetStreamByTrackID(
ctx context.Context,
trackID uuid.UUID,
) (*models.HLSStream, error) {
var stream models.HLSStream
if err := s.db.WithContext(ctx).
Where("track_id = ?", trackID).
First(&stream).Error; err != nil {
if err == gorm.ErrRecordNotFound {
return nil, fmt.Errorf("HLS stream not found for track %s", trackID)
}
return nil, fmt.Errorf("failed to query HLS stream: %w", err)
}
return &stream, nil
}
// CheckStreamExists checks if an HLS stream exists for a track
func (s *HLSStreamingService) CheckStreamExists(
ctx context.Context,
trackID uuid.UUID,
) (bool, error) {
var count int64
if err := s.db.WithContext(ctx).
Model(&models.HLSStream{}).
Where("track_id = ?", trackID).
Count(&count).Error; err != nil {
return false, fmt.Errorf("failed to check stream existence: %w", err)
}
return count > 0, nil
}

View file

@ -0,0 +1,302 @@
package services
import (
"context"
"fmt"
"testing"
"github.com/google/uuid"
"go.uber.org/zap"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
"veza-backend-api/internal/models"
)
func TestNewHLSStreamingService(t *testing.T) {
db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
if err != nil {
t.Fatalf("Failed to open database: %v", err)
}
logger := zap.NewNop()
service := NewHLSStreamingService(db, "/tmp/hls", logger)
if service == nil {
t.Error("NewHLSStreamingService() returned nil")
}
if service.db == nil {
t.Error("NewHLSStreamingService() returned service with nil db")
}
if service.logger == nil {
t.Error("NewHLSStreamingService() returned service with nil logger")
}
}
func TestHLSStreamingService_parsePlaylistSegments(t *testing.T) {
service := NewHLSStreamingService(nil, "", zap.NewNop())
tests := []struct {
name string
playlist string
expected []string
}{
{
name: "valid playlist with segments",
playlist: `#EXTM3U
#EXT-X-VERSION:3
#EXTINF:10.0,
segment_000.ts
#EXTINF:10.0,
segment_001.ts
#EXTINF:10.0,
segment_002.ts
#EXT-X-ENDLIST`,
expected: []string{"segment_000.ts", "segment_001.ts", "segment_002.ts"},
},
{
name: "playlist with comments only",
playlist: `#EXTM3U
#EXT-X-VERSION:3
#EXT-X-ENDLIST`,
expected: []string{},
},
{
name: "empty playlist",
playlist: "",
expected: []string{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
segments := service.parsePlaylistSegments(tt.playlist)
if len(segments) != len(tt.expected) {
t.Errorf("parsePlaylistSegments() returned %d segments, want %d", len(segments), len(tt.expected))
}
for i, seg := range segments {
if seg != tt.expected[i] {
t.Errorf("parsePlaylistSegments() segment[%d] = %s, want %s", i, seg, tt.expected[i])
}
}
})
}
}
func TestHLSStreamingService_GetStreamURLs(t *testing.T) {
db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
if err != nil {
t.Fatalf("Failed to open database: %v", err)
}
if err := db.AutoMigrate(&models.HLSStream{}); err != nil {
t.Fatalf("Failed to migrate: %v", err)
}
service := NewHLSStreamingService(db, "/tmp/hls", zap.NewNop())
ctx := context.Background()
trackID := uuid.New()
// Create test stream
stream := &models.HLSStream{
TrackID: trackID,
Status: models.HLSStatusReady,
PlaylistURL: "/tmp/hls/track_123/master.m3u8",
Bitrates: models.BitrateList{128, 192, 320},
SegmentsCount: 10,
}
if err := db.Create(stream).Error; err != nil {
t.Fatalf("Failed to create test stream: %v", err)
}
urls, err := service.GetStreamURLs(ctx, trackID, "https://api.example.com")
if err != nil {
t.Fatalf("GetStreamURLs() error = %v", err)
}
if urls == nil {
t.Error("GetStreamURLs() returned nil urls")
}
// Check master playlist URL
if urls["master_playlist"] == "" {
t.Error("GetStreamURLs() missing master_playlist URL")
}
// Check quality URLs
expectedQualities := []int{128, 192, 320}
for _, bitrate := range expectedQualities {
key := fmt.Sprintf("quality_%d", bitrate)
if urls[key] == "" {
t.Errorf("GetStreamURLs() missing quality URL for %dk", bitrate)
}
}
}
func TestHLSStreamingService_CheckStreamExists(t *testing.T) {
db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
if err != nil {
t.Fatalf("Failed to open database: %v", err)
}
if err := db.AutoMigrate(&models.HLSStream{}); err != nil {
t.Fatalf("Failed to migrate: %v", err)
}
service := NewHLSStreamingService(db, "/tmp/hls", zap.NewNop())
ctx := context.Background()
trackID := uuid.New()
// Check non-existent stream
exists, err := service.CheckStreamExists(ctx, trackID)
if err != nil {
t.Fatalf("CheckStreamExists() error = %v", err)
}
if exists {
t.Error("CheckStreamExists() returned true for non-existent stream")
}
// Create stream
stream := &models.HLSStream{
TrackID: trackID,
Status: models.HLSStatusReady,
}
if err := db.Create(stream).Error; err != nil {
t.Fatalf("Failed to create test stream: %v", err)
}
// Check existing stream
exists, err = service.CheckStreamExists(ctx, trackID)
if err != nil {
t.Fatalf("CheckStreamExists() error = %v", err)
}
if !exists {
t.Error("CheckStreamExists() returned false for existing stream")
}
}
func TestHLSStreamingService_UpdateStreamStatus(t *testing.T) {
db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
if err != nil {
t.Fatalf("Failed to open database: %v", err)
}
if err := db.AutoMigrate(&models.HLSStream{}); err != nil {
t.Fatalf("Failed to migrate: %v", err)
}
service := NewHLSStreamingService(db, "/tmp/hls", zap.NewNop())
ctx := context.Background()
trackID := uuid.New()
// Create stream
stream := &models.HLSStream{
TrackID: trackID,
Status: models.HLSStatusProcessing,
}
if err := db.Create(stream).Error; err != nil {
t.Fatalf("Failed to create test stream: %v", err)
}
// Update status
err = service.UpdateStreamStatus(ctx, trackID, models.HLSStatusReady)
if err != nil {
t.Fatalf("UpdateStreamStatus() error = %v", err)
}
// Verify status was updated
var updatedStream models.HLSStream
if err := db.Where("track_id = ?", trackID).First(&updatedStream).Error; err != nil {
t.Fatalf("Failed to query updated stream: %v", err)
}
if updatedStream.Status != models.HLSStatusReady {
t.Errorf("UpdateStreamStatus() status = %v, want %v", updatedStream.Status, models.HLSStatusReady)
}
}
func TestHLSStreamingService_GetStreamStatistics(t *testing.T) {
db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
if err != nil {
t.Fatalf("Failed to open database: %v", err)
}
if err := db.AutoMigrate(&models.HLSStream{}); err != nil {
t.Fatalf("Failed to migrate: %v", err)
}
service := NewHLSStreamingService(db, "/tmp/hls", zap.NewNop())
ctx := context.Background()
// Create test streams with different statuses
streams := []*models.HLSStream{
{TrackID: uuid.New(), Status: models.HLSStatusReady},
{TrackID: uuid.New(), Status: models.HLSStatusReady},
{TrackID: uuid.New(), Status: models.HLSStatusProcessing},
{TrackID: uuid.New(), Status: models.HLSStatusFailed},
}
for _, stream := range streams {
if err := db.Create(stream).Error; err != nil {
t.Fatalf("Failed to create test stream: %v", err)
}
}
stats, err := service.GetStreamStatistics(ctx)
if err != nil {
t.Fatalf("GetStreamStatistics() error = %v", err)
}
if stats == nil {
t.Error("GetStreamStatistics() returned nil stats")
}
// Verify counts
if stats["total_streams"] != int64(4) {
t.Errorf("GetStreamStatistics() total_streams = %v, want 4", stats["total_streams"])
}
if stats["ready_streams"] != int64(2) {
t.Errorf("GetStreamStatistics() ready_streams = %v, want 2", stats["ready_streams"])
}
if stats["processing_streams"] != int64(1) {
t.Errorf("GetStreamStatistics() processing_streams = %v, want 1", stats["processing_streams"])
}
if stats["failed_streams"] != int64(1) {
t.Errorf("GetStreamStatistics() failed_streams = %v, want 1", stats["failed_streams"])
}
}
// Note: Full integration tests would require:
// 1. Real HLS stream files (master.m3u8, playlists, segments)
// 2. File system access for validation
// 3. Verification of stream health checks
//
// Example integration test structure:
// func TestHLSStreamingService_ValidateStream_Integration(t *testing.T) {
// // Setup test database and file system
// db := setupTestDB(t)
// outputDir := setupTestHLSFiles(t)
// defer cleanupTestFiles(t, outputDir)
//
// service := NewHLSStreamingService(db, outputDir, zap.NewNop())
//
// // Create test stream
// trackID := uuid.New()
// stream := createTestHLSStream(t, db, trackID, outputDir)
//
// ctx := context.Background()
// health, err := service.ValidateStream(ctx, trackID)
// if err != nil {
// t.Fatalf("ValidateStream() error = %v", err)
// }
//
// if !health.IsHealthy {
// t.Error("ValidateStream() stream should be healthy")
// }
// if !health.PlaylistExists {
// t.Error("ValidateStream() playlist should exist")
// }
// if !health.SegmentsValid {
// t.Error("ValidateStream() segments should be valid")
// }
// }