feat(streaming): wire HLS pipeline end-to-end with serving routes
- Add HLSEnabled and HLSStorageDir to backend config (HLS_STREAMING env) - Register HLS serving routes (master.m3u8, quality playlist, segments) behind HLSEnabled feature flag on existing track routes - Add GetHLSStatus and TriggerHLSTranscode methods to StreamService for stream server communication - Update docker-compose (dev, staging, prod) with HLS env vars and shared hls-data volume between backend and stream-server - Stream callback already correctly updates stream_manifest_url
This commit is contained in:
parent
1ed7fe2ebb
commit
218b4b33d6
6 changed files with 126 additions and 0 deletions
|
|
@ -171,6 +171,10 @@ services:
|
|||
- AWS_ACCESS_KEY_ID=${S3_ACCESS_KEY:?S3_ACCESS_KEY must be set}
|
||||
- AWS_SECRET_ACCESS_KEY=${S3_SECRET_KEY:?S3_SECRET_KEY must be set}
|
||||
- AWS_REGION=${AWS_REGION:-us-east-1}
|
||||
- HLS_STREAMING=true
|
||||
- HLS_STORAGE_DIR=/data/hls
|
||||
volumes:
|
||||
- hls_prod_data:/data/hls
|
||||
depends_on:
|
||||
postgres:
|
||||
condition: service_healthy
|
||||
|
|
@ -202,6 +206,9 @@ services:
|
|||
- REDIS_URL=redis://:${REDIS_PASSWORD:?REDIS_PASSWORD must be set}@redis:6379
|
||||
- JWT_SECRET=${JWT_SECRET:?JWT_SECRET must be set}
|
||||
- PORT=3001
|
||||
- HLS_OUTPUT_DIR=/data/hls
|
||||
volumes:
|
||||
- hls_prod_data:/data/hls
|
||||
depends_on:
|
||||
postgres:
|
||||
condition: service_healthy
|
||||
|
|
@ -315,3 +322,4 @@ volumes:
|
|||
rabbitmq_data:
|
||||
hyperswitch_postgres_data:
|
||||
minio_data:
|
||||
hls_prod_data:
|
||||
|
|
|
|||
|
|
@ -77,8 +77,11 @@ services:
|
|||
- AWS_ACCESS_KEY_ID=${STAGING_S3_ACCESS_KEY:?STAGING_S3_ACCESS_KEY must be set}
|
||||
- AWS_SECRET_ACCESS_KEY=${STAGING_S3_SECRET_KEY:?STAGING_S3_SECRET_KEY must be set}
|
||||
- AWS_REGION=us-east-1
|
||||
- HLS_STREAMING=true
|
||||
- HLS_STORAGE_DIR=/data/hls
|
||||
volumes:
|
||||
- veza_logs_staging:/var/log/veza
|
||||
- hls_staging_data:/data/hls
|
||||
depends_on:
|
||||
postgres:
|
||||
condition: service_healthy
|
||||
|
|
@ -107,6 +110,9 @@ services:
|
|||
- REDIS_URL=redis://redis:6379
|
||||
- JWT_SECRET=${STAGING_JWT_SECRET:?STAGING_JWT_SECRET must be set}
|
||||
- PORT=3001
|
||||
- HLS_OUTPUT_DIR=/data/hls
|
||||
volumes:
|
||||
- hls_staging_data:/data/hls
|
||||
depends_on:
|
||||
postgres:
|
||||
condition: service_healthy
|
||||
|
|
@ -192,4 +198,5 @@ volumes:
|
|||
caddy_data:
|
||||
caddy_config:
|
||||
minio_staging_data:
|
||||
hls_staging_data:
|
||||
|
||||
|
|
|
|||
|
|
@ -182,6 +182,10 @@ services:
|
|||
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID:-minioadmin}
|
||||
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY:-minioadmin}
|
||||
- AWS_REGION=us-east-1
|
||||
- HLS_STREAMING=true
|
||||
- HLS_STORAGE_DIR=/data/hls
|
||||
volumes:
|
||||
- hls-data:/data/hls
|
||||
ports:
|
||||
- "${PORT_BACKEND:-18080}:8080"
|
||||
depends_on:
|
||||
|
|
@ -220,6 +224,9 @@ services:
|
|||
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID:-minioadmin}
|
||||
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY:-minioadmin}
|
||||
- AWS_REGION=us-east-1
|
||||
- HLS_OUTPUT_DIR=/data/hls
|
||||
volumes:
|
||||
- hls-data:/data/hls
|
||||
ports:
|
||||
- "${PORT_STREAM:-18082}:3001"
|
||||
depends_on:
|
||||
|
|
@ -282,6 +289,7 @@ volumes:
|
|||
rabbitmq_data:
|
||||
hyperswitch_postgres_data:
|
||||
minio_data:
|
||||
hls-data:
|
||||
|
||||
networks:
|
||||
veza-net:
|
||||
|
|
|
|||
|
|
@ -158,6 +158,15 @@ func (r *APIRouter) setupTrackRoutes(router *gin.RouterGroup) {
|
|||
hlsHandler := handlers.NewHLSHandler(hlsService)
|
||||
tracks.GET("/:id/hls/info", hlsHandler.GetStreamInfo)
|
||||
tracks.GET("/:id/hls/status", hlsHandler.GetStreamStatus)
|
||||
|
||||
if r.config.HLSEnabled {
|
||||
hlsStreaming := tracks.Group("/:id/hls")
|
||||
{
|
||||
hlsStreaming.GET("/master.m3u8", hlsHandler.ServeMasterPlaylist)
|
||||
hlsStreaming.GET("/:bitrate/playlist.m3u8", hlsHandler.ServeQualityPlaylist)
|
||||
hlsStreaming.GET("/:bitrate/:segment", hlsHandler.ServeSegment)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -80,6 +80,10 @@ type Config struct {
|
|||
CORSOrigins []string // Liste des origines CORS autorisées
|
||||
FrontendURL string // URL du frontend (OAuth redirect, password reset links). FRONTEND_URL ou VITE_FRONTEND_URL
|
||||
|
||||
// HLS Streaming Configuration (v0.503)
|
||||
HLSEnabled bool // Enable HLS streaming routes
|
||||
HLSStorageDir string // Directory for HLS segment storage
|
||||
|
||||
// S3 Storage Configuration (BE-SVC-005)
|
||||
S3Bucket string // Nom du bucket S3
|
||||
S3Region string // Région AWS
|
||||
|
|
@ -287,6 +291,10 @@ func NewConfig() (*Config, error) {
|
|||
CORSOrigins: corsOrigins,
|
||||
FrontendURL: getFrontendURL(), // OAuth callback, password reset, email links
|
||||
|
||||
// HLS Streaming (v0.503)
|
||||
HLSEnabled: getEnvBool("HLS_STREAMING", false),
|
||||
HLSStorageDir: getEnv("HLS_STORAGE_DIR", "/tmp/veza-hls"),
|
||||
|
||||
// S3 Storage Configuration (BE-SVC-005)
|
||||
S3Bucket: getEnv("AWS_S3_BUCKET", ""),
|
||||
S3Region: getEnv("AWS_REGION", "us-east-1"),
|
||||
|
|
|
|||
|
|
@ -139,6 +139,92 @@ func (s *StreamService) StartProcessing(ctx context.Context, trackID uuid.UUID,
|
|||
return fmt.Errorf("stream server returned non-200 status after %d attempts", maxRetries)
|
||||
}
|
||||
|
||||
type HLSStatus struct {
|
||||
TrackID string `json:"track_id"`
|
||||
Status string `json:"status"`
|
||||
Progress int `json:"progress"`
|
||||
Qualities []string `json:"qualities,omitempty"`
|
||||
Duration float64 `json:"duration,omitempty"`
|
||||
}
|
||||
|
||||
// GetHLSStatus queries the stream server for the current HLS transcoding status.
|
||||
func (s *StreamService) GetHLSStatus(ctx context.Context, trackID uuid.UUID) (*HLSStatus, error) {
|
||||
url := fmt.Sprintf("%s/v1/stream/job/%s", s.baseURL, trackID.String())
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create request: %w", err)
|
||||
}
|
||||
if s.internalAPIKey != "" {
|
||||
req.Header.Set("X-Internal-API-Key", s.internalAPIKey)
|
||||
}
|
||||
|
||||
resp, err := s.circuitBreaker.DoWithContext(ctx, req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("stream server request: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode == http.StatusNotFound {
|
||||
return &HLSStatus{TrackID: trackID.String(), Status: "not_found"}, nil
|
||||
}
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("stream server returned status %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
var status HLSStatus
|
||||
if err := json.NewDecoder(resp.Body).Decode(&status); err != nil {
|
||||
return nil, fmt.Errorf("decode response: %w", err)
|
||||
}
|
||||
return &status, nil
|
||||
}
|
||||
|
||||
type HLSTranscodeRequest struct {
|
||||
TrackID string `json:"track_id"`
|
||||
FilePath string `json:"file_path"`
|
||||
Format string `json:"format"`
|
||||
}
|
||||
|
||||
// TriggerHLSTranscode sends a request to the stream server to start HLS transcoding.
|
||||
func (s *StreamService) TriggerHLSTranscode(ctx context.Context, trackID uuid.UUID, filePath string) error {
|
||||
url := fmt.Sprintf("%s/internal/jobs/transcode", s.baseURL)
|
||||
reqBody := HLSTranscodeRequest{
|
||||
TrackID: trackID.String(),
|
||||
FilePath: filePath,
|
||||
Format: "hls",
|
||||
}
|
||||
|
||||
jsonBody, err := json.Marshal(reqBody)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal request: %w", err)
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(jsonBody))
|
||||
if err != nil {
|
||||
return fmt.Errorf("create request: %w", err)
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
if s.internalAPIKey != "" {
|
||||
req.Header.Set("X-Internal-API-Key", s.internalAPIKey)
|
||||
}
|
||||
|
||||
resp, err := s.circuitBreaker.DoWithContext(ctx, req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("stream server request: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted {
|
||||
return fmt.Errorf("stream server returned status %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
s.logger.Info("HLS transcode triggered",
|
||||
zap.String("track_id", trackID.String()),
|
||||
zap.String("file_path", filePath))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// extractRequestIDFromContext extrait le request_id depuis un contexte Go
|
||||
// FIX #11: Utilise la même logique que middleware.GetRequestIDFromGoContext mais sans cycle d'import
|
||||
func extractRequestIDFromContext(ctx context.Context) string {
|
||||
|
|
|
|||
Loading…
Reference in a new issue