veza/veza-backend-api/cmd/api/main.go
senke ccf3e64d9a feat(observability): DB pool monitoring + N+1 detection (v1.0.10 ops item 11)
Two complementary signals: pool-side (do we have enough connections
for the load?) and per-request side (does any single handler quietly
run hundreds of queries?). Both feed Prometheus + Grafana + alert
rules.

Pool stats exporter (internal/database/pool_stats_exporter.go), sketched below:
- Background goroutine ticks every 15s and feeds the existing
  veza_db_connections{state} gauges. Before this, the gauges only
  refreshed when /health/deep was hit, so PoolExhaustionImminent
  evaluated against stale data.
- Wired into cmd/api/main.go alongside the ledger sampler with a
  shutdown hook for clean cancellation.
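
A minimal sketch of the exporter loop, assuming the
StartPoolStatsExporter(ctx, gormDB, interval, logger) signature wired in
main.go below. The gauge is registered inline here to keep the example
self-contained, and the state labels other than "in_use" are
illustrative; the real pool_stats_exporter.go may differ.

package database

import (
    "context"
    "time"

    "github.com/prometheus/client_golang/prometheus"
    "go.uber.org/zap"
    "gorm.io/gorm"
)

// dbConnections stands in for the existing veza_db_connections{state}
// gauge family; registered here only so the sketch runs on its own.
var dbConnections = prometheus.NewGaugeVec(
    prometheus.GaugeOpts{Name: "veza_db_connections", Help: "DB pool connections by state"},
    []string{"state"},
)

func init() { prometheus.MustRegister(dbConnections) }

// StartPoolStatsExporter refreshes the pool gauges every interval until
// ctx is cancelled, so alerts evaluate against fresh values even when
// /health/deep is never hit.
func StartPoolStatsExporter(ctx context.Context, gdb *gorm.DB, interval time.Duration, logger *zap.Logger) {
    go func() {
        ticker := time.NewTicker(interval)
        defer ticker.Stop()
        for {
            select {
            case <-ctx.Done():
                logger.Info("pool stats exporter stopped")
                return
            case <-ticker.C:
                sqlDB, err := gdb.DB() // underlying *sql.DB
                if err != nil {
                    logger.Warn("pool stats exporter: cannot access sql.DB", zap.Error(err))
                    continue
                }
                s := sqlDB.Stats()
                dbConnections.WithLabelValues("in_use").Set(float64(s.InUse))
                dbConnections.WithLabelValues("idle").Set(float64(s.Idle))
                dbConnections.WithLabelValues("open").Set(float64(s.OpenConnections))
                dbConnections.WithLabelValues("max_open").Set(float64(s.MaxOpenConnections))
            }
        }
    }()
}

The cancellation path matches what main.go does further down: the
pool_stats_exporter shutdown hook cancels the context and the goroutine
exits on the next tick.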

N+1 detector (internal/database/n1_detector.go +
internal/middleware/n1_query_counter.go), sketched below:
- Per-request *int64 counter attached to ctx by the gin
  middleware; the GORM after-callbacks (Query/Create/Update/Delete/
  Row/Raw) atomically increment it.
- Cost: one pointer load + one atomic add per query.
- Cardinality bounded by c.FullPath() (templated route, not URL).
- Default threshold 50, overridable via VEZA_N1_THRESHOLD.
- Histogram veza_db_request_query_count + counter
  veza_db_n1_suspicions_total.
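
A minimal sketch of the counting path. The metric names and the
threshold semantics come from the description above; the helper names
(ctxKey, N1QueryCounter, RegisterN1Callbacks), the histogram buckets and
the inline metric registration are assumptions, so the real files may
differ.

package middleware

import (
    "context"
    "sync/atomic"

    "github.com/gin-gonic/gin"
    "github.com/prometheus/client_golang/prometheus"
    "gorm.io/gorm"
)

type ctxKey struct{}

var (
    requestQueryCount = prometheus.NewHistogramVec(prometheus.HistogramOpts{
        Name:    "veza_db_request_query_count",
        Help:    "Number of DB queries issued while serving one request",
        Buckets: prometheus.ExponentialBuckets(1, 2, 10),
    }, []string{"route"})
    n1Suspicions = prometheus.NewCounterVec(prometheus.CounterOpts{
        Name: "veza_db_n1_suspicions_total",
        Help: "Requests whose query count exceeded the N+1 threshold",
    }, []string{"route"})
)

func init() { prometheus.MustRegister(requestQueryCount, n1Suspicions) }

// N1QueryCounter attaches a per-request counter to the request context
// and checks it against the threshold (default 50, VEZA_N1_THRESHOLD)
// once the handler chain returns.
func N1QueryCounter(threshold int64) gin.HandlerFunc {
    return func(c *gin.Context) {
        var n int64
        c.Request = c.Request.WithContext(
            context.WithValue(c.Request.Context(), ctxKey{}, &n))
        c.Next()
        route := c.FullPath() // templated route keeps label cardinality bounded
        total := atomic.LoadInt64(&n)
        requestQueryCount.WithLabelValues(route).Observe(float64(total))
        if total >= threshold {
            n1Suspicions.WithLabelValues(route).Inc()
        }
    }
}

// RegisterN1Callbacks hooks an after-callback into each GORM operation;
// every executed query costs one pointer load plus one atomic add.
func RegisterN1Callbacks(db *gorm.DB) {
    count := func(tx *gorm.DB) {
        if p, ok := tx.Statement.Context.Value(ctxKey{}).(*int64); ok {
            atomic.AddInt64(p, 1)
        }
    }
    db.Callback().Query().After("gorm:query").Register("veza:n1_count", count)
    db.Callback().Create().After("gorm:create").Register("veza:n1_count", count)
    db.Callback().Update().After("gorm:update").Register("veza:n1_count", count)
    db.Callback().Delete().After("gorm:delete").Register("veza:n1_count", count)
    db.Callback().Row().After("gorm:row").Register("veza:n1_count", count)
    db.Callback().Raw().After("gorm:raw").Register("veza:n1_count", count)
}

In this sketch the middleware sits on the router and the callback is
registered once against the GORM handle after db.Initialize(); the
counter only moves for requests that actually reach the middleware.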

Alerts in alert_rules.yml veza_db_pool_n1 group:
- PoolExhaustionImminent (in_use ≥ 90% for 5m)
- PoolStatsExporterStuck (gauges frozen for 10m despite traffic)
- N1QuerySpike (> 3% of requests over threshold for 15m)
- SlowQuerySustained (slow query rate > 2/min for 15m on same op+table)

Tests: 8 detector tests + 4 middleware tests, all pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 23:53:37 +02:00

package main
import (
"context"
"fmt"
"log"
"net/http"
// SECURITY(REM-027): pprof removed from production — use build tag or dedicated debug binary instead.
// To enable: go build -tags debug ./cmd/api
"os"
"os/signal"
"syscall"
"time"
"github.com/getsentry/sentry-go"
"github.com/gin-gonic/gin"
"github.com/joho/godotenv"
"go.uber.org/zap"
"veza-backend-api/internal/api"
"veza-backend-api/internal/config"
"veza-backend-api/internal/core/marketplace"
"veza-backend-api/internal/database"
vezaes "veza-backend-api/internal/elasticsearch"
"veza-backend-api/internal/jobs"
"veza-backend-api/internal/metrics"
"veza-backend-api/internal/monitoring"
"veza-backend-api/internal/services"
"veza-backend-api/internal/services/hyperswitch"
"veza-backend-api/internal/shutdown"
"veza-backend-api/internal/tracing"
"veza-backend-api/internal/workers"
_ "veza-backend-api/docs" // Import docs for swagger
)
// @title Veza Backend API
// @version 1.2.0
// @description Backend API for Veza platform.
// @termsOfService http://swagger.io/terms/
// @contact.name API Support
// @contact.url https://veza.fr/support
// @contact.email support@veza.fr
// @license.name Apache 2.0
// @license.url http://www.apache.org/licenses/LICENSE-2.0.html
// @host localhost:18080
// @BasePath /api/v1
// @securityDefinitions.apikey BearerAuth
// @in header
// @name Authorization
// @securityDefinitions.apikey ApiKeyAuth
// @in header
// @name X-API-Key
// @description Developer API key (obtain from Developer Portal). Format: vza_xxxxx
// appVersion is overridden at build time via
// `-ldflags "-X main.appVersion=vX.Y.Z"`. Used as the OTel resource
// attribute service.version + Sentry release tag.
var appVersion = "dev"
func main() {
// Load environment variables
// NOTE: Do not write to stderr to avoid broken pipe errors with systemd journald
// The message will be logged by the logger once it's initialized
_ = godotenv.Load()
// FIX #1: Remove the duplicated logger initialization
// The logger is initialized in config.NewConfig() with the right LOG_LEVEL
// Load the configuration (which initializes the logger)
cfg, err := config.NewConfig()
if err != nil {
// Print to stderr (→ systemd journal) AND exit. The earlier
// "silent exit" path lost the actual error: lumberjack's
// file-rotation buffer never flushed before os.Exit, leaving
// only "Logger initialized" in /var/log/veza/backend-api.log
// and zero diagnostic in the journal. A bare fmt.Fprintf is
// the simplest reliable channel.
fmt.Fprintf(os.Stderr, "FATAL: config.NewConfig: %v\n", err)
os.Exit(1)
}
// Use the logger from the config
logger := cfg.Logger
if logger == nil {
log.Fatal("❌ Logger non initialisé dans la configuration")
}
logger.Info("🚀 Démarrage de Veza Backend API")
// Validate the configuration
if err := cfg.Validate(); err != nil {
logger.Fatal("❌ Configuration invalide", zap.Error(err))
}
// Initialize Sentry if a DSN is configured
if cfg.SentryDsn != "" {
err := sentry.Init(sentry.ClientOptions{
Dsn: cfg.SentryDsn,
Environment: cfg.SentryEnvironment,
TracesSampleRate: cfg.SentrySampleRateTransactions,
SampleRate: cfg.SentrySampleRateErrors,
// AttachStacktrace to capture stack traces
AttachStacktrace: true,
})
if err != nil {
logger.Warn("❌ Impossible d'initialiser Sentry", zap.Error(err))
} else {
logger.Info("✅ Sentry initialisé", zap.String("environment", cfg.SentryEnvironment))
}
// Flush Sentry events before shutdown
defer sentry.Flush(2 * time.Second)
} else {
logger.Info(" Sentry non configuré (SENTRY_DSN non défini)")
}
// v1.0.9 Day 9 — OpenTelemetry tracer init. Spans flow to the
// otel-collector container (provisioned by infra/ansible/roles/
// otel_collector) which forwards them to Tempo. Disabled in
// dev / unit tests via OTEL_SDK_DISABLED=true to keep the
// process from background-dialing localhost:4317.
tracerCtx, tracerCancel := context.WithTimeout(context.Background(), 10*time.Second)
// AppVersion drawn from build-time ldflag; falls back to "dev" so
// the resource attribute is always populated. Set via:
// go build -ldflags "-X main.appVersion=v1.0.9" ./cmd/api
tracerProvider, err := tracing.InitOTLPTracer(tracerCtx, cfg.Env, appVersion, logger)
tracerCancel()
if err != nil {
// Tracing failure is operational, not fatal. The collector
// could be starting up at the same time as the backend; the
// exporter retries internally.
logger.Warn("OTel tracer init failed — continuing without spans", zap.Error(err))
}
defer func() {
// Guard against a nil provider in case tracer init failed above.
if tracerProvider == nil {
return
}
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = tracerProvider.Shutdown(shutdownCtx)
}()
// Initialize the database
db := cfg.Database
if db == nil {
logger.Fatal("❌ Base de données non initialisée")
}
defer db.Close()
if err := db.Initialize(); err != nil {
logger.Fatal("❌ Impossible d'initialiser la base de données", zap.Error(err))
}
// MOD-P2-004: Start the DB pool metrics collector
// Collects DB pool stats every 10 seconds and exposes them via Prometheus
metrics.StartDBPoolStatsCollector(db.DB, 10*time.Second)
logger.Info("✅ Collecteur de métriques DB pool démarré")
// Fail-fast: verify RabbitMQ if enabled
if cfg.RabbitMQEnable {
if cfg.RabbitMQEventBus == nil {
logger.Fatal("❌ RabbitMQ activé (RABBITMQ_ENABLE=true) mais non initialisé (problème de connexion?)")
} else {
// Optional: check the connection status if RabbitMQEventBus exposes it.
// For now, assume that if it is initialized it is connected or retrying.
// For a STRICT fail-fast we would need to verify the connection is open here.
logger.Info("✅ RabbitMQ actif")
}
} else {
logger.Info(" RabbitMQ désactivé")
}
// BE-SVC-017: Create the graceful shutdown manager
shutdownManager := shutdown.NewShutdownManager(logger)
// Start the Job Worker with a context for graceful shutdown
var workerCtx context.Context
var workerCancel context.CancelFunc
if cfg.JobWorker != nil {
workerCtx, workerCancel = context.WithCancel(context.Background())
cfg.JobWorker.Start(workerCtx)
logger.Info("✅ Job Worker démarré")
// Register the Job Worker for graceful shutdown
shutdownManager.Register(shutdown.NewShutdownFunc("job_worker", func(ctx context.Context) error {
if workerCancel != nil {
workerCancel()
// Give the workers a moment to finish
time.Sleep(2 * time.Second)
}
return nil
}))
} else {
logger.Warn("⚠️ Job Worker non initialisé")
}
// v0.701: Start Transfer Retry Worker
// v1.0.7 item B: Start Reversal Worker (shares the same
// StripeConnectService — one initialisation for both workers).
if cfg.StripeConnectEnabled && cfg.StripeConnectSecretKey != "" {
stripeConnectSvc := services.NewStripeConnectService(db.GormDB, cfg.StripeConnectSecretKey, logger)
if cfg.TransferRetryEnabled {
retryWorker := marketplace.NewTransferRetryWorker(
db.GormDB, stripeConnectSvc, logger, cfg.TransferRetryInterval, cfg.TransferRetryMaxAttempts,
)
retryCtx, retryCancel := context.WithCancel(context.Background())
go retryWorker.Start(retryCtx)
logger.Info("Transfer Retry Worker started",
zap.Duration("interval", cfg.TransferRetryInterval),
zap.Int("max_retries", cfg.TransferRetryMaxAttempts))
shutdownManager.Register(shutdown.NewShutdownFunc("transfer_retry_worker", func(ctx context.Context) error {
retryCancel()
return nil
}))
}
if cfg.ReversalWorkerEnabled {
reversalWorker := marketplace.NewStripeReversalWorker(
db.GormDB, stripeConnectSvc, logger,
cfg.ReversalCheckInterval, cfg.ReversalMaxRetries,
cfg.ReversalBackoffBase, cfg.ReversalBackoffMax,
)
reversalCtx, reversalCancel := context.WithCancel(context.Background())
go reversalWorker.Start(reversalCtx)
logger.Info("Stripe Reversal Worker started",
zap.Duration("interval", cfg.ReversalCheckInterval),
zap.Int("max_retries", cfg.ReversalMaxRetries),
zap.Duration("backoff_base", cfg.ReversalBackoffBase),
zap.Duration("backoff_max", cfg.ReversalBackoffMax))
shutdownManager.Register(shutdown.NewShutdownFunc("stripe_reversal_worker", func(ctx context.Context) error {
reversalCancel()
return nil
}))
}
} else if cfg.TransferRetryEnabled || cfg.ReversalWorkerEnabled {
logger.Info("Transfer Retry / Reversal workers skipped — Stripe Connect not enabled")
}
// v1.0.7 item C: Reconciliation worker for stuck pending orders /
// refunds whose webhook never arrived. Gated on Hyperswitch being
// configured — without PSP read access there's nothing to sync
// against.
if cfg.ReconcileWorkerEnabled && cfg.HyperswitchEnabled && cfg.HyperswitchAPIKey != "" && cfg.HyperswitchURL != "" {
hsClient := hyperswitch.NewClient(cfg.HyperswitchURL, cfg.HyperswitchAPIKey)
hsProvider := hyperswitch.NewProvider(hsClient)
// Build a marketplace.Service for the dispatcher side. Scoped
// to the worker — the HTTP handler constructs its own via
// APIRouter.getMarketplaceService which wires additional opts
// (storage, checkout URL). Reconciler only needs the two
// Process*Webhook methods.
mktSvc := marketplace.NewService(db.GormDB, logger, nil,
marketplace.WithPaymentProvider(hsProvider),
marketplace.WithHyperswitchConfig(true, cfg.CheckoutSuccessURL),
)
reconcileWorker := marketplace.NewReconcileHyperswitchWorker(
db.GormDB, hsProvider, mktSvc, logger,
cfg.ReconcileInterval,
cfg.ReconcileOrderStuckAfter,
cfg.ReconcileRefundStuckAfter,
cfg.ReconcileRefundOrphanAfter,
)
reconcileCtx, reconcileCancel := context.WithCancel(context.Background())
go reconcileWorker.Start(reconcileCtx)
logger.Info("Reconcile Hyperswitch Worker started",
zap.Duration("interval", cfg.ReconcileInterval),
zap.Duration("order_stuck_after", cfg.ReconcileOrderStuckAfter),
zap.Duration("refund_stuck_after", cfg.ReconcileRefundStuckAfter),
zap.Duration("refund_orphan_after", cfg.ReconcileRefundOrphanAfter))
shutdownManager.Register(shutdown.NewShutdownFunc("reconcile_hyperswitch_worker", func(ctx context.Context) error {
reconcileCancel()
return nil
}))
} else if cfg.ReconcileWorkerEnabled {
logger.Info("Reconcile worker skipped — Hyperswitch not enabled")
}
// v0.802: Start Cloud Backup Worker (copies cloud files to backup prefix every 24h)
if cfg.S3StorageService != nil {
backupWorker := services.NewCloudBackupWorker(db.GormDB, cfg.S3StorageService, logger)
backupCtx, backupCancel := context.WithCancel(context.Background())
go backupWorker.Start(backupCtx)
logger.Info("Cloud Backup Worker started (24h interval)")
shutdownManager.Register(shutdown.NewShutdownFunc("cloud_backup_worker", func(ctx context.Context) error {
backupCancel()
return nil
}))
}
// v0.802: Start Gear Warranty Notifier (sends notifications when warranty expires in 30 days)
notificationService := services.NewNotificationService(db, logger)
warrantyNotifier := services.NewGearWarrantyNotifier(db.GormDB, notificationService, logger)
warrantyCtx, warrantyCancel := context.WithCancel(context.Background())
go warrantyNotifier.Start(warrantyCtx)
logger.Info("Gear Warranty Notifier started (24h interval)")
shutdownManager.Register(shutdown.NewShutdownFunc("gear_warranty_notifier", func(ctx context.Context) error {
warrantyCancel()
return nil
}))
// v0.10.5 F552: Weekly notification digest (runs on Sunday)
if cfg.JobWorker != nil {
digestWorker := services.NewNotificationDigestWorker(db.GormDB, cfg.JobWorker, logger)
digestCtx, digestCancel := context.WithCancel(context.Background())
go digestWorker.Start(digestCtx)
logger.Info("Notification digest worker started (weekly on Sunday)")
shutdownManager.Register(shutdown.NewShutdownFunc("notification_digest_worker", func(ctx context.Context) error {
digestCancel()
return nil
}))
}
// v0.10.8 F065: Hard delete worker (GDPR - final anonymization after 30 days)
if os.Getenv("HARD_DELETE_CRON_ENABLED") != "false" {
// Optional ES client for the worker's RGPD cleanup (users/tracks/playlists indices).
// Non-fatal if ES is disabled or unreachable — the worker will just skip ES cleanup.
var hardDeleteESClient *vezaes.Client
if esCfg := vezaes.LoadConfig(); esCfg.Enabled {
if esc, esErr := vezaes.NewClient(esCfg, logger); esErr != nil {
logger.Warn("Elasticsearch unavailable for hard delete worker, ES cleanup disabled",
zap.Error(esErr))
} else {
hardDeleteESClient = esc
}
}
hardDeleteWorker := workers.NewHardDeleteWorker(db.GormDB, logger, 24*time.Hour).
WithRedis(cfg.RedisClient).
WithElasticsearch(hardDeleteESClient)
hardDeleteCtx, hardDeleteCancel := context.WithCancel(context.Background())
go hardDeleteWorker.Start(hardDeleteCtx)
logger.Info("Hard delete worker started (24h interval)",
zap.Bool("redis_cleanup", cfg.RedisClient != nil),
zap.Bool("es_cleanup", hardDeleteESClient != nil),
)
shutdownManager.Register(shutdown.NewShutdownFunc("hard_delete_worker", func(ctx context.Context) error {
hardDeleteWorker.Stop()
hardDeleteCancel()
return nil
}))
} else {
logger.Info("Hard delete worker disabled (HARD_DELETE_CRON_ENABLED=false)")
}
// Configure the Gin mode
// Fix: read the env var directly because it is not exposed in Config
appEnv := os.Getenv("APP_ENV")
if appEnv == "production" {
gin.SetMode(gin.ReleaseMode)
} else {
gin.SetMode(gin.DebugMode)
}
// Create the Gin router
router := gin.New()
// SECURITY(HIGH-006): Restrict trusted proxies to prevent IP spoofing via X-Forwarded-For.
// Default: trust nothing (c.ClientIP() returns RemoteAddr only).
// Set TRUSTED_PROXIES="10.0.0.1,10.0.0.2" if behind a known reverse proxy/load balancer.
router.SetTrustedProxies(nil)
// Global middleware (Logger, Recovery) recommended by ORIGIN
router.Use(gin.Logger(), gin.Recovery())
// Configure the routes
apiRouter := api.NewAPIRouter(db, cfg) // Instantiate APIRouter
if err := apiRouter.Setup(router); err != nil {
logger.Error("Failed to setup API routes", zap.Error(err))
os.Exit(1)
}
// v1.0.4: Hourly cleanup of tracks stuck in `processing` whose upload file
// vanished (crash, SIGKILL, disk wipe). Keeps the tracks table honest.
jobs.ScheduleOrphanTracksCleanup(db, logger)
// v1.0.7 item E: daily sweep of hyperswitch_webhook_log rows older than
// HYPERSWITCH_WEBHOOK_LOG_RETENTION_DAYS (default 90). Batched so a large
// backlog doesn't lock the table.
jobs.ScheduleHyperswitchWebhookLogCleanup(db, logger, cfg.HyperswitchWebhookLogRetentionDays)
// v1.0.7 item F: 60s sampler feeds five ledger-health gauges +
// reconciler_* counters. Grafana dashboard in config/grafana/ledger.json,
// alert rules in config/alertmanager/ledger.yml.
ledgerSamplerCtx, ledgerSamplerCancel := context.WithCancel(context.Background())
monitoring.ScheduleLedgerHealthSampler(ledgerSamplerCtx, db.GormDB, logger)
shutdownManager.Register(shutdown.NewShutdownFunc("ledger_health_sampler", func(ctx context.Context) error {
ledgerSamplerCancel()
return nil
}))
// v1.0.10 ops item 11 — periodic DB pool stats exporter. Without
// this the pool gauges only refresh when /health/deep is hit;
// the pool-exhaustion alert needs fresh values to fire on time.
poolStatsCtx, poolStatsCancel := context.WithCancel(context.Background())
database.StartPoolStatsExporter(poolStatsCtx, db.GormDB, 15*time.Second, logger)
shutdownManager.Register(shutdown.NewShutdownFunc("pool_stats_exporter", func(ctx context.Context) error {
poolStatsCancel()
return nil
}))
// Configure the HTTP server
port := fmt.Sprintf("%d", cfg.AppPort)
if cfg.AppPort == 0 {
port = "8080"
}
server := &http.Server{
Addr: fmt.Sprintf(":%s", port),
Handler: router,
ReadTimeout: 30 * time.Second, // ORIGIN standards
WriteTimeout: 30 * time.Second,
}
// BE-SVC-017: Register all services for graceful shutdown
// Register the HTTP server
shutdownManager.Register(shutdown.NewShutdownFunc("http_server", func(ctx context.Context) error {
return server.Shutdown(ctx)
}))
// Register the configuration (closes DB, Redis, RabbitMQ, etc.)
shutdownManager.Register(shutdown.NewShutdownFunc("config", func(ctx context.Context) error {
return cfg.Close()
}))
// Register the logger for a final flush
shutdownManager.Register(shutdown.NewShutdownFunc("logger", func(ctx context.Context) error {
if logger != nil {
return logger.Sync()
}
return nil
}))
// Register Sentry for a final flush
if cfg.SentryDsn != "" {
shutdownManager.Register(shutdown.NewShutdownFunc("sentry", func(ctx context.Context) error {
sentry.Flush(2 * time.Second)
return nil
}))
}
// Graceful shutdown handling
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
go func() {
logger.Info("🌐 Serveur HTTP démarré", zap.String("port", port))
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
logger.Fatal("❌ Erreur du serveur HTTP", zap.Error(err))
}
}()
// Wait for the shutdown signal
<-quit
logger.Info("🔄 Signal d'arrêt reçu, démarrage du shutdown gracieux...")
// BE-SVC-017: Coordinated graceful shutdown of all services
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer shutdownCancel()
if err := shutdownManager.Shutdown(shutdownCtx); err != nil {
logger.Error("❌ Erreur lors du shutdown gracieux", zap.Error(err))
} else {
logger.Info("✅ Shutdown gracieux terminé avec succès")
}
}