package workers

import (
	"context"
	"fmt"
	"time"

	"veza-backend-api/internal/email"
	"veza-backend-api/internal/services"

	"github.com/google/uuid"
	"go.uber.org/zap"
	"gorm.io/gorm"
)

// JobWorker runs background jobs. Jobs are pushed onto a buffered in-memory
// channel and consumed by a fixed number of goroutines started by Start.
type JobWorker struct {
	db                *gorm.DB
	jobService        *services.JobService // Stored by NewJobWorker; not used by the methods in this file.
	logger            *zap.Logger
	queue             chan Job
	maxRetries        int
	processingWorkers int
	emailSender       email.EmailSender // Sender handed to "email" jobs.
}

// Job describes one unit of background work.
//
// Note that the queue in this file is a plain channel: Priority is recorded
// and logged, but jobs are consumed in channel order regardless of its value.
type Job struct {
	ID        uuid.UUID
	Type      string
	Payload   map[string]interface{}
	Retries   int
	CreatedAt time.Time
	Priority  int // 1 = high, 2 = medium, 3 = low
}

// NewJobWorker builds a JobWorker whose queue can buffer queueSize jobs and
// which will run `workers` consumer goroutines once Start is called. A failed
// job is retried up to maxRetries times.
func NewJobWorker(
	db *gorm.DB,
	jobService *services.JobService,
	logger *zap.Logger,
	queueSize int,
	workers int,
	maxRetries int,
	emailSender email.EmailSender,
) *JobWorker {
	return &JobWorker{
		db:                db,
		jobService:        jobService,
		logger:            logger,
		queue:             make(chan Job, queueSize),
		maxRetries:        maxRetries,
		processingWorkers: workers,
		emailSender:       emailSender,
	}
}

// Enqueue adds a job to the queue without blocking. If the queue is full the
// job is dropped and a warning is logged.
//
// A missing ID is replaced by a fresh UUID. CreatedAt is only filled in when
// it is zero, so a job that is re-enqueued for a retry keeps its original
// creation time.
func (w *JobWorker) Enqueue(job Job) {
	if job.CreatedAt.IsZero() {
		job.CreatedAt = time.Now()
	}
	if job.ID == uuid.Nil {
		job.ID = uuid.New()
	}

	select {
	case w.queue <- job:
		w.logger.Debug("Job enqueued",
			zap.String("job_id", job.ID.String()),
			zap.String("job_type", job.Type),
			zap.Int("priority", job.Priority))
	default:
		w.logger.Warn("Job queue full, dropping job",
			zap.String("job_id", job.ID.String()),
			zap.String("job_type", job.Type))
	}
}

// Start launches the configured number of consumer goroutines. Each one runs
// until ctx is cancelled. Start itself returns immediately.
func (w *JobWorker) Start(ctx context.Context) {
	w.logger.Info("Starting job worker", zap.Int("workers", w.processingWorkers))

	for i := 0; i < w.processingWorkers; i++ {
		go w.processWorker(ctx, i)
	}
}

// processWorker is the loop of a single consumer goroutine: it takes jobs off
// the queue and processes them one at a time until ctx is cancelled.
func (w *JobWorker) processWorker(ctx context.Context, workerID int) {
	w.logger.Info("Job worker started", zap.Int("worker_id", workerID))

	for {
		select {
		case <-ctx.Done():
			w.logger.Info("Job worker stopping", zap.Int("worker_id", workerID))
			return
		case job := <-w.queue:
			w.processJob(ctx, job, workerID)
		}
	}
}

// processJob executes one job under a per-attempt timeout. On failure the job
// is either scheduled for a delayed retry (while job.Retries is below
// maxRetries) or reported as permanently failed via logFailedJob.
func (w *JobWorker) processJob(ctx context.Context, job Job, workerID int) {
	// Upper bound on the execution time of a single attempt.
	const jobTimeout = 5 * time.Minute

	logger := w.logger.With(
		zap.String("job_id", job.ID.String()),
		zap.String("job_type", job.Type),
		zap.Int("worker_id", workerID))

	logger.Info("Processing job", zap.Int("retries", job.Retries))

	jobCtx, cancel := context.WithTimeout(ctx, jobTimeout)
	defer cancel()

	err := w.executeJob(jobCtx, job)
	if err == nil {
		logger.Info("Job executed successfully")
		return
	}
	logger.Error("Job execution failed", zap.Error(err))

	if job.Retries >= w.maxRetries {
		logger.Error("Job failed after max retries", zap.Int("max_retries", w.maxRetries))
		w.logFailedJob(ctx, job, err)
		return
	}

	job.Retries++
	delay := w.retryDelay(job.Retries)

	// The wait happens in its own goroutine so this worker can pick up the
	// next job immediately. The worker context (not jobCtx, which is
	// cancelled when this function returns) bounds the goroutine's lifetime.
	go w.enqueueAfter(ctx, delay, job)

	logger.Info("Job scheduled for retry",
		zap.Duration("delay", delay),
		zap.Int("new_retries", job.Retries))
}

