feat(workers): hourly cleanup of orphan tracks stuck in processing
Upload flow: POST creates a track row with `status=processing` and
writes the file at `file_path`. If the uploader process dies (OOM,
SIGKILL during deploy, disk wipe) between row-create and status-update,
the row stays in `processing` forever with a `file_path` that doesn't
exist. The library UI shows a ghost track that the user can never play
or open, and can only partially delete.
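The window looks roughly like this. A minimal sketch under assumptions,
not the real handler: `uploadTrack`, `saveUpload`, and
`models.TrackStatusReady` are hypothetical names; only `models.Track`,
its `processing` status, and the GORM wiring come from this codebase.

// Illustrative sketch of the vulnerable upload window.
package upload

import (
    "context"
    "io"
    "os"

    "github.com/google/uuid"

    "veza-backend-api/internal/database"
    "veza-backend-api/internal/models"
)

func uploadTrack(ctx context.Context, db *database.Database, dst string, body io.Reader) error {
    track := models.Track{ID: uuid.New(), FilePath: dst, Status: models.TrackStatusProcessing}
    if err := db.GormDB.WithContext(ctx).Create(&track).Error; err != nil {
        return err // step 1: row now exists with status=processing
    }
    if err := saveUpload(dst, body); err != nil { // step 2: write the file
        return err
    }
    // A crash (OOM, SIGKILL) anywhere between Create above and the Update
    // below leaves the row in `processing` with no playable file at dst.
    return db.GormDB.WithContext(ctx).Model(&track).
        Update("status", models.TrackStatusReady).Error // step 3
}

// saveUpload streams the request body to disk.
func saveUpload(dst string, body io.Reader) error {
    f, err := os.Create(dst)
    if err != nil {
        return err
    }
    defer f.Close()
    _, err = io.Copy(f, body)
    return err
}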
New worker:
* `jobs/cleanup_orphan_tracks.go` — `CleanupOrphanTracks` queries
tracks with `status=processing AND created_at < NOW()-1h`, stats
the `file_path`, and flips the row to `status=failed` with
`status_message = "orphan cleanup: file missing on disk after >1h
in processing"`. Never deletes; never touches present files or
rows already in another state. Safe to run repeatedly.
* `ScheduleOrphanTracksCleanup(db, logger)` runs once at boot and
then every hour thereafter. Wired in `cmd/api/main.go` right after
route setup so restarts trigger an immediate scan.
* Threshold exported as the `OrphanTrackAgeThreshold` constant so tests
and future tuning don't need to edit the worker; see the sketch below.
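For instance, a test can pin its fixture just past the exported
threshold rather than hard-coding an age (a sketch only, reusing the
`insertTestTrack` helper from the new test file):

// Sketch: one minute past the threshold, so retuning the constant
// automatically retunes the fixture's age.
createdAt := time.Now().Add(-(OrphanTrackAgeThreshold + time.Minute))
insertTestTrack(t, gormDB, missingPath, models.TrackStatusProcessing, createdAt)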
Tests: 5 cases in `cleanup_orphan_tracks_test.go`:
- `_FlipsStuckMissingFile` happy path
- `_LeavesFilePresent` (slow uploads must not be failed)
- `_LeavesRecent` (below threshold)
- `_IgnoresAlreadyFailed` (idempotent)
- `_NilDatabaseIsNoop` (safety)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
parent 1cab2a1d56, commit 712a0568e3
3 changed files with 238 additions and 0 deletions
veza-backend-api/cmd/api/main.go
@@ -22,6 +22,7 @@ import (
 	"veza-backend-api/internal/config"
 	"veza-backend-api/internal/core/marketplace"
 	vezaes "veza-backend-api/internal/elasticsearch"
+	"veza-backend-api/internal/jobs"
 	"veza-backend-api/internal/metrics"
 	"veza-backend-api/internal/services"
 	"veza-backend-api/internal/shutdown"
@@ -276,6 +277,10 @@ func main() {
 		os.Exit(1)
 	}
 
+	// v1.0.4: Hourly cleanup of tracks stuck in `processing` whose upload file
+	// vanished (crash, SIGKILL, disk wipe). Keeps the tracks table honest.
+	jobs.ScheduleOrphanTracksCleanup(db, logger)
+
 	// Configuration du serveur HTTP
 	port := fmt.Sprintf("%d", cfg.AppPort)
 	if cfg.AppPort == 0 {
veza-backend-api/internal/jobs/cleanup_orphan_tracks.go (new file, 111 lines)
@@ -0,0 +1,111 @@
package jobs

import (
	"context"
	"os"
	"time"

	"veza-backend-api/internal/database"
	"veza-backend-api/internal/models"

	"go.uber.org/zap"
)

// OrphanTrackAgeThreshold is how long a track can stay in the `processing`
// state before the cleanup job considers it abandoned.
const OrphanTrackAgeThreshold = time.Hour

// CleanupOrphanTracks finds tracks stuck in `processing` whose source file
// has disappeared from disk (upload service crashed mid-write, disk cleanup,
// container restart during a long upload, etc.) and flips them to `failed`
// with an explanatory status message. It never deletes the row.
//
// Safe to run repeatedly: already-failed rows are ignored, and tracks still
// present on disk are left alone.
func CleanupOrphanTracks(ctx context.Context, db *database.Database, logger *zap.Logger) error {
	if db == nil || db.GormDB == nil {
		return nil
	}
	cutoff := time.Now().Add(-OrphanTrackAgeThreshold)

	var stuck []models.Track
	if err := db.GormDB.WithContext(ctx).
		Where("status = ? AND created_at < ?", models.TrackStatusProcessing, cutoff).
		Find(&stuck).Error; err != nil {
		logger.Error("Failed to query stuck processing tracks", zap.Error(err))
		return err
	}

	if len(stuck) == 0 {
		logger.Debug("Orphan tracks cleanup: no tracks older than threshold in processing state",
			zap.Duration("age_threshold", OrphanTrackAgeThreshold),
		)
		return nil
	}

	failed := 0
	for i := range stuck {
		track := &stuck[i]
		if _, err := os.Stat(track.FilePath); err == nil {
			// File still there — uploader is slow, not dead. Leave the row.
			continue
		} else if !os.IsNotExist(err) {
			logger.Warn("Could not stat track file while checking orphan",
				zap.String("track_id", track.ID.String()),
				zap.String("file_path", track.FilePath),
				zap.Error(err),
			)
			continue
		}

		updates := map[string]interface{}{
			"status":         models.TrackStatusFailed,
			"status_message": "orphan cleanup: file missing on disk after >1h in processing",
		}
		if err := db.GormDB.WithContext(ctx).
			Model(&models.Track{}).
			Where("id = ? AND status = ?", track.ID, models.TrackStatusProcessing).
			Updates(updates).Error; err != nil {
			logger.Error("Failed to mark orphan track as failed",
				zap.String("track_id", track.ID.String()),
				zap.Error(err),
			)
			continue
		}
		failed++
		logger.Warn("Orphan track flipped to failed",
			zap.String("track_id", track.ID.String()),
			zap.String("file_path", track.FilePath),
			zap.Duration("age", time.Since(track.CreatedAt)),
		)
	}

	logger.Info("Orphan tracks cleanup complete",
		zap.Int("candidates", len(stuck)),
		zap.Int("marked_failed", failed),
	)
	return nil
}

// ScheduleOrphanTracksCleanup kicks off a background goroutine that runs
// CleanupOrphanTracks once immediately and then every hour thereafter.
// Mirrors the pattern used by ScheduleSessionCleanupJob.
func ScheduleOrphanTracksCleanup(db *database.Database, logger *zap.Logger) {
	ticker := time.NewTicker(time.Hour)
	go func() {
		ctx := context.Background()

		if err := CleanupOrphanTracks(ctx, db, logger); err != nil {
			logger.Error("Initial orphan tracks cleanup failed", zap.Error(err))
		}

		for range ticker.C {
			if err := CleanupOrphanTracks(ctx, db, logger); err != nil {
				logger.Error("Scheduled orphan tracks cleanup failed", zap.Error(err))
			}
		}
	}()
	logger.Info("Orphan tracks cleanup scheduled to run hourly",
		zap.Duration("age_threshold", OrphanTrackAgeThreshold),
	)
}
veza-backend-api/internal/jobs/cleanup_orphan_tracks_test.go (new file, 122 lines)
@@ -0,0 +1,122 @@
package jobs

import (
	"context"
	"os"
	"path/filepath"
	"testing"
	"time"

	"veza-backend-api/internal/database"
	"veza-backend-api/internal/models"

	"github.com/google/uuid"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"go.uber.org/zap/zaptest"
	"gorm.io/driver/sqlite"
	"gorm.io/gorm"
)

func setupOrphanTestDB(t *testing.T) (*database.Database, *gorm.DB) {
	gormDB, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
	require.NoError(t, err)

	require.NoError(t, gormDB.AutoMigrate(&models.Track{}, &models.User{}))

	db := &database.Database{GormDB: gormDB}
	return db, gormDB
}

func insertTestTrack(t *testing.T, gormDB *gorm.DB, filePath string, status models.TrackStatus, createdAt time.Time) uuid.UUID {
	ownerID := uuid.New()
	require.NoError(t, gormDB.Create(&models.User{
		ID:       ownerID,
		Username: "owner-" + ownerID.String()[:8],
		Email:    ownerID.String() + "@test.local",
	}).Error)

	trackID := uuid.New()
	track := &models.Track{
		ID:        trackID,
		UserID:    ownerID,
		Title:     "Test",
		Artist:    "Test",
		Duration:  1,
		FilePath:  filePath,
		FileSize:  1,
		Status:    status,
		CreatedAt: createdAt,
	}
	require.NoError(t, gormDB.Create(track).Error)
	// created_at is managed by autoCreateTime — force our sentinel time.
	require.NoError(t, gormDB.Model(&models.Track{}).Where("id = ?", trackID).Update("created_at", createdAt).Error)
	return trackID
}

func TestCleanupOrphanTracks_FlipsStuckMissingFile(t *testing.T) {
	db, gormDB := setupOrphanTestDB(t)
	logger := zaptest.NewLogger(t)

	missingPath := filepath.Join(t.TempDir(), "vanished.mp3")
	id := insertTestTrack(t, gormDB, missingPath, models.TrackStatusProcessing, time.Now().Add(-2*time.Hour))

	require.NoError(t, CleanupOrphanTracks(context.Background(), db, logger))

	var after models.Track
	require.NoError(t, gormDB.First(&after, "id = ?", id).Error)
	assert.Equal(t, models.TrackStatusFailed, after.Status)
	assert.Contains(t, after.StatusMessage, "orphan cleanup")
}

func TestCleanupOrphanTracks_LeavesFilePresent(t *testing.T) {
	db, gormDB := setupOrphanTestDB(t)
	logger := zaptest.NewLogger(t)

	goodPath := filepath.Join(t.TempDir(), "still-here.mp3")
	require.NoError(t, os.WriteFile(goodPath, []byte("audio"), 0o600))
	id := insertTestTrack(t, gormDB, goodPath, models.TrackStatusProcessing, time.Now().Add(-2*time.Hour))

	require.NoError(t, CleanupOrphanTracks(context.Background(), db, logger))

	var after models.Track
	require.NoError(t, gormDB.First(&after, "id = ?", id).Error)
	assert.Equal(t, models.TrackStatusProcessing, after.Status, "slow uploads should not be marked failed")
}

func TestCleanupOrphanTracks_LeavesRecent(t *testing.T) {
	db, gormDB := setupOrphanTestDB(t)
	logger := zaptest.NewLogger(t)

	missingPath := filepath.Join(t.TempDir(), "recent.mp3")
	id := insertTestTrack(t, gormDB, missingPath, models.TrackStatusProcessing, time.Now().Add(-10*time.Minute))

	require.NoError(t, CleanupOrphanTracks(context.Background(), db, logger))

	var after models.Track
	require.NoError(t, gormDB.First(&after, "id = ?", id).Error)
	assert.Equal(t, models.TrackStatusProcessing, after.Status, "tracks younger than threshold must be spared")
}

func TestCleanupOrphanTracks_IgnoresAlreadyFailed(t *testing.T) {
	db, gormDB := setupOrphanTestDB(t)
	logger := zaptest.NewLogger(t)

	missingPath := filepath.Join(t.TempDir(), "old.mp3")
	id := insertTestTrack(t, gormDB, missingPath, models.TrackStatusFailed, time.Now().Add(-3*time.Hour))
	// Seed a message we'd notice if the job overwrote it.
	require.NoError(t, gormDB.Model(&models.Track{}).Where("id = ?", id).Update("status_message", "previous failure").Error)

	require.NoError(t, CleanupOrphanTracks(context.Background(), db, logger))

	var after models.Track
	require.NoError(t, gormDB.First(&after, "id = ?", id).Error)
	assert.Equal(t, models.TrackStatusFailed, after.Status)
	assert.Equal(t, "previous failure", after.StatusMessage, "job must not rewrite unrelated failed rows")
}

func TestCleanupOrphanTracks_NilDatabaseIsNoop(t *testing.T) {
	logger := zaptest.NewLogger(t)
	assert.NoError(t, CleanupOrphanTracks(context.Background(), nil, logger))
	assert.NoError(t, CleanupOrphanTracks(context.Background(), &database.Database{}, logger))
}