From e3bf2d2aead04726283e1b6d4a892833e2434b2e Mon Sep 17 00:00:00 2001 From: senke Date: Thu, 23 Apr 2026 23:38:06 +0200 Subject: [PATCH] =?UTF-8?q?feat(tools):=20add=20cmd/migrate=5Fstorage=20CL?= =?UTF-8?q?I=20for=20bulk=20local=E2=86=92s3=20migration=20(v1.0.8=20P3)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes MinIO Phase 3: ops path for migrating existing tracks. Usage: export DATABASE_URL=... AWS_S3_BUCKET=... AWS_S3_ENDPOINT=... ... migrate_storage --dry-run --limit=10 # plan a batch migrate_storage --batch-size=50 --limit=500 # migrate first 500 migrate_storage --delete-local=true # also rm local files Design: - Idempotent: WHERE storage_backend='local' + per-row DB update means a crashed run resumes cleanly without duplicating uploads. - Streaming upload via S3StorageService.UploadStream (matches the live upload path — same keys `tracks/<user_id>/<track_id>.<ext>`, same MIME resolution). - Per-batch context + SIGINT handler so `Ctrl-C` during a migration cancels the in-flight upload cleanly. - Global `--timeout-min=30` safety cap. - `--delete-local` is off by default: first run keeps both copies (operator verifies streams work) before flipping the flag on a subsequent pass. - Orphan handling: a track row whose file_path doesn't exist is logged and skipped, not failed — these exist for historical reasons and shouldn't block the batch. Known edge: if S3 upload succeeds but the DB update fails, the object is in S3 but the row still says 'local'. Log message spells out the reconcile query. v1.0.9 could add a verification pass. Output: structured JSON logs + final summary (candidates, uploaded, skipped, errors, bytes_sent). Refs: plan Batch A step A6, migration 985 schema (Phase 0, d03232c8).
Co-Authored-By: Claude Opus 4.7 (1M context) --- veza-backend-api/cmd/migrate_storage/main.go | 317 +++++++++++++++++++ 1 file changed, 317 insertions(+) create mode 100644 veza-backend-api/cmd/migrate_storage/main.go diff --git a/veza-backend-api/cmd/migrate_storage/main.go b/veza-backend-api/cmd/migrate_storage/main.go new file mode 100644 index 000000000..1a00d69b6 --- /dev/null +++ b/veza-backend-api/cmd/migrate_storage/main.go @@ -0,0 +1,317 @@ +// Command migrate_storage bulk-migrates tracks from local disk to MinIO/S3. +// +// Reads env (DATABASE_URL, AWS_S3_*) and iterates rows where +// `tracks.storage_backend='local'`, streaming each file to S3 at +// `tracks/<user_id>/<track_id>.<ext>` and atomically updating the row to +// `storage_backend='s3' + storage_key=<key>`. +// +// v1.0.8 Phase 3. Usage: +// +// migrate_storage --dry-run +// migrate_storage --batch-size=50 --limit=500 +// migrate_storage --delete-local=true # delete source file after successful upload +// +// Safe to re-run: each iteration filters on storage_backend='local', so rows +// already migrated are skipped. A crash mid-batch leaves partial progress +// committed per row (transactions scoped to one track).
+package main + +import ( + "context" + "flag" + "fmt" + "os" + "os/signal" + "syscall" + "time" + + "veza-backend-api/internal/database" + "veza-backend-api/internal/models" + "veza-backend-api/internal/services" + + "github.com/joho/godotenv" + "go.uber.org/zap" + "gorm.io/gorm" +) + +type flags struct { + dryRun bool + batchSize int + limit int + deleteLocal bool + timeoutMin int +} + +func parseFlags() flags { + f := flags{} + flag.BoolVar(&f.dryRun, "dry-run", false, "Log candidates without uploading or updating DB") + flag.IntVar(&f.batchSize, "batch-size", 50, "Rows to SELECT per iteration") + flag.IntVar(&f.limit, "limit", 0, "Total max rows to migrate (0 = unlimited)") + flag.BoolVar(&f.deleteLocal, "delete-local", false, "Remove local file after successful S3 upload") + flag.IntVar(&f.timeoutMin, "timeout-min", 30, "Global timeout in minutes (safety stop)") + flag.Parse() + return f +} + +func main() { + _ = godotenv.Load() + + cfg := parseFlags() + + logger, err := zap.NewProduction() + if err != nil { + fmt.Fprintf(os.Stderr, "failed to init logger: %v\n", err) + os.Exit(1) + } + defer logger.Sync() + + logger.Info("migrate_storage starting", + zap.Bool("dry_run", cfg.dryRun), + zap.Int("batch_size", cfg.batchSize), + zap.Int("limit", cfg.limit), + zap.Bool("delete_local", cfg.deleteLocal), + ) + + // Top-level context with SIGINT + timeout so a stuck S3 call doesn't hang forever. + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(cfg.timeoutMin)*time.Minute) + defer cancel() + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-sigCh + logger.Warn("signal received, cancelling context") + cancel() + }() + + // DB init — minimal config, just DATABASE_URL. 
+ dbURL := os.Getenv("DATABASE_URL") + if dbURL == "" { + logger.Fatal("DATABASE_URL not set") + } + db, err := database.NewDatabase(&database.Config{ + URL: dbURL, + MaxOpenConns: 5, + MaxIdleConns: 2, + MaxLifetime: 5 * time.Minute, + MaxIdleTime: 2 * time.Minute, + }) + if err != nil { + logger.Fatal("failed to open database", zap.Error(err)) + } + defer db.Close() + db.Logger = logger + + // S3 init (always required — if it fails, the CLI has no reason to run). + s3Bucket := os.Getenv("AWS_S3_BUCKET") + if s3Bucket == "" { + logger.Fatal("AWS_S3_BUCKET not set (migration target)") + } + s3Service, err := services.NewS3StorageService(services.S3Config{ + Bucket: s3Bucket, + Region: envOrDefault("AWS_REGION", "us-east-1"), + Endpoint: os.Getenv("AWS_S3_ENDPOINT"), + AccessKey: os.Getenv("AWS_ACCESS_KEY_ID"), + SecretKey: os.Getenv("AWS_SECRET_ACCESS_KEY"), + Logger: logger, + }) + if err != nil { + logger.Fatal("failed to init S3", zap.Error(err)) + } + + stats := &runStats{} + runner := &migrator{ + db: db.GormDB, + s3: s3Service, + logger: logger, + cfg: cfg, + stats: stats, + } + if err := runner.run(ctx); err != nil { + logger.Error("migration finished with errors", zap.Error(err)) + stats.log(logger) + os.Exit(2) + } + stats.log(logger) +} + +// runStats accumulates per-run counters for the final summary. 
+type runStats struct { + candidates int + uploaded int + skipped int + errors int + bytesSent int64 +} + +func (s *runStats) log(logger *zap.Logger) { + logger.Info("migrate_storage summary", + zap.Int("candidates", s.candidates), + zap.Int("uploaded", s.uploaded), + zap.Int("skipped", s.skipped), + zap.Int("errors", s.errors), + zap.Int64("bytes_sent", s.bytesSent), + ) +} + +type migrator struct { + db *gorm.DB + s3 *services.S3StorageService + logger *zap.Logger + cfg flags + stats *runStats +} + +func (m *migrator) run(ctx context.Context) error { + seen := 0 + for { + if ctx.Err() != nil { + return ctx.Err() + } + if m.cfg.limit > 0 && seen >= m.cfg.limit { + m.logger.Info("reached --limit, stopping", zap.Int("limit", m.cfg.limit)) + return nil + } + + // Pull a batch of local-backed tracks. Ordering by created_at + // oldest-first so long-lived legacy rows get processed first. + var tracks []models.Track + q := m.db.WithContext(ctx). + Where("storage_backend = ?", "local"). + Where("file_path != ''"). + Order("created_at ASC"). 
+ Limit(m.cfg.batchSize) + if err := q.Find(&tracks).Error; err != nil { + return fmt.Errorf("select batch: %w", err) + } + if len(tracks) == 0 { + m.logger.Info("no more local-backed tracks, done") + return nil + } + + m.stats.candidates += len(tracks) + for i := range tracks { + if ctx.Err() != nil { + return ctx.Err() + } + if m.cfg.limit > 0 && seen >= m.cfg.limit { + return nil + } + m.migrateOne(ctx, &tracks[i]) + seen++ + } + } +} + +func (m *migrator) migrateOne(ctx context.Context, t *models.Track) { + log := m.logger.With( + zap.String("track_id", t.ID.String()), + zap.String("user_id", t.UserID.String()), + zap.String("file_path", t.FilePath), + ) + + if m.cfg.dryRun { + log.Info("dry-run: would upload to S3") + m.stats.skipped++ + return + } + + file, err := os.Open(t.FilePath) + if err != nil { + if os.IsNotExist(err) { + log.Warn("local file missing, skipping (orphan row)") + m.stats.skipped++ + return + } + log.Error("open local file failed", zap.Error(err)) + m.stats.errors++ + return + } + defer file.Close() + + stat, err := file.Stat() + if err != nil { + log.Error("stat local file failed", zap.Error(err)) + m.stats.errors++ + return + } + + ext := extOrDefault(t.FilePath, t.Format) + s3Key := fmt.Sprintf("tracks/%s/%s%s", t.UserID.String(), t.ID.String(), ext) + contentType := mimeFromExt(ext) + + uploadCtx, cancel := context.WithTimeout(ctx, 10*time.Minute) + defer cancel() + if _, err := m.s3.UploadStream(uploadCtx, file, s3Key, contentType, stat.Size()); err != nil { + log.Error("S3 upload failed", zap.Error(err), zap.String("s3_key", s3Key)) + m.stats.errors++ + return + } + + // Update DB row atomically. + if err := m.db.WithContext(ctx).Model(&models.Track{}). + Where("id = ?", t.ID). + Updates(map[string]interface{}{ + "storage_backend": "s3", + "storage_key": s3Key, + }).Error; err != nil { + // Object uploaded but row stale — log loud. 
Admin can run a reconcile + // query to find this state: SELECT id FROM tracks WHERE storage_backend='local' AND EXISTS(s3 object for id). + log.Error("DB update failed after S3 upload (object leaked)", + zap.String("s3_key", s3Key), + zap.Error(err)) + m.stats.errors++ + return + } + + m.stats.uploaded++ + m.stats.bytesSent += stat.Size() + + if m.cfg.deleteLocal { + if err := os.Remove(t.FilePath); err != nil { + log.Warn("local delete failed (file orphaned)", zap.Error(err)) + } + } + + log.Info("migrated", + zap.String("s3_key", s3Key), + zap.Int64("bytes", stat.Size()), + ) +} + +func envOrDefault(key, def string) string { + if v := os.Getenv(key); v != "" { + return v + } + return def +} + +// extOrDefault returns the file extension (with dot) from path, falling back +// to "." when path has none. +func extOrDefault(path, format string) string { + for i := len(path) - 1; i >= 0 && path[i] != '/'; i-- { + if path[i] == '.' { + return path[i:] + } + } + if format != "" { + return "." + format + } + return "" +} + +func mimeFromExt(ext string) string { + switch ext { + case ".mp3": + return "audio/mpeg" + case ".flac": + return "audio/flac" + case ".wav": + return "audio/wav" + case ".ogg": + return "audio/ogg" + case ".m4a", ".aac": + return "audio/aac" + default: + return "application/octet-stream" + } +}