349 lines
9 KiB
Go
349 lines
9 KiB
Go
package workers
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"veza-backend-api/internal/email"
|
|
"veza-backend-api/internal/services"
|
|
|
|
"github.com/google/uuid"
|
|
"go.uber.org/zap"
|
|
"gorm.io/gorm"
|
|
)
|
|
|
|
// JobWorker processes background jobs pulled from an in-memory buffered
// channel by a fixed pool of goroutines.
type JobWorker struct {
	db                *gorm.DB             // database handle handed to jobs that persist data (analytics)
	jobService        *services.JobService // job service dependency (not used directly in this file)
	logger            *zap.Logger
	queue             chan Job          // buffered job queue; sends are non-blocking (see Enqueue)
	maxRetries        int               // maximum retry attempts before a job is dropped
	processingWorkers int               // number of concurrent worker goroutines started by Start
	emailSender       email.EmailSender // sender used by "email" jobs
}
|
|
|
|
// Job represents one unit of background work placed on the queue.
type Job struct {
	ID        uuid.UUID              // unique identifier; assigned by Enqueue when zero
	Type      string                 // dispatch key: "email", "thumbnail" or "analytics"
	Payload   map[string]interface{} // type-specific parameters (see the Enqueue* helpers)
	Retries   int                    // attempts already made; incremented on each retry
	CreatedAt time.Time              // stamped by Enqueue
	Priority  int                    // 1 = high, 2 = medium, 3 = low (informational only: the channel queue is FIFO and ignores it)
}
|
|
|
|
// NewJobWorker crée un nouveau worker de jobs
|
|
func NewJobWorker(
|
|
db *gorm.DB,
|
|
jobService *services.JobService,
|
|
logger *zap.Logger,
|
|
queueSize int,
|
|
workers int,
|
|
maxRetries int,
|
|
emailSender email.EmailSender,
|
|
) *JobWorker {
|
|
return &JobWorker{
|
|
db: db,
|
|
jobService: jobService,
|
|
logger: logger,
|
|
queue: make(chan Job, queueSize),
|
|
maxRetries: maxRetries,
|
|
processingWorkers: workers,
|
|
emailSender: emailSender,
|
|
}
|
|
}
|
|
|
|
// Enqueue ajoute un job au queue
|
|
func (w *JobWorker) Enqueue(job Job) {
|
|
job.CreatedAt = time.Now()
|
|
if job.ID == uuid.Nil {
|
|
job.ID = uuid.New()
|
|
}
|
|
|
|
select {
|
|
case w.queue <- job:
|
|
w.logger.Debug("Job enqueued",
|
|
zap.String("job_id", job.ID.String()),
|
|
zap.String("job_type", job.Type),
|
|
zap.Int("priority", job.Priority))
|
|
default:
|
|
w.logger.Warn("Job queue full, dropping job",
|
|
zap.String("job_type", job.Type))
|
|
}
|
|
}
|
|
|
|
// Start démarre le worker
|
|
func (w *JobWorker) Start(ctx context.Context) {
|
|
w.logger.Info("Starting job worker",
|
|
zap.Int("workers", w.processingWorkers))
|
|
|
|
for i := 0; i < w.processingWorkers; i++ {
|
|
go w.processWorker(ctx, i)
|
|
}
|
|
}
|
|
|
|
// processWorker traite les jobs du queue
|
|
func (w *JobWorker) processWorker(ctx context.Context, workerID int) {
|
|
w.logger.Info("Job worker started",
|
|
zap.Int("worker_id", workerID))
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
w.logger.Info("Job worker stopping",
|
|
zap.Int("worker_id", workerID))
|
|
return
|
|
|
|
case job := <-w.queue:
|
|
w.processJob(ctx, job, workerID)
|
|
}
|
|
}
|
|
}
|
|
|
|
// processJob traite un job individuel
|
|
func (w *JobWorker) processJob(ctx context.Context, job Job, workerID int) {
|
|
logger := w.logger.With(
|
|
zap.String("job_id", job.ID.String()),
|
|
zap.String("job_type", job.Type),
|
|
zap.Int("worker_id", workerID))
|
|
|
|
logger.Info("Processing job",
|
|
zap.Int("retries", job.Retries))
|
|
|
|
// Créer un contexte avec timeout
|
|
jobCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
|
|
defer cancel()
|
|
|
|
// Traiter le job selon son type
|
|
err := w.executeJob(jobCtx, job)
|
|
|
|
if err != nil {
|
|
logger.Error("Job execution failed",
|
|
zap.Error(err))
|
|
|
|
// Retry si pas atteint max retries
|
|
if job.Retries < w.maxRetries {
|
|
job.Retries++
|
|
|
|
// Exponential backoff
|
|
delay := time.Duration(job.Retries) * 5 * time.Second
|
|
|
|
// Non-blocking retry: re-enqueue after delay
|
|
go func(d time.Duration, j Job) {
|
|
time.Sleep(d)
|
|
w.Enqueue(j)
|
|
}(delay, job)
|
|
|
|
logger.Info("Job scheduled for retry",
|
|
zap.Duration("delay", delay),
|
|
zap.Int("new_retries", job.Retries))
|
|
} else {
|
|
logger.Error("Job failed after max retries",
|
|
zap.Int("max_retries", w.maxRetries))
|
|
|
|
// Enregistrer l'échec définitif
|
|
w.logFailedJob(ctx, job, err)
|
|
}
|
|
} else {
|
|
logger.Info("Job executed successfully")
|
|
}
|
|
}
|
|
|
|
// executeJob exécute un job selon son type
|
|
func (w *JobWorker) executeJob(ctx context.Context, job Job) error {
|
|
switch job.Type {
|
|
case "email":
|
|
return w.processEmailJob(ctx, job)
|
|
case "thumbnail":
|
|
return w.processThumbnailJob(ctx, job)
|
|
case "analytics":
|
|
return w.processAnalyticsJob(ctx, job)
|
|
default:
|
|
return fmt.Errorf("unknown job type: %s", job.Type)
|
|
}
|
|
}
|
|
|
|
// processEmailJob traite un job d'email
|
|
func (w *JobWorker) processEmailJob(ctx context.Context, job Job) error {
|
|
// Extraire les données du payload
|
|
to, ok := job.Payload["to"].(string)
|
|
if !ok {
|
|
return fmt.Errorf("missing 'to' in payload")
|
|
}
|
|
|
|
subject, _ := job.Payload["subject"].(string)
|
|
body, _ := job.Payload["body"].(string)
|
|
templateName, _ := job.Payload["template"].(string)
|
|
|
|
// Extraire les données du template si présentes
|
|
var templateData map[string]interface{}
|
|
if data, ok := job.Payload["template_data"].(map[string]interface{}); ok {
|
|
templateData = data
|
|
} else {
|
|
templateData = make(map[string]interface{})
|
|
}
|
|
|
|
// Créer l'EmailJob
|
|
var emailJob *EmailJob
|
|
if templateName != "" {
|
|
emailJob = NewEmailJobWithTemplate(to, subject, templateName, templateData)
|
|
} else {
|
|
emailJob = NewEmailJob(to, subject, body)
|
|
}
|
|
|
|
// Exécuter le job d'email
|
|
if err := emailJob.Execute(ctx, w.emailSender, w.logger); err != nil {
|
|
return fmt.Errorf("email job execution failed: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// EnqueueEmailJob ajoute un job d'email au queue (méthode helper)
|
|
func (w *JobWorker) EnqueueEmailJob(to, subject, body string) {
|
|
job := Job{
|
|
Type: "email",
|
|
Priority: 2, // Priorité moyenne par défaut
|
|
Payload: map[string]interface{}{
|
|
"to": to,
|
|
"subject": subject,
|
|
"body": body,
|
|
},
|
|
}
|
|
w.Enqueue(job)
|
|
}
|
|
|
|
// EnqueueEmailJobWithTemplate ajoute un job d'email avec template au queue
|
|
func (w *JobWorker) EnqueueEmailJobWithTemplate(to, subject, templateName string, templateData map[string]interface{}) {
|
|
job := Job{
|
|
Type: "email",
|
|
Priority: 2, // Priorité moyenne par défaut
|
|
Payload: map[string]interface{}{
|
|
"to": to,
|
|
"subject": subject,
|
|
"template": templateName,
|
|
"template_data": templateData,
|
|
},
|
|
}
|
|
w.Enqueue(job)
|
|
}
|
|
|
|
// EnqueueThumbnailJob ajoute un job de génération de thumbnail au queue
|
|
func (w *JobWorker) EnqueueThumbnailJob(inputPath, outputPath string, width, height int) {
|
|
job := Job{
|
|
Type: "thumbnail",
|
|
Priority: 2, // Priorité moyenne par défaut
|
|
Payload: map[string]interface{}{
|
|
"input_path": inputPath,
|
|
"output_path": outputPath,
|
|
"width": float64(width),
|
|
"height": float64(height),
|
|
},
|
|
}
|
|
w.Enqueue(job)
|
|
}
|
|
|
|
// EnqueueAnalyticsJob ajoute un job d'analytics au queue
|
|
func (w *JobWorker) EnqueueAnalyticsJob(eventName string, userID *uuid.UUID, payload map[string]interface{}) {
|
|
jobPayload := map[string]interface{}{
|
|
"event_name": eventName,
|
|
"payload": payload,
|
|
}
|
|
if userID != nil {
|
|
jobPayload["user_id"] = userID.String()
|
|
}
|
|
|
|
job := Job{
|
|
Type: "analytics",
|
|
Priority: 3, // Priorité basse par défaut (analytics non critique)
|
|
Payload: jobPayload,
|
|
}
|
|
w.Enqueue(job)
|
|
}
|
|
|
|
// processThumbnailJob traite un job de génération de thumbnail
|
|
func (w *JobWorker) processThumbnailJob(ctx context.Context, job Job) error {
|
|
// Extraire les paramètres du payload
|
|
inputPath, ok := job.Payload["input_path"].(string)
|
|
if !ok {
|
|
return fmt.Errorf("missing 'input_path' in payload")
|
|
}
|
|
|
|
outputPath, ok := job.Payload["output_path"].(string)
|
|
if !ok {
|
|
return fmt.Errorf("missing 'output_path' in payload")
|
|
}
|
|
|
|
// Largeur et hauteur (optionnels, avec valeurs par défaut)
|
|
width := 300
|
|
height := 300
|
|
if w, ok := job.Payload["width"].(float64); ok {
|
|
width = int(w)
|
|
}
|
|
if h, ok := job.Payload["height"].(float64); ok {
|
|
height = int(h)
|
|
}
|
|
|
|
// Créer et exécuter le ThumbnailJob
|
|
thumbnailJob := NewThumbnailJob(inputPath, outputPath, width, height)
|
|
if err := thumbnailJob.Execute(ctx, w.logger); err != nil {
|
|
return fmt.Errorf("thumbnail job execution failed: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// processAnalyticsJob traite un job d'analytics
|
|
func (w *JobWorker) processAnalyticsJob(ctx context.Context, job Job) error {
|
|
// Extraire les données du payload
|
|
eventName, ok := job.Payload["event_name"].(string)
|
|
if !ok {
|
|
return fmt.Errorf("missing 'event_name' in payload")
|
|
}
|
|
|
|
// UserID (optionnel, peut être nil pour événements anonymes)
|
|
var userID *uuid.UUID
|
|
if uidStr, ok := job.Payload["user_id"].(string); ok && uidStr != "" {
|
|
uid, err := uuid.Parse(uidStr)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid user_id format: %w", err)
|
|
}
|
|
userID = &uid
|
|
}
|
|
|
|
// Payload additionnel (optionnel)
|
|
var payload map[string]interface{}
|
|
if p, ok := job.Payload["payload"].(map[string]interface{}); ok {
|
|
payload = p
|
|
} else {
|
|
payload = make(map[string]interface{})
|
|
}
|
|
|
|
// Créer et exécuter l'AnalyticsEventJob
|
|
analyticsJob := NewAnalyticsEventJob(eventName, userID, payload)
|
|
if err := analyticsJob.Execute(ctx, w.db, w.logger); err != nil {
|
|
return fmt.Errorf("analytics job execution failed: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// logFailedJob records a job that has exhausted all retries. For now the
// failure is only logged; ctx is unused but kept for the planned database
// write below.
func (w *JobWorker) logFailedJob(ctx context.Context, job Job, err error) {
	// TODO: record the failure in the job_failures table
	w.logger.Error("Job permanently failed",
		zap.String("job_id", job.ID.String()),
		zap.String("job_type", job.Type),
		zap.Error(err))
}
|
|
|
|
// GetStats retourne les statistiques du worker
|
|
func (w *JobWorker) GetStats() map[string]interface{} {
|
|
return map[string]interface{}{
|
|
"queue_size": len(w.queue),
|
|
"workers": w.processingWorkers,
|
|
"max_retries": w.maxRetries,
|
|
}
|
|
}
|