veza/veza-backend-api/internal/logging/logger_aggregation.go

388 lines
10 KiB
Go

package logging
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"sync"
"time"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// AggregationConfig contient la configuration pour l'agrégation de logs
type AggregationConfig struct {
// EndpointURL est l'URL du service d'agrégation (ex: "http://loki:3100/loki/api/v1/push")
EndpointURL string
// Enabled active ou désactive l'agrégation
Enabled bool
// BatchSize est le nombre de logs à accumuler avant envoi
BatchSize int
// FlushInterval est l'intervalle de flush automatique
FlushInterval time.Duration
// Timeout est le timeout pour les requêtes HTTP
Timeout time.Duration
// Labels sont les labels statiques à ajouter à chaque log (ex: {"service": "veza-api", "env": "production"})
Labels map[string]string
// HTTPClient est le client HTTP à utiliser (optionnel, un nouveau sera créé si nil)
HTTPClient *http.Client
}
// DefaultAggregationConfig retourne une configuration par défaut
func DefaultAggregationConfig() *AggregationConfig {
return &AggregationConfig{
Enabled: false,
BatchSize: 100,
FlushInterval: 5 * time.Second,
Timeout: 10 * time.Second,
Labels: make(map[string]string),
HTTPClient: nil,
}
}
// aggregationWriter implémente un writer qui envoie les logs vers un service d'agrégation
type aggregationWriter struct {
config *AggregationConfig
client *http.Client
buffer []logEntry
bufferMutex sync.Mutex
flushTicker *time.Ticker
done chan struct{}
wg sync.WaitGroup
}
// logEntry représente une entrée de log pour l'agrégation
type logEntry struct {
Timestamp time.Time `json:"timestamp"`
Level string `json:"level"`
Message string `json:"message"`
Fields map[string]interface{} `json:"fields,omitempty"`
Labels map[string]string `json:"labels,omitempty"`
}
// NewAggregationWriter crée un nouveau writer pour l'agrégation de logs
// Le writer envoie les logs vers un endpoint HTTP (compatible avec Loki, Elasticsearch, etc.)
func NewAggregationWriter(config *AggregationConfig) (io.Writer, error) {
if !config.Enabled || config.EndpointURL == "" {
return nil, fmt.Errorf("aggregation is disabled or endpoint URL is empty")
}
// Créer le client HTTP si non fourni
client := config.HTTPClient
if client == nil {
client = &http.Client{
Timeout: config.Timeout,
}
}
aw := &aggregationWriter{
config: config,
client: client,
buffer: make([]logEntry, 0, config.BatchSize),
flushTicker: time.NewTicker(config.FlushInterval),
done: make(chan struct{}),
}
// Démarrer la goroutine de flush périodique
aw.wg.Add(1)
go aw.flushRoutine()
return aw, nil
}
// Write implémente io.Writer - parse le JSON et l'ajoute au buffer
func (aw *aggregationWriter) Write(p []byte) (n int, err error) {
// Parser le JSON du log zap
var logData map[string]interface{}
if err := json.Unmarshal(p, &logData); err != nil {
// Si ce n'est pas du JSON valide, créer une entrée simple
aw.addEntry(logEntry{
Timestamp: time.Now(),
Level: "INFO",
Message: string(p),
Fields: make(map[string]interface{}),
})
return len(p), nil
}
// Extraire les champs du log zap
entry := logEntry{
Timestamp: time.Now(),
Level: "INFO",
Message: "",
Fields: make(map[string]interface{}),
Labels: make(map[string]string),
}
// Copier les labels statiques
for k, v := range aw.config.Labels {
entry.Labels[k] = v
}
// Extraire les champs communs
if msg, ok := logData["msg"].(string); ok {
entry.Message = msg
}
if level, ok := logData["level"].(string); ok {
entry.Level = level
} else if levelNum, ok := logData["level"].(float64); ok {
// Convertir le niveau numérique de zap en string
entry.Level = zapcore.Level(int(levelNum)).String()
}
// Extraire le timestamp si présent
if ts, ok := logData["ts"].(float64); ok {
entry.Timestamp = time.Unix(0, int64(ts*1e9))
}
// Copier tous les autres champs dans Fields
for k, v := range logData {
if k != "msg" && k != "level" && k != "ts" {
entry.Fields[k] = v
}
}
aw.addEntry(entry)
return len(p), nil
}
// addEntry ajoute une entrée au buffer et flush si nécessaire
func (aw *aggregationWriter) addEntry(entry logEntry) {
aw.bufferMutex.Lock()
defer aw.bufferMutex.Unlock()
aw.buffer = append(aw.buffer, entry)
// Flush si le buffer est plein
if len(aw.buffer) >= aw.config.BatchSize {
go aw.flush()
}
}
// flush envoie les logs du buffer vers l'endpoint d'agrégation
func (aw *aggregationWriter) flush() error {
aw.bufferMutex.Lock()
if len(aw.buffer) == 0 {
aw.bufferMutex.Unlock()
return nil
}
// Copier le buffer et le vider
entries := make([]logEntry, len(aw.buffer))
copy(entries, aw.buffer)
aw.buffer = aw.buffer[:0]
aw.bufferMutex.Unlock()
// Envoyer vers l'endpoint
return aw.sendLogs(entries)
}
// sendLogs envoie les logs vers l'endpoint d'agrégation
// Format compatible avec Loki Push API
func (aw *aggregationWriter) sendLogs(entries []logEntry) error {
if len(entries) == 0 {
return nil
}
// Format Loki Push API
// https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki
type stream struct {
Stream map[string]string `json:"stream"`
Values [][]string `json:"values"`
}
type pushPayload struct {
Streams []stream `json:"streams"`
}
// Grouper les logs par labels (streams)
streamsMap := make(map[string]*stream)
for _, entry := range entries {
// Créer une clé unique pour les labels
streamKey := ""
for k, v := range entry.Labels {
streamKey += fmt.Sprintf("%s=%s,", k, v)
}
if streamKey == "" {
streamKey = "default"
}
// Récupérer ou créer le stream
s, exists := streamsMap[streamKey]
if !exists {
s = &stream{
Stream: entry.Labels,
Values: make([][]string, 0),
}
streamsMap[streamKey] = s
}
// Construire le message de log
logLine := entry.Message
if len(entry.Fields) > 0 {
fieldsJSON, _ := json.Marshal(entry.Fields)
logLine += " " + string(fieldsJSON)
}
// Format: [timestamp_nanoseconds, log_line]
timestamp := fmt.Sprintf("%d", entry.Timestamp.UnixNano())
s.Values = append(s.Values, []string{timestamp, logLine})
}
// Convertir en slice
streams := make([]stream, 0, len(streamsMap))
for _, s := range streamsMap {
streams = append(streams, *s)
}
payload := pushPayload{
Streams: streams,
}
// Sérialiser en JSON
jsonData, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("failed to marshal logs: %w", err)
}
// Envoyer la requête HTTP
ctx, cancel := context.WithTimeout(context.Background(), aw.config.Timeout)
defer cancel()
req, err := http.NewRequestWithContext(ctx, "POST", aw.config.EndpointURL, bytes.NewBuffer(jsonData))
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
resp, err := aw.client.Do(req)
if err != nil {
return fmt.Errorf("failed to send logs: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("aggregation endpoint returned status %d: %s", resp.StatusCode, string(body))
}
return nil
}
// flushRoutine gère le flush périodique
func (aw *aggregationWriter) flushRoutine() {
defer aw.wg.Done()
for {
select {
case <-aw.flushTicker.C:
if err := aw.flush(); err != nil {
// Log l'erreur silencieusement (on ne peut pas logger dans le logger)
// En production, on pourrait utiliser un fallback logger
_ = err
}
case <-aw.done:
// Flush final avant de terminer
aw.flush()
return
}
}
}
// Sync implémente zapcore.WriteSyncer
func (aw *aggregationWriter) Sync() error {
return aw.flush()
}
// Close ferme le writer et flush les données restantes
func (aw *aggregationWriter) Close() error {
close(aw.done)
aw.flushTicker.Stop()
aw.wg.Wait()
return aw.flush()
}
// NewLoggerWithAggregation crée un logger avec agrégation de logs
// Les logs sont envoyés vers un endpoint d'agrégation (ex: Loki) en plus de la sortie standard
func NewLoggerWithAggregation(env, logLevel string, aggConfig *AggregationConfig) (*Logger, error) {
var config zap.Config
// FIX #25: Standardiser sur JSON en production/staging, console en développement
if env == "production" || env == "staging" {
config = zap.NewProductionConfig()
config.Encoding = "json"
config.EncoderConfig = zap.NewProductionEncoderConfig()
config.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
} else {
config = zap.NewDevelopmentConfig()
config.Encoding = "console"
config.EncoderConfig = zap.NewDevelopmentEncoderConfig()
config.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
}
// Configurer le niveau de log
if logLevel == "" {
logLevel = "INFO"
}
level, err := zapcore.ParseLevel(logLevel)
if err != nil {
level = zapcore.InfoLevel
}
config.Level = zap.NewAtomicLevelAt(level)
// FIX #28: Ajouter sampling pour éviter spam en cas de haute charge
// Initial: log les 100 premiers messages par seconde
// Thereafter: log 1 message toutes les 100 messages suivants
// Le sampling est particulièrement important pour l'agrégation pour réduire les coûts
if env == "production" || env == "staging" {
config.Sampling = &zap.SamplingConfig{
Initial: 100,
Thereafter: 100,
}
}
// Créer les cores
cores := []zapcore.Core{}
// Core pour stdout (toujours présent)
stdoutCore := zapcore.NewCore(
zapcore.NewJSONEncoder(config.EncoderConfig),
zapcore.AddSync(os.Stdout),
level,
)
cores = append(cores, stdoutCore)
// Core pour l'agrégation si activée
if aggConfig != nil && aggConfig.Enabled && aggConfig.EndpointURL != "" {
aggWriter, err := NewAggregationWriter(aggConfig)
if err == nil {
// Utiliser le même encoder JSON pour l'agrégation
aggCore := zapcore.NewCore(
zapcore.NewJSONEncoder(config.EncoderConfig),
zapcore.AddSync(aggWriter),
level,
)
cores = append(cores, aggCore)
}
// Si l'initialisation échoue, on continue sans agrégation
}
// Combiner les cores
core := zapcore.NewTee(cores...)
// Créer le logger
logger := zap.New(core,
zap.AddCaller(),
zap.AddStacktrace(zapcore.ErrorLevel),
)
return &Logger{zap: logger}, nil
}