Prepares the S3StorageService surface for the MinIO upload migration: - UploadStream(ctx, io.Reader, key, contentType, size) — streams bytes via the existing manager.Uploader (multipart, 10MB parts, 3 goroutines) without buffering the whole body in memory. Tracks can be up to 500MB; UploadFile([]byte) would OOM at that size. - GetSignedURL(ctx, key, ttl) — presigned URL with per-call TTL, decoupling from the service-level urlExpiry. Phase 2 needs 15min (StreamTrack), 30min (DownloadTrack), 1h (transcoder). GetPresignedURL remains as thin back-compat wrapper using the default TTL. No change in behavior for existing callers (CloudService, WaveformService, GearDocumentService, CloudBackupWorker). TrackService will consume these new methods in Phase 1. Refs: plan Batch A step A1, AUDIT_REPORT §10 v1.0.8 deferrals. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
345 lines
10 KiB
Go
345 lines
10 KiB
Go
package services
|
|
|
|
import (
	"bytes"
	"context"
	"fmt"
	"io"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"go.uber.org/zap"
)
|
|
|
|
// S3StorageService implements the S3Service interface for S3-compatible
// object storage (AWS S3, MinIO, and other compatible backends).
type S3StorageService struct {
	client    *s3.Client        // low-level S3 API client
	uploader  *manager.Uploader // multipart uploader (configured in NewS3StorageService)
	bucket    string            // target bucket for all operations
	region    string            // AWS region
	endpoint  string            // custom endpoint for MinIO or other S3-compatible services; empty for AWS
	logger    *zap.Logger
	urlExpiry time.Duration // default validity window for presigned URLs
}
|
|
|
|
// S3Config holds the configuration for the S3 storage service.
type S3Config struct {
	Bucket    string
	Region    string
	Endpoint  string // optional; for MinIO or other S3-compatible services
	AccessKey string
	SecretKey string
	URLExpiry time.Duration // presigned-URL validity (defaults to 1h when zero)
	Logger    *zap.Logger
}
|
|
|
|
// NewS3StorageService creates a new S3 storage service from cfg: it
// validates and defaults the configuration, builds the SDK client and the
// multipart uploader, and probes bucket access. It returns an error when
// the bucket name is missing or the AWS configuration cannot be loaded.
func NewS3StorageService(cfg S3Config) (*S3StorageService, error) {
	if cfg.Bucket == "" {
		return nil, fmt.Errorf("S3 bucket name is required")
	}
	if cfg.Region == "" {
		cfg.Region = "us-east-1" // default region
	}
	if cfg.URLExpiry == 0 {
		cfg.URLExpiry = time.Hour // default presigned-URL validity: 1 hour
	}
	if cfg.Logger == nil {
		cfg.Logger = zap.NewNop()
	}

	// Load the base AWS SDK configuration (env vars, shared config files, ...).
	awsCfg, err := config.LoadDefaultConfig(context.Background(),
		config.WithRegion(cfg.Region),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to load AWS config: %w", err)
	}

	// Explicit static credentials, when provided, override the default chain.
	if cfg.AccessKey != "" && cfg.SecretKey != "" {
		awsCfg.Credentials = credentials.NewStaticCredentialsProvider(cfg.AccessKey, cfg.SecretKey, "")
	}

	// Build the S3 client.
	s3Client := s3.NewFromConfig(awsCfg, func(o *s3.Options) {
		// When a custom endpoint is supplied (MinIO, etc.), use it.
		if cfg.Endpoint != "" {
			o.BaseEndpoint = aws.String(cfg.Endpoint)
			// ENABLE path-style addressing (<endpoint>/<bucket>/<key>) —
			// required by MinIO and most S3-compatible services.
			o.UsePathStyle = true
		}
	})

	// Build the multipart uploader (the SDK retryer handles transient failures).
	uploader := manager.NewUploader(s3Client, func(u *manager.Uploader) {
		u.PartSize = 10 * 1024 * 1024 // 10MB per part
		u.Concurrency = 3             // 3 concurrent part uploads
	})

	service := &S3StorageService{
		client:    s3Client,
		uploader:  uploader,
		bucket:    cfg.Bucket,
		region:    cfg.Region,
		endpoint:  cfg.Endpoint,
		logger:    cfg.Logger,
		urlExpiry: cfg.URLExpiry,
	}

	// Probe bucket access with a short timeout. Note: verifyBucketAccess only
	// logs a warning and always returns nil, so this check is non-fatal in
	// practice — the error branch below is defensive.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := service.verifyBucketAccess(ctx); err != nil {
		return nil, fmt.Errorf("failed to verify bucket access: %w", err)
	}

	return service, nil
}
|
|
|
|
// verifyBucketAccess vérifie que le bucket est accessible
|
|
func (s *S3StorageService) verifyBucketAccess(ctx context.Context) error {
|
|
_, err := s.client.HeadBucket(ctx, &s3.HeadBucketInput{
|
|
Bucket: aws.String(s.bucket),
|
|
})
|
|
if err != nil {
|
|
// Logger un avertissement mais ne pas bloquer
|
|
// En production, on devrait créer le bucket ou vérifier qu'il existe
|
|
s.logger.Warn("Bucket does not exist or is not accessible, will attempt to use on first upload",
|
|
zap.String("bucket", s.bucket),
|
|
zap.Error(err),
|
|
)
|
|
// Ne pas retourner d'erreur pour permettre la création du bucket plus tard
|
|
// ou pour permettre l'utilisation avec des buckets créés dynamiquement
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// UploadFile upload un fichier vers S3 et retourne l'URL publique ou la clé
|
|
func (s *S3StorageService) UploadFile(ctx context.Context, data []byte, key string, contentType string) (string, error) {
|
|
if key == "" {
|
|
return "", fmt.Errorf("key cannot be empty")
|
|
}
|
|
if len(data) == 0 {
|
|
return "", fmt.Errorf("data cannot be empty")
|
|
}
|
|
|
|
// Déterminer le Content-Type si non fourni
|
|
if contentType == "" {
|
|
contentType = "application/octet-stream"
|
|
}
|
|
|
|
// Upload vers S3
|
|
_, err := s.uploader.Upload(ctx, &s3.PutObjectInput{
|
|
Bucket: aws.String(s.bucket),
|
|
Key: aws.String(key),
|
|
Body: bytes.NewReader(data),
|
|
ContentType: aws.String(contentType),
|
|
// Métadonnées optionnelles
|
|
Metadata: map[string]string{
|
|
"uploaded-at": time.Now().UTC().Format(time.RFC3339),
|
|
},
|
|
})
|
|
if err != nil {
|
|
s.logger.Error("Failed to upload file to S3",
|
|
zap.Error(err),
|
|
zap.String("key", key),
|
|
zap.String("bucket", s.bucket),
|
|
)
|
|
return "", fmt.Errorf("failed to upload file to S3: %w", err)
|
|
}
|
|
|
|
s.logger.Info("File uploaded successfully to S3",
|
|
zap.String("key", key),
|
|
zap.String("bucket", s.bucket),
|
|
zap.Int("size", len(data)),
|
|
)
|
|
|
|
// Retourner la clé (l'URL complète sera générée via GetPresignedURL si nécessaire)
|
|
return key, nil
|
|
}
|
|
|
|
// UploadStream upload un io.Reader vers S3 sans charger le contenu en mémoire.
|
|
// Préféré à UploadFile pour les gros objets (tracks audio jusqu'à 500MB) : le
|
|
// manager.Uploader (multipart, 10MB parts, 3 goroutines) streame en continu.
|
|
//
|
|
// `size` peut être -1 si la taille est inconnue d'avance — dans ce cas la SDK
|
|
// bufferise. Sinon, passe la taille exacte (p.ex. `fileHeader.Size`) pour que
|
|
// le Content-Length soit correct et que le client voie une barre de progrès.
|
|
// v1.0.8 Phase 1 — cf. /home/senke/.claude/plans/audit-fonctionnel-wild-hickey.md
|
|
func (s *S3StorageService) UploadStream(ctx context.Context, r io.Reader, key, contentType string, size int64) (string, error) {
|
|
if key == "" {
|
|
return "", fmt.Errorf("key cannot be empty")
|
|
}
|
|
if r == nil {
|
|
return "", fmt.Errorf("reader cannot be nil")
|
|
}
|
|
if contentType == "" {
|
|
contentType = "application/octet-stream"
|
|
}
|
|
|
|
input := &s3.PutObjectInput{
|
|
Bucket: aws.String(s.bucket),
|
|
Key: aws.String(key),
|
|
Body: r,
|
|
ContentType: aws.String(contentType),
|
|
Metadata: map[string]string{
|
|
"uploaded-at": time.Now().UTC().Format(time.RFC3339),
|
|
},
|
|
}
|
|
if size > 0 {
|
|
input.ContentLength = aws.Int64(size)
|
|
}
|
|
|
|
_, err := s.uploader.Upload(ctx, input)
|
|
if err != nil {
|
|
s.logger.Error("Failed to stream-upload to S3",
|
|
zap.Error(err),
|
|
zap.String("key", key),
|
|
zap.String("bucket", s.bucket),
|
|
zap.Int64("size", size),
|
|
)
|
|
return "", fmt.Errorf("failed to stream-upload to S3: %w", err)
|
|
}
|
|
|
|
s.logger.Info("Stream uploaded successfully to S3",
|
|
zap.String("key", key),
|
|
zap.String("bucket", s.bucket),
|
|
zap.Int64("size", size),
|
|
)
|
|
|
|
return key, nil
|
|
}
|
|
|
|
// DeleteFile supprime un fichier de S3
|
|
func (s *S3StorageService) DeleteFile(ctx context.Context, key string) error {
|
|
if key == "" {
|
|
return fmt.Errorf("key cannot be empty")
|
|
}
|
|
|
|
_, err := s.client.DeleteObject(ctx, &s3.DeleteObjectInput{
|
|
Bucket: aws.String(s.bucket),
|
|
Key: aws.String(key),
|
|
})
|
|
if err != nil {
|
|
s.logger.Error("Failed to delete file from S3",
|
|
zap.Error(err),
|
|
zap.String("key", key),
|
|
zap.String("bucket", s.bucket),
|
|
)
|
|
return fmt.Errorf("failed to delete file from S3: %w", err)
|
|
}
|
|
|
|
s.logger.Info("File deleted successfully from S3",
|
|
zap.String("key", key),
|
|
zap.String("bucket", s.bucket),
|
|
)
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetPresignedURL génère une URL présignée pour télécharger un fichier.
|
|
// Utilise la TTL configurée sur le service (urlExpiry, défaut 1h).
|
|
// Pour une TTL explicite, utiliser GetSignedURL.
|
|
func (s *S3StorageService) GetPresignedURL(ctx context.Context, key string) (string, error) {
|
|
return s.GetSignedURL(ctx, key, s.urlExpiry)
|
|
}
|
|
|
|
// GetSignedURL génère une URL présignée GET avec une TTL explicite.
|
|
// v1.0.8 Phase 2 : StreamTrack utilise 15min, DownloadTrack 30min,
|
|
// ffmpeg transcoder 1h. Préféré à GetPresignedURL qui impose urlExpiry global.
|
|
func (s *S3StorageService) GetSignedURL(ctx context.Context, key string, ttl time.Duration) (string, error) {
|
|
if key == "" {
|
|
return "", fmt.Errorf("key cannot be empty")
|
|
}
|
|
if ttl <= 0 {
|
|
ttl = s.urlExpiry
|
|
}
|
|
|
|
presignClient := s3.NewPresignClient(s.client)
|
|
|
|
request, err := presignClient.PresignGetObject(ctx, &s3.GetObjectInput{
|
|
Bucket: aws.String(s.bucket),
|
|
Key: aws.String(key),
|
|
}, func(opts *s3.PresignOptions) {
|
|
opts.Expires = ttl
|
|
})
|
|
if err != nil {
|
|
s.logger.Error("Failed to generate signed URL",
|
|
zap.Error(err),
|
|
zap.String("key", key),
|
|
zap.String("bucket", s.bucket),
|
|
zap.Duration("ttl", ttl),
|
|
)
|
|
return "", fmt.Errorf("failed to generate signed URL: %w", err)
|
|
}
|
|
|
|
return request.URL, nil
|
|
}
|
|
|
|
// GetPublicURL génère une URL publique (si le bucket est public)
|
|
func (s *S3StorageService) GetPublicURL(key string) string {
|
|
if s.endpoint != "" {
|
|
// Pour MinIO ou services S3-compatibles avec endpoint personnalisé
|
|
return fmt.Sprintf("%s/%s/%s", s.endpoint, s.bucket, key)
|
|
}
|
|
// Pour AWS S3 standard
|
|
return fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", s.bucket, s.region, key)
|
|
}
|
|
|
|
// DownloadFile télécharge un fichier depuis S3 et retourne son contenu
|
|
func (s *S3StorageService) DownloadFile(ctx context.Context, key string) ([]byte, error) {
|
|
if key == "" {
|
|
return nil, fmt.Errorf("key cannot be empty")
|
|
}
|
|
|
|
result, err := s.client.GetObject(ctx, &s3.GetObjectInput{
|
|
Bucket: aws.String(s.bucket),
|
|
Key: aws.String(key),
|
|
})
|
|
if err != nil {
|
|
s.logger.Error("Failed to download file from S3",
|
|
zap.Error(err),
|
|
zap.String("key", key),
|
|
zap.String("bucket", s.bucket),
|
|
)
|
|
return nil, fmt.Errorf("failed to download file from S3: %w", err)
|
|
}
|
|
defer result.Body.Close()
|
|
|
|
data, err := io.ReadAll(result.Body)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to read S3 object body: %w", err)
|
|
}
|
|
|
|
return data, nil
|
|
}
|
|
|
|
// ListFiles liste les fichiers dans un préfixe donné
|
|
func (s *S3StorageService) ListFiles(ctx context.Context, prefix string) ([]string, error) {
|
|
var keys []string
|
|
|
|
paginator := s3.NewListObjectsV2Paginator(s.client, &s3.ListObjectsV2Input{
|
|
Bucket: aws.String(s.bucket),
|
|
Prefix: aws.String(prefix),
|
|
})
|
|
|
|
for paginator.HasMorePages() {
|
|
output, err := paginator.NextPage(ctx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to list objects: %w", err)
|
|
}
|
|
|
|
for _, obj := range output.Contents {
|
|
if obj.Key != nil {
|
|
keys = append(keys, *obj.Key)
|
|
}
|
|
}
|
|
}
|
|
|
|
return keys, nil
|
|
}
|