veza/veza-backend-api/internal/api/routes_webhooks.go
senke 3c4d0148be feat(webhooks): persist raw hyperswitch payloads to audit log — v1.0.7 item E
Every POST /webhooks/hyperswitch delivery now writes a row to
`hyperswitch_webhook_log` regardless of signature-valid or
processing outcome. Captures both legitimate deliveries and attack
probes — a forensics query now has the actual bytes to read, not
just a "webhook rejected" log line. Disputes (axis-1 P1.6) ride
along: the log captures dispute.* events alongside payment and
refund events, ready for when disputes get a handler.

Table shape (migration 984):
  * payload TEXT — readable in psql, invalid UTF-8 replaced with
    empty (forensics value is in headers + ip + timing for those
    attacks, not the binary body).
  * signature_valid BOOLEAN + partial index so that "show me
    attack attempts" queries are instantaneous.
  * processing_result TEXT — 'ok' / 'error: <msg>' /
    'signature_invalid' / 'skipped'. Matches the P1.5 action
    semantic exactly.
  * source_ip, user_agent, request_id — forensics essentials.
    request_id is captured from Hyperswitch's X-Request-Id header
    when present, else a server-side UUID so every row correlates
    to VEZA's structured logs.
  * event_type — best-effort extract from the JSON payload, NULL
    on malformed input.

Hardening:
  * 64KB body cap via io.LimitReader rejects oversize with 413
    before any INSERT — prevents log-spam DoS.
  * Single INSERT per delivery with final state; no two-phase
    update race on signature-failure path. signature_invalid and
    processing-error rows both land.
  * DB persistence failures are logged but swallowed — the
    endpoint's contract is to ack Hyperswitch, not perfect audit.

Retention sweep:
  * CleanupHyperswitchWebhookLog in internal/jobs, daily tick,
    batched DELETE (10k rows + 100ms pause) so a large backlog
    doesn't lock the table.
  * HYPERSWITCH_WEBHOOK_LOG_RETENTION_DAYS (default 90).
  * Same goroutine-ticker pattern as ScheduleOrphanTracksCleanup.
  * Wired in cmd/api/main.go alongside the existing cleanup jobs.

Tests: 5 in webhook_log_test.go (persistence, request_id auto-gen,
invalid-JSON leaves event_type empty, invalid-signature capture,
extractEventType 5 sub-cases) + 4 in
cleanup_hyperswitch_webhook_log_test.go (deletes-older-than, noop,
default-on-zero, context-cancel). Migration 984 applied cleanly to
local Postgres; all indexes present.

Also (v107-plan.md):
  * Item G acceptance gains an explicit Idempotency-Key threading
    requirement with an empty-key loud-fail test — "literally
    copy-paste D's 4-line test skeleton". Closes the risk that
    item G silently reopens the HTTP-retry duplicate-charge
    exposure D closed.

Out of scope for E (noted in CHANGELOG):
  * Rate limit on the endpoint — pre-existing middleware covers
    it at the router level; adding a per-endpoint limit is
    separate scope.
  * Readable-payload SQL view — deferred, the TEXT column is
    already human-readable; a convenience view is a nice-to-have
    not a ship-blocker.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 02:44:58 +02:00

205 lines
7.7 KiB
Go

