veza/veza-backend-api/cmd/tools/seed/utils.go
senke 2eff5a9b10 refactor(backend): split seed tool into domain-specific modules
Extract monolithic seed main.go into separate files per domain:
users, tracks, playlists, chat, analytics, marketplace, social,
content, live, moderation, notifications, and misc. Add config,
fake data helpers, and utility modules. Update Makefile targets.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-25 23:35:07 +01:00

251 lines
6.8 KiB
Go

package main
import (
	"database/sql"
	"errors"
	"fmt"
	"strings"
	"time"
)
// BulkInsert performs a multi-row INSERT that skips conflicting rows
// (ON CONFLICT DO NOTHING).
//
// columns is a comma-separated column list and rows holds one value slice per
// row. It returns the number of rows the database reports as affected, so rows
// skipped because of a conflict are not counted. See BulkInsertRaw for the
// batching and error semantics, which are shared.
func BulkInsert(db *sql.DB, table string, columns string, rows [][]interface{}) (int, error) {
	return BulkInsertRaw(db, table, columns, rows, "ON CONFLICT DO NOTHING")
}

// BulkInsertRaw is like BulkInsert but appends a caller-supplied raw suffix
// (for example an ON CONFLICT ... DO UPDATE clause) instead of
// ON CONFLICT DO NOTHING. An empty suffix produces a plain INSERT.
//
// Rows are sent in batches, one statement per batch and each statement
// executed on its own (no surrounding transaction), so batches that succeeded
// before a failure stay applied. A row shorter than the column list is padded
// with NULLs; values beyond the column list are ignored.
//
// It returns the number of rows the database reports as affected across all
// successful batches. On failure that partial count is returned together with
// an error naming the table and the row offset of the failing batch.
//
// table, columns and suffix are concatenated into the SQL text verbatim and
// must therefore come from trusted code, never from user input.
func BulkInsertRaw(db *sql.DB, table string, columns string, rows [][]interface{}, suffix string) (int, error) {
	if len(rows) == 0 {
		return 0, nil
	}

	// PostgreSQL accepts at most 65535 bind parameters per statement.
	const maxParams = 65535

	cols := strings.Split(columns, ",")
	for i := range cols {
		cols[i] = strings.TrimSpace(cols[i])
	}
	numCols := len(cols) // strings.Split never returns an empty slice, so >= 1

	// Default to 500 rows per statement and shrink for wide tables.
	batchSize := 500
	switch {
	case numCols > 40:
		batchSize = 100
	case numCols > 20:
		batchSize = 200
	}
	// The tiers above are heuristics; enforce the hard parameter limit too so
	// that very wide column lists cannot overflow it.
	if limit := maxParams / numCols; batchSize > limit {
		batchSize = limit
	}
	if batchSize < 1 {
		batchSize = 1
	}

	// The statement head is identical for every batch, so build it once.
	head := "INSERT INTO " + table + " (" + strings.Join(cols, ",") + ") VALUES "

	total := 0
	for start := 0; start < len(rows); start += batchSize {
		end := start + batchSize
		if end > len(rows) {
			end = len(rows)
		}
		batch := rows[start:end]

		var sb strings.Builder
		sb.WriteString(head)
		args := make([]interface{}, 0, len(batch)*numCols)
		for i, row := range batch {
			if i > 0 {
				sb.WriteByte(',')
			}
			sb.WriteByte('(')
			for j := 0; j < numCols; j++ {
				if j > 0 {
					sb.WriteByte(',')
				}
				// Placeholders are numbered 1..N in argument order.
				fmt.Fprintf(&sb, "$%d", len(args)+1)
				if j < len(row) {
					args = append(args, row[j])
				} else {
					args = append(args, nil)
				}
			}
			sb.WriteByte(')')
		}
		if suffix != "" {
			sb.WriteByte(' ')
			sb.WriteString(suffix)
		}

		res, err := db.Exec(sb.String(), args...)
		if err != nil {
			return total, fmt.Errorf("bulk insert into %s (batch at offset %d): %w", table, start, err)
		}
		// Count what the database actually wrote: with a conflict clause some
		// rows of the batch may have been skipped. Drivers that cannot report
		// affected rows fall back to the number of rows sent.
		affected, raErr := res.RowsAffected()
		if raErr != nil {
			affected = int64(len(batch))
		}
		total += int(affected)
	}
	return total, nil
}
// TruncateAll empties every seedable table.
//
// Each table is truncated with its own TRUNCATE ... CASCADE statement. The
// list is ordered dependents-first for readability only; CASCADE is what
// handles the foreign-key ordering.
//
// Tables that do not exist yet are skipped silently. Any other failure
// (connection loss, missing privileges, ...) no longer goes unnoticed: the
// remaining tables are still attempted, and afterwards an error wrapping the
// first such failure is returned. It returns nil when every table was either
// truncated or missing.
func TruncateAll(db *sql.DB) error {
	tables := []string{
		// Level 3 (deepest dependents first)
		"order_items", "product_previews", "product_images", "product_licenses",
		"product_reviews", "product_views", "licenses",
		"read_receipts", "delivered_status", "message_reactions",
		"cloud_file_versions", "cloud_file_shares",
		"shared_queue_items", "queue_sessions",
		"lesson_progress", "course_enrollments", "certificates", "course_reviews",
		"track_distribution_status_history", "external_streaming_royalties",
		// Level 2
		"playlist_tracks", "playlist_collaborators", "playlist_follows", "playlist_share_links",
		"track_versions", "track_plays", "track_likes", "track_comments", "track_shares",
		"track_history", "track_lyrics", "track_stems", "track_tags", "track_genres",
		"track_reposts", "track_distributions", "daily_track_stats", "geographic_play_stats",
		"track_discovery_sources", "track_segment_stats", "audio_fingerprints",
		"playback_history", "playback_analytics", "hls_streams", "hls_transcode_queue",
		"file_metadata", "file_conversions", "user_files",
		"room_members", "messages", "room_invitations",
		"group_members", "group_join_requests", "group_invitations",
		"comments", "likes", "webhook_failures",
		"gear_images", "gear_documents", "gear_repairs",
		"products", "orders",
		"queue_items", "analytics_events", "moderation_actions",
		"subscription_invoices", "lessons",
		"seller_transfers", "seller_payouts",
		// Level 1
		"user_profiles", "user_settings", "user_roles", "user_preferences", "user_presence",
		"user_blocks", "user_storage_quotas", "user_folders", "user_genre_follows",
		"user_tag_follows", "notification_preferences", "push_subscriptions",
		"federated_identities", "refresh_tokens", "password_reset_tokens",
		"email_verification_tokens", "user_sessions",
		"follows", "tracks", "playlists", "groups", "posts", "rooms",
		"files", "file_uploads", "live_streams", "gear_items",
		"api_keys", "webhooks", "reports", "announcements", "support_tickets",
		"data_exports", "seller_stripe_accounts", "seller_balances",
		"co_listening_sessions", "user_subscriptions", "notifications",
		"password_history", "login_history", "sms_verification_codes",
		"webauthn_credentials", "metric_alerts", "metric_alert_preferences",
		"user_strikes", "user_suspensions", "spam_detections", "queues",
		"courses",
		// Level 0
		"users", "audit_logs",
	}

	// missingTable reports whether err means "the table is not there"
	// (PostgreSQL SQLSTATE 42P01, undefined_table). Driver errors exposing a
	// SQLState() method (lib/pq and pgx error types do) are classified by
	// code; otherwise the message text is used as a fallback.
	missingTable := func(err error) bool {
		var st interface{ SQLState() string }
		if errors.As(err, &st) {
			return st.SQLState() == "42P01"
		}
		msg := err.Error()
		return strings.Contains(msg, "42P01") ||
			(strings.Contains(msg, "relation") && strings.Contains(msg, "does not exist"))
	}

	var firstErr error
	failed := 0
	for _, t := range tables {
		_, err := db.Exec(fmt.Sprintf("TRUNCATE TABLE %s CASCADE", t))
		if err == nil || missingTable(err) {
			// Truncated, or the table might not exist yet: nothing to report.
			continue
		}
		failed++
		if firstErr == nil {
			firstErr = fmt.Errorf("truncate %s: %w", t, err)
		}
	}
	if firstErr != nil {
		return fmt.Errorf("truncate all: %d table(s) failed, first failure: %w", failed, firstErr)
	}
	return nil
}
// CountRows reports how many rows table currently holds.
//
// The lookup is best effort: the query error is deliberately discarded because
// the signature has no way to return it, so a failed query (for example on a
// table that does not exist) yields 0, the same as an empty table.
// table is spliced into the SQL text verbatim and must be a trusted name.
func CountRows(db *sql.DB, table string) int {
	var count int
	row := db.QueryRow("SELECT COUNT(*) FROM " + table)
	_ = row.Scan(&count) // on error count keeps its zero value
	return count
}
// Progress accumulates the row count of one seeding step and remembers when
// the step started, so that a summary line can be printed once it finishes.
type Progress struct {
	label     string    // name printed in the summary line
	total     int       // value supplied to NewProgress; stored but not read in this file
	current   int       // running sum of everything passed to Update
	startTime time.Time // set by NewProgress; elapsed time is measured from here
}

// NewProgress returns a tracker for the given label and total whose clock
// starts at the moment of the call.
func NewProgress(label string, total int) *Progress {
	p := new(Progress)
	p.label = label
	p.total = total
	p.startTime = time.Now()
	return p
}

// Update adds n to the running row count.
func (p *Progress) Update(n int) {
	p.current = p.current + n
}

// Done prints one summary line to standard output: the label, the accumulated
// row count and the time elapsed since NewProgress, rounded to milliseconds.
func (p *Progress) Done() {
	took := time.Since(p.startTime).Round(time.Millisecond)
	fmt.Printf(" %-30s %6d rows (%s)\n", p.label, p.current, took)
}
// SeedResult stores the result of a seeding operation.
//
// NOTE(review): nothing in this file constructs or reads this type; the field
// notes below are inferred from the field names only — confirm against the
// callers.
type SeedResult struct {
	Table    string        // presumably the name of the table that was seeded
	Count    int           // presumably the number of rows seeded
	Duration time.Duration // presumably how long the operation took
}