veza/veza-backend-api/internal/core/track/service.go
senke 15e591305e
feat(cdn): Bunny.net signed URLs + HLS cache headers + metric collision fix (W3 Day 13)
CDN edge in front of S3/MinIO via origin-pull. Backend signs URLs
with Bunny.net token-auth (SHA-256 over security_key + path + expires)
so edges verify before serving cached objects; the origin is never hit
on a valid token. Cloudflare CDN / R2 / CloudFront stubs kept.

- internal/services/cdn_service.go: new providers CDNProviderBunny +
  CDNProviderCloudflareR2. SecurityKey added to CDNConfig.
  generateBunnySignedURL implements the documented Bunny scheme
  (url-safe base64, no padding, expires query). HLSSegmentCacheHeaders
  + HLSPlaylistCacheHeaders helpers exported for handlers.
- internal/services/cdn_service_test.go: pin Bunny URL shape +
  base64-url charset; assert empty SecurityKey fails fast (no
  silent fallback to unsigned URLs).
- internal/core/track/service.go: new CDNURLSigner interface +
  SetCDNService(cdn). GetStorageURL prefers CDN signed URL when
  cdnService.IsEnabled, falls back to direct S3 presign on signing
  error so a CDN partial outage doesn't block playback.
- internal/api/routes_tracks.go + routes_core.go: wire SetCDNService
  on the two TrackService construction sites that serve stream/download.
- internal/config/config.go: 4 new env vars (CDN_ENABLED, CDN_PROVIDER,
  CDN_BASE_URL, CDN_SECURITY_KEY). config.CDNService always non-nil
  after init; IsEnabled gates the actual usage.
- internal/handlers/hls_handler.go: segments now return
  Cache-Control: public, max-age=86400, immutable (content-addressed
  filenames make this safe). Playlists at max-age=60.
- veza-backend-api/.env.template: 4 placeholder env vars.
- docs/ENV_VARIABLES.md §12: provider matrix + Bunny vs Cloudflare
  vs R2 trade-offs.

Bug-fix collateral: v1.0.9 Day 11 introduced veza_cache_hits_total,
which collided by name with monitoring.CacheHitsTotal (different
label set ⇒ promauto MustRegister panic at process init). Day 13
deletes the monitoring duplicate and restores the metrics-package
counter as the single source of truth (label: subsystem). All 8
affected packages green: services, core/track, handlers, middleware,
websocket/chat, metrics, monitoring, config.
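The collision mechanism is easy to reproduce in miniature: a Prometheus-style registry keys collectors by fully-qualified metric name, and a second registration under an already-taken name is rejected regardless of label set; promauto turns that rejection into a panic at init. The following is a toy stdlib-only illustration, not the actual client_golang code:

```go
package main

import "fmt"

// registry is a toy stand-in for a Prometheus registry: collectors
// are keyed by metric name, and a second registration under an
// existing name is rejected even when its label set differs. promauto
// wraps that rejection in a panic, which is why the duplicate
// veza_cache_hits_total crashed the process at init.
type registry map[string]bool

func (r registry) mustRegister(name string) {
	if r[name] {
		panic(fmt.Sprintf("duplicate metrics collector registration attempted: %s", name))
	}
	r[name] = true
}

func main() {
	r := registry{}
	r.mustRegister("veza_cache_hits_total") // metrics-package counter: OK
	defer func() {
		if err := recover(); err != nil {
			fmt.Println("recovered:", err)
		}
	}()
	r.mustRegister("veza_cache_hits_total") // monitoring duplicate: panics
}
```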

Acceptance (Day 13): the code path is wired; verifying against a real
Bunny edge requires a Pull Zone provisioned by the user (EX-? in
roadmap). On the user side: create a Pull Zone with origin = MinIO,
copy the token auth key into CDN_SECURITY_KEY, and set CDN_ENABLED=true.
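Assuming the four variables from the config bullet, the user-side setup would look like this in .env (values are placeholders; the security key comes from the Pull Zone's token-authentication settings):

```
CDN_ENABLED=true
CDN_PROVIDER=bunny
CDN_BASE_URL=https://<your-pull-zone>.b-cdn.net
CDN_SECURITY_KEY=<pull-zone token auth key>
```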

W3 progress: Redis Sentinel ✓ · distributed MinIO ✓ · CDN ✓ ·
DMCA Day 14 · embed Day 15.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 14:07:20 +02:00

1639 lines
58 KiB
Go

package track

import (
	"context"
	"encoding/base64"
	"errors"
	"fmt"
	"io"
	"mime/multipart"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time" // MOD-P2-008: added for the async copy timeout

	"veza-backend-api/internal/config"
	"veza-backend-api/internal/core/discover"
	"veza-backend-api/internal/database"
	"veza-backend-api/internal/models"
	"veza-backend-api/internal/monitoring"
	"veza-backend-api/internal/services"
	"veza-backend-api/internal/types"

	"github.com/google/uuid"
	"go.uber.org/zap"
	"gorm.io/gorm"
)
// User quota constants
const (
	MaxTracksPerUser  = 1000                     // Maximum number of tracks per user
	MaxStoragePerUser = 100 * 1024 * 1024 * 1024 // 100GB per user
)

// Track-specific error types
var (
	// ErrInvalidTrackFormat is returned when the file format is invalid
	ErrInvalidTrackFormat = errors.New("invalid track format")
	// ErrTrackTooLarge is returned when the file exceeds the maximum size
	ErrTrackTooLarge = errors.New("track file too large")
	// ErrTrackQuotaExceeded is returned when the user has reached their track quota
	ErrTrackQuotaExceeded = errors.New("track quota exceeded")
	// ErrStorageQuotaExceeded is returned when the user has reached their storage quota
	ErrStorageQuotaExceeded = errors.New("storage quota exceeded")
	// ErrTrackNotFound is returned when a track cannot be found
	ErrTrackNotFound = errors.New("track not found")
	// ErrNetworkError is returned on network errors (timeout, connection)
	ErrNetworkError = errors.New("network error")
	// ErrStorageError is returned on storage errors
	ErrStorageError = errors.New("storage error")
	// ErrForbidden is returned when the user lacks permission for the action
	ErrForbidden = errors.New("forbidden")
)
// StreamServiceInterface defines the minimal interface for triggering HLS transcoding on the stream server.
// INT-02: Used to call stream server /internal/jobs/transcode after track upload.
type StreamServiceInterface interface {
	StartProcessing(ctx context.Context, trackID uuid.UUID, filePath string) error
}

// S3StorageInterface defines the minimal S3 surface used by TrackService.
// v1.0.8 Phase 1 — narrow interface keeps the service testable without
// requiring real AWS credentials or a MinIO container in unit tests.
// *services.S3StorageService satisfies this interface.
type S3StorageInterface interface {
	UploadStream(ctx context.Context, r io.Reader, key, contentType string, size int64) (string, error)
	GetSignedURL(ctx context.Context, key string, ttl time.Duration) (string, error)
	DeleteFile(ctx context.Context, key string) error
}
// TrackService handles track operations
// BE-SVC-001: Add cache service for track metadata
// v0.943: Batch operations delegated to TrackBatchService
// v1.0.8: Optional S3 storage backend (TRACK_STORAGE_BACKEND=s3)
type TrackService struct {
	db              *gorm.DB // Write operations (and read fallback when readDB is nil)
	readDB          *gorm.DB // Optional read replica for read-only operations
	logger          *zap.Logger
	uploadDir       string
	maxFileSize     int64
	cacheService    *services.CacheService
	streamService   StreamServiceInterface // INT-02: Optional, triggers HLS transcoding after upload
	batchService    *TrackBatchService     // v0.943: batch operations
	discoverService *discover.Service      // v0.10.1: tags/genres sync

	// v1.0.8 Phase 1 — storage backend (local vs S3/MinIO)
	// storageBackend is "local" (default) or "s3". When "s3", copyFileAsync
	// writes to s3Service instead of the local filesystem.
	// Both remain nil/zero-value when running without S3.
	s3Service      S3StorageInterface
	storageBackend string
	s3Bucket       string // for logging/metrics only

	// v1.0.9 W3 Day 13 — optional CDN edge in front of S3/MinIO. When
	// set + IsEnabled, GetStorageURL routes browsers to a CDN signed URL
	// (origin-pull from MinIO) instead of presigning MinIO directly.
	// nil ⇒ keep the existing s3Service.GetSignedURL fallback.
	cdnService CDNURLSigner
}

// CDNURLSigner is the slice of services.CDNService that TrackService
// needs. Defined as an interface so tests can stub the CDN without
// pulling the full services package in. The shape mirrors
// services.CDNService.GenerateSignedURL + IsEnabled.
type CDNURLSigner interface {
	GenerateSignedURL(path string, expiration time.Duration) (string, error)
	IsEnabled() bool
}
// forRead returns the DB to use for read operations (read replica if configured, else primary)
func (s *TrackService) forRead() *gorm.DB {
	if s.readDB != nil {
		return s.readDB
	}
	return s.db
}

// NewTrackService creates a new track service
func NewTrackService(db *gorm.DB, logger *zap.Logger, uploadDir string) *TrackService {
	if uploadDir == "" {
		uploadDir = "uploads/tracks"
	}
	return &TrackService{
		db:           db,
		readDB:       nil,
		logger:       logger,
		uploadDir:    uploadDir,
		maxFileSize:  config.AudioLimit.Bytes(),
		batchService: NewTrackBatchService(db, logger),
	}
}

// NewTrackServiceWithDB creates a TrackService with read-replica support (uses db.ForRead for reads)
func NewTrackServiceWithDB(db *database.Database, logger *zap.Logger, uploadDir string) *TrackService {
	if uploadDir == "" {
		uploadDir = "uploads/tracks"
	}
	return &TrackService{
		db:           db.GormDB,
		readDB:       db.ForRead(),
		logger:       logger,
		uploadDir:    uploadDir,
		maxFileSize:  config.AudioLimit.Bytes(),
		batchService: NewTrackBatchService(db.GormDB, logger),
	}
}
// SetCacheService sets the cache service for TrackService
// BE-SVC-001: Implement caching layer for frequently accessed data
func (s *TrackService) SetCacheService(cacheService *services.CacheService) {
	s.cacheService = cacheService
}

// SetStreamService sets the streaming service used to trigger HLS transcoding after upload.
// INT-02: Enables HLS pipeline - stream server transcodes track after successful upload.
func (s *TrackService) SetStreamService(streamService StreamServiceInterface) {
	s.streamService = streamService
}

// SetDiscoverService sets the discover service for tags/genres sync (v0.10.1)
func (s *TrackService) SetDiscoverService(d *discover.Service) {
	s.discoverService = d
}

// SetS3Storage wires the S3 storage backend (v1.0.8 Phase 1).
// backend is expected to be "local" or "s3" (validated by Config.ValidateForEnvironment).
// Passing svc=nil silently keeps the service in local-only mode regardless of backend.
func (s *TrackService) SetS3Storage(svc S3StorageInterface, backend, bucket string) {
	s.s3Service = svc
	s.storageBackend = backend
	s.s3Bucket = bucket
}

// SetCDNService wires an optional CDN edge in front of S3/MinIO. When
// set, GetStorageURL prefers CDN signed URLs over direct S3 presigns
// for s3-backed tracks. Pass nil to disable. (v1.0.9 W3 Day 13.)
func (s *TrackService) SetCDNService(cdn CDNURLSigner) {
	s.cdnService = cdn
}

// IsS3Backend returns true iff the service is configured to write new tracks
// to S3. Exposed for handlers that need to branch behavior after uploads
// (e.g., skip local-path-based stream server trigger).
// v1.0.8 Phase 1.
func (s *TrackService) IsS3Backend() bool {
	return s.storageBackend == "s3" && s.s3Service != nil
}
// GetStorageURL returns a signed URL for a track's file when the track row
// carries storage_backend='s3'. Returns ("", false, nil) for local-backed
// tracks — the caller must fall back to filesystem serving.
//
// v1.0.8 Phase 2 — handlers (StreamTrack, DownloadTrack) use this to emit
// a 302 redirect to MinIO/S3 for tracks that were uploaded under s3 mode.
// TTL is caller-provided: 15min for streaming, 30min for downloads, 1h for
// the transcoder.
func (s *TrackService) GetStorageURL(ctx context.Context, track *models.Track, ttl time.Duration) (string, bool, error) {
	if track == nil {
		return "", false, fmt.Errorf("track is nil")
	}
	if track.StorageBackend != "s3" || track.StorageKey == nil || *track.StorageKey == "" {
		return "", false, nil
	}
	if s.s3Service == nil {
		// Row says s3 but no S3 service wired. Should be prevented by
		// Config.ValidateForEnvironment rule 11, but guard here anyway.
		return "", false, fmt.Errorf("track %s is s3-backed but TrackService has no S3 service configured", track.ID)
	}
	// v1.0.9 W3 Day 13 — prefer the CDN signed URL when wired. The CDN
	// fronts S3/MinIO via origin-pull, so the path stays the same as the
	// storage key. Falls back to direct S3 presign when CDN is disabled
	// or signing fails (CDN partial outage shouldn't block playback).
	if s.cdnService != nil && s.cdnService.IsEnabled() {
		cdnURL, cdnErr := s.cdnService.GenerateSignedURL(*track.StorageKey, ttl)
		if cdnErr == nil && cdnURL != "" {
			return cdnURL, true, nil
		}
		// log but keep going — direct presign still works.
		s.logger.Warn("CDN signing failed, falling back to direct S3 presign",
			zap.String("track_id", track.ID.String()),
			zap.Error(cdnErr))
	}
	url, err := s.s3Service.GetSignedURL(ctx, *track.StorageKey, ttl)
	if err != nil {
		return "", false, fmt.Errorf("generate signed URL for track %s: %w", track.ID, err)
	}
	return url, true, nil
}
// ValidateTrackFile validates an audio file's format and size
func (s *TrackService) ValidateTrackFile(fileHeader *multipart.FileHeader) error {
	// Validate the size
	if fileHeader.Size > s.maxFileSize {
		return fmt.Errorf("%w: file size exceeds maximum allowed size of %s", ErrTrackTooLarge, config.AudioLimit.HumanReadable())
	}
	if fileHeader.Size == 0 {
		return fmt.Errorf("%w: file is empty", ErrInvalidTrackFormat)
	}
	// Validate the extension
	ext := strings.ToLower(filepath.Ext(fileHeader.Filename))
	allowedExtensions := []string{".mp3", ".flac", ".wav", ".ogg", ".m4a", ".aac"}
	isValidExt := false
	for _, allowedExt := range allowedExtensions {
		if ext == allowedExt {
			isValidExt = true
			break
		}
	}
	if !isValidExt {
		return fmt.Errorf("%w: invalid file format. Allowed formats: MP3, FLAC, WAV, OGG", ErrInvalidTrackFormat)
	}
	// Validate the MIME type by opening the file
	file, err := fileHeader.Open()
	if err != nil {
		// FIX #10: log the error with context
		s.logger.Error("Failed to open file for validation",
			zap.String("filename", fileHeader.Filename),
			zap.Int64("size", fileHeader.Size),
			zap.Error(err),
		)
		return fmt.Errorf("failed to open file: %w", err)
	}
	defer file.Close()
	// Read the first bytes to check the magic number
	header := make([]byte, 12)
	n, err := file.Read(header)
	if err != nil && err != io.EOF {
		// FIX #10: log the error with context
		s.logger.Error("Failed to read file header for validation",
			zap.String("filename", fileHeader.Filename),
			zap.Error(err),
		)
		return fmt.Errorf("failed to read file header: %w", err)
	}
	if n < 4 {
		return fmt.Errorf("file too small to validate")
	}
	// Check the magic numbers for the audio formats
	isValidFormat := false
	headerStr := string(header[:n])
	// MP3: ID3v2 (starts with "ID3") or MPEG frame sync (0xFF 0xFB/E/F)
	if strings.HasPrefix(headerStr, "ID3") || (header[0] == 0xFF && (header[1]&0xE0) == 0xE0) {
		isValidFormat = true
	}
	// FLAC: "fLaC"
	if strings.HasPrefix(headerStr, "fLaC") {
		isValidFormat = true
	}
	// WAV: "RIFF" followed by "WAVE"
	if strings.HasPrefix(headerStr, "RIFF") && len(headerStr) >= 12 && string(header[8:12]) == "WAVE" {
		isValidFormat = true
	}
	// OGG: "OggS"
	if strings.HasPrefix(headerStr, "OggS") {
		isValidFormat = true
	}
	// M4A/AAC: "ftyp" with "M4A" or "mp4"
	if strings.Contains(headerStr, "ftyp") && (strings.Contains(headerStr, "M4A") || strings.Contains(headerStr, "mp4")) {
		isValidFormat = true
	}
	if !isValidFormat {
		return fmt.Errorf("%w: invalid audio file format", ErrInvalidTrackFormat)
	}
	return nil
}
// TrackMetadata holds the optional metadata for an upload
type TrackMetadata struct {
	Title    string
	Artist   string
	Album    string
	Genre    string
	Year     int
	IsPublic bool
}
// UploadTrack uploads an audio file and creates a Track record in the database
// MOD-P2-008: Asynchronous implementation - creates the Track immediately and launches the copy in a goroutine
// Returns the Track with Status=Uploading; the copy runs in the background
func (s *TrackService) UploadTrack(ctx context.Context, userID uuid.UUID, fileHeader *multipart.FileHeader, metadata TrackMetadata) (*models.Track, error) {
	// Check the user quota
	if err := s.CheckUserQuota(ctx, userID, fileHeader.Size); err != nil {
		// FIX #10: log the error with context
		s.logger.Warn("User quota check failed",
			zap.String("user_id", userID.String()),
			zap.Int64("file_size", fileHeader.Size),
			zap.String("filename", fileHeader.Filename),
			zap.Error(err),
		)
		return nil, err
	}
	// Validate the file
	if err := s.ValidateTrackFile(fileHeader); err != nil {
		// FIX #10: log the error with context
		s.logger.Warn("Track file validation failed",
			zap.String("user_id", userID.String()),
			zap.String("filename", fileHeader.Filename),
			zap.Int64("file_size", fileHeader.Size),
			zap.Error(err),
		)
		return nil, err
	}
	// Create the upload directory if it doesn't exist
	s.logger.Debug("Checking upload directory", zap.String("upload_dir", s.uploadDir))
	if err := os.MkdirAll(s.uploadDir, 0755); err != nil {
		s.logger.Error("Failed to create upload directory", zap.String("upload_dir", s.uploadDir), zap.Error(err))
		return nil, fmt.Errorf("%w: failed to create upload directory: %w", ErrStorageError, err)
	}
	s.logger.Debug("Upload directory created/verified", zap.String("upload_dir", s.uploadDir))
	// Generate a unique filename
	timestamp := uuid.New()
	ext := filepath.Ext(fileHeader.Filename)
	filename := fmt.Sprintf("%s_%s%s", userID.String(), timestamp.String(), ext) // Fixed format to use strings for UUID
	filePath := filepath.Join(s.uploadDir, filename)
	s.logger.Debug("Upload destination path", zap.String("file_path", filePath))
	// Determine the format from the extension
	format := strings.TrimPrefix(strings.ToUpper(ext), ".")
	if format == "M4A" {
		format = "AAC"
	}
	// Determine the title (metadata or filename)
	title := metadata.Title
	if title == "" {
		title = strings.TrimSuffix(fileHeader.Filename, ext)
	}
	// MOD-P2-008: Create the Track record in the database BEFORE the copy (async semantics)
	// The file doesn't exist yet, but the record is created for traceability
	// FileID is temporarily NULL (updated once the file is created)
	track := &models.Track{
		UserID:        userID,
		FileID:        nil, // temporarily NULL - updated after file creation
		Title:         title,
		Artist:        metadata.Artist,
		Album:         metadata.Album,
		Genre:         metadata.Genre,
		Year:          metadata.Year,
		FilePath:      filePath,
		FileSize:      fileHeader.Size,
		Format:        format,
		Duration:      0, // updated during async processing
		IsPublic:      metadata.IsPublic,
		Status:        models.TrackStatusUploading,
		StatusMessage: "Upload started",
	}
	if err := s.db.WithContext(ctx).Create(track).Error; err != nil {
		s.logger.Error("Failed to create track record", zap.Error(err))
		return nil, fmt.Errorf("failed to create track record: %w", err)
	}
	s.logger.Debug("Track record created in DB", zap.String("track_id", track.ID.String()))
	// MOD-P2-008: Launch the file copy in a tracked goroutine (context + cancellation)
	// The goroutine updates the Status when it finishes
	s.logger.Debug("Starting async file copy")
	go s.copyFileAsync(ctx, track.ID, fileHeader, filePath, userID)
	// MOD-P2-003: Record the business metric
	monitoring.RecordTrackUploaded()
	s.logger.Info("Track upload initiated (async)",
		zap.String("track_id", track.ID.String()),
		zap.String("user_id", userID.String()),
		zap.String("filename", filename),
		zap.Int64("file_size", fileHeader.Size),
	)
	return track, nil
}
// copyFileAsync writes the uploaded file to the configured storage backend
// (local or s3) and updates the Track's Status.
// MOD-P2-008: Tracked goroutine with context + cancellation + cleanup on error.
// v1.0.8 Phase 1: dispatches local/s3 based on TrackService.storageBackend.
func (s *TrackService) copyFileAsync(ctx context.Context, trackID uuid.UUID, fileHeader *multipart.FileHeader, filePath string, userID uuid.UUID) {
	if s.storageBackend == "s3" && s.s3Service != nil {
		s.copyFileAsyncS3(ctx, trackID, fileHeader, userID)
		return
	}
	s.copyFileAsyncLocal(ctx, trackID, fileHeader, filePath, userID)
}

// copyFileAsyncLocal: historical behavior — writes to the local FS at filePath.
// Called by copyFileAsync when storageBackend != "s3".
func (s *TrackService) copyFileAsyncLocal(ctx context.Context, trackID uuid.UUID, fileHeader *multipart.FileHeader, filePath string, userID uuid.UUID) {
	// Create a context with a timeout for the copy (5 minutes max)
	copyCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	// Open the source file
	s.logger.Debug("Opening source file for async copy")
	src, err := fileHeader.Open()
	if err != nil {
		s.logger.Error("Failed to open source file for async copy", zap.Error(err))
		s.updateTrackStatus(copyCtx, trackID, models.TrackStatusFailed, fmt.Sprintf("Failed to open uploaded file: %v", err))
		s.cleanupFailedUpload(filePath, trackID, "failed to open source file")
		return
	}
	defer src.Close()
	s.logger.Debug("Source file opened for async copy")
	// Create the destination file
	s.logger.Debug("Creating destination file", zap.String("file_path", filePath))
	dst, err := os.Create(filePath)
	if err != nil {
		s.logger.Error("Failed to create destination file", zap.String("file_path", filePath), zap.Error(err))
		s.updateTrackStatus(copyCtx, trackID, models.TrackStatusFailed, fmt.Sprintf("Failed to create destination file: %v", err))
		s.cleanupFailedUpload(filePath, trackID, "failed to create destination file")
		return
	}
	defer dst.Close()
	s.logger.Debug("Destination file created")
	// Copy the file with error handling
	s.logger.Debug("Starting file copy")
	bytesWritten, err := io.Copy(dst, src)
	if err != nil {
		s.logger.Error("File copy failed", zap.Error(err))
		s.updateTrackStatus(copyCtx, trackID, models.TrackStatusFailed, fmt.Sprintf("Failed to save file: %v", err))
		s.cleanupFailedUpload(filePath, trackID, fmt.Sprintf("copy failed: %v", err))
		return
	}
	s.logger.Debug("File copied successfully", zap.Int64("bytes_written", bytesWritten))
	// Check whether the context was cancelled
	select {
	case <-copyCtx.Done():
		s.updateTrackStatus(copyCtx, trackID, models.TrackStatusFailed, fmt.Sprintf("Upload cancelled: %v", copyCtx.Err()))
		s.cleanupFailedUpload(filePath, trackID, "upload cancelled")
		return
	default:
		// continue
	}
	// Verify that all bytes were copied
	if bytesWritten != fileHeader.Size {
		s.updateTrackStatus(copyCtx, trackID, models.TrackStatusFailed, fmt.Sprintf("Incomplete copy: %d/%d bytes", bytesWritten, fileHeader.Size))
		s.cleanupFailedUpload(filePath, trackID, fmt.Sprintf("incomplete copy: %d/%d bytes", bytesWritten, fileHeader.Size))
		return
	}
	// Copy succeeded - update the Status
	s.updateTrackStatus(copyCtx, trackID, models.TrackStatusProcessing, "File uploaded, processing...")
	// INT-02: Trigger HLS transcoding on stream server after successful upload.
	// v1.0.8 Phase 1 — dual-trigger (gRPC + RabbitMQ) noted in plan D2 is
	// preserved as-is; the chunked upload path in track_upload_handler.go ALSO
	// enqueues a RabbitMQ transcoding job, regular uploads only hit gRPC. To be
	// consolidated in v1.0.9.
	if s.streamService != nil {
		if err := s.streamService.StartProcessing(copyCtx, trackID, filePath); err != nil {
			s.logger.Warn("Failed to trigger stream server transcoding (track will remain in Processing)",
				zap.String("track_id", trackID.String()),
				zap.String("file_path", filePath),
				zap.Error(err),
			)
		} else {
			s.logger.Info("Stream server transcoding triggered",
				zap.String("track_id", trackID.String()),
				zap.String("file_path", filePath),
			)
		}
	}
	s.logger.Info("Track file copied successfully (async)",
		zap.String("track_id", trackID.String()),
		zap.String("user_id", userID.String()),
		zap.Int64("bytes_written", bytesWritten),
		zap.String("file_path", filePath),
	)
}
// copyFileAsyncS3 streams the uploaded file directly to S3/MinIO and
// records storage_backend=s3 + storage_key on the Track. The local FS is
// never touched.
//
// v1.0.8 Phase 1. The read path (StreamTrack/DownloadTrack) + transcoder
// still need wiring in Phase 2 — a track uploaded under s3 BEFORE Phase 2
// will be storable but not streamable until Phase 2 lands. To avoid that
// broken state in prod, do not flip TRACK_STORAGE_BACKEND=s3 before all
// P0 phases are tagged (cf. plan batch A).
func (s *TrackService) copyFileAsyncS3(ctx context.Context, trackID uuid.UUID, fileHeader *multipart.FileHeader, userID uuid.UUID) {
	copyCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	src, err := fileHeader.Open()
	if err != nil {
		s.logger.Error("S3 upload: failed to open source file", zap.Error(err))
		s.updateTrackStatus(copyCtx, trackID, models.TrackStatusFailed, fmt.Sprintf("Failed to open uploaded file: %v", err))
		return
	}
	defer src.Close()
	ext := filepath.Ext(fileHeader.Filename)
	// Deterministic key, easy to migrate and to look up: tracks/<userID>/<trackID>.<ext>
	s3Key := fmt.Sprintf("tracks/%s/%s%s", userID.String(), trackID.String(), ext)
	contentType := mimeTypeForAudioExt(ext)
	s.logger.Info("S3 upload starting",
		zap.String("track_id", trackID.String()),
		zap.String("s3_key", s3Key),
		zap.String("s3_bucket", s.s3Bucket),
		zap.Int64("size", fileHeader.Size),
	)
	if _, err := s.s3Service.UploadStream(copyCtx, src, s3Key, contentType, fileHeader.Size); err != nil {
		s.logger.Error("S3 upload failed", zap.Error(err), zap.String("s3_key", s3Key))
		s.updateTrackStatus(copyCtx, trackID, models.TrackStatusFailed, fmt.Sprintf("S3 upload failed: %v", err))
		return
	}
	// DB: persist storage_backend + storage_key atomically with status transition.
	if err := s.db.WithContext(copyCtx).Model(&models.Track{}).
		Where("id = ?", trackID).
		Updates(map[string]interface{}{
			"storage_backend": "s3",
			"storage_key":     s3Key,
			"status":          models.TrackStatusProcessing,
			"status_message":  "Uploaded to S3, processing...",
		}).Error; err != nil {
		s.logger.Error("Failed to update track after S3 upload — object exists but DB row is stale",
			zap.String("track_id", trackID.String()),
			zap.String("s3_key", s3Key),
			zap.Error(err),
		)
		// Don't return — the object is in S3 already. Admin can reconcile via
		// storage_backend column and S3 list.
	}
	// v1.0.8 Phase 2 — trigger stream server transcoding with a signed URL
	// so the Rust-side ffmpeg can pull the source directly from MinIO. TTL
	// 1h is a soft cap: transcode usually completes in <10min.
	if s.streamService != nil {
		signedURL, err := s.s3Service.GetSignedURL(copyCtx, s3Key, time.Hour)
		if err != nil {
			s.logger.Warn("Failed to generate signed URL for stream trigger (track stays Processing until re-tried)",
				zap.String("track_id", trackID.String()),
				zap.String("s3_key", s3Key),
				zap.Error(err),
			)
		} else if err := s.streamService.StartProcessing(copyCtx, trackID, signedURL); err != nil {
			s.logger.Warn("Stream server transcoding trigger failed (track stays Processing until re-tried)",
				zap.String("track_id", trackID.String()),
				zap.Error(err),
			)
		} else {
			s.logger.Info("Stream server transcoding triggered with S3 signed URL",
				zap.String("track_id", trackID.String()),
				zap.String("s3_key", s3Key),
			)
		}
	}
	s.logger.Info("Track streamed successfully to S3 (async)",
		zap.String("track_id", trackID.String()),
		zap.String("user_id", userID.String()),
		zap.String("s3_key", s3Key),
		zap.Int64("size", fileHeader.Size),
	)
}
// chunkStreamer is the narrow interface CreateTrackFromChunkedUploadToS3
// needs from a chunk service. *services.TrackChunkService satisfies it
// via its v1.0.9 StreamChunkedUpload method (chunks → io.Writer).
//
// Kept package-local to avoid a circular import with internal/services.
type chunkStreamer interface {
	StreamChunkedUpload(ctx context.Context, uploadID string, dst io.Writer) (filename string, totalSize int64, checksum string, err error)
}

// CreateTrackFromChunkedUploadToS3 streams the chunks of an in-progress
// upload directly to S3 multipart (without materializing the assembled
// file on local disk), then creates the Track row with storage_backend=s3.
//
// v1.0.9 item 1.5 — fast path for the S3 backend. The historical path
// (CompleteChunkedUpload → local file → MigrateLocalToS3IfConfigured)
// is still used for storageBackend="local" (and kept as a fallback when
// s3Service is nil).
//
// Plumbing: an io.Pipe connects the write side (fed by
// chunks.StreamChunkedUpload in a goroutine) to the read side
// (consumed by s3Service.UploadStream, which itself delegates to
// manager.Uploader for streaming multipart at 10 MB × 3 goroutines).
// No intermediate buffer, no assembled file on local disk.
//
// On an error in the assembly goroutine, the pipe is closed with that
// error; UploadStream sees it and aborts the multipart upload on the
// AWS SDK side (automatic cleanup of the uploaded parts). On an S3
// error, the goroutine is notified via PipeWriter.Close; the Redis
// state and local chunks are NOT deleted (preserved for resumption).
func (s *TrackService) CreateTrackFromChunkedUploadToS3(
	ctx context.Context,
	chunks chunkStreamer,
	userID uuid.UUID,
	uploadID string,
	contentType string,
	expectedSize int64,
) (track *models.Track, checksumOut string, err error) {
	if s.storageBackend != "s3" || s.s3Service == nil {
		return nil, "", fmt.Errorf("s3 backend not configured")
	}
	if chunks == nil {
		return nil, "", fmt.Errorf("chunk streamer is required")
	}
	trackID := uuid.New()
	// We need the filename/extension before knowing where to put the S3 key.
	// StreamChunkedUpload returns it after streaming, but we need the key
	// up-front for the S3 PUT. Encode the trackID in the key and let the
	// extension follow once known via a rename — except S3 doesn't have
	// rename, only copy+delete. Cheaper to derive the extension from the
	// caller (handler already has uploadInfo.Filename via GetUploadInfo).
	//
	// Pragmatic compromise: caller passes contentType, we derive ext from it
	// for the S3 key. Falls back to .bin if unknown. The Track row's
	// .Format field is set by the caller after.
	ext := extFromContentType(contentType)
	s3Key := fmt.Sprintf("tracks/%s/%s%s", userID.String(), trackID.String(), ext)
	pr, pw := io.Pipe()
	type assembleResult struct {
		filename  string
		totalSize int64
		checksum  string
		err       error
	}
	resultCh := make(chan assembleResult, 1)
	go func() {
		filename, totalSize, checksum, streamErr := chunks.StreamChunkedUpload(ctx, uploadID, pw)
		// Closing the pipe with the streaming error makes UploadStream see
		// it on its next Read and abort the multipart upload. CloseWithError
		// is safe even on the success path (nil error) — the read side
		// receives io.EOF as expected.
		_ = pw.CloseWithError(streamErr)
		resultCh <- assembleResult{filename: filename, totalSize: totalSize, checksum: checksum, err: streamErr}
	}()
	// expectedSize lets the SDK avoid a "stream of unknown length" path
	// (which forces full buffering of each part); we trust the Redis
	// state's TotalSize (validated by StreamChunkedUpload before ack).
	if _, err := s.s3Service.UploadStream(ctx, pr, s3Key, contentType, expectedSize); err != nil {
		_ = pr.CloseWithError(err)
		<-resultCh // drain so the goroutine exits
		return nil, "", fmt.Errorf("s3 multipart upload failed: %w", err)
	}
	res := <-resultCh
	if res.err != nil {
		// S3 may have a complete object at this point, but the assembled
		// stream was wrong (size mismatch, missing chunk file). Delete the
		// orphan to avoid charging for storage of a corrupt object.
		if delErr := s.s3Service.DeleteFile(ctx, s3Key); delErr != nil {
			s.logger.Warn("Failed to delete orphan S3 object after stream error",
				zap.String("s3_key", s3Key),
				zap.Error(delErr),
			)
		}
		return nil, "", fmt.Errorf("chunk assembly failed: %w", res.err)
	}
	title := strings.TrimSuffix(res.filename, filepath.Ext(res.filename))
	format := strings.TrimPrefix(strings.ToUpper(filepath.Ext(res.filename)), ".")
	if format == "M4A" {
		format = "AAC"
	}
	track = &models.Track{
		ID:             trackID,
		UserID:         userID,
		Title:          title,
		FilePath:       "", // legacy column, unused for S3-backed rows
		FileSize:       res.totalSize,
		Format:         format,
		Duration:       0,
		IsPublic:       true,
		Status:         models.TrackStatusUploading,
		StatusMessage:  fmt.Sprintf("Upload completed (S3), checksum: %s", res.checksum),
		StorageBackend: "s3",
		StorageKey:     &s3Key,
	}
	if err := s.db.WithContext(ctx).Create(track).Error; err != nil {
		// DB row creation failed but the S3 object exists — clean up to
		// avoid an orphan that no row points at.
		if delErr := s.s3Service.DeleteFile(ctx, s3Key); delErr != nil {
			s.logger.Error("Failed to delete S3 object after DB row creation failed",
				zap.String("s3_key", s3Key),
				zap.Error(delErr),
			)
		}
		return nil, "", fmt.Errorf("failed to create track record: %w", err)
	}
	s.logger.Info("Track created via direct chunked-S3 streaming",
		zap.String("track_id", track.ID.String()),
		zap.String("user_id", userID.String()),
		zap.String("s3_key", s3Key),
		zap.Int64("size", res.totalSize),
		zap.String("checksum", res.checksum),
	)
	return track, res.checksum, nil
}
// extFromContentType returns a sensible file extension (incl. leading
// dot) for the audio MIME types we accept. Used to build S3 object keys
// when assembling chunks straight to S3 (v1.0.9 item 1.5) — the chunk
// service knows the original filename only AFTER streaming completes,
// but the S3 key is needed BEFORE.
func extFromContentType(contentType string) string {
switch strings.ToLower(contentType) {
case "audio/mpeg":
return ".mp3"
case "audio/flac":
return ".flac"
case "audio/wav":
return ".wav"
case "audio/ogg":
return ".ogg"
case "audio/aac", "audio/mp4", "audio/x-m4a":
return ".m4a"
case "audio/aiff", "audio/x-aiff":
return ".aiff"
default:
return ".bin"
}
}
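extFromContentType exists because the S3 key must be built before the original filename is known, from IDs and the Content-Type header alone. A standalone sketch of that key construction (extForContentType mirrors the mapping above; the function name and placeholder IDs are illustrative, not part of this package):

```go
package main

import (
	"fmt"
	"strings"
)

// extForContentType reproduces the Content-Type -> extension mapping above.
func extForContentType(contentType string) string {
	switch strings.ToLower(contentType) {
	case "audio/mpeg":
		return ".mp3"
	case "audio/flac":
		return ".flac"
	case "audio/wav":
		return ".wav"
	case "audio/ogg":
		return ".ogg"
	case "audio/aac", "audio/mp4", "audio/x-m4a":
		return ".m4a"
	case "audio/aiff", "audio/x-aiff":
		return ".aiff"
	default:
		return ".bin"
	}
}

func main() {
	// The key is derived from user/track IDs plus the request Content-Type,
	// since the filename only arrives once streaming completes.
	userID, trackID := "u-123", "t-456" // placeholder IDs for the sketch
	s3Key := fmt.Sprintf("tracks/%s/%s%s", userID, trackID, extForContentType("audio/x-m4a"))
	fmt.Println(s3Key) // tracks/u-123/t-456.m4a
}
```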
// MigrateLocalToS3IfConfigured uploads a file already assembled on the local
// FS to S3 when storageBackend == "s3". No-op otherwise.
//
// v1.0.8 Phase 1: used by the chunked upload path (after local assembly +
// CreateTrackFromPath). Post-assembly we have one large local file that is
// streamed to S3, then removed locally. If anything fails, the error is
// logged but not returned to the handler: the track already exists
// (storage_backend stays "local") and an admin can retry via
// `cmd/migrate_storage` (Phase 3).
func (s *TrackService) MigrateLocalToS3IfConfigured(ctx context.Context, trackID, userID uuid.UUID, localPath string) error {
if s.storageBackend != "s3" || s.s3Service == nil {
return nil
}
file, err := os.Open(localPath)
if err != nil {
return fmt.Errorf("open assembled file: %w", err)
}
defer file.Close()
stat, err := file.Stat()
if err != nil {
return fmt.Errorf("stat assembled file: %w", err)
}
ext := filepath.Ext(localPath)
s3Key := fmt.Sprintf("tracks/%s/%s%s", userID.String(), trackID.String(), ext)
contentType := mimeTypeForAudioExt(ext)
if _, err := s.s3Service.UploadStream(ctx, file, s3Key, contentType, stat.Size()); err != nil {
return fmt.Errorf("s3 upload: %w", err)
}
if err := s.db.WithContext(ctx).Model(&models.Track{}).
Where("id = ?", trackID).
Updates(map[string]interface{}{
"storage_backend": "s3",
"storage_key": s3Key,
}).Error; err != nil {
// Object is in S3 but DB row still says local. Dangerous (reader will
// try local FS path). Caller should treat as fatal for this request.
return fmt.Errorf("s3 upload succeeded but DB update failed: %w", err)
}
if err := os.Remove(localPath); err != nil {
s.logger.Warn("S3 migration OK but local cleanup failed (dangling file)",
zap.String("track_id", trackID.String()),
zap.String("path", localPath),
zap.Error(err),
)
// Non-fatal — object in S3, DB correct, local file is orphaned debris.
}
s.logger.Info("Assembled chunk upload migrated to S3",
zap.String("track_id", trackID.String()),
zap.String("s3_key", s3Key),
zap.Int64("size", stat.Size()),
)
return nil
}
// mimeTypeForAudioExt returns the MIME Content-Type for an audio file
// extension. Falls back to application/octet-stream for unknown extensions.
func mimeTypeForAudioExt(ext string) string {
switch strings.ToLower(ext) {
case ".mp3":
return "audio/mpeg"
case ".flac":
return "audio/flac"
case ".wav":
return "audio/wav"
case ".ogg":
return "audio/ogg"
	case ".m4a", ".aac":
		return "audio/aac"
	case ".aiff", ".aif":
		// keep in sync with extFromContentType, which accepts audio/aiff
		return "audio/aiff"
default:
return "application/octet-stream"
}
}
// updateTrackStatus updates a Track's Status and StatusMessage
// MOD-P2-008: helper to update the status in a thread-safe way
func (s *TrackService) updateTrackStatus(ctx context.Context, trackID uuid.UUID, status models.TrackStatus, message string) {
if err := s.db.WithContext(ctx).Model(&models.Track{}).
Where("id = ?", trackID).
Updates(map[string]interface{}{
"status": status,
"status_message": message,
}).Error; err != nil {
s.logger.Error("Failed to update track status",
zap.String("track_id", trackID.String()),
zap.String("status", string(status)),
zap.String("message", message),
zap.Error(err),
)
} else {
s.logger.Info("Track status updated",
zap.String("track_id", trackID.String()),
zap.String("status", string(status)),
zap.String("message", message),
)
}
}
// cleanupFailedUpload cleans up after a failed upload. Note: only the file
// is removed here; the Track row is left to the caller.
// MOD-P2-008: automatic cleanup on error
func (s *TrackService) cleanupFailedUpload(filePath string, trackID uuid.UUID, reason string) {
	// Remove the file if it exists
if err := os.Remove(filePath); err != nil && !os.IsNotExist(err) {
s.logger.Warn("Failed to cleanup file after upload failure",
zap.String("file_path", filePath),
zap.String("track_id", trackID.String()),
zap.String("reason", reason),
zap.Error(err),
)
}
s.logger.Info("Cleaned up failed upload",
zap.String("track_id", trackID.String()),
zap.String("file_path", filePath),
zap.String("reason", reason),
)
}
// CreateTrackFromPath creates a track from a file already saved to disk
func (s *TrackService) CreateTrackFromPath(ctx context.Context, userID uuid.UUID, filePath, filename string, fileSize int64, format string) (*models.Track, error) {
ext := filepath.Ext(filename)
title := strings.TrimSuffix(filename, ext)
track := &models.Track{
UserID: userID,
Title: title,
FilePath: filePath,
FileSize: fileSize,
Format: format,
		Duration:      0, // updated later by asynchronous processing
IsPublic: true,
Status: models.TrackStatusUploading,
StatusMessage: "Upload completed",
}
if err := s.db.WithContext(ctx).Create(track).Error; err != nil {
return nil, fmt.Errorf("failed to create track record: %w", err)
}
s.logger.Info("Track created from path",
zap.String("track_id", track.ID.String()),
zap.String("user_id", userID.String()),
zap.String("file_path", filePath),
zap.Int64("file_size", fileSize),
)
return track, nil
}
// UserQuota describes a user's quota usage and limits
type UserQuota struct {
TracksCount int64 `json:"tracks_count"`
TracksLimit int64 `json:"tracks_limit"`
StorageUsed int64 `json:"storage_used"` // bytes
StorageLimit int64 `json:"storage_limit"` // bytes
}
// CheckUserQuota verifies the user may upload a file of the given size without exceeding quota
func (s *TrackService) CheckUserQuota(ctx context.Context, userID uuid.UUID, fileSize int64) error {
var trackCount int64
	// MOD-P2-008: use creator_id (the actual column name) instead of user_id
	if err := s.db.WithContext(ctx).Model(&models.Track{}).Where("creator_id = ?", userID).Count(&trackCount).Error; err != nil {
		// FIX #10: log the error with context
s.logger.Error("Failed to check track count for quota",
zap.String("user_id", userID.String()),
zap.Error(err),
)
return fmt.Errorf("failed to check track count: %w", err)
}
if trackCount >= MaxTracksPerUser {
s.logger.Warn("Track quota exceeded",
zap.String("user_id", userID.String()),
zap.Int64("track_count", trackCount),
zap.Int64("max_tracks", MaxTracksPerUser),
)
return ErrTrackQuotaExceeded
}
var totalSize int64
if err := s.db.WithContext(ctx).Model(&models.Track{}).
Where("creator_id = ?", userID).
Select("COALESCE(SUM(file_size), 0)").
Scan(&totalSize).Error; err != nil {
		// FIX #10: log the error with context
s.logger.Error("Failed to check storage usage for quota",
zap.String("user_id", userID.String()),
zap.Error(err),
)
return fmt.Errorf("failed to check storage usage: %w", err)
}
if totalSize+fileSize > MaxStoragePerUser {
s.logger.Warn("Storage quota exceeded",
zap.String("user_id", userID.String()),
zap.Int64("total_size", totalSize),
zap.Int64("file_size", fileSize),
zap.Int64("max_storage", MaxStoragePerUser),
)
return ErrStorageQuotaExceeded
}
return nil
}
// GetUserQuota returns a user's quota usage and limits
func (s *TrackService) GetUserQuota(ctx context.Context, userID uuid.UUID) (*UserQuota, error) {
var trackCount int64
if err := s.db.WithContext(ctx).Model(&models.Track{}).Where("creator_id = ?", userID).Count(&trackCount).Error; err != nil {
return nil, fmt.Errorf("failed to get track count: %w", err)
}
var totalSize int64
if err := s.db.WithContext(ctx).Model(&models.Track{}).
Where("creator_id = ?", userID).
Select("COALESCE(SUM(file_size), 0)").
Scan(&totalSize).Error; err != nil {
return nil, fmt.Errorf("failed to get storage usage: %w", err)
}
return &UserQuota{
TracksCount: trackCount,
TracksLimit: MaxTracksPerUser,
StorageUsed: totalSize,
StorageLimit: MaxStoragePerUser,
}, nil
}
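The quota gate above can be sketched as a pure function: an upload is rejected when it would push either the track count or the total stored bytes past the per-user limits. The limits below are illustrative; the real values come from the package constants MaxTracksPerUser and MaxStoragePerUser.

```go
package main

import "fmt"

// canUpload mirrors CheckUserQuota's two checks without the DB round-trips.
func canUpload(trackCount, totalSize, fileSize, maxTracks, maxStorage int64) bool {
	if trackCount >= maxTracks {
		return false // track-count quota exhausted
	}
	// storage check is on the prospective total, not the current one
	return totalSize+fileSize <= maxStorage
}

func main() {
	const maxTracks, maxStorage = 100, 1 << 30 // 100 tracks, 1 GiB (assumed limits)
	fmt.Println(canUpload(100, 0, 1, maxTracks, maxStorage))          // false: count at limit
	fmt.Println(canUpload(10, (1<<30)-5, 10, maxTracks, maxStorage))  // false: would exceed storage
	fmt.Println(canUpload(10, 0, 1<<20, maxTracks, maxStorage))       // true
}
```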
// TrackListParams holds filtering and pagination parameters for track listing
type TrackListParams struct {
Page int
Limit int
Cursor string // v0.931: opaque cursor for keyset pagination (base64)
UserID *uuid.UUID
Genre *string
Format *string
SortBy string // "created_at", "title", "popularity"
SortOrder string // "asc", "desc"
}
// TrackListResult holds list result with optional next cursor
type TrackListResult struct {
Tracks []*models.Track
Total int64
NextCursor string
}
// ListTracks returns tracks with pagination, filtering and sorting
func (s *TrackService) ListTracks(ctx context.Context, params TrackListParams) ([]*models.Track, int64, error) {
	// Base query, filtered on status (read replica when configured)
	query := s.forRead().WithContext(ctx).Model(&models.Track{}).Where("status = ?", models.TrackStatusCompleted)
	// Apply filters
if params.UserID != nil {
query = query.Where("creator_id = ?", *params.UserID)
}
if params.Genre != nil && *params.Genre != "" {
query = query.Where("genre = ?", *params.Genre)
}
if params.Format != nil && *params.Format != "" {
query = query.Where("format = ?", *params.Format)
}
	// Count the total before pagination
var total int64
if err := query.Count(&total).Error; err != nil {
return nil, 0, fmt.Errorf("failed to count tracks: %w", err)
}
	// Apply sorting
sortOrder := "DESC"
if params.SortOrder == "asc" {
sortOrder = "ASC"
}
	// Validate and apply SortBy
	sortBy := params.SortBy
	if sortBy == "" {
		sortBy = "created_at"
	}
	// Security: sortBy must be a whitelisted column since it is interpolated into ORDER BY
validSortFields := map[string]bool{
"created_at": true,
"title": true,
"popularity": true,
}
if !validSortFields[sortBy] {
sortBy = "created_at"
}
	// "popularity" sorts on play_count + like_count
if sortBy == "popularity" {
query = query.Order(fmt.Sprintf("(play_count + like_count) %s", sortOrder))
} else {
query = query.Order(fmt.Sprintf("%s %s", sortBy, sortOrder))
}
	// Apply pagination
	if params.Limit <= 0 {
		params.Limit = 20 // default
	}
	if params.Limit > 100 {
		params.Limit = 100 // maximum
	}
}
if params.Page <= 0 {
params.Page = 1
}
offset := (params.Page - 1) * params.Limit
query = query.Offset(offset).Limit(params.Limit)
	// Run the query
var tracks []*models.Track
if err := query.Preload("User").Find(&tracks).Error; err != nil {
return nil, 0, fmt.Errorf("failed to list tracks: %w", err)
}
return tracks, total, nil
}
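The default/cap/offset arithmetic above is easy to get off by one; a minimal sketch of the same normalization, with the clamping pulled into one helper (normalizePage is illustrative, not part of this service):

```go
package main

import "fmt"

// normalizePage mirrors ListTracks' defaults: limit defaults to 20, is
// capped at 100, page is 1-based; the returned offset feeds the SQL query.
func normalizePage(page, limit int) (offset, effLimit int) {
	if limit <= 0 {
		limit = 20
	}
	if limit > 100 {
		limit = 100
	}
	if page <= 0 {
		page = 1
	}
	return (page - 1) * limit, limit
}

func main() {
	off, lim := normalizePage(3, 250) // limit over the cap, third page
	fmt.Println(off, lim)             // 200 100
}
```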
// ListTracksWithCursor uses keyset pagination on (created_at, id) for consistent performance.
// When params.Cursor is set, decodes it and fetches records after that point.
// Returns NextCursor for the next page when more results exist.
// v0.931: Cursor-based pagination for GET /tracks
func (s *TrackService) ListTracksWithCursor(ctx context.Context, params TrackListParams) (*TrackListResult, error) {
// Cursor-based only supported for sort_by=created_at (default)
if params.SortBy != "created_at" && params.SortBy != "" {
// Fallback to offset-based
tracks, total, err := s.ListTracks(ctx, params)
if err != nil {
return nil, err
}
return &TrackListResult{Tracks: tracks, Total: total}, nil
}
if params.Limit <= 0 {
params.Limit = 20
}
if params.Limit > 100 {
params.Limit = 100
}
query := s.forRead().WithContext(ctx).Model(&models.Track{}).Where("status = ?", models.TrackStatusCompleted)
if params.UserID != nil {
query = query.Where("creator_id = ?", *params.UserID)
}
if params.Genre != nil && *params.Genre != "" {
query = query.Where("genre = ?", *params.Genre)
}
if params.Format != nil && *params.Format != "" {
query = query.Where("format = ?", *params.Format)
}
	// Decode cursor: base64(created_at_unix_nano|uuid). Malformed cursors are
	// silently ignored and treated as the first page.
var cursorCreatedAt int64
var cursorID uuid.UUID
if params.Cursor != "" {
decoded, err := base64.RawURLEncoding.DecodeString(params.Cursor)
if err == nil {
parts := strings.SplitN(string(decoded), "|", 2)
if len(parts) == 2 {
if ts, err := strconv.ParseInt(parts[0], 10, 64); err == nil {
cursorCreatedAt = ts
}
if uid, err := uuid.Parse(parts[1]); err == nil {
cursorID = uid
}
}
}
}
sortOrder := "DESC"
if params.SortOrder == "asc" {
sortOrder = "ASC"
}
if sortOrder == "DESC" {
if params.Cursor != "" && (cursorCreatedAt != 0 || cursorID != uuid.Nil) {
query = query.Where("(created_at, id) < (?, ?)", time.Unix(0, cursorCreatedAt), cursorID)
}
query = query.Order("created_at DESC, id DESC")
} else {
if params.Cursor != "" && (cursorCreatedAt != 0 || cursorID != uuid.Nil) {
query = query.Where("(created_at, id) > (?, ?)", time.Unix(0, cursorCreatedAt), cursorID)
}
query = query.Order("created_at ASC, id ASC")
}
// Fetch limit+1 to know if there's a next page
query = query.Limit(params.Limit + 1)
var tracks []*models.Track
if err := query.Preload("User").Find(&tracks).Error; err != nil {
return nil, fmt.Errorf("failed to list tracks: %w", err)
}
var nextCursor string
var total int64
if len(tracks) > params.Limit {
		// More rows exist: the last item of the returned page becomes the cursor for the next one
		last := tracks[params.Limit-1]
nextCursor = base64.RawURLEncoding.EncodeToString([]byte(
fmt.Sprintf("%d|%s", last.CreatedAt.UnixNano(), last.ID.String())))
tracks = tracks[:params.Limit]
}
	// Total is not computed in cursor mode (COUNT is expensive); approximate
	// with the page length, or limit+1 when another page exists.
	total = int64(len(tracks))
	if nextCursor != "" {
		total = int64(params.Limit) + 1
	}
return &TrackListResult{
Tracks: tracks,
Total: total,
NextCursor: nextCursor,
}, nil
}
// GetTrackByID returns a track by its ID
// MOD-P1-003: preload User to avoid N+1 queries if User is accessed later
// BE-SVC-001: add caching for track metadata
func (s *TrackService) GetTrackByID(ctx context.Context, trackID uuid.UUID) (*models.Track, error) {
cacheConfig := services.DefaultCacheConfig()
// Try to get from cache first
if s.cacheService != nil {
var cachedTrack models.Track
if err := s.cacheService.GetTrack(ctx, trackID, &cachedTrack); err == nil {
// Cache hit
return &cachedTrack, nil
}
}
	// Cache miss: fetch from the database (read replica when configured)
var track models.Track
if err := s.forRead().WithContext(ctx).
Preload("User").
		First(&track, "id = ?", trackID).Error; err != nil {
		if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, ErrTrackNotFound
}
return nil, fmt.Errorf("failed to get track: %w", err)
}
// Cache the track
if s.cacheService != nil {
if err := s.cacheService.SetTrack(ctx, trackID, track, cacheConfig); err != nil {
s.logger.Warn("Failed to cache track", zap.Error(err), zap.String("track_id", trackID.String()))
}
}
return &track, nil
}
// UpdateTrackParams holds the updatable metadata fields of a track
// v0.10.1: Genres for multi-genre (max 3)
type UpdateTrackParams struct {
Title *string `json:"title"`
Artist *string `json:"artist"`
Album *string `json:"album"`
Genre *string `json:"genre"` // legacy single
Genres []string `json:"genres"` // v0.10.1: max 3 slugs
Tags []string `json:"tags"`
Year *int `json:"year"`
BPM *int `json:"bpm"`
MusicalKey *string `json:"musical_key"`
IsPublic *bool `json:"is_public"`
}
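The pointer fields above let callers distinguish "field not provided" (nil) from "set to the zero value" (pointer to false or ""). A minimal sketch of that pattern, with a cut-down struct (patch and buildUpdates are illustrative names):

```go
package main

import "fmt"

// patch is a two-field stand-in for UpdateTrackParams.
type patch struct {
	Title    *string
	IsPublic *bool
}

// buildUpdates collects only the fields the caller actually sent,
// mirroring the nil checks in UpdateTrack.
func buildUpdates(p patch) map[string]interface{} {
	u := make(map[string]interface{})
	if p.Title != nil {
		u["title"] = *p.Title
	}
	if p.IsPublic != nil {
		u["is_public"] = *p.IsPublic
	}
	return u
}

func main() {
	private := false
	// Title stays untouched; is_public is explicitly set to false.
	u := buildUpdates(patch{IsPublic: &private})
	fmt.Println(len(u), u["is_public"]) // 1 false
}
```

With plain (non-pointer) fields, `is_public: false` would be indistinguishable from "not provided", so a track could never be made private through a partial update.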
// UpdateTrack updates a track's metadata
// BE-SVC-001: invalidate cache on track update
func (s *TrackService) UpdateTrack(ctx context.Context, trackID uuid.UUID, userID uuid.UUID, params UpdateTrackParams) (*models.Track, error) {
	// Fetch the existing track
track, err := s.GetTrackByID(ctx, trackID)
if err != nil {
return nil, err
}
	// MOD-P1-003: ensure the user owns the track or is an admin
	// (admin flag is passed via a context value)
isAdmin := false
if adminVal := ctx.Value("is_admin"); adminVal != nil {
if admin, ok := adminVal.(bool); ok {
isAdmin = admin
}
}
if track.UserID != userID && !isAdmin {
return nil, ErrForbidden
}
	// Build the updates map
updates := make(map[string]interface{})
if params.Title != nil {
if *params.Title == "" {
return nil, fmt.Errorf("title cannot be empty")
}
updates["title"] = *params.Title
}
if params.Artist != nil {
updates["artist"] = *params.Artist
}
if params.Album != nil {
updates["album"] = *params.Album
}
// v0.10.1: Tags and Genres via discover service (track_tags, track_genres)
if s.discoverService != nil {
if params.Tags != nil {
if err := s.discoverService.SyncTrackTags(ctx, trackID, params.Tags); err != nil {
return nil, fmt.Errorf("sync tags: %w", err)
}
}
if params.Genre != nil || len(params.Genres) > 0 {
genres := params.Genres
if len(genres) == 0 && params.Genre != nil {
genres = []string{*params.Genre}
}
if err := s.discoverService.SyncTrackGenres(ctx, trackID, genres); err != nil {
return nil, fmt.Errorf("sync genres: %w", err)
}
}
} else {
// Fallback when discover service not configured
if params.Genre != nil {
updates["genre"] = *params.Genre
}
if params.Tags != nil {
updates["tags"] = params.Tags
}
}
if params.Year != nil {
if *params.Year < 0 {
return nil, fmt.Errorf("year cannot be negative")
}
updates["year"] = *params.Year
}
if params.BPM != nil {
if *params.BPM < 0 || *params.BPM > 300 {
return nil, fmt.Errorf("bpm must be between 0 and 300")
}
updates["bpm"] = *params.BPM
}
if params.MusicalKey != nil {
updates["musical_key"] = *params.MusicalKey
}
if params.IsPublic != nil {
updates["is_public"] = *params.IsPublic
}
// Invalidate cache before update
if s.cacheService != nil {
if err := s.cacheService.InvalidateTrackCache(ctx, trackID); err != nil {
s.logger.Warn("Failed to invalidate track cache", zap.Error(err), zap.String("track_id", trackID.String()))
}
}
// v0.10.1: If only tags/genres were updated via discover, reload and return
discoverUpdated := s.discoverService != nil && (params.Tags != nil || params.Genre != nil || len(params.Genres) > 0)
if len(updates) == 0 {
if discoverUpdated {
updatedTrack, err := s.GetTrackByID(ctx, trackID)
if err != nil {
return nil, err
}
return updatedTrack, nil
}
return track, nil
}
	// Apply the updates
if err := s.db.WithContext(ctx).Model(track).Updates(updates).Error; err != nil {
return nil, fmt.Errorf("failed to update track: %w", err)
}
	// Reload the track to get the updated values
updatedTrack, err := s.GetTrackByID(ctx, trackID)
if err != nil {
return nil, err
}
s.logger.Info("Track updated",
		zap.String("track_id", trackID.String()),
zap.String("user_id", userID.String()),
zap.Any("updates", updates),
)
return updatedTrack, nil
}
// DeleteTrack deletes a track and its physical file
func (s *TrackService) DeleteTrack(ctx context.Context, trackID uuid.UUID, userID uuid.UUID) error {
	// Fetch the existing track
track, err := s.GetTrackByID(ctx, trackID)
if err != nil {
return err
}
	// MOD-P1-003: ensure the user owns the track or is an admin
	// (admin flag is passed via a context value)
isAdmin := false
if adminVal := ctx.Value("is_admin"); adminVal != nil {
if admin, ok := adminVal.(bool); ok {
isAdmin = admin
}
}
if track.UserID != userID && !isAdmin {
return ErrForbidden
}
	// Remove the physical file
	if track.FilePath != "" {
		if err := os.Remove(track.FilePath); err != nil && !os.IsNotExist(err) {
			s.logger.Warn("Failed to delete track file",
				zap.String("track_id", trackID.String()),
zap.String("file_path", track.FilePath),
zap.Error(err),
)
			// Continue even if file deletion fails
}
}
	// Remove associated files (waveform, cover art)
if track.WaveformPath != "" {
if err := os.Remove(track.WaveformPath); err != nil && !os.IsNotExist(err) {
s.logger.Warn("Failed to delete waveform file",
				zap.String("track_id", trackID.String()),
zap.String("waveform_path", track.WaveformPath),
zap.Error(err),
)
}
}
if track.CoverArtPath != "" {
if err := os.Remove(track.CoverArtPath); err != nil && !os.IsNotExist(err) {
s.logger.Warn("Failed to delete cover art file",
				zap.String("track_id", trackID.String()),
zap.String("cover_art_path", track.CoverArtPath),
zap.Error(err),
)
}
}
	// Remove from the database.
	// The database cascades related rows via the OnDelete:CASCADE foreign-key constraints
	if err := s.db.WithContext(ctx).Delete(track).Error; err != nil {
return fmt.Errorf("failed to delete track: %w", err)
}
s.logger.Info("Track deleted",
		zap.String("track_id", trackID.String()),
zap.String("user_id", userID.String()),
zap.String("file_path", track.FilePath),
)
return nil
}
// UpdateStreamStatus updates the stream status and manifest URL of a track
func (s *TrackService) UpdateStreamStatus(ctx context.Context, trackID uuid.UUID, status string, manifestURL string) error {
updates := map[string]interface{}{
"stream_status": status,
}
if manifestURL != "" {
updates["stream_manifest_url"] = manifestURL
}
switch status {
case "ready":
updates["status"] = models.TrackStatusCompleted
updates["status_message"] = "Ready for streaming"
case "error":
updates["status"] = models.TrackStatusFailed
updates["status_message"] = "Transcoding failed"
}
if err := s.db.WithContext(ctx).Model(&models.Track{}).Where("id = ?", trackID).Updates(updates).Error; err != nil {
return fmt.Errorf("failed to update stream status: %w", err)
}
s.logger.Info("Track stream status updated",
		zap.String("track_id", trackID.String()),
zap.String("status", status),
zap.String("manifest_url", manifestURL),
)
return nil
}
// TrackStats holds a track's aggregate statistics
type TrackStats struct {
Views int64 `json:"views"`
Likes int64 `json:"likes"`
Comments int64 `json:"comments"`
TotalPlayTime int64 `json:"total_play_time"` // seconds
Downloads int64 `json:"downloads"`
}
// GetTrackStats returns a track's statistics
func (s *TrackService) GetTrackStats(ctx context.Context, trackID uuid.UUID) (*types.TrackStats, error) {
	// Ensure the track exists
	var track models.Track
	if err := s.db.WithContext(ctx).First(&track, "id = ?", trackID).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, ErrTrackNotFound
}
return nil, fmt.Errorf("failed to get track: %w", err)
}
var stats types.TrackStats
// Count likes
if err := s.db.WithContext(ctx).Model(&models.TrackLike{}).
Where("track_id = ?", trackID).
Count(&stats.Likes).Error; err != nil {
return nil, fmt.Errorf("failed to count likes: %w", err)
}
// Count comments (excluding soft-deleted)
if err := s.db.WithContext(ctx).Model(&models.TrackComment{}).
Where("track_id = ?", trackID).
Count(&stats.Comments).Error; err != nil {
return nil, fmt.Errorf("failed to count comments: %w", err)
}
// Count views (total plays) and sum total play time
type PlayStats struct {
Views int64
TotalPlayTime int64
}
var playStats PlayStats
if err := s.db.WithContext(ctx).Model(&models.TrackPlay{}).
Where("track_id = ?", trackID).
Select("COUNT(*) as views, COALESCE(SUM(duration), 0) as total_play_time").
Scan(&playStats).Error; err != nil {
return nil, fmt.Errorf("failed to get play statistics: %w", err)
}
stats.Views = playStats.Views
stats.TotalPlayTime = playStats.TotalPlayTime
// Count downloads (sum of access_count from track_shares where permissions include 'download')
// Note: access_count is incremented when a share link with download permission is accessed
if err := s.db.WithContext(ctx).Model(&models.TrackShare{}).
Where("track_id = ? AND permissions LIKE ?", trackID, "%download%").
Select("COALESCE(SUM(access_count), 0)").
Scan(&stats.Downloads).Error; err != nil {
return nil, fmt.Errorf("failed to count downloads: %w", err)
}
s.logger.Info("Track stats retrieved",
		zap.String("track_id", trackID.String()),
zap.Int64("views", stats.Views),
zap.Int64("likes", stats.Likes),
zap.Int64("comments", stats.Comments),
zap.Int64("total_play_time", stats.TotalPlayTime),
zap.Int64("downloads", stats.Downloads),
)
return &stats, nil
}
// GetLyrics returns lyrics for a track (E3)
func (s *TrackService) GetLyrics(ctx context.Context, trackID uuid.UUID) (*models.TrackLyrics, error) {
var lyrics models.TrackLyrics
if err := s.forRead().WithContext(ctx).Where("track_id = ?", trackID).First(&lyrics).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, nil // No lyrics yet
}
return nil, fmt.Errorf("failed to get lyrics: %w", err)
}
return &lyrics, nil
}
// CreateOrUpdateLyrics creates or updates lyrics for a track (E3)
func (s *TrackService) CreateOrUpdateLyrics(ctx context.Context, trackID uuid.UUID, userID uuid.UUID, content string) (*models.TrackLyrics, error) {
// Verify track exists and user owns it
track, err := s.GetTrackByID(ctx, trackID)
if err != nil {
return nil, err
}
if track.UserID != userID {
return nil, ErrForbidden
}
var lyrics models.TrackLyrics
err = s.db.WithContext(ctx).Where("track_id = ?", trackID).First(&lyrics).Error
if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
return nil, fmt.Errorf("failed to get lyrics: %w", err)
}
lyrics.TrackID = trackID
lyrics.Content = content
if lyrics.ID == uuid.Nil {
if err := s.db.WithContext(ctx).Create(&lyrics).Error; err != nil {
return nil, fmt.Errorf("failed to create lyrics: %w", err)
}
} else {
if err := s.db.WithContext(ctx).Save(&lyrics).Error; err != nil {
return nil, fmt.Errorf("failed to update lyrics: %w", err)
}
}
return &lyrics, nil
}
// BatchDeleteTracks delegates to TrackBatchService (v0.943)
func (s *TrackService) BatchDeleteTracks(ctx context.Context, trackIDs []uuid.UUID, userID uuid.UUID) (*BatchDeleteResult, error) {
return s.batchService.BatchDeleteTracks(ctx, trackIDs, userID)
}
// deleteTrackFiles removes a track's physical files (logic extracted from DeleteTrack)
func (s *TrackService) deleteTrackFiles(ctx context.Context, track *models.Track) error {
	var errs []error
	// Remove the main file
	if track.FilePath != "" {
		if err := os.Remove(track.FilePath); err != nil && !os.IsNotExist(err) {
			errs = append(errs, fmt.Errorf("failed to delete track file %s: %w", track.FilePath, err))
		}
	}
	// Remove the waveform file
	if track.WaveformPath != "" {
		if err := os.Remove(track.WaveformPath); err != nil && !os.IsNotExist(err) {
			errs = append(errs, fmt.Errorf("failed to delete waveform file %s: %w", track.WaveformPath, err))
		}
	}
	// Remove the cover art file
	if track.CoverArtPath != "" {
		if err := os.Remove(track.CoverArtPath); err != nil && !os.IsNotExist(err) {
			errs = append(errs, fmt.Errorf("failed to delete cover art file %s: %w", track.CoverArtPath, err))
		}
	}
	// Return the first error if any, nil otherwise
	if len(errs) > 0 {
		return errs[0]
	}
	return nil
}
// BatchUpdateTracks delegates to TrackBatchService (v0.943)
func (s *TrackService) BatchUpdateTracks(ctx context.Context, trackIDs []uuid.UUID, userID uuid.UUID, updates map[string]interface{}) (*BatchUpdateResult, error) {
return s.batchService.BatchUpdateTracks(ctx, trackIDs, userID, updates)
}