// Command migrate_storage bulk-migrates tracks from local disk to MinIO/S3. // // Reads env (DATABASE_URL, AWS_S3_*) and iterates rows where // `tracks.storage_backend='local'`, streaming each file to S3 at // `tracks//.` and atomically updating the row to // `storage_backend='s3' + storage_key=`. // // v1.0.8 Phase 3. Usage: // // migrate_storage --dry-run // migrate_storage --batch-size=50 --limit=500 // migrate_storage --delete-local=true # delete source file after successful upload // // Safe to re-run: each iteration filters on storage_backend='local', so rows // already migrated are skipped. A crash mid-batch leaves partial progress // committed per row (transactions scoped to one track). package main import ( "context" "flag" "fmt" "os" "os/signal" "syscall" "time" "veza-backend-api/internal/database" "veza-backend-api/internal/models" "veza-backend-api/internal/services" "github.com/joho/godotenv" "go.uber.org/zap" "gorm.io/gorm" ) type flags struct { dryRun bool batchSize int limit int deleteLocal bool timeoutMin int } func parseFlags() flags { f := flags{} flag.BoolVar(&f.dryRun, "dry-run", false, "Log candidates without uploading or updating DB") flag.IntVar(&f.batchSize, "batch-size", 50, "Rows to SELECT per iteration") flag.IntVar(&f.limit, "limit", 0, "Total max rows to migrate (0 = unlimited)") flag.BoolVar(&f.deleteLocal, "delete-local", false, "Remove local file after successful S3 upload") flag.IntVar(&f.timeoutMin, "timeout-min", 30, "Global timeout in minutes (safety stop)") flag.Parse() return f } func main() { _ = godotenv.Load() cfg := parseFlags() logger, err := zap.NewProduction() if err != nil { fmt.Fprintf(os.Stderr, "failed to init logger: %v\n", err) os.Exit(1) } defer logger.Sync() logger.Info("migrate_storage starting", zap.Bool("dry_run", cfg.dryRun), zap.Int("batch_size", cfg.batchSize), zap.Int("limit", cfg.limit), zap.Bool("delete_local", cfg.deleteLocal), ) // Top-level context with SIGINT + timeout so a stuck S3 call doesn't hang forever. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(cfg.timeoutMin)*time.Minute) defer cancel() sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) go func() { <-sigCh logger.Warn("signal received, cancelling context") cancel() }() // DB init — minimal config, just DATABASE_URL. dbURL := os.Getenv("DATABASE_URL") if dbURL == "" { logger.Fatal("DATABASE_URL not set") } db, err := database.NewDatabase(&database.Config{ URL: dbURL, MaxOpenConns: 5, MaxIdleConns: 2, MaxLifetime: 5 * time.Minute, MaxIdleTime: 2 * time.Minute, }) if err != nil { logger.Fatal("failed to open database", zap.Error(err)) } defer db.Close() db.Logger = logger // S3 init (always required — if it fails, the CLI has no reason to run). s3Bucket := os.Getenv("AWS_S3_BUCKET") if s3Bucket == "" { logger.Fatal("AWS_S3_BUCKET not set (migration target)") } s3Service, err := services.NewS3StorageService(services.S3Config{ Bucket: s3Bucket, Region: envOrDefault("AWS_REGION", "us-east-1"), Endpoint: os.Getenv("AWS_S3_ENDPOINT"), AccessKey: os.Getenv("AWS_ACCESS_KEY_ID"), SecretKey: os.Getenv("AWS_SECRET_ACCESS_KEY"), Logger: logger, }) if err != nil { logger.Fatal("failed to init S3", zap.Error(err)) } stats := &runStats{} runner := &migrator{ db: db.GormDB, s3: s3Service, logger: logger, cfg: cfg, stats: stats, } if err := runner.run(ctx); err != nil { logger.Error("migration finished with errors", zap.Error(err)) stats.log(logger) os.Exit(2) } stats.log(logger) } // runStats accumulates per-run counters for the final summary. type runStats struct { candidates int uploaded int skipped int errors int bytesSent int64 } func (s *runStats) log(logger *zap.Logger) { logger.Info("migrate_storage summary", zap.Int("candidates", s.candidates), zap.Int("uploaded", s.uploaded), zap.Int("skipped", s.skipped), zap.Int("errors", s.errors), zap.Int64("bytes_sent", s.bytesSent), ) } type migrator struct { db *gorm.DB s3 *services.S3StorageService logger *zap.Logger cfg flags stats *runStats } func (m *migrator) run(ctx context.Context) error { seen := 0 for { if ctx.Err() != nil { return ctx.Err() } if m.cfg.limit > 0 && seen >= m.cfg.limit { m.logger.Info("reached --limit, stopping", zap.Int("limit", m.cfg.limit)) return nil } // Pull a batch of local-backed tracks. Ordering by created_at // oldest-first so long-lived legacy rows get processed first. var tracks []models.Track q := m.db.WithContext(ctx). Where("storage_backend = ?", "local"). Where("file_path != ''"). Order("created_at ASC"). Limit(m.cfg.batchSize) if err := q.Find(&tracks).Error; err != nil { return fmt.Errorf("select batch: %w", err) } if len(tracks) == 0 { m.logger.Info("no more local-backed tracks, done") return nil } m.stats.candidates += len(tracks) for i := range tracks { if ctx.Err() != nil { return ctx.Err() } if m.cfg.limit > 0 && seen >= m.cfg.limit { return nil } m.migrateOne(ctx, &tracks[i]) seen++ } } } func (m *migrator) migrateOne(ctx context.Context, t *models.Track) { log := m.logger.With( zap.String("track_id", t.ID.String()), zap.String("user_id", t.UserID.String()), zap.String("file_path", t.FilePath), ) if m.cfg.dryRun { log.Info("dry-run: would upload to S3") m.stats.skipped++ return } file, err := os.Open(t.FilePath) if err != nil { if os.IsNotExist(err) { log.Warn("local file missing, skipping (orphan row)") m.stats.skipped++ return } log.Error("open local file failed", zap.Error(err)) m.stats.errors++ return } defer file.Close() stat, err := file.Stat() if err != nil { log.Error("stat local file failed", zap.Error(err)) m.stats.errors++ return } ext := extOrDefault(t.FilePath, t.Format) s3Key := fmt.Sprintf("tracks/%s/%s%s", t.UserID.String(), t.ID.String(), ext) contentType := mimeFromExt(ext) uploadCtx, cancel := context.WithTimeout(ctx, 10*time.Minute) defer cancel() if _, err := m.s3.UploadStream(uploadCtx, file, s3Key, contentType, stat.Size()); err != nil { log.Error("S3 upload failed", zap.Error(err), zap.String("s3_key", s3Key)) m.stats.errors++ return } // Update DB row atomically. if err := m.db.WithContext(ctx).Model(&models.Track{}). Where("id = ?", t.ID). Updates(map[string]interface{}{ "storage_backend": "s3", "storage_key": s3Key, }).Error; err != nil { // Object uploaded but row stale — log loud. Admin can run a reconcile // query to find this state: SELECT id FROM tracks WHERE storage_backend='local' AND EXISTS(s3 object for id). log.Error("DB update failed after S3 upload (object leaked)", zap.String("s3_key", s3Key), zap.Error(err)) m.stats.errors++ return } m.stats.uploaded++ m.stats.bytesSent += stat.Size() if m.cfg.deleteLocal { if err := os.Remove(t.FilePath); err != nil { log.Warn("local delete failed (file orphaned)", zap.Error(err)) } } log.Info("migrated", zap.String("s3_key", s3Key), zap.Int64("bytes", stat.Size()), ) } func envOrDefault(key, def string) string { if v := os.Getenv(key); v != "" { return v } return def } // extOrDefault returns the file extension (with dot) from path, falling back // to "." when path has none. func extOrDefault(path, format string) string { for i := len(path) - 1; i >= 0 && path[i] != '/'; i-- { if path[i] == '.' { return path[i:] } } if format != "" { return "." + format } return "" } func mimeFromExt(ext string) string { switch ext { case ".mp3": return "audio/mpeg" case ".flac": return "audio/flac" case ".wav": return "audio/wav" case ".ogg": return "audio/ogg" case ".m4a", ".aac": return "audio/aac" default: return "application/octet-stream" } }