Backend Go: - Remplacement complet des anciennes migrations par la base V1 alignée sur ORIGIN. - Durcissement global du parsing JSON (BindAndValidateJSON + RespondWithAppError). - Sécurisation de config.go, CORS, statuts de santé et monitoring. - Implémentation des transactions P0 (RBAC, duplication de playlists, social toggles). - Ajout d’un job worker structuré (emails, analytics, thumbnails) + tests associés. - Nouvelle doc backend : AUDIT_CONFIG, BACKEND_CONFIG, AUTH_PASSWORD_RESET, JOB_WORKER_*. Chat server (Rust): - Refonte du pipeline JWT + sécurité, audit et rate limiting avancé. - Implémentation complète du cycle de message (read receipts, delivered, edit/delete, typing). - Nettoyage des panics, gestion d’erreurs robuste, logs structurés. - Migrations chat alignées sur le schéma UUID et nouvelles features. Stream server (Rust): - Refonte du moteur de streaming (encoding pipeline + HLS) et des modules core. - Transactions P0 pour les jobs et segments, garanties d’atomicité. - Documentation détaillée de la pipeline (AUDIT_STREAM_*, DESIGN_STREAM_PIPELINE, TRANSACTIONS_P0_IMPLEMENTATION). Documentation & audits: - TRIAGE.md et AUDIT_STABILITY.md à jour avec l’état réel des 3 services. - Cartographie complète des migrations et des transactions (DB_MIGRATIONS_*, DB_TRANSACTION_PLAN, AUDIT_DB_TRANSACTIONS, TRANSACTION_TESTS_PHASE3). - Scripts de reset et de cleanup pour la lab DB et la V1. Ce commit fige l’ensemble du travail de stabilisation P0 (UUID, backend, chat et stream) avant les phases suivantes (Coherence Guardian, WS hardening, etc.).
14 KiB
Job Worker System - Documentation Complète
Date : 2025-12-05
Version : 1.0
Statut : ✅ IMPLÉMENTÉ
Table des Matières
- Vue d'ensemble
- Architecture
- Types de Jobs
- API et Utilisation
- Configuration
- Tests
- Monitoring et Observabilité
- Guide d'Intégration
- Troubleshooting
Vue d'ensemble
Le système de Job Worker de Veza permet d'exécuter des tâches asynchrones en arrière-plan, garantissant que les opérations longues ou non critiques n'impactent pas la performance de l'API.
Fonctionnalités
- ✅ Queue in-memory avec workers pool
- ✅ Retry automatique avec exponential backoff
- ✅ Support de plusieurs types de jobs (Email, Thumbnail, Analytics)
- ✅ Logging structuré avec zap
- ✅ Gestion d'erreurs robuste
- ✅ Priorités de jobs (1 = haut, 2 = moyen, 3 = bas)
Types de Jobs Implémentés
- EmailJob : Envoi d'emails transactionnels via SMTP
- ThumbnailJob : Génération de thumbnails d'images
- AnalyticsEventJob : Enregistrement d'événements analytics génériques
Architecture
Schéma Global
┌─────────────────┐
│ API Handler │
│ (Gin Handler) │
└────────┬────────┘
│
│ EnqueueJob()
▼
┌─────────────────┐
│ JobWorker │
│ │
│ - Queue (chan) │
│ - Workers (N) │
│ - Retry Logic │
└────────┬────────┘
│
│ Dispatch by Type
▼
┌─────────────────┬─────────────────┬──────────────────────┐
│ EmailJob │ ThumbnailJob │ AnalyticsEventJob │
│ │ │ │
│ - Render │ - Resize │ - Store │
│ Template │ Image │ Event │
│ - Send SMTP │ - Save File │ - JSON Payload │
└─────────────────┴─────────────────┴─────────────────┘
Composants Principaux
1. JobWorker (internal/workers/job_worker.go)
Structure centrale qui gère :
- La queue de jobs (
chan Job) - Le pool de workers
- Le dispatch par type
- La logique de retry
type JobWorker struct {
db *gorm.DB
jobService *services.JobService
logger *zap.Logger
queue chan Job
maxRetries int
processingWorkers int
emailSender email.EmailSender
}
2. Job Interface
Tous les jobs implémentent l'interface Job :
type Job struct {
ID uuid.UUID
Type string // "email", "thumbnail", "analytics"
Payload map[string]interface{}
Retries int
CreatedAt time.Time
Priority int // 1 = haut, 2 = moyen, 3 = bas
}
3. Workers Pool
Par défaut, le nombre de workers est configuré à 3 (modifiable dans config.go).
Chaque worker :
- Lit depuis la queue
- Exécute le job via
executeJob() - Gère les retries en cas d'échec
- Log les résultats
Types de Jobs
1. EmailJob
Fichier : internal/workers/email_job.go
Description : Envoie des emails transactionnels via SMTP avec support de templates HTML.
Utilisation :
// Email simple
jobWorker.EnqueueEmailJob(
"user@example.com",
"Welcome to Veza",
"<h1>Welcome!</h1><p>Thanks for joining.</p>",
)
// Email avec template
jobWorker.EnqueueEmailJobWithTemplate(
"user@example.com",
"Reset your password",
"password_reset",
map[string]interface{}{
"Username": "john_doe",
"ResetURL": "https://veza.app/reset?token=...",
},
)
Templates disponibles :
templates/email/password_reset.htmltemplates/email/welcome.html
Configuration SMTP : Variables d'environnement (voir Configuration)
2. ThumbnailJob
Fichier : internal/workers/thumbnail_job.go
Description : Génère des thumbnails d'images avec redimensionnement et compression.
Utilisation :
jobWorker.EnqueueThumbnailJob(
"/uploads/images/original.jpg", // Input path
"/uploads/thumbnails/thumb.jpg", // Output path
300, // Width (px)
300, // Height (px)
)
Caractéristiques :
- Support formats : JPEG, PNG, GIF, BMP
- Algorithme : Lanczos (haute qualité)
- Dimensions par défaut : 300x300px si non spécifiées
- Création automatique du répertoire de sortie
Exemple d'intégration :
// Dans un handler d'upload d'image
func (h *ImageHandler) UploadImage(c *gin.Context) {
// ... upload du fichier original ...
// Enqueue thumbnail generation
if h.jobWorker != nil {
thumbnailPath := filepath.Join("thumbnails", filepath.Base(originalPath))
h.jobWorker.EnqueueThumbnailJob(originalPath, thumbnailPath, 300, 300)
}
}
3. AnalyticsEventJob
Fichier : internal/workers/analytics_job.go
Description : Enregistre des événements analytics génériques dans la table analytics_events.
Note : Ne pas confondre avec AnalyticsJob dans playback_analytics_worker.go qui est spécifique aux analytics de lecture.
Utilisation :
// Événement avec userID
userID := uuid.New()
jobWorker.EnqueueAnalyticsJob(
"track_play",
&userID,
map[string]interface{}{
"track_id": trackID.String(),
"duration": 120,
"device": "web",
},
)
// Événement anonyme
jobWorker.EnqueueAnalyticsJob(
"page_view",
nil, // Pas de userID
map[string]interface{}{
"path": "/tracks",
"referrer": "https://google.com",
},
)
Table de base de données :
CREATE TABLE analytics_events (
id UUID PRIMARY KEY,
event_name VARCHAR(100) NOT NULL,
user_id UUID REFERENCES users(id),
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL
);
Indexes :
idx_analytics_events_name: Surevent_nameidx_analytics_events_user_id: Suruser_id(partiel, WHERE user_id IS NOT NULL)idx_analytics_events_created_at: Surcreated_at DESCidx_analytics_events_payload_gin: GIN index surpayload(JSONB)
Exemple d'intégration :
// Dans un handler de lecture de track
func (h *TrackHandler) PlayTrack(c *gin.Context) {
trackID := c.Param("id")
userID := c.MustGet("user_id").(uuid.UUID)
// ... logique de lecture ...
// Enqueue analytics event
if h.jobWorker != nil {
h.jobWorker.EnqueueAnalyticsJob(
"track_play",
&userID,
map[string]interface{}{
"track_id": trackID,
"timestamp": time.Now().Unix(),
},
)
}
}
API et Utilisation
Initialisation
Le JobWorker est initialisé automatiquement dans config.go :
config.JobWorker = workers.NewJobWorker(
config.Database.GormDB,
jobService,
logger,
100, // queueSize
3, // workers
3, // maxRetries
config.EmailSender,
)
Et démarré dans cmd/api/main.go :
if cfg.JobWorker != nil {
workerCtx, workerCancel := context.WithCancel(context.Background())
defer workerCancel()
cfg.JobWorker.Start(workerCtx)
logger.Info("✅ Job Worker démarré")
}
Méthodes Publiques
Enqueue
// Ajouter un job à la queue
jobWorker.Enqueue(job Job)
Helpers par Type
// Email
jobWorker.EnqueueEmailJob(to, subject, body string)
jobWorker.EnqueueEmailJobWithTemplate(to, subject, templateName string, templateData map[string]interface{})
// Thumbnail
jobWorker.EnqueueThumbnailJob(inputPath, outputPath string, width, height int)
// Analytics
jobWorker.EnqueueAnalyticsJob(eventName string, userID *uuid.UUID, payload map[string]interface{})
Statistiques
stats := jobWorker.GetStats()
// Retourne : queue_size, workers, max_retries
Configuration
Variables d'Environnement
SMTP (pour EmailJob)
# Production
SMTP_HOST=smtp.gmail.com
SMTP_PORT=587
SMTP_USERNAME=your-email@gmail.com
SMTP_PASSWORD=your-app-password
SMTP_FROM=noreply@veza.app
SMTP_FROM_NAME=Veza
# Développement (MailHog)
MAILHOG_HOST=localhost
MAILHOG_PORT=1025
Job Worker
# Optionnel - valeurs par défaut utilisées si non définies
JOB_WORKER_QUEUE_SIZE=100
JOB_WORKER_WORKERS=3
JOB_WORKER_MAX_RETRIES=3
Configuration dans Code
Modifier internal/config/config.go pour ajuster les paramètres :
config.JobWorker = workers.NewJobWorker(
config.Database.GormDB,
jobService,
logger,
queueSize, // Taille de la queue
workers, // Nombre de workers
maxRetries, // Nombre max de retries
config.EmailSender,
)
Tests
Tests Unitaires
# Tous les tests workers
go test ./internal/workers/... -v
# Tests spécifiques
go test ./internal/workers/thumbnail_job_test.go -v
go test ./internal/workers/analytics_job_test.go -v
go test ./internal/workers/email_job_test.go -v
Tests d'Intégration
Pour tester le système complet :
- Email : Démarrer MailHog et vérifier la réception
- Thumbnail : Uploader une image et vérifier la génération
- Analytics : Déclencher un événement et vérifier la table DB
Monitoring et Observabilité
Logs
Le JobWorker log tous les événements importants :
INFO Job worker started workers=3
INFO Processing job job_id=... job_type=email worker_id=0
INFO Email job executed successfully to=user@example.com
ERROR Job execution failed job_id=... error=...
INFO Retrying job new_retries=1
Métriques (Futur)
Les métriques Prometheus peuvent être ajoutées pour :
- Nombre de jobs enqueue
- Taux de succès/échec par type
- Temps d'exécution moyen
- Taille de la queue
Guide d'Intégration
Ajouter un Nouveau Type de Job
- Créer le fichier job :
internal/workers/my_job.go
package workers
type MyJob struct {
Field1 string
Field2 int
}
func (j *MyJob) Execute(ctx context.Context, logger *zap.Logger) error {
// Implémentation
return nil
}
- Ajouter le handler dans
job_worker.go:
func (w *JobWorker) executeJob(ctx context.Context, job Job) error {
switch job.Type {
case "email":
return w.processEmailJob(ctx, job)
case "thumbnail":
return w.processThumbnailJob(ctx, job)
case "analytics":
return w.processAnalyticsJob(ctx, job)
case "my_job": // Nouveau
return w.processMyJob(ctx, job)
default:
return fmt.Errorf("unknown job type: %s", job.Type)
}
}
func (w *JobWorker) processMyJob(ctx context.Context, job Job) error {
// Extraire payload
field1, _ := job.Payload["field1"].(string)
// Créer et exécuter
myJob := NewMyJob(field1, ...)
return myJob.Execute(ctx, w.logger)
}
- Ajouter un helper d'enqueue :
func (w *JobWorker) EnqueueMyJob(field1 string, field2 int) {
job := Job{
Type: "my_job",
Priority: 2,
Payload: map[string]interface{}{
"field1": field1,
"field2": field2,
},
}
w.Enqueue(job)
}
Intégrer dans un Handler
type MyHandler struct {
jobWorker *workers.JobWorker
// ... autres champs
}
func (h *MyHandler) MyAction(c *gin.Context) {
// ... logique métier ...
// Enqueue job asynchrone
if h.jobWorker != nil {
h.jobWorker.EnqueueMyJob("value1", 42)
}
}
Troubleshooting
Problèmes Courants
1. Jobs non exécutés
Symptôme : Les jobs restent dans la queue sans être traités.
Solutions :
- Vérifier que
JobWorker.Start()est appelé dansmain.go - Vérifier les logs pour erreurs de workers
- Vérifier que la queue n'est pas pleine (
GetStats())
2. Emails non envoyés
Symptôme : Les jobs email sont enqueue mais aucun email n'est reçu.
Solutions :
- Vérifier la configuration SMTP
- Vérifier les logs pour erreurs SMTP
- En dev, vérifier que MailHog est démarré
3. Thumbnails non générés
Symptôme : Les jobs thumbnail échouent.
Solutions :
- Vérifier que le fichier source existe
- Vérifier les permissions d'écriture sur le répertoire de sortie
- Vérifier que le format d'image est supporté
4. Analytics non enregistrés
Symptôme : Les événements analytics ne sont pas dans la DB.
Solutions :
- Vérifier que la migration
043_analytics_events.sqlest appliquée - Vérifier les logs pour erreurs DB
- Vérifier la connexion DB
Logs de Debug
Activer les logs de debug :
logger, _ := zap.NewDevelopment()
Limitations Actuelles
- Queue in-memory : Jobs perdus au redémarrage
- Pas de dead-letter queue : Échecs définitifs juste loggés
- Pas de priorités dynamiques : Priorité fixée à l'enqueue
- Pas de métriques Prometheus : À implémenter
Ces limitations sont acceptables pour P1 et peuvent être adressées en P2.
Roadmap Future (P2)
- Queue persistante (Redis, RabbitMQ)
- Dead letter queue
- Métriques Prometheus
- Dashboard de monitoring
- Rate limiting par type de job
- Support de jobs récurrents (cron)
Documentation mise à jour le : 2025-12-05
Auteur : Veza Backend Team