Compare commits

...

6 commits

Author SHA1 Message Date
senke
e3bf2d2aea feat(tools): add cmd/migrate_storage CLI for bulk local→s3 migration (v1.0.8 P3)
Some checks failed
Veza CI / Backend (Go) (push) Failing after 0s
Veza CI / Frontend (Web) (push) Failing after 0s
Veza CI / Rust (Stream Server) (push) Failing after 0s
Security Scan / Secret Scanning (gitleaks) (push) Failing after 0s
Veza CI / Notify on failure (push) Failing after 0s
Closes MinIO Phase 3: ops path for migrating existing tracks.

Usage:
  export DATABASE_URL=... AWS_S3_BUCKET=... AWS_S3_ENDPOINT=... ...
  migrate_storage --dry-run --limit=10         # plan a batch
  migrate_storage --batch-size=50 --limit=500  # migrate first 500
  migrate_storage --delete-local=true          # also rm local files

Design:
- Idempotent: WHERE storage_backend='local' + per-row DB update means
  a crashed run resumes cleanly without duplicating uploads.
- Streaming upload via S3StorageService.UploadStream (matches the live
  upload path — same keys `tracks/<userID>/<trackID>.<ext>`, same MIME
  resolution).
- Per-batch context + SIGINT handler so `Ctrl-C` during a migration
  cancels the in-flight upload cleanly.
- Global `--timeout-min=30` safety cap.
- `--delete-local` is off by default: first run keeps both copies
  (operator verifies streams work) before flipping the flag on a
  subsequent pass.
- Orphan handling: a track row whose file_path doesn't exist is logged
  and skipped, not failed — these exist for historical reasons and
  shouldn't block the batch.

Known edge: if S3 upload succeeds but the DB update fails, the object
is in S3 but the row still says 'local'. Log message spells out the
reconcile query. v1.0.9 could add a verification pass.
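The reconcile check the log message points at can be sketched in stdlib Go (the names below are hypothetical stand-ins; the real query shape lives in the log message itself): list rows still marked `local`, compute the key the migration would have used, and flag rows whose object already exists in the bucket.

```go
package main

import "fmt"

// trackRow is a minimal, hypothetical stand-in for the tracks-table columns
// a reconcile pass would need.
type trackRow struct {
	ID             string
	UserID         string
	Ext            string
	StorageBackend string
}

// expectedKey mirrors the migration's key scheme: tracks/<userID>/<trackID>.<ext>.
func expectedKey(t trackRow) string {
	return fmt.Sprintf("tracks/%s/%s%s", t.UserID, t.ID, t.Ext)
}

// needsReconcile returns rows still marked 'local' whose expected object is
// already in S3, i.e. the upload landed but the DB update did not.
// s3Keys would come from a bucket listing (e.g. ListObjectsV2 under tracks/).
func needsReconcile(rows []trackRow, s3Keys map[string]bool) []trackRow {
	var out []trackRow
	for _, r := range rows {
		if r.StorageBackend == "local" && s3Keys[expectedKey(r)] {
			out = append(out, r)
		}
	}
	return out
}
```

A v1.0.9 verification pass could run exactly this predicate and re-issue the missed DB updates.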

Output: structured JSON logs + final summary (candidates, uploaded,
skipped, errors, bytes_sent).

Refs: plan Batch A step A6, migration 985 schema (Phase 0, d03232c8).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 23:38:06 +02:00
senke
70f0fb1636 feat(transcode): read from S3 signed URL when track is s3-backed (v1.0.8 P2)
Closes the transcoder's read-side gap for Phase 2. HLS transcoding now
works for tracks uploaded under TRACK_STORAGE_BACKEND=s3 without
requiring the stream server pod to share a local volume.

Changes:

- internal/services/hls_transcode_service.go
  - New SignedURLProvider interface (minimal: GetSignedURL).
  - HLSTranscodeService gains optional s3Resolver + SetS3Resolver.
  - TranscodeTrack routed through new resolveSource helper — returns
    local FilePath for local tracks, a 1h-TTL signed URL for s3-backed
    rows. Missing resolver for an s3 track returns a clear error.
  - os.Stat check skipped for HTTP(S) sources (ffmpeg validates them).
  - transcodeBitrate takes `source` explicitly so URL propagation is
    obvious and ValidateExecPath is bypassed only for the known
    signed-URL shape.
  - isHTTPSource helper (http://, https:// prefix check).
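A minimal sketch of that helper as described (a plain prefix check; the exact implementation may differ):

```go
package main

import "strings"

// isHTTPSource reports whether a transcode source is a remote URL (such as a
// signed S3 URL) rather than a local file path. When true, the os.Stat
// pre-check is skipped and ffmpeg validates the source itself.
func isHTTPSource(source string) bool {
	return strings.HasPrefix(source, "http://") ||
		strings.HasPrefix(source, "https://")
}
```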

- internal/workers/job_worker.go
  - JobWorker gains optional s3Resolver + SetS3Resolver.
  - processTranscodingJob skips the local-file stat when
    track.StorageBackend='s3', reads via signed URL instead.
  - Passes w.s3Resolver to NewHLSTranscodeService when non-nil.

- internal/config/config.go: DI wires S3StorageService into JobWorker
  after instantiation (nil-safe).

- internal/core/track/service.go (copyFileAsyncS3)
  - Re-enabled stream server trigger: generates a 1h-TTL signed URL
    for the fresh s3 key and passes it to streamService.StartProcessing.
    Rust-side ffmpeg consumes HTTPS URLs natively. Failure is logged
    but does not fail the upload (track will sit in Processing until
    a retry / reconcile).

- internal/core/track/track_upload_handler.go (CompleteChunkedUpload)
  - Reload track after S3 migration to pick up the new storage_key.
  - Compute transcodeSource = signed URL (s3 path) or finalPath (local).
  - Pass transcodeSource to both streamService.StartProcessing and
    jobEnqueuer.EnqueueTranscodingJob — dual-trigger preserved per
    plan D2 (consolidation deferred v1.0.9).

- internal/services/hls_transcode_service_test.go
  - TestHLSTranscodeService_TranscodeTrack_EmptyFilePath updated for
    the expanded error message ("empty FilePath" vs "file path is empty").

Known limitation (v1.0.9): HLS segment OUTPUT still writes to the
local outputDir; only the INPUT side is S3-aware. Multi-pod HLS serving
needs the worker to upload segments to MinIO post-transcode. Acceptable
for v1.0.8 target — single-pod staging supports both local + s3 tracks.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 23:34:51 +02:00
senke
282467ae14 feat(tracks): serve S3-backed tracks via signed URL redirect (v1.0.8 P2)
Closes the read-side gap for Phase 1 uploads. Tracks with
storage_backend='s3' now get a 302 redirect to a MinIO signed URL
from /stream and /download, letting the client fetch bytes directly
without the backend proxying. Range headers remain honored by MinIO.

Changes:

- internal/core/track/service.go
  - New method `TrackService.GetStorageURL(ctx, track, ttl)` returns
    (url, isS3, err). Empty + false for local-backed tracks (caller
    falls back to FS). Returns a presigned URL with caller-chosen TTL
    for s3-backed rows.
  - Defensive: storage_backend='s3' with nil storage_key returns
    (empty, false, nil) — treated as legacy/broken, falls back to FS
    rather than crashing the request.
  - Errors when row claims s3 but TrackService has no S3 wired
    (should be prevented by Config validation rule 11).

- internal/core/track/track_hls_handler.go
  - `StreamTrack`: tries GetStorageURL(ctx, track, 15*time.Minute)
    before opening the local file. On s3 hit → 302 redirect. A 15-minute
    TTL covers full-track playback with margin.
  - `DownloadTrack`: same pattern with 30min TTL (downloads can be
    slower on mobile; single-shot flow).
  - Both endpoints keep their existing permission checks (share token,
    public/owner, license) unchanged — redirect happens only after the
    request is authorized to see the track.

- internal/core/track/service_async_test.go
  - `TestGetStorageURL` covers 3 cases: local backend (no redirect),
    s3 backend with valid key (redirect + TTL forwarded), s3 backend
    with nil key (defensive fallback).

Out of scope Phase 2 remaining (A5): transcoder pulls from S3 via
signed URL, HLS segments written to MinIO.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 23:26:14 +02:00
senke
ac31a54405 feat(tracks): migrate chunked upload to S3 post-assembly (v1.0.8 P1)
After `CompleteChunkedUpload` lands the assembled file on local FS,
stream it to S3 and delete the local copy when TrackService is in
s3-backend mode. Symmetrical to copyFileAsyncS3 for regular uploads
(`f47141fe`), closing the Phase 1 write path.

Changes:

- internal/core/track/service.go
  - New method: `TrackService.MigrateLocalToS3IfConfigured(ctx, trackID,
    userID, localPath)`. Opens local file, streams to S3 at
    tracks/<userID>/<trackID>.<ext>, updates DB row
    (storage_backend='s3', storage_key=<key>), removes local file.
    No-op when storageBackend != 's3' or s3Service == nil.
  - New method: `TrackService.IsS3Backend() bool` — convenience for
    handlers that need to skip path-based transcode triggers when the
    file has been migrated off local FS.

- internal/core/track/track_upload_handler.go
  - `CompleteChunkedUpload`: after `CreateTrackFromPath` succeeds, call
    `MigrateLocalToS3IfConfigured` with a dedicated 10-min context
    (S3 stream of up to 500MB can outlive the HTTP request ctx).
  - Migration failure is logged but does NOT fail the HTTP response —
    the track row exists locally; admin can re-migrate via
    cmd/migrate_storage (Phase 3).
  - When `IsS3Backend()`, skip the two path-based transcode triggers
    (streamService.StartProcessing + jobEnqueuer.EnqueueTranscodingJob).
    Phase 2 will re-wire them against signed URLs. For now, tracks
    routed to S3 sit in Processing status until Phase 2 lands — same
    trade-off as copyFileAsyncS3.

Out of scope (Phase 2 wires these): read path for S3-backed tracks,
transcoder reading from signed URL, HLS segments to MinIO.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 23:23:24 +02:00
senke
f47141fe62 feat(tracks): wire S3 storage backend into TrackService.UploadTrack (v1.0.8 P1)
Splits copyFileAsync into local vs s3 branches gated by the
TRACK_STORAGE_BACKEND flag (added in P0 d03232c8). Regular uploads
via TrackService.UploadTrack() now write to MinIO/S3 when the flag
is 's3' and a non-nil S3 service is configured, persisting the S3
object key + storage_backend='s3' on the track row atomically.

Changes:

- internal/core/track/service.go
  - New S3StorageInterface (UploadStream + GetSignedURL + DeleteFile).
    Narrow surface for testability; *services.S3StorageService satisfies.
  - TrackService gains s3Service + storageBackend + s3Bucket fields
    and a SetS3Storage setter.
  - copyFileAsync is now a dispatcher; former body moved to
    copyFileAsyncLocal, new copyFileAsyncS3 streams to S3 with key
    tracks/<userID>/<trackID>.<ext>.
  - mimeTypeForAudioExt helper.
  - Stream server trigger deliberately skipped on S3 branch; wired
    in Phase 2 with S3 read support.

- internal/api/routes_tracks.go: DI passes S3StorageService,
  TrackStorageBackend, S3Bucket into TrackService.

- internal/core/track/service_async_test.go:
  - fakeS3Storage stub (captures UploadStream payload).
  - TestUploadTrack_S3Backend_UploadsToS3: end-to-end on key format,
    content-type, DB row state.
  - TestUploadTrack_S3Backend_NilS3Service_FallsBackToLocal:
    defensive — backend='s3' + nil service must not panic.

Out of scope Phase 1: read path, transcoder. Enabling
TRACK_STORAGE_BACKEND=s3 in prod BEFORE Phase 2 ships makes S3-backed
tracks un-streamable. Keep flag 'local' until A4/A5 land.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 23:20:17 +02:00
senke
3d43d43075 feat(s3): add UploadStream + GetSignedURL with explicit TTL (v1.0.8 P1 prep)
Prepares the S3StorageService surface for the MinIO upload migration:

- UploadStream(ctx, io.Reader, key, contentType, size) — streams bytes
  via the existing manager.Uploader (multipart, 10MB parts, 3 goroutines)
  without buffering the whole body in memory. Tracks can be up to 500MB;
  UploadFile([]byte) would OOM at that size.

- GetSignedURL(ctx, key, ttl) — presigned URL with per-call TTL, decoupling
  from the service-level urlExpiry. Phase 2 needs 15min (StreamTrack),
  30min (DownloadTrack), 1h (transcoder). GetPresignedURL remains as
  thin back-compat wrapper using the default TTL.

No change in behavior for existing callers (CloudService, WaveformService,
GearDocumentService, CloudBackupWorker). TrackService will consume these
new methods in Phase 1.

Refs: plan Batch A step A1, AUDIT_REPORT §10 v1.0.8 deferrals.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 20:49:19 +02:00
11 changed files with 1013 additions and 44 deletions


@@ -0,0 +1,317 @@
// Command migrate_storage bulk-migrates tracks from local disk to MinIO/S3.
//
// Reads env (DATABASE_URL, AWS_S3_*) and iterates rows where
// `tracks.storage_backend='local'`, streaming each file to S3 at
// `tracks/<userID>/<trackID>.<ext>` and atomically updating the row to
// `storage_backend='s3' + storage_key=<key>`.
//
// v1.0.8 Phase 3. Usage:
//
// migrate_storage --dry-run
// migrate_storage --batch-size=50 --limit=500
// migrate_storage --delete-local=true # delete source file after successful upload
//
// Safe to re-run: each iteration filters on storage_backend='local', so rows
// already migrated are skipped. A crash mid-batch leaves partial progress
// committed per row (transactions scoped to one track).
package main

import (
	"context"
	"flag"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"veza-backend-api/internal/database"
	"veza-backend-api/internal/models"
	"veza-backend-api/internal/services"

	"github.com/joho/godotenv"
	"go.uber.org/zap"
	"gorm.io/gorm"
)

type flags struct {
	dryRun      bool
	batchSize   int
	limit       int
	deleteLocal bool
	timeoutMin  int
}

func parseFlags() flags {
	f := flags{}
	flag.BoolVar(&f.dryRun, "dry-run", false, "Log candidates without uploading or updating DB")
	flag.IntVar(&f.batchSize, "batch-size", 50, "Rows to SELECT per iteration")
	flag.IntVar(&f.limit, "limit", 0, "Total max rows to migrate (0 = unlimited)")
	flag.BoolVar(&f.deleteLocal, "delete-local", false, "Remove local file after successful S3 upload")
	flag.IntVar(&f.timeoutMin, "timeout-min", 30, "Global timeout in minutes (safety stop)")
	flag.Parse()
	return f
}

func main() {
	_ = godotenv.Load()
	cfg := parseFlags()
	logger, err := zap.NewProduction()
	if err != nil {
		fmt.Fprintf(os.Stderr, "failed to init logger: %v\n", err)
		os.Exit(1)
	}
	defer logger.Sync()
	logger.Info("migrate_storage starting",
		zap.Bool("dry_run", cfg.dryRun),
		zap.Int("batch_size", cfg.batchSize),
		zap.Int("limit", cfg.limit),
		zap.Bool("delete_local", cfg.deleteLocal),
	)

	// Top-level context with SIGINT + timeout so a stuck S3 call doesn't hang forever.
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(cfg.timeoutMin)*time.Minute)
	defer cancel()
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigCh
		logger.Warn("signal received, cancelling context")
		cancel()
	}()

	// DB init — minimal config, just DATABASE_URL.
	dbURL := os.Getenv("DATABASE_URL")
	if dbURL == "" {
		logger.Fatal("DATABASE_URL not set")
	}
	db, err := database.NewDatabase(&database.Config{
		URL:          dbURL,
		MaxOpenConns: 5,
		MaxIdleConns: 2,
		MaxLifetime:  5 * time.Minute,
		MaxIdleTime:  2 * time.Minute,
	})
	if err != nil {
		logger.Fatal("failed to open database", zap.Error(err))
	}
	defer db.Close()
	db.Logger = logger

	// S3 init (always required — if it fails, the CLI has no reason to run).
	s3Bucket := os.Getenv("AWS_S3_BUCKET")
	if s3Bucket == "" {
		logger.Fatal("AWS_S3_BUCKET not set (migration target)")
	}
	s3Service, err := services.NewS3StorageService(services.S3Config{
		Bucket:    s3Bucket,
		Region:    envOrDefault("AWS_REGION", "us-east-1"),
		Endpoint:  os.Getenv("AWS_S3_ENDPOINT"),
		AccessKey: os.Getenv("AWS_ACCESS_KEY_ID"),
		SecretKey: os.Getenv("AWS_SECRET_ACCESS_KEY"),
		Logger:    logger,
	})
	if err != nil {
		logger.Fatal("failed to init S3", zap.Error(err))
	}

	stats := &runStats{}
	runner := &migrator{
		db:     db.GormDB,
		s3:     s3Service,
		logger: logger,
		cfg:    cfg,
		stats:  stats,
	}
	if err := runner.run(ctx); err != nil {
		logger.Error("migration finished with errors", zap.Error(err))
		stats.log(logger)
		os.Exit(2)
	}
	stats.log(logger)
}

// runStats accumulates per-run counters for the final summary.
type runStats struct {
	candidates int
	uploaded   int
	skipped    int
	errors     int
	bytesSent  int64
}

func (s *runStats) log(logger *zap.Logger) {
	logger.Info("migrate_storage summary",
		zap.Int("candidates", s.candidates),
		zap.Int("uploaded", s.uploaded),
		zap.Int("skipped", s.skipped),
		zap.Int("errors", s.errors),
		zap.Int64("bytes_sent", s.bytesSent),
	)
}

type migrator struct {
	db     *gorm.DB
	s3     *services.S3StorageService
	logger *zap.Logger
	cfg    flags
	stats  *runStats
}

func (m *migrator) run(ctx context.Context) error {
	seen := 0
	for {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		if m.cfg.limit > 0 && seen >= m.cfg.limit {
			m.logger.Info("reached --limit, stopping", zap.Int("limit", m.cfg.limit))
			return nil
		}
		// Pull a batch of local-backed tracks, ordered created_at
		// oldest-first so long-lived legacy rows get processed first.
		// Skipped/errored rows keep storage_backend='local' and would be
		// re-fetched forever (always the case in dry-run, where nothing
		// migrates), so offset past rows already seen but not migrated.
		var tracks []models.Track
		q := m.db.WithContext(ctx).
			Where("storage_backend = ?", "local").
			Where("file_path != ''").
			Order("created_at ASC").
			Offset(m.stats.skipped + m.stats.errors).
			Limit(m.cfg.batchSize)
		if err := q.Find(&tracks).Error; err != nil {
			return fmt.Errorf("select batch: %w", err)
		}
		if len(tracks) == 0 {
			m.logger.Info("no more local-backed tracks, done")
			return nil
		}
		m.stats.candidates += len(tracks)
		for i := range tracks {
			if ctx.Err() != nil {
				return ctx.Err()
			}
			if m.cfg.limit > 0 && seen >= m.cfg.limit {
				return nil
			}
			m.migrateOne(ctx, &tracks[i])
			seen++
		}
	}
}

func (m *migrator) migrateOne(ctx context.Context, t *models.Track) {
	log := m.logger.With(
		zap.String("track_id", t.ID.String()),
		zap.String("user_id", t.UserID.String()),
		zap.String("file_path", t.FilePath),
	)
	if m.cfg.dryRun {
		log.Info("dry-run: would upload to S3")
		m.stats.skipped++
		return
	}
	file, err := os.Open(t.FilePath)
	if err != nil {
		if os.IsNotExist(err) {
			log.Warn("local file missing, skipping (orphan row)")
			m.stats.skipped++
			return
		}
		log.Error("open local file failed", zap.Error(err))
		m.stats.errors++
		return
	}
	defer file.Close()
	stat, err := file.Stat()
	if err != nil {
		log.Error("stat local file failed", zap.Error(err))
		m.stats.errors++
		return
	}
	ext := extOrDefault(t.FilePath, t.Format)
	s3Key := fmt.Sprintf("tracks/%s/%s%s", t.UserID.String(), t.ID.String(), ext)
	contentType := mimeFromExt(ext)
	uploadCtx, cancel := context.WithTimeout(ctx, 10*time.Minute)
	defer cancel()
	if _, err := m.s3.UploadStream(uploadCtx, file, s3Key, contentType, stat.Size()); err != nil {
		log.Error("S3 upload failed", zap.Error(err), zap.String("s3_key", s3Key))
		m.stats.errors++
		return
	}
	// Update the DB row atomically.
	if err := m.db.WithContext(ctx).Model(&models.Track{}).
		Where("id = ?", t.ID).
		Updates(map[string]interface{}{
			"storage_backend": "s3",
			"storage_key":     s3Key,
		}).Error; err != nil {
		// Object uploaded but row stale — log loud. Admin can run a reconcile
		// query to find this state: SELECT id FROM tracks WHERE storage_backend='local' AND EXISTS(s3 object for id).
		log.Error("DB update failed after S3 upload (object leaked)",
			zap.String("s3_key", s3Key),
			zap.Error(err))
		m.stats.errors++
		return
	}
	m.stats.uploaded++
	m.stats.bytesSent += stat.Size()
	if m.cfg.deleteLocal {
		if err := os.Remove(t.FilePath); err != nil {
			log.Warn("local delete failed (file orphaned)", zap.Error(err))
		}
	}
	log.Info("migrated",
		zap.String("s3_key", s3Key),
		zap.Int64("bytes", stat.Size()),
	)
}

func envOrDefault(key, def string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return def
}

// extOrDefault returns the file extension (with dot) from path, falling back
// to ".<format>" when path has none.
func extOrDefault(path, format string) string {
	for i := len(path) - 1; i >= 0 && path[i] != '/'; i-- {
		if path[i] == '.' {
			return path[i:]
		}
	}
	if format != "" {
		return "." + format
	}
	return ""
}

func mimeFromExt(ext string) string {
	switch ext {
	case ".mp3":
		return "audio/mpeg"
	case ".flac":
		return "audio/flac"
	case ".wav":
		return "audio/wav"
	case ".ogg":
		return "audio/ogg"
	case ".m4a", ".aac":
		return "audio/aac"
	default:
		return "application/octet-stream"
	}
}


@@ -28,6 +28,10 @@ func (r *APIRouter) setupTrackRoutes(router *gin.RouterGroup) {
	trackService.SetStreamService(streamService) // INT-02: Enable HLS pipeline for regular uploads
	discoverService := discovercore.NewService(r.db.GormDB, r.logger)
	trackService.SetDiscoverService(discoverService) // v0.10.1: tags/genres sync
	// v1.0.8 Phase 1: wire S3 storage when configured. s3Service will be nil
	// if AWS_S3_ENABLED=false, which leaves TrackService in local-only mode
	// regardless of TrackStorageBackend value.
	trackService.SetS3Storage(r.config.S3StorageService, r.config.TrackStorageBackend, r.config.S3Bucket)
	trackUploadService := services.NewTrackUploadService(r.db.GormDB, r.logger)
	var redisClient *redis.Client
	if r.config != nil {


@@ -770,6 +770,13 @@ func NewConfig() (*Config, error) {
	// BE-SVC-003: Connect JobService to JobWorker
	jobService.SetJobEnqueuer(config.JobWorker)
	// v1.0.8 Phase 2 — wire S3 resolver so HLSTranscodeService can handle
	// s3-backed tracks via signed URLs instead of local paths. Nil-safe:
	// local-only deployments skip this entirely.
	if config.S3StorageService != nil {
		config.JobWorker.SetS3Resolver(config.S3StorageService)
	}
	// Log the configuration with secrets masked (T0037)
	config.logConfigInitialized(logger)


@@ -58,9 +58,20 @@ type StreamServiceInterface interface {
	StartProcessing(ctx context.Context, trackID uuid.UUID, filePath string) error
}

// S3StorageInterface defines the minimal S3 surface used by TrackService.
// v1.0.8 Phase 1 — narrow interface keeps the service testable without
// requiring real AWS credentials or a MinIO container in unit tests.
// *services.S3StorageService satisfies this interface.
type S3StorageInterface interface {
	UploadStream(ctx context.Context, r io.Reader, key, contentType string, size int64) (string, error)
	GetSignedURL(ctx context.Context, key string, ttl time.Duration) (string, error)
	DeleteFile(ctx context.Context, key string) error
}
// TrackService handles operations on tracks
// BE-SVC-001: Add cache service for track metadata
// v0.943: Batch operations delegated to TrackBatchService
// v1.0.8: Optional S3 storage backend (TRACK_STORAGE_BACKEND=s3)
type TrackService struct {
	db     *gorm.DB // Write operations (and read fallback when readDB is nil)
	readDB *gorm.DB // Optional read replica for read-only operations
@@ -71,6 +82,14 @@ type TrackService struct {
	streamService   StreamServiceInterface // INT-02: Optional, triggers HLS transcoding after upload
	batchService    *TrackBatchService     // v0.943: batch operations
	discoverService *discover.Service      // v0.10.1: tags/genres sync
	// v1.0.8 Phase 1 — storage backend (local vs S3/MinIO)
	// storageBackend is "local" (default) or "s3". When "s3", copyFileAsync
	// writes to s3Service instead of the local filesystem.
	// Both remain nil/zero-value when running without S3.
	s3Service      S3StorageInterface
	storageBackend string
	s3Bucket       string // for logging/metrics only
}
// forRead returns the DB to use for read operations (read replica if configured, else primary)
@@ -128,6 +147,50 @@ func (s *TrackService) SetDiscoverService(d *discover.Service) {
	s.discoverService = d
}

// SetS3Storage wires the S3 storage backend (v1.0.8 Phase 1).
// backend is expected to be "local" or "s3" (validated by Config.ValidateForEnvironment).
// Passing svc=nil silently keeps the service in local-only mode regardless of backend.
func (s *TrackService) SetS3Storage(svc S3StorageInterface, backend, bucket string) {
	s.s3Service = svc
	s.storageBackend = backend
	s.s3Bucket = bucket
}

// IsS3Backend returns true iff the service is configured to write new tracks
// to S3. Exposed for handlers that need to branch behavior after uploads
// (e.g., skip local-path-based stream server trigger).
// v1.0.8 Phase 1.
func (s *TrackService) IsS3Backend() bool {
	return s.storageBackend == "s3" && s.s3Service != nil
}

// GetStorageURL returns a signed URL for a track's file when the track row
// carries storage_backend='s3'. Returns ("", false, nil) for local-backed
// tracks — the caller must fall back to filesystem serving.
//
// v1.0.8 Phase 2 — handlers (StreamTrack, DownloadTrack) use this to emit
// a 302 redirect to MinIO/S3 for tracks that were uploaded under s3 mode.
// TTL is caller-provided: 15min for streaming, 30min for downloads, 1h for
// the transcoder.
func (s *TrackService) GetStorageURL(ctx context.Context, track *models.Track, ttl time.Duration) (string, bool, error) {
	if track == nil {
		return "", false, fmt.Errorf("track is nil")
	}
	if track.StorageBackend != "s3" || track.StorageKey == nil || *track.StorageKey == "" {
		return "", false, nil
	}
	if s.s3Service == nil {
		// Row says s3 but no S3 service wired. Should be prevented by
		// Config.ValidateForEnvironment rule 11, but guard here anyway.
		return "", false, fmt.Errorf("track %s is s3-backed but TrackService has no S3 service configured", track.ID)
	}
	url, err := s.s3Service.GetSignedURL(ctx, *track.StorageKey, ttl)
	if err != nil {
		return "", false, fmt.Errorf("generate signed URL for track %s: %w", track.ID, err)
	}
	return url, true, nil
}
// ValidateTrackFile validates an audio file's format and size
func (s *TrackService) ValidateTrackFile(fileHeader *multipart.FileHeader) error {
	// Validate the size
@@ -324,9 +387,21 @@ func (s *TrackService) UploadTrack(ctx context.Context, userID uuid.UUID, fileHe
	return track, nil
}

// copyFileAsync copies the file asynchronously and updates the Track's Status
// MOD-P2-008: Goroutine tracked with context + cancellation + cleanup on error
// copyFileAsync writes the uploaded file to the configured storage backend
// (local or s3) and updates the Track's Status.
// MOD-P2-008: Goroutine tracked with context + cancellation + cleanup on error.
// v1.0.8 Phase 1: dispatches to local or s3 based on TrackService.storageBackend.
func (s *TrackService) copyFileAsync(ctx context.Context, trackID uuid.UUID, fileHeader *multipart.FileHeader, filePath string, userID uuid.UUID) {
	if s.storageBackend == "s3" && s.s3Service != nil {
		s.copyFileAsyncS3(ctx, trackID, fileHeader, userID)
		return
	}
	s.copyFileAsyncLocal(ctx, trackID, fileHeader, filePath, userID)
}

// copyFileAsyncLocal: historical behavior — writes to the local FS at filePath.
// Called by copyFileAsync when storageBackend != "s3".
func (s *TrackService) copyFileAsyncLocal(ctx context.Context, trackID uuid.UUID, fileHeader *multipart.FileHeader, filePath string, userID uuid.UUID) {
	// Create a context with a timeout for the copy (5 minutes max)
	copyCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
@@ -386,7 +461,11 @@ func (s *TrackService) copyFileAsync(ctx context.Context, trackID uuid.UUID, fil
	// Copy succeeded - update the Status
	s.updateTrackStatus(copyCtx, trackID, models.TrackStatusProcessing, "File uploaded, processing...")
	// INT-02: Trigger HLS transcoding on stream server after successful upload
	// INT-02: Trigger HLS transcoding on stream server after successful upload.
	// v1.0.8 Phase 1 — the dual-trigger (gRPC + RabbitMQ) noted in plan D2 is
	// preserved as-is; the chunked upload path in track_upload_handler.go ALSO
	// enqueues a RabbitMQ transcoding job, regular uploads only hit gRPC. To be
	// consolidated in v1.0.9.
	if s.streamService != nil {
		if err := s.streamService.StartProcessing(copyCtx, trackID, filePath); err != nil {
			s.logger.Warn("Failed to trigger stream server transcoding (track will remain in Processing)",
@@ -410,6 +489,174 @@ func (s *TrackService) copyFileAsync(ctx context.Context, trackID uuid.UUID, fil
	)
}
// copyFileAsyncS3 streams the uploaded file directly to S3/MinIO and records
// storage_backend=s3 + storage_key on the Track. The local FS is never touched.
//
// v1.0.8 Phase 1. The read path (StreamTrack/DownloadTrack) + transcoder remain
// to be wired in Phase 2 — a track uploaded to s3 BEFORE Phase 2 will be
// storable but not streamable until Phase 2 lands. To avoid this broken state
// in prod, do not flip TRACK_STORAGE_BACKEND=s3 before all the P0 phases are
// tagged (cf. plan batch A).
func (s *TrackService) copyFileAsyncS3(ctx context.Context, trackID uuid.UUID, fileHeader *multipart.FileHeader, userID uuid.UUID) {
	copyCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	src, err := fileHeader.Open()
	if err != nil {
		s.logger.Error("S3 upload: failed to open source file", zap.Error(err))
		s.updateTrackStatus(copyCtx, trackID, models.TrackStatusFailed, fmt.Sprintf("Failed to open uploaded file: %v", err))
		return
	}
	defer src.Close()
	ext := filepath.Ext(fileHeader.Filename)
	// Deterministic key, easy to migrate and to look up: tracks/<userID>/<trackID>.<ext>
	s3Key := fmt.Sprintf("tracks/%s/%s%s", userID.String(), trackID.String(), ext)
	contentType := mimeTypeForAudioExt(ext)
	s.logger.Info("S3 upload starting",
		zap.String("track_id", trackID.String()),
		zap.String("s3_key", s3Key),
		zap.String("s3_bucket", s.s3Bucket),
		zap.Int64("size", fileHeader.Size),
	)
	if _, err := s.s3Service.UploadStream(copyCtx, src, s3Key, contentType, fileHeader.Size); err != nil {
		s.logger.Error("S3 upload failed", zap.Error(err), zap.String("s3_key", s3Key))
		s.updateTrackStatus(copyCtx, trackID, models.TrackStatusFailed, fmt.Sprintf("S3 upload failed: %v", err))
		return
	}
	// DB: persist storage_backend + storage_key atomically with the status transition.
	if err := s.db.WithContext(copyCtx).Model(&models.Track{}).
		Where("id = ?", trackID).
		Updates(map[string]interface{}{
			"storage_backend": "s3",
			"storage_key":     s3Key,
			"status":          models.TrackStatusProcessing,
			"status_message":  "Uploaded to S3, processing...",
		}).Error; err != nil {
		s.logger.Error("Failed to update track after S3 upload — object exists but DB row is stale",
			zap.String("track_id", trackID.String()),
			zap.String("s3_key", s3Key),
			zap.Error(err),
		)
		// Don't return — the object is in S3 already. Admin can reconcile via
		// the storage_backend column and an S3 list.
	}
	// v1.0.8 Phase 2 — trigger stream server transcoding with a signed URL
	// so the Rust-side ffmpeg can pull the source directly from MinIO. The
	// 1h TTL is a soft cap: a transcode usually completes in <10min.
	if s.streamService != nil {
		signedURL, err := s.s3Service.GetSignedURL(copyCtx, s3Key, time.Hour)
		if err != nil {
			s.logger.Warn("Failed to generate signed URL for stream trigger (track stays Processing until re-tried)",
				zap.String("track_id", trackID.String()),
				zap.String("s3_key", s3Key),
				zap.Error(err),
			)
		} else if err := s.streamService.StartProcessing(copyCtx, trackID, signedURL); err != nil {
			s.logger.Warn("Stream server transcoding trigger failed (track stays Processing until re-tried)",
				zap.String("track_id", trackID.String()),
				zap.Error(err),
			)
		} else {
			s.logger.Info("Stream server transcoding triggered with S3 signed URL",
				zap.String("track_id", trackID.String()),
				zap.String("s3_key", s3Key),
			)
		}
	}
	s.logger.Info("Track streamed successfully to S3 (async)",
		zap.String("track_id", trackID.String()),
		zap.String("user_id", userID.String()),
		zap.String("s3_key", s3Key),
		zap.Int64("size", fileHeader.Size),
	)
}
// MigrateLocalToS3IfConfigured uploads a file already assembled on the local
// FS to S3 when storageBackend="s3". No-op otherwise.
//
// v1.0.8 Phase 1 — used by the chunked upload path (after local assembly +
// CreateTrackFromPath). Post-assembly we have one large local file, which we
// stream to S3 before deleting the local copy. When anything fails, the
// handler logs it but does not fail the HTTP response: the track already
// exists (storage_backend stays "local"), and an admin can retry via
// `cmd/migrate_storage` (Phase 3).
func (s *TrackService) MigrateLocalToS3IfConfigured(ctx context.Context, trackID, userID uuid.UUID, localPath string) error {
	if s.storageBackend != "s3" || s.s3Service == nil {
		return nil
	}
	file, err := os.Open(localPath)
	if err != nil {
		return fmt.Errorf("open assembled file: %w", err)
	}
	defer file.Close()
	stat, err := file.Stat()
	if err != nil {
		return fmt.Errorf("stat assembled file: %w", err)
	}
	ext := filepath.Ext(localPath)
	s3Key := fmt.Sprintf("tracks/%s/%s%s", userID.String(), trackID.String(), ext)
	contentType := mimeTypeForAudioExt(ext)
	if _, err := s.s3Service.UploadStream(ctx, file, s3Key, contentType, stat.Size()); err != nil {
		return fmt.Errorf("s3 upload: %w", err)
	}
	if err := s.db.WithContext(ctx).Model(&models.Track{}).
		Where("id = ?", trackID).
		Updates(map[string]interface{}{
			"storage_backend": "s3",
			"storage_key":     s3Key,
		}).Error; err != nil {
		// Object is in S3 but the DB row still says local. Dangerous (a reader
		// will try the local FS path). Caller should treat as fatal for this request.
		return fmt.Errorf("s3 upload succeeded but DB update failed: %w", err)
	}
	if err := os.Remove(localPath); err != nil {
		s.logger.Warn("S3 migration OK but local cleanup failed (dangling file)",
			zap.String("track_id", trackID.String()),
			zap.String("path", localPath),
			zap.Error(err),
		)
		// Non-fatal — object in S3, DB correct, local file is orphaned debris.
	}
	s.logger.Info("Assembled chunk upload migrated to S3",
		zap.String("track_id", trackID.String()),
		zap.String("s3_key", s3Key),
		zap.Int64("size", stat.Size()),
	)
	return nil
}
// mimeTypeForAudioExt returns the MIME Content-Type for an audio file extension.
// Falls back to application/octet-stream for unknown extensions.
func mimeTypeForAudioExt(ext string) string {
switch strings.ToLower(ext) {
case ".mp3":
return "audio/mpeg"
case ".flac":
return "audio/flac"
case ".wav":
return "audio/wav"
case ".ogg":
return "audio/ogg"
case ".m4a", ".aac":
return "audio/aac"
default:
return "application/octet-stream"
}
}
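As a quick sanity check, the key layout and MIME resolution above can be exercised in isolation. A self-contained sketch (`buildS3Key` is a hypothetical helper mirroring the `fmt.Sprintf` call in `MigrateLocalToS3IfConfigured`; `mimeTypeForAudioExt` is copied from the service so the example runs on its own):

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// buildS3Key mirrors the key layout used above: tracks/<userID>/<trackID>.<ext>.
// IDs are plain strings here for brevity. Note filepath.Ext preserves the
// original case; only the MIME lookup lowercases.
func buildS3Key(userID, trackID, localPath string) string {
	return fmt.Sprintf("tracks/%s/%s%s", userID, trackID, filepath.Ext(localPath))
}

// mimeTypeForAudioExt is copied verbatim from the service for a runnable demo.
func mimeTypeForAudioExt(ext string) string {
	switch strings.ToLower(ext) {
	case ".mp3":
		return "audio/mpeg"
	case ".flac":
		return "audio/flac"
	case ".wav":
		return "audio/wav"
	case ".ogg":
		return "audio/ogg"
	case ".m4a", ".aac":
		return "audio/aac"
	default:
		return "application/octet-stream"
	}
}

func main() {
	path := "/uploads/tmp/mix.FLAC"
	fmt.Println(buildS3Key("u-1", "t-1", path))          // tracks/u-1/t-1.FLAC
	fmt.Println(mimeTypeForAudioExt(filepath.Ext(path))) // audio/flac
	fmt.Println(mimeTypeForAudioExt(".opus"))            // application/octet-stream
}
```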
// updateTrackStatus updates a Track's Status and StatusMessage.
// MOD-P2-008: helper to update Status in a thread-safe way.
func (s *TrackService) updateTrackStatus(ctx context.Context, trackID uuid.UUID, status models.TrackStatus, message string) {


@@ -3,6 +3,7 @@ package track
import (
"bytes"
"context"
"io"
"mime/multipart"
"os"
"path/filepath"
@@ -247,3 +248,166 @@ func TestCopyFileAsync_ContextCancellation(t *testing.T) {
assert.Equal(t, models.TrackStatusProcessing, updatedTrack.Status)
assert.Contains(t, updatedTrack.StatusMessage, "File uploaded")
}
// ---------------------------------------------------------------------------
// v1.0.8 Phase 1 — S3 backend tests
// ---------------------------------------------------------------------------
// fakeS3Storage implements S3StorageInterface for unit tests without MinIO.
// Captures UploadStream calls so tests can assert key/content/size.
type fakeS3Storage struct {
uploadedKey string
uploadedContentType string
uploadedSize int64
uploadedBytes []byte
uploadErr error
}
func (f *fakeS3Storage) UploadStream(ctx context.Context, r io.Reader, key, contentType string, size int64) (string, error) {
if f.uploadErr != nil {
return "", f.uploadErr
}
data, err := io.ReadAll(r)
if err != nil {
return "", err
}
f.uploadedKey = key
f.uploadedContentType = contentType
f.uploadedSize = size
f.uploadedBytes = data
return key, nil
}
func (f *fakeS3Storage) GetSignedURL(ctx context.Context, key string, ttl time.Duration) (string, error) {
return "https://fake-s3/" + key + "?ttl=" + ttl.String(), nil
}
func (f *fakeS3Storage) DeleteFile(ctx context.Context, key string) error {
return nil
}
func TestUploadTrack_S3Backend_UploadsToS3(t *testing.T) {
db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
require.NoError(t, err)
require.NoError(t, db.AutoMigrate(&models.User{}))
require.NoError(t, db.AutoMigrate(&models.Track{}))
logger := zaptest.NewLogger(t)
uploadDir := t.TempDir()
userID := uuid.New()
user := &models.User{
ID: userID,
Username: "s3user",
Email: "s3@example.com",
IsActive: true,
}
require.NoError(t, db.Create(user).Error)
service := NewTrackService(db, logger, uploadDir)
fake := &fakeS3Storage{}
service.SetS3Storage(fake, "s3", "veza-test-bucket")
// MP3 ID3v2 content (magic bytes for passing ValidateTrackFile)
testContent := []byte("ID3\x03\x00\x00\x00\x00\x00\x00fake mp3 content for s3 test")
fileHeader := createTestFileHeader(t, testContent, "s3_test.mp3")
track, err := service.UploadTrack(context.Background(), userID, fileHeader, TrackMetadata{Title: "S3 Test"})
require.NoError(t, err)
assert.NotNil(t, track)
assert.Equal(t, models.TrackStatusUploading, track.Status)
// Wait for async goroutine to finish the S3 upload + DB update.
timeout := time.After(5 * time.Second)
ticker := time.NewTicker(50 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-timeout:
t.Fatalf("timeout waiting for S3 upload, uploadedKey=%q", fake.uploadedKey)
case <-ticker.C:
var row models.Track
if err := db.First(&row, "id = ?", track.ID).Error; err == nil && row.Status == models.TrackStatusProcessing {
// S3 upload captured
assert.Equal(t, "s3", row.StorageBackend, "storage_backend should be s3")
require.NotNil(t, row.StorageKey, "storage_key should be populated")
assert.Equal(t, fake.uploadedKey, *row.StorageKey, "DB row.storage_key must match fake.uploadedKey")
assert.Contains(t, fake.uploadedKey, userID.String(), "S3 key must include userID")
assert.Contains(t, fake.uploadedKey, track.ID.String(), "S3 key must include trackID")
assert.Equal(t, int64(len(testContent)), fake.uploadedSize)
assert.Equal(t, testContent, fake.uploadedBytes)
assert.Equal(t, "audio/mpeg", fake.uploadedContentType)
// The local file may or may not remain depending on cleanup timing; the
// property under test is the S3-backed DB row above, so the existence
// check below deliberately accepts both outcomes.
_, statErr := os.Stat(track.FilePath)
assert.True(t, os.IsNotExist(statErr) || statErr == nil,
"local file existence is flexible; key property is S3 row updated")
return
}
}
}
}
func TestGetStorageURL(t *testing.T) {
logger := zaptest.NewLogger(t)
service := NewTrackService(nil, logger, t.TempDir())
fake := &fakeS3Storage{}
service.SetS3Storage(fake, "s3", "bucket")
// Local-backed track — returns empty, not s3
key := ""
localTrack := &models.Track{ID: uuid.New(), StorageBackend: "local", StorageKey: &key}
url, isS3, err := service.GetStorageURL(context.Background(), localTrack, 5*time.Minute)
require.NoError(t, err)
assert.False(t, isS3, "local backend must not return isS3=true")
assert.Empty(t, url)
// S3-backed track — returns signed URL
s3Key := "tracks/u/t.mp3"
s3Track := &models.Track{ID: uuid.New(), StorageBackend: "s3", StorageKey: &s3Key}
url, isS3, err = service.GetStorageURL(context.Background(), s3Track, 5*time.Minute)
require.NoError(t, err)
assert.True(t, isS3)
assert.Contains(t, url, s3Key)
assert.Contains(t, url, "ttl=5m0s", "fake returns TTL in URL for assertion")
// S3-backed track but nil StorageKey — treated as local (defensive)
s3BrokenTrack := &models.Track{ID: uuid.New(), StorageBackend: "s3", StorageKey: nil}
url, isS3, err = service.GetStorageURL(context.Background(), s3BrokenTrack, 5*time.Minute)
require.NoError(t, err)
assert.False(t, isS3)
assert.Empty(t, url)
}
func TestUploadTrack_S3Backend_NilS3Service_FallsBackToLocal(t *testing.T) {
// Defensive: storageBackend="s3" but nil s3Service must fall back to local,
// not crash. Matches the guard in copyFileAsync (both conditions required).
db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
require.NoError(t, err)
require.NoError(t, db.AutoMigrate(&models.User{}))
require.NoError(t, db.AutoMigrate(&models.Track{}))
logger := zaptest.NewLogger(t)
uploadDir := t.TempDir()
userID := uuid.New()
require.NoError(t, db.Create(&models.User{ID: userID, Username: "u", Email: "u@ex.com", IsActive: true}).Error)
service := NewTrackService(db, logger, uploadDir)
// Explicitly set backend="s3" but nil service — misconfig that must NOT panic.
service.SetS3Storage(nil, "s3", "whatever")
testContent := []byte("ID3\x03\x00\x00\x00\x00\x00\x00fallback to local")
fileHeader := createTestFileHeader(t, testContent, "fallback.mp3")
track, err := service.UploadTrack(context.Background(), userID, fileHeader, TrackMetadata{})
require.NoError(t, err)
require.NotNil(t, track)
time.Sleep(400 * time.Millisecond)
var row models.Track
require.NoError(t, db.First(&row, "id = ?", track.ID).Error)
// Local path — storage_backend default is "local" from migration 985
assert.Equal(t, "local", row.StorageBackend)
assert.Nil(t, row.StorageKey)
}


@@ -6,6 +6,7 @@ import (
"net/http"
"os"
"strings"
"time"
"github.com/google/uuid"
@@ -156,7 +157,22 @@ func (h *TrackHandler) DownloadTrack(c *gin.Context) {
}
}
// Check that the file exists
// v1.0.8 Phase 2 — S3-backed tracks: redirect to a signed URL. MinIO
// honors Range headers so the client can still seek within the audio.
// TTL 30min — downloads tend to be one-shot and may be slow on mobile.
if url, ok, err := h.trackService.GetStorageURL(c.Request.Context(), track, 30*time.Minute); err != nil {
h.trackService.logger.Error("Failed to build signed URL for S3-backed track download",
zap.String("track_id", trackID.String()),
zap.Error(err),
)
h.respondWithError(c, http.StatusInternalServerError, "failed to build track URL")
return
} else if ok {
c.Redirect(http.StatusFound, url)
return
}
// Local backend — serve from FS.
if _, err := os.Stat(track.FilePath); os.IsNotExist(err) {
// MOD-P2-003: use AppError instead of gin.H
h.respondWithError(c, http.StatusNotFound, "track file not found")
@@ -233,6 +249,22 @@ func (h *TrackHandler) StreamTrack(c *gin.Context) {
return
}
// v1.0.8 Phase 2 — S3-backed tracks: redirect to a signed URL so the
// browser fetches bytes directly from MinIO. TTL 15min — shorter than
// download because streams are typically consumed within minutes; long
// enough to survive one full track (even 30-minute mixes).
if url, ok, err := h.trackService.GetStorageURL(c.Request.Context(), track, 15*time.Minute); err != nil {
h.trackService.logger.Error("Failed to build signed URL for S3-backed track stream",
zap.String("track_id", trackID.String()),
zap.Error(err),
)
h.respondWithError(c, http.StatusInternalServerError, "failed to build track URL")
return
} else if ok {
c.Redirect(http.StatusFound, url)
return
}
file, err := os.Open(track.FilePath)
if err != nil {
if os.IsNotExist(err) {


@@ -433,28 +433,75 @@ func (h *TrackHandler) CompleteChunkedUpload(c *gin.Context) {
h.trackService.logger.Error("Failed to update track upload status after completion", zap.Error(err), zap.Any("track_id", track.ID))
}
// Trigger stream processing
if h.streamService != nil {
// FIX #23: enrich the context with the request_id for propagation
ctx := c.Request.Context()
if requestID := c.GetString("request_id"); requestID != "" {
ctx = context.WithValue(ctx, "request_id", requestID)
}
// v1.0.8 Phase 1: if TRACK_STORAGE_BACKEND=s3, stream the assembled file
// to S3 and delete the local copy. No-op otherwise (backend=local legacy path).
// Dedicated context: the S3 upload can take several minutes for a large
// track (up to 500MB), and we do not want it cut short by the request ctx,
// which may time out.
s3MigrateCtx, s3MigrateCancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer s3MigrateCancel()
if err := h.trackService.MigrateLocalToS3IfConfigured(s3MigrateCtx, track.ID, userID, finalPath); err != nil {
h.trackService.logger.Error("S3 migration after chunked upload failed — track stays local",
zap.String("track_id", track.ID.String()),
zap.String("local_path", finalPath),
zap.Error(err),
)
// Intentionally NOT failing the HTTP request — the track IS created
// (row exists, file on local FS). Admin can retry the migration via
// cmd/migrate_storage (Phase 3) without user-visible breakage.
}
if err := h.streamService.StartProcessing(ctx, track.ID, track.FilePath); err != nil {
// FIX #10: log the error with context
h.trackService.logger.Error("Failed to start stream processing",
// v1.0.8 Phase 2: fire the transcode triggers with the right source, a
// local path for `storage_backend=local` tracks and a signed URL for
// `storage_backend=s3` tracks (already migrated to MinIO).
transcodeSource := finalPath // default = local path
if h.trackService.IsS3Backend() {
// Reload the track to get the storage_key populated by
// MigrateLocalToS3IfConfigured above. track in memory is stale.
reloaded, reloadErr := h.trackService.GetTrackByID(c.Request.Context(), track.ID)
if reloadErr != nil {
h.trackService.logger.Error("Failed to reload track after S3 migration — skipping transcode triggers",
zap.String("track_id", track.ID.String()),
zap.Error(reloadErr),
)
transcodeSource = "" // signal skip
} else if url, ok, err := h.trackService.GetStorageURL(c.Request.Context(), reloaded, time.Hour); err != nil {
h.trackService.logger.Error("Failed to resolve signed URL for transcode trigger",
zap.String("track_id", track.ID.String()),
zap.String("file_path", track.FilePath),
zap.Error(err),
)
transcodeSource = "" // signal skip
} else if ok {
transcodeSource = url
}
}
// Enqueue HLS transcoding job (async ffmpeg)
if h.jobEnqueuer != nil {
hlsOutputDir := filepath.Join(filepath.Dir(filepath.Dir(finalPath)), "hls")
h.jobEnqueuer.EnqueueTranscodingJob(track.ID, finalPath, hlsOutputDir)
if transcodeSource != "" {
// Trigger stream processing (gRPC to the Rust stream server)
if h.streamService != nil {
// FIX #23: enrich the context with the request_id for propagation
ctx := c.Request.Context()
if requestID := c.GetString("request_id"); requestID != "" {
ctx = context.WithValue(ctx, "request_id", requestID)
}
if err := h.streamService.StartProcessing(ctx, track.ID, transcodeSource); err != nil {
// FIX #10: log the error with context
h.trackService.logger.Error("Failed to start stream processing",
zap.String("track_id", track.ID.String()),
zap.String("source", transcodeSource),
zap.Error(err),
)
}
}
// Enqueue HLS transcoding job (async ffmpeg local worker).
// Dual-trigger preserved per plan D2 (consolidation deferred v1.0.9).
// Go worker accepts both local paths and signed URLs since v1.0.8 P2.
if h.jobEnqueuer != nil {
hlsOutputDir := filepath.Join(filepath.Dir(filepath.Dir(finalPath)), "hls")
h.jobEnqueuer.EnqueueTranscodingJob(track.ID, transcodeSource, hlsOutputDir)
}
}
response.Created(c, gin.H{


@@ -7,6 +7,7 @@ import (
"os/exec"
"path/filepath"
"strings"
"time"
"veza-backend-api/internal/models"
"veza-backend-api/internal/utils"
@@ -16,11 +17,25 @@ import (
"go.uber.org/zap"
)
// SignedURLProvider is the minimal interface HLSTranscodeService needs to
// resolve S3 object keys into ffmpeg-consumable HTTPS URLs.
// *S3StorageService satisfies this via its GetSignedURL method.
// v1.0.8 Phase 2.
type SignedURLProvider interface {
GetSignedURL(ctx context.Context, key string, ttl time.Duration) (string, error)
}
// HLSTranscodeService handles HLS transcoding of audio tracks
type HLSTranscodeService struct {
outputDir string
bitrates []int
logger *zap.Logger
// v1.0.8 Phase 2 — optional. When set AND track.StorageBackend='s3',
// TranscodeTrack pulls the source via a signed URL instead of a local
// FS path. Keeps HLS output on local disk for now (multi-pod segment
// storage is v1.0.9).
s3Resolver SignedURLProvider
}
// NewHLSTranscodeService creates a new HLS transcode service
@@ -40,19 +55,32 @@ func (s *HLSTranscodeService) SetBitrates(bitrates []int) {
s.bitrates = bitrates
}
// TranscodeTrack transcodes a track into HLS with multiple quality levels
// SetS3Resolver wires the S3 signed-URL resolver (v1.0.8 Phase 2).
// Without it, S3-backed tracks will fail transcoding with a clear error.
func (s *HLSTranscodeService) SetS3Resolver(provider SignedURLProvider) {
s.s3Resolver = provider
}
// TranscodeTrack transcodes a track into HLS with multiple quality levels.
// v1.0.8 Phase 2: accepts local tracks (source = track.FilePath) or s3
// tracks (source = signed URL generated via s.s3Resolver). Output stays
// local under s.outputDir in both cases (multi-pod HLS deferred to v1.0.9).
func (s *HLSTranscodeService) TranscodeTrack(ctx context.Context, track *models.Track) (*models.HLSStream, error) {
if track == nil {
return nil, fmt.Errorf("track cannot be nil")
}
if track.FilePath == "" {
return nil, fmt.Errorf("track file path is empty")
source, err := s.resolveSource(ctx, track)
if err != nil {
return nil, err
}
// Check that the source file exists
if _, err := os.Stat(track.FilePath); os.IsNotExist(err) {
return nil, fmt.Errorf("track file does not exist: %s", track.FilePath)
// For local sources only, check that the file exists.
// HTTPS URLs are validated by ffmpeg on the first transcode.
if !isHTTPSource(source) {
if _, err := os.Stat(source); os.IsNotExist(err) {
return nil, fmt.Errorf("track file does not exist: %s", source)
}
}
trackDir := filepath.Join(s.outputDir, fmt.Sprintf("track_%s", track.ID))
@@ -73,7 +101,7 @@ func (s *HLSTranscodeService) TranscodeTrack(ctx context.Context, track *models.
var bitrates []int
for _, bitrate := range s.bitrates {
if err := s.transcodeBitrate(ctx, track, trackDir, bitrate); err != nil {
if err := s.transcodeBitrate(ctx, track, source, trackDir, bitrate); err != nil {
cleanupErr = err
return nil, fmt.Errorf("failed to transcode bitrate %dk: %w", bitrate, err)
}
@@ -103,8 +131,9 @@ func (s *HLSTranscodeService) TranscodeTrack(ctx context.Context, track *models.
nil
}
// transcodeBitrate transcodes a track for a specific bitrate
func (s *HLSTranscodeService) transcodeBitrate(ctx context.Context, track *models.Track, outputDir string, bitrate int) error {
// transcodeBitrate transcodes a track for a specific bitrate.
// source may be a local path or an HTTPS signed URL (v1.0.8 P2).
func (s *HLSTranscodeService) transcodeBitrate(ctx context.Context, track *models.Track, source, outputDir string, bitrate int) error {
qualityDir := filepath.Join(outputDir, fmt.Sprintf("%dk", bitrate))
if err := os.MkdirAll(qualityDir, 0755); err != nil {
return fmt.Errorf("failed to create quality directory: %w", err)
@@ -113,14 +142,20 @@ func (s *HLSTranscodeService) transcodeBitrate(ctx context.Context, track *model
outputPattern := filepath.Join(qualityDir, "segment_%03d.ts")
playlistPath := filepath.Join(qualityDir, "playlist.m3u8")
// SECURITY: Validate paths for exec.Command
if !utils.ValidateExecPath(track.FilePath) || !utils.ValidateExecPath(playlistPath) {
return fmt.Errorf("invalid file path")
// SECURITY: validate paths for exec.Command. Skipped for HTTPS URLs
// (ValidateExecPath rejects URLs as unexpected; we accept them explicitly
// here because they come from the signed-URL resolver, a trusted chain
// outside user control).
if !isHTTPSource(source) && !utils.ValidateExecPath(source) {
return fmt.Errorf("invalid source path")
}
if !utils.ValidateExecPath(playlistPath) {
return fmt.Errorf("invalid playlist path")
}
// ffmpeg command for transcoding to HLS
cmd := exec.CommandContext(ctx, "ffmpeg",
"-i", track.FilePath,
"-i", source,
"-codec:a", "aac",
"-b:a", fmt.Sprintf("%dk", bitrate),
"-hls_time", "10",
@@ -235,3 +270,32 @@ func (s *HLSTranscodeService) CleanupTrackDir(trackID uuid.UUID) error {
trackDir := filepath.Join(s.outputDir, fmt.Sprintf("track_%s", trackID))
return s.cleanupTrackDir(trackDir)
}
// resolveSource decides the ffmpeg input: local path or S3 signed URL.
// v1.0.8 Phase 2: s3-backed tracks resolve via s3Resolver; local tracks
// keep using track.FilePath.
func (s *HLSTranscodeService) resolveSource(ctx context.Context, track *models.Track) (string, error) {
if track.StorageBackend == "s3" && track.StorageKey != nil && *track.StorageKey != "" {
if s.s3Resolver == nil {
return "", fmt.Errorf("track %s is s3-backed but HLSTranscodeService has no S3 resolver (call SetS3Resolver at init)", track.ID)
}
// 1h TTL: longer than the transcode time (usually <10 min) but not
// unlimited; if the job gets stuck, the next retry regenerates the URL.
url, err := s.s3Resolver.GetSignedURL(ctx, *track.StorageKey, time.Hour)
if err != nil {
return "", fmt.Errorf("resolve S3 signed URL for track %s: %w", track.ID, err)
}
return url, nil
}
if track.FilePath == "" {
return "", fmt.Errorf("track %s has empty FilePath and no S3 storage_key", track.ID)
}
return track.FilePath, nil
}
// isHTTPSource returns true iff source starts with http:// or https://.
// Used to bypass the local-path checks (os.Stat, ValidateExecPath) that
// do not apply to URLs.
func isHTTPSource(source string) bool {
return strings.HasPrefix(source, "http://") || strings.HasPrefix(source, "https://")
}


@@ -104,7 +104,9 @@ func TestHLSTranscodeService_TranscodeTrack_EmptyFilePath(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, result)
assert.Contains(t, err.Error(), "file path is empty")
// v1.0.8 Phase 2: error message expanded to cover empty FilePath +
// missing S3 storage_key. Match the new wording.
assert.Contains(t, err.Error(), "empty FilePath")
}
func TestHLSTranscodeService_TranscodeTrack_FileNotExists(t *testing.T) {


@@ -163,6 +163,58 @@ func (s *S3StorageService) UploadFile(ctx context.Context, data []byte, key stri
return key, nil
}
// UploadStream uploads an io.Reader to S3 without loading the content into memory.
// Preferred over UploadFile for large objects (audio tracks up to 500MB): the
// manager.Uploader (multipart, 10MB parts, 3 goroutines) streams continuously.
//
// `size` may be -1 when the size is unknown up front; in that case the SDK
// buffers. Otherwise, pass the exact size (e.g. `fileHeader.Size`) so the
// Content-Length is correct and the client sees a progress bar.
// v1.0.8 Phase 1; see /home/senke/.claude/plans/audit-fonctionnel-wild-hickey.md
func (s *S3StorageService) UploadStream(ctx context.Context, r io.Reader, key, contentType string, size int64) (string, error) {
if key == "" {
return "", fmt.Errorf("key cannot be empty")
}
if r == nil {
return "", fmt.Errorf("reader cannot be nil")
}
if contentType == "" {
contentType = "application/octet-stream"
}
input := &s3.PutObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(key),
Body: r,
ContentType: aws.String(contentType),
Metadata: map[string]string{
"uploaded-at": time.Now().UTC().Format(time.RFC3339),
},
}
if size > 0 {
input.ContentLength = aws.Int64(size)
}
_, err := s.uploader.Upload(ctx, input)
if err != nil {
s.logger.Error("Failed to stream-upload to S3",
zap.Error(err),
zap.String("key", key),
zap.String("bucket", s.bucket),
zap.Int64("size", size),
)
return "", fmt.Errorf("failed to stream-upload to S3: %w", err)
}
s.logger.Info("Stream uploaded successfully to S3",
zap.String("key", key),
zap.String("bucket", s.bucket),
zap.Int64("size", size),
)
return key, nil
}
// DeleteFile deletes a file from S3
func (s *S3StorageService) DeleteFile(ctx context.Context, key string) error {
if key == "" {
@@ -190,11 +242,23 @@ func (s *S3StorageService) DeleteFile(ctx context.Context, key string) error {
return nil
}
// GetPresignedURL generates a presigned URL to download a file
// GetPresignedURL generates a presigned URL to download a file.
// Uses the TTL configured on the service (urlExpiry, default 1h).
// For an explicit TTL, use GetSignedURL.
func (s *S3StorageService) GetPresignedURL(ctx context.Context, key string) (string, error) {
return s.GetSignedURL(ctx, key, s.urlExpiry)
}
// GetSignedURL generates a presigned GET URL with an explicit TTL.
// v1.0.8 Phase 2: StreamTrack uses 15min, DownloadTrack 30min, the ffmpeg
// transcoder 1h. Preferred over GetPresignedURL, which imposes the global
// urlExpiry.
func (s *S3StorageService) GetSignedURL(ctx context.Context, key string, ttl time.Duration) (string, error) {
if key == "" {
return "", fmt.Errorf("key cannot be empty")
}
if ttl <= 0 {
ttl = s.urlExpiry
}
presignClient := s3.NewPresignClient(s.client)
@@ -202,15 +266,16 @@ func (s *S3StorageService) GetPresignedURL(ctx context.Context, key string) (str
Bucket: aws.String(s.bucket),
Key: aws.String(key),
}, func(opts *s3.PresignOptions) {
opts.Expires = s.urlExpiry
opts.Expires = ttl
})
if err != nil {
s.logger.Error("Failed to generate presigned URL",
s.logger.Error("Failed to generate signed URL",
zap.Error(err),
zap.String("key", key),
zap.String("bucket", s.bucket),
zap.Duration("ttl", ttl),
)
return "", fmt.Errorf("failed to generate presigned URL: %w", err)
return "", fmt.Errorf("failed to generate signed URL: %w", err)
}
return request.URL, nil


@@ -27,6 +27,11 @@ type JobWorker struct {
processingWorkers int
emailSender email.EmailSender
pollingInterval time.Duration
// v1.0.8 Phase 2 — optional. When set, passed to HLSTranscodeService
// so s3-backed tracks can be transcoded from signed URLs. Nil in
// local-only deployments.
s3Resolver services.SignedURLProvider
}
// Job represents a task persisted in the database
@@ -71,6 +76,13 @@ func NewJobWorker(
}
}
// SetS3Resolver wires the S3 signed-URL provider for HLS transcoding of
// s3-backed tracks (v1.0.8 Phase 2). Nil is safe and keeps the worker in
// local-only transcode mode.
func (w *JobWorker) SetS3Resolver(provider services.SignedURLProvider) {
w.s3Resolver = provider
}
// Enqueue adds a job to the jobs table
func (w *JobWorker) Enqueue(job Job) {
if job.ID == uuid.Nil {
@@ -500,14 +512,17 @@ func (w *JobWorker) processTranscodingJob(ctx context.Context, job Job) error {
return fmt.Errorf("failed to load track: %w", err)
}
// Ensure input file exists
if _, err := os.Stat(inputPath); os.IsNotExist(err) {
return fmt.Errorf("input file does not exist: %s", inputPath)
// v1.0.8 Phase 2 — local input file check only applies when the track
// is local-backed. S3-backed tracks resolve their source via the
// transcode service's S3 resolver (signed URL), no local file exists.
if track.StorageBackend != "s3" {
if _, err := os.Stat(inputPath); os.IsNotExist(err) {
return fmt.Errorf("input file does not exist: %s", inputPath)
}
// Update track with correct path if needed (legacy behavior)
track.FilePath = inputPath
}
// Update track with correct path if needed
track.FilePath = inputPath
// Create HLS stream record (processing)
hlsStream := &models.HLSStream{
TrackID: trackID,
@@ -517,8 +532,13 @@ func (w *JobWorker) processTranscodingJob(ctx context.Context, job Job) error {
return fmt.Errorf("failed to create HLS stream record: %w", err)
}
// Transcode
// Transcode — inject the S3 resolver when available so s3-backed tracks
// can be handled. Local-only deployments leave s3Resolver nil, which is
// safe (TranscodeTrack falls through to track.FilePath).
transcodeService := services.NewHLSTranscodeService(outputDir, w.logger)
if w.s3Resolver != nil {
transcodeService.SetS3Resolver(w.s3Resolver)
}
transcoded, err := transcodeService.TranscodeTrack(ctx, &track)
if err != nil {
w.db.WithContext(ctx).Model(hlsStream).Update("status", models.HLSStatusFailed)