Some checks failed
Veza CI / Backend (Go) (push) Failing after 0s
Veza CI / Frontend (Web) (push) Failing after 0s
Veza CI / Rust (Stream Server) (push) Failing after 0s
Security Scan / Secret Scanning (gitleaks) (push) Failing after 0s
Veza CI / Notify on failure (push) Failing after 0s
Closes MinIO Phase 3: ops path for migrating existing tracks.
Usage:
export DATABASE_URL=... AWS_S3_BUCKET=... AWS_S3_ENDPOINT=... ...
migrate_storage --dry-run --limit=10 # plan a batch
migrate_storage --batch-size=50 --limit=500 # migrate first 500
migrate_storage --delete-local=true # also rm local files
Design:
- Idempotent: WHERE storage_backend='local' + per-row DB update means
a crashed run resumes cleanly without duplicating uploads.
- Streaming upload via S3StorageService.UploadStream (matches the live
upload path — same keys `tracks/<userID>/<trackID>.<ext>`, same MIME
resolution).
- Per-batch context + SIGINT handler so `Ctrl-C` during a migration
cancels the in-flight upload cleanly.
- Global `--timeout-min=30` safety cap.
- `--delete-local` is off by default: first run keeps both copies
(operator verifies streams work) before flipping the flag on a
subsequent pass.
- Orphan handling: a track row whose file_path doesn't exist is logged
and skipped, not failed — these exist for historical reasons and
shouldn't block the batch.
Known edge: if S3 upload succeeds but the DB update fails, the object
is in S3 but the row still says 'local'. Log message spells out the
reconcile query. v1.0.9 could add a verification pass.
Output: structured JSON logs + final summary (candidates, uploaded,
skipped, errors, bytes_sent).
Refs: plan Batch A step A6, migration 985 schema (Phase 0, d03232c8).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
317 lines
7.9 KiB
Go
317 lines
7.9 KiB
Go
// Command migrate_storage bulk-migrates tracks from local disk to MinIO/S3.
//
// Reads env (DATABASE_URL, AWS_S3_*) and iterates rows where
// `tracks.storage_backend='local'`, streaming each file to S3 at
// `tracks/<userID>/<trackID>.<ext>` and atomically updating the row to
// `storage_backend='s3' + storage_key=<key>`.
//
// v1.0.8 Phase 3. Usage:
//
//	migrate_storage --dry-run
//	migrate_storage --batch-size=50 --limit=500
//	migrate_storage --delete-local=true  # delete source file after successful upload
//
// Safe to re-run: each iteration filters on storage_backend='local', so rows
// already migrated are skipped. A crash mid-batch leaves partial progress
// committed per row (transactions scoped to one track).
package main

import (
	"context"
	"flag"
	"fmt"
	"os"
	"os/signal"
	"path/filepath"
	"strings"
	"syscall"
	"time"

	"github.com/joho/godotenv"
	"go.uber.org/zap"
	"gorm.io/gorm"

	"veza-backend-api/internal/database"
	"veza-backend-api/internal/models"
	"veza-backend-api/internal/services"
)

// flags holds the parsed command-line configuration for one run.
type flags struct {
	dryRun      bool // log candidates without uploading or touching the DB
	batchSize   int  // rows to SELECT per iteration
	limit       int  // total max rows to migrate (0 = unlimited)
	deleteLocal bool // remove the local file after a successful upload
	timeoutMin  int  // global safety timeout, in minutes
}

// parseFlags registers the CLI flags, parses os.Args, and returns the result.
func parseFlags() flags {
	dryRun := flag.Bool("dry-run", false, "Log candidates without uploading or updating DB")
	batchSize := flag.Int("batch-size", 50, "Rows to SELECT per iteration")
	limit := flag.Int("limit", 0, "Total max rows to migrate (0 = unlimited)")
	deleteLocal := flag.Bool("delete-local", false, "Remove local file after successful S3 upload")
	timeoutMin := flag.Int("timeout-min", 30, "Global timeout in minutes (safety stop)")
	flag.Parse()
	return flags{
		dryRun:      *dryRun,
		batchSize:   *batchSize,
		limit:       *limit,
		deleteLocal: *deleteLocal,
		timeoutMin:  *timeoutMin,
	}
}

