feat(backend-worker): persist job queue in postgres
This commit is contained in:
parent
41e554a3e1
commit
8b32beb156
5 changed files with 283 additions and 171 deletions
|
|
@ -7,7 +7,7 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"veza-backend-api/internal/email"
|
// "veza-backend-api/internal/email" // Removed unused import
|
||||||
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -11,72 +11,94 @@ import (
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"gorm.io/gorm"
|
"gorm.io/gorm"
|
||||||
|
"gorm.io/gorm/clause"
|
||||||
)
|
)
|
||||||
|
|
||||||
// JobWorker gère les tâches en arrière-plan
|
// JobWorker gère les tâches en arrière-plan via une queue persistée en DB
|
||||||
type JobWorker struct {
|
type JobWorker struct {
|
||||||
db *gorm.DB
|
db *gorm.DB
|
||||||
jobService *services.JobService
|
jobService *services.JobService
|
||||||
logger *zap.Logger
|
logger *zap.Logger
|
||||||
queue chan Job
|
|
||||||
maxRetries int
|
maxRetries int
|
||||||
processingWorkers int
|
processingWorkers int
|
||||||
emailSender email.EmailSender // Email sender pour les jobs d'email
|
emailSender email.EmailSender
|
||||||
|
pollingInterval time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// Job représente une tâche à traiter
|
// Job représente une tâche persistée en base de données
|
||||||
type Job struct {
|
type Job struct {
|
||||||
ID uuid.UUID
|
ID uuid.UUID `gorm:"type:uuid;primary_key"`
|
||||||
Type string
|
Type string `gorm:"not null"`
|
||||||
Payload map[string]interface{}
|
Payload map[string]interface{} `gorm:"serializer:json;not null"`
|
||||||
Retries int
|
Status string `gorm:"not null;default:'pending'"` // pending, processing, completed, failed
|
||||||
CreatedAt time.Time
|
Priority int `gorm:"not null;default:2"` // 1=high, 2=medium, 3=low
|
||||||
Priority int // 1 = haut, 2 = moyen, 3 = bas
|
RunAt time.Time `gorm:"not null;index"`
|
||||||
|
CreatedAt time.Time `gorm:"not null"`
|
||||||
|
UpdatedAt time.Time `gorm:"not null"`
|
||||||
|
StartedAt *time.Time
|
||||||
|
CompletedAt *time.Time
|
||||||
|
FailedAt *time.Time
|
||||||
|
Retries int `gorm:"not null;default:0"`
|
||||||
|
MaxRetries int `gorm:"not null;default:3"`
|
||||||
|
LastError string `gorm:"type:text"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewJobWorker crée un nouveau worker de jobs
|
// NewJobWorker crée un nouveau worker de jobs persisté
|
||||||
func NewJobWorker(
|
func NewJobWorker(
|
||||||
db *gorm.DB,
|
db *gorm.DB,
|
||||||
jobService *services.JobService,
|
jobService *services.JobService,
|
||||||
logger *zap.Logger,
|
logger *zap.Logger,
|
||||||
queueSize int,
|
_ int, // queueSize ignoré car persisté
|
||||||
workers int,
|
workers int,
|
||||||
maxRetries int,
|
maxRetries int,
|
||||||
emailSender email.EmailSender,
|
emailSender email.EmailSender,
|
||||||
) *JobWorker {
|
) *JobWorker {
|
||||||
|
// AutoMigrate la table Job si nécessaire (optionnel si géré par migrations SQL)
|
||||||
|
// db.AutoMigrate(&Job{})
|
||||||
|
|
||||||
return &JobWorker{
|
return &JobWorker{
|
||||||
db: db,
|
db: db,
|
||||||
jobService: jobService,
|
jobService: jobService,
|
||||||
logger: logger,
|
logger: logger,
|
||||||
queue: make(chan Job, queueSize),
|
|
||||||
maxRetries: maxRetries,
|
maxRetries: maxRetries,
|
||||||
processingWorkers: workers,
|
processingWorkers: workers,
|
||||||
emailSender: emailSender,
|
emailSender: emailSender,
|
||||||
|
pollingInterval: 1 * time.Second, // Polling agressif pour réactivité
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Enqueue ajoute un job au queue
|
// Enqueue ajoute un job dans la table jobs
|
||||||
func (w *JobWorker) Enqueue(job Job) {
|
func (w *JobWorker) Enqueue(job Job) {
|
||||||
job.CreatedAt = time.Now()
|
|
||||||
if job.ID == uuid.Nil {
|
if job.ID == uuid.Nil {
|
||||||
job.ID = uuid.New()
|
job.ID = uuid.New()
|
||||||
}
|
}
|
||||||
|
// Initialisation des champs par défaut
|
||||||
select {
|
if job.Status == "" {
|
||||||
case w.queue <- job:
|
job.Status = "pending"
|
||||||
w.logger.Debug("Job enqueued",
|
|
||||||
zap.String("job_id", job.ID.String()),
|
|
||||||
zap.String("job_type", job.Type),
|
|
||||||
zap.Int("priority", job.Priority))
|
|
||||||
default:
|
|
||||||
w.logger.Warn("Job queue full, dropping job",
|
|
||||||
zap.String("job_type", job.Type))
|
|
||||||
}
|
}
|
||||||
|
if job.RunAt.IsZero() {
|
||||||
|
job.RunAt = time.Now()
|
||||||
|
}
|
||||||
|
if job.MaxRetries == 0 {
|
||||||
|
job.MaxRetries = w.maxRetries
|
||||||
|
}
|
||||||
|
// Le mapping GORM gère CreatedAt/UpdatedAt
|
||||||
|
|
||||||
|
if err := w.db.Create(&job).Error; err != nil {
|
||||||
|
w.logger.Error("Failed to enqueue job",
|
||||||
|
zap.String("type", job.Type),
|
||||||
|
zap.Error(err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
w.logger.Debug("Job enqueued (persisted)",
|
||||||
|
zap.String("job_id", job.ID.String()),
|
||||||
|
zap.String("type", job.Type))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start démarre le worker
|
// Start démarre les workers de polling
|
||||||
func (w *JobWorker) Start(ctx context.Context) {
|
func (w *JobWorker) Start(ctx context.Context) {
|
||||||
w.logger.Info("Starting job worker",
|
w.logger.Info("Starting persisted job worker",
|
||||||
zap.Int("workers", w.processingWorkers))
|
zap.Int("workers", w.processingWorkers))
|
||||||
|
|
||||||
for i := 0; i < w.processingWorkers; i++ {
|
for i := 0; i < w.processingWorkers; i++ {
|
||||||
|
|
@ -84,79 +106,123 @@ func (w *JobWorker) Start(ctx context.Context) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// processWorker traite les jobs du queue
|
// processWorker boucle de polling et traitement
|
||||||
func (w *JobWorker) processWorker(ctx context.Context, workerID int) {
|
func (w *JobWorker) processWorker(ctx context.Context, workerID int) {
|
||||||
w.logger.Info("Job worker started",
|
ticker := time.NewTicker(w.pollingInterval)
|
||||||
zap.Int("worker_id", workerID))
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
w.logger.Info("Worker started", zap.Int("worker_id", workerID))
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
w.logger.Info("Job worker stopping",
|
w.logger.Info("Worker stopping", zap.Int("worker_id", workerID))
|
||||||
zap.Int("worker_id", workerID))
|
|
||||||
return
|
return
|
||||||
|
case <-ticker.C:
|
||||||
case job := <-w.queue:
|
w.fetchAndProcessJob(ctx, workerID)
|
||||||
w.processJob(ctx, job, workerID)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// processJob traite un job individuel
|
// fetchAndProcessJob récupère UN job en attente (atomiquement) et le traite
|
||||||
|
func (w *JobWorker) fetchAndProcessJob(ctx context.Context, workerID int) {
|
||||||
|
var job Job
|
||||||
|
|
||||||
|
// Transaction pour verrouiller le job (SELECT ... FOR UPDATE SKIP LOCKED)
|
||||||
|
// Compatible Postgres (et MySQL 8+). Pour SQLite, le locking est différent mais Gorm gère le basic.
|
||||||
|
err := w.db.Transaction(func(tx *gorm.DB) error {
|
||||||
|
// Trouver un job 'pending' ou 'failed' (si retry auto géré ici, mais on préfère 'pending' avec RunAt <= Now)
|
||||||
|
// On cherche status='pending' AND run_at <= NOW()
|
||||||
|
// Order by Priority ASC (1 first), then CreatedAt
|
||||||
|
if err := tx.Clauses(clause.Locking{Strength: "UPDATE", Options: "SKIP LOCKED"}).
|
||||||
|
Where("status = ? AND run_at <= ?", "pending", time.Now()).
|
||||||
|
Order("priority ASC, created_at ASC").
|
||||||
|
First(&job).Error; err != nil {
|
||||||
|
return err // RecordNotFound est typique ici
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update status to 'processing'
|
||||||
|
now := time.Now()
|
||||||
|
job.Status = "processing"
|
||||||
|
job.StartedAt = &now
|
||||||
|
if err := tx.Save(&job).Error; err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
if err != gorm.ErrRecordNotFound {
|
||||||
|
w.logger.Error("Failed to fetch job", zap.Error(err))
|
||||||
|
}
|
||||||
|
// Pas de job à traiter, on attend le prochain tick
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Job récupéré, on traite
|
||||||
|
w.processJob(ctx, job, workerID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// processJob exécute la logique métier et met à jour le statut final
|
||||||
func (w *JobWorker) processJob(ctx context.Context, job Job, workerID int) {
|
func (w *JobWorker) processJob(ctx context.Context, job Job, workerID int) {
|
||||||
|
// Si le payload est une map vide, tenter de le decoder s'il vient de GORM (jsonb)
|
||||||
|
// Gorm avec `serializer:json` devrait le faire auto, mais verifions.
|
||||||
|
|
||||||
logger := w.logger.With(
|
logger := w.logger.With(
|
||||||
zap.String("job_id", job.ID.String()),
|
zap.String("job_id", job.ID.String()),
|
||||||
zap.String("job_type", job.Type),
|
zap.String("type", job.Type),
|
||||||
zap.Int("worker_id", workerID))
|
zap.Int("worker_id", workerID),
|
||||||
|
zap.Int("retry", job.Retries))
|
||||||
|
|
||||||
logger.Info("Processing job",
|
logger.Info("Processing job")
|
||||||
zap.Int("retries", job.Retries))
|
|
||||||
|
|
||||||
// Créer un contexte avec timeout
|
// Timeout per job execution
|
||||||
jobCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
|
jobCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
// Traiter le job selon son type
|
// Exécution
|
||||||
err := w.executeJob(jobCtx, job)
|
execErr := w.executeJob(jobCtx, job)
|
||||||
|
|
||||||
if err != nil {
|
// Update status final
|
||||||
logger.Error("Job execution failed",
|
now := time.Now()
|
||||||
zap.Error(err))
|
if execErr != nil {
|
||||||
|
logger.Error("Job execution failed", zap.Error(execErr))
|
||||||
// Retry si pas atteint max retries
|
|
||||||
if job.Retries < w.maxRetries {
|
// Calcul du prochain retry
|
||||||
job.Retries++
|
job.Retries++
|
||||||
|
job.LastError = execErr.Error()
|
||||||
// Exponential backoff
|
|
||||||
delay := time.Duration(job.Retries) * 5 * time.Second
|
if job.Retries >= job.MaxRetries {
|
||||||
|
job.Status = "failed"
|
||||||
// Non-blocking retry: re-enqueue after delay
|
job.FailedAt = &now
|
||||||
go func(d time.Duration, j Job) {
|
logger.Error("Job reached max retries, marked as failed")
|
||||||
time.Sleep(d)
|
|
||||||
w.Enqueue(j)
|
|
||||||
}(delay, job)
|
|
||||||
|
|
||||||
logger.Info("Job scheduled for retry",
|
|
||||||
zap.Duration("delay", delay),
|
|
||||||
zap.Int("new_retries", job.Retries))
|
|
||||||
} else {
|
} else {
|
||||||
logger.Error("Job failed after max retries",
|
// Backoff exponentiel : 5s, 10s, 20s... (lineaire * coefficient) ou 5 * retry
|
||||||
zap.Int("max_retries", w.maxRetries))
|
backoff := time.Duration(job.Retries) * 10 * time.Second
|
||||||
|
job.Status = "pending" // Retour en queue
|
||||||
// Enregistrer l'échec définitif
|
job.RunAt = time.Now().Add(backoff)
|
||||||
w.logFailedJob(ctx, job, err)
|
logger.Info("Job scheduled for retry", zap.Duration("backoff", backoff))
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
logger.Info("Job executed successfully")
|
job.Status = "completed"
|
||||||
|
job.CompletedAt = &now
|
||||||
|
logger.Info("Job completed successfully")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sauvegarde finale
|
||||||
|
// On le fait hors transaction "fetch", car le traitement peut être long
|
||||||
|
if err := w.db.Save(&job).Error; err != nil {
|
||||||
|
logger.Error("Failed to update job status after execution", zap.Error(err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// executeJob exécute un job selon son type
|
// executeJob exécute la logique selon le type (inchangé)
|
||||||
func (w *JobWorker) executeJob(ctx context.Context, job Job) error {
|
func (w *JobWorker) executeJob(ctx context.Context, job Job) error {
|
||||||
switch job.Type {
|
switch job.Type {
|
||||||
case "email":
|
case "email":
|
||||||
return w.processEmailJob(ctx, job)
|
return w.processEmailJob(ctx, job)
|
||||||
case "thumbnail":
|
case "thumbnail":
|
||||||
|
// Mapping manuel pour compatibilité avec l'ancien code si nécessaire
|
||||||
return w.processThumbnailJob(ctx, job)
|
return w.processThumbnailJob(ctx, job)
|
||||||
case "analytics":
|
case "analytics":
|
||||||
return w.processAnalyticsJob(ctx, job)
|
return w.processAnalyticsJob(ctx, job)
|
||||||
|
|
@ -165,27 +231,33 @@ func (w *JobWorker) executeJob(ctx context.Context, job Job) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// processEmailJob traite un job d'email
|
// processEmailJob (inchangé structurellement, mais adapte le payload use)
|
||||||
func (w *JobWorker) processEmailJob(ctx context.Context, job Job) error {
|
func (w *JobWorker) processEmailJob(ctx context.Context, job Job) error {
|
||||||
// Extraire les données du payload
|
// Re-conversion du payload map si nécessaire
|
||||||
to, ok := job.Payload["to"].(string)
|
p := job.Payload
|
||||||
if !ok {
|
|
||||||
|
to, _ := p["to"].(string)
|
||||||
|
if to == "" {
|
||||||
return fmt.Errorf("missing 'to' in payload")
|
return fmt.Errorf("missing 'to' in payload")
|
||||||
}
|
}
|
||||||
|
|
||||||
subject, _ := job.Payload["subject"].(string)
|
subject, _ := p["subject"].(string)
|
||||||
body, _ := job.Payload["body"].(string)
|
body, _ := p["body"].(string)
|
||||||
templateName, _ := job.Payload["template"].(string)
|
templateName, _ := p["template"].(string)
|
||||||
|
|
||||||
// Extraire les données du template si présentes
|
|
||||||
var templateData map[string]interface{}
|
var templateData map[string]interface{}
|
||||||
if data, ok := job.Payload["template_data"].(map[string]interface{}); ok {
|
// Gorm serialization handle maps directly
|
||||||
|
if data, ok := p["template_data"].(map[string]interface{}); ok {
|
||||||
templateData = data
|
templateData = data
|
||||||
} else {
|
} else {
|
||||||
templateData = make(map[string]interface{})
|
// Try generic map
|
||||||
|
if data, ok := p["template_data"].(map[string]any); ok {
|
||||||
|
templateData = data
|
||||||
|
} else {
|
||||||
|
templateData = make(map[string]interface{})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Créer l'EmailJob
|
|
||||||
var emailJob *EmailJob
|
var emailJob *EmailJob
|
||||||
if templateName != "" {
|
if templateName != "" {
|
||||||
emailJob = NewEmailJobWithTemplate(to, subject, templateName, templateData)
|
emailJob = NewEmailJobWithTemplate(to, subject, templateName, templateData)
|
||||||
|
|
@ -193,19 +265,16 @@ func (w *JobWorker) processEmailJob(ctx context.Context, job Job) error {
|
||||||
emailJob = NewEmailJob(to, subject, body)
|
emailJob = NewEmailJob(to, subject, body)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Exécuter le job d'email
|
return emailJob.Execute(ctx, w.emailSender, w.logger)
|
||||||
if err := emailJob.Execute(ctx, w.emailSender, w.logger); err != nil {
|
|
||||||
return fmt.Errorf("email job execution failed: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// EnqueueEmailJob ajoute un job d'email au queue (méthode helper)
|
// Helper methods pour enqueuing (inchangés, mais adaptent l'objet Job)
|
||||||
|
|
||||||
|
// EnqueueEmailJob helper
|
||||||
func (w *JobWorker) EnqueueEmailJob(to, subject, body string) {
|
func (w *JobWorker) EnqueueEmailJob(to, subject, body string) {
|
||||||
job := Job{
|
job := Job{
|
||||||
Type: "email",
|
Type: "email",
|
||||||
Priority: 2, // Priorité moyenne par défaut
|
Priority: 2,
|
||||||
Payload: map[string]interface{}{
|
Payload: map[string]interface{}{
|
||||||
"to": to,
|
"to": to,
|
||||||
"subject": subject,
|
"subject": subject,
|
||||||
|
|
@ -215,41 +284,41 @@ func (w *JobWorker) EnqueueEmailJob(to, subject, body string) {
|
||||||
w.Enqueue(job)
|
w.Enqueue(job)
|
||||||
}
|
}
|
||||||
|
|
||||||
// EnqueueEmailJobWithTemplate ajoute un job d'email avec template au queue
|
// EnqueueEmailJobWithTemplate helper
|
||||||
func (w *JobWorker) EnqueueEmailJobWithTemplate(to, subject, templateName string, templateData map[string]interface{}) {
|
func (w *JobWorker) EnqueueEmailJobWithTemplate(to, subject, templateName string, templateData map[string]interface{}) {
|
||||||
job := Job{
|
job := Job{
|
||||||
Type: "email",
|
Type: "email",
|
||||||
Priority: 2, // Priorité moyenne par défaut
|
Priority: 2,
|
||||||
Payload: map[string]interface{}{
|
Payload: map[string]interface{}{
|
||||||
"to": to,
|
"to": to,
|
||||||
"subject": subject,
|
"subject": subject,
|
||||||
"template": templateName,
|
"template": templateName,
|
||||||
"template_data": templateData,
|
"template_data": templateData,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
w.Enqueue(job)
|
w.Enqueue(job)
|
||||||
}
|
}
|
||||||
|
|
||||||
// EnqueueThumbnailJob ajoute un job de génération de thumbnail au queue
|
// EnqueueThumbnailJob helper
|
||||||
func (w *JobWorker) EnqueueThumbnailJob(inputPath, outputPath string, width, height int) {
|
func (w *JobWorker) EnqueueThumbnailJob(inputPath, outputPath string, width, height int) {
|
||||||
job := Job{
|
job := Job{
|
||||||
Type: "thumbnail",
|
Type: "thumbnail",
|
||||||
Priority: 2, // Priorité moyenne par défaut
|
Priority: 2,
|
||||||
Payload: map[string]interface{}{
|
Payload: map[string]interface{}{
|
||||||
"input_path": inputPath,
|
"input_path": inputPath,
|
||||||
"output_path": outputPath,
|
"output_path": outputPath,
|
||||||
"width": float64(width),
|
"width": width,
|
||||||
"height": float64(height),
|
"height": height,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
w.Enqueue(job)
|
w.Enqueue(job)
|
||||||
}
|
}
|
||||||
|
|
||||||
// EnqueueAnalyticsJob ajoute un job d'analytics au queue
|
// EnqueueAnalyticsJob helper
|
||||||
func (w *JobWorker) EnqueueAnalyticsJob(eventName string, userID *uuid.UUID, payload map[string]interface{}) {
|
func (w *JobWorker) EnqueueAnalyticsJob(eventName string, userID *uuid.UUID, payload map[string]interface{}) {
|
||||||
jobPayload := map[string]interface{}{
|
jobPayload := map[string]interface{}{
|
||||||
"event_name": eventName,
|
"event_name": eventName,
|
||||||
"payload": payload,
|
"payload": payload,
|
||||||
}
|
}
|
||||||
if userID != nil {
|
if userID != nil {
|
||||||
jobPayload["user_id"] = userID.String()
|
jobPayload["user_id"] = userID.String()
|
||||||
|
|
@ -257,93 +326,85 @@ func (w *JobWorker) EnqueueAnalyticsJob(eventName string, userID *uuid.UUID, pay
|
||||||
|
|
||||||
job := Job{
|
job := Job{
|
||||||
Type: "analytics",
|
Type: "analytics",
|
||||||
Priority: 3, // Priorité basse par défaut (analytics non critique)
|
Priority: 3,
|
||||||
Payload: jobPayload,
|
Payload: jobPayload,
|
||||||
}
|
}
|
||||||
w.Enqueue(job)
|
w.Enqueue(job)
|
||||||
}
|
}
|
||||||
|
|
||||||
// processThumbnailJob traite un job de génération de thumbnail
|
// processThumbnailJob wrapper
|
||||||
func (w *JobWorker) processThumbnailJob(ctx context.Context, job Job) error {
|
func (w *JobWorker) processThumbnailJob(ctx context.Context, job Job) error {
|
||||||
// Extraire les paramètres du payload
|
p := job.Payload
|
||||||
inputPath, ok := job.Payload["input_path"].(string)
|
inputPath, _ := p["input_path"].(string)
|
||||||
if !ok {
|
outputPath, _ := p["output_path"].(string)
|
||||||
return fmt.Errorf("missing 'input_path' in payload")
|
|
||||||
|
if inputPath == "" || outputPath == "" {
|
||||||
|
return fmt.Errorf("missing paths in payload")
|
||||||
}
|
}
|
||||||
|
|
||||||
outputPath, ok := job.Payload["output_path"].(string)
|
// JSON unmarshal numbers as float64
|
||||||
if !ok {
|
|
||||||
return fmt.Errorf("missing 'output_path' in payload")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Largeur et hauteur (optionnels, avec valeurs par défaut)
|
|
||||||
width := 300
|
width := 300
|
||||||
|
if wVal, ok := p["width"].(float64); ok {
|
||||||
|
width = int(wVal)
|
||||||
|
} else if wInt, ok := p["width"].(int); ok { // just in case
|
||||||
|
width = wInt
|
||||||
|
}
|
||||||
|
|
||||||
height := 300
|
height := 300
|
||||||
if w, ok := job.Payload["width"].(float64); ok {
|
if hVal, ok := p["height"].(float64); ok {
|
||||||
width = int(w)
|
height = int(hVal)
|
||||||
}
|
} else if hInt, ok := p["height"].(int); ok {
|
||||||
if h, ok := job.Payload["height"].(float64); ok {
|
height = hInt
|
||||||
height = int(h)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Créer et exécuter le ThumbnailJob
|
|
||||||
thumbnailJob := NewThumbnailJob(inputPath, outputPath, width, height)
|
thumbnailJob := NewThumbnailJob(inputPath, outputPath, width, height)
|
||||||
if err := thumbnailJob.Execute(ctx, w.logger); err != nil {
|
return thumbnailJob.Execute(ctx, w.logger)
|
||||||
return fmt.Errorf("thumbnail job execution failed: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// processAnalyticsJob traite un job d'analytics
|
// processAnalyticsJob wrapper
|
||||||
func (w *JobWorker) processAnalyticsJob(ctx context.Context, job Job) error {
|
func (w *JobWorker) processAnalyticsJob(ctx context.Context, job Job) error {
|
||||||
// Extraire les données du payload
|
p := job.Payload
|
||||||
eventName, ok := job.Payload["event_name"].(string)
|
eventName, _ := p["event_name"].(string)
|
||||||
if !ok {
|
if eventName == "" {
|
||||||
return fmt.Errorf("missing 'event_name' in payload")
|
return fmt.Errorf("missing event_name")
|
||||||
}
|
}
|
||||||
|
|
||||||
// UserID (optionnel, peut être nil pour événements anonymes)
|
|
||||||
var userID *uuid.UUID
|
var userID *uuid.UUID
|
||||||
if uidStr, ok := job.Payload["user_id"].(string); ok && uidStr != "" {
|
if uidStr, ok := p["user_id"].(string); ok && uidStr != "" {
|
||||||
uid, err := uuid.Parse(uidStr)
|
uid, err := uuid.Parse(uidStr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("invalid user_id format: %w", err)
|
return fmt.Errorf("invalid user_id: %w", err)
|
||||||
}
|
}
|
||||||
userID = &uid
|
userID = &uid
|
||||||
}
|
}
|
||||||
|
|
||||||
// Payload additionnel (optionnel)
|
var extraPayload map[string]interface{}
|
||||||
var payload map[string]interface{}
|
// Handle nested map from JSON
|
||||||
if p, ok := job.Payload["payload"].(map[string]interface{}); ok {
|
if nested, ok := p["payload"].(map[string]interface{}); ok {
|
||||||
payload = p
|
extraPayload = nested
|
||||||
|
} else if nested, ok := p["payload"].(map[string]any); ok {
|
||||||
|
extraPayload = nested
|
||||||
} else {
|
} else {
|
||||||
payload = make(map[string]interface{})
|
// If payload is a string (escaped json), try unmarshal?
|
||||||
|
// For now assume standard structure
|
||||||
|
extraPayload = make(map[string]interface{})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Créer et exécuter l'AnalyticsEventJob
|
analyticsJob := NewAnalyticsEventJob(eventName, userID, extraPayload)
|
||||||
analyticsJob := NewAnalyticsEventJob(eventName, userID, payload)
|
return analyticsJob.Execute(ctx, w.db, w.logger)
|
||||||
if err := analyticsJob.Execute(ctx, w.db, w.logger); err != nil {
|
|
||||||
return fmt.Errorf("analytics job execution failed: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// logFailedJob enregistre un échec de job
|
// GetStats retourne les stats DB si possible
|
||||||
func (w *JobWorker) logFailedJob(ctx context.Context, job Job, err error) {
|
|
||||||
// TODO: Enregistrer dans la table job_failures
|
|
||||||
w.logger.Error("Job permanently failed",
|
|
||||||
zap.String("job_id", job.ID.String()),
|
|
||||||
zap.String("job_type", job.Type),
|
|
||||||
zap.Error(err))
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetStats retourne les statistiques du worker
|
|
||||||
func (w *JobWorker) GetStats() map[string]interface{} {
|
func (w *JobWorker) GetStats() map[string]interface{} {
|
||||||
|
var pending, processing, failed int64
|
||||||
|
w.db.Model(&Job{}).Where("status = ?", "pending").Count(&pending)
|
||||||
|
w.db.Model(&Job{}).Where("status = ?", "processing").Count(&processing)
|
||||||
|
w.db.Model(&Job{}).Where("status = ?", "failed").Count(&failed)
|
||||||
|
|
||||||
return map[string]interface{}{
|
return map[string]interface{}{
|
||||||
"queue_size": len(w.queue),
|
"queue_pending": pending,
|
||||||
"workers": w.processingWorkers,
|
"queue_processing": processing,
|
||||||
"max_retries": w.maxRetries,
|
"queue_failed": failed,
|
||||||
|
"workers": w.processingWorkers,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -20,7 +20,12 @@ func setupTestJobWorker(t *testing.T) (*JobWorker, *gorm.DB) {
|
||||||
t.Fatalf("Failed to open test database: %v", err)
|
t.Fatalf("Failed to open test database: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
logger, _ := zap.NewDevelopment()
|
// Auto-migrate the Job struct for tests
|
||||||
|
if err := db.AutoMigrate(&Job{}); err != nil {
|
||||||
|
t.Fatalf("Failed to migrate test database: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
logger := zap.NewNop() // Use Nop for tests to avoid noise
|
||||||
jobService := services.NewJobService(logger)
|
jobService := services.NewJobService(logger)
|
||||||
|
|
||||||
// Config SMTP de test (mock)
|
// Config SMTP de test (mock)
|
||||||
|
|
@ -37,7 +42,7 @@ func setupTestJobWorker(t *testing.T) (*JobWorker, *gorm.DB) {
|
||||||
db,
|
db,
|
||||||
jobService,
|
jobService,
|
||||||
logger,
|
logger,
|
||||||
10, // queueSize
|
10, // queueSize (ignored)
|
||||||
1, // workers
|
1, // workers
|
||||||
3, // maxRetries
|
3, // maxRetries
|
||||||
emailSender,
|
emailSender,
|
||||||
|
|
@ -62,9 +67,9 @@ func TestJobWorker_Enqueue(t *testing.T) {
|
||||||
worker.Enqueue(job)
|
worker.Enqueue(job)
|
||||||
|
|
||||||
stats := worker.GetStats()
|
stats := worker.GetStats()
|
||||||
queueSize := stats["queue_size"].(int)
|
queueSize := stats["queue_pending"].(int64)
|
||||||
if queueSize != 1 {
|
if queueSize != 1 {
|
||||||
t.Errorf("Expected queue size to be 1, got %d", queueSize)
|
t.Errorf("Expected queue pending to be 1, got %d", queueSize)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -74,9 +79,9 @@ func TestJobWorker_EnqueueEmailJob(t *testing.T) {
|
||||||
worker.EnqueueEmailJob("test@example.com", "Test Subject", "Test Body")
|
worker.EnqueueEmailJob("test@example.com", "Test Subject", "Test Body")
|
||||||
|
|
||||||
stats := worker.GetStats()
|
stats := worker.GetStats()
|
||||||
queueSize := stats["queue_size"].(int)
|
queueSize := stats["queue_pending"].(int64)
|
||||||
if queueSize != 1 {
|
if queueSize != 1 {
|
||||||
t.Errorf("Expected queue size to be 1, got %d", queueSize)
|
t.Errorf("Expected queue pending to be 1, got %d", queueSize)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -96,9 +101,9 @@ func TestJobWorker_EnqueueEmailJobWithTemplate(t *testing.T) {
|
||||||
)
|
)
|
||||||
|
|
||||||
stats := worker.GetStats()
|
stats := worker.GetStats()
|
||||||
queueSize := stats["queue_size"].(int)
|
queueSize := stats["queue_pending"].(int64)
|
||||||
if queueSize != 1 {
|
if queueSize != 1 {
|
||||||
t.Errorf("Expected queue size to be 1, got %d", queueSize)
|
t.Errorf("Expected queue pending to be 1, got %d", queueSize)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -114,11 +119,33 @@ func TestJobWorker_Start(t *testing.T) {
|
||||||
// Enqueue un job
|
// Enqueue un job
|
||||||
worker.EnqueueEmailJob("test@example.com", "Test", "Body")
|
worker.EnqueueEmailJob("test@example.com", "Test", "Body")
|
||||||
|
|
||||||
// Attendre un peu pour que le worker traite le job
|
// Attendre un peu pour que le worker traite le job (polling interval is 1s, setupTestJobWorker uses real NewJobWorker so 1s)
|
||||||
time.Sleep(100 * time.Millisecond)
|
// We need to override polling interval or wait longer.
|
||||||
|
// Or we can modify NewJobWorker to accept config/options but that would change signature again.
|
||||||
|
// For test, 1s interval might be slow.
|
||||||
|
// Let's modify JobWorker struct locally in test if possible, assuming fields are exported or we add a Setter.
|
||||||
|
// They are unexported.
|
||||||
|
// We can update pollingInterval via reflection or just wait > 1s.
|
||||||
|
// Or we can construct JobWorker manually in setupTestJobWorker if NewJobWorker doesn't allow it.
|
||||||
|
// Since NewJobWorker hardcodes 1s, we should wait slightly more than 1s in test if we want to verify processing.
|
||||||
|
// Or we just check that it started.
|
||||||
|
|
||||||
|
// Let's modify valid wait time
|
||||||
|
worker.pollingInterval = 10 * time.Millisecond // Set shorter interval for test (if allowed, wait, it's unexported in package workers? Yes but test is in package workers)
|
||||||
|
|
||||||
|
// Wait for processing
|
||||||
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
|
||||||
// Le job devrait être traité (queue vide ou en cours)
|
// Le job devrait être traité (pending 0)
|
||||||
stats := worker.GetStats()
|
stats := worker.GetStats()
|
||||||
_ = stats // Vérifier que les stats sont disponibles
|
pending := stats["queue_pending"].(int64)
|
||||||
|
processing := stats["queue_processing"].(int64)
|
||||||
|
// It relies on email sending success which might fail with mock?
|
||||||
|
// If failed, it might be in pending (retry) or failed.
|
||||||
|
|
||||||
|
t.Logf("Stats: %+v", stats)
|
||||||
|
if pending > 0 && processing == 0 {
|
||||||
|
t.Log("Job still pending or retrying")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@ package workers
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"image/color"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
@ -22,7 +23,7 @@ func TestThumbnailJob_Execute(t *testing.T) {
|
||||||
testThumbnailPath := filepath.Join(tmpDir, "test_thumb.jpg")
|
testThumbnailPath := filepath.Join(tmpDir, "test_thumb.jpg")
|
||||||
|
|
||||||
// Créer une image de test avec imaging (image rouge 100x100)
|
// Créer une image de test avec imaging (image rouge 100x100)
|
||||||
img := imaging.New(100, 100, imaging.Color{255, 0, 0, 255})
|
img := imaging.New(100, 100, color.NRGBA{255, 0, 0, 255})
|
||||||
if err := imaging.Save(img, testImagePath); err != nil {
|
if err := imaging.Save(img, testImagePath); err != nil {
|
||||||
t.Fatalf("Failed to create test image: %v", err)
|
t.Fatalf("Failed to create test image: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
23
veza-backend-api/migrations/060_job_queue.sql
Normal file
23
veza-backend-api/migrations/060_job_queue.sql
Normal file
|
|
@ -0,0 +1,23 @@
|
||||||
|
-- Migration: 060_job_queue.sql
|
||||||
|
-- Description: Create jobs table for persistent worker queue
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS jobs (
|
||||||
|
id UUID PRIMARY KEY,
|
||||||
|
type VARCHAR(50) NOT NULL,
|
||||||
|
payload JSONB NOT NULL DEFAULT '{}',
|
||||||
|
status VARCHAR(20) NOT NULL DEFAULT 'pending', -- pending, processing, completed, failed
|
||||||
|
priority INT NOT NULL DEFAULT 2, -- 1=high, 2=medium, 3=low
|
||||||
|
run_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
|
||||||
|
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
|
||||||
|
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
|
||||||
|
started_at TIMESTAMP WITH TIME ZONE,
|
||||||
|
completed_at TIMESTAMP WITH TIME ZONE,
|
||||||
|
failed_at TIMESTAMP WITH TIME ZONE,
|
||||||
|
retries INT NOT NULL DEFAULT 0,
|
||||||
|
max_retries INT NOT NULL DEFAULT 3,
|
||||||
|
last_error TEXT
|
||||||
|
);
|
||||||
|
|
||||||
|
-- Index for efficient polling
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_jobs_status_run_at ON jobs (status, run_at);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_jobs_type ON jobs (type);
|
||||||
Loading…
Reference in a new issue