// File: veza/veza-backend-api/internal/workers/job_worker.go

package workers
import (
"context"
"fmt"
"time"
"veza-backend-api/internal/email"
"veza-backend-api/internal/services"
"github.com/google/uuid"
"go.uber.org/zap"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
// JobWorker manages background tasks through a queue persisted in the
// database (jobs table). Jobs survive process restarts; workers claim
// them by polling with row-level locking.
type JobWorker struct {
	db                *gorm.DB             // handle to the DB holding the jobs table
	jobService        *services.JobService // NOTE(review): injected but unused in this file chunk — confirm usage elsewhere
	logger            *zap.Logger
	maxRetries        int              // default MaxRetries applied to jobs enqueued without one
	processingWorkers int              // number of polling goroutines launched by Start
	emailSender       email.EmailSender // transport used by email jobs
	pollingInterval   time.Duration    // delay between DB polls per worker
}
// Job is a single task persisted in the database. The Status/RunAt pair
// drives scheduling: a job is runnable when Status is "pending" and
// RunAt <= now.
type Job struct {
	ID          uuid.UUID              `gorm:"type:uuid;primary_key"`
	Type        string                 `gorm:"not null"` // dispatch key: "email", "thumbnail", "analytics"
	Payload     map[string]interface{} `gorm:"serializer:json;not null"`
	Status      string                 `gorm:"not null;default:'pending'"` // pending, processing, completed, failed
	Priority    int                    `gorm:"not null;default:2"`         // 1=high, 2=medium, 3=low (fetch orders ASC)
	RunAt       time.Time              `gorm:"not null;index"`             // earliest time the job may run
	CreatedAt   time.Time              `gorm:"not null"`
	UpdatedAt   time.Time              `gorm:"not null"`
	StartedAt   *time.Time // set when a worker claims the job; used for zombie detection
	CompletedAt *time.Time
	FailedAt    *time.Time
	Retries     int    `gorm:"not null;default:0"` // attempts consumed so far
	MaxRetries  int    `gorm:"not null;default:3"` // job is marked failed once Retries reaches this
	LastError   string `gorm:"type:text"`          // message from the most recent failure
}
// NewJobWorker builds a job worker backed by the persisted jobs table.
// The queue-size argument is accepted for API compatibility but ignored,
// because the queue lives in the database rather than in memory.
func NewJobWorker(
	db *gorm.DB,
	jobService *services.JobService,
	logger *zap.Logger,
	_ int, // queueSize: unused, the queue is DB-persisted
	workers int,
	maxRetries int,
	emailSender email.EmailSender,
) *JobWorker {
	// Table creation is expected to be handled by SQL migrations;
	// AutoMigrate(&Job{}) could be enabled here if needed.
	w := &JobWorker{
		db:                db,
		jobService:        jobService,
		logger:            logger,
		maxRetries:        maxRetries,
		processingWorkers: workers,
		emailSender:       emailSender,
	}
	// Aggressive 1s polling keeps enqueue-to-execution latency low.
	w.pollingInterval = 1 * time.Second
	return w
}
// Enqueue persists a job into the jobs table, filling sensible defaults
// for zero-valued fields (ID, Status, RunAt, Priority, MaxRetries).
// Failures are logged and swallowed: enqueueing is best-effort by design.
func (w *JobWorker) Enqueue(job Job) {
	if job.ID == uuid.Nil {
		job.ID = uuid.New()
	}
	if job.Status == "" {
		job.Status = "pending"
	}
	if job.RunAt.IsZero() {
		job.RunAt = time.Now()
	}
	// An unset Priority (0) would sort AHEAD of "high" (1) in the
	// `ORDER BY priority ASC` fetch query if it ever reached the DB as 0.
	// Default it explicitly to medium (2), consistent with the gorm column
	// default and with how Status/MaxRetries are defaulted above.
	if job.Priority == 0 {
		job.Priority = 2
	}
	if job.MaxRetries == 0 {
		job.MaxRetries = w.maxRetries
	}
	// CreatedAt/UpdatedAt are populated automatically by GORM.
	if err := w.db.Create(&job).Error; err != nil {
		w.logger.Error("Failed to enqueue job",
			zap.String("type", job.Type),
			zap.Error(err))
		return
	}
	w.logger.Debug("Job enqueued (persisted)",
		zap.String("job_id", job.ID.String()),
		zap.String("type", job.Type))
}
// Start launches the zombie-job rescuer plus the configured number of
// polling workers. All goroutines exit when ctx is cancelled.
func (w *JobWorker) Start(ctx context.Context) {
	w.logger.Info("Starting persisted job worker",
		zap.Int("workers", w.processingWorkers))
	// Background loop that resets jobs stuck in "processing".
	go w.rescueZombieJobsLoop(ctx)
	for id := 0; id < w.processingWorkers; id++ {
		go w.processWorker(ctx, id)
	}
}
// rescueZombieJobsLoop periodically resets jobs stuck in the
// "processing" state: once immediately at startup, then every five
// minutes, until ctx is cancelled.
func (w *JobWorker) rescueZombieJobsLoop(ctx context.Context) {
	ticker := time.NewTicker(5 * time.Minute)
	defer ticker.Stop()
	// Immediate pass so jobs orphaned by a previous crash recover fast.
	if err := w.rescueZombieJobs(); err != nil {
		w.logger.Error("Failed to rescue zombie jobs on startup", zap.Error(err))
	}
	for {
		select {
		case <-ticker.C:
			if err := w.rescueZombieJobs(); err != nil {
				w.logger.Error("Failed to rescue zombie jobs", zap.Error(err))
			}
		case <-ctx.Done():
			return
		}
	}
}
// rescueZombieJobs resets, in a single atomic UPDATE, every job that has
// sat in "processing" for over 15 minutes — the worker that claimed it
// most likely crashed mid-execution.
func (w *JobWorker) rescueZombieJobs() error {
	cutoff := time.Now().Add(-15 * time.Minute)
	reset := map[string]interface{}{
		"status":     "pending",
		"started_at": nil,
		// Bump retries so a job that crashes its worker cannot loop forever.
		"retries":    gorm.Expr("retries + 1"),
		"last_error": "Zombie job rescue: Worker probably crashed",
		"run_at":     time.Now(), // eligible to run again immediately
	}
	res := w.db.Model(&Job{}).
		Where("status = ? AND started_at < ?", "processing", cutoff).
		Updates(reset)
	if res.Error != nil {
		return res.Error
	}
	if res.RowsAffected > 0 {
		w.logger.Warn("Rescued zombie jobs", zap.Int64("count", res.RowsAffected))
	}
	return nil
}
// processWorker is the main loop of one worker goroutine: on every
// polling tick it tries to claim and execute a single pending job,
// returning when ctx is cancelled.
func (w *JobWorker) processWorker(ctx context.Context, workerID int) {
	w.logger.Info("Worker started", zap.Int("worker_id", workerID))
	ticker := time.NewTicker(w.pollingInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			w.logger.Info("Worker stopping", zap.Int("worker_id", workerID))
			return
		case <-ticker.C:
			w.fetchAndProcessJob(ctx, workerID)
		}
	}
}
// fetchAndProcessJob atomically claims ONE runnable job and processes it.
func (w *JobWorker) fetchAndProcessJob(ctx context.Context, workerID int) {
	var job Job
	// The transaction row-locks the job (SELECT ... FOR UPDATE SKIP LOCKED)
	// so concurrent workers never claim the same row. SKIP LOCKED is
	// supported by Postgres (and MySQL 8+); SQLite locking differs but
	// GORM handles the basic case.
	err := w.db.Transaction(func(tx *gorm.DB) error {
		// Pick the oldest runnable job: status='pending' AND run_at <= NOW(),
		// ordered by priority ASC (1 = high first), then creation time.
		if err := tx.Clauses(clause.Locking{Strength: "UPDATE", Options: "SKIP LOCKED"}).
			Where("status = ? AND run_at <= ?", "pending", time.Now()).
			Order("priority ASC, created_at ASC").
			First(&job).Error; err != nil {
			return err // gorm.ErrRecordNotFound here simply means the queue is empty
		}
		// Flip to 'processing' inside the same transaction so the claim is
		// visible to other workers the moment the row lock is released.
		now := time.Now()
		job.Status = "processing"
		job.StartedAt = &now
		if err := tx.Save(&job).Error; err != nil {
			return err
		}
		return nil
	})
	if err != nil {
		if err != gorm.ErrRecordNotFound {
			w.logger.Error("Failed to fetch job", zap.Error(err))
		}
		// No job available (or claim failed); wait for the next poll tick.
		return
	}
	// Claim succeeded — execute outside the transaction, since the
	// business logic may run for a long time.
	w.processJob(ctx, job, workerID)
}
// processJob runs the job's business logic and persists the final state:
// "completed" on success, "pending" with a backoff on a retryable
// failure, or "failed" once MaxRetries is exhausted.
func (w *JobWorker) processJob(ctx context.Context, job Job, workerID int) {
	// Payload is deserialized by GORM's `serializer:json` into a map;
	// no manual decoding should be necessary here.
	logger := w.logger.With(
		zap.String("job_id", job.ID.String()),
		zap.String("type", job.Type),
		zap.Int("worker_id", workerID),
		zap.Int("retry", job.Retries))
	logger.Info("Processing job")
	// Hard 5-minute cap per execution. NOTE(review): cancellation of the
	// parent ctx (e.g. shutdown) also aborts the job and consumes a retry
	// via the failure path below — confirm this is intended.
	jobCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
	defer cancel()
	// Execute the type-specific handler.
	execErr := w.executeJob(jobCtx, job)
	// Record the final status.
	now := time.Now()
	if execErr != nil {
		logger.Error("Job execution failed", zap.Error(execErr))
		// Account for this attempt and schedule the next one, if any.
		job.Retries++
		job.LastError = execErr.Error()
		if job.Retries >= job.MaxRetries {
			job.Status = "failed"
			job.FailedAt = &now
			logger.Error("Job reached max retries, marked as failed")
		} else {
			// LINEAR backoff: 10s, 20s, 30s... (retries * 10s).
			// NOTE(review): the original comment claimed exponential backoff;
			// the implementation is linear — decide which is intended.
			backoff := time.Duration(job.Retries) * 10 * time.Second
			job.Status = "pending" // back into the queue
			job.RunAt = time.Now().Add(backoff)
			logger.Info("Job scheduled for retry", zap.Duration("backoff", backoff))
		}
	} else {
		job.Status = "completed"
		job.CompletedAt = &now
		logger.Info("Job completed successfully")
	}
	// Final save is done outside the claiming transaction, because the
	// execution above may have taken a long time.
	if err := w.db.Save(&job).Error; err != nil {
		logger.Error("Failed to update job status after execution", zap.Error(err))
	}
}
// executeJob dispatches the job to the handler matching its Type.
// An unknown type is reported as an error (and will ultimately be marked
// failed once its retries are exhausted).
func (w *JobWorker) executeJob(ctx context.Context, job Job) error {
	handlers := map[string]func(context.Context, Job) error{
		"email":     w.processEmailJob,
		"thumbnail": w.processThumbnailJob, // kept for compatibility with older enqueuers
		"analytics": w.processAnalyticsJob,
	}
	handler, ok := handlers[job.Type]
	if !ok {
		return fmt.Errorf("unknown job type: %s", job.Type)
	}
	return handler(ctx, job)
}
// processEmailJob builds and sends an email from the job payload.
// Expected payload keys: "to" (required), "subject", "body", and the
// optional "template"/"template_data" pair — when "template" is set the
// templated path is used instead of the raw body.
func (w *JobWorker) processEmailJob(ctx context.Context, job Job) error {
	p := job.Payload
	to, _ := p["to"].(string)
	if to == "" {
		return fmt.Errorf("missing 'to' in payload")
	}
	subject, _ := p["subject"].(string)
	body, _ := p["body"].(string)
	templateName, _ := p["template"].(string)
	// `any` is an alias of interface{}, so one assertion covers both;
	// the previous second assertion on map[string]any was dead code.
	templateData, ok := p["template_data"].(map[string]interface{})
	if !ok {
		templateData = make(map[string]interface{})
	}
	var emailJob *EmailJob
	if templateName != "" {
		emailJob = NewEmailJobWithTemplate(to, subject, templateName, templateData)
	} else {
		emailJob = NewEmailJob(to, subject, body)
	}
	return emailJob.Execute(ctx, w.emailSender, w.logger)
}
// Helper methods for enqueuing typed jobs.

