veza/veza-backend-api/internal/services/track_chunk_service.go
senke 8699004974 feat(track): native S3 multipart for chunked uploads (v1.0.9 item 1.5)
Replaces the historical chunked-upload flow when TRACK_STORAGE_BACKEND=s3:

  before: chunks → assembled file on disk → MigrateLocalToS3IfConfigured
          opens the file → manager.Uploader streams in 10 MB parts
  after:  chunks → io.Pipe → manager.Uploader streams in 10 MB parts
          (no assembled file on local disk)

Eliminates the second local copy of every upload and ~500 MB of disk
I/O per concurrent 500 MB upload. The local-storage path
(TRACK_STORAGE_BACKEND=local, default) is unchanged — it still goes
through CompleteChunkedUpload + CreateTrackFromPath because ClamAV needs
the assembled file (chunked path skips ClamAV by design, see audit).

New surface:
  - TrackChunkService.StreamChunkedUpload(ctx, uploadID, dst io.Writer)
    — extracted from CompleteChunkedUpload, writes chunks in order to
    any io.Writer, computes SHA-256 + verifies expected size, cleans
    up Redis state on success and preserves it on failure (resumable).
  - TrackService.CreateTrackFromChunkedUploadToS3 — orchestrates
    io.Pipe + goroutine, deletes orphan S3 objects on assembly failure,
    creates the Track row with storage_backend=s3 + storage_key.

Tests: 4 chunk-service stream tests (happy / writer error / size
mismatch / delegation) + 4 service tests (happy / wrong backend /
stream error / S3 upload error). One E2E @critical-s3 spec gated on
S3 availability via /health/deep so it ships today and starts running
once MinIO is added to the e2e workflow services block.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-26 23:12:56 +02:00

