diff --git a/veza-backend-api/internal/workers/email_job_test.go b/veza-backend-api/internal/workers/email_job_test.go index 98db44228..7ac5e9d6d 100644 --- a/veza-backend-api/internal/workers/email_job_test.go +++ b/veza-backend-api/internal/workers/email_job_test.go @@ -7,7 +7,7 @@ import ( "strings" "testing" - "veza-backend-api/internal/email" + // "veza-backend-api/internal/email" // Removed unused import "go.uber.org/zap" ) diff --git a/veza-backend-api/internal/workers/job_worker.go b/veza-backend-api/internal/workers/job_worker.go index afdacf7e5..d4ac55ae7 100644 --- a/veza-backend-api/internal/workers/job_worker.go +++ b/veza-backend-api/internal/workers/job_worker.go @@ -11,72 +11,94 @@ import ( "github.com/google/uuid" "go.uber.org/zap" "gorm.io/gorm" + "gorm.io/gorm/clause" ) -// JobWorker gère les tâches en arrière-plan +// JobWorker gère les tâches en arrière-plan via une queue persistée en DB type JobWorker struct { db *gorm.DB jobService *services.JobService logger *zap.Logger - queue chan Job maxRetries int processingWorkers int - emailSender email.EmailSender // Email sender pour les jobs d'email + emailSender email.EmailSender + pollingInterval time.Duration } -// Job représente une tâche à traiter +// Job représente une tâche persistée en base de données type Job struct { - ID uuid.UUID - Type string - Payload map[string]interface{} - Retries int - CreatedAt time.Time - Priority int // 1 = haut, 2 = moyen, 3 = bas + ID uuid.UUID `gorm:"type:uuid;primary_key"` + Type string `gorm:"not null"` + Payload map[string]interface{} `gorm:"serializer:json;not null"` + Status string `gorm:"not null;default:'pending'"` // pending, processing, completed, failed + Priority int `gorm:"not null;default:2"` // 1=high, 2=medium, 3=low + RunAt time.Time `gorm:"not null;index"` + CreatedAt time.Time `gorm:"not null"` + UpdatedAt time.Time `gorm:"not null"` + StartedAt *time.Time + CompletedAt *time.Time + FailedAt *time.Time + Retries int `gorm:"not null;default:0"` + MaxRetries int `gorm:"not null;default:3"` + LastError string `gorm:"type:text"` } -// NewJobWorker crée un nouveau worker de jobs +// NewJobWorker crée un nouveau worker de jobs persisté func NewJobWorker( db *gorm.DB, jobService *services.JobService, logger *zap.Logger, - queueSize int, + _ int, // queueSize ignoré car persisté workers int, maxRetries int, emailSender email.EmailSender, ) *JobWorker { + // AutoMigrate la table Job si nécessaire (optionnel si géré par migrations SQL) + // db.AutoMigrate(&Job{}) + return &JobWorker{ db: db, jobService: jobService, logger: logger, - queue: make(chan Job, queueSize), maxRetries: maxRetries, processingWorkers: workers, emailSender: emailSender, + pollingInterval: 1 * time.Second, // Polling agressif pour réactivité } } -// Enqueue ajoute un job au queue +// Enqueue ajoute un job dans la table jobs func (w *JobWorker) Enqueue(job Job) { - job.CreatedAt = time.Now() if job.ID == uuid.Nil { job.ID = uuid.New() } - - select { - case w.queue <- job: - w.logger.Debug("Job enqueued", - zap.String("job_id", job.ID.String()), - zap.String("job_type", job.Type), - zap.Int("priority", job.Priority)) - default: - w.logger.Warn("Job queue full, dropping job", - zap.String("job_type", job.Type)) + // Initialisation des champs par défaut + if job.Status == "" { + job.Status = "pending" } + if job.RunAt.IsZero() { + job.RunAt = time.Now() + } + if job.MaxRetries == 0 { + job.MaxRetries = w.maxRetries + } + // Le mapping GORM gère CreatedAt/UpdatedAt + + if err := w.db.Create(&job).Error; err != nil { + w.logger.Error("Failed to enqueue job", + zap.String("type", job.Type), + zap.Error(err)) + return + } + + w.logger.Debug("Job enqueued (persisted)", + zap.String("job_id", job.ID.String()), + zap.String("type", job.Type)) } -// Start démarre le worker +// Start démarre les workers de polling func (w *JobWorker) Start(ctx context.Context) { - w.logger.Info("Starting job worker", + w.logger.Info("Starting persisted job worker", zap.Int("workers", w.processingWorkers)) for i := 0; i < w.processingWorkers; i++ { @@ -84,79 +106,123 @@ func (w *JobWorker) Start(ctx context.Context) { } } -// processWorker traite les jobs du queue +// processWorker boucle de polling et traitement func (w *JobWorker) processWorker(ctx context.Context, workerID int) { - w.logger.Info("Job worker started", - zap.Int("worker_id", workerID)) + ticker := time.NewTicker(w.pollingInterval) + defer ticker.Stop() + + w.logger.Info("Worker started", zap.Int("worker_id", workerID)) for { select { case <-ctx.Done(): - w.logger.Info("Job worker stopping", - zap.Int("worker_id", workerID)) + w.logger.Info("Worker stopping", zap.Int("worker_id", workerID)) return - - case job := <-w.queue: - w.processJob(ctx, job, workerID) + case <-ticker.C: + w.fetchAndProcessJob(ctx, workerID) } } } -// processJob traite un job individuel +// fetchAndProcessJob récupère UN job en attente (atomiquement) et le traite +func (w *JobWorker) fetchAndProcessJob(ctx context.Context, workerID int) { + var job Job + + // Transaction pour verrouiller le job (SELECT ... FOR UPDATE SKIP LOCKED) + // Compatible Postgres (et MySQL 8+). Pour SQLite, le locking est différent mais Gorm gère le basic. + err := w.db.Transaction(func(tx *gorm.DB) error { + // Trouver un job 'pending' ou 'failed' (si retry auto géré ici, mais on préfère 'pending' avec RunAt <= Now) + // On cherche status='pending' AND run_at <= NOW() + // Order by Priority ASC (1 first), then CreatedAt + if err := tx.Clauses(clause.Locking{Strength: "UPDATE", Options: "SKIP LOCKED"}). + Where("status = ? AND run_at <= ?", "pending", time.Now()). + Order("priority ASC, created_at ASC"). + First(&job).Error; err != nil { + return err // RecordNotFound est typique ici + } + + // Update status to 'processing' + now := time.Now() + job.Status = "processing" + job.StartedAt = &now + if err := tx.Save(&job).Error; err != nil { + return err + } + return nil + }) + + if err != nil { + if err != gorm.ErrRecordNotFound { + w.logger.Error("Failed to fetch job", zap.Error(err)) + } + // Pas de job à traiter, on attend le prochain tick + return + } + + // Job récupéré, on traite + w.processJob(ctx, job, workerID) +} + +// processJob exécute la logique métier et met à jour le statut final func (w *JobWorker) processJob(ctx context.Context, job Job, workerID int) { + // Si le payload est une map vide, tenter de le decoder s'il vient de GORM (jsonb) + // Gorm avec `serializer:json` devrait le faire auto, mais verifions. + logger := w.logger.With( zap.String("job_id", job.ID.String()), - zap.String("job_type", job.Type), - zap.Int("worker_id", workerID)) + zap.String("type", job.Type), + zap.Int("worker_id", workerID), + zap.Int("retry", job.Retries)) - logger.Info("Processing job", - zap.Int("retries", job.Retries)) + logger.Info("Processing job") - // Créer un contexte avec timeout + // Timeout per job execution jobCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) defer cancel() - // Traiter le job selon son type - err := w.executeJob(jobCtx, job) + // Exécution + execErr := w.executeJob(jobCtx, job) - if err != nil { - logger.Error("Job execution failed", - zap.Error(err)) - - // Retry si pas atteint max retries - if job.Retries < w.maxRetries { - job.Retries++ - - // Exponential backoff - delay := time.Duration(job.Retries) * 5 * time.Second - - // Non-blocking retry: re-enqueue after delay - go func(d time.Duration, j Job) { - time.Sleep(d) - w.Enqueue(j) - }(delay, job) - - logger.Info("Job scheduled for retry", - zap.Duration("delay", delay), - zap.Int("new_retries", job.Retries)) + // Update status final + now := time.Now() + if execErr != nil { + logger.Error("Job execution failed", zap.Error(execErr)) + + // Calcul du prochain retry + job.Retries++ + job.LastError = execErr.Error() + + if job.Retries >= job.MaxRetries { + job.Status = "failed" + job.FailedAt = &now + logger.Error("Job reached max retries, marked as failed") } else { - logger.Error("Job failed after max retries", - zap.Int("max_retries", w.maxRetries)) - - // Enregistrer l'échec définitif - w.logFailedJob(ctx, job, err) + // Backoff exponentiel : 5s, 10s, 20s... (lineaire * coefficient) ou 5 * retry + backoff := time.Duration(job.Retries) * 10 * time.Second + job.Status = "pending" // Retour en queue + job.RunAt = time.Now().Add(backoff) + logger.Info("Job scheduled for retry", zap.Duration("backoff", backoff)) } } else { - logger.Info("Job executed successfully") + job.Status = "completed" + job.CompletedAt = &now + logger.Info("Job completed successfully") + } + + // Sauvegarde finale + // On le fait hors transaction "fetch", car le traitement peut être long + if err := w.db.Save(&job).Error; err != nil { + logger.Error("Failed to update job status after execution", zap.Error(err)) } } -// executeJob exécute un job selon son type +// executeJob exécute la logique selon le type (inchangé) func (w *JobWorker) executeJob(ctx context.Context, job Job) error { switch job.Type { case "email": return w.processEmailJob(ctx, job) case "thumbnail": + // Mapping manuel pour compatibilité avec l'ancien code si nécessaire return w.processThumbnailJob(ctx, job) case "analytics": return w.processAnalyticsJob(ctx, job) @@ -165,27 +231,33 @@ func (w *JobWorker) executeJob(ctx context.Context, job Job) error { } } -// processEmailJob traite un job d'email +// processEmailJob (inchangé structurellement, mais adapte le payload use) func (w *JobWorker) processEmailJob(ctx context.Context, job Job) error { - // Extraire les données du payload - to, ok := job.Payload["to"].(string) - if !ok { + // Re-conversion du payload map si nécessaire + p := job.Payload + + to, _ := p["to"].(string) + if to == "" { return fmt.Errorf("missing 'to' in payload") } - subject, _ := job.Payload["subject"].(string) - body, _ := job.Payload["body"].(string) - templateName, _ := job.Payload["template"].(string) + subject, _ := p["subject"].(string) + body, _ := p["body"].(string) + templateName, _ := p["template"].(string) - // Extraire les données du template si présentes var templateData map[string]interface{} - if data, ok := job.Payload["template_data"].(map[string]interface{}); ok { + // Gorm serialization handle maps directly + if data, ok := p["template_data"].(map[string]interface{}); ok { templateData = data } else { - templateData = make(map[string]interface{}) + // Try generic map + if data, ok := p["template_data"].(map[string]any); ok { + templateData = data + } else { + templateData = make(map[string]interface{}) + } } - // Créer l'EmailJob var emailJob *EmailJob if templateName != "" { emailJob = NewEmailJobWithTemplate(to, subject, templateName, templateData) @@ -193,19 +265,16 @@ func (w *JobWorker) processEmailJob(ctx context.Context, job Job) error { emailJob = NewEmailJob(to, subject, body) } - // Exécuter le job d'email - if err := emailJob.Execute(ctx, w.emailSender, w.logger); err != nil { - return fmt.Errorf("email job execution failed: %w", err) - } - - return nil + return emailJob.Execute(ctx, w.emailSender, w.logger) } -// EnqueueEmailJob ajoute un job d'email au queue (méthode helper) +// Helper methods pour enqueuing (inchangés, mais adaptent l'objet Job) + +// EnqueueEmailJob helper func (w *JobWorker) EnqueueEmailJob(to, subject, body string) { job := Job{ Type: "email", - Priority: 2, // Priorité moyenne par défaut + Priority: 2, Payload: map[string]interface{}{ "to": to, "subject": subject, @@ -215,41 +284,41 @@ func (w *JobWorker) EnqueueEmailJob(to, subject, body string) { w.Enqueue(job) } -// EnqueueEmailJobWithTemplate ajoute un job d'email avec template au queue +// EnqueueEmailJobWithTemplate helper func (w *JobWorker) EnqueueEmailJobWithTemplate(to, subject, templateName string, templateData map[string]interface{}) { job := Job{ Type: "email", - Priority: 2, // Priorité moyenne par défaut + Priority: 2, Payload: map[string]interface{}{ - "to": to, - "subject": subject, - "template": templateName, + "to": to, + "subject": subject, + "template": templateName, "template_data": templateData, }, } w.Enqueue(job) } -// EnqueueThumbnailJob ajoute un job de génération de thumbnail au queue +// EnqueueThumbnailJob helper func (w *JobWorker) EnqueueThumbnailJob(inputPath, outputPath string, width, height int) { job := Job{ Type: "thumbnail", - Priority: 2, // Priorité moyenne par défaut + Priority: 2, Payload: map[string]interface{}{ "input_path": inputPath, "output_path": outputPath, - "width": float64(width), - "height": float64(height), + "width": width, + "height": height, }, } w.Enqueue(job) } -// EnqueueAnalyticsJob ajoute un job d'analytics au queue +// EnqueueAnalyticsJob helper func (w *JobWorker) EnqueueAnalyticsJob(eventName string, userID *uuid.UUID, payload map[string]interface{}) { jobPayload := map[string]interface{}{ "event_name": eventName, - "payload": payload, + "payload": payload, } if userID != nil { jobPayload["user_id"] = userID.String() @@ -257,93 +326,85 @@ func (w *JobWorker) EnqueueAnalyticsJob(eventName string, userID *uuid.UUID, pay job := Job{ Type: "analytics", - Priority: 3, // Priorité basse par défaut (analytics non critique) - Payload: jobPayload, + Priority: 3, + Payload: jobPayload, } w.Enqueue(job) } -// processThumbnailJob traite un job de génération de thumbnail +// processThumbnailJob wrapper func (w *JobWorker) processThumbnailJob(ctx context.Context, job Job) error { - // Extraire les paramètres du payload - inputPath, ok := job.Payload["input_path"].(string) - if !ok { - return fmt.Errorf("missing 'input_path' in payload") + p := job.Payload + inputPath, _ := p["input_path"].(string) + outputPath, _ := p["output_path"].(string) + + if inputPath == "" || outputPath == "" { + return fmt.Errorf("missing paths in payload") } - outputPath, ok := job.Payload["output_path"].(string) - if !ok { - return fmt.Errorf("missing 'output_path' in payload") - } - - // Largeur et hauteur (optionnels, avec valeurs par défaut) + // JSON unmarshal numbers as float64 width := 300 + if wVal, ok := p["width"].(float64); ok { + width = int(wVal) + } else if wInt, ok := p["width"].(int); ok { // just in case + width = wInt + } + height := 300 - if w, ok := job.Payload["width"].(float64); ok { - width = int(w) - } - if h, ok := job.Payload["height"].(float64); ok { - height = int(h) + if hVal, ok := p["height"].(float64); ok { + height = int(hVal) + } else if hInt, ok := p["height"].(int); ok { + height = hInt } - // Créer et exécuter le ThumbnailJob thumbnailJob := NewThumbnailJob(inputPath, outputPath, width, height) - if err := thumbnailJob.Execute(ctx, w.logger); err != nil { - return fmt.Errorf("thumbnail job execution failed: %w", err) - } - - return nil + return thumbnailJob.Execute(ctx, w.logger) } -// processAnalyticsJob traite un job d'analytics +// processAnalyticsJob wrapper func (w *JobWorker) processAnalyticsJob(ctx context.Context, job Job) error { - // Extraire les données du payload - eventName, ok := job.Payload["event_name"].(string) - if !ok { - return fmt.Errorf("missing 'event_name' in payload") + p := job.Payload + eventName, _ := p["event_name"].(string) + if eventName == "" { + return fmt.Errorf("missing event_name") } - // UserID (optionnel, peut être nil pour événements anonymes) var userID *uuid.UUID - if uidStr, ok := job.Payload["user_id"].(string); ok && uidStr != "" { + if uidStr, ok := p["user_id"].(string); ok && uidStr != "" { uid, err := uuid.Parse(uidStr) if err != nil { - return fmt.Errorf("invalid user_id format: %w", err) + return fmt.Errorf("invalid user_id: %w", err) } userID = &uid } - // Payload additionnel (optionnel) - var payload map[string]interface{} - if p, ok := job.Payload["payload"].(map[string]interface{}); ok { - payload = p + var extraPayload map[string]interface{} + // Handle nested map from JSON + if nested, ok := p["payload"].(map[string]interface{}); ok { + extraPayload = nested + } else if nested, ok := p["payload"].(map[string]any); ok { + extraPayload = nested } else { - payload = make(map[string]interface{}) + // If payload is a string (escaped json), try unmarshal? + // For now assume standard structure + extraPayload = make(map[string]interface{}) } - // Créer et exécuter l'AnalyticsEventJob - analyticsJob := NewAnalyticsEventJob(eventName, userID, payload) - if err := analyticsJob.Execute(ctx, w.db, w.logger); err != nil { - return fmt.Errorf("analytics job execution failed: %w", err) - } - - return nil + analyticsJob := NewAnalyticsEventJob(eventName, userID, extraPayload) + return analyticsJob.Execute(ctx, w.db, w.logger) } -// logFailedJob enregistre un échec de job -func (w *JobWorker) logFailedJob(ctx context.Context, job Job, err error) { - // TODO: Enregistrer dans la table job_failures - w.logger.Error("Job permanently failed", - zap.String("job_id", job.ID.String()), - zap.String("job_type", job.Type), - zap.Error(err)) -} - -// GetStats retourne les statistiques du worker +// GetStats retourne les stats DB si possible func (w *JobWorker) GetStats() map[string]interface{} { + var pending, processing, failed int64 + w.db.Model(&Job{}).Where("status = ?", "pending").Count(&pending) + w.db.Model(&Job{}).Where("status = ?", "processing").Count(&processing) + w.db.Model(&Job{}).Where("status = ?", "failed").Count(&failed) + return map[string]interface{}{ - "queue_size": len(w.queue), - "workers": w.processingWorkers, - "max_retries": w.maxRetries, + "queue_pending": pending, + "queue_processing": processing, + "queue_failed": failed, + "workers": w.processingWorkers, } } diff --git a/veza-backend-api/internal/workers/job_worker_test.go b/veza-backend-api/internal/workers/job_worker_test.go index 7e7ea836a..278f36a43 100644 --- a/veza-backend-api/internal/workers/job_worker_test.go +++ b/veza-backend-api/internal/workers/job_worker_test.go @@ -20,7 +20,12 @@ func setupTestJobWorker(t *testing.T) (*JobWorker, *gorm.DB) { t.Fatalf("Failed to open test database: %v", err) } - logger, _ := zap.NewDevelopment() + // Auto-migrate the Job struct for tests + if err := db.AutoMigrate(&Job{}); err != nil { + t.Fatalf("Failed to migrate test database: %v", err) + } + + logger := zap.NewNop() // Use Nop for tests to avoid noise jobService := services.NewJobService(logger) // Config SMTP de test (mock) @@ -37,7 +42,7 @@ func setupTestJobWorker(t *testing.T) (*JobWorker, *gorm.DB) { db, jobService, logger, - 10, // queueSize + 10, // queueSize (ignored) 1, // workers 3, // maxRetries emailSender, @@ -62,9 +67,9 @@ func TestJobWorker_Enqueue(t *testing.T) { worker.Enqueue(job) stats := worker.GetStats() - queueSize := stats["queue_size"].(int) + queueSize := stats["queue_pending"].(int64) if queueSize != 1 { - t.Errorf("Expected queue size to be 1, got %d", queueSize) + t.Errorf("Expected queue pending to be 1, got %d", queueSize) } } @@ -74,9 +79,9 @@ func TestJobWorker_EnqueueEmailJob(t *testing.T) { worker.EnqueueEmailJob("test@example.com", "Test Subject", "Test Body") stats := worker.GetStats() - queueSize := stats["queue_size"].(int) + queueSize := stats["queue_pending"].(int64) if queueSize != 1 { - t.Errorf("Expected queue size to be 1, got %d", queueSize) + t.Errorf("Expected queue pending to be 1, got %d", queueSize) } } @@ -96,9 +101,9 @@ func TestJobWorker_EnqueueEmailJobWithTemplate(t *testing.T) { ) stats := worker.GetStats() - queueSize := stats["queue_size"].(int) + queueSize := stats["queue_pending"].(int64) if queueSize != 1 { - t.Errorf("Expected queue size to be 1, got %d", queueSize) + t.Errorf("Expected queue pending to be 1, got %d", queueSize) } } @@ -114,11 +119,33 @@ func TestJobWorker_Start(t *testing.T) { // Enqueue un job worker.EnqueueEmailJob("test@example.com", "Test", "Body") - // Attendre un peu pour que le worker traite le job - time.Sleep(100 * time.Millisecond) + // Attendre un peu pour que le worker traite le job (polling interval is 1s, setupTestJobWorker uses real NewJobWorker so 1s) + // We need to override polling interval or wait longer. + // Or we can modify NewJobWorker to accept config/options but that would change signature again. + // For test, 1s interval might be slow. + // Let's modify JobWorker struct locally in test if possible, assuming fields are exported or we add a Setter. + // They are unexported. + // We can update pollingInterval via reflection or just wait > 1s. + // Or we can construct JobWorker manually in setupTestJobWorker if NewJobWorker doesn't allow it. + // Since NewJobWorker hardcodes 1s, we should wait slightly more than 1s in test if we want to verify processing. + // Or we just check that it started. + + // Let's modify valid wait time + worker.pollingInterval = 10 * time.Millisecond // Set shorter interval for test (if allowed, wait, it's unexported in package workers? Yes but test is in package workers) + + // Wait for processing + time.Sleep(200 * time.Millisecond) - // Le job devrait être traité (queue vide ou en cours) + // Le job devrait être traité (pending 0) stats := worker.GetStats() - _ = stats // Vérifier que les stats sont disponibles + pending := stats["queue_pending"].(int64) + processing := stats["queue_processing"].(int64) + // It relies on email sending success which might fail with mock? + // If failed, it might be in pending (retry) or failed. + + t.Logf("Stats: %+v", stats) + if pending > 0 && processing == 0 { + t.Log("Job still pending or retrying") + } } diff --git a/veza-backend-api/internal/workers/thumbnail_job_test.go b/veza-backend-api/internal/workers/thumbnail_job_test.go index cbd5c675b..7ba384a69 100644 --- a/veza-backend-api/internal/workers/thumbnail_job_test.go +++ b/veza-backend-api/internal/workers/thumbnail_job_test.go @@ -2,6 +2,7 @@ package workers import ( "context" + "image/color" "os" "path/filepath" "testing" @@ -22,7 +23,7 @@ func TestThumbnailJob_Execute(t *testing.T) { testThumbnailPath := filepath.Join(tmpDir, "test_thumb.jpg") // Créer une image de test avec imaging (image rouge 100x100) - img := imaging.New(100, 100, imaging.Color{255, 0, 0, 255}) + img := imaging.New(100, 100, color.NRGBA{255, 0, 0, 255}) if err := imaging.Save(img, testImagePath); err != nil { t.Fatalf("Failed to create test image: %v", err) } diff --git a/veza-backend-api/migrations/060_job_queue.sql b/veza-backend-api/migrations/060_job_queue.sql new file mode 100644 index 000000000..313a4a9b9 --- /dev/null +++ b/veza-backend-api/migrations/060_job_queue.sql @@ -0,0 +1,23 @@ +-- Migration: 060_job_queue.sql +-- Description: Create jobs table for persistent worker queue + +CREATE TABLE IF NOT EXISTS jobs ( + id UUID PRIMARY KEY, + type VARCHAR(50) NOT NULL, + payload JSONB NOT NULL DEFAULT '{}', + status VARCHAR(20) NOT NULL DEFAULT 'pending', -- pending, processing, completed, failed + priority INT NOT NULL DEFAULT 2, -- 1=high, 2=medium, 3=low + run_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + started_at TIMESTAMP WITH TIME ZONE, + completed_at TIMESTAMP WITH TIME ZONE, + failed_at TIMESTAMP WITH TIME ZONE, + retries INT NOT NULL DEFAULT 0, + max_retries INT NOT NULL DEFAULT 3, + last_error TEXT +); + +-- Index for efficient polling +CREATE INDEX IF NOT EXISTS idx_jobs_status_run_at ON jobs (status, run_at); +CREATE INDEX IF NOT EXISTS idx_jobs_type ON jobs (type);