package workers

import (
	"context"
	"errors"
	"fmt"
	"time"

	"veza-backend-api/internal/email"
	"veza-backend-api/internal/services"

	"github.com/google/uuid"
	"go.uber.org/zap"
	"gorm.io/gorm"
	"gorm.io/gorm/clause"
)

// JobWorker processes background tasks through a queue persisted in the database.
type JobWorker struct {
	db                *gorm.DB
	jobService        *services.JobService
	logger            *zap.Logger
	maxRetries        int
	processingWorkers int
	emailSender       email.EmailSender
	pollingInterval   time.Duration
}

// Job represents a task persisted in the database.
type Job struct {
	ID          uuid.UUID              `gorm:"type:uuid;primary_key"`
	Type        string                 `gorm:"not null"`
	Payload     map[string]interface{} `gorm:"serializer:json;not null"`
	Status      string                 `gorm:"not null;default:'pending'"` // pending, processing, completed, failed
	Priority    int                    `gorm:"not null;default:2"`         // 1=high, 2=medium, 3=low
	RunAt       time.Time              `gorm:"not null;index"`
	CreatedAt   time.Time              `gorm:"not null"`
	UpdatedAt   time.Time              `gorm:"not null"`
	StartedAt   *time.Time
	CompletedAt *time.Time
	FailedAt    *time.Time
	Retries     int    `gorm:"not null;default:0"`
	MaxRetries  int    `gorm:"not null;default:3"`
	LastError   string `gorm:"type:text"`
}

// NewJobWorker creates a new DB-persisted job worker.
//
// The queueSize parameter is ignored because the queue lives in the database.
func NewJobWorker(
	db *gorm.DB,
	jobService *services.JobService,
	logger *zap.Logger,
	_ int, // queueSize, ignored: queue is persisted
	workers int,
	maxRetries int,
	emailSender email.EmailSender,
) *JobWorker {
	// AutoMigrate of the Job table is intentionally left to SQL migrations.
	// db.AutoMigrate(&Job{})
	return &JobWorker{
		db:                db,
		jobService:        jobService,
		logger:            logger,
		maxRetries:        maxRetries,
		processingWorkers: workers,
		emailSender:       emailSender,
		pollingInterval:   1 * time.Second, // aggressive polling for responsiveness
	}
}

// Enqueue inserts a job into the jobs table, filling in sensible defaults
// (ID, status, run time, max retries) when the caller left them zero.
// Insert failures are logged, not returned; the call is best-effort.
func (w *JobWorker) Enqueue(job Job) {
	if job.ID == uuid.Nil {
		job.ID = uuid.New()
	}
	if job.Status == "" {
		job.Status = "pending"
	}
	if job.RunAt.IsZero() {
		job.RunAt = time.Now()
	}
	if job.MaxRetries == 0 {
		job.MaxRetries = w.maxRetries
	}
	// GORM fills CreatedAt/UpdatedAt automatically.
	if err := w.db.Create(&job).Error; err != nil {
		w.logger.Error("Failed to enqueue job", zap.String("type", job.Type), zap.Error(err))
		return
	}
	w.logger.Debug("Job enqueued (persisted)",
		zap.String("job_id", job.ID.String()),
		zap.String("type", job.Type))
}

// Start launches the polling workers and the zombie-job rescuer.
// All goroutines stop when ctx is cancelled.
func (w *JobWorker) Start(ctx context.Context) {
	w.logger.Info("Starting persisted job worker", zap.Int("workers", w.processingWorkers))

	// Background loop that resets jobs stuck in "processing".
	go w.rescueZombieJobsLoop(ctx)

	for i := 0; i < w.processingWorkers; i++ {
		go w.processWorker(ctx, i)
	}
}

// rescueZombieJobsLoop runs periodically to reset jobs stuck in processing state.
func (w *JobWorker) rescueZombieJobsLoop(ctx context.Context) {
	ticker := time.NewTicker(5 * time.Minute)
	defer ticker.Stop()

	// Run once immediately on startup.
	if err := w.rescueZombieJobs(); err != nil {
		w.logger.Error("Failed to rescue zombie jobs on startup", zap.Error(err))
	}

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := w.rescueZombieJobs(); err != nil {
				w.logger.Error("Failed to rescue zombie jobs", zap.Error(err))
			}
		}
	}
}