package api
import (
"context"
"encoding/json"
"io"
"net/http"
"unicode/utf8"
"github.com/gin-gonic/gin"
"go.uber.org/zap"
"veza-backend-api/internal/core/marketplace"
"veza-backend-api/internal/handlers"
"veza-backend-api/internal/response"
"veza-backend-api/internal/services"
"veza-backend-api/internal/services/hyperswitch"
"veza-backend-api/internal/workers"
)
// setupWebhookRoutes registers every route under /webhooks on the given
// router group.
//
// It builds the webhook service, worker and handler, starts the worker in
// a background goroutine, and then wires two families of routes:
//
//   - POST /webhooks/hyperswitch — no auth middleware is attached here;
//     the endpoint is called by Hyperswitch itself.
//   - The webhook management endpoints (register, list, delete, stats,
//     test, regenerate-key) — only registered when an AuthMiddleware is
//     configured; they require authentication and are passed through
//     r.applyCSRFProtection.
func (r *APIRouter) setupWebhookRoutes(router *gin.RouterGroup) {
	svc := services.NewWebhookService(r.db.GormDB, r.logger, r.config.JWTSecret)

	// NOTE(review): the three integer literals are positional tuning
	// arguments of workers.NewWebhookWorker (presumably queue size, worker
	// count and retry limit — confirm against its signature).
	worker := workers.NewWebhookWorker(r.db.GormDB, svc, r.logger, 100, 5, 3)

	// The worker is started with context.Background(), so nothing in this
	// function can cancel it.
	go worker.Start(context.Background())

	handler := handlers.NewWebhookHandler(svc, worker, r.logger)
	group := router.Group("/webhooks")

	// Hyperswitch payment webhook - PUBLIC (no auth), called by Hyperswitch.
	group.POST("/hyperswitch", r.hyperswitchWebhookHandler())

	// Without an auth middleware the management endpoints are not exposed.
	if r.config.AuthMiddleware == nil {
		return
	}

	secured := group.Group("")
	secured.Use(r.config.AuthMiddleware.RequireAuth())
	r.applyCSRFProtection(secured)

	secured.POST("", handler.RegisterWebhook())
	secured.GET("", handler.ListWebhooks())
	secured.DELETE("/:id", handler.DeleteWebhook())
	secured.GET("/stats", handler.GetWebhookStats())
	secured.POST("/:id/test", handler.TestWebhook())
	secured.POST("/:id/regenerate-key", handler.RegenerateAPIKey())
}
// hyperswitchWebhookHandler handles POST /webhooks/hyperswitch from Hyperswitch.
//
// v1.0.7 item E: every delivery that passes the size cap is persisted to
// hyperswitch_webhook_log, including deliveries that fail signature
// verification or processing — the log is the forensic audit trail for
// both legitimate activity and attack probes. A body cap of
// hyperswitch.MaxWebhookPayloadBytes protects the table from log-spam
// DoS (oversize requests are rejected with 413 before any INSERT).
//
// Response summary:
//   - body read failure: 500, no audit row
//   - body larger than the cap: 413, no audit row
//   - webhook secret not configured: 500, audit row with signature_valid=false
//   - signature verification failure: 401, audit row "signature_invalid"
//   - processing failure: 500, audit row "error: <msg>"
//   - success: 200 {"received": true}, audit row "ok"
//
// The marketplace service and the webhook secret are resolved once, when
// the handler is built, not per request.
func (r *APIRouter) hyperswitchWebhookHandler() gin.HandlerFunc {
	marketService := r.getMarketplaceService()
	webhookSecret := r.config.HyperswitchWebhookSecret

	return func(c *gin.Context) {
		// v1.0.7 item E: cap the body read at MaxWebhookPayloadBytes+1
		// so we can detect oversize without loading arbitrary megabytes
		// into memory. An attacker posting 10MB gets 413 back; the log
		// table never sees the row.
		limited := io.LimitReader(c.Request.Body, hyperswitch.MaxWebhookPayloadBytes+1)
		body, err := io.ReadAll(limited)
		if err != nil {
			r.logger.Error("Hyperswitch webhook: failed to read body", zap.Error(err))
			response.InternalServerError(c, "Failed to read webhook body")
			return
		}
		// Reading one byte past the cap is what distinguishes "exactly at
		// the limit" from "over the limit".
		if len(body) > hyperswitch.MaxWebhookPayloadBytes {
			r.logger.Warn("Hyperswitch webhook: payload exceeds size cap, rejecting",
				zap.Int("size_bytes", len(body)),
				zap.Int("max_bytes", hyperswitch.MaxWebhookPayloadBytes))
			c.AbortWithStatusJSON(http.StatusRequestEntityTooLarge, gin.H{
				"error": "payload too large",
				"limit": hyperswitch.MaxWebhookPayloadBytes,
			})
			return
		}

		// Payload as TEXT: Hyperswitch sends JSON, but a probe/attack
		// might send anything. A body that is not valid UTF-8 is NOT
		// rejected here: only the copy stored in the audit row is blanked
		// (the forensics value is in headers + ip + timing, not the
		// binary body). Signature verification below still runs against
		// the raw bytes.
		payload := string(body)
		if !utf8.ValidString(payload) {
			r.logger.Warn("Hyperswitch webhook: payload contains invalid UTF-8, replacing with empty for log")
			payload = ""
		}

		// Collect forensics context regardless of outcome. GetHeader
		// matches header names case-insensitively, so a single lookup
		// covers both the "X-Request-Id" and "X-Request-ID" spellings.
		sigHeader := c.GetHeader("x-webhook-signature-512")
		logRow := &hyperswitch.WebhookLog{
			Payload:         payload,
			SignatureHeader: sigHeader,
			SourceIP:        c.ClientIP(),
			UserAgent:       c.GetHeader("User-Agent"),
			RequestID:       c.GetHeader("X-Request-Id"), // filled with UUID by LogWebhook if empty
		}

		// Signature verification. Regardless of outcome, the row lands
		// with signature_valid set accordingly.
		if webhookSecret == "" {
			r.logger.Error("Hyperswitch webhook: HYPERSWITCH_WEBHOOK_SECRET not configured, rejecting webhook")
			logRow.SignatureValid = false
			logRow.ProcessingResult = "error: webhook secret not configured"
			r.persistWebhookLog(c.Request.Context(), logRow)
			response.InternalServerError(c, "Webhook secret not configured")
			return
		}
		if err := hyperswitch.VerifyWebhookSignature(body, sigHeader, webhookSecret); err != nil {
			r.logger.Warn("Hyperswitch webhook: signature verification failed",
				zap.String("source_ip", logRow.SourceIP),
				zap.Error(err))
			logRow.SignatureValid = false
			logRow.ProcessingResult = "signature_invalid"
			r.persistWebhookLog(c.Request.Context(), logRow)
			response.Unauthorized(c, "Invalid webhook signature")
			return
		}
		logRow.SignatureValid = true

		// v1.0.6: dispatch refund events to ProcessRefundWebhook. Payment
		// events keep flowing through ProcessPaymentWebhook unchanged.
		// A body that fails to unmarshal leaves peek at its zero value and
		// is only warned about; the dispatch below still runs.
		var peek marketplace.HyperswitchWebhookPayload
		if err := json.Unmarshal(body, &peek); err != nil {
			r.logger.Warn("Hyperswitch webhook: payload not JSON — dispatching as payment",
				zap.Error(err))
		}
		isRefund := peek.IsRefundEvent()

		var procErr error
		if isRefund {
			procErr = marketService.ProcessRefundWebhook(c.Request.Context(), body)
		} else {
			procErr = marketService.ProcessPaymentWebhook(c.Request.Context(), body)
		}
		if procErr != nil {
			r.logger.Error("Hyperswitch webhook: processing failed",
				zap.Bool("is_refund_event", isRefund),
				zap.Error(procErr))
			logRow.ProcessingResult = "error: " + procErr.Error()
			r.persistWebhookLog(c.Request.Context(), logRow)
			response.InternalServerError(c, "Webhook processing failed")
			return
		}

		logRow.ProcessingResult = "ok"
		r.persistWebhookLog(c.Request.Context(), logRow)
		response.Success(c, gin.H{"received": true})
	}
}
// persistWebhookLog writes one audit row via hyperswitch.LogWebhook.
//
// It is deliberately best-effort: when no database handle is available
// the call is a silent no-op, and when the write fails the error is
// logged (with the row's request_id and signature_valid flag) and then
// dropped. The endpoint's primary contract is to ack Hyperswitch, not to
// persist audit perfectly; a persistent storage failure stays
// operationally visible through the error log line.
//
// NOTE(review): callers pass the HTTP request context. If LogWebhook
// honours context cancellation, a client that disconnects early could
// prevent its own row from being written — confirm whether that matters
// for the attack-probe audit trail.
func (r *APIRouter) persistWebhookLog(ctx context.Context, row *hyperswitch.WebhookLog) {
	if r.db == nil {
		return
	}
	gormDB := r.db.GormDB
	if gormDB == nil {
		return
	}

	err := hyperswitch.LogWebhook(ctx, gormDB, row)
	if err == nil {
		return
	}

	r.logger.Error("Hyperswitch webhook: failed to persist audit log row",
		zap.String("request_id", row.RequestID),
		zap.Bool("signature_valid", row.SignatureValid),
		zap.Error(err))
}
// getMarketplaceService returns the marketplace service with Hyperswitch wiring.
// Used by the webhook handler; mirrors the service creation in
// setupMarketplaceRoutes.
//
// Resolution order:
//  1. If r.config.MarketplaceServiceOverride is set, it is returned as-is.
//     The type assertion is unchecked, so an override that does not
//     implement marketplace.MarketplaceService panics.
//  2. Otherwise a new service is built on r.db.GormDB with a track storage
//     service rooted at r.config.UploadDir ("uploads/tracks" when empty),
//     plus the optional Hyperswitch payment provider and Stripe Connect
//     transfer service when their configuration is complete.
func (r *APIRouter) getMarketplaceService() marketplace.MarketplaceService {
	if override := r.config.MarketplaceServiceOverride; override != nil {
		return override.(marketplace.MarketplaceService)
	}

	trackDir := r.config.UploadDir
	if trackDir == "" {
		trackDir = "uploads/tracks"
	}
	storage := services.NewTrackStorageService(trackDir, false, r.logger)

	// At most three options are ever appended below.
	options := make([]marketplace.ServiceOption, 0, 3)

	// Hyperswitch is wired only when it is enabled AND both the API key and
	// the base URL are present.
	hyperswitchReady := r.config.HyperswitchEnabled &&
		r.config.HyperswitchAPIKey != "" &&
		r.config.HyperswitchURL != ""
	if hyperswitchReady {
		client := hyperswitch.NewClient(r.config.HyperswitchURL, r.config.HyperswitchAPIKey)
		provider := hyperswitch.NewProvider(client)
		options = append(options,
			marketplace.WithPaymentProvider(provider),
			marketplace.WithHyperswitchConfig(true, r.config.CheckoutSuccessURL),
		)
	}

	// Stripe Connect transfers need the feature flag and a secret key.
	stripeReady := r.config.StripeConnectEnabled && r.config.StripeConnectSecretKey != ""
	if stripeReady {
		transfers := services.NewStripeConnectService(r.db.GormDB, r.config.StripeConnectSecretKey, r.logger)
		options = append(options, marketplace.WithTransferService(transfers, r.config.PlatformFeeRate))
	}

	return marketplace.NewService(r.db.GormDB, r.logger, storage, options...)
}