461 lines
15 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package services
import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"mime/multipart"
"os"
"path/filepath"
"time"
"github.com/google/uuid"
"github.com/redis/go-redis/v9"
"go.uber.org/zap"
)
// ChunkUploadInfo holds the full server-side state of one chunked upload:
// identity, expected totals, and the per-chunk bookkeeping map. Persisted
// as a whole in Redis (see UploadStateStore) between chunk requests.
// MIGRATION UUID: UserID migrated to uuid.UUID.
type ChunkUploadInfo struct {
	UploadID    string            `json:"upload_id"`
	UserID      uuid.UUID         `json:"user_id"`
	TotalChunks int               `json:"total_chunks"`
	TotalSize   int64             `json:"total_size"` // expected byte size of the fully assembled file
	Filename    string            `json:"filename"`
	Chunks      map[int]ChunkInfo `json:"chunks"`                 // chunk_number -> ChunkInfo (1-based chunk numbers)
	ReceivedMD5 string            `json:"received_md5,omitempty"` // SHA256 of the final file (field name is a legacy key)
	CreatedAt   time.Time         `json:"created_at"`
	UpdatedAt   time.Time         `json:"updated_at"`
}
// ChunkInfo describes a single received chunk: its number, size on disk,
// checksum, and the local path where it is staged until assembly.
type ChunkInfo struct {
	ChunkNumber int    `json:"chunk_number"`
	Size        int64  `json:"size"`
	MD5         string `json:"md5"` // SHA256 checksum (64 hex chars; field name is a legacy key, see SEC-007)
	FilePath    string `json:"file_path"`
	Received    bool   `json:"received"`
}
// UploadState is the client-facing resume view of an upload, derived from
// ChunkUploadInfo by GetUploadState: which chunks arrived, the highest one,
// and an integer progress percentage.
// MIGRATION UUID: UserID migrated to uuid.UUID.
type UploadState struct {
	UploadID       string    `json:"upload_id"`
	UserID         uuid.UUID `json:"user_id"`
	TotalChunks    int       `json:"total_chunks"`
	TotalSize      int64     `json:"total_size"`
	Filename       string    `json:"filename"`
	ChunksReceived []int     `json:"chunks_received"` // numbers of the chunks received so far (unsorted — map iteration order)
	LastChunk      int       `json:"last_chunk"`      // highest chunk number received (0 if none)
	ReceivedCount  int       `json:"received_count"`  // count of received chunks
	Progress       int       `json:"progress"`        // integer percentage 0-100 (truncated)
	CreatedAt      time.Time `json:"created_at"`
	UpdatedAt      time.Time `json:"updated_at"`
}
// TrackChunkService manages chunked uploads of audio files: chunks are
// staged under chunksDir/<uploadID>/, upload state lives in the Redis-backed
// store, and a background goroutine garbage-collects orphaned chunk folders.
type TrackChunkService struct {
	chunksDir       string           // root directory for staged chunk files
	store           UploadStateStore // Redis-backed upload state (shared across instances)
	logger          *zap.Logger
	cleanupInterval time.Duration // how often the disk GC scans chunksDir
	maxUploadAge    time.Duration // folders idle longer than this are GC candidates
}
// NewTrackChunkService builds a chunked-upload service backed by the given
// Redis client for state and chunksDir for staged chunk files (defaults to
// "uploads/tracks/chunks" when empty). A nil logger is replaced by a no-op
// logger. Also starts the background disk garbage collector.
// MIGRATION: Redis client added for the distributed state store.
func NewTrackChunkService(chunksDir string, redisClient *redis.Client, logger *zap.Logger) *TrackChunkService {
	if chunksDir == "" {
		chunksDir = "uploads/tracks/chunks"
	}
	if logger == nil {
		logger = zap.NewNop()
	}

	// Uploads are retained for 24h, both in Redis (TTL) and on disk (GC age).
	const retention = 24 * time.Hour

	svc := &TrackChunkService{
		chunksDir:       chunksDir,
		store:           NewRedisUploadStore(redisClient, retention),
		logger:          logger,
		cleanupInterval: time.Hour,
		maxUploadAge:    retention,
	}

	// Best-effort: make sure the staging directory exists up front.
	if err := os.MkdirAll(chunksDir, 0755); err != nil {
		logger.Warn("Failed to create chunks directory", zap.Error(err))
	}

	// Launch the orphaned-files garbage collector.
	go svc.startDiskCleanup()
	return svc
}
// InitiateChunkedUpload creates a fresh upload session, persists its initial
// state in Redis, and returns the generated upload ID the client must send
// with every subsequent chunk.
func (s *TrackChunkService) InitiateChunkedUpload(userID uuid.UUID, totalChunks int, totalSize int64, filename string) (string, error) {
	now := time.Now()
	info := &ChunkUploadInfo{
		UploadID:    uuid.New().String(),
		UserID:      userID,
		TotalChunks: totalChunks,
		TotalSize:   totalSize,
		Filename:    filename,
		Chunks:      map[int]ChunkInfo{},
		CreatedAt:   now,
		UpdatedAt:   now,
	}

	// Persist the initial state to Redis so any instance can serve chunks.
	if err := s.store.SetState(context.Background(), info); err != nil {
		return "", fmt.Errorf("failed to initiate upload: %w", err)
	}

	s.logger.Info("Chunked upload initiated",
		zap.String("upload_id", info.UploadID),
		zap.String("user_id", userID.String()),
		zap.Int("total_chunks", totalChunks),
		zap.Int64("total_size", totalSize),
	)
	return info.UploadID, nil
}
// SaveChunk validates and stores one received chunk under
// chunksDir/<uploadID>/chunk_<n>, computing its SHA-256 during the copy
// (SEC-007: MD5 replaced by SHA256), then records it in the Redis state.
//
// Fix: the destination file is now closed explicitly, with its error
// checked, BEFORE the chunk is marked Received in Redis. Previously the
// close happened via defer after the state update and its error was
// discarded — a failed close (e.g. disk full at flush) could leave a
// truncated chunk on disk that the state claimed was complete.
//
// Concurrency note: plain GET/SET on the Redis state (no WATCH/optimistic
// locking) — acceptable given low contention per upload (historical P0 fix).
func (s *TrackChunkService) SaveChunk(ctx context.Context, uploadID string, chunkNumber int, totalChunks int, fileHeader *multipart.FileHeader) error {
	// 1. Fetch current state from Redis.
	uploadInfo, err := s.store.GetState(ctx, uploadID)
	if err != nil {
		return err
	}

	// Reject duplicates of an already-received chunk.
	if chunk, exists := uploadInfo.Chunks[chunkNumber]; exists && chunk.Received {
		return fmt.Errorf("chunk %d already received", chunkNumber)
	}

	// The client-reported total must match what the session was created with.
	if uploadInfo.TotalChunks != totalChunks {
		return fmt.Errorf("total chunks mismatch: expected %d, got %d", uploadInfo.TotalChunks, totalChunks)
	}

	// Ensure this upload's staging directory exists.
	uploadDir := filepath.Join(s.chunksDir, uploadID)
	if err := os.MkdirAll(uploadDir, 0755); err != nil {
		return fmt.Errorf("failed to create upload directory: %w", err)
	}

	chunkPath := filepath.Join(uploadDir, fmt.Sprintf("chunk_%d", chunkNumber))
	file, err := fileHeader.Open()
	if err != nil {
		return fmt.Errorf("failed to open chunk file: %w", err)
	}
	defer file.Close()

	destFile, err := os.Create(chunkPath)
	if err != nil {
		return fmt.Errorf("failed to create chunk file: %w", err)
	}

	// Compute SHA-256 while copying (SEC-007: MD5 replaced by SHA256).
	hash := sha256.New()
	if _, err := io.Copy(io.MultiWriter(destFile, hash), file); err != nil {
		destFile.Close()
		os.Remove(chunkPath)
		return fmt.Errorf("failed to save chunk: %w", err)
	}

	// Close BEFORE recording state: a close error means the data may not all
	// be on disk, so the chunk must not be marked Received.
	if err := destFile.Close(); err != nil {
		os.Remove(chunkPath)
		return fmt.Errorf("failed to close chunk file: %w", err)
	}

	chunkChecksum := hex.EncodeToString(hash.Sum(nil))

	// Record the chunk and persist the updated state.
	uploadInfo.Chunks[chunkNumber] = ChunkInfo{
		ChunkNumber: chunkNumber,
		Size:        fileHeader.Size,
		MD5:         chunkChecksum,
		FilePath:    chunkPath,
		Received:    true,
	}
	uploadInfo.UpdatedAt = time.Now()
	if err := s.store.SetState(ctx, uploadInfo); err != nil {
		return fmt.Errorf("failed to update upload state: %w", err)
	}

	s.logger.Info("Chunk saved",
		zap.String("upload_id", uploadID),
		zap.Int("chunk_number", chunkNumber),
		zap.Int64("size", fileHeader.Size),
		zap.String("checksum", chunkChecksum),
	)
	return nil
}
// GetUploadInfo returns the raw Redis-backed state for an upload, or the
// store's error if the upload is unknown or the lookup fails.
func (s *TrackChunkService) GetUploadInfo(uploadID string) (*ChunkUploadInfo, error) {
	ctx := context.Background()
	return s.store.GetState(ctx, uploadID)
}
// CompleteChunkedUpload assembles all chunks into a final file on the local
// filesystem, preserving the historical semantics: chunks → assembled file
// on disk → the caller (handler) then moves it to S3 if configured
// (cf. MigrateLocalToS3IfConfigured).
//
// v1.0.9 item 1.5 — for the native S3 path (no local intermediate), use
// StreamChunkedUpload, which pipes chunks straight into any io.Writer
// (typically the write side of an io.Pipe feeding s3Service.UploadStream).
//
// Returns (filename, total size, SHA-256 checksum). On any failure the
// partially-written final file is removed.
func (s *TrackChunkService) CompleteChunkedUpload(ctx context.Context, uploadID string, finalPath string) (string, int64, string, error) {
	if err := os.MkdirAll(filepath.Dir(finalPath), 0755); err != nil {
		return "", 0, "", fmt.Errorf("failed to create destination directory: %w", err)
	}

	out, err := os.Create(finalPath)
	if err != nil {
		return "", 0, "", fmt.Errorf("failed to create final file: %w", err)
	}

	// Delegate assembly + checksum + cleanup to the streaming core, then
	// close the file and surface whichever error happened first.
	filename, totalSize, checksum, streamErr := s.StreamChunkedUpload(ctx, uploadID, out)
	closeErr := out.Close()
	switch {
	case streamErr != nil:
		os.Remove(finalPath)
		return "", 0, "", streamErr
	case closeErr != nil:
		os.Remove(finalPath)
		return "", 0, "", fmt.Errorf("failed to close final file: %w", closeErr)
	}

	s.logger.Info("Chunked upload assembled to local file",
		zap.String("upload_id", uploadID),
		zap.String("final_path", finalPath),
		zap.Int64("total_size", totalSize),
		zap.String("checksum", checksum),
	)
	return filename, totalSize, checksum, nil
}
// StreamChunkedUpload writes an upload's chunks to `dst` in order, computes
// the SHA-256 checksum, verifies the total size against the expected size,
// and on success removes both the local chunk files and the Redis state.
//
// v1.0.9 item 1.5 — extracted from CompleteChunkedUpload so the native S3
// path can stream chunks → S3 multipart without ever materializing the
// assembled file on local disk. Used by TrackHandler.CompleteChunkedUpload
// when TRACK_STORAGE_BACKEND=s3: the caller connects an io.Pipe between this
// method (write side) and S3StorageService.UploadStream (read side), so the
// multipart uploader (manager.Uploader, 10 MB parts × 3 goroutines) consumes
// the pipe as a stream.
//
// `dst` is never closed here — that is the caller's responsibility
// (typically PipeWriter.Close() or file.Close()).
//
// On error, the Redis state and local chunks are preserved so the upload can
// be resumed (cf. GetUploadState). On success they are deleted.
//
// Returns (filename, bytes written, hex SHA-256 checksum).
func (s *TrackChunkService) StreamChunkedUpload(ctx context.Context, uploadID string, dst io.Writer) (string, int64, string, error) {
	info, err := s.store.GetState(ctx, uploadID)
	if err != nil {
		return "", 0, "", err
	}

	// All chunks must be present and marked received before streaming starts.
	if got := len(info.Chunks); got != info.TotalChunks {
		return "", 0, "", fmt.Errorf("missing chunks: received %d/%d", got, info.TotalChunks)
	}
	for n := 1; n <= info.TotalChunks; n++ {
		if c, ok := info.Chunks[n]; !ok || !c.Received {
			return "", 0, "", fmt.Errorf("chunk %d is missing", n)
		}
	}

	// Stream each chunk into dst while hashing, tracking the byte total.
	digest := sha256.New()
	sink := io.MultiWriter(dst, digest)
	var written int64
	for n := 1; n <= info.TotalChunks; n++ {
		chunkFile, openErr := os.Open(info.Chunks[n].FilePath)
		if openErr != nil {
			return "", 0, "", fmt.Errorf("failed to open chunk %d: %w", n, openErr)
		}
		count, copyErr := io.Copy(sink, chunkFile)
		_ = chunkFile.Close()
		if copyErr != nil {
			return "", 0, "", fmt.Errorf("failed to write chunk %d: %w", n, copyErr)
		}
		written += count
	}

	if written != info.TotalSize {
		return "", 0, "", fmt.Errorf("size mismatch: expected %d, got %d", info.TotalSize, written)
	}
	checksum := hex.EncodeToString(digest.Sum(nil))

	// Success: best-effort cleanup of disk chunks and Redis state.
	if err := os.RemoveAll(filepath.Join(s.chunksDir, uploadID)); err != nil {
		s.logger.Warn("Failed to cleanup chunks", zap.String("upload_id", uploadID), zap.Error(err))
	}
	if err := s.store.DeleteState(ctx, uploadID); err != nil {
		s.logger.Warn("Failed to delete state from Redis", zap.Error(err))
	}
	return info.Filename, written, checksum, nil
}
// GetUploadState projects the stored upload info into the client-facing
// resume view: received chunk numbers, the highest chunk seen, a count, and
// an integer progress percentage.
func (s *TrackChunkService) GetUploadState(uploadID string) (*UploadState, error) {
	info, err := s.store.GetState(context.Background(), uploadID)
	if err != nil {
		return nil, err
	}

	// Collect received chunk numbers and track the highest one.
	var (
		received = make([]int, 0, len(info.Chunks))
		highest  int
	)
	for num, chunk := range info.Chunks {
		if !chunk.Received {
			continue
		}
		received = append(received, num)
		if num > highest {
			highest = num
		}
	}

	// Integer percentage, guarding against a zero-chunk session.
	pct := 0
	if info.TotalChunks > 0 {
		pct = (len(received) * 100) / info.TotalChunks
	}

	return &UploadState{
		UploadID:       info.UploadID,
		UserID:         info.UserID,
		TotalChunks:    info.TotalChunks,
		TotalSize:      info.TotalSize,
		Filename:       info.Filename,
		ChunksReceived: received,
		LastChunk:      highest,
		ReceivedCount:  len(received),
		Progress:       pct,
		CreatedAt:      info.CreatedAt,
		UpdatedAt:      info.UpdatedAt,
	}, nil
}
// GetUploadProgress returns (received chunk count, integer progress 0-100)
// for a chunked upload.
//
// Fix: guards the percentage division against TotalChunks == 0 — previously
// this divided unconditionally and would panic (integer divide by zero) for
// a session initiated with zero chunks, whereas GetUploadState already
// guards the same computation. Behavior for TotalChunks > 0 is unchanged.
func (s *TrackChunkService) GetUploadProgress(uploadID string) (int, int, error) {
	uploadInfo, err := s.store.GetState(context.Background(), uploadID)
	if err != nil {
		return 0, 0, err
	}
	receivedChunks := 0
	for _, chunk := range uploadInfo.Chunks {
		if chunk.Received {
			receivedChunks++
		}
	}
	// Same guard as GetUploadState: avoid division by zero.
	progress := 0
	if uploadInfo.TotalChunks > 0 {
		progress = (receivedChunks * 100) / uploadInfo.TotalChunks
	}
	return receivedChunks, progress, nil
}
// CleanupUpload removes an upload entirely: its Redis state (best-effort)
// and its staged chunk directory on disk.
func (s *TrackChunkService) CleanupUpload(uploadID string) error {
	// Redis first; an already-deleted state is not an error worth surfacing.
	_ = s.store.DeleteState(context.Background(), uploadID)

	// Then the on-disk chunk folder — this one must succeed.
	if err := os.RemoveAll(filepath.Join(s.chunksDir, uploadID)); err != nil {
		return fmt.Errorf("failed to cleanup chunks: %w", err)
	}

	s.logger.Info("Upload cleaned up", zap.String("upload_id", uploadID))
	return nil
}
// startDiskCleanup runs the orphaned-files garbage collector on a fixed
// interval for the lifetime of the process.
// NOTE(review): there is no stop channel/context — this goroutine never
// exits; confirm that is acceptable for the service's shutdown story.
func (s *TrackChunkService) startDiskCleanup() {
	ticker := time.NewTicker(s.cleanupInterval)
	defer ticker.Stop()
	for {
		<-ticker.C
		s.CleanupOrphanedChunks(context.Background())
	}
}
// CleanupOrphanedChunks scans the chunks directory and deletes upload
// folders older than maxUploadAge that no longer have state in Redis —
// a garbage collector for files orphaned by a Redis flush or TTL mismatch.
//
// NOTE(review): any GetState error (including a Redis outage) is treated as
// "not in Redis" and triggers deletion of folders older than the age limit;
// confirm that is the intended failure mode.
func (s *TrackChunkService) CleanupOrphanedChunks(ctx context.Context) {
	s.logger.Debug("Starting orphaned chunks cleanup")

	entries, err := os.ReadDir(s.chunksDir)
	if err != nil {
		s.logger.Error("Failed to read chunks directory", zap.Error(err))
		return
	}

	now := time.Now()
	deleted := 0
	for _, entry := range entries {
		if !entry.IsDir() {
			continue
		}
		info, infoErr := entry.Info()
		if infoErr != nil {
			continue
		}
		// Young folders are still within the retention window — keep them.
		if now.Sub(info.ModTime()) <= s.maxUploadAge {
			continue
		}

		// Old folder: keep it only if Redis still knows about the upload.
		uploadID := entry.Name()
		if _, stateErr := s.store.GetState(ctx, uploadID); stateErr == nil {
			continue
		}

		path := filepath.Join(s.chunksDir, uploadID)
		if rmErr := os.RemoveAll(path); rmErr != nil {
			s.logger.Warn("Failed to delete orphaned chunk folder", zap.String("path", path), zap.Error(rmErr))
			continue
		}
		deleted++
		s.logger.Info("Deleted orphaned upload folder", zap.String("upload_id", uploadID))
	}

	if deleted > 0 {
		s.logger.Info("Cleanup completed", zap.Int("deleted_count", deleted))
	}
}