// rescueZombieJobs atomically resets stuck jobs.
func (w *JobWorker) rescueZombieJobs() error {
	// Threshold: 15 minutes. If a job has been "processing" for longer,
	// the worker that owned it most likely crashed.
	threshold := time.Now().Add(-15 * time.Minute)

	result := w.db.Model(&Job{}).
		Where("status = ? AND started_at < ?", "processing", threshold).
		Updates(map[string]interface{}{
			"status":     "pending",
			"started_at": nil,
			// Increment retries so a job that crashes its worker cannot loop forever.
			"retries":    gorm.Expr("retries + 1"),
			"last_error": "Zombie job rescue: Worker probably crashed",
			"run_at":     time.Now(), // retry immediately
		})
	if result.Error != nil {
		return result.Error
	}
	if result.RowsAffected > 0 {
		w.logger.Warn("Rescued zombie jobs", zap.Int64("count", result.RowsAffected))
	}
	return nil
}

// processWorker polls the queue on a fixed interval and handles one job per tick.
func (w *JobWorker) processWorker(ctx context.Context, workerID int) {
	ticker := time.NewTicker(w.pollingInterval)
	defer ticker.Stop()

	w.logger.Info("Worker started", zap.Int("worker_id", workerID))

	for {
		select {
		case <-ctx.Done():
			w.logger.Info("Worker stopping", zap.Int("worker_id", workerID))
			return
		case <-ticker.C:
			w.fetchAndProcessJob(ctx, workerID)
		}
	}
}

// fetchAndProcessJob atomically claims ONE pending job and processes it.
func (w *JobWorker) fetchAndProcessJob(ctx context.Context, workerID int) {
	var job Job

	// Transaction using SELECT ... FOR UPDATE SKIP LOCKED so concurrent workers
	// never claim the same job. Compatible with Postgres and MySQL 8+.
	err := w.db.Transaction(func(tx *gorm.DB) error {
		// Pick the highest-priority runnable job: status='pending' AND run_at <= now,
		// ordered by priority (1 first) then age.
		if err := tx.Clauses(clause.Locking{Strength: "UPDATE", Options: "SKIP LOCKED"}).
			Where("status = ? AND run_at <= ?", "pending", time.Now()).
			Order("priority ASC, created_at ASC").
			First(&job).Error; err != nil {
			return err // gorm.ErrRecordNotFound is the common case here
		}

		// Mark the job as claimed before releasing the row lock.
		now := time.Now()
		job.Status = "processing"
		job.StartedAt = &now
		return tx.Save(&job).Error
	})
	if err != nil {
		// errors.Is unwraps wrapped errors; a direct != comparison can miss them.
		if !errors.Is(err, gorm.ErrRecordNotFound) {
			w.logger.Error("Failed to fetch job", zap.Error(err))
		}
		// No job to process; wait for the next tick.
		return
	}

	// A job was claimed: run it.
	w.processJob(ctx, job, workerID)
}

// processJob runs the job's business logic and persists the final status
// (completed, failed, or pending again with a retry backoff).
func (w *JobWorker) processJob(ctx context.Context, job Job, workerID int) {
	logger := w.logger.With(
		zap.String("job_id", job.ID.String()),
		zap.String("type", job.Type),
		zap.Int("worker_id", workerID),
		zap.Int("retry", job.Retries))

	logger.Info("Processing job")

	// Per-job execution timeout.
	jobCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
	defer cancel()

	execErr := w.executeJob(jobCtx, job)

	now := time.Now()
	if execErr != nil {
		logger.Error("Job execution failed", zap.Error(execErr))

		job.Retries++
		job.LastError = execErr.Error()

		if job.Retries >= job.MaxRetries {
			job.Status = "failed"
			job.FailedAt = &now
			logger.Error("Job reached max retries, marked as failed")
		} else {
			// Exponential backoff: 5s, 10s, 20s, ... capped at 10 minutes.
			// (The previous linear "retries * 10s" did not match the stated intent.)
			shift := job.Retries - 1
			if shift > 7 { // keep the shift bounded before capping
				shift = 7
			}
			backoff := 5 * time.Second << uint(shift)
			if backoff > 10*time.Minute {
				backoff = 10 * time.Minute
			}
			job.Status = "pending" // back into the queue
			job.RunAt = time.Now().Add(backoff)
			logger.Info("Job scheduled for retry", zap.Duration("backoff", backoff))
		}
	} else {
		job.Status = "completed"
		job.CompletedAt = &now
		logger.Info("Job completed successfully")
	}

	// Final save happens outside the fetch transaction because execution may be long.
	if err := w.db.Save(&job).Error; err != nil {
		logger.Error("Failed to update job status after execution", zap.Error(err))
	}
}

// executeJob dispatches to the handler matching the job type.
func (w *JobWorker) executeJob(ctx context.Context, job Job) error {
	switch job.Type {
	case "email":
		return w.processEmailJob(ctx, job)
	case "thumbnail":
		return w.processThumbnailJob(ctx, job)
	case "analytics":
		return w.processAnalyticsJob(ctx, job)
	default:
		return fmt.Errorf("unknown job type: %s", job.Type)
	}
}