// retryDelay returns the exponential backoff delay before retry number
// `retries` (1-based): 5s, 10s, 20s, ... capped at 5 minutes.
func (w *JobWorker) retryDelay(retries int) time.Duration {
	const (
		baseDelay = 5 * time.Second
		maxDelay  = 5 * time.Minute
	)

	delay := baseDelay
	// Doubling step by step (rather than shifting by `retries`) keeps the
	// value from overflowing when maxRetries is configured very high.
	for i := 1; i < retries; i++ {
		delay *= 2
		if delay >= maxDelay {
			return maxDelay
		}
	}
	return delay
}

// enqueueAfter waits for delay and then re-enqueues job. If ctx is cancelled
// first, the retry is abandoned so the goroutine does not outlive the worker.
func (w *JobWorker) enqueueAfter(ctx context.Context, delay time.Duration, job Job) {
	timer := time.NewTimer(delay)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		w.logger.Warn("Job retry abandoned, worker stopping",
			zap.String("job_id", job.ID.String()),
			zap.String("job_type", job.Type),
			zap.Int("retries", job.Retries))
	case <-timer.C:
		w.Enqueue(job)
	}
}

// executeJob dispatches a job to the handler matching its Type. It returns an
// error for unknown types.
func (w *JobWorker) executeJob(ctx context.Context, job Job) error {
	switch job.Type {
	case "email":
		return w.processEmailJob(ctx, job)
	case "thumbnail":
		return w.processThumbnailJob(ctx, job)
	case "analytics":
		return w.processAnalyticsJob(ctx, job)
	default:
		return fmt.Errorf("unknown job type: %s", job.Type)
	}
}

// processEmailJob handles an "email" job. The payload must contain a string
// "to"; "subject", "body", "template" and "template_data" are optional. When
// "template" is non-empty a templated email job is built, otherwise a plain
// one using "body".
func (w *JobWorker) processEmailJob(ctx context.Context, job Job) error {
	to, ok := job.Payload["to"].(string)
	if !ok {
		return fmt.Errorf("missing 'to' in payload")
	}

	// Optional fields: a missing or non-string value deliberately falls back
	// to the empty string.
	subject, _ := job.Payload["subject"].(string)
	body, _ := job.Payload["body"].(string)
	templateName, _ := job.Payload["template"].(string)

	// A nil map stored under "template_data" still satisfies the type
	// assertion, so nil is normalised to an empty map explicitly.
	templateData, _ := job.Payload["template_data"].(map[string]interface{})
	if templateData == nil {
		templateData = make(map[string]interface{})
	}

	var emailJob *EmailJob
	if templateName != "" {
		emailJob = NewEmailJobWithTemplate(to, subject, templateName, templateData)
	} else {
		emailJob = NewEmailJob(to, subject, body)
	}

	if err := emailJob.Execute(ctx, w.emailSender, w.logger); err != nil {
		return fmt.Errorf("email job execution failed: %w", err)
	}
	return nil
}

// EnqueueEmailJob is a helper that enqueues a plain email job with medium
// priority.
func (w *JobWorker) EnqueueEmailJob(to, subject, body string) {
	w.Enqueue(Job{
		Type:     "email",
		Priority: 2, // Medium priority by default.
		Payload: map[string]interface{}{
			"to":      to,
			"subject": subject,
			"body":    body,
		},
	})
}

// EnqueueEmailJobWithTemplate is a helper that enqueues a templated email job
// with medium priority.
func (w *JobWorker) EnqueueEmailJobWithTemplate(to, subject, templateName string, templateData map[string]interface{}) {
	w.Enqueue(Job{
		Type:     "email",
		Priority: 2, // Medium priority by default.
		Payload: map[string]interface{}{
			"to":            to,
			"subject":       subject,
			"template":      templateName,
			"template_data": templateData,
		},
	})
}

