veza/veza-backend-api/cmd/api/main.go
senke 84e92a75e2
feat(observability): OTel SDK + collector + Tempo + 4 hot path spans (W2 Day 9)
Wires distributed tracing end-to-end. The backend exports OTLP over
gRPC to a collector, which tail-samples (errors and slow traces always,
10% of the rest) and ships to Tempo. The Grafana service-map dashboard
pivots on the four instrumented hot paths.
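
The keep/drop policy is worth pinning down. A minimal Go illustration —
the real decision is made collector-side by the tail_sampling processor,
not in application code, and keepTrace is invented for this sketch:

    package sampling

    import (
    	"math/rand"
    	"time"
    )

    // keepTrace mirrors the tail-sampling policy stated above.
    // Illustrative only — the 500ms threshold is the one pinned in
    // the otel_collector role below.
    func keepTrace(hasError bool, d time.Duration, rnd *rand.Rand) bool {
    	if hasError || d > 500*time.Millisecond {
    		return true // errors and slow traces are always kept
    	}
    	return rnd.Float64() < 0.10 // keep 10% of the rest
    }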

- internal/tracing/otlp_exporter.go: InitOTLPTracer + Provider.Shutdown,
  BatchSpanProcessor (5s/512 batch), ParentBased(TraceIDRatio) sampler,
  W3C trace-context + baggage propagators. OTEL_SDK_DISABLED=true
  short-circuits to a no-op. Failure to dial the collector is non-fatal.
  (Sketched after this list.)
- cmd/api/main.go: init at boot, defer Shutdown(5s) on exit. appVersion
  is ldflag-overridable for resource attributes.
- 4 hot paths instrumented:
    * handlers/auth.go::Login → "auth.login"
    * core/track/track_upload_handler.go::InitiateChunkedUpload → "track.upload.initiate"
    * core/marketplace/service.go::ProcessPaymentWebhook → "payment.webhook"
    * handlers/search_handlers.go::Search → "search.query"
  PII guarded — email masked, query content not recorded (length only).
- infra/ansible/roles/otel_collector: pins the v0.116.1 contrib build,
  systemd unit, tail-sampling config (errors and >500ms always kept).
- infra/ansible/roles/tempo: pins v2.7.1 monolithic, local-disk backend
  (S3 deferred to v1.1), 14-day retention.
- infra/ansible/playbooks/observability.yml: provisions both Incus
  containers and applies the common baseline + roles in order.
- inventory/lab.yml: new groups observability, otel_collectors, tempo.
- config/grafana/dashboards/service-map.json: node graph + 4 hot-path
  span tables + collector throughput/queue panels.
- docs/ENV_VARIABLES.md §30: the 4 OTEL_* env vars documented.
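
For orientation, a minimal sketch of what the exporter init could look
like, assuming the stock otel-go SDK (go.opentelemetry.io/otel). Only
InitOTLPTracer's signature and the batch/sampler shape are taken from
this commit; the sampling ratio, resource attributes and error handling
are illustrative:

    package tracing

    import (
    	"context"
    	"os"
    	"time"

    	"go.opentelemetry.io/otel"
    	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
    	"go.opentelemetry.io/otel/propagation"
    	"go.opentelemetry.io/otel/sdk/resource"
    	sdktrace "go.opentelemetry.io/otel/sdk/trace"
    	semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
    	"go.uber.org/zap"
    )

    func InitOTLPTracer(ctx context.Context, env, version string, logger *zap.Logger) (*sdktrace.TracerProvider, error) {
    	if os.Getenv("OTEL_SDK_DISABLED") == "true" {
    		logger.Info("OTEL_SDK_DISABLED=true — tracer is a no-op")
    		return sdktrace.NewTracerProvider(), nil // provider with no exporter attached
    	}
    	// otlptracegrpc dials in the background and reads
    	// OTEL_EXPORTER_OTLP_ENDPOINT (default localhost:4317).
    	exp, err := otlptracegrpc.New(ctx)
    	if err != nil {
    		return nil, err // caller logs a warning and continues
    	}
    	res := resource.NewWithAttributes(
    		semconv.SchemaURL,
    		semconv.ServiceName("veza-backend-api"),
    		semconv.ServiceVersion(version),
    		semconv.DeploymentEnvironment(env),
    	)
    	tp := sdktrace.NewTracerProvider(
    		sdktrace.WithResource(res),
    		// Ratio is a placeholder — head-sample a share of root
    		// traces; children follow their parent's decision.
    		sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.TraceIDRatioBased(0.10))),
    		// 5s flush / 512-span batches, per the bullet above.
    		sdktrace.WithBatcher(exp,
    			sdktrace.WithBatchTimeout(5*time.Second),
    			sdktrace.WithMaxExportBatchSize(512),
    		),
    	)
    	otel.SetTracerProvider(tp)
    	otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
    		propagation.TraceContext{}, propagation.Baggage{},
    	))
    	return tp, nil
    }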

Acceptance criterion (Day 9): login → span visible in Tempo UI. Lab
deployment to validate with `ansible-playbook -i inventory/lab.yml
playbooks/observability.yml` once roles/postgres_ha is up.
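
And the span shape on a hot path — a hypothetical reduction of
handlers/auth.go::Login showing only the tracing and PII concern;
maskEmail, the attribute key and the tracer name are invented for the
example:

    package handlers

    import (
    	"context"
    	"strings"

    	"go.opentelemetry.io/otel"
    	"go.opentelemetry.io/otel/attribute"
    )

    // maskEmail keeps one character plus the domain:
    // "alice@veza.fr" -> "a***@veza.fr". Invented helper.
    func maskEmail(email string) string {
    	at := strings.IndexByte(email, '@')
    	if at < 1 {
    		return "***"
    	}
    	return email[:1] + "***" + email[at:]
    }

    // login sketches the "auth.login" span; the handler body is elided.
    func login(ctx context.Context, email string) error {
    	ctx, span := otel.Tracer("veza-backend-api").Start(ctx, "auth.login")
    	defer span.End()
    	// PII guard: record a masked form, never the raw email.
    	span.SetAttributes(attribute.String("enduser.email.masked", maskEmail(email)))
    	_ = ctx // ... credential check, token issuance ...
    	return nil
    }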

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-28 01:15:11 +02:00

package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	// SECURITY(REM-027): pprof removed from production — use build tag or dedicated debug binary instead.
	// To enable: go build -tags debug ./cmd/api
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/getsentry/sentry-go"
	"github.com/gin-gonic/gin"
	"github.com/joho/godotenv"
	"go.uber.org/zap"

	"veza-backend-api/internal/api"
	"veza-backend-api/internal/config"
	"veza-backend-api/internal/core/marketplace"
	vezaes "veza-backend-api/internal/elasticsearch"
	"veza-backend-api/internal/jobs"
	"veza-backend-api/internal/metrics"
	"veza-backend-api/internal/monitoring"
	"veza-backend-api/internal/services"
	"veza-backend-api/internal/services/hyperswitch"
	"veza-backend-api/internal/shutdown"
	"veza-backend-api/internal/tracing"
	"veza-backend-api/internal/workers"

	_ "veza-backend-api/docs" // Import docs for swagger
)

// @title Veza Backend API
// @version 1.2.0
// @description Backend API for Veza platform.
// @termsOfService http://swagger.io/terms/
// @contact.name API Support
// @contact.url https://veza.fr/support
// @contact.email support@veza.fr
// @license.name Apache 2.0
// @license.url http://www.apache.org/licenses/LICENSE-2.0.html
// @host localhost:18080
// @BasePath /api/v1
// @securityDefinitions.apikey BearerAuth
// @in header
// @name Authorization
// @securityDefinitions.apikey ApiKeyAuth
// @in header
// @name X-API-Key
// @description Developer API key (obtain from Developer Portal). Format: vza_xxxxx

// appVersion is overridden at build time via
// `-ldflags "-X main.appVersion=vX.Y.Z"`. Used as the OTel resource
// attribute service.version + Sentry release tag.
var appVersion = "dev"

func main() {
	// Load environment variables.
	// NOTE: Do not write to stderr to avoid broken pipe errors with systemd journald.
	// The message will be logged by the logger once it is initialized.
	_ = godotenv.Load()

	// FIX #1: Drop the duplicated logger initialization.
	// The logger is initialized in config.NewConfig() with the proper LOG_LEVEL.

	// Load the configuration (which initializes the logger).
	cfg, err := config.NewConfig()
	if err != nil {
		// CRITICAL: Do not write to stderr or files to avoid broken pipe errors.
		// Just exit silently — systemd will capture the exit code.
		// The error details will be in the application logs if the logger was initialized.
		os.Exit(1)
	}

	// Use the logger from the config.
	logger := cfg.Logger
	if logger == nil {
		log.Fatal("❌ Logger non initialisé dans la configuration")
	}
	logger.Info("🚀 Démarrage de Veza Backend API")
logger.Info("🚀 Démarrage de Veza Backend API")
// Valider la configuration
if err := cfg.Validate(); err != nil {
logger.Fatal("❌ Configuration invalide", zap.Error(err))
}
// Initialiser Sentry si DSN configuré
if cfg.SentryDsn != "" {
err := sentry.Init(sentry.ClientOptions{
Dsn: cfg.SentryDsn,
Environment: cfg.SentryEnvironment,
TracesSampleRate: cfg.SentrySampleRateTransactions,
SampleRate: cfg.SentrySampleRateErrors,
// AttachStacktrace pour capturer les stack traces
AttachStacktrace: true,
})
if err != nil {
logger.Warn("❌ Impossible d'initialiser Sentry", zap.Error(err))
} else {
logger.Info("✅ Sentry initialisé", zap.String("environment", cfg.SentryEnvironment))
}
// Flush les événements Sentry avant shutdown
defer sentry.Flush(2 * time.Second)
} else {
logger.Info(" Sentry non configuré (SENTRY_DSN non défini)")
}
	// v1.0.9 Day 9 — OpenTelemetry tracer init. Spans flow to the
	// otel-collector container (provisioned by infra/ansible/roles/
	// otel_collector) which forwards them to Tempo. Disabled in
	// dev / unit tests via OTEL_SDK_DISABLED=true to keep the
	// process from background-dialing localhost:4317.
	tracerCtx, tracerCancel := context.WithTimeout(context.Background(), 10*time.Second)
	// appVersion is drawn from the build-time ldflag and falls back to
	// "dev" so the resource attribute is always populated. Set via:
	//   go build -ldflags "-X main.appVersion=v1.0.9" ./cmd/api
	tracerProvider, err := tracing.InitOTLPTracer(tracerCtx, cfg.Env, appVersion, logger)
	tracerCancel()
	if err != nil {
		// Tracing failure is operational, not fatal. The collector
		// could be starting up at the same time as the backend; the
		// exporter retries internally.
		logger.Warn("OTel tracer init failed — continuing without spans", zap.Error(err))
	}
	defer func() {
		// Guard against a nil provider when init failed above.
		if tracerProvider == nil {
			return
		}
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		_ = tracerProvider.Shutdown(shutdownCtx)
	}()
	// Initialize the database.
	db := cfg.Database
	if db == nil {
		logger.Fatal("❌ Base de données non initialisée")
	}
	defer db.Close()
	if err := db.Initialize(); err != nil {
		logger.Fatal("❌ Impossible d'initialiser la base de données", zap.Error(err))
	}

	// MOD-P2-004: Start the DB pool metrics collector.
	// Samples the pool stats every 10 seconds and exposes them via Prometheus.
	metrics.StartDBPoolStatsCollector(db.DB, 10*time.Second)
	logger.Info("✅ Collecteur de métriques DB pool démarré")

	// Fail fast: verify RabbitMQ if it is enabled.
	if cfg.RabbitMQEnable {
		if cfg.RabbitMQEventBus == nil {
			logger.Fatal("❌ RabbitMQ activé (RABBITMQ_ENABLE=true) mais non initialisé (problème de connexion?)")
		} else {
			// Optional: check the connection status if RabbitMQEventBus exposes it.
			// For now, assume that if it is initialized it is connected or retrying.
			// For STRICT fail-fast we would need to verify the connection is open here.
			logger.Info("✅ RabbitMQ actif")
		}
	} else {
		logger.Info(" RabbitMQ désactivé")
	}
	// BE-SVC-017: Create the graceful-shutdown manager.
	shutdownManager := shutdown.NewShutdownManager(logger)

	// Start the Job Worker with a context for graceful shutdown.
	var workerCtx context.Context
	var workerCancel context.CancelFunc
	if cfg.JobWorker != nil {
		workerCtx, workerCancel = context.WithCancel(context.Background())
		cfg.JobWorker.Start(workerCtx)
		logger.Info("✅ Job Worker démarré")
		// Register the Job Worker for graceful shutdown.
		shutdownManager.Register(shutdown.NewShutdownFunc("job_worker", func(ctx context.Context) error {
			if workerCancel != nil {
				workerCancel()
				// Give the workers a moment to finish.
				time.Sleep(2 * time.Second)
			}
			return nil
		}))
	} else {
		logger.Warn("⚠️ Job Worker non initialisé")
	}
	// v0.701: Start Transfer Retry Worker.
	// v1.0.7 item B: Start Reversal Worker (shares the same
	// StripeConnectService — one initialisation for both workers).
	if cfg.StripeConnectEnabled && cfg.StripeConnectSecretKey != "" {
		stripeConnectSvc := services.NewStripeConnectService(db.GormDB, cfg.StripeConnectSecretKey, logger)
		if cfg.TransferRetryEnabled {
			retryWorker := marketplace.NewTransferRetryWorker(
				db.GormDB, stripeConnectSvc, logger, cfg.TransferRetryInterval, cfg.TransferRetryMaxAttempts,
			)
			retryCtx, retryCancel := context.WithCancel(context.Background())
			go retryWorker.Start(retryCtx)
			logger.Info("Transfer Retry Worker started",
				zap.Duration("interval", cfg.TransferRetryInterval),
				zap.Int("max_retries", cfg.TransferRetryMaxAttempts))
			shutdownManager.Register(shutdown.NewShutdownFunc("transfer_retry_worker", func(ctx context.Context) error {
				retryCancel()
				return nil
			}))
		}
		if cfg.ReversalWorkerEnabled {
			reversalWorker := marketplace.NewStripeReversalWorker(
				db.GormDB, stripeConnectSvc, logger,
				cfg.ReversalCheckInterval, cfg.ReversalMaxRetries,
				cfg.ReversalBackoffBase, cfg.ReversalBackoffMax,
			)
			reversalCtx, reversalCancel := context.WithCancel(context.Background())
			go reversalWorker.Start(reversalCtx)
			logger.Info("Stripe Reversal Worker started",
				zap.Duration("interval", cfg.ReversalCheckInterval),
				zap.Int("max_retries", cfg.ReversalMaxRetries),
				zap.Duration("backoff_base", cfg.ReversalBackoffBase),
				zap.Duration("backoff_max", cfg.ReversalBackoffMax))
			shutdownManager.Register(shutdown.NewShutdownFunc("stripe_reversal_worker", func(ctx context.Context) error {
				reversalCancel()
				return nil
			}))
		}
	} else if cfg.TransferRetryEnabled || cfg.ReversalWorkerEnabled {
		logger.Info("Transfer Retry / Reversal workers skipped — Stripe Connect not enabled")
	}
	// v1.0.7 item C: Reconciliation worker for stuck pending orders /
	// refunds whose webhook never arrived. Gated on Hyperswitch being
	// configured — without PSP read access there's nothing to sync
	// against.
	if cfg.ReconcileWorkerEnabled && cfg.HyperswitchEnabled && cfg.HyperswitchAPIKey != "" && cfg.HyperswitchURL != "" {
		hsClient := hyperswitch.NewClient(cfg.HyperswitchURL, cfg.HyperswitchAPIKey)
		hsProvider := hyperswitch.NewProvider(hsClient)
		// Build a marketplace.Service for the dispatcher side. Scoped
		// to the worker — the HTTP handler constructs its own via
		// APIRouter.getMarketplaceService which wires additional opts
		// (storage, checkout URL). The reconciler only needs the two
		// Process*Webhook methods.
		mktSvc := marketplace.NewService(db.GormDB, logger, nil,
			marketplace.WithPaymentProvider(hsProvider),
			marketplace.WithHyperswitchConfig(true, cfg.CheckoutSuccessURL),
		)
		reconcileWorker := marketplace.NewReconcileHyperswitchWorker(
			db.GormDB, hsProvider, mktSvc, logger,
			cfg.ReconcileInterval,
			cfg.ReconcileOrderStuckAfter,
			cfg.ReconcileRefundStuckAfter,
			cfg.ReconcileRefundOrphanAfter,
		)
		reconcileCtx, reconcileCancel := context.WithCancel(context.Background())
		go reconcileWorker.Start(reconcileCtx)
		logger.Info("Reconcile Hyperswitch Worker started",
			zap.Duration("interval", cfg.ReconcileInterval),
			zap.Duration("order_stuck_after", cfg.ReconcileOrderStuckAfter),
			zap.Duration("refund_stuck_after", cfg.ReconcileRefundStuckAfter),
			zap.Duration("refund_orphan_after", cfg.ReconcileRefundOrphanAfter))
		shutdownManager.Register(shutdown.NewShutdownFunc("reconcile_hyperswitch_worker", func(ctx context.Context) error {
			reconcileCancel()
			return nil
		}))
	} else if cfg.ReconcileWorkerEnabled {
		logger.Info("Reconcile worker skipped — Hyperswitch not enabled")
	}
	// v0.802: Start Cloud Backup Worker (copies cloud files to a backup prefix every 24h).
	if cfg.S3StorageService != nil {
		backupWorker := services.NewCloudBackupWorker(db.GormDB, cfg.S3StorageService, logger)
		backupCtx, backupCancel := context.WithCancel(context.Background())
		go backupWorker.Start(backupCtx)
		logger.Info("Cloud Backup Worker started (24h interval)")
		shutdownManager.Register(shutdown.NewShutdownFunc("cloud_backup_worker", func(ctx context.Context) error {
			backupCancel()
			return nil
		}))
	}

	// v0.802: Start Gear Warranty Notifier (sends notifications when a warranty expires in 30 days).
	notificationService := services.NewNotificationService(db, logger)
	warrantyNotifier := services.NewGearWarrantyNotifier(db.GormDB, notificationService, logger)
	warrantyCtx, warrantyCancel := context.WithCancel(context.Background())
	go warrantyNotifier.Start(warrantyCtx)
	logger.Info("Gear Warranty Notifier started (24h interval)")
	shutdownManager.Register(shutdown.NewShutdownFunc("gear_warranty_notifier", func(ctx context.Context) error {
		warrantyCancel()
		return nil
	}))

	// v0.10.5 F552: Weekly notification digest (runs on Sunday).
	if cfg.JobWorker != nil {
		digestWorker := services.NewNotificationDigestWorker(db.GormDB, cfg.JobWorker, logger)
		digestCtx, digestCancel := context.WithCancel(context.Background())
		go digestWorker.Start(digestCtx)
		logger.Info("Notification digest worker started (weekly on Sunday)")
		shutdownManager.Register(shutdown.NewShutdownFunc("notification_digest_worker", func(ctx context.Context) error {
			digestCancel()
			return nil
		}))
	}
	// v0.10.8 F065: Hard delete worker (GDPR — final anonymization after 30 days).
	// Enabled by default; only an explicit HARD_DELETE_CRON_ENABLED=false disables it.
	if os.Getenv("HARD_DELETE_CRON_ENABLED") != "false" {
		// Optional ES client for the worker's GDPR cleanup (users/tracks/playlists indices).
		// Non-fatal if ES is disabled or unreachable — the worker will just skip ES cleanup.
		var hardDeleteESClient *vezaes.Client
		if esCfg := vezaes.LoadConfig(); esCfg.Enabled {
			if esc, esErr := vezaes.NewClient(esCfg, logger); esErr != nil {
				logger.Warn("Elasticsearch unavailable for hard delete worker, ES cleanup disabled",
					zap.Error(esErr))
			} else {
				hardDeleteESClient = esc
			}
		}
		hardDeleteWorker := workers.NewHardDeleteWorker(db.GormDB, logger, 24*time.Hour).
			WithRedis(cfg.RedisClient).
			WithElasticsearch(hardDeleteESClient)
		hardDeleteCtx, hardDeleteCancel := context.WithCancel(context.Background())
		go hardDeleteWorker.Start(hardDeleteCtx)
		logger.Info("Hard delete worker started (24h interval)",
			zap.Bool("redis_cleanup", cfg.RedisClient != nil),
			zap.Bool("es_cleanup", hardDeleteESClient != nil),
		)
		shutdownManager.Register(shutdown.NewShutdownFunc("hard_delete_worker", func(ctx context.Context) error {
			hardDeleteWorker.Stop()
			hardDeleteCancel()
			return nil
		}))
	} else {
		logger.Info("Hard delete worker disabled (HARD_DELETE_CRON_ENABLED=false)")
	}
	// Configure the Gin mode.
	// Fix: read the env var directly since it is not exposed in Config.
	appEnv := os.Getenv("APP_ENV")
	if appEnv == "production" {
		gin.SetMode(gin.ReleaseMode)
	} else {
		gin.SetMode(gin.DebugMode)
	}

	// Create the Gin router.
	router := gin.New()
	// SECURITY(HIGH-006): Restrict trusted proxies to prevent IP spoofing via X-Forwarded-For.
	// Default: trust nothing (c.ClientIP() returns RemoteAddr only).
	// Set TRUSTED_PROXIES="10.0.0.1,10.0.0.2" if behind a known reverse proxy/load balancer.
	_ = router.SetTrustedProxies(nil) // nil list cannot fail CIDR parsing
	// Global middleware (Logger, Recovery) recommended by ORIGIN.
	router.Use(gin.Logger(), gin.Recovery())

	// Configure the routes.
	apiRouter := api.NewAPIRouter(db, cfg) // Instantiate APIRouter
	if err := apiRouter.Setup(router); err != nil {
		logger.Error("Failed to setup API routes", zap.Error(err))
		os.Exit(1)
	}
	// v1.0.4: Hourly cleanup of tracks stuck in `processing` whose upload file
	// vanished (crash, SIGKILL, disk wipe). Keeps the tracks table honest.
	jobs.ScheduleOrphanTracksCleanup(db, logger)

	// v1.0.7 item E: daily sweep of hyperswitch_webhook_log rows older than
	// HYPERSWITCH_WEBHOOK_LOG_RETENTION_DAYS (default 90). Batched so a large
	// backlog doesn't lock the table.
	jobs.ScheduleHyperswitchWebhookLogCleanup(db, logger, cfg.HyperswitchWebhookLogRetentionDays)

	// v1.0.7 item F: 60s sampler feeds five ledger-health gauges +
	// reconciler_* counters. Grafana dashboard in config/grafana/ledger.json,
	// alert rules in config/alertmanager/ledger.yml.
	ledgerSamplerCtx, ledgerSamplerCancel := context.WithCancel(context.Background())
	monitoring.ScheduleLedgerHealthSampler(ledgerSamplerCtx, db.GormDB, logger)
	shutdownManager.Register(shutdown.NewShutdownFunc("ledger_health_sampler", func(ctx context.Context) error {
		ledgerSamplerCancel()
		return nil
	}))
	// Configure the HTTP server.
	port := fmt.Sprintf("%d", cfg.AppPort)
	if cfg.AppPort == 0 {
		port = "8080"
	}
	server := &http.Server{
		Addr:         fmt.Sprintf(":%s", port),
		Handler:      router,
		ReadTimeout:  30 * time.Second, // ORIGIN standards
		WriteTimeout: 30 * time.Second,
	}

	// BE-SVC-017: Register every service for graceful shutdown.
	// Register the HTTP server.
	shutdownManager.Register(shutdown.NewShutdownFunc("http_server", func(ctx context.Context) error {
		return server.Shutdown(ctx)
	}))
	// Register the config (closes DB, Redis, RabbitMQ, etc.).
	shutdownManager.Register(shutdown.NewShutdownFunc("config", func(ctx context.Context) error {
		return cfg.Close()
	}))
	// Register the logger for a final flush.
	shutdownManager.Register(shutdown.NewShutdownFunc("logger", func(ctx context.Context) error {
		if logger != nil {
			return logger.Sync()
		}
		return nil
	}))
	// Register Sentry for a final flush.
	if cfg.SentryDsn != "" {
		shutdownManager.Register(shutdown.NewShutdownFunc("sentry", func(ctx context.Context) error {
			sentry.Flush(2 * time.Second)
			return nil
		}))
	}
	// Graceful shutdown handling.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		logger.Info("🌐 Serveur HTTP démarré", zap.String("port", port))
		if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			logger.Fatal("❌ Erreur du serveur HTTP", zap.Error(err))
		}
	}()

	// Wait for the stop signal.
	<-quit
	logger.Info("🔄 Signal d'arrêt reçu, démarrage du shutdown gracieux...")

	// BE-SVC-017: Coordinated graceful shutdown of all services.
	shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer shutdownCancel()
	if err := shutdownManager.Shutdown(shutdownCtx); err != nil {
		logger.Error("❌ Erreur lors du shutdown gracieux", zap.Error(err))
	} else {
		logger.Info("✅ Shutdown gracieux terminé avec succès")
	}
}