veza/veza-backend-api/cmd/api/main.go
senke 7e180a2c08 feat(workers): hyperswitch reconciliation sweep for stuck pending states — v1.0.7 item C
New ReconcileHyperswitchWorker sweeps for pending orders and refunds
whose terminal webhook never arrived. Pulls live PSP state for each
stuck row and synthesises a webhook payload to feed the normal
ProcessPaymentWebhook / ProcessRefundWebhook dispatcher. The existing
terminal-state guards on those handlers make reconciliation
idempotent against real webhooks — a late webhook after the reconciler
resolved the row is a no-op.
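The guard idea can be sketched in miniature (illustrative names only — the real logic lives inside ProcessPaymentWebhook / ProcessRefundWebhook):

```go
package main

import "fmt"

// terminal lists the states that no webhook — real or synthetic —
// is allowed to overwrite.
var terminal = map[string]bool{
	"completed": true,
	"failed":    true,
	"refunded":  true,
}

// applyStatus is the terminal-state guard in miniature: a row already
// in a terminal state ignores any further event, so the reconciler and
// a late real webhook can race freely — whichever lands second is a no-op.
func applyStatus(current, incoming string) (string, bool) {
	if terminal[current] {
		return current, false
	}
	return incoming, true
}

func main() {
	s, applied := applyStatus("pending", "completed") // reconciler resolves the row
	fmt.Println(s, applied)
	s, applied = applyStatus(s, "failed") // late real webhook: no-op
	fmt.Println(s, applied)
}
```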

Three stuck-state classes covered:
  1. Stuck orders (pending > 30m, non-empty payment_id) → GetPaymentStatus
     + synthetic payment.<status> webhook.
  2. Stuck refunds with PSP id (pending > 30m, non-empty
     hyperswitch_refund_id) → GetRefundStatus + synthetic
     refund.<status> webhook (error_message forwarded).
  3. Orphan refunds (pending > 5m, EMPTY hyperswitch_refund_id) →
     mark failed + roll order back to completed + log ERROR. This
     is the "we crashed between Phase 1 and Phase 2 of RefundOrder"
     case, operator-attention territory.

New interfaces:
  * marketplace.HyperswitchReadClient — read-only PSP surface the
    worker depends on (GetPaymentStatus, GetRefundStatus). The
    worker never calls CreatePayment / CreateRefund.
  * hyperswitch.Client.GetRefund + RefundStatus struct added.
  * hyperswitch.Provider gains GetRefundStatus + GetPaymentStatus
    pass-throughs that satisfy the marketplace interface.

Configuration (all env-var tunable with sensible defaults):
  * RECONCILE_WORKER_ENABLED=true
  * RECONCILE_INTERVAL=1h (ops can drop to 5m during incident
    response without a code change)
  * RECONCILE_ORDER_STUCK_AFTER=30m
  * RECONCILE_REFUND_STUCK_AFTER=30m
  * RECONCILE_REFUND_ORPHAN_AFTER=5m (shorter because "app crashed"
    is a different signal from "network hiccup")

Operational details:
  * Batch limit 50 rows per phase per tick so a 10k-row backlog
    doesn't hammer Hyperswitch. Next tick picks up the rest.
  * PSP read errors leave the row untouched — next tick retries.
    Reconciliation is always safe to replay.
  * Structured log on every action so `grep reconcile` tells the
    ops story: which order/refund got synced, against what status,
    how long it was stuck.
  * Worker wired in cmd/api/main.go, gated on
    HyperswitchEnabled + HyperswitchAPIKey. Graceful shutdown
    registered.
  * RunOnce exposed as public API for ad-hoc ops trigger during
    incident response.

Tests — 10 cases, all green (sqlite :memory:):
  * TestReconcile_StuckOrder_SyncsViaSyntheticWebhook
  * TestReconcile_RecentOrder_NotTouched
  * TestReconcile_CompletedOrder_NotTouched
  * TestReconcile_OrderWithEmptyPaymentID_NotTouched
  * TestReconcile_PSPReadErrorLeavesRowIntact
  * TestReconcile_OrphanRefund_AutoFails_OrderRollsBack
  * TestReconcile_RecentOrphanRefund_NotTouched
  * TestReconcile_StuckRefund_SyncsViaSyntheticWebhook
  * TestReconcile_StuckRefund_FailureStatus_PassesErrorMessage
  * TestReconcile_AllTerminalStates_NoOp

CHANGELOG v1.0.7-rc1 updated with the full item C section between D
and the existing E block, matching the order convention (ship order:
A → D → B → E → C, CHANGELOG order follows).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 03:08:15 +02:00


package main

import (
	"context"
	"fmt"
	"log"
	"net/http"

	// SECURITY(REM-027): pprof removed from production — use build tag or dedicated debug binary instead.
	// To enable: go build -tags debug ./cmd/api
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/getsentry/sentry-go"
	"github.com/gin-gonic/gin"
	"github.com/joho/godotenv"
	"go.uber.org/zap"

	"veza-backend-api/internal/api"
	"veza-backend-api/internal/config"
	"veza-backend-api/internal/core/marketplace"
	vezaes "veza-backend-api/internal/elasticsearch"
	"veza-backend-api/internal/jobs"
	"veza-backend-api/internal/metrics"
	"veza-backend-api/internal/services"
	"veza-backend-api/internal/services/hyperswitch"
	"veza-backend-api/internal/shutdown"
	"veza-backend-api/internal/workers"

	_ "veza-backend-api/docs" // Import docs for swagger
)
// @title Veza Backend API
// @version 1.2.0
// @description Backend API for Veza platform.
// @termsOfService http://swagger.io/terms/
// @contact.name API Support
// @contact.url http://www.veza.app/support
// @contact.email support@veza.app
// @license.name Apache 2.0
// @license.url http://www.apache.org/licenses/LICENSE-2.0.html
// @host localhost:18080
// @BasePath /api/v1
// @securityDefinitions.apikey BearerAuth
// @in header
// @name Authorization
// @securityDefinitions.apikey ApiKeyAuth
// @in header
// @name X-API-Key
// @description Developer API key (obtain from Developer Portal). Format: vza_xxxxx
func main() {
	// Load environment variables.
	// NOTE: Do not write to stderr to avoid broken pipe errors with systemd journald.
	// The message will be logged by the logger once it's initialized.
	_ = godotenv.Load()

	// FIX #1: Remove the duplicated logger initialization.
	// The logger is initialized in config.NewConfig() with the correct LOG_LEVEL.
	// Load the configuration (which initializes the logger).
	cfg, err := config.NewConfig()
	if err != nil {
		// CRITICAL: Do not write to stderr or files to avoid broken pipe errors.
		// Just exit silently - systemd will capture the exit code.
		// The error details will be in the application logs if the logger was initialized.
		os.Exit(1)
	}

	// Use the logger from the config.
	logger := cfg.Logger
	if logger == nil {
		log.Fatal("❌ Logger non initialisé dans la configuration")
	}
	logger.Info("🚀 Démarrage de Veza Backend API")

	// Validate the configuration.
	if err := cfg.Validate(); err != nil {
		logger.Fatal("❌ Configuration invalide", zap.Error(err))
	}
	// Initialize Sentry if a DSN is configured.
	if cfg.SentryDsn != "" {
		err := sentry.Init(sentry.ClientOptions{
			Dsn:              cfg.SentryDsn,
			Environment:      cfg.SentryEnvironment,
			TracesSampleRate: cfg.SentrySampleRateTransactions,
			SampleRate:       cfg.SentrySampleRateErrors,
			// AttachStacktrace to capture stack traces.
			AttachStacktrace: true,
		})
		if err != nil {
			logger.Warn("❌ Impossible d'initialiser Sentry", zap.Error(err))
		} else {
			logger.Info("✅ Sentry initialisé", zap.String("environment", cfg.SentryEnvironment))
		}
		// Flush Sentry events before shutdown.
		defer sentry.Flush(2 * time.Second)
	} else {
		logger.Info(" Sentry non configuré (SENTRY_DSN non défini)")
	}

	// Database initialization.
	db := cfg.Database
	if db == nil {
		logger.Fatal("❌ Base de données non initialisée")
	}
	defer db.Close()
	if err := db.Initialize(); err != nil {
		logger.Fatal("❌ Impossible d'initialiser la base de données", zap.Error(err))
	}

	// MOD-P2-004: Start the DB pool metrics collector.
	// Collects DB pool stats every 10 seconds and exposes them via Prometheus.
	metrics.StartDBPoolStatsCollector(db.DB, 10*time.Second)
	logger.Info("✅ Collecteur de métriques DB pool démarré")

	// Fail-fast: verify RabbitMQ if enabled.
	if cfg.RabbitMQEnable {
		if cfg.RabbitMQEventBus == nil {
			logger.Fatal("❌ RabbitMQ activé (RABBITMQ_ENABLE=true) mais non initialisé (problème de connexion?)")
		} else {
			// Optional: check connection status if RabbitMQEventBus exposes it.
			// For now, assume that if initialized it's connected or retrying.
			// For STRICT fail-fast we would need to verify the connection is open here.
			logger.Info("✅ RabbitMQ actif")
		}
	} else {
		logger.Info(" RabbitMQ désactivé")
	}
	// BE-SVC-017: Create the graceful-shutdown manager.
	shutdownManager := shutdown.NewShutdownManager(logger)

	// Start the Job Worker with a context for graceful shutdown.
	var workerCtx context.Context
	var workerCancel context.CancelFunc
	if cfg.JobWorker != nil {
		workerCtx, workerCancel = context.WithCancel(context.Background())
		cfg.JobWorker.Start(workerCtx)
		logger.Info("✅ Job Worker démarré")
		// Register the Job Worker for graceful shutdown.
		shutdownManager.Register(shutdown.NewShutdownFunc("job_worker", func(ctx context.Context) error {
			if workerCancel != nil {
				workerCancel()
				// Give the workers a moment to finish.
				time.Sleep(2 * time.Second)
			}
			return nil
		}))
	} else {
		logger.Warn("⚠️ Job Worker non initialisé")
	}

	// v0.701: Start Transfer Retry Worker.
	// v1.0.7 item B: Start Reversal Worker (shares the same
	// StripeConnectService — one initialisation for both workers).
	if cfg.StripeConnectEnabled && cfg.StripeConnectSecretKey != "" {
		stripeConnectSvc := services.NewStripeConnectService(db.GormDB, cfg.StripeConnectSecretKey, logger)
		if cfg.TransferRetryEnabled {
			retryWorker := marketplace.NewTransferRetryWorker(
				db.GormDB, stripeConnectSvc, logger, cfg.TransferRetryInterval, cfg.TransferRetryMaxAttempts,
			)
			retryCtx, retryCancel := context.WithCancel(context.Background())
			go retryWorker.Start(retryCtx)
			logger.Info("Transfer Retry Worker started",
				zap.Duration("interval", cfg.TransferRetryInterval),
				zap.Int("max_retries", cfg.TransferRetryMaxAttempts))
			shutdownManager.Register(shutdown.NewShutdownFunc("transfer_retry_worker", func(ctx context.Context) error {
				retryCancel()
				return nil
			}))
		}
		if cfg.ReversalWorkerEnabled {
			reversalWorker := marketplace.NewStripeReversalWorker(
				db.GormDB, stripeConnectSvc, logger,
				cfg.ReversalCheckInterval, cfg.ReversalMaxRetries,
				cfg.ReversalBackoffBase, cfg.ReversalBackoffMax,
			)
			reversalCtx, reversalCancel := context.WithCancel(context.Background())
			go reversalWorker.Start(reversalCtx)
			logger.Info("Stripe Reversal Worker started",
				zap.Duration("interval", cfg.ReversalCheckInterval),
				zap.Int("max_retries", cfg.ReversalMaxRetries),
				zap.Duration("backoff_base", cfg.ReversalBackoffBase),
				zap.Duration("backoff_max", cfg.ReversalBackoffMax))
			shutdownManager.Register(shutdown.NewShutdownFunc("stripe_reversal_worker", func(ctx context.Context) error {
				reversalCancel()
				return nil
			}))
		}
	} else if cfg.TransferRetryEnabled || cfg.ReversalWorkerEnabled {
		logger.Info("Transfer Retry / Reversal workers skipped — Stripe Connect not enabled")
	}
	// v1.0.7 item C: Reconciliation worker for stuck pending orders /
	// refunds whose webhook never arrived. Gated on Hyperswitch being
	// configured — without PSP read access there's nothing to sync
	// against.
	if cfg.ReconcileWorkerEnabled && cfg.HyperswitchEnabled && cfg.HyperswitchAPIKey != "" && cfg.HyperswitchURL != "" {
		hsClient := hyperswitch.NewClient(cfg.HyperswitchURL, cfg.HyperswitchAPIKey)
		hsProvider := hyperswitch.NewProvider(hsClient)
		// Build a marketplace.Service for the dispatcher side. Scoped
		// to the worker — the HTTP handler constructs its own via
		// APIRouter.getMarketplaceService which wires additional opts
		// (storage, checkout URL). Reconciler only needs the two
		// Process*Webhook methods.
		mktSvc := marketplace.NewService(db.GormDB, logger, nil,
			marketplace.WithPaymentProvider(hsProvider),
			marketplace.WithHyperswitchConfig(true, cfg.CheckoutSuccessURL),
		)
		reconcileWorker := marketplace.NewReconcileHyperswitchWorker(
			db.GormDB, hsProvider, mktSvc, logger,
			cfg.ReconcileInterval,
			cfg.ReconcileOrderStuckAfter,
			cfg.ReconcileRefundStuckAfter,
			cfg.ReconcileRefundOrphanAfter,
		)
		reconcileCtx, reconcileCancel := context.WithCancel(context.Background())
		go reconcileWorker.Start(reconcileCtx)
		logger.Info("Reconcile Hyperswitch Worker started",
			zap.Duration("interval", cfg.ReconcileInterval),
			zap.Duration("order_stuck_after", cfg.ReconcileOrderStuckAfter),
			zap.Duration("refund_stuck_after", cfg.ReconcileRefundStuckAfter),
			zap.Duration("refund_orphan_after", cfg.ReconcileRefundOrphanAfter))
		shutdownManager.Register(shutdown.NewShutdownFunc("reconcile_hyperswitch_worker", func(ctx context.Context) error {
			reconcileCancel()
			return nil
		}))
	} else if cfg.ReconcileWorkerEnabled {
		logger.Info("Reconcile worker skipped — Hyperswitch not enabled")
	}
	// v0.802: Start Cloud Backup Worker (copies cloud files to backup prefix every 24h).
	if cfg.S3StorageService != nil {
		backupWorker := services.NewCloudBackupWorker(db.GormDB, cfg.S3StorageService, logger)
		backupCtx, backupCancel := context.WithCancel(context.Background())
		go backupWorker.Start(backupCtx)
		logger.Info("Cloud Backup Worker started (24h interval)")
		shutdownManager.Register(shutdown.NewShutdownFunc("cloud_backup_worker", func(ctx context.Context) error {
			backupCancel()
			return nil
		}))
	}

	// v0.802: Start Gear Warranty Notifier (sends notifications when warranty expires in 30 days).
	notificationService := services.NewNotificationService(db, logger)
	warrantyNotifier := services.NewGearWarrantyNotifier(db.GormDB, notificationService, logger)
	warrantyCtx, warrantyCancel := context.WithCancel(context.Background())
	go warrantyNotifier.Start(warrantyCtx)
	logger.Info("Gear Warranty Notifier started (24h interval)")
	shutdownManager.Register(shutdown.NewShutdownFunc("gear_warranty_notifier", func(ctx context.Context) error {
		warrantyCancel()
		return nil
	}))

	// v0.10.5 F552: Weekly notification digest (runs on Sunday).
	if cfg.JobWorker != nil {
		digestWorker := services.NewNotificationDigestWorker(db.GormDB, cfg.JobWorker, logger)
		digestCtx, digestCancel := context.WithCancel(context.Background())
		go digestWorker.Start(digestCtx)
		logger.Info("Notification digest worker started (weekly on Sunday)")
		shutdownManager.Register(shutdown.NewShutdownFunc("notification_digest_worker", func(ctx context.Context) error {
			digestCancel()
			return nil
		}))
	}

	// v0.10.8 F065: Hard delete worker (GDPR - final anonymization after 30 days).
	if os.Getenv("HARD_DELETE_CRON_ENABLED") != "false" {
		// Optional ES client for the worker's RGPD cleanup (users/tracks/playlists indices).
		// Non-fatal if ES is disabled or unreachable — the worker will just skip ES cleanup.
		var hardDeleteESClient *vezaes.Client
		if esCfg := vezaes.LoadConfig(); esCfg.Enabled {
			if esc, esErr := vezaes.NewClient(esCfg, logger); esErr != nil {
				logger.Warn("Elasticsearch unavailable for hard delete worker, ES cleanup disabled",
					zap.Error(esErr))
			} else {
				hardDeleteESClient = esc
			}
		}
		hardDeleteWorker := workers.NewHardDeleteWorker(db.GormDB, logger, 24*time.Hour).
			WithRedis(cfg.RedisClient).
			WithElasticsearch(hardDeleteESClient)
		hardDeleteCtx, hardDeleteCancel := context.WithCancel(context.Background())
		go hardDeleteWorker.Start(hardDeleteCtx)
		logger.Info("Hard delete worker started (24h interval)",
			zap.Bool("redis_cleanup", cfg.RedisClient != nil),
			zap.Bool("es_cleanup", hardDeleteESClient != nil),
		)
		shutdownManager.Register(shutdown.NewShutdownFunc("hard_delete_worker", func(ctx context.Context) error {
			hardDeleteWorker.Stop()
			hardDeleteCancel()
			return nil
		}))
	} else {
		logger.Info("Hard delete worker disabled (HARD_DELETE_CRON_ENABLED=false)")
	}
	// Gin mode configuration.
	// Fix: read the env var directly because it is not exposed in Config.
	appEnv := os.Getenv("APP_ENV")
	if appEnv == "production" {
		gin.SetMode(gin.ReleaseMode)
	} else {
		gin.SetMode(gin.DebugMode)
	}

	// Create the Gin router.
	router := gin.New()
	// SECURITY(HIGH-006): Restrict trusted proxies to prevent IP spoofing via X-Forwarded-For.
	// Default: trust nothing (c.ClientIP() returns RemoteAddr only).
	// Set TRUSTED_PROXIES="10.0.0.1,10.0.0.2" if behind a known reverse proxy/load balancer.
	router.SetTrustedProxies(nil)
	// Global middleware (Logger, Recovery) recommended by ORIGIN.
	router.Use(gin.Logger(), gin.Recovery())

	// Route configuration.
	apiRouter := api.NewAPIRouter(db, cfg) // Instantiate APIRouter
	if err := apiRouter.Setup(router); err != nil {
		logger.Error("Failed to setup API routes", zap.Error(err))
		os.Exit(1)
	}

	// v1.0.4: Hourly cleanup of tracks stuck in `processing` whose upload file
	// vanished (crash, SIGKILL, disk wipe). Keeps the tracks table honest.
	jobs.ScheduleOrphanTracksCleanup(db, logger)
	// v1.0.7 item E: daily sweep of hyperswitch_webhook_log rows older than
	// HYPERSWITCH_WEBHOOK_LOG_RETENTION_DAYS (default 90). Batched so a large
	// backlog doesn't lock the table.
	jobs.ScheduleHyperswitchWebhookLogCleanup(db, logger, cfg.HyperswitchWebhookLogRetentionDays)

	// HTTP server configuration.
	port := fmt.Sprintf("%d", cfg.AppPort)
	if cfg.AppPort == 0 {
		port = "8080"
	}
	server := &http.Server{
		Addr:         fmt.Sprintf(":%s", port),
		Handler:      router,
		ReadTimeout:  30 * time.Second, // ORIGIN standards
		WriteTimeout: 30 * time.Second,
	}
	// BE-SVC-017: Register all services for graceful shutdown.
	// Register the HTTP server.
	shutdownManager.Register(shutdown.NewShutdownFunc("http_server", func(ctx context.Context) error {
		return server.Shutdown(ctx)
	}))
	// Register the configuration (closes DB, Redis, RabbitMQ, etc.).
	shutdownManager.Register(shutdown.NewShutdownFunc("config", func(ctx context.Context) error {
		return cfg.Close()
	}))
	// Register the logger for a final flush.
	shutdownManager.Register(shutdown.NewShutdownFunc("logger", func(ctx context.Context) error {
		if logger != nil {
			return logger.Sync()
		}
		return nil
	}))
	// Register Sentry for a final flush.
	if cfg.SentryDsn != "" {
		shutdownManager.Register(shutdown.NewShutdownFunc("sentry", func(ctx context.Context) error {
			sentry.Flush(2 * time.Second)
			return nil
		}))
	}

	// Graceful-shutdown handling.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		logger.Info("🌐 Serveur HTTP démarré", zap.String("port", port))
		if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			logger.Fatal("❌ Erreur du serveur HTTP", zap.Error(err))
		}
	}()

	// Wait for the stop signal.
	<-quit
	logger.Info("🔄 Signal d'arrêt reçu, démarrage du shutdown gracieux...")

	// BE-SVC-017: Coordinated graceful shutdown of all services.
	shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer shutdownCancel()
	if err := shutdownManager.Shutdown(shutdownCtx); err != nil {
		logger.Error("❌ Erreur lors du shutdown gracieux", zap.Error(err))
	} else {
		logger.Info("✅ Shutdown gracieux terminé avec succès")
	}
}