feat(workers): hourly cleanup of orphan tracks stuck in processing

Upload flow: POST creates a track row with `status=processing` and
writes the file at `file_path`. If the upload is interrupted between
row-create and status-update (OOM, SIGKILL during a deploy, disk wipe),
the row stays in `processing` forever with a `file_path` that no longer
exists. The library UI shows a ghost track the user can never play,
never open, and can only partially delete.
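
The window, sketched with hypothetical names (the real code is the
upload handler, which this commit does not touch):

    track := models.Track{Status: models.TrackStatusProcessing, FilePath: dst}
    db.Create(&track)            // 1. row exists, status=processing
    saveUploadedFile(dst, body)  // 2. a crash here orphans the row
    markTrackReady(db, track.ID) // 3. never runs; status never updated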

New worker:

  * `jobs/cleanup_orphan_tracks.go` — `CleanupOrphanTracks` queries
    tracks with `status=processing AND created_at < NOW()-1h`, stats
    the `file_path`, and flips the row to `status=failed` with
    `status_message = "orphan cleanup: file missing on disk after >1h
    in processing"`. Never deletes; never touches present files or
    rows already in another state. Safe to run repeatedly; a one-off
    invocation sketch follows this list.
  * `ScheduleOrphanTracksCleanup(db, logger)` runs once at boot and
    then every hour thereafter. Wired in `cmd/api/main.go` right after
    route setup so restarts trigger an immediate scan.
  * Threshold exported as `OrphanTrackAgeThreshold` constant so tests
    and future tuning don't need to edit the worker.
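
For ad-hoc runs (support tickets, post-incident sweeps) the worker is a
plain exported function. A minimal sketch, assuming the same `db` and
`logger` values `main.go` already has in scope:

    if err := jobs.CleanupOrphanTracks(context.Background(), db, logger); err != nil {
        logger.Error("manual orphan cleanup failed", zap.Error(err))
    }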

Tests: 5 cases in `cleanup_orphan_tracks_test.go`:
  - `_FlipsStuckMissingFile` happy path
  - `_LeavesFilePresent` (slow uploads must not be failed)
  - `_LeavesRecent` (below threshold)
  - `_IgnoresAlreadyFailed` (idempotent)
  - `_NilDatabaseIsNoop` (safety)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
senke 2026-04-16 14:57:24 +02:00
parent 3a95e38fdf
commit 5530267287
3 changed files with 238 additions and 0 deletions

cmd/api/main.go

@@ -22,6 +22,7 @@ import (
    "veza-backend-api/internal/config"
    "veza-backend-api/internal/core/marketplace"
    vezaes "veza-backend-api/internal/elasticsearch"
    "veza-backend-api/internal/jobs"
    "veza-backend-api/internal/metrics"
    "veza-backend-api/internal/services"
    "veza-backend-api/internal/shutdown"
@@ -276,6 +277,10 @@ func main() {
        os.Exit(1)
    }

    // v1.0.4: Hourly cleanup of tracks stuck in `processing` whose upload file
    // vanished (crash, SIGKILL, disk wipe). Keeps the tracks table honest.
    jobs.ScheduleOrphanTracksCleanup(db, logger)

    // Configuration du serveur HTTP
    port := fmt.Sprintf("%d", cfg.AppPort)
    if cfg.AppPort == 0 {

internal/jobs/cleanup_orphan_tracks.go

@@ -0,0 +1,111 @@
package jobs

import (
    "context"
    "os"
    "time"

    "veza-backend-api/internal/database"
    "veza-backend-api/internal/models"

    "go.uber.org/zap"
)

// OrphanTrackAgeThreshold is how long a track can stay in the `processing`
// state before the cleanup job considers it abandoned.
const OrphanTrackAgeThreshold = time.Hour

// CleanupOrphanTracks finds tracks stuck in `processing` whose source file
// has disappeared from disk (upload service crashed mid-write, disk cleanup,
// container restart during a long upload, etc.) and flips them to `failed`
// with an explanatory status message. It never deletes the row.
//
// Safe to run repeatedly: already-failed rows are ignored, and tracks still
// present on disk are left alone.
func CleanupOrphanTracks(ctx context.Context, db *database.Database, logger *zap.Logger) error {
    if db == nil || db.GormDB == nil {
        return nil
    }

    cutoff := time.Now().Add(-OrphanTrackAgeThreshold)

    var stuck []models.Track
    if err := db.GormDB.WithContext(ctx).
        Where("status = ? AND created_at < ?", models.TrackStatusProcessing, cutoff).
        Find(&stuck).Error; err != nil {
        logger.Error("Failed to query stuck processing tracks", zap.Error(err))
        return err
    }

    if len(stuck) == 0 {
        logger.Debug("Orphan tracks cleanup: no tracks older than threshold in processing state",
            zap.Duration("age_threshold", OrphanTrackAgeThreshold),
        )
        return nil
    }

    failed := 0
    for i := range stuck {
        track := &stuck[i]
        if _, err := os.Stat(track.FilePath); err == nil {
            // File still there — uploader is slow, not dead. Leave the row.
            continue
        } else if !os.IsNotExist(err) {
            logger.Warn("Could not stat track file while checking orphan",
                zap.String("track_id", track.ID.String()),
                zap.String("file_path", track.FilePath),
                zap.Error(err),
            )
            continue
        }

        updates := map[string]interface{}{
            "status":         models.TrackStatusFailed,
            "status_message": "orphan cleanup: file missing on disk after >1h in processing",
        }
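        // The WHERE clause re-checks status: if the uploader finished between
        // our SELECT and this UPDATE, the UPDATE matches zero rows and the
        // track is left alone.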
        if err := db.GormDB.WithContext(ctx).
            Model(&models.Track{}).
            Where("id = ? AND status = ?", track.ID, models.TrackStatusProcessing).
            Updates(updates).Error; err != nil {
            logger.Error("Failed to mark orphan track as failed",
                zap.String("track_id", track.ID.String()),
                zap.Error(err),
            )
            continue
        }

        failed++
        logger.Warn("Orphan track flipped to failed",
            zap.String("track_id", track.ID.String()),
            zap.String("file_path", track.FilePath),
            zap.Duration("age", time.Since(track.CreatedAt)),
        )
    }

    logger.Info("Orphan tracks cleanup complete",
        zap.Int("candidates", len(stuck)),
        zap.Int("marked_failed", failed),
    )
    return nil
}

// ScheduleOrphanTracksCleanup kicks off a background goroutine that runs
// CleanupOrphanTracks once immediately and then every hour thereafter.
// Mirrors the pattern used by ScheduleSessionCleanupJob.
func ScheduleOrphanTracksCleanup(db *database.Database, logger *zap.Logger) {
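    // The ticker is never stopped; the goroutine below is expected to live
    // for the whole process, like the other scheduled jobs.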
    ticker := time.NewTicker(time.Hour)
    go func() {
        ctx := context.Background()
        if err := CleanupOrphanTracks(ctx, db, logger); err != nil {
            logger.Error("Initial orphan tracks cleanup failed", zap.Error(err))
        }
        for range ticker.C {
            if err := CleanupOrphanTracks(ctx, db, logger); err != nil {
                logger.Error("Scheduled orphan tracks cleanup failed", zap.Error(err))
            }
        }
    }()

    logger.Info("Orphan tracks cleanup scheduled to run hourly",
        zap.Duration("age_threshold", OrphanTrackAgeThreshold),
    )
}

internal/jobs/cleanup_orphan_tracks_test.go

@@ -0,0 +1,122 @@
package jobs

import (
    "context"
    "os"
    "path/filepath"
    "testing"
    "time"

    "veza-backend-api/internal/database"
    "veza-backend-api/internal/models"

    "github.com/google/uuid"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
    "go.uber.org/zap/zaptest"
    "gorm.io/driver/sqlite"
    "gorm.io/gorm"
)
func setupOrphanTestDB(t *testing.T) (*database.Database, *gorm.DB) {
    t.Helper()
    gormDB, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
    require.NoError(t, err)
    require.NoError(t, gormDB.AutoMigrate(&models.Track{}, &models.User{}))
    db := &database.Database{GormDB: gormDB}
    return db, gormDB
}

func insertTestTrack(t *testing.T, gormDB *gorm.DB, filePath string, status models.TrackStatus, createdAt time.Time) uuid.UUID {
    t.Helper()
    ownerID := uuid.New()
    require.NoError(t, gormDB.Create(&models.User{
        ID:       ownerID,
        Username: "owner-" + ownerID.String()[:8],
        Email:    ownerID.String() + "@test.local",
    }).Error)

    trackID := uuid.New()
    track := &models.Track{
        ID:        trackID,
        UserID:    ownerID,
        Title:     "Test",
        Artist:    "Test",
        Duration:  1,
        FilePath:  filePath,
        FileSize:  1,
        Status:    status,
        CreatedAt: createdAt,
    }
    require.NoError(t, gormDB.Create(track).Error)

    // created_at is managed by autoCreateTime — force our sentinel time.
    require.NoError(t, gormDB.Model(&models.Track{}).Where("id = ?", trackID).Update("created_at", createdAt).Error)
    return trackID
}

func TestCleanupOrphanTracks_FlipsStuckMissingFile(t *testing.T) {
    db, gormDB := setupOrphanTestDB(t)
    logger := zaptest.NewLogger(t)

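    // t.TempDir() is created fresh for this test, so the joined path is
    // guaranteed not to exist: the "file vanished" case.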
    missingPath := filepath.Join(t.TempDir(), "vanished.mp3")
    id := insertTestTrack(t, gormDB, missingPath, models.TrackStatusProcessing, time.Now().Add(-2*time.Hour))

    require.NoError(t, CleanupOrphanTracks(context.Background(), db, logger))

    var after models.Track
    require.NoError(t, gormDB.First(&after, "id = ?", id).Error)
    assert.Equal(t, models.TrackStatusFailed, after.Status)
    assert.Contains(t, after.StatusMessage, "orphan cleanup")
}

func TestCleanupOrphanTracks_LeavesFilePresent(t *testing.T) {
    db, gormDB := setupOrphanTestDB(t)
    logger := zaptest.NewLogger(t)

    goodPath := filepath.Join(t.TempDir(), "still-here.mp3")
    require.NoError(t, os.WriteFile(goodPath, []byte("audio"), 0o600))
    id := insertTestTrack(t, gormDB, goodPath, models.TrackStatusProcessing, time.Now().Add(-2*time.Hour))

    require.NoError(t, CleanupOrphanTracks(context.Background(), db, logger))

    var after models.Track
    require.NoError(t, gormDB.First(&after, "id = ?", id).Error)
    assert.Equal(t, models.TrackStatusProcessing, after.Status, "slow uploads should not be marked failed")
}

func TestCleanupOrphanTracks_LeavesRecent(t *testing.T) {
    db, gormDB := setupOrphanTestDB(t)
    logger := zaptest.NewLogger(t)

    missingPath := filepath.Join(t.TempDir(), "recent.mp3")
    id := insertTestTrack(t, gormDB, missingPath, models.TrackStatusProcessing, time.Now().Add(-10*time.Minute))

    require.NoError(t, CleanupOrphanTracks(context.Background(), db, logger))

    var after models.Track
    require.NoError(t, gormDB.First(&after, "id = ?", id).Error)
    assert.Equal(t, models.TrackStatusProcessing, after.Status, "tracks younger than threshold must be spared")
}

func TestCleanupOrphanTracks_IgnoresAlreadyFailed(t *testing.T) {
    db, gormDB := setupOrphanTestDB(t)
    logger := zaptest.NewLogger(t)

    missingPath := filepath.Join(t.TempDir(), "old.mp3")
    id := insertTestTrack(t, gormDB, missingPath, models.TrackStatusFailed, time.Now().Add(-3*time.Hour))
    // Seed a message we'd notice if the job overwrote it.
    require.NoError(t, gormDB.Model(&models.Track{}).Where("id = ?", id).Update("status_message", "previous failure").Error)

    require.NoError(t, CleanupOrphanTracks(context.Background(), db, logger))

    var after models.Track
    require.NoError(t, gormDB.First(&after, "id = ?", id).Error)
    assert.Equal(t, models.TrackStatusFailed, after.Status)
    assert.Equal(t, "previous failure", after.StatusMessage, "job must not rewrite unrelated failed rows")
}

func TestCleanupOrphanTracks_NilDatabaseIsNoop(t *testing.T) {
    logger := zaptest.NewLogger(t)
    assert.NoError(t, CleanupOrphanTracks(context.Background(), nil, logger))
    assert.NoError(t, CleanupOrphanTracks(context.Background(), &database.Database{}, logger))
}