func main() {
|
|
_ = godotenv.Load()
|
|
|
|
cfg := parseFlags()
|
|
|
|
logger, err := zap.NewProduction()
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "failed to init logger: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
defer logger.Sync()
|
|
|
|
logger.Info("migrate_storage starting",
|
|
zap.Bool("dry_run", cfg.dryRun),
|
|
zap.Int("batch_size", cfg.batchSize),
|
|
zap.Int("limit", cfg.limit),
|
|
zap.Bool("delete_local", cfg.deleteLocal),
|
|
)
|
|
|
|
// Top-level context with SIGINT + timeout so a stuck S3 call doesn't hang forever.
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(cfg.timeoutMin)*time.Minute)
|
|
defer cancel()
|
|
sigCh := make(chan os.Signal, 1)
|
|
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
|
|
go func() {
|
|
<-sigCh
|
|
logger.Warn("signal received, cancelling context")
|
|
cancel()
|
|
}()
|
|
|
|
// DB init — minimal config, just DATABASE_URL.
|
|
dbURL := os.Getenv("DATABASE_URL")
|
|
if dbURL == "" {
|
|
logger.Fatal("DATABASE_URL not set")
|
|
}
|
|
db, err := database.NewDatabase(&database.Config{
|
|
URL: dbURL,
|
|
MaxOpenConns: 5,
|
|
MaxIdleConns: 2,
|
|
MaxLifetime: 5 * time.Minute,
|
|
MaxIdleTime: 2 * time.Minute,
|
|
})
|
|
if err != nil {
|
|
logger.Fatal("failed to open database", zap.Error(err))
|
|
}
|
|
defer db.Close()
|
|
db.Logger = logger
|
|
|
|
// S3 init (always required — if it fails, the CLI has no reason to run).
|
|
s3Bucket := os.Getenv("AWS_S3_BUCKET")
|
|
if s3Bucket == "" {
|
|
logger.Fatal("AWS_S3_BUCKET not set (migration target)")
|
|
}
|
|
s3Service, err := services.NewS3StorageService(services.S3Config{
|
|
Bucket: s3Bucket,
|
|
Region: envOrDefault("AWS_REGION", "us-east-1"),
|
|
Endpoint: os.Getenv("AWS_S3_ENDPOINT"),
|
|
AccessKey: os.Getenv("AWS_ACCESS_KEY_ID"),
|
|
SecretKey: os.Getenv("AWS_SECRET_ACCESS_KEY"),
|
|
Logger: logger,
|
|
})
|
|
if err != nil {
|
|
logger.Fatal("failed to init S3", zap.Error(err))
|
|
}
|
|
|
|
stats := &runStats{}
|
|
runner := &migrator{
|
|
db: db.GormDB,
|
|
s3: s3Service,
|
|
logger: logger,
|
|
cfg: cfg,
|
|
stats: stats,
|
|
}
|
|
if err := runner.run(ctx); err != nil {
|
|
logger.Error("migration finished with errors", zap.Error(err))
|
|
stats.log(logger)
|
|
os.Exit(2)
|
|
}
|
|
stats.log(logger)
|
|
}
|
|
|
|
// runStats accumulates per-run counters for the final summary.
type runStats struct {
	candidates int   // rows matched by the storage_backend='local' SELECTs
	uploaded   int   // rows uploaded to S3 and flipped to storage_backend='s3'
	skipped    int   // dry-run rows plus orphan rows whose local file is missing
	errors     int   // open/stat/upload/DB-update failures
	bytesSent  int64 // total file bytes streamed to S3 (sum of stat sizes)
}

func (s *runStats) log(logger *zap.Logger) {
|
|
logger.Info("migrate_storage summary",
|
|
zap.Int("candidates", s.candidates),
|
|
zap.Int("uploaded", s.uploaded),
|
|
zap.Int("skipped", s.skipped),
|
|
zap.Int("errors", s.errors),
|
|
zap.Int64("bytes_sent", s.bytesSent),
|
|
)
|
|
}
|
|
|
|
// migrator carries the shared dependencies for one migration run.
type migrator struct {
	db     *gorm.DB                   // set from database.NewDatabase's GormDB in main
	s3     *services.S3StorageService // upload target
	logger *zap.Logger
	cfg    flags     // parsed CLI flags
	stats  *runStats // run counters, mutated by migrateOne
}

