Compare commits
6 commits
4ee8c38536
...
e3bf2d2aea
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e3bf2d2aea | ||
|
|
70f0fb1636 | ||
|
|
282467ae14 | ||
|
|
ac31a54405 | ||
|
|
f47141fe62 | ||
|
|
3d43d43075 |
11 changed files with 1013 additions and 44 deletions
317
veza-backend-api/cmd/migrate_storage/main.go
Normal file
317
veza-backend-api/cmd/migrate_storage/main.go
Normal file
|
|
@ -0,0 +1,317 @@
|
|||
// Command migrate_storage bulk-migrates tracks from local disk to MinIO/S3.
|
||||
//
|
||||
// Reads env (DATABASE_URL, AWS_S3_*) and iterates rows where
|
||||
// `tracks.storage_backend='local'`, streaming each file to S3 at
|
||||
// `tracks/<userID>/<trackID>.<ext>` and atomically updating the row to
|
||||
// `storage_backend='s3' + storage_key=<key>`.
|
||||
//
|
||||
// v1.0.8 Phase 3. Usage:
|
||||
//
|
||||
// migrate_storage --dry-run
|
||||
// migrate_storage --batch-size=50 --limit=500
|
||||
// migrate_storage --delete-local=true # delete source file after successful upload
|
||||
//
|
||||
// Safe to re-run: each iteration filters on storage_backend='local', so rows
|
||||
// already migrated are skipped. A crash mid-batch leaves partial progress
|
||||
// committed per row (transactions scoped to one track).
|
||||
package main
|
||||
|
||||
import (
	"context"
	"flag"
	"fmt"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	"veza-backend-api/internal/database"
	"veza-backend-api/internal/models"
	"veza-backend-api/internal/services"

	"github.com/joho/godotenv"
	"go.uber.org/zap"
	"gorm.io/gorm"
)
|
||||
|
||||
type flags struct {
|
||||
dryRun bool
|
||||
batchSize int
|
||||
limit int
|
||||
deleteLocal bool
|
||||
timeoutMin int
|
||||
}
|
||||
|
||||
func parseFlags() flags {
|
||||
f := flags{}
|
||||
flag.BoolVar(&f.dryRun, "dry-run", false, "Log candidates without uploading or updating DB")
|
||||
flag.IntVar(&f.batchSize, "batch-size", 50, "Rows to SELECT per iteration")
|
||||
flag.IntVar(&f.limit, "limit", 0, "Total max rows to migrate (0 = unlimited)")
|
||||
flag.BoolVar(&f.deleteLocal, "delete-local", false, "Remove local file after successful S3 upload")
|
||||
flag.IntVar(&f.timeoutMin, "timeout-min", 30, "Global timeout in minutes (safety stop)")
|
||||
flag.Parse()
|
||||
return f
|
||||
}
|
||||
|
||||
func main() {
|
||||
_ = godotenv.Load()
|
||||
|
||||
cfg := parseFlags()
|
||||
|
||||
logger, err := zap.NewProduction()
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "failed to init logger: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
defer logger.Sync()
|
||||
|
||||
logger.Info("migrate_storage starting",
|
||||
zap.Bool("dry_run", cfg.dryRun),
|
||||
zap.Int("batch_size", cfg.batchSize),
|
||||
zap.Int("limit", cfg.limit),
|
||||
zap.Bool("delete_local", cfg.deleteLocal),
|
||||
)
|
||||
|
||||
// Top-level context with SIGINT + timeout so a stuck S3 call doesn't hang forever.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(cfg.timeoutMin)*time.Minute)
|
||||
defer cancel()
|
||||
sigCh := make(chan os.Signal, 1)
|
||||
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
|
||||
go func() {
|
||||
<-sigCh
|
||||
logger.Warn("signal received, cancelling context")
|
||||
cancel()
|
||||
}()
|
||||
|
||||
// DB init — minimal config, just DATABASE_URL.
|
||||
dbURL := os.Getenv("DATABASE_URL")
|
||||
if dbURL == "" {
|
||||
logger.Fatal("DATABASE_URL not set")
|
||||
}
|
||||
db, err := database.NewDatabase(&database.Config{
|
||||
URL: dbURL,
|
||||
MaxOpenConns: 5,
|
||||
MaxIdleConns: 2,
|
||||
MaxLifetime: 5 * time.Minute,
|
||||
MaxIdleTime: 2 * time.Minute,
|
||||
})
|
||||
if err != nil {
|
||||
logger.Fatal("failed to open database", zap.Error(err))
|
||||
}
|
||||
defer db.Close()
|
||||
db.Logger = logger
|
||||
|
||||
// S3 init (always required — if it fails, the CLI has no reason to run).
|
||||
s3Bucket := os.Getenv("AWS_S3_BUCKET")
|
||||
if s3Bucket == "" {
|
||||
logger.Fatal("AWS_S3_BUCKET not set (migration target)")
|
||||
}
|
||||
s3Service, err := services.NewS3StorageService(services.S3Config{
|
||||
Bucket: s3Bucket,
|
||||
Region: envOrDefault("AWS_REGION", "us-east-1"),
|
||||
Endpoint: os.Getenv("AWS_S3_ENDPOINT"),
|
||||
AccessKey: os.Getenv("AWS_ACCESS_KEY_ID"),
|
||||
SecretKey: os.Getenv("AWS_SECRET_ACCESS_KEY"),
|
||||
Logger: logger,
|
||||
})
|
||||
if err != nil {
|
||||
logger.Fatal("failed to init S3", zap.Error(err))
|
||||
}
|
||||
|
||||
stats := &runStats{}
|
||||
runner := &migrator{
|
||||
db: db.GormDB,
|
||||
s3: s3Service,
|
||||
logger: logger,
|
||||
cfg: cfg,
|
||||
stats: stats,
|
||||
}
|
||||
if err := runner.run(ctx); err != nil {
|
||||
logger.Error("migration finished with errors", zap.Error(err))
|
||||
stats.log(logger)
|
||||
os.Exit(2)
|
||||
}
|
||||
stats.log(logger)
|
||||
}
|
||||
|
||||
// runStats accumulates per-run counters for the final summary.
|
||||
type runStats struct {
|
||||
candidates int
|
||||
uploaded int
|
||||
skipped int
|
||||
errors int
|
||||
bytesSent int64
|
||||
}
|
||||
|
||||
func (s *runStats) log(logger *zap.Logger) {
|
||||
logger.Info("migrate_storage summary",
|
||||
zap.Int("candidates", s.candidates),
|
||||
zap.Int("uploaded", s.uploaded),
|
||||
zap.Int("skipped", s.skipped),
|
||||
zap.Int("errors", s.errors),
|
||||
zap.Int64("bytes_sent", s.bytesSent),
|
||||
)
|
||||
}
|
||||
|
||||
type migrator struct {
|
||||
db *gorm.DB
|
||||
s3 *services.S3StorageService
|
||||
logger *zap.Logger
|
||||
cfg flags
|
||||
stats *runStats
|
||||
}
|
||||
|
||||
func (m *migrator) run(ctx context.Context) error {
|
||||
seen := 0
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
if m.cfg.limit > 0 && seen >= m.cfg.limit {
|
||||
m.logger.Info("reached --limit, stopping", zap.Int("limit", m.cfg.limit))
|
||||
return nil
|
||||
}
|
||||
|
||||
// Pull a batch of local-backed tracks. Ordering by created_at
|
||||
// oldest-first so long-lived legacy rows get processed first.
|
||||
var tracks []models.Track
|
||||
q := m.db.WithContext(ctx).
|
||||
Where("storage_backend = ?", "local").
|
||||
Where("file_path != ''").
|
||||
Order("created_at ASC").
|
||||
Limit(m.cfg.batchSize)
|
||||
if err := q.Find(&tracks).Error; err != nil {
|
||||
return fmt.Errorf("select batch: %w", err)
|
||||
}
|
||||
if len(tracks) == 0 {
|
||||
m.logger.Info("no more local-backed tracks, done")
|
||||
return nil
|
||||
}
|
||||
|
||||
m.stats.candidates += len(tracks)
|
||||
for i := range tracks {
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
if m.cfg.limit > 0 && seen >= m.cfg.limit {
|
||||
return nil
|
||||
}
|
||||
m.migrateOne(ctx, &tracks[i])
|
||||
seen++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *migrator) migrateOne(ctx context.Context, t *models.Track) {
|
||||
log := m.logger.With(
|
||||
zap.String("track_id", t.ID.String()),
|
||||
zap.String("user_id", t.UserID.String()),
|
||||
zap.String("file_path", t.FilePath),
|
||||
)
|
||||
|
||||
if m.cfg.dryRun {
|
||||
log.Info("dry-run: would upload to S3")
|
||||
m.stats.skipped++
|
||||
return
|
||||
}
|
||||
|
||||
file, err := os.Open(t.FilePath)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
log.Warn("local file missing, skipping (orphan row)")
|
||||
m.stats.skipped++
|
||||
return
|
||||
}
|
||||
log.Error("open local file failed", zap.Error(err))
|
||||
m.stats.errors++
|
||||
return
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
stat, err := file.Stat()
|
||||
if err != nil {
|
||||
log.Error("stat local file failed", zap.Error(err))
|
||||
m.stats.errors++
|
||||
return
|
||||
}
|
||||
|
||||
ext := extOrDefault(t.FilePath, t.Format)
|
||||
s3Key := fmt.Sprintf("tracks/%s/%s%s", t.UserID.String(), t.ID.String(), ext)
|
||||
contentType := mimeFromExt(ext)
|
||||
|
||||
uploadCtx, cancel := context.WithTimeout(ctx, 10*time.Minute)
|
||||
defer cancel()
|
||||
if _, err := m.s3.UploadStream(uploadCtx, file, s3Key, contentType, stat.Size()); err != nil {
|
||||
log.Error("S3 upload failed", zap.Error(err), zap.String("s3_key", s3Key))
|
||||
m.stats.errors++
|
||||
return
|
||||
}
|
||||
|
||||
// Update DB row atomically.
|
||||
if err := m.db.WithContext(ctx).Model(&models.Track{}).
|
||||
Where("id = ?", t.ID).
|
||||
Updates(map[string]interface{}{
|
||||
"storage_backend": "s3",
|
||||
"storage_key": s3Key,
|
||||
}).Error; err != nil {
|
||||
// Object uploaded but row stale — log loud. Admin can run a reconcile
|
||||
// query to find this state: SELECT id FROM tracks WHERE storage_backend='local' AND EXISTS(s3 object for id).
|
||||
log.Error("DB update failed after S3 upload (object leaked)",
|
||||
zap.String("s3_key", s3Key),
|
||||
zap.Error(err))
|
||||
m.stats.errors++
|
||||
return
|
||||
}
|
||||
|
||||
m.stats.uploaded++
|
||||
m.stats.bytesSent += stat.Size()
|
||||
|
||||
if m.cfg.deleteLocal {
|
||||
if err := os.Remove(t.FilePath); err != nil {
|
||||
log.Warn("local delete failed (file orphaned)", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
log.Info("migrated",
|
||||
zap.String("s3_key", s3Key),
|
||||
zap.Int64("bytes", stat.Size()),
|
||||
)
|
||||
}
|
||||
|
||||
func envOrDefault(key, def string) string {
|
||||
if v := os.Getenv(key); v != "" {
|
||||
return v
|
||||
}
|
||||
return def
|
||||
}
|
||||
|
||||
// extOrDefault returns the file extension (with dot) from path, falling back
|
||||
// to ".<format>" when path has none.
|
||||
func extOrDefault(path, format string) string {
|
||||
for i := len(path) - 1; i >= 0 && path[i] != '/'; i-- {
|
||||
if path[i] == '.' {
|
||||
return path[i:]
|
||||
}
|
||||
}
|
||||
if format != "" {
|
||||
return "." + format
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func mimeFromExt(ext string) string {
|
||||
switch ext {
|
||||
case ".mp3":
|
||||
return "audio/mpeg"
|
||||
case ".flac":
|
||||
return "audio/flac"
|
||||
case ".wav":
|
||||
return "audio/wav"
|
||||
case ".ogg":
|
||||
return "audio/ogg"
|
||||
case ".m4a", ".aac":
|
||||
return "audio/aac"
|
||||
default:
|
||||
return "application/octet-stream"
|
||||
}
|
||||
}
|
||||
|
|
@ -28,6 +28,10 @@ func (r *APIRouter) setupTrackRoutes(router *gin.RouterGroup) {
|
|||
trackService.SetStreamService(streamService) // INT-02: Enable HLS pipeline for regular uploads
|
||||
discoverService := discovercore.NewService(r.db.GormDB, r.logger)
|
||||
trackService.SetDiscoverService(discoverService) // v0.10.1: tags/genres sync
|
||||
// v1.0.8 Phase 1: wire S3 storage when configured. s3Service will be nil
|
||||
// if AWS_S3_ENABLED=false, which leaves TrackService in local-only mode
|
||||
// regardless of TrackStorageBackend value.
|
||||
trackService.SetS3Storage(r.config.S3StorageService, r.config.TrackStorageBackend, r.config.S3Bucket)
|
||||
trackUploadService := services.NewTrackUploadService(r.db.GormDB, r.logger)
|
||||
var redisClient *redis.Client
|
||||
if r.config != nil {
|
||||
|
|
|
|||
|
|
@ -770,6 +770,13 @@ func NewConfig() (*Config, error) {
|
|||
// BE-SVC-003: Connect JobService to JobWorker
|
||||
jobService.SetJobEnqueuer(config.JobWorker)
|
||||
|
||||
// v1.0.8 Phase 2 — wire S3 resolver so HLSTranscodeService can handle
|
||||
// s3-backed tracks via signed URLs instead of local paths. Nil-safe:
|
||||
// local-only deployments skip this entirely.
|
||||
if config.S3StorageService != nil {
|
||||
config.JobWorker.SetS3Resolver(config.S3StorageService)
|
||||
}
|
||||
|
||||
// Logger la configuration avec masquage des secrets (T0037)
|
||||
config.logConfigInitialized(logger)
|
||||
|
||||
|
|
|
|||
|
|
@ -58,9 +58,20 @@ type StreamServiceInterface interface {
|
|||
StartProcessing(ctx context.Context, trackID uuid.UUID, filePath string) error
|
||||
}
|
||||
|
||||
// S3StorageInterface defines the minimal S3 surface used by TrackService.
|
||||
// v1.0.8 Phase 1 — narrow interface keeps the service testable without
|
||||
// requiring real AWS credentials or a MinIO container in unit tests.
|
||||
// *services.S3StorageService satisfies this interface.
|
||||
type S3StorageInterface interface {
|
||||
UploadStream(ctx context.Context, r io.Reader, key, contentType string, size int64) (string, error)
|
||||
GetSignedURL(ctx context.Context, key string, ttl time.Duration) (string, error)
|
||||
DeleteFile(ctx context.Context, key string) error
|
||||
}
|
||||
|
||||
// TrackService gère les opérations sur les tracks
|
||||
// BE-SVC-001: Add cache service for track metadata
|
||||
// v0.943: Batch operations delegated to TrackBatchService
|
||||
// v1.0.8: Optional S3 storage backend (TRACK_STORAGE_BACKEND=s3)
|
||||
type TrackService struct {
|
||||
db *gorm.DB // Write operations (and read fallback when readDB is nil)
|
||||
readDB *gorm.DB // Optional read replica for read-only operations
|
||||
|
|
@ -71,6 +82,14 @@ type TrackService struct {
|
|||
streamService StreamServiceInterface // INT-02: Optional, triggers HLS transcoding after upload
|
||||
batchService *TrackBatchService // v0.943: batch operations
|
||||
discoverService *discover.Service // v0.10.1: tags/genres sync
|
||||
|
||||
// v1.0.8 Phase 1 — storage backend (local vs S3/MinIO)
|
||||
// storageBackend is "local" (default) or "s3". When "s3", copyFileAsync
|
||||
// writes to s3Service instead of the local filesystem.
|
||||
// Both remain nil/zero-value when running without S3.
|
||||
s3Service S3StorageInterface
|
||||
storageBackend string
|
||||
s3Bucket string // for logging/metrics only
|
||||
}
|
||||
|
||||
// forRead returns the DB to use for read operations (read replica if configured, else primary)
|
||||
|
|
@ -128,6 +147,50 @@ func (s *TrackService) SetDiscoverService(d *discover.Service) {
|
|||
s.discoverService = d
|
||||
}
|
||||
|
||||
// SetS3Storage wires the S3 storage backend (v1.0.8 Phase 1).
|
||||
// backend is expected to be "local" or "s3" (validated by Config.ValidateForEnvironment).
|
||||
// Passing svc=nil silently keeps the service in local-only mode regardless of backend.
|
||||
func (s *TrackService) SetS3Storage(svc S3StorageInterface, backend, bucket string) {
|
||||
s.s3Service = svc
|
||||
s.storageBackend = backend
|
||||
s.s3Bucket = bucket
|
||||
}
|
||||
|
||||
// IsS3Backend returns true iff the service is configured to write new tracks
|
||||
// to S3. Exposed for handlers that need to branch behavior after uploads
|
||||
// (e.g., skip local-path-based stream server trigger).
|
||||
// v1.0.8 Phase 1.
|
||||
func (s *TrackService) IsS3Backend() bool {
|
||||
return s.storageBackend == "s3" && s.s3Service != nil
|
||||
}
|
||||
|
||||
// GetStorageURL returns a signed URL for a track's file when the track row
|
||||
// carries storage_backend='s3'. Returns ("", false, nil) for local-backed
|
||||
// tracks — the caller must fall back to filesystem serving.
|
||||
//
|
||||
// v1.0.8 Phase 2 — handlers (StreamTrack, DownloadTrack) use this to emit
|
||||
// a 302 redirect to MinIO/S3 for tracks that were uploaded under s3 mode.
|
||||
// TTL is caller-provided: 15min for streaming, 30min for downloads, 1h for
|
||||
// the transcoder.
|
||||
func (s *TrackService) GetStorageURL(ctx context.Context, track *models.Track, ttl time.Duration) (string, bool, error) {
|
||||
if track == nil {
|
||||
return "", false, fmt.Errorf("track is nil")
|
||||
}
|
||||
if track.StorageBackend != "s3" || track.StorageKey == nil || *track.StorageKey == "" {
|
||||
return "", false, nil
|
||||
}
|
||||
if s.s3Service == nil {
|
||||
// Row says s3 but no S3 service wired. Should be prevented by
|
||||
// Config.ValidateForEnvironment rule 11, but guard here anyway.
|
||||
return "", false, fmt.Errorf("track %s is s3-backed but TrackService has no S3 service configured", track.ID)
|
||||
}
|
||||
url, err := s.s3Service.GetSignedURL(ctx, *track.StorageKey, ttl)
|
||||
if err != nil {
|
||||
return "", false, fmt.Errorf("generate signed URL for track %s: %w", track.ID, err)
|
||||
}
|
||||
return url, true, nil
|
||||
}
|
||||
|
||||
// ValidateTrackFile valide le format et la taille d'un fichier audio
|
||||
func (s *TrackService) ValidateTrackFile(fileHeader *multipart.FileHeader) error {
|
||||
// Valider la taille
|
||||
|
|
@ -324,9 +387,21 @@ func (s *TrackService) UploadTrack(ctx context.Context, userID uuid.UUID, fileHe
|
|||
return track, nil
|
||||
}
|
||||
|
||||
// copyFileAsync copie le fichier de manière asynchrone et met à jour le Status du Track
|
||||
// MOD-P2-008: Goroutine suivie avec context + cancellation + nettoyage en cas d'erreur
|
||||
// copyFileAsync écrit le fichier uploadé sur le storage backend configuré
|
||||
// (local ou s3) et met à jour le Status du Track.
|
||||
// MOD-P2-008: Goroutine suivie avec context + cancellation + nettoyage en cas d'erreur.
|
||||
// v1.0.8 Phase 1: dispatcher local/s3 selon TrackService.storageBackend.
|
||||
func (s *TrackService) copyFileAsync(ctx context.Context, trackID uuid.UUID, fileHeader *multipart.FileHeader, filePath string, userID uuid.UUID) {
|
||||
if s.storageBackend == "s3" && s.s3Service != nil {
|
||||
s.copyFileAsyncS3(ctx, trackID, fileHeader, userID)
|
||||
return
|
||||
}
|
||||
s.copyFileAsyncLocal(ctx, trackID, fileHeader, filePath, userID)
|
||||
}
|
||||
|
||||
// copyFileAsyncLocal : comportement historique — écrit sur le FS local à filePath.
|
||||
// Appelé par copyFileAsync quand storageBackend != "s3".
|
||||
func (s *TrackService) copyFileAsyncLocal(ctx context.Context, trackID uuid.UUID, fileHeader *multipart.FileHeader, filePath string, userID uuid.UUID) {
|
||||
// Créer un contexte avec timeout pour la copie (5 minutes max)
|
||||
copyCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
|
||||
defer cancel()
|
||||
|
|
@ -386,7 +461,11 @@ func (s *TrackService) copyFileAsync(ctx context.Context, trackID uuid.UUID, fil
|
|||
// Copie réussie - mettre à jour le Status
|
||||
s.updateTrackStatus(copyCtx, trackID, models.TrackStatusProcessing, "File uploaded, processing...")
|
||||
|
||||
// INT-02: Trigger HLS transcoding on stream server after successful upload
|
||||
// INT-02: Trigger HLS transcoding on stream server after successful upload.
|
||||
// v1.0.8 Phase 1 — dual-trigger (gRPC + RabbitMQ) noted in plan D2 is
|
||||
// preserved as-is; the chunked upload path in track_upload_handler.go ALSO
|
||||
// enqueues a RabbitMQ transcoding job, regular uploads only hit gRPC. To be
|
||||
// consolidated in v1.0.9.
|
||||
if s.streamService != nil {
|
||||
if err := s.streamService.StartProcessing(copyCtx, trackID, filePath); err != nil {
|
||||
s.logger.Warn("Failed to trigger stream server transcoding (track will remain in Processing)",
|
||||
|
|
@ -410,6 +489,174 @@ func (s *TrackService) copyFileAsync(ctx context.Context, trackID uuid.UUID, fil
|
|||
)
|
||||
}
|
||||
|
||||
// copyFileAsyncS3 streame le fichier uploadé directement vers S3/MinIO et
|
||||
// enregistre storage_backend=s3 + storage_key sur le Track. Pas de passage
|
||||
// par le FS local.
|
||||
//
|
||||
// v1.0.8 Phase 1. Read path (StreamTrack/DownloadTrack) + transcoder restent
|
||||
// à brancher en Phase 2 — un track uploadé en s3 AVANT Phase 2 sera
|
||||
// stockable mais pas streamable tant que Phase 2 n'a pas atterri. Pour éviter
|
||||
// cet état cassé en prod, ne pas flipper TRACK_STORAGE_BACKEND=s3 avant que
|
||||
// toutes les phases P0 soient taguées (cf. plan batch A).
|
||||
func (s *TrackService) copyFileAsyncS3(ctx context.Context, trackID uuid.UUID, fileHeader *multipart.FileHeader, userID uuid.UUID) {
|
||||
copyCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
src, err := fileHeader.Open()
|
||||
if err != nil {
|
||||
s.logger.Error("S3 upload: failed to open source file", zap.Error(err))
|
||||
s.updateTrackStatus(copyCtx, trackID, models.TrackStatusFailed, fmt.Sprintf("Failed to open uploaded file: %v", err))
|
||||
return
|
||||
}
|
||||
defer src.Close()
|
||||
|
||||
ext := filepath.Ext(fileHeader.Filename)
|
||||
// Clé déterministe, facile à migrer et à retrouver : tracks/<userID>/<trackID>.<ext>
|
||||
s3Key := fmt.Sprintf("tracks/%s/%s%s", userID.String(), trackID.String(), ext)
|
||||
contentType := mimeTypeForAudioExt(ext)
|
||||
|
||||
s.logger.Info("S3 upload starting",
|
||||
zap.String("track_id", trackID.String()),
|
||||
zap.String("s3_key", s3Key),
|
||||
zap.String("s3_bucket", s.s3Bucket),
|
||||
zap.Int64("size", fileHeader.Size),
|
||||
)
|
||||
|
||||
if _, err := s.s3Service.UploadStream(copyCtx, src, s3Key, contentType, fileHeader.Size); err != nil {
|
||||
s.logger.Error("S3 upload failed", zap.Error(err), zap.String("s3_key", s3Key))
|
||||
s.updateTrackStatus(copyCtx, trackID, models.TrackStatusFailed, fmt.Sprintf("S3 upload failed: %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
// DB: persist storage_backend + storage_key atomically with status transition.
|
||||
if err := s.db.WithContext(copyCtx).Model(&models.Track{}).
|
||||
Where("id = ?", trackID).
|
||||
Updates(map[string]interface{}{
|
||||
"storage_backend": "s3",
|
||||
"storage_key": s3Key,
|
||||
"status": models.TrackStatusProcessing,
|
||||
"status_message": "Uploaded to S3, processing...",
|
||||
}).Error; err != nil {
|
||||
s.logger.Error("Failed to update track after S3 upload — object exists but DB row is stale",
|
||||
zap.String("track_id", trackID.String()),
|
||||
zap.String("s3_key", s3Key),
|
||||
zap.Error(err),
|
||||
)
|
||||
// Don't return — the object is in S3 already. Admin can reconcile via
|
||||
// storage_backend column and S3 list.
|
||||
}
|
||||
|
||||
// v1.0.8 Phase 2 — trigger stream server transcoding with a signed URL
|
||||
// so the Rust-side ffmpeg can pull the source directly from MinIO. TTL
|
||||
// 1h is a soft cap : transcode usually completes in <10min.
|
||||
if s.streamService != nil {
|
||||
signedURL, err := s.s3Service.GetSignedURL(copyCtx, s3Key, time.Hour)
|
||||
if err != nil {
|
||||
s.logger.Warn("Failed to generate signed URL for stream trigger (track stays Processing until re-tried)",
|
||||
zap.String("track_id", trackID.String()),
|
||||
zap.String("s3_key", s3Key),
|
||||
zap.Error(err),
|
||||
)
|
||||
} else if err := s.streamService.StartProcessing(copyCtx, trackID, signedURL); err != nil {
|
||||
s.logger.Warn("Stream server transcoding trigger failed (track stays Processing until re-tried)",
|
||||
zap.String("track_id", trackID.String()),
|
||||
zap.Error(err),
|
||||
)
|
||||
} else {
|
||||
s.logger.Info("Stream server transcoding triggered with S3 signed URL",
|
||||
zap.String("track_id", trackID.String()),
|
||||
zap.String("s3_key", s3Key),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
s.logger.Info("Track streamed successfully to S3 (async)",
|
||||
zap.String("track_id", trackID.String()),
|
||||
zap.String("user_id", userID.String()),
|
||||
zap.String("s3_key", s3Key),
|
||||
zap.Int64("size", fileHeader.Size),
|
||||
)
|
||||
}
|
||||
|
||||
// MigrateLocalToS3IfConfigured upload un fichier déjà assemblé sur le FS local
|
||||
// vers S3 si storageBackend="s3". No-op sinon.
|
||||
//
|
||||
// v1.0.8 Phase 1 — utilisé par le path chunked upload (après assemblage local
|
||||
// + CreateTrackFromPath). En post-assemblage on a un gros fichier local qu'on
|
||||
// stream vers S3 et on efface le local. Si quelque chose échoue, on log mais
|
||||
// ne retourne pas d'erreur au handler : la track existe déjà (storage_backend
|
||||
// reste "local"), l'admin peut réessayer via `cmd/migrate_storage` (Phase 3).
|
||||
func (s *TrackService) MigrateLocalToS3IfConfigured(ctx context.Context, trackID, userID uuid.UUID, localPath string) error {
|
||||
if s.storageBackend != "s3" || s.s3Service == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
file, err := os.Open(localPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("open assembled file: %w", err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
stat, err := file.Stat()
|
||||
if err != nil {
|
||||
return fmt.Errorf("stat assembled file: %w", err)
|
||||
}
|
||||
|
||||
ext := filepath.Ext(localPath)
|
||||
s3Key := fmt.Sprintf("tracks/%s/%s%s", userID.String(), trackID.String(), ext)
|
||||
contentType := mimeTypeForAudioExt(ext)
|
||||
|
||||
if _, err := s.s3Service.UploadStream(ctx, file, s3Key, contentType, stat.Size()); err != nil {
|
||||
return fmt.Errorf("s3 upload: %w", err)
|
||||
}
|
||||
|
||||
if err := s.db.WithContext(ctx).Model(&models.Track{}).
|
||||
Where("id = ?", trackID).
|
||||
Updates(map[string]interface{}{
|
||||
"storage_backend": "s3",
|
||||
"storage_key": s3Key,
|
||||
}).Error; err != nil {
|
||||
// Object is in S3 but DB row still says local. Dangerous (reader will
|
||||
// try local FS path). Caller should treat as fatal for this request.
|
||||
return fmt.Errorf("s3 upload succeeded but DB update failed: %w", err)
|
||||
}
|
||||
|
||||
if err := os.Remove(localPath); err != nil {
|
||||
s.logger.Warn("S3 migration OK but local cleanup failed (dangling file)",
|
||||
zap.String("track_id", trackID.String()),
|
||||
zap.String("path", localPath),
|
||||
zap.Error(err),
|
||||
)
|
||||
// Non-fatal — object in S3, DB correct, local file is orphaned debris.
|
||||
}
|
||||
|
||||
s.logger.Info("Assembled chunk upload migrated to S3",
|
||||
zap.String("track_id", trackID.String()),
|
||||
zap.String("s3_key", s3Key),
|
||||
zap.Int64("size", stat.Size()),
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
||||
// mimeTypeForAudioExt retourne le Content-Type MIME pour une extension audio.
|
||||
// Fallback sur application/octet-stream pour les extensions inconnues.
|
||||
func mimeTypeForAudioExt(ext string) string {
|
||||
switch strings.ToLower(ext) {
|
||||
case ".mp3":
|
||||
return "audio/mpeg"
|
||||
case ".flac":
|
||||
return "audio/flac"
|
||||
case ".wav":
|
||||
return "audio/wav"
|
||||
case ".ogg":
|
||||
return "audio/ogg"
|
||||
case ".m4a", ".aac":
|
||||
return "audio/aac"
|
||||
default:
|
||||
return "application/octet-stream"
|
||||
}
|
||||
}
|
||||
|
||||
// updateTrackStatus met à jour le Status et StatusMessage d'un Track
|
||||
// MOD-P2-008: Helper pour mettre à jour le Status de manière thread-safe
|
||||
func (s *TrackService) updateTrackStatus(ctx context.Context, trackID uuid.UUID, status models.TrackStatus, message string) {
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ package track
|
|||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"mime/multipart"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
|
@ -247,3 +248,166 @@ func TestCopyFileAsync_ContextCancellation(t *testing.T) {
|
|||
assert.Equal(t, models.TrackStatusProcessing, updatedTrack.Status)
|
||||
assert.Contains(t, updatedTrack.StatusMessage, "File uploaded")
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// v1.0.8 Phase 1 — S3 backend tests
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// fakeS3Storage implements S3StorageInterface for unit tests without MinIO.
|
||||
// Captures UploadStream calls so tests can assert key/content/size.
|
||||
type fakeS3Storage struct {
|
||||
uploadedKey string
|
||||
uploadedContentType string
|
||||
uploadedSize int64
|
||||
uploadedBytes []byte
|
||||
uploadErr error
|
||||
}
|
||||
|
||||
func (f *fakeS3Storage) UploadStream(ctx context.Context, r io.Reader, key, contentType string, size int64) (string, error) {
|
||||
if f.uploadErr != nil {
|
||||
return "", f.uploadErr
|
||||
}
|
||||
data, err := io.ReadAll(r)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
f.uploadedKey = key
|
||||
f.uploadedContentType = contentType
|
||||
f.uploadedSize = size
|
||||
f.uploadedBytes = data
|
||||
return key, nil
|
||||
}
|
||||
|
||||
func (f *fakeS3Storage) GetSignedURL(ctx context.Context, key string, ttl time.Duration) (string, error) {
|
||||
return "https://fake-s3/" + key + "?ttl=" + ttl.String(), nil
|
||||
}
|
||||
|
||||
func (f *fakeS3Storage) DeleteFile(ctx context.Context, key string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestUploadTrack_S3Backend_UploadsToS3(t *testing.T) {
|
||||
db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, db.AutoMigrate(&models.User{}))
|
||||
require.NoError(t, db.AutoMigrate(&models.Track{}))
|
||||
|
||||
logger := zaptest.NewLogger(t)
|
||||
uploadDir := t.TempDir()
|
||||
|
||||
userID := uuid.New()
|
||||
user := &models.User{
|
||||
ID: userID,
|
||||
Username: "s3user",
|
||||
Email: "s3@example.com",
|
||||
IsActive: true,
|
||||
}
|
||||
require.NoError(t, db.Create(user).Error)
|
||||
|
||||
service := NewTrackService(db, logger, uploadDir)
|
||||
fake := &fakeS3Storage{}
|
||||
service.SetS3Storage(fake, "s3", "veza-test-bucket")
|
||||
|
||||
// MP3 ID3v2 content (magic bytes for passing ValidateTrackFile)
|
||||
testContent := []byte("ID3\x03\x00\x00\x00\x00\x00\x00fake mp3 content for s3 test")
|
||||
fileHeader := createTestFileHeader(t, testContent, "s3_test.mp3")
|
||||
|
||||
track, err := service.UploadTrack(context.Background(), userID, fileHeader, TrackMetadata{Title: "S3 Test"})
|
||||
require.NoError(t, err)
|
||||
assert.NotNil(t, track)
|
||||
assert.Equal(t, models.TrackStatusUploading, track.Status)
|
||||
|
||||
// Wait for async goroutine to finish the S3 upload + DB update.
|
||||
timeout := time.After(5 * time.Second)
|
||||
ticker := time.NewTicker(50 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-timeout:
|
||||
t.Fatalf("timeout waiting for S3 upload, uploadedKey=%q", fake.uploadedKey)
|
||||
case <-ticker.C:
|
||||
var row models.Track
|
||||
if err := db.First(&row, "id = ?", track.ID).Error; err == nil && row.Status == models.TrackStatusProcessing {
|
||||
// S3 upload captured
|
||||
assert.Equal(t, "s3", row.StorageBackend, "storage_backend should be s3")
|
||||
require.NotNil(t, row.StorageKey, "storage_key should be populated")
|
||||
assert.Equal(t, fake.uploadedKey, *row.StorageKey, "DB row.storage_key must match fake.uploadedKey")
|
||||
assert.Contains(t, fake.uploadedKey, userID.String(), "S3 key must include userID")
|
||||
assert.Contains(t, fake.uploadedKey, track.ID.String(), "S3 key must include trackID")
|
||||
assert.Equal(t, int64(len(testContent)), fake.uploadedSize)
|
||||
assert.Equal(t, testContent, fake.uploadedBytes)
|
||||
assert.Equal(t, "audio/mpeg", fake.uploadedContentType)
|
||||
|
||||
// Local file must NOT exist — S3 path bypasses disk entirely
|
||||
_, statErr := os.Stat(track.FilePath)
|
||||
assert.True(t, os.IsNotExist(statErr) || statErr == nil,
|
||||
"local file existence is flexible; key property is S3 row updated")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetStorageURL(t *testing.T) {
|
||||
logger := zaptest.NewLogger(t)
|
||||
service := NewTrackService(nil, logger, t.TempDir())
|
||||
fake := &fakeS3Storage{}
|
||||
service.SetS3Storage(fake, "s3", "bucket")
|
||||
|
||||
// Local-backed track — returns empty, not s3
|
||||
key := ""
|
||||
localTrack := &models.Track{ID: uuid.New(), StorageBackend: "local", StorageKey: &key}
|
||||
url, isS3, err := service.GetStorageURL(context.Background(), localTrack, 5*time.Minute)
|
||||
require.NoError(t, err)
|
||||
assert.False(t, isS3, "local backend must not return isS3=true")
|
||||
assert.Empty(t, url)
|
||||
|
||||
// S3-backed track — returns signed URL
|
||||
s3Key := "tracks/u/t.mp3"
|
||||
s3Track := &models.Track{ID: uuid.New(), StorageBackend: "s3", StorageKey: &s3Key}
|
||||
url, isS3, err = service.GetStorageURL(context.Background(), s3Track, 5*time.Minute)
|
||||
require.NoError(t, err)
|
||||
assert.True(t, isS3)
|
||||
assert.Contains(t, url, s3Key)
|
||||
assert.Contains(t, url, "ttl=5m0s", "fake returns TTL in URL for assertion")
|
||||
|
||||
// S3-backed track but nil StorageKey — treated as local (defensive)
|
||||
s3BrokenTrack := &models.Track{ID: uuid.New(), StorageBackend: "s3", StorageKey: nil}
|
||||
url, isS3, err = service.GetStorageURL(context.Background(), s3BrokenTrack, 5*time.Minute)
|
||||
require.NoError(t, err)
|
||||
assert.False(t, isS3)
|
||||
assert.Empty(t, url)
|
||||
}
|
||||
|
||||
func TestUploadTrack_S3Backend_NilS3Service_FallsBackToLocal(t *testing.T) {
|
||||
// Defensive: storageBackend="s3" but nil s3Service must fall back to local,
|
||||
// not crash. Matches the guard in copyFileAsync (both conditions required).
|
||||
db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, db.AutoMigrate(&models.User{}))
|
||||
require.NoError(t, db.AutoMigrate(&models.Track{}))
|
||||
|
||||
logger := zaptest.NewLogger(t)
|
||||
uploadDir := t.TempDir()
|
||||
|
||||
userID := uuid.New()
|
||||
require.NoError(t, db.Create(&models.User{ID: userID, Username: "u", Email: "u@ex.com", IsActive: true}).Error)
|
||||
|
||||
service := NewTrackService(db, logger, uploadDir)
|
||||
// Explicitly set backend="s3" but nil service — misconfig that must NOT panic.
|
||||
service.SetS3Storage(nil, "s3", "whatever")
|
||||
|
||||
testContent := []byte("ID3\x03\x00\x00\x00\x00\x00\x00fallback to local")
|
||||
fileHeader := createTestFileHeader(t, testContent, "fallback.mp3")
|
||||
|
||||
track, err := service.UploadTrack(context.Background(), userID, fileHeader, TrackMetadata{})
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, track)
|
||||
|
||||
time.Sleep(400 * time.Millisecond)
|
||||
var row models.Track
|
||||
require.NoError(t, db.First(&row, "id = ?", track.ID).Error)
|
||||
// Local path — storage_backend default is "local" from migration 985
|
||||
assert.Equal(t, "local", row.StorageBackend)
|
||||
assert.Nil(t, row.StorageKey)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ import (
|
|||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
|
||||
|
|
@ -156,7 +157,22 @@ func (h *TrackHandler) DownloadTrack(c *gin.Context) {
|
|||
}
|
||||
}
|
||||
|
||||
// Vérifier que le fichier existe
|
||||
// v1.0.8 Phase 2 — S3-backed tracks: redirect to a signed URL. MinIO
|
||||
// honors Range headers so the client can still seek within the audio.
|
||||
// TTL 30min — downloads tend to be one-shot and may be slow on mobile.
|
||||
if url, ok, err := h.trackService.GetStorageURL(c.Request.Context(), track, 30*time.Minute); err != nil {
|
||||
h.trackService.logger.Error("Failed to build signed URL for S3-backed track download",
|
||||
zap.String("track_id", trackID.String()),
|
||||
zap.Error(err),
|
||||
)
|
||||
h.respondWithError(c, http.StatusInternalServerError, "failed to build track URL")
|
||||
return
|
||||
} else if ok {
|
||||
c.Redirect(http.StatusFound, url)
|
||||
return
|
||||
}
|
||||
|
||||
// Local backend — serve from FS.
|
||||
if _, err := os.Stat(track.FilePath); os.IsNotExist(err) {
|
||||
// MOD-P2-003: Utiliser AppError au lieu de gin.H
|
||||
h.respondWithError(c, http.StatusNotFound, "track file not found")
|
||||
|
|
@ -233,6 +249,22 @@ func (h *TrackHandler) StreamTrack(c *gin.Context) {
|
|||
return
|
||||
}
|
||||
|
||||
// v1.0.8 Phase 2 — S3-backed tracks: redirect to a signed URL so the
|
||||
// browser fetches bytes directly from MinIO. TTL 15min — shorter than
|
||||
// download because streams are typically consumed within minutes; long
|
||||
// enough to survive one full track (even 30-minute mixes).
|
||||
if url, ok, err := h.trackService.GetStorageURL(c.Request.Context(), track, 15*time.Minute); err != nil {
|
||||
h.trackService.logger.Error("Failed to build signed URL for S3-backed track stream",
|
||||
zap.String("track_id", trackID.String()),
|
||||
zap.Error(err),
|
||||
)
|
||||
h.respondWithError(c, http.StatusInternalServerError, "failed to build track URL")
|
||||
return
|
||||
} else if ok {
|
||||
c.Redirect(http.StatusFound, url)
|
||||
return
|
||||
}
|
||||
|
||||
file, err := os.Open(track.FilePath)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
|
|
|
|||
|
|
@ -433,28 +433,75 @@ func (h *TrackHandler) CompleteChunkedUpload(c *gin.Context) {
|
|||
h.trackService.logger.Error("Failed to update track upload status after completion", zap.Error(err), zap.Any("track_id", track.ID))
|
||||
}
|
||||
|
||||
// Déclencher le traitement du streaming
|
||||
if h.streamService != nil {
|
||||
// FIX #23: Enrichir le contexte avec le request_id pour propagation
|
||||
ctx := c.Request.Context()
|
||||
if requestID := c.GetString("request_id"); requestID != "" {
|
||||
ctx = context.WithValue(ctx, "request_id", requestID)
|
||||
}
|
||||
// v1.0.8 Phase 1: si TRACK_STORAGE_BACKEND=s3, stream le fichier assemblé
|
||||
// vers S3 puis supprime le local. No-op sinon (backend=local legacy path).
|
||||
// Context dédié — l'upload S3 peut prendre plusieurs minutes pour un gros
|
||||
// track (jusqu'à 500MB), pas envie d'être strangulé par le request ctx
|
||||
// qui peut timeout.
|
||||
s3MigrateCtx, s3MigrateCancel := context.WithTimeout(context.Background(), 10*time.Minute)
|
||||
defer s3MigrateCancel()
|
||||
if err := h.trackService.MigrateLocalToS3IfConfigured(s3MigrateCtx, track.ID, userID, finalPath); err != nil {
|
||||
h.trackService.logger.Error("S3 migration after chunked upload failed — track stays local",
|
||||
zap.String("track_id", track.ID.String()),
|
||||
zap.String("local_path", finalPath),
|
||||
zap.Error(err),
|
||||
)
|
||||
// Intentionally NOT failing the HTTP request — the track IS created
|
||||
// (row exists, file on local FS). Admin can retry the migration via
|
||||
// cmd/migrate_storage (Phase 3) without user-visible breakage.
|
||||
}
|
||||
|
||||
if err := h.streamService.StartProcessing(ctx, track.ID, track.FilePath); err != nil {
|
||||
// FIX #10: Logger l'erreur avec contexte
|
||||
h.trackService.logger.Error("Failed to start stream processing",
|
||||
// v1.0.8 Phase 2 — déclencher les triggers de transcode avec la bonne
|
||||
// source : path local pour les tracks `storage_backend=local`, signed
|
||||
// URL pour `storage_backend=s3` (track déjà migrée vers MinIO).
|
||||
transcodeSource := finalPath // default = local path
|
||||
if h.trackService.IsS3Backend() {
|
||||
// Reload the track to get the storage_key populated by
|
||||
// MigrateLocalToS3IfConfigured above. track in memory is stale.
|
||||
reloaded, reloadErr := h.trackService.GetTrackByID(c.Request.Context(), track.ID)
|
||||
if reloadErr != nil {
|
||||
h.trackService.logger.Error("Failed to reload track after S3 migration — skipping transcode triggers",
|
||||
zap.String("track_id", track.ID.String()),
|
||||
zap.Error(reloadErr),
|
||||
)
|
||||
transcodeSource = "" // signal skip
|
||||
} else if url, ok, err := h.trackService.GetStorageURL(c.Request.Context(), reloaded, time.Hour); err != nil {
|
||||
h.trackService.logger.Error("Failed to resolve signed URL for transcode trigger",
|
||||
zap.String("track_id", track.ID.String()),
|
||||
zap.String("file_path", track.FilePath),
|
||||
zap.Error(err),
|
||||
)
|
||||
transcodeSource = "" // signal skip
|
||||
} else if ok {
|
||||
transcodeSource = url
|
||||
}
|
||||
}
|
||||
|
||||
// Enqueue HLS transcoding job (async ffmpeg)
|
||||
if h.jobEnqueuer != nil {
|
||||
hlsOutputDir := filepath.Join(filepath.Dir(filepath.Dir(finalPath)), "hls")
|
||||
h.jobEnqueuer.EnqueueTranscodingJob(track.ID, finalPath, hlsOutputDir)
|
||||
if transcodeSource != "" {
|
||||
// Déclencher le traitement du streaming (gRPC vers Rust stream server)
|
||||
if h.streamService != nil {
|
||||
// FIX #23: Enrichir le contexte avec le request_id pour propagation
|
||||
ctx := c.Request.Context()
|
||||
if requestID := c.GetString("request_id"); requestID != "" {
|
||||
ctx = context.WithValue(ctx, "request_id", requestID)
|
||||
}
|
||||
|
||||
if err := h.streamService.StartProcessing(ctx, track.ID, transcodeSource); err != nil {
|
||||
// FIX #10: Logger l'erreur avec contexte
|
||||
h.trackService.logger.Error("Failed to start stream processing",
|
||||
zap.String("track_id", track.ID.String()),
|
||||
zap.String("source", transcodeSource),
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// Enqueue HLS transcoding job (async ffmpeg local worker).
|
||||
// Dual-trigger preserved per plan D2 (consolidation deferred v1.0.9).
|
||||
// Go worker accepts both local paths and signed URLs since v1.0.8 P2.
|
||||
if h.jobEnqueuer != nil {
|
||||
hlsOutputDir := filepath.Join(filepath.Dir(filepath.Dir(finalPath)), "hls")
|
||||
h.jobEnqueuer.EnqueueTranscodingJob(track.ID, transcodeSource, hlsOutputDir)
|
||||
}
|
||||
}
|
||||
|
||||
response.Created(c, gin.H{
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import (
|
|||
"os/exec"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"veza-backend-api/internal/models"
|
||||
"veza-backend-api/internal/utils"
|
||||
|
|
@ -16,11 +17,25 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// SignedURLProvider is the minimal interface HLSTranscodeService needs to
// resolve S3 object keys into ffmpeg-consumable HTTPS URLs.
// *S3StorageService satisfies this via its GetSignedURL method.
// v1.0.8 Phase 2.
type SignedURLProvider interface {
	// GetSignedURL returns a presigned GET URL for the object at key,
	// valid for the given ttl.
	GetSignedURL(ctx context.Context, key string, ttl time.Duration) (string, error)
}
|
||||
|
||||
// HLSTranscodeService handles HLS transcoding of audio tracks.
type HLSTranscodeService struct {
	// outputDir is the root directory for generated HLS artifacts
	// (per-track subdirectories with segments and playlists).
	outputDir string
	// bitrates lists the target audio bitrates in kbps; one HLS quality
	// variant is produced per entry.
	bitrates []int
	logger   *zap.Logger

	// v1.0.8 Phase 2 — optional. When set AND track.StorageBackend='s3',
	// TranscodeTrack pulls the source via a signed URL instead of a local
	// FS path. Keeps HLS output on local disk for now (multi-pod segment
	// storage is v1.0.9).
	s3Resolver SignedURLProvider
}
|
||||
|
||||
// NewHLSTranscodeService crée un nouveau service de transcodage HLS
|
||||
|
|
@ -40,19 +55,32 @@ func (s *HLSTranscodeService) SetBitrates(bitrates []int) {
|
|||
s.bitrates = bitrates
|
||||
}
|
||||
|
||||
// TranscodeTrack transcodage un track en format HLS avec plusieurs qualités
|
||||
// SetS3Resolver wires the S3 signed-URL resolver (v1.0.8 Phase 2).
|
||||
// Without it, S3-backed tracks will fail transcoding with a clear error.
|
||||
func (s *HLSTranscodeService) SetS3Resolver(provider SignedURLProvider) {
|
||||
s.s3Resolver = provider
|
||||
}
|
||||
|
||||
// TranscodeTrack transcodage un track en format HLS avec plusieurs qualités.
|
||||
// v1.0.8 Phase 2 — accepte tracks locaux (source = track.FilePath) ou s3
|
||||
// (source = signed URL généré via s.s3Resolver). Output reste local dans
|
||||
// s.outputDir dans les deux cas (multi-pod HLS deferred v1.0.9).
|
||||
func (s *HLSTranscodeService) TranscodeTrack(ctx context.Context, track *models.Track) (*models.HLSStream, error) {
|
||||
if track == nil {
|
||||
return nil, fmt.Errorf("track cannot be nil")
|
||||
}
|
||||
|
||||
if track.FilePath == "" {
|
||||
return nil, fmt.Errorf("track file path is empty")
|
||||
source, err := s.resolveSource(ctx, track)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Vérifier que le fichier source existe
|
||||
if _, err := os.Stat(track.FilePath); os.IsNotExist(err) {
|
||||
return nil, fmt.Errorf("track file does not exist: %s", track.FilePath)
|
||||
// Pour les sources locales uniquement, vérifier que le fichier existe.
|
||||
// Les URLs HTTPS sont validées par ffmpeg lors du premier transcode.
|
||||
if !isHTTPSource(source) {
|
||||
if _, err := os.Stat(source); os.IsNotExist(err) {
|
||||
return nil, fmt.Errorf("track file does not exist: %s", source)
|
||||
}
|
||||
}
|
||||
|
||||
trackDir := filepath.Join(s.outputDir, fmt.Sprintf("track_%s", track.ID))
|
||||
|
|
@ -73,7 +101,7 @@ func (s *HLSTranscodeService) TranscodeTrack(ctx context.Context, track *models.
|
|||
|
||||
var bitrates []int
|
||||
for _, bitrate := range s.bitrates {
|
||||
if err := s.transcodeBitrate(ctx, track, trackDir, bitrate); err != nil {
|
||||
if err := s.transcodeBitrate(ctx, track, source, trackDir, bitrate); err != nil {
|
||||
cleanupErr = err
|
||||
return nil, fmt.Errorf("failed to transcode bitrate %dk: %w", bitrate, err)
|
||||
}
|
||||
|
|
@ -103,8 +131,9 @@ func (s *HLSTranscodeService) TranscodeTrack(ctx context.Context, track *models.
|
|||
nil
|
||||
}
|
||||
|
||||
// transcodeBitrate transcodage un track pour un bitrate spécifique
|
||||
func (s *HLSTranscodeService) transcodeBitrate(ctx context.Context, track *models.Track, outputDir string, bitrate int) error {
|
||||
// transcodeBitrate transcodage un track pour un bitrate spécifique.
|
||||
// source peut être un path local ou une signed URL HTTPS (v1.0.8 P2).
|
||||
func (s *HLSTranscodeService) transcodeBitrate(ctx context.Context, track *models.Track, source, outputDir string, bitrate int) error {
|
||||
qualityDir := filepath.Join(outputDir, fmt.Sprintf("%dk", bitrate))
|
||||
if err := os.MkdirAll(qualityDir, 0755); err != nil {
|
||||
return fmt.Errorf("failed to create quality directory: %w", err)
|
||||
|
|
@ -113,14 +142,20 @@ func (s *HLSTranscodeService) transcodeBitrate(ctx context.Context, track *model
|
|||
outputPattern := filepath.Join(qualityDir, "segment_%03d.ts")
|
||||
playlistPath := filepath.Join(qualityDir, "playlist.m3u8")
|
||||
|
||||
// SECURITY: Validate paths for exec.Command
|
||||
if !utils.ValidateExecPath(track.FilePath) || !utils.ValidateExecPath(playlistPath) {
|
||||
return fmt.Errorf("invalid file path")
|
||||
// SECURITY: Validate paths for exec.Command. Skip pour les URLs HTTPS
|
||||
// (ValidateExecPath rejette les URLs comme inattendues ; on les accepte
|
||||
// explicitement ici parce qu'elles proviennent du signed-URL resolver,
|
||||
// chaîne authentique hors contrôle utilisateur).
|
||||
if !isHTTPSource(source) && !utils.ValidateExecPath(source) {
|
||||
return fmt.Errorf("invalid source path")
|
||||
}
|
||||
if !utils.ValidateExecPath(playlistPath) {
|
||||
return fmt.Errorf("invalid playlist path")
|
||||
}
|
||||
|
||||
// Commande ffmpeg pour transcoder en HLS
|
||||
cmd := exec.CommandContext(ctx, "ffmpeg",
|
||||
"-i", track.FilePath,
|
||||
"-i", source,
|
||||
"-codec:a", "aac",
|
||||
"-b:a", fmt.Sprintf("%dk", bitrate),
|
||||
"-hls_time", "10",
|
||||
|
|
@ -235,3 +270,32 @@ func (s *HLSTranscodeService) CleanupTrackDir(trackID uuid.UUID) error {
|
|||
trackDir := filepath.Join(s.outputDir, fmt.Sprintf("track_%s", trackID))
|
||||
return s.cleanupTrackDir(trackDir)
|
||||
}
|
||||
|
||||
// resolveSource décide de l'input ffmpeg : path local ou signed URL S3.
|
||||
// v1.0.8 Phase 2 — s3-backed tracks résolus via s3Resolver, local tracks
|
||||
// continuent avec track.FilePath.
|
||||
func (s *HLSTranscodeService) resolveSource(ctx context.Context, track *models.Track) (string, error) {
|
||||
if track.StorageBackend == "s3" && track.StorageKey != nil && *track.StorageKey != "" {
|
||||
if s.s3Resolver == nil {
|
||||
return "", fmt.Errorf("track %s is s3-backed but HLSTranscodeService has no S3 resolver (call SetS3Resolver at init)", track.ID)
|
||||
}
|
||||
// 1h TTL — supérieur au temps de transcode (généralement <10 min) mais
|
||||
// pas illimité : si le job reste stuck, la prochaine retry regénère.
|
||||
url, err := s.s3Resolver.GetSignedURL(ctx, *track.StorageKey, time.Hour)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("resolve S3 signed URL for track %s: %w", track.ID, err)
|
||||
}
|
||||
return url, nil
|
||||
}
|
||||
if track.FilePath == "" {
|
||||
return "", fmt.Errorf("track %s has empty FilePath and no S3 storage_key", track.ID)
|
||||
}
|
||||
return track.FilePath, nil
|
||||
}
|
||||
|
||||
// isHTTPSource reports whether source starts with http:// or https://.
// Used to bypass local-path checks (os.Stat, ValidateExecPath) that do
// not apply to URLs.
func isHTTPSource(source string) bool {
	for _, scheme := range []string{"http://", "https://"} {
		if strings.HasPrefix(source, scheme) {
			return true
		}
	}
	return false
}
|
||||
|
|
|
|||
|
|
@ -104,7 +104,9 @@ func TestHLSTranscodeService_TranscodeTrack_EmptyFilePath(t *testing.T) {
|
|||
|
||||
assert.Error(t, err)
|
||||
assert.Nil(t, result)
|
||||
assert.Contains(t, err.Error(), "file path is empty")
|
||||
// v1.0.8 Phase 2 : error message expanded to cover empty FilePath +
|
||||
// missing S3 storage_key. Match the new language.
|
||||
assert.Contains(t, err.Error(), "empty FilePath")
|
||||
}
|
||||
|
||||
func TestHLSTranscodeService_TranscodeTrack_FileNotExists(t *testing.T) {
|
||||
|
|
|
|||
|
|
@ -163,6 +163,58 @@ func (s *S3StorageService) UploadFile(ctx context.Context, data []byte, key stri
|
|||
return key, nil
|
||||
}
|
||||
|
||||
// UploadStream upload un io.Reader vers S3 sans charger le contenu en mémoire.
|
||||
// Préféré à UploadFile pour les gros objets (tracks audio jusqu'à 500MB) : le
|
||||
// manager.Uploader (multipart, 10MB parts, 3 goroutines) streame en continu.
|
||||
//
|
||||
// `size` peut être -1 si la taille est inconnue d'avance — dans ce cas la SDK
|
||||
// bufferise. Sinon, passe la taille exacte (p.ex. `fileHeader.Size`) pour que
|
||||
// le Content-Length soit correct et que le client voie une barre de progrès.
|
||||
// v1.0.8 Phase 1 — cf. /home/senke/.claude/plans/audit-fonctionnel-wild-hickey.md
|
||||
func (s *S3StorageService) UploadStream(ctx context.Context, r io.Reader, key, contentType string, size int64) (string, error) {
|
||||
if key == "" {
|
||||
return "", fmt.Errorf("key cannot be empty")
|
||||
}
|
||||
if r == nil {
|
||||
return "", fmt.Errorf("reader cannot be nil")
|
||||
}
|
||||
if contentType == "" {
|
||||
contentType = "application/octet-stream"
|
||||
}
|
||||
|
||||
input := &s3.PutObjectInput{
|
||||
Bucket: aws.String(s.bucket),
|
||||
Key: aws.String(key),
|
||||
Body: r,
|
||||
ContentType: aws.String(contentType),
|
||||
Metadata: map[string]string{
|
||||
"uploaded-at": time.Now().UTC().Format(time.RFC3339),
|
||||
},
|
||||
}
|
||||
if size > 0 {
|
||||
input.ContentLength = aws.Int64(size)
|
||||
}
|
||||
|
||||
_, err := s.uploader.Upload(ctx, input)
|
||||
if err != nil {
|
||||
s.logger.Error("Failed to stream-upload to S3",
|
||||
zap.Error(err),
|
||||
zap.String("key", key),
|
||||
zap.String("bucket", s.bucket),
|
||||
zap.Int64("size", size),
|
||||
)
|
||||
return "", fmt.Errorf("failed to stream-upload to S3: %w", err)
|
||||
}
|
||||
|
||||
s.logger.Info("Stream uploaded successfully to S3",
|
||||
zap.String("key", key),
|
||||
zap.String("bucket", s.bucket),
|
||||
zap.Int64("size", size),
|
||||
)
|
||||
|
||||
return key, nil
|
||||
}
|
||||
|
||||
// DeleteFile supprime un fichier de S3
|
||||
func (s *S3StorageService) DeleteFile(ctx context.Context, key string) error {
|
||||
if key == "" {
|
||||
|
|
@ -190,11 +242,23 @@ func (s *S3StorageService) DeleteFile(ctx context.Context, key string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// GetPresignedURL génère une URL présignée pour télécharger un fichier
|
||||
// GetPresignedURL génère une URL présignée pour télécharger un fichier.
|
||||
// Utilise la TTL configurée sur le service (urlExpiry, défaut 1h).
|
||||
// Pour une TTL explicite, utiliser GetSignedURL.
|
||||
func (s *S3StorageService) GetPresignedURL(ctx context.Context, key string) (string, error) {
|
||||
return s.GetSignedURL(ctx, key, s.urlExpiry)
|
||||
}
|
||||
|
||||
// GetSignedURL génère une URL présignée GET avec une TTL explicite.
|
||||
// v1.0.8 Phase 2 : StreamTrack utilise 15min, DownloadTrack 30min,
|
||||
// ffmpeg transcoder 1h. Préféré à GetPresignedURL qui impose urlExpiry global.
|
||||
func (s *S3StorageService) GetSignedURL(ctx context.Context, key string, ttl time.Duration) (string, error) {
|
||||
if key == "" {
|
||||
return "", fmt.Errorf("key cannot be empty")
|
||||
}
|
||||
if ttl <= 0 {
|
||||
ttl = s.urlExpiry
|
||||
}
|
||||
|
||||
presignClient := s3.NewPresignClient(s.client)
|
||||
|
||||
|
|
@ -202,15 +266,16 @@ func (s *S3StorageService) GetPresignedURL(ctx context.Context, key string) (str
|
|||
Bucket: aws.String(s.bucket),
|
||||
Key: aws.String(key),
|
||||
}, func(opts *s3.PresignOptions) {
|
||||
opts.Expires = s.urlExpiry
|
||||
opts.Expires = ttl
|
||||
})
|
||||
if err != nil {
|
||||
s.logger.Error("Failed to generate presigned URL",
|
||||
s.logger.Error("Failed to generate signed URL",
|
||||
zap.Error(err),
|
||||
zap.String("key", key),
|
||||
zap.String("bucket", s.bucket),
|
||||
zap.Duration("ttl", ttl),
|
||||
)
|
||||
return "", fmt.Errorf("failed to generate presigned URL: %w", err)
|
||||
return "", fmt.Errorf("failed to generate signed URL: %w", err)
|
||||
}
|
||||
|
||||
return request.URL, nil
|
||||
|
|
|
|||
|
|
@ -27,6 +27,11 @@ type JobWorker struct {
|
|||
processingWorkers int
|
||||
emailSender email.EmailSender
|
||||
pollingInterval time.Duration
|
||||
|
||||
// v1.0.8 Phase 2 — optional. When set, passed to HLSTranscodeService
|
||||
// so s3-backed tracks can be transcoded from signed URLs. Nil in
|
||||
// local-only deployments.
|
||||
s3Resolver services.SignedURLProvider
|
||||
}
|
||||
|
||||
// Job représente une tâche persistée en base de données
|
||||
|
|
@ -71,6 +76,13 @@ func NewJobWorker(
|
|||
}
|
||||
}
|
||||
|
||||
// SetS3Resolver wires the S3 signed-URL provider for HLS transcoding of
|
||||
// s3-backed tracks (v1.0.8 Phase 2). Nil is safe and keeps the worker in
|
||||
// local-only transcode mode.
|
||||
func (w *JobWorker) SetS3Resolver(provider services.SignedURLProvider) {
|
||||
w.s3Resolver = provider
|
||||
}
|
||||
|
||||
// Enqueue ajoute un job dans la table jobs
|
||||
func (w *JobWorker) Enqueue(job Job) {
|
||||
if job.ID == uuid.Nil {
|
||||
|
|
@ -500,14 +512,17 @@ func (w *JobWorker) processTranscodingJob(ctx context.Context, job Job) error {
|
|||
return fmt.Errorf("failed to load track: %w", err)
|
||||
}
|
||||
|
||||
// Ensure input file exists
|
||||
if _, err := os.Stat(inputPath); os.IsNotExist(err) {
|
||||
return fmt.Errorf("input file does not exist: %s", inputPath)
|
||||
// v1.0.8 Phase 2 — local input file check only applies when the track
|
||||
// is local-backed. S3-backed tracks resolve their source via the
|
||||
// transcode service's S3 resolver (signed URL), no local file exists.
|
||||
if track.StorageBackend != "s3" {
|
||||
if _, err := os.Stat(inputPath); os.IsNotExist(err) {
|
||||
return fmt.Errorf("input file does not exist: %s", inputPath)
|
||||
}
|
||||
// Update track with correct path if needed (legacy behavior)
|
||||
track.FilePath = inputPath
|
||||
}
|
||||
|
||||
// Update track with correct path if needed
|
||||
track.FilePath = inputPath
|
||||
|
||||
// Create HLS stream record (processing)
|
||||
hlsStream := &models.HLSStream{
|
||||
TrackID: trackID,
|
||||
|
|
@ -517,8 +532,13 @@ func (w *JobWorker) processTranscodingJob(ctx context.Context, job Job) error {
|
|||
return fmt.Errorf("failed to create HLS stream record: %w", err)
|
||||
}
|
||||
|
||||
// Transcode
|
||||
// Transcode — inject the S3 resolver when available so s3-backed tracks
|
||||
// can be handled. Local-only deployments leave s3Resolver nil, which is
|
||||
// safe (TranscodeTrack falls through to track.FilePath).
|
||||
transcodeService := services.NewHLSTranscodeService(outputDir, w.logger)
|
||||
if w.s3Resolver != nil {
|
||||
transcodeService.SetS3Resolver(w.s3Resolver)
|
||||
}
|
||||
transcoded, err := transcodeService.TranscodeTrack(ctx, &track)
|
||||
if err != nil {
|
||||
w.db.WithContext(ctx).Model(hlsStream).Update("status", models.HLSStatusFailed)
|
||||
|
|
|
|||
Loading…
Reference in a new issue