veza/veza-backend-api/docs/JOB_WORKER_SYSTEM.md
okinrev b7955a680c P0: stabilisation backend/chat/stream + nouvelle base migrations v1
Backend Go:
- Remplacement complet des anciennes migrations par la base V1 alignée sur ORIGIN.
- Durcissement global du parsing JSON (BindAndValidateJSON + RespondWithAppError).
- Sécurisation de config.go, CORS, statuts de santé et monitoring.
- Implémentation des transactions P0 (RBAC, duplication de playlists, social toggles).
- Ajout d’un job worker structuré (emails, analytics, thumbnails) + tests associés.
- Nouvelle doc backend : AUDIT_CONFIG, BACKEND_CONFIG, AUTH_PASSWORD_RESET, JOB_WORKER_*.

Chat server (Rust):
- Refonte du pipeline JWT + sécurité, audit et rate limiting avancé.
- Implémentation complète du cycle de message (read receipts, delivered, edit/delete, typing).
- Nettoyage des panics, gestion d’erreurs robuste, logs structurés.
- Migrations chat alignées sur le schéma UUID et nouvelles features.

Stream server (Rust):
- Refonte du moteur de streaming (encoding pipeline + HLS) et des modules core.
- Transactions P0 pour les jobs et segments, garanties d’atomicité.
- Documentation détaillée de la pipeline (AUDIT_STREAM_*, DESIGN_STREAM_PIPELINE, TRANSACTIONS_P0_IMPLEMENTATION).

Documentation & audits:
- TRIAGE.md et AUDIT_STABILITY.md à jour avec l’état réel des 3 services.
- Cartographie complète des migrations et des transactions (DB_MIGRATIONS_*, DB_TRANSACTION_PLAN, AUDIT_DB_TRANSACTIONS, TRANSACTION_TESTS_PHASE3).
- Scripts de reset et de cleanup pour la lab DB et la V1.

Ce commit fige l’ensemble du travail de stabilisation P0 (UUID, backend, chat et stream) avant les phases suivantes (Coherence Guardian, WS hardening, etc.).
2025-12-06 11:14:38 +01:00

592 lines
14 KiB
Markdown