// processEmailJob rebuilds an EmailJob from the persisted payload and executes it.
func (w *JobWorker) processEmailJob(ctx context.Context, job Job) error {
	p := job.Payload

	to, _ := p["to"].(string)
	if to == "" {
		return fmt.Errorf("missing 'to' in payload")
	}
	subject, _ := p["subject"].(string)
	body, _ := p["body"].(string)
	templateName, _ := p["template"].(string)

	// map[string]any and map[string]interface{} are the identical type in Go,
	// so a single assertion covers both spellings (the original had a dead
	// duplicate branch).
	templateData, _ := p["template_data"].(map[string]interface{})
	if templateData == nil {
		templateData = make(map[string]interface{})
	}

	var emailJob *EmailJob
	if templateName != "" {
		emailJob = NewEmailJobWithTemplate(to, subject, templateName, templateData)
	} else {
		emailJob = NewEmailJob(to, subject, body)
	}
	return emailJob.Execute(ctx, w.emailSender, w.logger)
}
pour enqueuing (inchangés, mais adaptent l'objet Job) // EnqueueEmailJob helper func (w *JobWorker) EnqueueEmailJob(to, subject, body string) { job := Job{ Type: "email", Priority: 2, Payload: map[string]interface{}{ "to": to, "subject": subject, "body": body, }, } w.Enqueue(job) } // EnqueueEmailJobWithTemplate helper func (w *JobWorker) EnqueueEmailJobWithTemplate(to, subject, templateName string, templateData map[string]interface{}) { job := Job{ Type: "email", Priority: 2, Payload: map[string]interface{}{ "to": to, "subject": subject, "template": templateName, "template_data": templateData, }, } w.Enqueue(job) } // EnqueueThumbnailJob helper func (w *JobWorker) EnqueueThumbnailJob(inputPath, outputPath string, width, height int) { job := Job{ Type: "thumbnail", Priority: 2, Payload: map[string]interface{}{ "input_path": inputPath, "output_path": outputPath, "width": width, "height": height, }, } w.Enqueue(job) } // EnqueueAnalyticsJob helper func (w *JobWorker) EnqueueAnalyticsJob(eventName string, userID *uuid.UUID, payload map[string]interface{}) { jobPayload := map[string]interface{}{ "event_name": eventName, "payload": payload, } if userID != nil { jobPayload["user_id"] = userID.String() } job := Job{ Type: "analytics", Priority: 3, Payload: jobPayload, } w.Enqueue(job) } // processThumbnailJob wrapper func (w *JobWorker) processThumbnailJob(ctx context.Context, job Job) error { p := job.Payload inputPath, _ := p["input_path"].(string) outputPath, _ := p["output_path"].(string) if inputPath == "" || outputPath == "" { return fmt.Errorf("missing paths in payload") } // JSON unmarshal numbers as float64 width := 300 if wVal, ok := p["width"].(float64); ok { width = int(wVal) } else if wInt, ok := p["width"].(int); ok { // just in case width = wInt } height := 300 if hVal, ok := p["height"].(float64); ok { height = int(hVal) } else if hInt, ok := p["height"].(int); ok { height = hInt } thumbnailJob := NewThumbnailJob(inputPath, outputPath, width, height) 
return thumbnailJob.Execute(ctx, w.logger) } // processAnalyticsJob wrapper func (w *JobWorker) processAnalyticsJob(ctx context.Context, job Job) error { p := job.Payload eventName, _ := p["event_name"].(string) if eventName == "" { return fmt.Errorf("missing event_name") } var userID *uuid.UUID if uidStr, ok := p["user_id"].(string); ok && uidStr != "" { uid, err := uuid.Parse(uidStr) if err != nil { return fmt.Errorf("invalid user_id: %w", err) } userID = &uid } var extraPayload map[string]interface{} // Handle nested map from JSON if nested, ok := p["payload"].(map[string]interface{}); ok { extraPayload = nested } else if nested, ok := p["payload"].(map[string]any); ok { extraPayload = nested } else { // If payload is a string (escaped json), try unmarshal? // For now assume standard structure extraPayload = make(map[string]interface{}) } analyticsJob := NewAnalyticsEventJob(eventName, userID, extraPayload) return analyticsJob.Execute(ctx, w.db, w.logger) } // GetStats retourne les stats DB si possible func (w *JobWorker) GetStats() map[string]interface{} { var pending, processing, failed int64 w.db.Model(&Job{}).Where("status = ?", "pending").Count(&pending) w.db.Model(&Job{}).Where("status = ?", "processing").Count(&processing) w.db.Model(&Job{}).Where("status = ?", "failed").Count(&failed) return map[string]interface{}{ "queue_pending": pending, "queue_processing": processing, "queue_failed": failed, "workers": w.processingWorkers, } }