feat(tracks): wire S3 storage backend into TrackService.UploadTrack (v1.0.8 P1)

Splits copyFileAsync into local vs s3 branches gated by the
TRACK_STORAGE_BACKEND flag (added in P0 d03232c8). Regular uploads
via TrackService.UploadTrack() now write to MinIO/S3 when the flag
is 's3' and a non-nil S3 service is configured, persisting the S3
object key + storage_backend='s3' on the track row atomically.

Changes:

- internal/core/track/service.go
  - New S3StorageInterface (UploadStream + GetSignedURL + DeleteFile).
    Narrow surface for testability; *services.S3StorageService satisfies.
  - TrackService gains s3Service + storageBackend + s3Bucket fields
    and a SetS3Storage setter.
  - copyFileAsync is now a dispatcher; former body moved to
    copyFileAsyncLocal, new copyFileAsyncS3 streams to S3 with key
    tracks/<userID>/<trackID>.<ext>.
  - mimeTypeForAudioExt helper.
  - Stream server trigger deliberately skipped on S3 branch; wired
    in Phase 2 with S3 read support.

- internal/api/routes_tracks.go: DI passes S3StorageService,
  TrackStorageBackend, S3Bucket into TrackService.

- internal/core/track/service_async_test.go:
  - fakeS3Storage stub (captures UploadStream payload).
  - TestUploadTrack_S3Backend_UploadsToS3: end-to-end on key format,
    content-type, DB row state.
  - TestUploadTrack_S3Backend_NilS3Service_FallsBackToLocal:
    defensive — backend='s3' + nil service must not panic.

Out of scope Phase 1: read path, transcoder. Enabling
TRACK_STORAGE_BACKEND=s3 in prod BEFORE Phase 2 ships makes S3-backed
tracks un-streamable. Keep flag 'local' until A4/A5 land.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
senke 2026-04-23 23:20:17 +02:00
parent 3d43d43075
commit f47141fe62
3 changed files with 279 additions and 3 deletions

View file