// EnqueueEmailJob queues a plain (non-templated) email for delivery.
func (w *JobWorker) EnqueueEmailJob(to, subject, body string) {
	payload := map[string]interface{}{
		"to":      to,
		"subject": subject,
		"body":    body,
	}
	w.Enqueue(Job{Type: "email", Priority: 2, Payload: payload})
}
// EnqueueEmailJobWithTemplate queues a templated email; templateData is
// made available to the template at render time.
func (w *JobWorker) EnqueueEmailJobWithTemplate(to, subject, templateName string, templateData map[string]interface{}) {
	payload := map[string]interface{}{
		"to":            to,
		"subject":       subject,
		"template":      templateName,
		"template_data": templateData,
	}
	w.Enqueue(Job{Type: "email", Priority: 2, Payload: payload})
}
// EnqueueThumbnailJob queues generation of a width x height thumbnail
// from inputPath into outputPath.
func (w *JobWorker) EnqueueThumbnailJob(inputPath, outputPath string, width, height int) {
	payload := map[string]interface{}{
		"input_path":  inputPath,
		"output_path": outputPath,
		"width":       width,
		"height":      height,
	}
	w.Enqueue(Job{Type: "thumbnail", Priority: 2, Payload: payload})
}
// EnqueueAnalyticsJob queues a low-priority analytics event, optionally
// attributed to a user.
func (w *JobWorker) EnqueueAnalyticsJob(eventName string, userID *uuid.UUID, payload map[string]interface{}) {
	p := map[string]interface{}{
		"event_name": eventName,
		"payload":    payload,
	}
	// user_id is stored as a string so it survives JSON serialization.
	if userID != nil {
		p["user_id"] = userID.String()
	}
	w.Enqueue(Job{Type: "analytics", Priority: 3, Payload: p})
}
// processThumbnailJob builds and runs a thumbnail job from the payload.
// Expected keys: "input_path" and "output_path" (required), "width" and
// "height" (optional, default 300 each).
func (w *JobWorker) processThumbnailJob(ctx context.Context, job Job) error {
	p := job.Payload
	inputPath, _ := p["input_path"].(string)
	outputPath, _ := p["output_path"].(string)
	if inputPath == "" || outputPath == "" {
		return fmt.Errorf("missing paths in payload")
	}
	// JSON round-trips numbers as float64, but a payload that never hit
	// serialization may still carry native ints — accept both.
	dimension := func(key string, fallback int) int {
		switch v := p[key].(type) {
		case float64:
			return int(v)
		case int:
			return v
		default:
			return fallback
		}
	}
	width := dimension("width", 300)
	height := dimension("height", 300)
	thumbnailJob := NewThumbnailJob(inputPath, outputPath, width, height)
	return thumbnailJob.Execute(ctx, w.logger)
}
// processAnalyticsJob records an analytics event from the job payload.
// Expected keys: "event_name" (required), "user_id" (optional UUID
// string), "payload" (optional nested map of event attributes).
func (w *JobWorker) processAnalyticsJob(ctx context.Context, job Job) error {
	p := job.Payload
	eventName, _ := p["event_name"].(string)
	if eventName == "" {
		return fmt.Errorf("missing event_name")
	}
	var userID *uuid.UUID
	if uidStr, ok := p["user_id"].(string); ok && uidStr != "" {
		uid, err := uuid.Parse(uidStr)
		if err != nil {
			return fmt.Errorf("invalid user_id: %w", err)
		}
		userID = &uid
	}
	// `map[string]any` is identical to `map[string]interface{}`, so the
	// original second assertion branch was dead code; one check suffices.
	extraPayload, ok := p["payload"].(map[string]interface{})
	if !ok {
		// NOTE(review): a payload serialized as an escaped JSON string is
		// not handled here — it silently falls through to an empty map.
		extraPayload = make(map[string]interface{})
	}
	analyticsJob := NewAnalyticsEventJob(eventName, userID, extraPayload)
	return analyticsJob.Execute(ctx, w.db, w.logger)
}
// GetStats returns queue-depth counters read from the jobs table plus
// the configured worker count. If a count query fails, the corresponding
// value stays at zero and the error is logged (previously it was
// silently ignored).
func (w *JobWorker) GetStats() map[string]interface{} {
	var pending, processing, failed int64
	counters := []struct {
		status string
		dest   *int64
	}{
		{"pending", &pending},
		{"processing", &processing},
		{"failed", &failed},
	}
	for _, c := range counters {
		if err := w.db.Model(&Job{}).Where("status = ?", c.status).Count(c.dest).Error; err != nil {
			w.logger.Error("Failed to count jobs",
				zap.String("status", c.status),
				zap.Error(err))
		}
	}
	return map[string]interface{}{
		"queue_pending":    pending,
		"queue_processing": processing,
		"queue_failed":     failed,
		"workers":          w.processingWorkers,
	}
}