veza/veza-backend-api/internal/workers/job_worker.go
2025-12-03 20:29:37 +01:00

235 lines
5.5 KiB
Go

package workers
import (
"context"
"fmt"
"time"
"veza-backend-api/internal/services"
"github.com/google/uuid"
"go.uber.org/zap"
"gorm.io/gorm"
)
// JobWorker processes background jobs taken from an in-memory queue, using a
// pool of goroutines launched by Start.
type JobWorker struct {
// db is the database handle passed to NewJobWorker. It is not referenced by
// the methods visible in this section of the file.
db *gorm.DB
// jobService is the job service passed to NewJobWorker. It is not referenced
// by the methods visible in this section of the file.
jobService *services.JobService
// logger receives all worker and per-job log output.
logger *zap.Logger
// queue is the buffered channel holding pending jobs; Enqueue sends to it
// and the processing goroutines receive from it.
queue chan Job
// maxRetries is the retry limit compared against Job.Retries in processJob.
maxRetries int
// processingWorkers is the number of goroutines that Start launches.
processingWorkers int
}
// Job describes one unit of background work to be processed.
type Job struct {
// ID identifies the job; Enqueue generates one when it is uuid.Nil.
ID uuid.UUID
// Type selects the handler in executeJob ("email", "thumbnail" or
// "analytics"); any other value makes execution fail.
Type string
// Payload carries the handler-specific arguments, looked up by string key.
Payload map[string]interface{}
// Retries counts how many times the job has been retried so far; it is
// incremented by processJob before each re-enqueue.
Retries int
// CreatedAt is the timestamp stamped on the job by Enqueue.
CreatedAt time.Time
// Priority is only logged by the code visible here; it does not affect the
// order in which jobs are taken from the queue.
Priority int // 1 = high, 2 = medium, 3 = low
}
// NewJobWorker builds a JobWorker around the given dependencies.
//
// queueSize is the capacity of the in-memory job channel, workers is the
// number of goroutines that Start will launch, and maxRetries bounds how many
// times a failing job is re-enqueued. No goroutine is started here; call
// Start to begin processing.
func NewJobWorker(
	db *gorm.DB,
	jobService *services.JobService,
	logger *zap.Logger,
	queueSize int,
	workers int,
	maxRetries int,
) *JobWorker {
	jw := new(JobWorker)

	// Collaborators.
	jw.db = db
	jw.jobService = jobService
	jw.logger = logger

	// Queue and processing limits.
	jw.queue = make(chan Job, queueSize)
	jw.processingWorkers = workers
	jw.maxRetries = maxRetries

	return jw
}
// Enqueue submits a job to the in-memory queue without blocking.
//
// A job whose ID is uuid.Nil receives a freshly generated ID. CreatedAt is
// stamped only when it is still the zero time, so a job that is re-enqueued
// for a retry keeps its original creation time. When the queue is full the
// job is dropped and a warning is logged; the caller is not notified.
func (w *JobWorker) Enqueue(job Job) {
	if job.CreatedAt.IsZero() {
		job.CreatedAt = time.Now()
	}
	if job.ID == uuid.Nil {
		job.ID = uuid.New()
	}

	select {
	case w.queue <- job:
		w.logger.Debug("Job enqueued",
			zap.String("job_id", job.ID.String()),
			zap.String("job_type", job.Type),
			zap.Int("priority", job.Priority))
	default:
		// Non-blocking send: a full queue drops the job rather than stalling
		// the caller. The job ID is logged so the drop can be correlated with
		// earlier log lines for the same job.
		w.logger.Warn("Job queue full, dropping job",
			zap.String("job_id", job.ID.String()),
			zap.String("job_type", job.Type))
	}
}
// Start launches the processing goroutines and returns immediately.
//
// One goroutine is started per configured worker, with worker IDs counting up
// from 0. Each goroutine runs processWorker and exits when ctx is cancelled.
func (w *JobWorker) Start(ctx context.Context) {
	count := w.processingWorkers
	w.logger.Info("Starting job worker", zap.Int("workers", count))

	for id := 0; id < count; id++ {
		go w.processWorker(ctx, id)
	}
}
// processWorker is the loop run by each processing goroutine: it takes jobs
// from the queue and hands them to processJob until ctx is cancelled.
func (w *JobWorker) processWorker(ctx context.Context, workerID int) {
	idField := zap.Int("worker_id", workerID)
	w.logger.Info("Job worker started", idField)

	stop := ctx.Done()
	for {
		select {
		case next := <-w.queue:
			w.processJob(ctx, next, workerID)
		case <-stop:
			w.logger.Info("Job worker stopping", idField)
			return
		}
	}
}
// processJob runs a single job and handles its retry or final failure.
//
// The job executes under a 5-minute timeout derived from ctx. On success the
// function just logs and returns. On failure, while job.Retries is below
// w.maxRetries, the retry counter is incremented, the worker waits
// job.Retries * 5s (a linearly growing delay) and the job is re-enqueued.
// The wait is abandoned as soon as ctx is cancelled, so a pending retry never
// delays worker shutdown. Once retries are exhausted the failure is handed to
// logFailedJob.
func (w *JobWorker) processJob(ctx context.Context, job Job, workerID int) {
	logger := w.logger.With(
		zap.String("job_id", job.ID.String()),
		zap.String("job_type", job.Type),
		zap.Int("worker_id", workerID))

	logger.Info("Processing job",
		zap.Int("retries", job.Retries))

	// Scope the timeout context to the execution itself so it is released
	// before any retry wait begins.
	err := func() error {
		jobCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
		defer cancel()
		return w.executeJob(jobCtx, job)
	}()
	if err == nil {
		logger.Info("Job executed successfully")
		return
	}

	logger.Error("Job execution failed",
		zap.Error(err))

	if job.Retries >= w.maxRetries {
		logger.Error("Job failed after max retries",
			zap.Int("max_retries", w.maxRetries))
		// Record the permanent failure.
		w.logFailedJob(ctx, job, err)
		return
	}

	job.Retries++

	// Linear backoff: 5s, 10s, 15s, ... Wait on a timer instead of sleeping so
	// that cancellation of ctx interrupts the wait.
	delay := time.Duration(job.Retries) * 5 * time.Second
	timer := time.NewTimer(delay)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		// The workers are stopping, so nothing would consume a re-enqueued
		// job; give up on the retry instead of blocking shutdown.
		logger.Warn("Job retry aborted, worker stopping",
			zap.Int("retries", job.Retries))
		return
	case <-timer.C:
	}

	// Put the job back on the queue for another attempt.
	w.Enqueue(job)
	logger.Info("Retrying job",
		zap.Int("new_retries", job.Retries))
}
// executeJob dispatches a job to the handler matching its Type and returns
// that handler's result. An unrecognised type yields an error.
func (w *JobWorker) executeJob(ctx context.Context, job Job) error {
	var handler func(context.Context, Job) error

	switch job.Type {
	case "email":
		handler = w.processEmailJob
	case "thumbnail":
		handler = w.processThumbnailJob
	case "analytics":
		handler = w.processAnalyticsJob
	}

	if handler == nil {
		return fmt.Errorf("unknown job type: %s", job.Type)
	}
	return handler(ctx, job)
}
// processEmailJob handles an "email" job.
//
// The payload must hold a string under "to"; otherwise an error is returned.
// "subject" is optional and defaults to the empty string. Delivery is not
// implemented yet: the function logs the request and simulates the work with
// a short sleep. ctx is currently not consulted.
func (w *JobWorker) processEmailJob(ctx context.Context, job Job) error {
	payload := job.Payload

	recipient, hasRecipient := payload["to"].(string)
	if !hasRecipient {
		return fmt.Errorf("missing 'to' in payload")
	}

	// Optional fields: a missing or non-string value yields "".
	subject, _ := payload["subject"].(string)
	// The body is looked up but not used until real delivery exists.
	_, _ = payload["body"].(string)

	w.logger.Info("Sending email",
		zap.String("to", recipient),
		zap.String("subject", subject))

	// TODO: implement real delivery (SMTP, SendGrid, etc.).
	// Simulated for now.
	const simulatedSend = 100 * time.Millisecond
	time.Sleep(simulatedSend)

	return nil
}
// processThumbnailJob handles a "thumbnail" job.
//
// The payload must hold a string under "file_id"; otherwise an error is
// returned. "file_type" is optional and defaults to the empty string.
// Generation is not implemented yet: the function logs the request and
// simulates the work with a short sleep. ctx is currently not consulted.
func (w *JobWorker) processThumbnailJob(ctx context.Context, job Job) error {
	payload := job.Payload

	id, hasID := payload["file_id"].(string)
	if !hasID {
		return fmt.Errorf("missing 'file_id' in payload")
	}

	// Optional field: a missing or non-string value yields "".
	kind, _ := payload["file_type"].(string)

	w.logger.Info("Generating thumbnail",
		zap.String("file_id", id),
		zap.String("file_type", kind))

	// TODO: implement real thumbnail generation (ImageMagick, etc.).
	// Simulated for now.
	const simulatedRender = 500 * time.Millisecond
	time.Sleep(simulatedRender)

	return nil
}
// processAnalyticsJob handles an "analytics" job.
//
// The payload must hold a string under "event"; otherwise an error is
// returned. Processing is not implemented yet: the function logs the event
// and simulates the work with a short sleep. ctx is currently not consulted.
func (w *JobWorker) processAnalyticsJob(ctx context.Context, job Job) error {
	name, hasEvent := job.Payload["event"].(string)
	if !hasEvent {
		return fmt.Errorf("missing 'event' in payload")
	}

	w.logger.Info("Processing analytics", zap.String("event", name))

	// TODO: implement real analytics processing.
	// Simulated for now.
	const simulatedWork = 50 * time.Millisecond
	time.Sleep(simulatedWork)

	return nil
}
// logFailedJob records a job that has permanently failed. For now it only
// emits an error-level log entry with the job's ID, type and the error; ctx
// is not used.
func (w *JobWorker) logFailedJob(ctx context.Context, job Job, err error) {
	// TODO: persist the failure in the job_failures table.
	details := []zap.Field{
		zap.String("job_id", job.ID.String()),
		zap.String("job_type", job.Type),
		zap.Error(err),
	}
	w.logger.Error("Job permanently failed", details...)
}
// GetStats returns a snapshot of the worker's state: the number of jobs
// currently waiting in the queue ("queue_size"), the configured number of
// processing goroutines ("workers") and the retry limit ("max_retries").
func (w *JobWorker) GetStats() map[string]interface{} {
	stats := make(map[string]interface{}, 3)
	stats["queue_size"] = len(w.queue)
	stats["workers"] = w.processingWorkers
	stats["max_retries"] = w.maxRetries
	return stats
}