feat(tools): add cmd/migrate_storage CLI for bulk local→s3 migration (v1.0.8 P3)

Closes MinIO Phase 3: ops path for migrating existing tracks.

Usage:
  export DATABASE_URL=... AWS_S3_BUCKET=... AWS_S3_ENDPOINT=... ...
  migrate_storage --dry-run --limit=10         # plan a batch
  migrate_storage --batch-size=50 --limit=500  # migrate first 500
  migrate_storage --delete-local=true          # also rm local files
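
One way to stage a rollout (sketch):
  migrate_storage --limit=100            # pass 1: small slice, local copies kept
  # ...verify playback against the migrated rows...
  migrate_storage --delete-local=true    # pass 2: migrate the rest, pruning as we go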

Design:
- Idempotent: WHERE storage_backend='local' + per-row DB update means
  a crashed run resumes cleanly without duplicating uploads (see the
  query sketch after this list).
- Streaming upload via S3StorageService.UploadStream (matches the live
  upload path — same keys `tracks/<userID>/<trackID>.<ext>`, same MIME
  resolution).
- Per-upload timeout context (10 min) + SIGINT handler so `Ctrl-C`
  during a migration cancels the in-flight upload cleanly.
- Global `--timeout-min=30` safety cap.
- `--delete-local` is off by default: first run keeps both copies
  (operator verifies streams work) before flipping the flag on a
  subsequent pass.
- Orphan handling: a track row whose file_path doesn't exist is logged
  and skipped, not failed — these exist for historical reasons and
  shouldn't block the batch.
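
In SQL terms, the per-row cycle looks like this (sketch; columns per
the migration 985 schema referenced below):

  SELECT id, user_id, file_path FROM tracks
   WHERE storage_backend = 'local' AND file_path <> ''
   ORDER BY created_at ASC LIMIT <batch-size>;

  -- after each successful upload, flip just that one row:
  UPDATE tracks SET storage_backend = 's3', storage_key = '<key>'
   WHERE id = '<trackID>';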

Known edge: if the S3 upload succeeds but the DB update fails, the
object is in S3 but the row still says 'local'. The failure log records
the leaked key, and a reconcile pass (sketched below) can find and
repair these rows. v1.0.9 could add a verification pass.
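
The reconcile pass could look like this (sketch; the object-existence
check has to happen out of band, e.g. one HEAD request per key):

  SELECT id, user_id FROM tracks WHERE storage_backend = 'local';
  -- for each row, HEAD tracks/<user_id>/<id>.<ext> in the bucket; a
  -- hit means the object leaked and the row can be flipped to 's3'.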

Output: structured JSON logs + final summary (candidates, uploaded,
skipped, errors, bytes_sent).
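
A typical summary line (illustrative values; zap's production encoder
also adds ts/caller fields):

  {"level":"info","msg":"migrate_storage summary","candidates":120,
   "uploaded":117,"skipped":2,"errors":1,"bytes_sent":734003200}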

Refs: plan Batch A step A6, migration 985 schema (Phase 0, d03232c8).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Author: senke
Date:   2026-04-23 23:38:06 +02:00
Parent: 70f0fb1636
Commit: e3bf2d2aea

@@ -0,0 +1,317 @@
// Command migrate_storage bulk-migrates tracks from local disk to MinIO/S3.
//
// Reads env (DATABASE_URL, AWS_S3_*) and iterates rows where
// `tracks.storage_backend='local'`, streaming each file to S3 at
// `tracks/<userID>/<trackID>.<ext>` and atomically updating the row to
// `storage_backend='s3' + storage_key=<key>`.
//
// v1.0.8 Phase 3. Usage:
//
// migrate_storage --dry-run
// migrate_storage --batch-size=50 --limit=500
// migrate_storage --delete-local=true # delete source file after successful upload
//
// Safe to re-run: each iteration filters on storage_backend='local', so rows
// already migrated are skipped. A crash mid-batch leaves partial progress
// committed per row (transactions scoped to one track).
package main

import (
	"context"
	"flag"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"veza-backend-api/internal/database"
	"veza-backend-api/internal/models"
	"veza-backend-api/internal/services"

	"github.com/joho/godotenv"
	"go.uber.org/zap"
	"gorm.io/gorm"
)

type flags struct {
	dryRun      bool
	batchSize   int
	limit       int
	deleteLocal bool
	timeoutMin  int
}

func parseFlags() flags {
	f := flags{}
	flag.BoolVar(&f.dryRun, "dry-run", false, "Log candidates without uploading or updating DB")
	flag.IntVar(&f.batchSize, "batch-size", 50, "Rows to SELECT per iteration")
	flag.IntVar(&f.limit, "limit", 0, "Total max rows to migrate (0 = unlimited)")
	flag.BoolVar(&f.deleteLocal, "delete-local", false, "Remove local file after successful S3 upload")
	flag.IntVar(&f.timeoutMin, "timeout-min", 30, "Global timeout in minutes (safety stop)")
	flag.Parse()
	return f
}

func main() {
	_ = godotenv.Load()
	cfg := parseFlags()

	logger, err := zap.NewProduction()
	if err != nil {
		fmt.Fprintf(os.Stderr, "failed to init logger: %v\n", err)
		os.Exit(1)
	}
	defer logger.Sync()

	logger.Info("migrate_storage starting",
		zap.Bool("dry_run", cfg.dryRun),
		zap.Int("batch_size", cfg.batchSize),
		zap.Int("limit", cfg.limit),
		zap.Bool("delete_local", cfg.deleteLocal),
	)

	// Top-level context with SIGINT + timeout so a stuck S3 call doesn't hang forever.
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(cfg.timeoutMin)*time.Minute)
	defer cancel()
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigCh
		logger.Warn("signal received, cancelling context")
		cancel()
	}()

	// DB init — minimal config, just DATABASE_URL.
	dbURL := os.Getenv("DATABASE_URL")
	if dbURL == "" {
		logger.Fatal("DATABASE_URL not set")
	}
	db, err := database.NewDatabase(&database.Config{
		URL:          dbURL,
		MaxOpenConns: 5,
		MaxIdleConns: 2,
		MaxLifetime:  5 * time.Minute,
		MaxIdleTime:  2 * time.Minute,
	})
	if err != nil {
		logger.Fatal("failed to open database", zap.Error(err))
	}
	defer db.Close()
	db.Logger = logger

	// S3 init (always required — if it fails, the CLI has no reason to run).
	s3Bucket := os.Getenv("AWS_S3_BUCKET")
	if s3Bucket == "" {
		logger.Fatal("AWS_S3_BUCKET not set (migration target)")
	}
	s3Service, err := services.NewS3StorageService(services.S3Config{
		Bucket:    s3Bucket,
		Region:    envOrDefault("AWS_REGION", "us-east-1"),
		Endpoint:  os.Getenv("AWS_S3_ENDPOINT"),
		AccessKey: os.Getenv("AWS_ACCESS_KEY_ID"),
		SecretKey: os.Getenv("AWS_SECRET_ACCESS_KEY"),
		Logger:    logger,
	})
	if err != nil {
		logger.Fatal("failed to init S3", zap.Error(err))
	}

	stats := &runStats{}
	runner := &migrator{
		db:     db.GormDB,
		s3:     s3Service,
		logger: logger,
		cfg:    cfg,
		stats:  stats,
	}
	if err := runner.run(ctx); err != nil {
		logger.Error("migration finished with errors", zap.Error(err))
		stats.log(logger)
		os.Exit(2)
	}
	stats.log(logger)
}

// runStats accumulates per-run counters for the final summary.
type runStats struct {
	candidates int
	uploaded   int
	skipped    int
	errors     int
	bytesSent  int64
}

func (s *runStats) log(logger *zap.Logger) {
	logger.Info("migrate_storage summary",
		zap.Int("candidates", s.candidates),
		zap.Int("uploaded", s.uploaded),
		zap.Int("skipped", s.skipped),
		zap.Int("errors", s.errors),
		zap.Int64("bytes_sent", s.bytesSent),
	)
}

type migrator struct {
	db     *gorm.DB
	s3     *services.S3StorageService
	logger *zap.Logger
	cfg    flags
	stats  *runStats
}
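
// run drains local-backed tracks batch by batch until none remain,
// --limit is reached, or the context is cancelled (signal or timeout).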
func (m *migrator) run(ctx context.Context) error {
	seen := 0
	for {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		if m.cfg.limit > 0 && seen >= m.cfg.limit {
			m.logger.Info("reached --limit, stopping", zap.Int("limit", m.cfg.limit))
			return nil
		}
		// Pull a batch of local-backed tracks, oldest-first so long-lived
		// legacy rows get processed first. Rows that stay 'local' after
		// processing (dry-run, orphans, errors) would otherwise be
		// re-selected forever, so skip past however many this run has
		// already left behind.
		var tracks []models.Track
		q := m.db.WithContext(ctx).
			Where("storage_backend = ?", "local").
			Where("file_path != ''").
			Order("created_at ASC").
			Offset(m.stats.skipped + m.stats.errors).
			Limit(m.cfg.batchSize)
		if err := q.Find(&tracks).Error; err != nil {
			return fmt.Errorf("select batch: %w", err)
		}
		if len(tracks) == 0 {
			m.logger.Info("no more local-backed tracks, done")
			return nil
		}
		m.stats.candidates += len(tracks)
		for i := range tracks {
			if ctx.Err() != nil {
				return ctx.Err()
			}
			if m.cfg.limit > 0 && seen >= m.cfg.limit {
				return nil
			}
			m.migrateOne(ctx, &tracks[i])
			seen++
		}
	}
}
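
// migrateOne streams a single track's file to S3 and flips its row to
// storage_backend='s3'. Failures are counted in stats and never abort
// the batch; the row stays 'local' and the batch offset above passes
// over it on later iterations.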
func (m *migrator) migrateOne(ctx context.Context, t *models.Track) {
	log := m.logger.With(
		zap.String("track_id", t.ID.String()),
		zap.String("user_id", t.UserID.String()),
		zap.String("file_path", t.FilePath),
	)
	if m.cfg.dryRun {
		log.Info("dry-run: would upload to S3")
		m.stats.skipped++
		return
	}

	file, err := os.Open(t.FilePath)
	if err != nil {
		if os.IsNotExist(err) {
			log.Warn("local file missing, skipping (orphan row)")
			m.stats.skipped++
			return
		}
		log.Error("open local file failed", zap.Error(err))
		m.stats.errors++
		return
	}
	defer file.Close()

	stat, err := file.Stat()
	if err != nil {
		log.Error("stat local file failed", zap.Error(err))
		m.stats.errors++
		return
	}

	ext := extOrDefault(t.FilePath, t.Format)
	s3Key := fmt.Sprintf("tracks/%s/%s%s", t.UserID.String(), t.ID.String(), ext)
	contentType := mimeFromExt(ext)

	uploadCtx, cancel := context.WithTimeout(ctx, 10*time.Minute)
	defer cancel()
	if _, err := m.s3.UploadStream(uploadCtx, file, s3Key, contentType, stat.Size()); err != nil {
		log.Error("S3 upload failed", zap.Error(err), zap.String("s3_key", s3Key))
		m.stats.errors++
		return
	}

	// Update the DB row atomically.
	if err := m.db.WithContext(ctx).Model(&models.Track{}).
		Where("id = ?", t.ID).
		Updates(map[string]interface{}{
			"storage_backend": "s3",
			"storage_key":     s3Key,
		}).Error; err != nil {
		// Object uploaded but row stale — log loud. An admin can reconcile by
		// HEAD-checking the S3 key for each row still marked 'local': a hit
		// means the object leaked and the row can be flipped to 's3'.
		log.Error("DB update failed after S3 upload (object leaked)",
			zap.String("s3_key", s3Key),
			zap.Error(err))
		m.stats.errors++
		return
	}

	m.stats.uploaded++
	m.stats.bytesSent += stat.Size()
	if m.cfg.deleteLocal {
		if err := os.Remove(t.FilePath); err != nil {
			log.Warn("local delete failed (file orphaned)", zap.Error(err))
		}
	}
	log.Info("migrated",
		zap.String("s3_key", s3Key),
		zap.Int64("bytes", stat.Size()),
	)
}

func envOrDefault(key, def string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return def
}

// extOrDefault returns the file extension (with dot) from path, falling back
// to ".<format>" when path has none.
func extOrDefault(path, format string) string {
	for i := len(path) - 1; i >= 0 && path[i] != '/'; i-- {
		if path[i] == '.' {
			return path[i:]
		}
	}
	if format != "" {
		return "." + format
	}
	return ""
}
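
// mimeFromExt maps a known lower-case audio extension to its MIME
// type; anything else (including mixed-case extensions) falls back to
// application/octet-stream.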
func mimeFromExt(ext string) string {
	switch ext {
	case ".mp3":
		return "audio/mpeg"
	case ".flac":
		return "audio/flac"
	case ".wav":
		return "audio/wav"
	case ".ogg":
		return "audio/ogg"
	case ".m4a", ".aac":
		return "audio/aac"
	default:
		return "application/octet-stream"
	}
}