diff --git a/veza-backend-api/cmd/api/main.go b/veza-backend-api/cmd/api/main.go index 37afd84ae..bc729ab6b 100644 --- a/veza-backend-api/cmd/api/main.go +++ b/veza-backend-api/cmd/api/main.go @@ -22,6 +22,7 @@ import ( "veza-backend-api/internal/config" "veza-backend-api/internal/core/marketplace" vezaes "veza-backend-api/internal/elasticsearch" + "veza-backend-api/internal/jobs" "veza-backend-api/internal/metrics" "veza-backend-api/internal/services" "veza-backend-api/internal/shutdown" @@ -276,6 +277,10 @@ func main() { os.Exit(1) } + // v1.0.4: Hourly cleanup of tracks stuck in `processing` whose upload file + // vanished (crash, SIGKILL, disk wipe). Keeps the tracks table honest. + jobs.ScheduleOrphanTracksCleanup(db, logger) + // Configuration du serveur HTTP port := fmt.Sprintf("%d", cfg.AppPort) if cfg.AppPort == 0 { diff --git a/veza-backend-api/internal/jobs/cleanup_orphan_tracks.go b/veza-backend-api/internal/jobs/cleanup_orphan_tracks.go new file mode 100644 index 000000000..789eda3ab --- /dev/null +++ b/veza-backend-api/internal/jobs/cleanup_orphan_tracks.go @@ -0,0 +1,111 @@ +package jobs + +import ( + "context" + "os" + "time" + + "veza-backend-api/internal/database" + "veza-backend-api/internal/models" + + "go.uber.org/zap" +) + +// OrphanTrackAgeThreshold is how long a track can stay in the `processing` +// state before the cleanup job considers it abandoned. +const OrphanTrackAgeThreshold = time.Hour + +// CleanupOrphanTracks finds tracks stuck in `processing` whose source file +// has disappeared from disk (upload service crashed mid-write, disk cleanup, +// container restart during a long upload, etc.) and flips them to `failed` +// with an explanatory status message. It never deletes the row. +// +// Safe to run repeatedly: already-failed rows are ignored, and tracks still +// present on disk are left alone. 
+func CleanupOrphanTracks(ctx context.Context, db *database.Database, logger *zap.Logger) error {
+	if db == nil || db.GormDB == nil {
+		return nil
+	}
+	cutoff := time.Now().Add(-OrphanTrackAgeThreshold)
+
+	var stuck []models.Track
+	if err := db.GormDB.WithContext(ctx).
+		Where("status = ? AND created_at < ?", models.TrackStatusProcessing, cutoff).
+		Find(&stuck).Error; err != nil {
+		logger.Error("Failed to query stuck processing tracks", zap.Error(err))
+		return err
+	}
+
+	if len(stuck) == 0 {
+		logger.Debug("Orphan tracks cleanup: no tracks older than threshold in processing state",
+			zap.Duration("age_threshold", OrphanTrackAgeThreshold),
+		)
+		return nil
+	}
+
+	failed := 0
+	for i := range stuck {
+		track := &stuck[i]
+		if _, err := os.Stat(track.FilePath); err == nil {
+			// File still there — uploader is slow, not dead. Leave the row.
+			continue
+		} else if !os.IsNotExist(err) {
+			logger.Warn("Could not stat track file while checking orphan",
+				zap.String("track_id", track.ID.String()),
+				zap.String("file_path", track.FilePath),
+				zap.Error(err),
+			)
+			continue
+		}
+
+		updates := map[string]interface{}{
+			"status":         models.TrackStatusFailed,
+			"status_message": "orphan cleanup: file missing on disk after exceeding processing age threshold",
+		}
+		if err := db.GormDB.WithContext(ctx).
+			Model(&models.Track{}).
+			Where("id = ? AND status = ?", track.ID, models.TrackStatusProcessing).
+ Updates(updates).Error; err != nil { + logger.Error("Failed to mark orphan track as failed", + zap.String("track_id", track.ID.String()), + zap.Error(err), + ) + continue + } + failed++ + logger.Warn("Orphan track flipped to failed", + zap.String("track_id", track.ID.String()), + zap.String("file_path", track.FilePath), + zap.Duration("age", time.Since(track.CreatedAt)), + ) + } + + logger.Info("Orphan tracks cleanup complete", + zap.Int("candidates", len(stuck)), + zap.Int("marked_failed", failed), + ) + return nil +} + +// ScheduleOrphanTracksCleanup kicks off a background goroutine that runs +// CleanupOrphanTracks once immediately and then every hour thereafter. +// Mirrors the pattern used by ScheduleSessionCleanupJob. +func ScheduleOrphanTracksCleanup(db *database.Database, logger *zap.Logger) { + ticker := time.NewTicker(time.Hour) + go func() { + ctx := context.Background() + + if err := CleanupOrphanTracks(ctx, db, logger); err != nil { + logger.Error("Initial orphan tracks cleanup failed", zap.Error(err)) + } + + for range ticker.C { + if err := CleanupOrphanTracks(ctx, db, logger); err != nil { + logger.Error("Scheduled orphan tracks cleanup failed", zap.Error(err)) + } + } + }() + logger.Info("Orphan tracks cleanup scheduled to run hourly", + zap.Duration("age_threshold", OrphanTrackAgeThreshold), + ) +} diff --git a/veza-backend-api/internal/jobs/cleanup_orphan_tracks_test.go b/veza-backend-api/internal/jobs/cleanup_orphan_tracks_test.go new file mode 100644 index 000000000..200e5e52d --- /dev/null +++ b/veza-backend-api/internal/jobs/cleanup_orphan_tracks_test.go @@ -0,0 +1,122 @@ +package jobs + +import ( + "context" + "os" + "path/filepath" + "testing" + "time" + + "veza-backend-api/internal/database" + "veza-backend-api/internal/models" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" + "gorm.io/driver/sqlite" + "gorm.io/gorm" +) + +func setupOrphanTestDB(t 
*testing.T) (*database.Database, *gorm.DB) { + gormDB, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) + require.NoError(t, err) + + require.NoError(t, gormDB.AutoMigrate(&models.Track{}, &models.User{})) + + db := &database.Database{GormDB: gormDB} + return db, gormDB +} + +func insertTestTrack(t *testing.T, gormDB *gorm.DB, filePath string, status models.TrackStatus, createdAt time.Time) uuid.UUID { + ownerID := uuid.New() + require.NoError(t, gormDB.Create(&models.User{ + ID: ownerID, + Username: "owner-" + ownerID.String()[:8], + Email: ownerID.String() + "@test.local", + }).Error) + + trackID := uuid.New() + track := &models.Track{ + ID: trackID, + UserID: ownerID, + Title: "Test", + Artist: "Test", + Duration: 1, + FilePath: filePath, + FileSize: 1, + Status: status, + CreatedAt: createdAt, + } + require.NoError(t, gormDB.Create(track).Error) + // created_at is managed by autoCreateTime — force our sentinel time. + require.NoError(t, gormDB.Model(&models.Track{}).Where("id = ?", trackID).Update("created_at", createdAt).Error) + return trackID +} + +func TestCleanupOrphanTracks_FlipsStuckMissingFile(t *testing.T) { + db, gormDB := setupOrphanTestDB(t) + logger := zaptest.NewLogger(t) + + missingPath := filepath.Join(t.TempDir(), "vanished.mp3") + id := insertTestTrack(t, gormDB, missingPath, models.TrackStatusProcessing, time.Now().Add(-2*time.Hour)) + + require.NoError(t, CleanupOrphanTracks(context.Background(), db, logger)) + + var after models.Track + require.NoError(t, gormDB.First(&after, "id = ?", id).Error) + assert.Equal(t, models.TrackStatusFailed, after.Status) + assert.Contains(t, after.StatusMessage, "orphan cleanup") +} + +func TestCleanupOrphanTracks_LeavesFilePresent(t *testing.T) { + db, gormDB := setupOrphanTestDB(t) + logger := zaptest.NewLogger(t) + + goodPath := filepath.Join(t.TempDir(), "still-here.mp3") + require.NoError(t, os.WriteFile(goodPath, []byte("audio"), 0o600)) + id := insertTestTrack(t, gormDB, goodPath, 
models.TrackStatusProcessing, time.Now().Add(-2*time.Hour)) + + require.NoError(t, CleanupOrphanTracks(context.Background(), db, logger)) + + var after models.Track + require.NoError(t, gormDB.First(&after, "id = ?", id).Error) + assert.Equal(t, models.TrackStatusProcessing, after.Status, "slow uploads should not be marked failed") +} + +func TestCleanupOrphanTracks_LeavesRecent(t *testing.T) { + db, gormDB := setupOrphanTestDB(t) + logger := zaptest.NewLogger(t) + + missingPath := filepath.Join(t.TempDir(), "recent.mp3") + id := insertTestTrack(t, gormDB, missingPath, models.TrackStatusProcessing, time.Now().Add(-10*time.Minute)) + + require.NoError(t, CleanupOrphanTracks(context.Background(), db, logger)) + + var after models.Track + require.NoError(t, gormDB.First(&after, "id = ?", id).Error) + assert.Equal(t, models.TrackStatusProcessing, after.Status, "tracks younger than threshold must be spared") +} + +func TestCleanupOrphanTracks_IgnoresAlreadyFailed(t *testing.T) { + db, gormDB := setupOrphanTestDB(t) + logger := zaptest.NewLogger(t) + + missingPath := filepath.Join(t.TempDir(), "old.mp3") + id := insertTestTrack(t, gormDB, missingPath, models.TrackStatusFailed, time.Now().Add(-3*time.Hour)) + // Seed a message we'd notice if the job overwrote it. 
+ require.NoError(t, gormDB.Model(&models.Track{}).Where("id = ?", id).Update("status_message", "previous failure").Error) + + require.NoError(t, CleanupOrphanTracks(context.Background(), db, logger)) + + var after models.Track + require.NoError(t, gormDB.First(&after, "id = ?", id).Error) + assert.Equal(t, models.TrackStatusFailed, after.Status) + assert.Equal(t, "previous failure", after.StatusMessage, "job must not rewrite unrelated failed rows") +} + +func TestCleanupOrphanTracks_NilDatabaseIsNoop(t *testing.T) { + logger := zaptest.NewLogger(t) + assert.NoError(t, CleanupOrphanTracks(context.Background(), nil, logger)) + assert.NoError(t, CleanupOrphanTracks(context.Background(), &database.Database{}, logger)) +}