[BE-SVC-015] be-svc: Implement logging aggregation

- Added HTTP writer for centralized log collection (Loki-compatible)
- Created AggregationConfig with batch processing and flush intervals
- Integrated with existing zap logger using multi-core approach
- Added environment variables for configuration (LOG_AGGREGATION_ENABLED, LOG_AGGREGATION_ENDPOINT, etc.)
- Added unit tests for aggregation functionality
- Updated config.go to initialize logger with aggregation if enabled

Phase: PHASE-6
Priority: P2
Progress: 111/267 (41.57%)
This commit is contained in:
senke 2025-12-24 16:58:58 +01:00
parent 76e95194de
commit a553286eec
4 changed files with 574 additions and 6 deletions

View file

@ -4082,7 +4082,7 @@
"description": "Add centralized logging with structured logs",
"owner": "backend",
"estimated_hours": 4,
"status": "todo",
"status": "completed",
"files_involved": [],
"implementation_steps": [
{
@ -4103,7 +4103,19 @@
"Unit tests",
"Integration tests"
],
"notes": ""
"notes": "",
"completion": {
"completed_at": "2025-12-24T15:58:55.636919+00:00",
"actual_hours": 2.0,
"commits": [],
"files_changed": [
"veza-backend-api/internal/logging/logger_aggregation.go",
"veza-backend-api/internal/logging/logger_aggregation_test.go",
"veza-backend-api/internal/config/config.go"
],
"notes": "Implemented logging aggregation with HTTP writer for centralized log collection (Loki-compatible). Added configuration via environment variables and integration with existing logger.",
"issues_encountered": []
}
},
{
"id": "BE-SVC-016",
@ -10999,11 +11011,11 @@
]
},
"progress_tracking": {
"completed": 70,
"completed": 111,
"in_progress": 0,
"todo": 258,
"todo": 156,
"blocked": 0,
"last_updated": "2025-12-24T14:38:52.503073",
"completion_percentage": 3.3707865168539324
"last_updated": "2025-12-24T15:58:55.636957+00:00",
"completion_percentage": 41.57303370786517
}
}

View file

