Two more cohesive blocks lifted out of monolithic files following the same recipe as the marketplace refund split (commit36ee3da1). internal/core/track/service.go : 1639 → 1026 LOC Extracted to service_upload.go (640 LOC) : UploadTrack (multipart entry point) copyFileAsync (local/s3 dispatcher) copyFileAsyncLocal (FS write path) copyFileAsyncS3 (direct S3 stream path, v1.0.8) chunkStreamer interface (helper for chunked → S3) CreateTrackFromChunkedUploadToS3 (v1.0.9 1.5 fast path) extFromContentType (helper) MigrateLocalToS3IfConfigured (post-assembly migration) mimeTypeForAudioExt (helper) updateTrackStatus (status updater) cleanupFailedUpload (rollback helper) CreateTrackFromPath (no-multipart constructor) Removed `internal/monitoring` import from service.go (the only user was the upload path). internal/handlers/playlist_handler.go : 1397 → 1107 LOC Extracted to playlist_handler_collaborators.go (309 LOC) : AddCollaboratorRequest, UpdateCollaboratorPermissionRequest DTOs AddCollaborator, RemoveCollaborator, UpdateCollaboratorPermission, GetCollaborators handlers All four handlers were a self-contained surface (one route group, one DTO pair, no shared helpers with the rest of the file). Tests run after each split : go test ./internal/core/marketplace -short → PASS go test ./internal/core/track -short → PASS go test ./internal/handlers -short → PASS The dette-tech split target was three files at 1.7k+ / 1.6k+ / 1.4k+ LOC. After this commit +36ee3da1: marketplace/service.go : 1737 → 1340 (-397) track/service.go : 1639 → 1026 (-613) handlers/playlist_handler.go : 1397 → 1107 (-290) total reduction : 4773 → 3473 (-1300, -27%) Each receiver still has a clear "main" file ; the extracted siblings encapsulate one concern apiece. Future splits should follow the same naming pattern (service_<concern>.go, playlist_handler_<concern>.go) so a quick `ls` shows the file organisation matches the feature surface. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
640 lines
24 KiB
Go
640 lines
24 KiB
Go
package track
|
||
|
||
// Upload pipeline methods extracted from service.go (v1.0.10 dette tech).
|
||
// Same package, same TrackService receiver — pure code-org move with no
|
||
// behaviour change. Covers : direct upload (UploadTrack +
|
||
// copyFileAsync{Local,S3}), the chunked → S3 fast path
|
||
// (CreateTrackFromChunkedUploadToS3), the local→S3 migration helper, and
|
||
// the small helpers used only by these flows (status updater,
|
||
// cleanup, MIME/extension mapping).
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"io"
|
||
"mime/multipart"
|
||
"os"
|
||
"path/filepath"
|
||
"strings"
|
||
"time"
|
||
|
||
"veza-backend-api/internal/models"
|
||
"veza-backend-api/internal/monitoring"
|
||
|
||
"github.com/google/uuid"
|
||
"go.uber.org/zap"
|
||
)
|
||
|
||
// UploadTrack upload un fichier audio et crée un enregistrement Track en base
|
||
// MOD-P2-008: Implémentation asynchrone - crée le Track immédiatement et lance la copie en goroutine
|
||
// Retourne le Track avec Status=Uploading, la copie se fait en arrière-plan
|
||
func (s *TrackService) UploadTrack(ctx context.Context, userID uuid.UUID, fileHeader *multipart.FileHeader, metadata TrackMetadata) (*models.Track, error) {
|
||
// Vérifier le quota utilisateur
|
||
if err := s.CheckUserQuota(ctx, userID, fileHeader.Size); err != nil {
|
||
// FIX #10: Logger l'erreur avec contexte
|
||
s.logger.Warn("User quota check failed",
|
||
zap.String("user_id", userID.String()),
|
||
zap.Int64("file_size", fileHeader.Size),
|
||
zap.String("filename", fileHeader.Filename),
|
||
zap.Error(err),
|
||
)
|
||
return nil, err
|
||
}
|
||
|
||
// Valider le fichier
|
||
if err := s.ValidateTrackFile(fileHeader); err != nil {
|
||
// FIX #10: Logger l'erreur avec contexte
|
||
s.logger.Warn("Track file validation failed",
|
||
zap.String("user_id", userID.String()),
|
||
zap.String("filename", fileHeader.Filename),
|
||
zap.Int64("file_size", fileHeader.Size),
|
||
zap.Error(err),
|
||
)
|
||
return nil, err
|
||
}
|
||
|
||
// Créer le répertoire d'upload s'il n'existe pas
|
||
s.logger.Debug("Checking upload directory", zap.String("upload_dir", s.uploadDir))
|
||
if err := os.MkdirAll(s.uploadDir, 0755); err != nil {
|
||
s.logger.Error("Failed to create upload directory", zap.String("upload_dir", s.uploadDir), zap.Error(err))
|
||
return nil, fmt.Errorf("%w: failed to create upload directory: %w", ErrStorageError, err)
|
||
}
|
||
s.logger.Debug("Upload directory created/verified", zap.String("upload_dir", s.uploadDir))
|
||
|
||
// Générer un nom de fichier unique
|
||
timestamp := uuid.New()
|
||
ext := filepath.Ext(fileHeader.Filename)
|
||
filename := fmt.Sprintf("%s_%s%s", userID.String(), timestamp.String(), ext) // Fixed format to use strings for UUID
|
||
filePath := filepath.Join(s.uploadDir, filename)
|
||
s.logger.Debug("Upload destination path", zap.String("file_path", filePath))
|
||
|
||
// Déterminer le format depuis l'extension
|
||
format := strings.TrimPrefix(strings.ToUpper(ext), ".")
|
||
if format == "M4A" {
|
||
format = "AAC"
|
||
}
|
||
|
||
// Déterminer le titre (métadonnée ou nom de fichier)
|
||
title := metadata.Title
|
||
if title == "" {
|
||
title = strings.TrimSuffix(fileHeader.Filename, ext)
|
||
}
|
||
|
||
// MOD-P2-008: Créer l'enregistrement Track en base AVANT la copie (sémantique asynchrone)
|
||
// Le fichier n'existe pas encore, mais on crée l'enregistrement pour traçabilité
|
||
// FileID est NULL temporairement (sera mis à jour après création du fichier)
|
||
track := &models.Track{
|
||
UserID: userID,
|
||
FileID: nil, // NULL temporairement - sera mis à jour après création fichier
|
||
Title: title,
|
||
Artist: metadata.Artist,
|
||
Album: metadata.Album,
|
||
Genre: metadata.Genre,
|
||
Year: metadata.Year,
|
||
FilePath: filePath,
|
||
FileSize: fileHeader.Size,
|
||
Format: format,
|
||
Duration: 0, // Sera mis à jour lors du traitement asynchrone
|
||
IsPublic: metadata.IsPublic,
|
||
Status: models.TrackStatusUploading,
|
||
StatusMessage: "Upload started",
|
||
}
|
||
|
||
if err := s.db.WithContext(ctx).Create(track).Error; err != nil {
|
||
s.logger.Error("Failed to create track record", zap.Error(err))
|
||
return nil, fmt.Errorf("failed to create track record: %w", err)
|
||
}
|
||
s.logger.Debug("Track record created in DB", zap.String("track_id", track.ID.String()))
|
||
|
||
// MOD-P2-008: Lancer la copie fichier en goroutine avec suivi (context + cancellation)
|
||
// La goroutine mettra à jour le Status quand terminé
|
||
s.logger.Debug("Starting async file copy")
|
||
go s.copyFileAsync(ctx, track.ID, fileHeader, filePath, userID)
|
||
|
||
// MOD-P2-003: Enregistrer la métrique business
|
||
monitoring.RecordTrackUploaded()
|
||
|
||
s.logger.Info("Track upload initiated (async)",
|
||
zap.String("track_id", track.ID.String()),
|
||
zap.String("user_id", userID.String()),
|
||
zap.String("filename", filename),
|
||
zap.Int64("file_size", fileHeader.Size),
|
||
)
|
||
|
||
return track, nil
|
||
}
|
||
|
||
// copyFileAsync écrit le fichier uploadé sur le storage backend configuré
|
||
// (local ou s3) et met à jour le Status du Track.
|
||
// MOD-P2-008: Goroutine suivie avec context + cancellation + nettoyage en cas d'erreur.
|
||
// v1.0.8 Phase 1: dispatcher local/s3 selon TrackService.storageBackend.
|
||
func (s *TrackService) copyFileAsync(ctx context.Context, trackID uuid.UUID, fileHeader *multipart.FileHeader, filePath string, userID uuid.UUID) {
|
||
if s.storageBackend == "s3" && s.s3Service != nil {
|
||
s.copyFileAsyncS3(ctx, trackID, fileHeader, userID)
|
||
return
|
||
}
|
||
s.copyFileAsyncLocal(ctx, trackID, fileHeader, filePath, userID)
|
||
}
|
||
|
||
// copyFileAsyncLocal : comportement historique — écrit sur le FS local à filePath.
|
||
// Appelé par copyFileAsync quand storageBackend != "s3".
|
||
func (s *TrackService) copyFileAsyncLocal(ctx context.Context, trackID uuid.UUID, fileHeader *multipart.FileHeader, filePath string, userID uuid.UUID) {
|
||
// Créer un contexte avec timeout pour la copie (5 minutes max)
|
||
copyCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
|
||
defer cancel()
|
||
|
||
// Ouvrir le fichier source
|
||
s.logger.Debug("Opening source file for async copy")
|
||
src, err := fileHeader.Open()
|
||
if err != nil {
|
||
s.logger.Error("Failed to open source file for async copy", zap.Error(err))
|
||
s.updateTrackStatus(copyCtx, trackID, models.TrackStatusFailed, fmt.Sprintf("Failed to open uploaded file: %v", err))
|
||
s.cleanupFailedUpload(filePath, trackID, "failed to open source file")
|
||
return
|
||
}
|
||
defer src.Close()
|
||
s.logger.Debug("Source file opened for async copy")
|
||
|
||
// Créer le fichier de destination
|
||
s.logger.Debug("Creating destination file", zap.String("file_path", filePath))
|
||
dst, err := os.Create(filePath)
|
||
if err != nil {
|
||
s.logger.Error("Failed to create destination file", zap.String("file_path", filePath), zap.Error(err))
|
||
s.updateTrackStatus(copyCtx, trackID, models.TrackStatusFailed, fmt.Sprintf("Failed to create destination file: %v", err))
|
||
s.cleanupFailedUpload(filePath, trackID, "failed to create destination file")
|
||
return
|
||
}
|
||
defer dst.Close()
|
||
s.logger.Debug("Destination file created")
|
||
|
||
// Copier le fichier avec gestion d'erreurs
|
||
s.logger.Debug("Starting file copy")
|
||
bytesWritten, err := io.Copy(dst, src)
|
||
if err != nil {
|
||
s.logger.Error("File copy failed", zap.Error(err))
|
||
s.updateTrackStatus(copyCtx, trackID, models.TrackStatusFailed, fmt.Sprintf("Failed to save file: %v", err))
|
||
s.cleanupFailedUpload(filePath, trackID, fmt.Sprintf("copy failed: %v", err))
|
||
return
|
||
}
|
||
s.logger.Debug("File copied successfully", zap.Int64("bytes_written", bytesWritten))
|
||
|
||
// Vérifier si le contexte a été annulé
|
||
select {
|
||
case <-copyCtx.Done():
|
||
s.updateTrackStatus(copyCtx, trackID, models.TrackStatusFailed, fmt.Sprintf("Upload cancelled: %v", copyCtx.Err()))
|
||
s.cleanupFailedUpload(filePath, trackID, "upload cancelled")
|
||
return
|
||
default:
|
||
// Continuer
|
||
}
|
||
|
||
// Vérifier que tous les bytes ont été copiés
|
||
if bytesWritten != fileHeader.Size {
|
||
s.updateTrackStatus(copyCtx, trackID, models.TrackStatusFailed, fmt.Sprintf("Incomplete copy: %d/%d bytes", bytesWritten, fileHeader.Size))
|
||
s.cleanupFailedUpload(filePath, trackID, fmt.Sprintf("incomplete copy: %d/%d bytes", bytesWritten, fileHeader.Size))
|
||
return
|
||
}
|
||
|
||
// Copie réussie - mettre à jour le Status
|
||
s.updateTrackStatus(copyCtx, trackID, models.TrackStatusProcessing, "File uploaded, processing...")
|
||
|
||
// INT-02: Trigger HLS transcoding on stream server after successful upload.
|
||
// v1.0.8 Phase 1 — dual-trigger (gRPC + RabbitMQ) noted in plan D2 is
|
||
// preserved as-is; the chunked upload path in track_upload_handler.go ALSO
|
||
// enqueues a RabbitMQ transcoding job, regular uploads only hit gRPC. To be
|
||
// consolidated in v1.0.9.
|
||
if s.streamService != nil {
|
||
if err := s.streamService.StartProcessing(copyCtx, trackID, filePath); err != nil {
|
||
s.logger.Warn("Failed to trigger stream server transcoding (track will remain in Processing)",
|
||
zap.String("track_id", trackID.String()),
|
||
zap.String("file_path", filePath),
|
||
zap.Error(err),
|
||
)
|
||
} else {
|
||
s.logger.Info("Stream server transcoding triggered",
|
||
zap.String("track_id", trackID.String()),
|
||
zap.String("file_path", filePath),
|
||
)
|
||
}
|
||
}
|
||
|
||
s.logger.Info("Track file copied successfully (async)",
|
||
zap.String("track_id", trackID.String()),
|
||
zap.String("user_id", userID.String()),
|
||
zap.Int64("bytes_written", bytesWritten),
|
||
zap.String("file_path", filePath),
|
||
)
|
||
}
|
||
|
||
// copyFileAsyncS3 streame le fichier uploadé directement vers S3/MinIO et
|
||
// enregistre storage_backend=s3 + storage_key sur le Track. Pas de passage
|
||
// par le FS local.
|
||
//
|
||
// v1.0.8 Phase 1. Read path (StreamTrack/DownloadTrack) + transcoder restent
|
||
// à brancher en Phase 2 — un track uploadé en s3 AVANT Phase 2 sera
|
||
// stockable mais pas streamable tant que Phase 2 n'a pas atterri. Pour éviter
|
||
// cet état cassé en prod, ne pas flipper TRACK_STORAGE_BACKEND=s3 avant que
|
||
// toutes les phases P0 soient taguées (cf. plan batch A).
|
||
func (s *TrackService) copyFileAsyncS3(ctx context.Context, trackID uuid.UUID, fileHeader *multipart.FileHeader, userID uuid.UUID) {
|
||
copyCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
|
||
defer cancel()
|
||
|
||
src, err := fileHeader.Open()
|
||
if err != nil {
|
||
s.logger.Error("S3 upload: failed to open source file", zap.Error(err))
|
||
s.updateTrackStatus(copyCtx, trackID, models.TrackStatusFailed, fmt.Sprintf("Failed to open uploaded file: %v", err))
|
||
return
|
||
}
|
||
defer src.Close()
|
||
|
||
ext := filepath.Ext(fileHeader.Filename)
|
||
// Clé déterministe, facile à migrer et à retrouver : tracks/<userID>/<trackID>.<ext>
|
||
s3Key := fmt.Sprintf("tracks/%s/%s%s", userID.String(), trackID.String(), ext)
|
||
contentType := mimeTypeForAudioExt(ext)
|
||
|
||
s.logger.Info("S3 upload starting",
|
||
zap.String("track_id", trackID.String()),
|
||
zap.String("s3_key", s3Key),
|
||
zap.String("s3_bucket", s.s3Bucket),
|
||
zap.Int64("size", fileHeader.Size),
|
||
)
|
||
|
||
if _, err := s.s3Service.UploadStream(copyCtx, src, s3Key, contentType, fileHeader.Size); err != nil {
|
||
s.logger.Error("S3 upload failed", zap.Error(err), zap.String("s3_key", s3Key))
|
||
s.updateTrackStatus(copyCtx, trackID, models.TrackStatusFailed, fmt.Sprintf("S3 upload failed: %v", err))
|
||
return
|
||
}
|
||
|
||
// DB: persist storage_backend + storage_key atomically with status transition.
|
||
if err := s.db.WithContext(copyCtx).Model(&models.Track{}).
|
||
Where("id = ?", trackID).
|
||
Updates(map[string]interface{}{
|
||
"storage_backend": "s3",
|
||
"storage_key": s3Key,
|
||
"status": models.TrackStatusProcessing,
|
||
"status_message": "Uploaded to S3, processing...",
|
||
}).Error; err != nil {
|
||
s.logger.Error("Failed to update track after S3 upload — object exists but DB row is stale",
|
||
zap.String("track_id", trackID.String()),
|
||
zap.String("s3_key", s3Key),
|
||
zap.Error(err),
|
||
)
|
||
// Don't return — the object is in S3 already. Admin can reconcile via
|
||
// storage_backend column and S3 list.
|
||
}
|
||
|
||
// v1.0.8 Phase 2 — trigger stream server transcoding with a signed URL
|
||
// so the Rust-side ffmpeg can pull the source directly from MinIO. TTL
|
||
// 1h is a soft cap : transcode usually completes in <10min.
|
||
if s.streamService != nil {
|
||
signedURL, err := s.s3Service.GetSignedURL(copyCtx, s3Key, time.Hour)
|
||
if err != nil {
|
||
s.logger.Warn("Failed to generate signed URL for stream trigger (track stays Processing until re-tried)",
|
||
zap.String("track_id", trackID.String()),
|
||
zap.String("s3_key", s3Key),
|
||
zap.Error(err),
|
||
)
|
||
} else if err := s.streamService.StartProcessing(copyCtx, trackID, signedURL); err != nil {
|
||
s.logger.Warn("Stream server transcoding trigger failed (track stays Processing until re-tried)",
|
||
zap.String("track_id", trackID.String()),
|
||
zap.Error(err),
|
||
)
|
||
} else {
|
||
s.logger.Info("Stream server transcoding triggered with S3 signed URL",
|
||
zap.String("track_id", trackID.String()),
|
||
zap.String("s3_key", s3Key),
|
||
)
|
||
}
|
||
}
|
||
|
||
s.logger.Info("Track streamed successfully to S3 (async)",
|
||
zap.String("track_id", trackID.String()),
|
||
zap.String("user_id", userID.String()),
|
||
zap.String("s3_key", s3Key),
|
||
zap.Int64("size", fileHeader.Size),
|
||
)
|
||
}
|
||
|
||
// chunkStreamer is the narrow interface CreateTrackFromChunkedUploadToS3
|
||
// needs from a chunk service. *services.TrackChunkService satisfies it
|
||
// via its v1.0.9 StreamChunkedUpload method (chunks → io.Writer).
|
||
//
|
||
// Kept package-local to avoid a circular import with internal/services.
|
||
type chunkStreamer interface {
|
||
StreamChunkedUpload(ctx context.Context, uploadID string, dst io.Writer) (filename string, totalSize int64, checksum string, err error)
|
||
}
|
||
|
||
// CreateTrackFromChunkedUploadToS3 streame les chunks d'un upload en cours
|
||
// directement vers S3 multipart (sans matérialiser le fichier assemblé sur
|
||
// disque local), puis crée la ligne Track avec storage_backend=s3.
|
||
//
|
||
// v1.0.9 item 1.5 — fast path pour le backend S3. Le path historique
|
||
// (CompleteChunkedUpload → fichier local → MigrateLocalToS3IfConfigured)
|
||
// reste utilisé pour storageBackend="local" (et conservé en fallback si
|
||
// s3Service est nil).
|
||
//
|
||
// Tuyauterie : un io.Pipe relie le côté write (alimenté par
|
||
// `chunks.StreamChunkedUpload` dans une goroutine) au côté read
|
||
// (consommé par `s3Service.UploadStream`, qui lui-même délègue à
|
||
// manager.Uploader pour faire du multipart 10 MB × 3 goroutines en
|
||
// streaming). Aucun buffer intermédiaire, aucun fichier assemblé sur
|
||
// disque local.
|
||
//
|
||
// Sur erreur de la goroutine d'assemblage, le pipe est fermé avec cette
|
||
// erreur ; UploadStream la voit et abandonne le multipart upload côté
|
||
// AWS SDK (cleanup automatique des parts uploadées). Sur erreur S3, la
|
||
// goroutine est notifiée par PipeWriter.Close, le state Redis et les
|
||
// chunks locaux NE sont PAS supprimés (préservation pour reprise).
|
||
func (s *TrackService) CreateTrackFromChunkedUploadToS3(
|
||
ctx context.Context,
|
||
chunks chunkStreamer,
|
||
userID uuid.UUID,
|
||
uploadID string,
|
||
contentType string,
|
||
expectedSize int64,
|
||
) (track *models.Track, checksumOut string, err error) {
|
||
if s.storageBackend != "s3" || s.s3Service == nil {
|
||
return nil, "", fmt.Errorf("s3 backend not configured")
|
||
}
|
||
if chunks == nil {
|
||
return nil, "", fmt.Errorf("chunk streamer is required")
|
||
}
|
||
|
||
trackID := uuid.New()
|
||
|
||
// We need the filename/extension before knowing where to put the S3 key.
|
||
// StreamChunkedUpload returns it after streaming, but we need the key
|
||
// up-front for the S3 PUT. Encode the trackID in the key and let the
|
||
// extension follow once known via a rename — except S3 doesn't have
|
||
// rename, only copy+delete. Cheaper to derive the extension from the
|
||
// caller (handler already has uploadInfo.Filename via GetUploadInfo).
|
||
//
|
||
// Pragmatic compromise: caller passes contentType, we derive ext from it
|
||
// for the S3 key. Falls back to .bin if unknown. The Track row's
|
||
// .Format field is set by the caller after.
|
||
ext := extFromContentType(contentType)
|
||
s3Key := fmt.Sprintf("tracks/%s/%s%s", userID.String(), trackID.String(), ext)
|
||
|
||
pr, pw := io.Pipe()
|
||
|
||
type assembleResult struct {
|
||
filename string
|
||
totalSize int64
|
||
checksum string
|
||
err error
|
||
}
|
||
resultCh := make(chan assembleResult, 1)
|
||
|
||
go func() {
|
||
filename, totalSize, checksum, streamErr := chunks.StreamChunkedUpload(ctx, uploadID, pw)
|
||
// Closing the pipe with the streaming error makes UploadStream see
|
||
// it on its next Read and abort the multipart upload. CloseWithError
|
||
// is safe even on the success path (nil error) — the read side
|
||
// receives io.EOF as expected.
|
||
_ = pw.CloseWithError(streamErr)
|
||
resultCh <- assembleResult{filename: filename, totalSize: totalSize, checksum: checksum, err: streamErr}
|
||
}()
|
||
|
||
// expectedSize lets the SDK avoid a "stream of unknown length" path
|
||
// (which forces full buffering of each part); we trust the Redis
|
||
// state's TotalSize (validated by StreamChunkedUpload before ack).
|
||
if _, err := s.s3Service.UploadStream(ctx, pr, s3Key, contentType, expectedSize); err != nil {
|
||
_ = pr.CloseWithError(err)
|
||
<-resultCh // drain so the goroutine exits
|
||
return nil, "", fmt.Errorf("s3 multipart upload failed: %w", err)
|
||
}
|
||
|
||
res := <-resultCh
|
||
if res.err != nil {
|
||
// S3 may have a complete object at this point, but the assembled
|
||
// stream was wrong (size mismatch, missing chunk file). Delete the
|
||
// orphan to avoid charging for storage of a corrupt object.
|
||
if delErr := s.s3Service.DeleteFile(ctx, s3Key); delErr != nil {
|
||
s.logger.Warn("Failed to delete orphan S3 object after stream error",
|
||
zap.String("s3_key", s3Key),
|
||
zap.Error(delErr),
|
||
)
|
||
}
|
||
return nil, "", fmt.Errorf("chunk assembly failed: %w", res.err)
|
||
}
|
||
|
||
title := strings.TrimSuffix(res.filename, filepath.Ext(res.filename))
|
||
format := strings.TrimPrefix(strings.ToUpper(filepath.Ext(res.filename)), ".")
|
||
if format == "M4A" {
|
||
format = "AAC"
|
||
}
|
||
|
||
track = &models.Track{
|
||
ID: trackID,
|
||
UserID: userID,
|
||
Title: title,
|
||
FilePath: "", // legacy column, unused for S3-backed rows
|
||
FileSize: res.totalSize,
|
||
Format: format,
|
||
Duration: 0,
|
||
IsPublic: true,
|
||
Status: models.TrackStatusUploading,
|
||
StatusMessage: fmt.Sprintf("Upload completed (S3), checksum: %s", res.checksum),
|
||
StorageBackend: "s3",
|
||
StorageKey: &s3Key,
|
||
}
|
||
|
||
if err := s.db.WithContext(ctx).Create(track).Error; err != nil {
|
||
// DB row creation failed but the S3 object exists — clean up to
|
||
// avoid an orphan that no row points at.
|
||
if delErr := s.s3Service.DeleteFile(ctx, s3Key); delErr != nil {
|
||
s.logger.Error("Failed to delete S3 object after DB row creation failed",
|
||
zap.String("s3_key", s3Key),
|
||
zap.Error(delErr),
|
||
)
|
||
}
|
||
return nil, "", fmt.Errorf("failed to create track record: %w", err)
|
||
}
|
||
|
||
s.logger.Info("Track created via direct chunked-S3 streaming",
|
||
zap.String("track_id", track.ID.String()),
|
||
zap.String("user_id", userID.String()),
|
||
zap.String("s3_key", s3Key),
|
||
zap.Int64("size", res.totalSize),
|
||
zap.String("checksum", res.checksum),
|
||
)
|
||
return track, res.checksum, nil
|
||
}
|
||
|
||
// extFromContentType returns a sensible file extension (incl. leading
|
||
// dot) for the audio MIME types we accept. Used to build S3 object keys
|
||
// when assembling chunks straight to S3 (v1.0.9 item 1.5) — the chunk
|
||
// service knows the original filename only AFTER streaming completes,
|
||
// but the S3 key is needed BEFORE.
|
||
func extFromContentType(contentType string) string {
|
||
switch strings.ToLower(contentType) {
|
||
case "audio/mpeg":
|
||
return ".mp3"
|
||
case "audio/flac":
|
||
return ".flac"
|
||
case "audio/wav":
|
||
return ".wav"
|
||
case "audio/ogg":
|
||
return ".ogg"
|
||
case "audio/aac", "audio/mp4", "audio/x-m4a":
|
||
return ".m4a"
|
||
case "audio/aiff", "audio/x-aiff":
|
||
return ".aiff"
|
||
default:
|
||
return ".bin"
|
||
}
|
||
}
|
||
|
||
// MigrateLocalToS3IfConfigured upload un fichier déjà assemblé sur le FS local
|
||
// vers S3 si storageBackend="s3". No-op sinon.
|
||
//
|
||
// v1.0.8 Phase 1 — utilisé par le path chunked upload (après assemblage local
|
||
// + CreateTrackFromPath). En post-assemblage on a un gros fichier local qu'on
|
||
// stream vers S3 et on efface le local. Si quelque chose échoue, on log mais
|
||
// ne retourne pas d'erreur au handler : la track existe déjà (storage_backend
|
||
// reste "local"), l'admin peut réessayer via `cmd/migrate_storage` (Phase 3).
|
||
func (s *TrackService) MigrateLocalToS3IfConfigured(ctx context.Context, trackID, userID uuid.UUID, localPath string) error {
|
||
if s.storageBackend != "s3" || s.s3Service == nil {
|
||
return nil
|
||
}
|
||
|
||
file, err := os.Open(localPath)
|
||
if err != nil {
|
||
return fmt.Errorf("open assembled file: %w", err)
|
||
}
|
||
defer file.Close()
|
||
|
||
stat, err := file.Stat()
|
||
if err != nil {
|
||
return fmt.Errorf("stat assembled file: %w", err)
|
||
}
|
||
|
||
ext := filepath.Ext(localPath)
|
||
s3Key := fmt.Sprintf("tracks/%s/%s%s", userID.String(), trackID.String(), ext)
|
||
contentType := mimeTypeForAudioExt(ext)
|
||
|
||
if _, err := s.s3Service.UploadStream(ctx, file, s3Key, contentType, stat.Size()); err != nil {
|
||
return fmt.Errorf("s3 upload: %w", err)
|
||
}
|
||
|
||
if err := s.db.WithContext(ctx).Model(&models.Track{}).
|
||
Where("id = ?", trackID).
|
||
Updates(map[string]interface{}{
|
||
"storage_backend": "s3",
|
||
"storage_key": s3Key,
|
||
}).Error; err != nil {
|
||
// Object is in S3 but DB row still says local. Dangerous (reader will
|
||
// try local FS path). Caller should treat as fatal for this request.
|
||
return fmt.Errorf("s3 upload succeeded but DB update failed: %w", err)
|
||
}
|
||
|
||
if err := os.Remove(localPath); err != nil {
|
||
s.logger.Warn("S3 migration OK but local cleanup failed (dangling file)",
|
||
zap.String("track_id", trackID.String()),
|
||
zap.String("path", localPath),
|
||
zap.Error(err),
|
||
)
|
||
// Non-fatal — object in S3, DB correct, local file is orphaned debris.
|
||
}
|
||
|
||
s.logger.Info("Assembled chunk upload migrated to S3",
|
||
zap.String("track_id", trackID.String()),
|
||
zap.String("s3_key", s3Key),
|
||
zap.Int64("size", stat.Size()),
|
||
)
|
||
return nil
|
||
}
|
||
|
||
// mimeTypeForAudioExt retourne le Content-Type MIME pour une extension audio.
|
||
// Fallback sur application/octet-stream pour les extensions inconnues.
|
||
func mimeTypeForAudioExt(ext string) string {
|
||
switch strings.ToLower(ext) {
|
||
case ".mp3":
|
||
return "audio/mpeg"
|
||
case ".flac":
|
||
return "audio/flac"
|
||
case ".wav":
|
||
return "audio/wav"
|
||
case ".ogg":
|
||
return "audio/ogg"
|
||
case ".m4a", ".aac":
|
||
return "audio/aac"
|
||
default:
|
||
return "application/octet-stream"
|
||
}
|
||
}
|
||
|
||
// updateTrackStatus met à jour le Status et StatusMessage d'un Track
|
||
// MOD-P2-008: Helper pour mettre à jour le Status de manière thread-safe
|
||
func (s *TrackService) updateTrackStatus(ctx context.Context, trackID uuid.UUID, status models.TrackStatus, message string) {
|
||
if err := s.db.WithContext(ctx).Model(&models.Track{}).
|
||
Where("id = ?", trackID).
|
||
Updates(map[string]interface{}{
|
||
"status": status,
|
||
"status_message": message,
|
||
}).Error; err != nil {
|
||
s.logger.Error("Failed to update track status",
|
||
zap.String("track_id", trackID.String()),
|
||
zap.String("status", string(status)),
|
||
zap.String("message", message),
|
||
zap.Error(err),
|
||
)
|
||
} else {
|
||
s.logger.Info("Track status updated",
|
||
zap.String("track_id", trackID.String()),
|
||
zap.String("status", string(status)),
|
||
zap.String("message", message),
|
||
)
|
||
}
|
||
}
|
||
|
||
// cleanupFailedUpload nettoie le fichier et le Track en cas d'échec
|
||
// MOD-P2-008: Nettoyage automatique en cas d'erreur
|
||
func (s *TrackService) cleanupFailedUpload(filePath string, trackID uuid.UUID, reason string) {
|
||
// Supprimer le fichier s'il existe
|
||
if err := os.Remove(filePath); err != nil && !os.IsNotExist(err) {
|
||
s.logger.Warn("Failed to cleanup file after upload failure",
|
||
zap.String("file_path", filePath),
|
||
zap.String("track_id", trackID.String()),
|
||
zap.String("reason", reason),
|
||
zap.Error(err),
|
||
)
|
||
}
|
||
|
||
s.logger.Info("Cleaned up failed upload",
|
||
zap.String("track_id", trackID.String()),
|
||
zap.String("file_path", filePath),
|
||
zap.String("reason", reason),
|
||
)
|
||
}
|
||
|
||
// CreateTrackFromPath crée un track à partir d'un fichier déjà sauvegardé
|
||
func (s *TrackService) CreateTrackFromPath(ctx context.Context, userID uuid.UUID, filePath, filename string, fileSize int64, format string) (*models.Track, error) {
|
||
ext := filepath.Ext(filename)
|
||
title := strings.TrimSuffix(filename, ext)
|
||
|
||
track := &models.Track{
|
||
UserID: userID,
|
||
Title: title,
|
||
FilePath: filePath,
|
||
FileSize: fileSize,
|
||
Format: format,
|
||
Duration: 0, // Sera mis à jour lors du traitement asynchrone
|
||
IsPublic: true,
|
||
Status: models.TrackStatusUploading,
|
||
StatusMessage: "Upload completed",
|
||
}
|
||
|
||
if err := s.db.WithContext(ctx).Create(track).Error; err != nil {
|
||
return nil, fmt.Errorf("failed to create track record: %w", err)
|
||
}
|
||
|
||
s.logger.Info("Track created from path",
|
||
zap.String("track_id", track.ID.String()),
|
||
zap.String("user_id", userID.String()),
|
||
zap.String("file_path", filePath),
|
||
zap.Int64("file_size", fileSize),
|
||
)
|
||
|
||
return track, nil
|
||
}
|