func (m *migrator) run(ctx context.Context) error {
|
|
seen := 0
|
|
for {
|
|
if ctx.Err() != nil {
|
|
return ctx.Err()
|
|
}
|
|
if m.cfg.limit > 0 && seen >= m.cfg.limit {
|
|
m.logger.Info("reached --limit, stopping", zap.Int("limit", m.cfg.limit))
|
|
return nil
|
|
}
|
|
|
|
// Pull a batch of local-backed tracks. Ordering by created_at
|
|
// oldest-first so long-lived legacy rows get processed first.
|
|
var tracks []models.Track
|
|
q := m.db.WithContext(ctx).
|
|
Where("storage_backend = ?", "local").
|
|
Where("file_path != ''").
|
|
Order("created_at ASC").
|
|
Limit(m.cfg.batchSize)
|
|
if err := q.Find(&tracks).Error; err != nil {
|
|
return fmt.Errorf("select batch: %w", err)
|
|
}
|
|
if len(tracks) == 0 {
|
|
m.logger.Info("no more local-backed tracks, done")
|
|
return nil
|
|
}
|
|
|
|
m.stats.candidates += len(tracks)
|
|
for i := range tracks {
|
|
if ctx.Err() != nil {
|
|
return ctx.Err()
|
|
}
|
|
if m.cfg.limit > 0 && seen >= m.cfg.limit {
|
|
return nil
|
|
}
|
|
m.migrateOne(ctx, &tracks[i])
|
|
seen++
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *migrator) migrateOne(ctx context.Context, t *models.Track) {
|
|
log := m.logger.With(
|
|
zap.String("track_id", t.ID.String()),
|
|
zap.String("user_id", t.UserID.String()),
|
|
zap.String("file_path", t.FilePath),
|
|
)
|
|
|
|
if m.cfg.dryRun {
|
|
log.Info("dry-run: would upload to S3")
|
|
m.stats.skipped++
|
|
return
|
|
}
|
|
|
|
file, err := os.Open(t.FilePath)
|
|
if err != nil {
|
|
if os.IsNotExist(err) {
|
|
log.Warn("local file missing, skipping (orphan row)")
|
|
m.stats.skipped++
|
|
return
|
|
}
|
|
log.Error("open local file failed", zap.Error(err))
|
|
m.stats.errors++
|
|
return
|
|
}
|
|
defer file.Close()
|
|
|
|
stat, err := file.Stat()
|
|
if err != nil {
|
|
log.Error("stat local file failed", zap.Error(err))
|
|
m.stats.errors++
|
|
return
|
|
}
|
|
|
|
ext := extOrDefault(t.FilePath, t.Format)
|
|
s3Key := fmt.Sprintf("tracks/%s/%s%s", t.UserID.String(), t.ID.String(), ext)
|
|
contentType := mimeFromExt(ext)
|
|
|
|
uploadCtx, cancel := context.WithTimeout(ctx, 10*time.Minute)
|
|
defer cancel()
|
|
if _, err := m.s3.UploadStream(uploadCtx, file, s3Key, contentType, stat.Size()); err != nil {
|
|
log.Error("S3 upload failed", zap.Error(err), zap.String("s3_key", s3Key))
|
|
m.stats.errors++
|
|
return
|
|
}
|
|
|
|
// Update DB row atomically.
|
|
if err := m.db.WithContext(ctx).Model(&models.Track{}).
|
|
Where("id = ?", t.ID).
|
|
Updates(map[string]interface{}{
|
|
"storage_backend": "s3",
|
|
"storage_key": s3Key,
|
|
}).Error; err != nil {
|
|
// Object uploaded but row stale — log loud. Admin can run a reconcile
|
|
// query to find this state: SELECT id FROM tracks WHERE storage_backend='local' AND EXISTS(s3 object for id).
|
|
log.Error("DB update failed after S3 upload (object leaked)",
|
|
zap.String("s3_key", s3Key),
|
|
zap.Error(err))
|
|
m.stats.errors++
|
|
return
|
|
}
|
|
|
|
m.stats.uploaded++
|
|
m.stats.bytesSent += stat.Size()
|
|
|
|
if m.cfg.deleteLocal {
|
|
if err := os.Remove(t.FilePath); err != nil {
|
|
log.Warn("local delete failed (file orphaned)", zap.Error(err))
|
|
}
|
|
}
|
|
|
|
log.Info("migrated",
|
|
zap.String("s3_key", s3Key),
|
|
zap.Int64("bytes", stat.Size()),
|
|
)
|
|
}
|
|
|
|
// envOrDefault returns the value of the environment variable key, or def
// when the variable is unset or empty.
func envOrDefault(key, def string) string {
	value := os.Getenv(key)
	if value == "" {
		return def
	}
	return value
}

// extOrDefault returns the file extension (with dot) from path, falling back
// to ".<format>" when path has none, or "" when format is also empty.
func extOrDefault(path, format string) string {
	// filepath.Ext yields the suffix from the final dot in the last path
	// element ("" when there is none) — the stdlib equivalent of the old
	// hand-rolled backwards scan, and separator-aware on every OS.
	if ext := filepath.Ext(path); ext != "" {
		return ext
	}
	if format != "" {
		return "." + format
	}
	return ""
}

// mimeFromExt maps an audio file extension (with dot) to its MIME type,
// defaulting to application/octet-stream for anything unrecognized.
func mimeFromExt(ext string) string {
	// Extensions come from user-supplied file paths, so match
	// case-insensitively: ".MP3" is the same format as ".mp3".
	switch strings.ToLower(ext) {
	case ".mp3":
		return "audio/mpeg"
	case ".flac":
		return "audio/flac"
	case ".wav":
		return "audio/wav"
	case ".ogg":
		return "audio/ogg"
	case ".m4a", ".aac":
		return "audio/aac"
	default:
		return "application/octet-stream"
	}
}