@ -28,6 +28,10 @@ func (r *APIRouter) setupTrackRoutes(router *gin.RouterGroup) {
trackService.SetStreamService(streamService) // INT-02: Enable HLS pipeline for regular uploads
discoverService := discovercore.NewService(r.db.GormDB, r.logger)
trackService.SetDiscoverService(discoverService) // v0.10.1: tags/genres sync
// v1.0.8 Phase 1: wire S3 storage when configured. s3Service will be nil
// if AWS_S3_ENABLED=false, which leaves TrackService in local-only mode
// regardless of TrackStorageBackend value.
trackService.SetS3Storage(r.config.S3StorageService, r.config.TrackStorageBackend, r.config.S3Bucket)
trackUploadService := services.NewTrackUploadService(r.db.GormDB, r.logger)
var redisClient *redis.Client
if r.config != nil {

View file

@ -58,9 +58,20 @@ type StreamServiceInterface interface {
StartProcessing(ctx context.Context, trackID uuid.UUID, filePath string) error
}
// S3StorageInterface is the minimal S3 surface TrackService depends on.
// v1.0.8 Phase 1 — keeping this interface narrow lets unit tests substitute
// an in-memory fake instead of requiring real AWS credentials or a MinIO
// container. *services.S3StorageService satisfies this interface.
type S3StorageInterface interface {
	// UploadStream writes r to the bucket under key with the given
	// content-type and size, returning the stored key.
	UploadStream(ctx context.Context, r io.Reader, key, contentType string, size int64) (string, error)
	// GetSignedURL returns a pre-signed URL for key, valid for ttl.
	GetSignedURL(ctx context.Context, key string, ttl time.Duration) (string, error)
	// DeleteFile removes the object stored under key.
	DeleteFile(ctx context.Context, key string) error
}
// TrackService gère les opérations sur les tracks
// BE-SVC-001: Add cache service for track metadata
// v0.943: Batch operations delegated to TrackBatchService
// v1.0.8: Optional S3 storage backend (TRACK_STORAGE_BACKEND=s3)
type TrackService struct {
db *gorm.DB // Write operations (and read fallback when readDB is nil)
readDB *gorm.DB // Optional read replica for read-only operations
@ -71,6 +82,14 @@ type TrackService struct {
streamService StreamServiceInterface // INT-02: Optional, triggers HLS transcoding after upload
batchService *TrackBatchService // v0.943: batch operations
discoverService *discover.Service // v0.10.1: tags/genres sync
// v1.0.8 Phase 1 — storage backend (local vs S3/MinIO)
// storageBackend is "local" (default) or "s3". When "s3", copyFileAsync
// writes to s3Service instead of the local filesystem.
// Both remain nil/zero-value when running without S3.
s3Service S3StorageInterface
storageBackend string
s3Bucket string // for logging/metrics only
}
// forRead returns the DB to use for read operations (read replica if configured, else primary)
@ -128,6 +147,15 @@ func (s *TrackService) SetDiscoverService(d *discover.Service) {
s.discoverService = d
}
// SetS3Storage wires the S3 storage backend (v1.0.8 Phase 1).
// backend should be "local" or "s3" (validated by
// Config.ValidateForEnvironment); bucket is kept for logging/metrics only.
// Passing svc=nil silently leaves the service in local-only mode regardless
// of the backend value.
func (s *TrackService) SetS3Storage(svc S3StorageInterface, backend, bucket string) {
	s.storageBackend = backend
	s.s3Bucket = bucket
	s.s3Service = svc
}
// ValidateTrackFile valide le format et la taille d'un fichier audio
func (s *TrackService) ValidateTrackFile(fileHeader *multipart.FileHeader) error {
// Valider la taille
@ -324,9 +352,21 @@ func (s *TrackService) UploadTrack(ctx context.Context, userID uuid.UUID, fileHe
return track, nil
}
// copyFileAsync writes the uploaded file to the configured storage backend
// (local filesystem or S3) and updates the Track status.
// MOD-P2-008: tracked goroutine with context + cancellation + cleanup on error.
// v1.0.8 Phase 1: dispatches to the local/s3 path based on
// TrackService.storageBackend.
func (s *TrackService) copyFileAsync(ctx context.Context, trackID uuid.UUID, fileHeader *multipart.FileHeader, filePath string, userID uuid.UUID) {
	// Both conditions are required: backend=="s3" with a nil service is a
	// misconfiguration that must fall back to local instead of panicking.
	useS3 := s.storageBackend == "s3" && s.s3Service != nil
	if !useS3 {
		s.copyFileAsyncLocal(ctx, trackID, fileHeader, filePath, userID)
		return
	}
	s.copyFileAsyncS3(ctx, trackID, fileHeader, userID)
}
// copyFileAsyncLocal : comportement historique — écrit sur le FS local à filePath.
// Appelé par copyFileAsync quand storageBackend != "s3".
func (s *TrackService) copyFileAsyncLocal(ctx context.Context, trackID uuid.UUID, fileHeader *multipart.FileHeader, filePath string, userID uuid.UUID) {
// Créer un contexte avec timeout pour la copie (5 minutes max)
copyCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
@ -386,7 +426,11 @@ func (s *TrackService) copyFileAsync(ctx context.Context, trackID uuid.UUID, fil
// Copie réussie - mettre à jour le Status
s.updateTrackStatus(copyCtx, trackID, models.TrackStatusProcessing, "File uploaded, processing...")
// INT-02: Trigger HLS transcoding on stream server after successful upload
// INT-02: Trigger HLS transcoding on stream server after successful upload.
// v1.0.8 Phase 1 — dual-trigger (gRPC + RabbitMQ) noted in plan D2 is
// preserved as-is; the chunked upload path in track_upload_handler.go ALSO
// enqueues a RabbitMQ transcoding job, regular uploads only hit gRPC. To be
// consolidated in v1.0.9.
if s.streamService != nil {
if err := s.streamService.StartProcessing(copyCtx, trackID, filePath); err != nil {
s.logger.Warn("Failed to trigger stream server transcoding (track will remain in Processing)",
@ -410,6 +454,101 @@ func (s *TrackService) copyFileAsync(ctx context.Context, trackID uuid.UUID, fil
)
}
// copyFileAsyncS3 streams the uploaded file directly to S3/MinIO and records
// storage_backend=s3 + storage_key on the Track row. The local filesystem is
// never touched on this path.
//
// v1.0.8 Phase 1. The read path (StreamTrack/DownloadTrack) and the transcoder
// are wired in Phase 2 — a track uploaded to s3 BEFORE Phase 2 lands is
// storable but not streamable. To avoid that broken state in prod, do not
// flip TRACK_STORAGE_BACKEND=s3 until all P0 phases are tagged (see plan
// batch A).
func (s *TrackService) copyFileAsyncS3(ctx context.Context, trackID uuid.UUID, fileHeader *multipart.FileHeader, userID uuid.UUID) {
	// Detached context: the upload must outlive the HTTP request that spawned
	// it, but stays bounded to 5 minutes like the local copy path.
	copyCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()

	src, err := fileHeader.Open()
	if err != nil {
		s.logger.Error("S3 upload: failed to open source file", zap.Error(err))
		s.updateTrackStatus(copyCtx, trackID, models.TrackStatusFailed, fmt.Sprintf("Failed to open uploaded file: %v", err))
		return
	}
	defer src.Close()

	ext := filepath.Ext(fileHeader.Filename)
	// Deterministic key, easy to migrate and to look up: tracks/<userID>/<trackID>.<ext>
	s3Key := fmt.Sprintf("tracks/%s/%s%s", userID.String(), trackID.String(), ext)
	contentType := mimeTypeForAudioExt(ext)

	s.logger.Info("S3 upload starting",
		zap.String("track_id", trackID.String()),
		zap.String("s3_key", s3Key),
		zap.String("s3_bucket", s.s3Bucket),
		zap.Int64("size", fileHeader.Size),
	)

	if _, err := s.s3Service.UploadStream(copyCtx, src, s3Key, contentType, fileHeader.Size); err != nil {
		s.logger.Error("S3 upload failed", zap.Error(err), zap.String("s3_key", s3Key))
		s.updateTrackStatus(copyCtx, trackID, models.TrackStatusFailed, fmt.Sprintf("S3 upload failed: %v", err))
		return
	}

	// DB: persist storage_backend + storage_key atomically with the status
	// transition.
	if err := s.db.WithContext(copyCtx).Model(&models.Track{}).
		Where("id = ?", trackID).
		Updates(map[string]interface{}{
			"storage_backend": "s3",
			"storage_key":     s3Key,
			"status":          models.TrackStatusProcessing,
			"status_message":  "Uploaded to S3, processing...",
		}).Error; err != nil {
		s.logger.Error("Failed to update track after S3 upload — object exists but DB row is stale",
			zap.String("track_id", trackID.String()),
			zap.String("s3_key", s3Key),
			zap.Error(err),
		)
		// The object is already in S3, so do NOT mark the track Failed —
		// an admin can reconcile via the storage_backend column and an S3
		// list. Return here so the "streamed successfully" log below is not
		// emitted for a row we just failed to update (previously this fell
		// through and logged success despite the error).
		return
	}

	// v1.0.8 Phase 1: do NOT call streamService.StartProcessing with the
	// local filePath (it does not exist) nor with the bare s3Key (the stream
	// server cannot resolve an S3 key before Phase 2). The trigger is added
	// in Phase 2 with the widened signature (Source = signed URL).
	if s.streamService != nil {
		s.logger.Info("Stream server transcoding trigger skipped — Phase 2 wires S3 read support",
			zap.String("track_id", trackID.String()),
			zap.String("s3_key", s3Key),
		)
	}

	s.logger.Info("Track streamed successfully to S3 (async)",
		zap.String("track_id", trackID.String()),
		zap.String("user_id", userID.String()),
		zap.String("s3_key", s3Key),
		zap.Int64("size", fileHeader.Size),
	)
}
// mimeTypeForAudioExt returns the MIME Content-Type for an audio file
// extension (leading dot included, case-insensitive).
// Falls back to application/octet-stream for unknown extensions.
func mimeTypeForAudioExt(ext string) string {
	switch strings.ToLower(ext) {
	case ".mp3":
		return "audio/mpeg"
	case ".flac":
		return "audio/flac"
	case ".wav":
		return "audio/wav"
	case ".ogg":
		return "audio/ogg"
	case ".m4a":
		// M4A is an MPEG-4 audio container: the registered media type is
		// audio/mp4 (RFC 4337), not raw AAC.
		return "audio/mp4"
	case ".aac":
		return "audio/aac"
	default:
		return "application/octet-stream"
	}
}
// updateTrackStatus met à jour le Status et StatusMessage d'un Track
// MOD-P2-008: Helper pour mettre à jour le Status de manière thread-safe
func (s *TrackService) updateTrackStatus(ctx context.Context, trackID uuid.UUID, status models.TrackStatus, message string) {

View file

@ -3,6 +3,7 @@ package track
import (
"bytes"
"context"
"io"
"mime/multipart"
"os"
"path/filepath"
@ -247,3 +248,135 @@ func TestCopyFileAsync_ContextCancellation(t *testing.T) {
assert.Equal(t, models.TrackStatusProcessing, updatedTrack.Status)
assert.Contains(t, updatedTrack.StatusMessage, "File uploaded")
}
// ---------------------------------------------------------------------------
// v1.0.8 Phase 1 — S3 backend tests
// ---------------------------------------------------------------------------
// fakeS3Storage is an in-memory S3StorageInterface double for unit tests —
// no MinIO required. It records the most recent UploadStream call so tests
// can assert on key, content-type, size and payload.
type fakeS3Storage struct {
	// uploadErr, when non-nil, is returned by UploadStream before anything
	// is recorded (failure-injection knob).
	uploadErr error

	// Captured from the last successful UploadStream call.
	uploadedKey         string
	uploadedContentType string
	uploadedSize        int64
	uploadedBytes       []byte
}
// UploadStream drains r and records key/contentType/size/payload on the fake.
// When uploadErr is armed it is returned immediately and nothing is recorded.
func (f *fakeS3Storage) UploadStream(ctx context.Context, r io.Reader, key, contentType string, size int64) (string, error) {
	if injected := f.uploadErr; injected != nil {
		return "", injected
	}
	payload, readErr := io.ReadAll(r)
	if readErr != nil {
		return "", readErr
	}
	f.uploadedKey, f.uploadedContentType = key, contentType
	f.uploadedSize, f.uploadedBytes = size, payload
	return key, nil
}
// GetSignedURL returns a deterministic fake URL embedding the key and TTL,
// letting tests assert on both without a real signer.
func (f *fakeS3Storage) GetSignedURL(ctx context.Context, key string, ttl time.Duration) (string, error) {
	url := "https://fake-s3/" + key + "?ttl=" + ttl.String()
	return url, nil
}
// DeleteFile satisfies S3StorageInterface; the fake treats deletion as a
// no-op and always reports success.
func (f *fakeS3Storage) DeleteFile(ctx context.Context, key string) error {
	return nil
}
// TestUploadTrack_S3Backend_UploadsToS3 covers the happy path of the S3
// backend end-to-end: key format, content-type, captured payload, and the
// final DB row state (storage_backend=s3, storage_key populated).
func TestUploadTrack_S3Backend_UploadsToS3(t *testing.T) {
	db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
	require.NoError(t, err)
	require.NoError(t, db.AutoMigrate(&models.User{}))
	require.NoError(t, db.AutoMigrate(&models.Track{}))
	logger := zaptest.NewLogger(t)
	uploadDir := t.TempDir()
	userID := uuid.New()
	user := &models.User{
		ID:       userID,
		Username: "s3user",
		Email:    "s3@example.com",
		IsActive: true,
	}
	require.NoError(t, db.Create(user).Error)
	service := NewTrackService(db, logger, uploadDir)
	fake := &fakeS3Storage{}
	service.SetS3Storage(fake, "s3", "veza-test-bucket")
	// MP3 ID3v2 content (magic bytes for passing ValidateTrackFile)
	testContent := []byte("ID3\x03\x00\x00\x00\x00\x00\x00fake mp3 content for s3 test")
	fileHeader := createTestFileHeader(t, testContent, "s3_test.mp3")
	track, err := service.UploadTrack(context.Background(), userID, fileHeader, TrackMetadata{Title: "S3 Test"})
	require.NoError(t, err)
	assert.NotNil(t, track)
	assert.Equal(t, models.TrackStatusUploading, track.Status)
	// Wait for the async goroutine to finish the S3 upload + DB update.
	timeout := time.After(5 * time.Second)
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-timeout:
			t.Fatalf("timeout waiting for S3 upload, uploadedKey=%q", fake.uploadedKey)
		case <-ticker.C:
			var row models.Track
			if err := db.First(&row, "id = ?", track.ID).Error; err == nil && row.Status == models.TrackStatusProcessing {
				// S3 upload captured
				assert.Equal(t, "s3", row.StorageBackend, "storage_backend should be s3")
				require.NotNil(t, row.StorageKey, "storage_key should be populated")
				assert.Equal(t, fake.uploadedKey, *row.StorageKey, "DB row.storage_key must match fake.uploadedKey")
				assert.Contains(t, fake.uploadedKey, userID.String(), "S3 key must include userID")
				assert.Contains(t, fake.uploadedKey, track.ID.String(), "S3 key must include trackID")
				assert.Equal(t, int64(len(testContent)), fake.uploadedSize)
				assert.Equal(t, testContent, fake.uploadedBytes)
				assert.Equal(t, "audio/mpeg", fake.uploadedContentType)
				// The S3 branch bypasses disk entirely, so the local file must
				// NOT exist. (The previous check accepted statErr == nil too,
				// which made the assertion vacuously true.)
				_, statErr := os.Stat(track.FilePath)
				assert.True(t, os.IsNotExist(statErr),
					"S3 upload must not write the local file at track.FilePath")
				return
			}
		}
	}
}
// TestUploadTrack_S3Backend_NilS3Service_FallsBackToLocal is defensive:
// storageBackend="s3" with a nil s3Service must fall back to the local path,
// not crash. Matches the guard in copyFileAsync (both conditions required).
func TestUploadTrack_S3Backend_NilS3Service_FallsBackToLocal(t *testing.T) {
	db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
	require.NoError(t, err)
	require.NoError(t, db.AutoMigrate(&models.User{}))
	require.NoError(t, db.AutoMigrate(&models.Track{}))
	logger := zaptest.NewLogger(t)
	uploadDir := t.TempDir()
	userID := uuid.New()
	require.NoError(t, db.Create(&models.User{ID: userID, Username: "u", Email: "u@ex.com", IsActive: true}).Error)
	service := NewTrackService(db, logger, uploadDir)
	// Explicitly set backend="s3" but nil service — misconfig that must NOT panic.
	service.SetS3Storage(nil, "s3", "whatever")
	testContent := []byte("ID3\x03\x00\x00\x00\x00\x00\x00fallback to local")
	fileHeader := createTestFileHeader(t, testContent, "fallback.mp3")
	track, err := service.UploadTrack(context.Background(), userID, fileHeader, TrackMetadata{})
	require.NoError(t, err)
	require.NotNil(t, track)
	// Poll instead of a fixed sleep: a 400ms nap is flaky on slow CI and
	// wastes time on fast machines. Wait until the async local copy leaves
	// the Uploading state, then assert the row.
	timeout := time.After(5 * time.Second)
	ticker := time.NewTicker(25 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-timeout:
			t.Fatal("timeout waiting for local fallback copy to complete")
		case <-ticker.C:
			var row models.Track
			require.NoError(t, db.First(&row, "id = ?", track.ID).Error)
			if row.Status == models.TrackStatusUploading {
				continue // async copy still running
			}
			// Local path — storage_backend default is "local" from migration 985
			assert.Equal(t, "local", row.StorageBackend)
			assert.Nil(t, row.StorageKey)
			return
		}
	}
}