veza/veza-backend-api/internal/services/hls_transcode_service.go
senke 70f0fb1636 feat(transcode): read from S3 signed URL when track is s3-backed (v1.0.8 P2)
Closes the transcoder's read-side gap for Phase 2. HLS transcoding now
works for tracks uploaded under TRACK_STORAGE_BACKEND=s3 without
requiring the stream server pod to share a local volume.

Changes:

- internal/services/hls_transcode_service.go
  - New SignedURLProvider interface (minimal: GetSignedURL).
  - HLSTranscodeService gains optional s3Resolver + SetS3Resolver.
  - TranscodeTrack routed through new resolveSource helper — returns
    local FilePath for local tracks, a 1h-TTL signed URL for s3-backed
    rows. Missing resolver for an s3 track returns a clear error.
  - os.Stat check skipped for HTTP(S) sources (ffmpeg validates them).
  - transcodeBitrate takes `source` explicitly so URL propagation is
    obvious and ValidateExecPath is bypassed only for the known
    signed-URL shape.
  - isHTTPSource helper (http://, https:// prefix check).

- internal/workers/job_worker.go
  - JobWorker gains optional s3Resolver + SetS3Resolver.
  - processTranscodingJob skips the local-file stat when
    track.StorageBackend='s3', reads via signed URL instead.
  - Passes w.s3Resolver to NewHLSTranscodeService when non-nil.

- internal/config/config.go: DI wires S3StorageService into JobWorker
  after instantiation (nil-safe).

- internal/core/track/service.go (copyFileAsyncS3)
  - Re-enabled stream server trigger: generates a 1h-TTL signed URL
    for the fresh s3 key and passes it to streamService.StartProcessing.
    Rust-side ffmpeg consumes HTTPS URLs natively. Failure is logged
    but does not fail the upload (track will sit in Processing until
    a retry / reconcile).

- internal/core/track/track_upload_handler.go (CompleteChunkedUpload)
  - Reload track after S3 migration to pick up the new storage_key.
  - Compute transcodeSource = signed URL (s3 path) or finalPath (local).
  - Pass transcodeSource to both streamService.StartProcessing and
    jobEnqueuer.EnqueueTranscodingJob — dual-trigger preserved per
    plan D2 (consolidation deferred v1.0.9).

- internal/services/hls_transcode_service_test.go
  - TestHLSTranscodeService_TranscodeTrack_EmptyFilePath updated for
    the expanded error message ("empty FilePath" vs "file path is empty").

Known limitation (v1.0.9): HLS segment OUTPUT still writes to the
local outputDir; only the INPUT side is S3-aware. Multi-pod HLS serving
needs the worker to upload segments to MinIO post-transcode. Acceptable
for v1.0.8 target — single-pod staging supports both local + s3 tracks.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 23:34:51 +02:00

301 lines
9.9 KiB
Go

package services
import (
"context"
"fmt"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
"veza-backend-api/internal/models"
"veza-backend-api/internal/utils"
"github.com/google/uuid"
"go.uber.org/zap"
)
// SignedURLProvider is the minimal interface HLSTranscodeService needs to
// resolve S3 object keys into ffmpeg-consumable HTTPS URLs.
// *S3StorageService satisfies this via its GetSignedURL method.
// v1.0.8 Phase 2.
type SignedURLProvider interface {
GetSignedURL(ctx context.Context, key string, ttl time.Duration) (string, error)
}
// HLSTranscodeService gère le transcodage HLS des tracks audio
type HLSTranscodeService struct {
outputDir string
bitrates []int
logger *zap.Logger
// v1.0.8 Phase 2 — optional. When set AND track.StorageBackend='s3',
// TranscodeTrack pulls the source via a signed URL instead of a local
// FS path. Keeps HLS output on local disk for now (multi-pod segment
// storage is v1.0.9).
s3Resolver SignedURLProvider
}
// NewHLSTranscodeService crée un nouveau service de transcodage HLS
func NewHLSTranscodeService(outputDir string, logger *zap.Logger) *HLSTranscodeService {
if logger == nil {
logger = zap.NewNop()
}
return &HLSTranscodeService{
outputDir: outputDir,
bitrates: []int{128, 192, 320},
logger: logger,
}
}
// SetBitrates configure les bitrates à utiliser pour le transcodage
func (s *HLSTranscodeService) SetBitrates(bitrates []int) {
s.bitrates = bitrates
}
// SetS3Resolver wires the S3 signed-URL resolver (v1.0.8 Phase 2).
// Without it, S3-backed tracks will fail transcoding with a clear error.
func (s *HLSTranscodeService) SetS3Resolver(provider SignedURLProvider) {
s.s3Resolver = provider
}
// TranscodeTrack transcodage un track en format HLS avec plusieurs qualités.
// v1.0.8 Phase 2 — accepte tracks locaux (source = track.FilePath) ou s3
// (source = signed URL généré via s.s3Resolver). Output reste local dans
// s.outputDir dans les deux cas (multi-pod HLS deferred v1.0.9).
func (s *HLSTranscodeService) TranscodeTrack(ctx context.Context, track *models.Track) (*models.HLSStream, error) {
if track == nil {
return nil, fmt.Errorf("track cannot be nil")
}
source, err := s.resolveSource(ctx, track)
if err != nil {
return nil, err
}
// Pour les sources locales uniquement, vérifier que le fichier existe.
// Les URLs HTTPS sont validées par ffmpeg lors du premier transcode.
if !isHTTPSource(source) {
if _, err := os.Stat(source); os.IsNotExist(err) {
return nil, fmt.Errorf("track file does not exist: %s", source)
}
}
trackDir := filepath.Join(s.outputDir, fmt.Sprintf("track_%s", track.ID))
if err := os.MkdirAll(trackDir, 0755); err != nil {
return nil, fmt.Errorf("failed to create track directory: %w", err)
}
// Cleanup en cas d'erreur
var cleanupErr error
defer func() {
if cleanupErr != nil {
// Nettoyer en cas d'erreur
if err := s.cleanupTrackDir(trackDir); err != nil {
s.logger.Error("Failed to cleanup track directory", zap.Error(err))
}
}
}()
var bitrates []int
for _, bitrate := range s.bitrates {
if err := s.transcodeBitrate(ctx, track, source, trackDir, bitrate); err != nil {
cleanupErr = err
return nil, fmt.Errorf("failed to transcode bitrate %dk: %w", bitrate, err)
}
bitrates = append(bitrates, bitrate)
s.logger.Info("Transcoded bitrate", zap.Int("bitrate", bitrate), zap.String("track_id", track.ID.String()))
}
playlistURL := filepath.Join(trackDir, "master.m3u8")
if err := s.generateMasterPlaylist(trackDir, bitrates); err != nil {
cleanupErr = err
return nil, fmt.Errorf("failed to generate master playlist: %w", err)
}
segmentsCount, err := s.countSegments(trackDir)
if err != nil {
cleanupErr = err
return nil, fmt.Errorf("failed to count segments: %w", err)
}
return &models.HLSStream{
TrackID: track.ID,
PlaylistURL: playlistURL,
SegmentsCount: segmentsCount,
Bitrates: models.BitrateList(bitrates),
Status: models.HLSStatusReady,
},
nil
}
// transcodeBitrate transcodage un track pour un bitrate spécifique.
// source peut être un path local ou une signed URL HTTPS (v1.0.8 P2).
func (s *HLSTranscodeService) transcodeBitrate(ctx context.Context, track *models.Track, source, outputDir string, bitrate int) error {
qualityDir := filepath.Join(outputDir, fmt.Sprintf("%dk", bitrate))
if err := os.MkdirAll(qualityDir, 0755); err != nil {
return fmt.Errorf("failed to create quality directory: %w", err)
}
outputPattern := filepath.Join(qualityDir, "segment_%03d.ts")
playlistPath := filepath.Join(qualityDir, "playlist.m3u8")
// SECURITY: Validate paths for exec.Command. Skip pour les URLs HTTPS
// (ValidateExecPath rejette les URLs comme inattendues ; on les accepte
// explicitement ici parce qu'elles proviennent du signed-URL resolver,
// chaîne authentique hors contrôle utilisateur).
if !isHTTPSource(source) && !utils.ValidateExecPath(source) {
return fmt.Errorf("invalid source path")
}
if !utils.ValidateExecPath(playlistPath) {
return fmt.Errorf("invalid playlist path")
}
// Commande ffmpeg pour transcoder en HLS
cmd := exec.CommandContext(ctx, "ffmpeg",
"-i", source,
"-codec:a", "aac",
"-b:a", fmt.Sprintf("%dk", bitrate),
"-hls_time", "10",
"-hls_playlist_type", "vod",
"-hls_segment_filename", outputPattern,
"-hls_list_size", "0", // Inclure tous les segments
"-y", // Overwrite output files
playlistPath,
)
// Capturer la sortie pour le logging
output, err := cmd.CombinedOutput()
if err != nil {
s.logger.Error("FFmpeg transcoding failed",
zap.Int("bitrate", bitrate),
zap.String("track_id", track.ID.String()),
zap.String("output", string(output)),
zap.Error(err))
return fmt.Errorf("ffmpeg failed: %w", err)
}
// Vérifier que le fichier playlist a été créé
if _, err := os.Stat(playlistPath); os.IsNotExist(err) {
return fmt.Errorf("playlist file was not created: %s", playlistPath)
}
return nil
}
// generateMasterPlaylist génère le fichier master.m3u8 avec toutes les qualités
func (s *HLSTranscodeService) generateMasterPlaylist(trackDir string, bitrates []int) error {
masterPlaylistPath := filepath.Join(trackDir, "master.m3u8")
var lines []string
lines = append(lines, "#EXTM3U")
lines = append(lines, "#EXT-X-VERSION:3")
for _, bitrate := range bitrates {
qualityDir := fmt.Sprintf("%dk", bitrate)
playlistPath := filepath.Join(qualityDir, "playlist.m3u8")
// Ajouter l'entrée pour cette qualité
lines = append(lines, fmt.Sprintf("#EXT-X-STREAM-INF:BANDWIDTH=%d000", bitrate))
lines = append(lines, playlistPath)
}
content := strings.Join(lines, "\n") + "\n"
if err := os.WriteFile(masterPlaylistPath, []byte(content), 0644); err != nil {
return fmt.Errorf("failed to write master playlist: %w", err)
}
return nil
}
// getPlaylistDuration lit la durée totale d'une playlist .m3u8
func (s *HLSTranscodeService) getPlaylistDuration(playlistPath string) float64 {
data, err := os.ReadFile(playlistPath)
if err != nil {
return 0
}
lines := strings.Split(string(data), "\n")
var totalDuration float64
for _, line := range lines {
if strings.HasPrefix(line, "#EXTINF:") {
// Format: #EXTINF:10.0,
parts := strings.Split(line, ":")
if len(parts) > 1 {
durationStr := strings.TrimSuffix(parts[1], ",")
var duration float64
if _, err := fmt.Sscanf(durationStr, "%f", &duration); err == nil {
totalDuration += duration
}
}
}
}
return totalDuration
}
// countSegments compte le nombre de segments .ts dans le répertoire du track
// T0344: Compte les segments dans chaque répertoire de qualité et retourne le maximum
func (s *HLSTranscodeService) countSegments(trackDir string) (int, error) {
// Check if track directory exists
if _, err := os.Stat(trackDir); os.IsNotExist(err) {
return 0, fmt.Errorf("track directory does not exist: %s", trackDir)
}
count := 0
for _, bitrate := range s.bitrates {
qualityDir := filepath.Join(trackDir, fmt.Sprintf("%dk", bitrate))
files, err := filepath.Glob(filepath.Join(qualityDir, "segment_*.ts"))
if err != nil {
return 0, fmt.Errorf("failed to glob segments in %s: %w", qualityDir, err)
}
if len(files) > count {
count = len(files)
}
}
return count, nil
}
// cleanupTrackDir supprime le répertoire d'un track en cas d'erreur
func (s *HLSTranscodeService) cleanupTrackDir(trackDir string) error {
return os.RemoveAll(trackDir)
}
// CleanupTrackDir supprime le répertoire d'un track (méthode publique)
func (s *HLSTranscodeService) CleanupTrackDir(trackID uuid.UUID) error {
trackDir := filepath.Join(s.outputDir, fmt.Sprintf("track_%s", trackID))
return s.cleanupTrackDir(trackDir)
}
// resolveSource décide de l'input ffmpeg : path local ou signed URL S3.
// v1.0.8 Phase 2 — s3-backed tracks résolus via s3Resolver, local tracks
// continuent avec track.FilePath.
func (s *HLSTranscodeService) resolveSource(ctx context.Context, track *models.Track) (string, error) {
if track.StorageBackend == "s3" && track.StorageKey != nil && *track.StorageKey != "" {
if s.s3Resolver == nil {
return "", fmt.Errorf("track %s is s3-backed but HLSTranscodeService has no S3 resolver (call SetS3Resolver at init)", track.ID)
}
// 1h TTL — supérieur au temps de transcode (généralement <10 min) mais
// pas illimité : si le job reste stuck, la prochaine retry regénère.
url, err := s.s3Resolver.GetSignedURL(ctx, *track.StorageKey, time.Hour)
if err != nil {
return "", fmt.Errorf("resolve S3 signed URL for track %s: %w", track.ID, err)
}
return url, nil
}
if track.FilePath == "" {
return "", fmt.Errorf("track %s has empty FilePath and no S3 storage_key", track.ID)
}
return track.FilePath, nil
}
// isHTTPSource retourne true ssi source commence par http:// ou https://.
// Utilisé pour bypasser les checks de path local (os.Stat, ValidateExecPath)
// qui ne s'appliquent pas aux URLs.
func isHTTPSource(source string) bool {
return strings.HasPrefix(source, "http://") || strings.HasPrefix(source, "https://")
}