// EnqueueThumbnailJob is a helper that enqueues a thumbnail generation job
// with medium priority.
func (w *JobWorker) EnqueueThumbnailJob(inputPath, outputPath string, width, height int) {
	w.Enqueue(Job{
		Type:     "thumbnail",
		Priority: 2, // Medium priority by default.
		Payload: map[string]interface{}{
			"input_path":  inputPath,
			"output_path": outputPath,
			// Stored as float64; processThumbnailJob reads them back as such.
			"width":  float64(width),
			"height": float64(height),
		},
	})
}

// EnqueueAnalyticsJob is a helper that enqueues an analytics event job with
// low priority. userID may be nil; in that case no "user_id" key is added to
// the job payload.
func (w *JobWorker) EnqueueAnalyticsJob(eventName string, userID *uuid.UUID, payload map[string]interface{}) {
	jobPayload := map[string]interface{}{
		"event_name": eventName,
		"payload":    payload,
	}
	if userID != nil {
		jobPayload["user_id"] = userID.String()
	}

	w.Enqueue(Job{
		Type:     "analytics",
		Priority: 3, // Low priority by default (analytics is not critical).
		Payload:  jobPayload,
	})
}

// processThumbnailJob handles a "thumbnail" job. The payload must contain
// string "input_path" and "output_path"; "width" and "height" are optional
// and default to 300.
func (w *JobWorker) processThumbnailJob(ctx context.Context, job Job) error {
	const defaultDimension = 300

	inputPath, ok := job.Payload["input_path"].(string)
	if !ok {
		return fmt.Errorf("missing 'input_path' in payload")
	}

	outputPath, ok := job.Payload["output_path"].(string)
	if !ok {
		return fmt.Errorf("missing 'output_path' in payload")
	}

	// dimension reads an optional numeric payload entry. float64 is what
	// EnqueueThumbnailJob stores; a plain int is accepted as well so callers
	// building the payload by hand are not silently given the default.
	dimension := func(key string) int {
		switch v := job.Payload[key].(type) {
		case float64:
			return int(v)
		case int:
			return v
		default:
			return defaultDimension
		}
	}
	width := dimension("width")
	height := dimension("height")

	thumbnailJob := NewThumbnailJob(inputPath, outputPath, width, height)
	if err := thumbnailJob.Execute(ctx, w.logger); err != nil {
		return fmt.Errorf("thumbnail job execution failed: %w", err)
	}
	return nil
}

// processAnalyticsJob handles an "analytics" job. The payload must contain a
// string "event_name". "user_id" is optional (absent or empty means no user)
// but must be a valid UUID when present. "payload" is optional.
func (w *JobWorker) processAnalyticsJob(ctx context.Context, job Job) error {
	eventName, ok := job.Payload["event_name"].(string)
	if !ok {
		return fmt.Errorf("missing 'event_name' in payload")
	}

	var userID *uuid.UUID
	if uidStr, ok := job.Payload["user_id"].(string); ok && uidStr != "" {
		uid, err := uuid.Parse(uidStr)
		if err != nil {
			return fmt.Errorf("invalid user_id format: %w", err)
		}
		userID = &uid
	}

	// A nil map stored under "payload" (as EnqueueAnalyticsJob does when it
	// is given a nil payload) still satisfies the type assertion, so nil is
	// normalised to an empty map explicitly.
	payload, _ := job.Payload["payload"].(map[string]interface{})
	if payload == nil {
		payload = make(map[string]interface{})
	}

	analyticsJob := NewAnalyticsEventJob(eventName, userID, payload)
	if err := analyticsJob.Execute(ctx, w.db, w.logger); err != nil {
		return fmt.Errorf("analytics job execution failed: %w", err)
	}
	return nil
}

// logFailedJob records a job that exhausted its retries. For now it only
// writes an error log entry; ctx is unused until persistence is added.
func (w *JobWorker) logFailedJob(ctx context.Context, job Job, err error) {
	// TODO: persist the failure in the job_failures table.
	w.logger.Error("Job permanently failed",
		zap.String("job_id", job.ID.String()),
		zap.String("job_type", job.Type),
		zap.Error(err))
}

// GetStats returns a snapshot of the worker's state: the number of jobs
// currently buffered in the queue, the configured worker count and the
// configured maximum number of retries.
func (w *JobWorker) GetStats() map[string]interface{} {
	return map[string]interface{}{
		"queue_size":  len(w.queue),
		"workers":     w.processingWorkers,
		"max_retries": w.maxRetries,
	}
}