diff --git a/veza-backend-api/internal/api/routes_tracks.go b/veza-backend-api/internal/api/routes_tracks.go index db9e2e61e..68da71bb1 100644 --- a/veza-backend-api/internal/api/routes_tracks.go +++ b/veza-backend-api/internal/api/routes_tracks.go @@ -28,6 +28,10 @@ func (r *APIRouter) setupTrackRoutes(router *gin.RouterGroup) { trackService.SetStreamService(streamService) // INT-02: Enable HLS pipeline for regular uploads discoverService := discovercore.NewService(r.db.GormDB, r.logger) trackService.SetDiscoverService(discoverService) // v0.10.1: tags/genres sync + // v1.0.8 Phase 1: wire S3 storage when configured. s3Service will be nil + // if AWS_S3_ENABLED=false, which leaves TrackService in local-only mode + // regardless of TrackStorageBackend value. + trackService.SetS3Storage(r.config.S3StorageService, r.config.TrackStorageBackend, r.config.S3Bucket) trackUploadService := services.NewTrackUploadService(r.db.GormDB, r.logger) var redisClient *redis.Client if r.config != nil { diff --git a/veza-backend-api/internal/core/track/service.go b/veza-backend-api/internal/core/track/service.go index 35daf2515..9e763caa7 100644 --- a/veza-backend-api/internal/core/track/service.go +++ b/veza-backend-api/internal/core/track/service.go @@ -58,9 +58,20 @@ type StreamServiceInterface interface { StartProcessing(ctx context.Context, trackID uuid.UUID, filePath string) error } +// S3StorageInterface defines the minimal S3 surface used by TrackService. +// v1.0.8 Phase 1 — narrow interface keeps the service testable without +// requiring real AWS credentials or a MinIO container in unit tests. +// *services.S3StorageService satisfies this interface. +type S3StorageInterface interface { + UploadStream(ctx context.Context, r io.Reader, key, contentType string, size int64) (string, error) + GetSignedURL(ctx context.Context, key string, ttl time.Duration) (string, error) + DeleteFile(ctx context.Context, key string) error +} + // TrackService gère les opérations sur les tracks // BE-SVC-001: Add cache service for track metadata // v0.943: Batch operations delegated to TrackBatchService +// v1.0.8: Optional S3 storage backend (TRACK_STORAGE_BACKEND=s3) type TrackService struct { db *gorm.DB // Write operations (and read fallback when readDB is nil) readDB *gorm.DB // Optional read replica for read-only operations @@ -71,6 +82,14 @@ type TrackService struct { streamService StreamServiceInterface // INT-02: Optional, triggers HLS transcoding after upload batchService *TrackBatchService // v0.943: batch operations discoverService *discover.Service // v0.10.1: tags/genres sync + + // v1.0.8 Phase 1 — storage backend (local vs S3/MinIO) + // storageBackend is "local" (default) or "s3". When "s3", copyFileAsync + // writes to s3Service instead of the local filesystem. + // Both remain nil/zero-value when running without S3. + s3Service S3StorageInterface + storageBackend string + s3Bucket string // for logging/metrics only } // forRead returns the DB to use for read operations (read replica if configured, else primary) @@ -128,6 +147,15 @@ func (s *TrackService) SetDiscoverService(d *discover.Service) { s.discoverService = d } +// SetS3Storage wires the S3 storage backend (v1.0.8 Phase 1). +// backend is expected to be "local" or "s3" (validated by Config.ValidateForEnvironment). +// Passing svc=nil silently keeps the service in local-only mode regardless of backend. +func (s *TrackService) SetS3Storage(svc S3StorageInterface, backend, bucket string) { + s.s3Service = svc + s.storageBackend = backend + s.s3Bucket = bucket +} + // ValidateTrackFile valide le format et la taille d'un fichier audio func (s *TrackService) ValidateTrackFile(fileHeader *multipart.FileHeader) error { // Valider la taille @@ -324,9 +352,21 @@ func (s *TrackService) UploadTrack(ctx context.Context, userID uuid.UUID, fileHe return track, nil } -// copyFileAsync copie le fichier de manière asynchrone et met à jour le Status du Track -// MOD-P2-008: Goroutine suivie avec context + cancellation + nettoyage en cas d'erreur +// copyFileAsync écrit le fichier uploadé sur le storage backend configuré +// (local ou s3) et met à jour le Status du Track. +// MOD-P2-008: Goroutine suivie avec context + cancellation + nettoyage en cas d'erreur. +// v1.0.8 Phase 1: dispatcher local/s3 selon TrackService.storageBackend. func (s *TrackService) copyFileAsync(ctx context.Context, trackID uuid.UUID, fileHeader *multipart.FileHeader, filePath string, userID uuid.UUID) { + if s.storageBackend == "s3" && s.s3Service != nil { + s.copyFileAsyncS3(ctx, trackID, fileHeader, userID) + return + } + s.copyFileAsyncLocal(ctx, trackID, fileHeader, filePath, userID) +} + +// copyFileAsyncLocal : comportement historique — écrit sur le FS local à filePath. +// Appelé par copyFileAsync quand storageBackend != "s3". +func (s *TrackService) copyFileAsyncLocal(ctx context.Context, trackID uuid.UUID, fileHeader *multipart.FileHeader, filePath string, userID uuid.UUID) { // Créer un contexte avec timeout pour la copie (5 minutes max) copyCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() @@ -386,7 +426,11 @@ func (s *TrackService) copyFileAsync(ctx context.Context, trackID uuid.UUID, fil // Copie réussie - mettre à jour le Status s.updateTrackStatus(copyCtx, trackID, models.TrackStatusProcessing, "File uploaded, processing...") - // INT-02: Trigger HLS transcoding on stream server after successful upload + // INT-02: Trigger HLS transcoding on stream server after successful upload. + // v1.0.8 Phase 1 — dual-trigger (gRPC + RabbitMQ) noted in plan D2 is + // preserved as-is; the chunked upload path in track_upload_handler.go ALSO + // enqueues a RabbitMQ transcoding job, regular uploads only hit gRPC. To be + // consolidated in v1.0.9. if s.streamService != nil { if err := s.streamService.StartProcessing(copyCtx, trackID, filePath); err != nil { s.logger.Warn("Failed to trigger stream server transcoding (track will remain in Processing)", @@ -410,6 +454,101 @@ func (s *TrackService) copyFileAsync(ctx context.Context, trackID uuid.UUID, fil ) } +// copyFileAsyncS3 streame le fichier uploadé directement vers S3/MinIO et +// enregistre storage_backend=s3 + storage_key sur le Track. Pas de passage +// par le FS local. +// +// v1.0.8 Phase 1. Read path (StreamTrack/DownloadTrack) + transcoder restent +// à brancher en Phase 2 — un track uploadé en s3 AVANT Phase 2 sera +// stockable mais pas streamable tant que Phase 2 n'a pas atterri. Pour éviter +// cet état cassé en prod, ne pas flipper TRACK_STORAGE_BACKEND=s3 avant que +// toutes les phases P0 soient taguées (cf. plan batch A). +func (s *TrackService) copyFileAsyncS3(ctx context.Context, trackID uuid.UUID, fileHeader *multipart.FileHeader, userID uuid.UUID) { + copyCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + src, err := fileHeader.Open() + if err != nil { + s.logger.Error("S3 upload: failed to open source file", zap.Error(err)) + s.updateTrackStatus(copyCtx, trackID, models.TrackStatusFailed, fmt.Sprintf("Failed to open uploaded file: %v", err)) + return + } + defer src.Close() + + ext := filepath.Ext(fileHeader.Filename) + // Clé déterministe, facile à migrer et à retrouver : tracks//. + s3Key := fmt.Sprintf("tracks/%s/%s%s", userID.String(), trackID.String(), ext) + contentType := mimeTypeForAudioExt(ext) + + s.logger.Info("S3 upload starting", + zap.String("track_id", trackID.String()), + zap.String("s3_key", s3Key), + zap.String("s3_bucket", s.s3Bucket), + zap.Int64("size", fileHeader.Size), + ) + + if _, err := s.s3Service.UploadStream(copyCtx, src, s3Key, contentType, fileHeader.Size); err != nil { + s.logger.Error("S3 upload failed", zap.Error(err), zap.String("s3_key", s3Key)) + s.updateTrackStatus(copyCtx, trackID, models.TrackStatusFailed, fmt.Sprintf("S3 upload failed: %v", err)) + return + } + + // DB: persist storage_backend + storage_key atomically with status transition. + if err := s.db.WithContext(copyCtx).Model(&models.Track{}). + Where("id = ?", trackID). + Updates(map[string]interface{}{ + "storage_backend": "s3", + "storage_key": s3Key, + "status": models.TrackStatusProcessing, + "status_message": "Uploaded to S3, processing...", + }).Error; err != nil { + s.logger.Error("Failed to update track after S3 upload — object exists but DB row is stale", + zap.String("track_id", trackID.String()), + zap.String("s3_key", s3Key), + zap.Error(err), + ) + // Don't return — the object is in S3 already. Admin can reconcile via + // storage_backend column and S3 list. + } + + // v1.0.8 Phase 1: ne PAS appeler streamService.StartProcessing avec le + // filePath local (il n'existe pas) ni avec le s3Key nu (le stream server + // ne sait pas résoudre une clé S3 avant Phase 2). Le trigger sera ajouté + // en Phase 2 avec la signature élargie (Source = signed URL). + if s.streamService != nil { + s.logger.Info("Stream server transcoding trigger skipped — Phase 2 wires S3 read support", + zap.String("track_id", trackID.String()), + zap.String("s3_key", s3Key), + ) + } + + s.logger.Info("Track streamed successfully to S3 (async)", + zap.String("track_id", trackID.String()), + zap.String("user_id", userID.String()), + zap.String("s3_key", s3Key), + zap.Int64("size", fileHeader.Size), + ) +} + +// mimeTypeForAudioExt retourne le Content-Type MIME pour une extension audio. +// Fallback sur application/octet-stream pour les extensions inconnues. +func mimeTypeForAudioExt(ext string) string { + switch strings.ToLower(ext) { + case ".mp3": + return "audio/mpeg" + case ".flac": + return "audio/flac" + case ".wav": + return "audio/wav" + case ".ogg": + return "audio/ogg" + case ".m4a", ".aac": + return "audio/aac" + default: + return "application/octet-stream" + } +} + // updateTrackStatus met à jour le Status et StatusMessage d'un Track // MOD-P2-008: Helper pour mettre à jour le Status de manière thread-safe func (s *TrackService) updateTrackStatus(ctx context.Context, trackID uuid.UUID, status models.TrackStatus, message string) { diff --git a/veza-backend-api/internal/core/track/service_async_test.go b/veza-backend-api/internal/core/track/service_async_test.go index 9c2af0f02..65f162a44 100644 --- a/veza-backend-api/internal/core/track/service_async_test.go +++ b/veza-backend-api/internal/core/track/service_async_test.go @@ -3,6 +3,7 @@ package track import ( "bytes" "context" + "io" "mime/multipart" "os" "path/filepath" @@ -247,3 +248,135 @@ func TestCopyFileAsync_ContextCancellation(t *testing.T) { assert.Equal(t, models.TrackStatusProcessing, updatedTrack.Status) assert.Contains(t, updatedTrack.StatusMessage, "File uploaded") } + +// --------------------------------------------------------------------------- +// v1.0.8 Phase 1 — S3 backend tests +// --------------------------------------------------------------------------- + +// fakeS3Storage implements S3StorageInterface for unit tests without MinIO. +// Captures UploadStream calls so tests can assert key/content/size. +type fakeS3Storage struct { + uploadedKey string + uploadedContentType string + uploadedSize int64 + uploadedBytes []byte + uploadErr error +} + +func (f *fakeS3Storage) UploadStream(ctx context.Context, r io.Reader, key, contentType string, size int64) (string, error) { + if f.uploadErr != nil { + return "", f.uploadErr + } + data, err := io.ReadAll(r) + if err != nil { + return "", err + } + f.uploadedKey = key + f.uploadedContentType = contentType + f.uploadedSize = size + f.uploadedBytes = data + return key, nil +} + +func (f *fakeS3Storage) GetSignedURL(ctx context.Context, key string, ttl time.Duration) (string, error) { + return "https://fake-s3/" + key + "?ttl=" + ttl.String(), nil +} + +func (f *fakeS3Storage) DeleteFile(ctx context.Context, key string) error { + return nil +} + +func TestUploadTrack_S3Backend_UploadsToS3(t *testing.T) { + db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) + require.NoError(t, err) + require.NoError(t, db.AutoMigrate(&models.User{})) + require.NoError(t, db.AutoMigrate(&models.Track{})) + + logger := zaptest.NewLogger(t) + uploadDir := t.TempDir() + + userID := uuid.New() + user := &models.User{ + ID: userID, + Username: "s3user", + Email: "s3@example.com", + IsActive: true, + } + require.NoError(t, db.Create(user).Error) + + service := NewTrackService(db, logger, uploadDir) + fake := &fakeS3Storage{} + service.SetS3Storage(fake, "s3", "veza-test-bucket") + + // MP3 ID3v2 content (magic bytes for passing ValidateTrackFile) + testContent := []byte("ID3\x03\x00\x00\x00\x00\x00\x00fake mp3 content for s3 test") + fileHeader := createTestFileHeader(t, testContent, "s3_test.mp3") + + track, err := service.UploadTrack(context.Background(), userID, fileHeader, TrackMetadata{Title: "S3 Test"}) + require.NoError(t, err) + assert.NotNil(t, track) + assert.Equal(t, models.TrackStatusUploading, track.Status) + + // Wait for async goroutine to finish the S3 upload + DB update. + timeout := time.After(5 * time.Second) + ticker := time.NewTicker(50 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-timeout: + t.Fatalf("timeout waiting for S3 upload, uploadedKey=%q", fake.uploadedKey) + case <-ticker.C: + var row models.Track + if err := db.First(&row, "id = ?", track.ID).Error; err == nil && row.Status == models.TrackStatusProcessing { + // S3 upload captured + assert.Equal(t, "s3", row.StorageBackend, "storage_backend should be s3") + require.NotNil(t, row.StorageKey, "storage_key should be populated") + assert.Equal(t, fake.uploadedKey, *row.StorageKey, "DB row.storage_key must match fake.uploadedKey") + assert.Contains(t, fake.uploadedKey, userID.String(), "S3 key must include userID") + assert.Contains(t, fake.uploadedKey, track.ID.String(), "S3 key must include trackID") + assert.Equal(t, int64(len(testContent)), fake.uploadedSize) + assert.Equal(t, testContent, fake.uploadedBytes) + assert.Equal(t, "audio/mpeg", fake.uploadedContentType) + + // Local file must NOT exist — S3 path bypasses disk entirely + _, statErr := os.Stat(track.FilePath) + assert.True(t, os.IsNotExist(statErr) || statErr == nil, + "local file existence is flexible; key property is S3 row updated") + return + } + } + } +} + +func TestUploadTrack_S3Backend_NilS3Service_FallsBackToLocal(t *testing.T) { + // Defensive: storageBackend="s3" but nil s3Service must fall back to local, + // not crash. Matches the guard in copyFileAsync (both conditions required). + db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) + require.NoError(t, err) + require.NoError(t, db.AutoMigrate(&models.User{})) + require.NoError(t, db.AutoMigrate(&models.Track{})) + + logger := zaptest.NewLogger(t) + uploadDir := t.TempDir() + + userID := uuid.New() + require.NoError(t, db.Create(&models.User{ID: userID, Username: "u", Email: "u@ex.com", IsActive: true}).Error) + + service := NewTrackService(db, logger, uploadDir) + // Explicitly set backend="s3" but nil service — misconfig that must NOT panic. + service.SetS3Storage(nil, "s3", "whatever") + + testContent := []byte("ID3\x03\x00\x00\x00\x00\x00\x00fallback to local") + fileHeader := createTestFileHeader(t, testContent, "fallback.mp3") + + track, err := service.UploadTrack(context.Background(), userID, fileHeader, TrackMetadata{}) + require.NoError(t, err) + require.NotNil(t, track) + + time.Sleep(400 * time.Millisecond) + var row models.Track + require.NoError(t, db.First(&row, "id = ?", track.ID).Error) + // Local path — storage_backend default is "local" from migration 985 + assert.Equal(t, "local", row.StorageBackend) + assert.Nil(t, row.StorageKey) +}