# Job Worker System - Documentation Complète
**Date** : 2025-12-05
**Version** : 1.0
**Statut** : ✅ **IMPLÉMENTÉ**
## Table des Matières
1. [Vue d'ensemble](#vue-densemble)
2. [Architecture](#architecture)
3. [Types de Jobs](#types-de-jobs)
4. [API et Utilisation](#api-et-utilisation)
5. [Configuration](#configuration)
6. [Tests](#tests)
7. [Monitoring et Observabilité](#monitoring-et-observabilité)
8. [Guide d'Intégration](#guide-dintégration)
9. [Troubleshooting](#troubleshooting)
---
## Vue d'ensemble
Le système de Job Worker de Veza permet d'exécuter des tâches asynchrones en arrière-plan, garantissant que les opérations longues ou non critiques n'impactent pas la performance de l'API.
### Fonctionnalités
-**Queue in-memory** avec workers pool
-**Retry automatique** avec exponential backoff
-**Support de plusieurs types de jobs** (Email, Thumbnail, Analytics)
-**Logging structuré** avec zap
-**Gestion d'erreurs robuste**
-**Priorités de jobs** (1 = haut, 2 = moyen, 3 = bas)
### Types de Jobs Implémentés
1. **EmailJob** : Envoi d'emails transactionnels via SMTP
2. **ThumbnailJob** : Génération de thumbnails d'images
3. **AnalyticsEventJob** : Enregistrement d'événements analytics génériques
---
## Architecture
### Schéma Global
```
┌─────────────────┐
│ API Handler │
│ (Gin Handler) │
└────────┬────────┘
│ EnqueueJob()
┌─────────────────┐
│ JobWorker │
│ │
│ - Queue (chan) │
│ - Workers (N) │
│ - Retry Logic │
└────────┬────────┘
│ Dispatch by Type
┌─────────────────┬─────────────────┬──────────────────────┐
│ EmailJob │ ThumbnailJob │ AnalyticsEventJob │
│ │ │ │
│ - Render │ - Resize │ - Store │
│ Template │ Image │ Event │
│ - Send SMTP │ - Save File │ - JSON Payload │
└─────────────────┴─────────────────┴─────────────────┘
```
### Composants Principaux
#### 1. JobWorker (`internal/workers/job_worker.go`)
Structure centrale qui gère :
- La queue de jobs (`chan Job`)
- Le pool de workers
- Le dispatch par type
- La logique de retry
```go
type JobWorker struct {
db *gorm.DB
jobService *services.JobService
logger *zap.Logger
queue chan Job
maxRetries int
processingWorkers int
emailSender email.EmailSender
}
```
#### 2. Job Interface
Tous les jobs implémentent l'interface `Job` :
```go
type Job struct {
ID uuid.UUID
Type string // "email", "thumbnail", "analytics"
Payload map[string]interface{}
Retries int
CreatedAt time.Time
Priority int // 1 = haut, 2 = moyen, 3 = bas
}
```
#### 3. Workers Pool
Par défaut, le nombre de workers est configuré à `3` (modifiable dans `config.go`).
Chaque worker :
- Lit depuis la queue
- Exécute le job via `executeJob()`
- Gère les retries en cas d'échec
- Log les résultats
---
## Types de Jobs
### 1. EmailJob
**Fichier** : `internal/workers/email_job.go`
**Description** : Envoie des emails transactionnels via SMTP avec support de templates HTML.
**Utilisation** :
```go
// Email simple
jobWorker.EnqueueEmailJob(
"user@example.com",
"Welcome to Veza",
"<h1>Welcome!</h1><p>Thanks for joining.</p>",
)
// Email avec template
jobWorker.EnqueueEmailJobWithTemplate(
"user@example.com",
"Reset your password",
"password_reset",
map[string]interface{}{
"Username": "john_doe",
"ResetURL": "https://veza.app/reset?token=...",
},
)
```
**Templates disponibles** :
- `templates/email/password_reset.html`
- `templates/email/welcome.html`
**Configuration SMTP** : Variables d'environnement (voir [Configuration](#configuration))
---
### 2. ThumbnailJob
**Fichier** : `internal/workers/thumbnail_job.go`
**Description** : Génère des thumbnails d'images avec redimensionnement et compression.
**Utilisation** :
```go
jobWorker.EnqueueThumbnailJob(
"/uploads/images/original.jpg", // Input path
"/uploads/thumbnails/thumb.jpg", // Output path
300, // Width (px)
300, // Height (px)
)
```
**Caractéristiques** :
- Support formats : JPEG, PNG, GIF, BMP
- Algorithme : Lanczos (haute qualité)
- Dimensions par défaut : 300x300px si non spécifiées
- Création automatique du répertoire de sortie
**Exemple d'intégration** :
```go
// Dans un handler d'upload d'image
func (h *ImageHandler) UploadImage(c *gin.Context) {
// ... upload du fichier original ...
// Enqueue thumbnail generation
if h.jobWorker != nil {
thumbnailPath := filepath.Join("thumbnails", filepath.Base(originalPath))
h.jobWorker.EnqueueThumbnailJob(originalPath, thumbnailPath, 300, 300)
}
}
```
---
### 3. AnalyticsEventJob
**Fichier** : `internal/workers/analytics_job.go`
**Description** : Enregistre des événements analytics génériques dans la table `analytics_events`.
**Note** : Ne pas confondre avec `AnalyticsJob` dans `playback_analytics_worker.go` qui est spécifique aux analytics de lecture.
**Utilisation** :
```go
// Événement avec userID
userID := uuid.New()
jobWorker.EnqueueAnalyticsJob(
"track_play",
&userID,
map[string]interface{}{
"track_id": trackID.String(),
"duration": 120,
"device": "web",
},
)
// Événement anonyme
jobWorker.EnqueueAnalyticsJob(
"page_view",
nil, // Pas de userID
map[string]interface{}{
"path": "/tracks",
"referrer": "https://google.com",
},
)
```
**Table de base de données** :
```sql
CREATE TABLE analytics_events (
id UUID PRIMARY KEY,
event_name VARCHAR(100) NOT NULL,
user_id UUID REFERENCES users(id),
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL
);
```
**Indexes** :
- `idx_analytics_events_name` : Sur `event_name`
- `idx_analytics_events_user_id` : Sur `user_id` (partiel, WHERE user_id IS NOT NULL)
- `idx_analytics_events_created_at` : Sur `created_at DESC`
- `idx_analytics_events_payload_gin` : GIN index sur `payload` (JSONB)
**Exemple d'intégration** :
```go
// Dans un handler de lecture de track
func (h *TrackHandler) PlayTrack(c *gin.Context) {
trackID := c.Param("id")
userID := c.MustGet("user_id").(uuid.UUID)
// ... logique de lecture ...
// Enqueue analytics event
if h.jobWorker != nil {
h.jobWorker.EnqueueAnalyticsJob(
"track_play",
&userID,
map[string]interface{}{
"track_id": trackID,
"timestamp": time.Now().Unix(),
},
)
}
}
```
---
## API et Utilisation
### Initialisation
Le JobWorker est initialisé automatiquement dans `config.go` :
```go
config.JobWorker = workers.NewJobWorker(
config.Database.GormDB,
jobService,
logger,
100, // queueSize
3, // workers
3, // maxRetries
config.EmailSender,
)
```
Et démarré dans `cmd/api/main.go` :
```go
if cfg.JobWorker != nil {
workerCtx, workerCancel := context.WithCancel(context.Background())
defer workerCancel()
cfg.JobWorker.Start(workerCtx)
logger.Info("✅ Job Worker démarré")
}
```
### Méthodes Publiques
#### Enqueue
```go
// Ajouter un job à la queue
jobWorker.Enqueue(job Job)
```
#### Helpers par Type
```go
// Email
jobWorker.EnqueueEmailJob(to, subject, body string)
jobWorker.EnqueueEmailJobWithTemplate(to, subject, templateName string, templateData map[string]interface{})
// Thumbnail
jobWorker.EnqueueThumbnailJob(inputPath, outputPath string, width, height int)
// Analytics
jobWorker.EnqueueAnalyticsJob(eventName string, userID *uuid.UUID, payload map[string]interface{})
```
#### Statistiques
```go
stats := jobWorker.GetStats()
// Retourne : queue_size, workers, max_retries
```
---
## Configuration
### Variables d'Environnement
#### SMTP (pour EmailJob)
```bash
# Production
SMTP_HOST=smtp.gmail.com
SMTP_PORT=587
SMTP_USERNAME=your-email@gmail.com
SMTP_PASSWORD=your-app-password
SMTP_FROM=noreply@veza.app
SMTP_FROM_NAME=Veza
# Développement (MailHog)
MAILHOG_HOST=localhost
MAILHOG_PORT=1025
```
#### Job Worker
```bash
# Optionnel - valeurs par défaut utilisées si non définies
JOB_WORKER_QUEUE_SIZE=100
JOB_WORKER_WORKERS=3
JOB_WORKER_MAX_RETRIES=3
```
### Configuration dans Code
Modifier `internal/config/config.go` pour ajuster les paramètres :
```go
config.JobWorker = workers.NewJobWorker(
config.Database.GormDB,
jobService,
logger,
queueSize, // Taille de la queue
workers, // Nombre de workers
maxRetries, // Nombre max de retries
config.EmailSender,
)
```
---
## Tests
### Tests Unitaires
```bash
# Tous les tests workers
go test ./internal/workers/... -v
# Tests spécifiques
go test ./internal/workers/thumbnail_job_test.go -v
go test ./internal/workers/analytics_job_test.go -v
go test ./internal/workers/email_job_test.go -v
```
### Tests d'Intégration
Pour tester le système complet :
1. **Email** : Démarrer MailHog et vérifier la réception
2. **Thumbnail** : Uploader une image et vérifier la génération
3. **Analytics** : Déclencher un événement et vérifier la table DB
---
## Monitoring et Observabilité
### Logs
Le JobWorker log tous les événements importants :
```
INFO Job worker started workers=3
INFO Processing job job_id=... job_type=email worker_id=0
INFO Email job executed successfully to=user@example.com
ERROR Job execution failed job_id=... error=...
INFO Retrying job new_retries=1
```
### Métriques (Futur)
Les métriques Prometheus peuvent être ajoutées pour :
- Nombre de jobs enqueue
- Taux de succès/échec par type
- Temps d'exécution moyen
- Taille de la queue
---
## Guide d'Intégration
### Ajouter un Nouveau Type de Job
1. **Créer le fichier job** : `internal/workers/my_job.go`
```go
package workers
type MyJob struct {
Field1 string
Field2 int
}
func (j *MyJob) Execute(ctx context.Context, logger *zap.Logger) error {
// Implémentation
return nil
}
```
2. **Ajouter le handler dans `job_worker.go`** :
```go
func (w *JobWorker) executeJob(ctx context.Context, job Job) error {
switch job.Type {
case "email":
return w.processEmailJob(ctx, job)
case "thumbnail":
return w.processThumbnailJob(ctx, job)
case "analytics":
return w.processAnalyticsJob(ctx, job)
case "my_job": // Nouveau
return w.processMyJob(ctx, job)
default:
return fmt.Errorf("unknown job type: %s", job.Type)
}
}
func (w *JobWorker) processMyJob(ctx context.Context, job Job) error {
// Extraire payload
field1, _ := job.Payload["field1"].(string)
// Créer et exécuter
myJob := NewMyJob(field1, ...)
return myJob.Execute(ctx, w.logger)
}
```
3. **Ajouter un helper d'enqueue** :
```go
func (w *JobWorker) EnqueueMyJob(field1 string, field2 int) {
job := Job{
Type: "my_job",
Priority: 2,
Payload: map[string]interface{}{
"field1": field1,
"field2": field2,
},
}
w.Enqueue(job)
}
```
### Intégrer dans un Handler
```go
type MyHandler struct {
jobWorker *workers.JobWorker
// ... autres champs
}
func (h *MyHandler) MyAction(c *gin.Context) {
// ... logique métier ...
// Enqueue job asynchrone
if h.jobWorker != nil {
h.jobWorker.EnqueueMyJob("value1", 42)
}
}
```
---
## Troubleshooting
### Problèmes Courants
#### 1. Jobs non exécutés
**Symptôme** : Les jobs restent dans la queue sans être traités.
**Solutions** :
- Vérifier que `JobWorker.Start()` est appelé dans `main.go`
- Vérifier les logs pour erreurs de workers
- Vérifier que la queue n'est pas pleine (`GetStats()`)
#### 2. Emails non envoyés
**Symptôme** : Les jobs email sont enqueue mais aucun email n'est reçu.
**Solutions** :
- Vérifier la configuration SMTP
- Vérifier les logs pour erreurs SMTP
- En dev, vérifier que MailHog est démarré
#### 3. Thumbnails non générés
**Symptôme** : Les jobs thumbnail échouent.
**Solutions** :
- Vérifier que le fichier source existe
- Vérifier les permissions d'écriture sur le répertoire de sortie
- Vérifier que le format d'image est supporté
#### 4. Analytics non enregistrés
**Symptôme** : Les événements analytics ne sont pas dans la DB.
**Solutions** :
- Vérifier que la migration `043_analytics_events.sql` est appliquée
- Vérifier les logs pour erreurs DB
- Vérifier la connexion DB
### Logs de Debug
Activer les logs de debug :
```go
logger, _ := zap.NewDevelopment()
```
---
## Limitations Actuelles
1. **Queue in-memory** : Jobs perdus au redémarrage
2. **Pas de dead-letter queue** : Échecs définitifs juste loggés
3. **Pas de priorités dynamiques** : Priorité fixée à l'enqueue
4. **Pas de métriques Prometheus** : À implémenter
Ces limitations sont acceptables pour P1 et peuvent être adressées en P2.
---
## Roadmap Future (P2)
- [ ] Queue persistante (Redis, RabbitMQ)
- [ ] Dead letter queue
- [ ] Métriques Prometheus
- [ ] Dashboard de monitoring
- [ ] Rate limiting par type de job
- [ ] Support de jobs récurrents (cron)
---
**Documentation mise à jour le** : 2025-12-05
**Auteur** : Veza Backend Team