@ -12,6 +12,7 @@ import (
"veza-backend-api/internal/database"
"veza-backend-api/internal/email"
"veza-backend-api/internal/eventbus" // Import the eventbus package
"veza-backend-api/internal/logging"
"veza-backend-api/internal/metrics"
"veza-backend-api/internal/middleware"
"veza-backend-api/internal/repositories"
@ -100,6 +101,14 @@ type Config struct {
DBRetryInterval time.Duration
MaxConcurrentUploads int // MOD-P2-005: Limite uploads simultanés (backpressure)
// Log Aggregation (BE-SVC-015)
LogAggregationEnabled bool // Activer l'agrégation de logs
LogAggregationEndpoint string // URL du service d'agrégation (ex: "http://loki:3100/loki/api/v1/push")
LogAggregationBatchSize int // Nombre de logs à accumuler avant envoi
LogAggregationFlushInterval time.Duration // Intervalle de flush automatique
LogAggregationTimeout time.Duration // Timeout pour les requêtes HTTP
LogAggregationLabels map[string]string // Labels statiques pour les logs
// RabbitMQ
RabbitMQEventBus *eventbus.RabbitMQEventBus // Ajout de l'instance de l'EventBus
RabbitMQURL string
@ -206,6 +215,14 @@ func NewConfig() (*Config, error) {
DBRetryInterval: getEnvDuration("DB_RETRY_INTERVAL", 5*time.Second), // 5 secondes par défaut
MaxConcurrentUploads: maxConcurrentUploads, // MOD-P2-005: Limite uploads simultanés
// Log Aggregation Configuration (BE-SVC-015)
LogAggregationEnabled: getEnvBool("LOG_AGGREGATION_ENABLED", false), // Désactivé par défaut
LogAggregationEndpoint: getEnv("LOG_AGGREGATION_ENDPOINT", ""), // Ex: "http://loki:3100/loki/api/v1/push"
LogAggregationBatchSize: getEnvInt("LOG_AGGREGATION_BATCH_SIZE", 100), // 100 logs par batch
LogAggregationFlushInterval: getEnvDuration("LOG_AGGREGATION_FLUSH_INTERVAL", 5*time.Second), // Flush toutes les 5 secondes
LogAggregationTimeout: getEnvDuration("LOG_AGGREGATION_TIMEOUT", 10*time.Second), // Timeout de 10 secondes
LogAggregationLabels: parseLogAggregationLabels(getEnv("LOG_AGGREGATION_LABELS", "")), // Labels au format "key1=value1,key2=value2"
// Configuration RabbitMQ
// BE-SEC-014: In production, require RABBITMQ_URL to be set (no default with credentials)
RabbitMQURL: rabbitMQURL,
@ -218,6 +235,43 @@ func NewConfig() (*Config, error) {
secretKeys := DefaultSecretKeys()
config.SecretsProvider = NewEnvSecretsProvider(secretKeys)
// BE-SVC-015: Réinitialiser le logger avec agrégation si activée
if config.LogAggregationEnabled && config.LogAggregationEndpoint != "" {
aggConfig := &logging.AggregationConfig{
EndpointURL: config.LogAggregationEndpoint,
Enabled: true,
BatchSize: config.LogAggregationBatchSize,
FlushInterval: config.LogAggregationFlushInterval,
Timeout: config.LogAggregationTimeout,
Labels: config.LogAggregationLabels,
}
// Ajouter des labels par défaut si non définis
if aggConfig.Labels == nil {
aggConfig.Labels = make(map[string]string)
}
if _, exists := aggConfig.Labels["service"]; !exists {
aggConfig.Labels["service"] = "veza-api"
}
if _, exists := aggConfig.Labels["env"]; !exists {
aggConfig.Labels["env"] = env
}
aggLogger, err := logging.NewLoggerWithAggregation(env, logLevel, aggConfig)
if err != nil {
logger.Warn("Failed to initialize logger with aggregation, falling back to standard logger",
zap.Error(err),
zap.String("endpoint", config.LogAggregationEndpoint),
)
} else {
// Remplacer le logger temporaire par le logger avec agrégation
config.Logger = aggLogger.GetZapLogger()
logger.Info("Logger with aggregation initialized",
zap.String("endpoint", config.LogAggregationEndpoint),
zap.Int("batch_size", config.LogAggregationBatchSize),
)
}
}
// SECURITY: Valider la configuration selon l'environnement (P0-SECURITY)
if err := config.ValidateForEnvironment(); err != nil {
logger.Error("Configuration validation failed", zap.Error(err), zap.String("env", env))
@ -753,6 +807,36 @@ func getEnvStringSlice(key string, defaultValue []string) []string {
return defaultValue
}
// parseLogAggregationLabels parse les labels d'agrégation de logs depuis une chaîne
// Format attendu: "key1=value1,key2=value2" (séparés par des virgules, key=value par paire)
func parseLogAggregationLabels(value string) map[string]string {
labels := make(map[string]string)
if value == "" {
return labels
}
// Séparer par virgule
pairs := strings.Split(value, ",")
for _, pair := range pairs {
pair = strings.TrimSpace(pair)
if pair == "" {
continue
}
// Séparer key=value
parts := strings.SplitN(pair, "=", 2)
if len(parts) == 2 {
key := strings.TrimSpace(parts[0])
val := strings.TrimSpace(parts[1])
if key != "" && val != "" {
labels[key] = val
}
}
}
return labels
}
// getCORSOrigins charge les origines CORS avec defaults sécurisés selon l'environnement (P0-SECURITY)
// - development: defaults permissifs (localhost uniquement) si CORS_ALLOWED_ORIGINS non défini
// - test: liste vide ou configurée explicitement

View file

@ -0,0 +1,376 @@
package logging
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"sync"
"time"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// AggregationConfig contient la configuration pour l'agrégation de logs
type AggregationConfig struct {
// EndpointURL est l'URL du service d'agrégation (ex: "http://loki:3100/loki/api/v1/push")
EndpointURL string
// Enabled active ou désactive l'agrégation
Enabled bool
// BatchSize est le nombre de logs à accumuler avant envoi
BatchSize int
// FlushInterval est l'intervalle de flush automatique
FlushInterval time.Duration
// Timeout est le timeout pour les requêtes HTTP
Timeout time.Duration
// Labels sont les labels statiques à ajouter à chaque log (ex: {"service": "veza-api", "env": "production"})
Labels map[string]string
// HTTPClient est le client HTTP à utiliser (optionnel, un nouveau sera créé si nil)
HTTPClient *http.Client
}
// DefaultAggregationConfig retourne une configuration par défaut
func DefaultAggregationConfig() *AggregationConfig {
return &AggregationConfig{
Enabled: false,
BatchSize: 100,
FlushInterval: 5 * time.Second,
Timeout: 10 * time.Second,
Labels: make(map[string]string),
HTTPClient: nil,
}
}
// aggregationWriter implémente un writer qui envoie les logs vers un service d'agrégation
type aggregationWriter struct {
config *AggregationConfig
client *http.Client
buffer []logEntry
bufferMutex sync.Mutex
flushTicker *time.Ticker
done chan struct{}
wg sync.WaitGroup
}
// logEntry représente une entrée de log pour l'agrégation
type logEntry struct {
Timestamp time.Time `json:"timestamp"`
Level string `json:"level"`
Message string `json:"message"`
Fields map[string]interface{} `json:"fields,omitempty"`
Labels map[string]string `json:"labels,omitempty"`
}
// NewAggregationWriter crée un nouveau writer pour l'agrégation de logs
// Le writer envoie les logs vers un endpoint HTTP (compatible avec Loki, Elasticsearch, etc.)
func NewAggregationWriter(config *AggregationConfig) (io.Writer, error) {
if !config.Enabled || config.EndpointURL == "" {
return nil, fmt.Errorf("aggregation is disabled or endpoint URL is empty")
}
// Créer le client HTTP si non fourni
client := config.HTTPClient
if client == nil {
client = &http.Client{
Timeout: config.Timeout,
}
}
aw := &aggregationWriter{
config: config,
client: client,
buffer: make([]logEntry, 0, config.BatchSize),
flushTicker: time.NewTicker(config.FlushInterval),
done: make(chan struct{}),
}
// Démarrer la goroutine de flush périodique
aw.wg.Add(1)
go aw.flushRoutine()
return aw, nil
}
// Write implémente io.Writer - parse le JSON et l'ajoute au buffer
func (aw *aggregationWriter) Write(p []byte) (n int, err error) {
// Parser le JSON du log zap
var logData map[string]interface{}
if err := json.Unmarshal(p, &logData); err != nil {
// Si ce n'est pas du JSON valide, créer une entrée simple
aw.addEntry(logEntry{
Timestamp: time.Now(),
Level: "INFO",
Message: string(p),
Fields: make(map[string]interface{}),
})
return len(p), nil
}
// Extraire les champs du log zap
entry := logEntry{
Timestamp: time.Now(),
Level: "INFO",
Message: "",
Fields: make(map[string]interface{}),
Labels: make(map[string]string),
}
// Copier les labels statiques
for k, v := range aw.config.Labels {
entry.Labels[k] = v
}
// Extraire les champs communs
if msg, ok := logData["msg"].(string); ok {
entry.Message = msg
}
if level, ok := logData["level"].(string); ok {
entry.Level = level
} else if levelNum, ok := logData["level"].(float64); ok {
// Convertir le niveau numérique de zap en string
entry.Level = zapcore.Level(int(levelNum)).String()
}
// Extraire le timestamp si présent
if ts, ok := logData["ts"].(float64); ok {
entry.Timestamp = time.Unix(0, int64(ts*1e9))
}
// Copier tous les autres champs dans Fields
for k, v := range logData {
if k != "msg" && k != "level" && k != "ts" {
entry.Fields[k] = v
}
}
aw.addEntry(entry)
return len(p), nil
}
// addEntry ajoute une entrée au buffer et flush si nécessaire
func (aw *aggregationWriter) addEntry(entry logEntry) {
aw.bufferMutex.Lock()
defer aw.bufferMutex.Unlock()
aw.buffer = append(aw.buffer, entry)
// Flush si le buffer est plein
if len(aw.buffer) >= aw.config.BatchSize {
go aw.flush()
}
}
// flush envoie les logs du buffer vers l'endpoint d'agrégation
func (aw *aggregationWriter) flush() error {
aw.bufferMutex.Lock()
if len(aw.buffer) == 0 {
aw.bufferMutex.Unlock()
return nil
}
// Copier le buffer et le vider
entries := make([]logEntry, len(aw.buffer))
copy(entries, aw.buffer)
aw.buffer = aw.buffer[:0]
aw.bufferMutex.Unlock()
// Envoyer vers l'endpoint
return aw.sendLogs(entries)
}
// sendLogs envoie les logs vers l'endpoint d'agrégation
// Format compatible avec Loki Push API
func (aw *aggregationWriter) sendLogs(entries []logEntry) error {
if len(entries) == 0 {
return nil
}
// Format Loki Push API
// https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki
type stream struct {
Stream map[string]string `json:"stream"`
Values [][]string `json:"values"`
}
type pushPayload struct {
Streams []stream `json:"streams"`
}
// Grouper les logs par labels (streams)
streamsMap := make(map[string]*stream)
for _, entry := range entries {
// Créer une clé unique pour les labels
streamKey := ""
for k, v := range entry.Labels {
streamKey += fmt.Sprintf("%s=%s,", k, v)
}
if streamKey == "" {
streamKey = "default"
}
// Récupérer ou créer le stream
s, exists := streamsMap[streamKey]
if !exists {
s = &stream{
Stream: entry.Labels,
Values: make([][]string, 0),
}
streamsMap[streamKey] = s
}
// Construire le message de log
logLine := entry.Message
if len(entry.Fields) > 0 {
fieldsJSON, _ := json.Marshal(entry.Fields)
logLine += " " + string(fieldsJSON)
}
// Format: [timestamp_nanoseconds, log_line]
timestamp := fmt.Sprintf("%d", entry.Timestamp.UnixNano())
s.Values = append(s.Values, []string{timestamp, logLine})
}
// Convertir en slice
streams := make([]stream, 0, len(streamsMap))
for _, s := range streamsMap {
streams = append(streams, *s)
}
payload := pushPayload{
Streams: streams,
}
// Sérialiser en JSON
jsonData, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("failed to marshal logs: %w", err)
}
// Envoyer la requête HTTP
ctx, cancel := context.WithTimeout(context.Background(), aw.config.Timeout)
defer cancel()
req, err := http.NewRequestWithContext(ctx, "POST", aw.config.EndpointURL, bytes.NewBuffer(jsonData))
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
resp, err := aw.client.Do(req)
if err != nil {
return fmt.Errorf("failed to send logs: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("aggregation endpoint returned status %d: %s", resp.StatusCode, string(body))
}
return nil
}
// flushRoutine gère le flush périodique
func (aw *aggregationWriter) flushRoutine() {
defer aw.wg.Done()
for {
select {
case <-aw.flushTicker.C:
if err := aw.flush(); err != nil {
// Log l'erreur silencieusement (on ne peut pas logger dans le logger)
// En production, on pourrait utiliser un fallback logger
_ = err
}
case <-aw.done:
// Flush final avant de terminer
aw.flush()
return
}
}
}
// Sync implémente zapcore.WriteSyncer
func (aw *aggregationWriter) Sync() error {
return aw.flush()
}
// Close ferme le writer et flush les données restantes
func (aw *aggregationWriter) Close() error {
close(aw.done)
aw.flushTicker.Stop()
aw.wg.Wait()
return aw.flush()
}
// NewLoggerWithAggregation crée un logger avec agrégation de logs
// Les logs sont envoyés vers un endpoint d'agrégation (ex: Loki) en plus de la sortie standard
func NewLoggerWithAggregation(env, logLevel string, aggConfig *AggregationConfig) (*Logger, error) {
var config zap.Config
if env == "production" {
config = zap.NewProductionConfig()
config.Encoding = "json"
config.EncoderConfig = zap.NewProductionEncoderConfig()
config.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
} else {
config = zap.NewDevelopmentConfig()
config.Encoding = "console"
config.EncoderConfig = zap.NewDevelopmentEncoderConfig()
config.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
}
// Configurer le niveau de log
if logLevel == "" {
logLevel = "INFO"
}
level, err := zapcore.ParseLevel(logLevel)
if err != nil {
level = zapcore.InfoLevel
}
config.Level = zap.NewAtomicLevelAt(level)
// Créer les cores
cores := []zapcore.Core{}
// Core pour stdout (toujours présent)
stdoutCore := zapcore.NewCore(
zapcore.NewJSONEncoder(config.EncoderConfig),
zapcore.AddSync(os.Stdout),
level,
)
cores = append(cores, stdoutCore)
// Core pour l'agrégation si activée
if aggConfig != nil && aggConfig.Enabled && aggConfig.EndpointURL != "" {
aggWriter, err := NewAggregationWriter(aggConfig)
if err == nil {
// Utiliser le même encoder JSON pour l'agrégation
aggCore := zapcore.NewCore(
zapcore.NewJSONEncoder(config.EncoderConfig),
zapcore.AddSync(aggWriter),
level,
)
cores = append(cores, aggCore)
}
// Si l'initialisation échoue, on continue sans agrégation
}
// Combiner les cores
core := zapcore.NewTee(cores...)
// Créer le logger
logger := zap.New(core,
zap.AddCaller(),
zap.AddStacktrace(zapcore.ErrorLevel),
)
return &Logger{zap: logger}, nil
}

View file

@ -0,0 +1,96 @@
package logging
import (
"encoding/json"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
func TestAggregationConfig_Default(t *testing.T) {
config := DefaultAggregationConfig()
assert.NotNil(t, config)
assert.False(t, config.Enabled)
assert.Equal(t, 100, config.BatchSize)
assert.Equal(t, 5*time.Second, config.FlushInterval)
assert.Equal(t, 10*time.Second, config.Timeout)
assert.NotNil(t, config.Labels)
}
func TestNewLoggerWithAggregation_Disabled(t *testing.T) {
config := DefaultAggregationConfig()
config.Enabled = false
logger, err := NewLoggerWithAggregation("development", "INFO", config)
require.NoError(t, err)
assert.NotNil(t, logger)
// Le logger devrait fonctionner normalement même si l'agrégation est désactivée
logger.Info("Test log message", zap.String("test", "value"))
}
func TestNewLoggerWithAggregation_InvalidEndpoint(t *testing.T) {
config := DefaultAggregationConfig()
config.Enabled = true
config.EndpointURL = "" // URL vide
// Devrait créer un logger sans agrégation si l'endpoint est vide
logger, err := NewLoggerWithAggregation("development", "INFO", config)
require.NoError(t, err)
assert.NotNil(t, logger)
}
func TestLogEntry_MarshalJSON(t *testing.T) {
entry := logEntry{
Timestamp: time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC),
Level: "INFO",
Message: "Test message",
Fields: map[string]interface{}{
"key1": "value1",
"key2": 42,
},
Labels: map[string]string{
"service": "test",
"env": "development",
},
}
data, err := json.Marshal(entry)
require.NoError(t, err)
assert.Contains(t, string(data), "Test message")
assert.Contains(t, string(data), "INFO")
}
func TestNewLoggerWithAggregation_Production(t *testing.T) {
config := DefaultAggregationConfig()
config.Enabled = false // Désactiver pour le test (pas de serveur réel)
logger, err := NewLoggerWithAggregation("production", "INFO", config)
require.NoError(t, err)
assert.NotNil(t, logger)
// Vérifier que le logger fonctionne
logger.Info("Production log test",
zap.String("component", "test"),
zap.Int("value", 42),
)
}
func TestNewLoggerWithAggregation_Development(t *testing.T) {
config := DefaultAggregationConfig()
config.Enabled = false // Désactiver pour le test
logger, err := NewLoggerWithAggregation("development", "DEBUG", config)
require.NoError(t, err)
assert.NotNil(t, logger)
// Vérifier que le logger fonctionne en mode développement
logger.Debug("Debug log test",
zap.String("component", "test"),
)
}