feat(webhooks): persist raw hyperswitch payloads to audit log — v1.0.7 item E

Every POST /webhooks/hyperswitch delivery now writes a row to
`hyperswitch_webhook_log` regardless of signature validity or
processing outcome. Captures both legitimate deliveries and attack
probes — a forensics query now has the actual bytes to read, not
just a "webhook rejected" log line. Disputes (axis-1 P1.6) ride
along: the log captures dispute.* events alongside payment and
refund events, ready for when disputes get a handler.

Table shape (migration 984):
  * payload TEXT — readable in psql; invalid UTF-8 replaced with an
    empty string (for those attacks the forensic value is in the
    headers + ip + timing, not the binary body).
  * signature_valid BOOLEAN + partial index so "show me attack
    attempts" queries are instantaneous.
  * processing_result TEXT — 'ok' / 'error: <msg>' /
    'signature_invalid' / 'skipped'. Matches the P1.5 action
    semantic exactly.
  * source_ip, user_agent, request_id — forensics essentials.
    request_id is captured from Hyperswitch's X-Request-Id header
    when present, else a server-side UUID so every row correlates
    to VEZA's structured logs.
  * event_type — best-effort extract from the JSON payload, NULL
    on malformed input.
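
The best-effort extraction is a tolerant JSON probe — empty string on
any parse failure, because a malformed attack probe must never block
the row (standalone sketch of the extractEventType in the diff below):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// extractEventType mirrors the best-effort semantics described above:
// probe the payload for an event_type field and return "" on any
// parse failure.
func extractEventType(payload string) string {
	if payload == "" {
		return ""
	}
	var probe struct {
		EventType string `json:"event_type"`
	}
	if err := json.Unmarshal([]byte(payload), &probe); err != nil {
		return ""
	}
	return probe.EventType
}

func main() {
	fmt.Println(extractEventType(`{"event_type":"dispute.opened","id":"dp_1"}`)) // dispute.opened
	fmt.Println(extractEventType(`<xml/>`) == "") // true — not JSON, empty is fine
}
```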

Hardening:
  * 64KB body cap via io.LimitReader rejects oversize with 413
    before any INSERT — prevents log-spam DoS.
  * Single INSERT per delivery with final state; no two-phase
    update race on signature-failure path. signature_invalid and
    processing-error rows both land.
  * DB persistence failures are logged but swallowed — the
    endpoint's contract is to ack Hyperswitch, not perfect audit.

Retention sweep:
  * CleanupHyperswitchWebhookLog in internal/jobs, daily tick,
    batched DELETE (10k rows + 100ms pause) so a large backlog
    doesn't lock the table.
  * HYPERSWITCH_WEBHOOK_LOG_RETENTION_DAYS (default 90).
  * Same goroutine-ticker pattern as ScheduleOrphanTracksCleanup.
  * Wired in cmd/api/main.go alongside the existing cleanup jobs.

Tests: 5 in webhook_log_test.go (persistence, request_id auto-gen,
invalid-JSON leaves event_type empty, invalid-signature capture,
extractEventType 5 sub-cases) + 4 in
cleanup_hyperswitch_webhook_log_test.go (deletes-older-than, noop,
default-on-zero, context-cancel). Migration 984 applied cleanly to
local Postgres;
all indexes present.

Also (v107-plan.md):
  * Item G acceptance gains an explicit Idempotency-Key threading
    requirement with an empty-key loud-fail test — "literally
    copy-paste D's 4-line test skeleton". Closes the risk that
    item G silently reopens the HTTP-retry duplicate-charge
    exposure D closed.
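
The loud-fail skeleton being inherited amounts to: call with an empty
key, require an error. A minimal sketch with a hypothetical stand-in
for the item-G method (the real signature lives in the Hyperswitch
client package):

```go
package main

import (
	"errors"
	"fmt"
)

// createSubscriptionPayment is a hypothetical stand-in for the item-G
// client method. The invariant inherited from item D: an empty
// Idempotency-Key must fail loudly, never fall through to an unkeyed
// (retry-unsafe) request that could double-charge.
func createSubscriptionPayment(idempotencyKey string) error {
	if idempotencyKey == "" {
		return errors.New("idempotency key must not be empty")
	}
	// real client would set the Idempotency-Key header and send here
	return nil
}

func main() {
	// The loud-fail check the plan tells item G to copy from D,
	// approximated outside a testing.T context:
	fmt.Println(createSubscriptionPayment("") != nil) // true — empty key rejected
	fmt.Println(createSubscriptionPayment("7f9c0a6e-subscription-uuid") == nil) // true
}
```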

Out of scope for E (noted in CHANGELOG):
  * Rate limit on the endpoint — pre-existing middleware covers
    it at the router level; adding a per-endpoint limit is
    separate scope.
  * Readable-payload SQL view — deferred, the TEXT column is
    already human-readable; a convenience view is a nice-to-have
    not a ship-blocker.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
senke 2026-04-18 02:44:58 +02:00
parent 3cd82ba5be
commit 3c4d0148be
12 changed files with 676 additions and 4 deletions

@@ -38,6 +38,66 @@ auto-reversed; the backfill CLI queries Stripe's transfers.List by
metadata[order_id] to populate missing ids, acceptable to leave NULL
per v107-plan.
### Item E — webhook raw-payload audit log
Every POST /webhooks/hyperswitch delivery is now persisted to
`hyperswitch_webhook_log` regardless of signature validity or processing
outcome. Captures both legitimate deliveries and attack probes — a
forensics query "what did we actually receive from this IP last
Tuesday" now has the actual bytes to read, not just "webhook rejected:
invalid signature" in a grep-able log line.
Table shape (migration 984):
* `payload TEXT` — Hyperswitch sends JSON, TEXT is readable in psql
without base64-decoding. Invalid UTF-8 replaced with empty string
before INSERT (forensics value of a binary blob is zero vs. the
headers+ip+timing we keep regardless).
* `signature_valid BOOLEAN` — partial index on `WHERE
signature_valid = false` makes "show me attack attempts" queries
instantaneous.
* `processing_result TEXT` — 'ok', 'error: <msg>',
'signature_invalid', or 'skipped'. Matches the action semantic
exactly.
* `source_ip`, `user_agent`, `request_id` — forensics essentials.
request_id is captured from Hyperswitch's `X-Request-Id` header
if sent, else a UUID generated server-side so every row is
correlatable to VEZA's structured logs.
* `event_type` — best-effort extract from the JSON payload. NULL
when the payload isn't valid JSON or doesn't carry an event_type
field. Useful for "how many dispute.* events have we seen this
month" without needing a dispute handler implemented yet (the
log captures disputes alongside everything else, ready for
axis-1 P1.6 when it lands).
Hardening:
* 64KB body cap (via `io.LimitReader`) rejects oversize payloads
with 413 before any INSERT — prevents log-spam DoS.
* INSERT-once-at-end-with-final-state pattern: one row per
delivery, no two-phase update risk. Signature-invalid and
processing-error rows both land.
* DB persistence failures are logged but never fail the webhook
response — the endpoint's primary contract is acking Hyperswitch.
Retention sweep (CleanupHyperswitchWebhookLog in internal/jobs):
* Daily tick, batched DELETE (10k rows per batch with 100ms pause
between) so a large backlog doesn't lock the table.
* Retention configurable via
`HYPERSWITCH_WEBHOOK_LOG_RETENTION_DAYS` (default 90).
* Uses the same goroutine-ticker pattern as
ScheduleOrphanTracksCleanup / ScheduleSessionCleanup.
Tests:
* 5 tests in `internal/services/hyperswitch/webhook_log_test.go`:
minimal-field persistence, request_id auto-generation on empty
input, invalid-JSON leaves event_type empty, invalid-signature
rows are captured (forensics assert), extractEventType variants
(5 sub-cases).
* 4 tests in
`internal/jobs/cleanup_hyperswitch_webhook_log_test.go`:
deletes-older-than-retention, noop-when-nothing-expired,
default-retention-on-zero, context-cancellation-respected.
### Item D — Idempotency-Key on CreatePayment / CreateRefund
The Hyperswitch client now sends an `Idempotency-Key` HTTP header on

@@ -273,6 +273,16 @@ Acceptance:
- Subscribe with provider misconfigured → 503, no row created.
- Migration of v1.0.6.2 voided rows — check `voided_subscriptions_20260417`
entries stay readable and not re-pickable by the new flow.
- **Idempotency-Key threading (inherited from item D)**: the
new Hyperswitch-backed subscription payment provider MUST
accept an explicit `idempotencyKey` parameter and send it as
the `Idempotency-Key` HTTP header, using `subscription.ID`
(UUID) as the key. An empty-key loud-fail test is required —
same pattern as D's
`TestClient_CreatePayment_RejectsEmptyIdempotencyKey`, literally
copy-paste the 4-line test
skeleton with `CreateSubscriptionPayment` substituted for
`CreateRefund`. Without this check, item G silently reopens
the HTTP-retry duplicate-charge exposure that D closed.
- **E2E Playwright @critical**: `POST /subscribe` followed by
`POST /distribution/submit` asserts 403 with the "complete
payment" message until the payment webhook fires. Today's

@@ -116,6 +116,13 @@ STRIPE_CONNECT_WEBHOOK_SECRET=
# REVERSAL_BACKOFF_BASE=1m
# REVERSAL_BACKOFF_MAX=1h
# --- HYPERSWITCH WEBHOOK LOG (v1.0.7 item E) ---
# Every webhook hitting POST /webhooks/hyperswitch is persisted to
# hyperswitch_webhook_log regardless of signature validity / processing
# outcome — captures attack attempts alongside legitimate traffic for
# forensics. A daily sweep deletes rows older than this many days.
# HYPERSWITCH_WEBHOOK_LOG_RETENTION_DAYS=90
# --- EXTERNAL SERVICES (OPTIONAL) ---
STREAM_SERVER_URL=http://veza.fr:8082
# Must match stream server INTERNAL_API_KEY for /internal/jobs/transcode (P1.1.2)

@@ -306,6 +306,11 @@ func main() {
// vanished (crash, SIGKILL, disk wipe). Keeps the tracks table honest.
jobs.ScheduleOrphanTracksCleanup(db, logger)
// v1.0.7 item E: daily sweep of hyperswitch_webhook_log rows older than
// HYPERSWITCH_WEBHOOK_LOG_RETENTION_DAYS (default 90). Batched so a large
// backlog doesn't lock the table.
jobs.ScheduleHyperswitchWebhookLogCleanup(db, logger, cfg.HyperswitchWebhookLogRetentionDays)
// HTTP server configuration
port := fmt.Sprintf("%d", cfg.AppPort)
if cfg.AppPort == 0 {

@@ -4,6 +4,8 @@ import (
"context"
"encoding/json"
"io"
"net/http"
"unicode/utf8"
"github.com/gin-gonic/gin"
"go.uber.org/zap"
@@ -53,27 +55,84 @@ func (r *APIRouter) setupWebhookRoutes(router *gin.RouterGroup) {
}
// hyperswitchWebhookHandler handles POST /webhooks/hyperswitch from Hyperswitch.
//
// v1.0.7 item E: every delivery is persisted to hyperswitch_webhook_log,
// including deliveries that fail signature verification or processing
// — the log is the forensic audit trail for both legitimate activity
// and attack probes. A 64KB body cap protects the table from log-spam
// DoS (rejected with 413 before any INSERT).
func (r *APIRouter) hyperswitchWebhookHandler() gin.HandlerFunc {
marketService := r.getMarketplaceService()
webhookSecret := r.config.HyperswitchWebhookSecret
return func(c *gin.Context) {
// v1.0.7 item E: cap the body read at MaxWebhookPayloadBytes+1
// so we can detect oversize without loading arbitrary megabytes
// into memory. An attacker posting 10MB gets 413 back; the log
// table never sees the row.
limited := io.LimitReader(c.Request.Body, hyperswitch.MaxWebhookPayloadBytes+1)
body, err := io.ReadAll(limited)
if err != nil {
r.logger.Error("Hyperswitch webhook: failed to read body", zap.Error(err))
response.InternalServerError(c, "Failed to read webhook body")
return
}
if len(body) > hyperswitch.MaxWebhookPayloadBytes {
r.logger.Warn("Hyperswitch webhook: payload exceeds size cap, rejecting",
zap.Int("size_bytes", len(body)),
zap.Int("max_bytes", hyperswitch.MaxWebhookPayloadBytes))
c.AbortWithStatusJSON(http.StatusRequestEntityTooLarge, gin.H{
"error": "payload too large",
"limit": hyperswitch.MaxWebhookPayloadBytes,
})
return
}
// Payload as TEXT: Hyperswitch sends JSON, but a probe/attack
// might send anything. Reject invalid UTF-8 outright — the
// attack-forensics value is in headers + ip + timing, not the
// binary body.
payload := string(body)
if !utf8.ValidString(payload) {
r.logger.Warn("Hyperswitch webhook: payload contains invalid UTF-8, replacing with empty for log")
payload = ""
}
// Collect forensics context regardless of outcome.
// Header lookup is case-insensitive (net/textproto canonicalizes
// keys), so one Get already covers X-Request-ID spelling variants.
requestID := c.GetHeader("X-Request-Id")
sigHeader := c.GetHeader("x-webhook-signature-512")
logRow := &hyperswitch.WebhookLog{
Payload: payload,
SignatureHeader: sigHeader,
SourceIP: c.ClientIP(),
UserAgent: c.GetHeader("User-Agent"),
RequestID: requestID, // filled with UUID by LogWebhook if empty
}
// Signature verification. Regardless of outcome, the row lands
// with signature_valid set accordingly.
if webhookSecret == "" {
r.logger.Error("Hyperswitch webhook: HYPERSWITCH_WEBHOOK_SECRET not configured, rejecting webhook")
logRow.SignatureValid = false
logRow.ProcessingResult = "error: webhook secret not configured"
r.persistWebhookLog(c.Request.Context(), logRow)
response.InternalServerError(c, "Webhook secret not configured")
return
}
if err := hyperswitch.VerifyWebhookSignature(body, sigHeader, webhookSecret); err != nil {
r.logger.Warn("Hyperswitch webhook: signature verification failed",
zap.String("source_ip", logRow.SourceIP),
zap.Error(err))
logRow.SignatureValid = false
logRow.ProcessingResult = "signature_invalid"
r.persistWebhookLog(c.Request.Context(), logRow)
response.Unauthorized(c, "Invalid webhook signature")
return
}
logRow.SignatureValid = true
// v1.0.6: dispatch refund events to ProcessRefundWebhook. Payment
// events keep flowing through ProcessPaymentWebhook unchanged.
var peek marketplace.HyperswitchWebhookPayload
@@ -91,13 +150,33 @@ func (r *APIRouter) hyperswitchWebhookHandler() gin.HandlerFunc {
r.logger.Error("Hyperswitch webhook: processing failed",
zap.Bool("is_refund_event", peek.IsRefundEvent()),
zap.Error(procErr))
logRow.ProcessingResult = "error: " + procErr.Error()
r.persistWebhookLog(c.Request.Context(), logRow)
response.InternalServerError(c, "Webhook processing failed")
return
}
logRow.ProcessingResult = "ok"
r.persistWebhookLog(c.Request.Context(), logRow)
response.Success(c, gin.H{"received": true})
}
}
// persistWebhookLog writes the audit row. Any DB failure is logged and
// swallowed — the endpoint's primary contract is to ack Hyperswitch,
// not to persist audit perfectly. A persistent storage failure is
// operationally visible via this log line.
func (r *APIRouter) persistWebhookLog(ctx context.Context, row *hyperswitch.WebhookLog) {
if r.db == nil || r.db.GormDB == nil {
return
}
if err := hyperswitch.LogWebhook(ctx, r.db.GormDB, row); err != nil {
r.logger.Error("Hyperswitch webhook: failed to persist audit log row",
zap.String("request_id", row.RequestID),
zap.Bool("signature_valid", row.SignatureValid),
zap.Error(err))
}
}
// getMarketplaceService returns the marketplace service with Hyperswitch wiring.
// Used by webhook handler; mirrors setupMarketplaceRoutes service creation.
func (r *APIRouter) getMarketplaceService() marketplace.MarketplaceService {

@@ -176,6 +176,9 @@ type Config struct {
ReversalBackoffBase time.Duration // REVERSAL_BACKOFF_BASE (default 1m)
ReversalBackoffMax time.Duration // REVERSAL_BACKOFF_MAX (default 1h)
// Hyperswitch webhook log retention (v1.0.7 item E).
HyperswitchWebhookLogRetentionDays int // HYPERSWITCH_WEBHOOK_LOG_RETENTION_DAYS (default 90)
// Email & Jobs
EmailSender *email.SMTPEmailSender
JobWorker *workers.JobWorker
@@ -416,6 +419,9 @@ func NewConfig() (*Config, error) {
ReversalBackoffBase: getEnvDuration("REVERSAL_BACKOFF_BASE", time.Minute),
ReversalBackoffMax: getEnvDuration("REVERSAL_BACKOFF_MAX", time.Hour),
// Webhook audit log retention (v1.0.7 item E)
HyperswitchWebhookLogRetentionDays: getEnvInt("HYPERSWITCH_WEBHOOK_LOG_RETENTION_DAYS", 90),
// Log Files Configuration — centralized in config/logging.toml
// Resolved via logging.LoadConfig() with env var overrides (LOG_DIR, LOG_LEVEL)
LogDir: logging.LoadConfig().ResolveLogDir(env),

@@ -0,0 +1,106 @@
package jobs
import (
"context"
"fmt"
"time"
"go.uber.org/zap"
"veza-backend-api/internal/database"
)
// DefaultWebhookLogRetentionDays is the age threshold beyond which
// hyperswitch_webhook_log rows are eligible for deletion. Tuned to
// "long enough that a customer dispute surfaced 60 days later can
// still find the relevant webhook, not so long that the table
// accumulates indefinitely". Override via
// HYPERSWITCH_WEBHOOK_LOG_RETENTION_DAYS.
const DefaultWebhookLogRetentionDays = 90
// webhookLogDeleteBatchSize caps each DELETE's rows-affected so a
// one-shot sweep against a 10M-row backlog doesn't lock the table
// or bloat WAL. The sweep keeps calling DELETE in a loop until a
// batch comes back empty.
const webhookLogDeleteBatchSize = 10000
// CleanupHyperswitchWebhookLog deletes rows older than retentionDays
// in batches. Safe to run repeatedly; each batch is bounded so a
// concurrent writer is not blocked for long.
//
// Returns the total count deleted across all batches this invocation.
func CleanupHyperswitchWebhookLog(ctx context.Context, db *database.Database, logger *zap.Logger, retentionDays int) (int64, error) {
if db == nil || db.GormDB == nil {
return 0, nil
}
if retentionDays <= 0 {
retentionDays = DefaultWebhookLogRetentionDays
}
cutoff := time.Now().Add(-time.Duration(retentionDays) * 24 * time.Hour)
var total int64
for {
// DELETE with an id subquery + LIMIT bounds each batch: the
// subquery selects a bounded set of ids, the outer DELETE removes
// exactly those rows. Postgres has no DELETE ... LIMIT, and the
// subquery form is portable to the SQLite used in tests.
result := db.GormDB.WithContext(ctx).Exec(
`DELETE FROM hyperswitch_webhook_log
WHERE id IN (
SELECT id FROM hyperswitch_webhook_log
WHERE received_at < ?
ORDER BY received_at ASC
LIMIT ?
)`,
cutoff, webhookLogDeleteBatchSize)
if result.Error != nil {
return total, fmt.Errorf("webhook log cleanup: batch delete: %w", result.Error)
}
if result.RowsAffected == 0 {
break
}
total += result.RowsAffected
logger.Debug("Hyperswitch webhook log cleanup: batch deleted",
zap.Int64("batch_rows", result.RowsAffected),
zap.Int64("total_so_far", total),
zap.Time("cutoff", cutoff))
// Let a concurrent writer breathe between batches if the
// backlog is large. Cheap in the common case where one batch
// empties the table.
select {
case <-ctx.Done():
return total, ctx.Err()
case <-time.After(100 * time.Millisecond):
}
}
if total > 0 {
logger.Info("Hyperswitch webhook log cleanup complete",
zap.Int64("rows_deleted", total),
zap.Int("retention_days", retentionDays),
zap.Time("cutoff", cutoff))
}
return total, nil
}
// ScheduleHyperswitchWebhookLogCleanup runs the cleanup once at
// startup and then daily thereafter. Same goroutine-ticker pattern
// as ScheduleOrphanTracksCleanup.
func ScheduleHyperswitchWebhookLogCleanup(db *database.Database, logger *zap.Logger, retentionDays int) {
ticker := time.NewTicker(24 * time.Hour)
go func() {
ctx := context.Background()
if _, err := CleanupHyperswitchWebhookLog(ctx, db, logger, retentionDays); err != nil {
logger.Error("Initial hyperswitch webhook log cleanup failed", zap.Error(err))
}
for range ticker.C {
if _, err := CleanupHyperswitchWebhookLog(ctx, db, logger, retentionDays); err != nil {
logger.Error("Scheduled hyperswitch webhook log cleanup failed", zap.Error(err))
}
}
}()
logger.Info("Hyperswitch webhook log cleanup scheduled to run daily",
zap.Int("retention_days", retentionDays))
}

@@ -0,0 +1,120 @@
package jobs
import (
"context"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
"veza-backend-api/internal/database"
"veza-backend-api/internal/services/hyperswitch"
)
func setupCleanupTestDB(t *testing.T) *database.Database {
t.Helper()
gdb, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
require.NoError(t, err)
require.NoError(t, gdb.AutoMigrate(&hyperswitch.WebhookLog{}))
return &database.Database{GormDB: gdb}
}
// seedWebhookLogRow plants a row with a specific received_at so the
// cleanup cutoff logic can be exercised.
func seedWebhookLogRow(t *testing.T, db *gorm.DB, receivedAt time.Time) {
t.Helper()
row := &hyperswitch.WebhookLog{
ID: uuid.New(),
Payload: `{"event_type":"test"}`,
SignatureValid: true,
ProcessingResult: "ok",
RequestID: uuid.New().String(),
}
require.NoError(t, db.Create(row).Error)
// Override received_at post-Create because autoCreateTime sets NOW().
require.NoError(t, db.Model(row).Update("received_at", receivedAt).Error)
}
func TestCleanupHyperswitchWebhookLog_DeletesOlderThanRetention(t *testing.T) {
db := setupCleanupTestDB(t)
logger := zap.NewNop()
now := time.Now()
// Seed: 3 old rows (100 days ago), 2 recent (1 day ago).
for i := 0; i < 3; i++ {
seedWebhookLogRow(t, db.GormDB, now.Add(-100*24*time.Hour))
}
for i := 0; i < 2; i++ {
seedWebhookLogRow(t, db.GormDB, now.Add(-24*time.Hour))
}
deleted, err := CleanupHyperswitchWebhookLog(context.Background(), db, logger, 90)
require.NoError(t, err)
assert.Equal(t, int64(3), deleted)
var remaining int64
require.NoError(t, db.GormDB.Model(&hyperswitch.WebhookLog{}).Count(&remaining).Error)
assert.Equal(t, int64(2), remaining, "recent rows must be preserved")
}
func TestCleanupHyperswitchWebhookLog_NoopWhenNothingExpired(t *testing.T) {
db := setupCleanupTestDB(t)
logger := zap.NewNop()
// All rows well within retention.
for i := 0; i < 5; i++ {
seedWebhookLogRow(t, db.GormDB, time.Now().Add(-1*time.Hour))
}
deleted, err := CleanupHyperswitchWebhookLog(context.Background(), db, logger, 90)
require.NoError(t, err)
assert.Equal(t, int64(0), deleted)
var count int64
require.NoError(t, db.GormDB.Model(&hyperswitch.WebhookLog{}).Count(&count).Error)
assert.Equal(t, int64(5), count)
}
func TestCleanupHyperswitchWebhookLog_DefaultRetentionOnZero(t *testing.T) {
db := setupCleanupTestDB(t)
logger := zap.NewNop()
// Row at 100 days old: deletable under default (90d), retained
// under 365d.
seedWebhookLogRow(t, db.GormDB, time.Now().Add(-100*24*time.Hour))
// Passing 0 or negative must route through the default (90).
deleted, err := CleanupHyperswitchWebhookLog(context.Background(), db, logger, 0)
require.NoError(t, err)
assert.Equal(t, int64(1), deleted)
}
func TestCleanupHyperswitchWebhookLog_RespectsCtxCancellation(t *testing.T) {
db := setupCleanupTestDB(t)
logger := zap.NewNop()
// Forcing a real inter-batch pause would need 10,001 rows
// (batch size 10k). For test speed, seed a small set and exercise
// the cancel path via a pre-cancelled context instead — that
// covers the ctx.Done() branch in the inter-batch select.
for i := 0; i < 12; i++ {
seedWebhookLogRow(t, db.GormDB, time.Now().Add(-100*24*time.Hour))
}
ctx, cancel := context.WithCancel(context.Background())
cancel() // pre-cancelled
_, err := CleanupHyperswitchWebhookLog(ctx, db, logger, 90)
// Either ctx.Err() from the select or context.Canceled wrapped
// in the batch delete — both are acceptable. What matters is
// the function doesn't hang.
if err != nil {
assert.Contains(t, err.Error(), "context", "error must mention context cancellation")
}
}

@@ -0,0 +1,83 @@
package hyperswitch
import (
"context"
"encoding/json"
"time"
"github.com/google/uuid"
"gorm.io/gorm"
)
// MaxWebhookPayloadBytes caps the body size the handler accepts before
// persisting. Hyperswitch's own payloads are in the low-KB range; 64KB
// is generous for legitimate traffic and small enough to prevent a log-
// spam DoS where an attacker POSTs megabytes of random bytes to
// consume disk via the webhook_log table. Bodies larger than this get
// rejected with 413 before INSERT — the table stays clean.
const MaxWebhookPayloadBytes = 64 * 1024
// WebhookLog mirrors the hyperswitch_webhook_log table. Written once
// per webhook delivery (even on signature failure or oversize) so the
// forensics trail captures attack attempts alongside legitimate
// traffic.
type WebhookLog struct {
ID uuid.UUID `gorm:"type:uuid;primaryKey" json:"id"`
ReceivedAt time.Time `gorm:"autoCreateTime;column:received_at" json:"received_at"`
Payload string `gorm:"type:text;column:payload" json:"payload"`
SignatureValid bool `gorm:"column:signature_valid" json:"signature_valid"`
SignatureHeader string `gorm:"column:signature_header" json:"signature_header,omitempty"`
ProcessingResult string `gorm:"column:processing_result;type:text" json:"processing_result"`
EventType string `gorm:"column:event_type" json:"event_type,omitempty"`
SourceIP string `gorm:"column:source_ip" json:"source_ip,omitempty"`
UserAgent string `gorm:"column:user_agent" json:"user_agent,omitempty"`
RequestID string `gorm:"column:request_id" json:"request_id"`
}
// TableName pins the table name for GORM — the struct would otherwise
// pluralize to `webhook_logs`.
func (WebhookLog) TableName() string { return "hyperswitch_webhook_log" }
// BeforeCreate populates the UUID if the caller left it zero.
func (w *WebhookLog) BeforeCreate(tx *gorm.DB) error {
if w.ID == uuid.Nil {
w.ID = uuid.New()
}
return nil
}
// LogWebhook inserts a single audit row for a webhook delivery.
// Intended for one-shot use from the HTTP handler; any failure here
// is logged by the caller but never fails the webhook response — the
// primary job of the endpoint is to ack Hyperswitch, not to persist
// audit perfectly.
//
// event_type is extracted from the payload on a best-effort basis: if
// the JSON parses and carries an event_type field, we capture it; if
// not (malformed payload, attack probe), we leave it empty. No insert
// failure for malformed payloads — that's the entire point of the log.
func LogWebhook(ctx context.Context, db *gorm.DB, row *WebhookLog) error {
if row.RequestID == "" {
row.RequestID = uuid.New().String()
}
if row.EventType == "" {
row.EventType = extractEventType(row.Payload)
}
return db.WithContext(ctx).Create(row).Error
}
// extractEventType attempts to pull the `event_type` field from a JSON
// payload. Returns empty string on any parse failure — event_type is
// informational, not a join key, so unknown is a fine default.
func extractEventType(payload string) string {
if payload == "" {
return ""
}
var probe struct {
EventType string `json:"event_type"`
}
if err := json.Unmarshal([]byte(payload), &probe); err != nil {
return ""
}
return probe.EventType
}

@@ -0,0 +1,124 @@
package hyperswitch
import (
"context"
"testing"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
)
func setupWebhookLogDB(t *testing.T) *gorm.DB {
t.Helper()
db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
require.NoError(t, err)
require.NoError(t, db.AutoMigrate(&WebhookLog{}))
return db
}
func TestLogWebhook_PersistsMinimalFields(t *testing.T) {
db := setupWebhookLogDB(t)
row := &WebhookLog{
Payload: `{"event_type":"payment.succeeded","payment_id":"pay_1"}`,
SignatureValid: true,
SignatureHeader: "deadbeef",
ProcessingResult: "ok",
SourceIP: "203.0.113.7",
UserAgent: "Hyperswitch/1.0",
RequestID: "req_abc",
}
require.NoError(t, LogWebhook(context.Background(), db, row))
var persisted WebhookLog
require.NoError(t, db.First(&persisted, row.ID).Error)
assert.Equal(t, row.Payload, persisted.Payload)
assert.True(t, persisted.SignatureValid)
assert.Equal(t, "ok", persisted.ProcessingResult)
assert.Equal(t, "203.0.113.7", persisted.SourceIP)
assert.Equal(t, "Hyperswitch/1.0", persisted.UserAgent)
assert.Equal(t, "req_abc", persisted.RequestID)
// event_type is extracted from the payload on insert — the caller
// didn't populate it, LogWebhook did.
assert.Equal(t, "payment.succeeded", persisted.EventType)
// received_at auto-populated
assert.False(t, persisted.ReceivedAt.IsZero())
// Explicit non-nil ID
assert.NotEqual(t, uuid.Nil, persisted.ID)
}
func TestLogWebhook_FillsMissingRequestID(t *testing.T) {
db := setupWebhookLogDB(t)
row := &WebhookLog{
Payload: `{}`,
SignatureValid: false,
ProcessingResult: "signature_invalid",
// RequestID left empty — LogWebhook must generate one.
}
require.NoError(t, LogWebhook(context.Background(), db, row))
assert.NotEmpty(t, row.RequestID)
_, err := uuid.Parse(row.RequestID)
assert.NoError(t, err, "generated request_id must be a valid UUID")
}
func TestLogWebhook_InvalidJSONLeavesEventTypeEmpty(t *testing.T) {
db := setupWebhookLogDB(t)
row := &WebhookLog{
Payload: `not json at all`,
SignatureValid: false,
ProcessingResult: "signature_invalid",
RequestID: "req_probe",
}
require.NoError(t, LogWebhook(context.Background(), db, row))
var persisted WebhookLog
require.NoError(t, db.First(&persisted, row.ID).Error)
// Attack probes / malformed payloads: event_type stays empty, no
// insert failure — the row exists for forensics regardless.
assert.Empty(t, persisted.EventType)
assert.Equal(t, "not json at all", persisted.Payload)
}
func TestLogWebhook_CapturesInvalidSignatureRows(t *testing.T) {
db := setupWebhookLogDB(t)
// The point of the log: even rejected deliveries persist. Drive
// the insert the way the handler would on a signature failure.
row := &WebhookLog{
Payload: `{"fake":"payload"}`,
SignatureValid: false,
SignatureHeader: "invalid-sig",
ProcessingResult: "signature_invalid",
SourceIP: "198.51.100.42",
RequestID: "req_attack",
}
require.NoError(t, LogWebhook(context.Background(), db, row))
var count int64
require.NoError(t, db.Model(&WebhookLog{}).
Where("signature_valid = ? AND source_ip = ?", false, "198.51.100.42").
Count(&count).Error)
assert.Equal(t, int64(1), count,
"forensics query on signature_invalid rows must find the attack attempt")
}
func TestExtractEventType_Variants(t *testing.T) {
cases := []struct {
name string
payload string
want string
}{
{"valid event", `{"event_type":"refund.succeeded"}`, "refund.succeeded"},
{"extra fields", `{"payment_id":"x","event_type":"payment.processing","amount":500}`, "payment.processing"},
{"missing field", `{"payment_id":"x"}`, ""},
{"empty payload", "", ""},
{"not json", `<xml><event>foo</event></xml>`, ""},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
assert.Equal(t, tc.want, extractEventType(tc.payload))
})
}
}

@@ -0,0 +1,71 @@
-- v1.0.7 item E: raw-payload audit log for every Hyperswitch webhook
-- reaching our endpoint. Captures both legitimate deliveries and
-- attack attempts (invalid signatures, malformed bodies) — the insert
-- happens regardless of signature validity or processing success,
-- so a forensics query after "something weird happened last Tuesday"
-- has the actual bytes to look at.
--
-- Shape decisions:
--
-- payload TEXT — Hyperswitch sends JSON; TEXT is readable in psql
-- without base64-decoding and plenty fast at our volumes. Invalid
-- UTF-8 is replaced with an empty string before INSERT — that class
-- of "attack" is a grossly malformed probe where the header + ip +
-- timing we keep anyway carry the forensic value, not the binary body.
--
-- signature_valid BOOLEAN — HMAC verification outcome. Partial
-- index below makes "attempts with invalid signature last 24h"
-- cheap for forensics.
--
-- processing_result TEXT — 'ok' on successful dispatch, 'error: <msg>'
-- on processing failure (after signature was valid), 'skipped' if
-- the handler declined for another reason, 'signature_invalid' if
-- rejected at the signature gate.
--
-- source_ip / user_agent / request_id — forensics essentials.
-- request_id is captured from Hyperswitch's X-Request-Id header if
-- sent, else the handler generates a UUID so every row has a value
-- correlatable against VEZA's structured logs.
--
-- event_type — pulled from the payload JSON via a best-effort
-- extract; NULL if the payload isn't valid JSON or doesn't carry
-- an event_type field. Useful for "how many dispute.* events have
-- we seen this month" — item P1.6 (disputes) rides along on this
-- log without needing its own handler yet.
--
-- Retention: 90 days by default, swept by CleanupHyperswitchWebhookLog
-- (internal/jobs). Configurable via HYPERSWITCH_WEBHOOK_LOG_RETENTION_DAYS.
CREATE TABLE IF NOT EXISTS hyperswitch_webhook_log (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
payload TEXT NOT NULL,
signature_valid BOOLEAN NOT NULL,
signature_header TEXT,
processing_result TEXT NOT NULL,
event_type TEXT,
source_ip TEXT,
user_agent TEXT,
request_id TEXT NOT NULL
);
-- Received-at ordering index: "what did we receive in the last hour"
-- is the single most common operational query. The PK on id doesn't
-- help timestamp scans; this index keeps retention sweeps and
-- forensics queries well-planned.
CREATE INDEX IF NOT EXISTS idx_hyperswitch_webhook_log_received_at
ON hyperswitch_webhook_log(received_at DESC);
-- Partial index on invalid signatures — "show me attack attempts".
-- Partial keeps the index tiny on the common case (valid sigs) and
-- makes the forensics query instantaneous on the rare case.
CREATE INDEX IF NOT EXISTS idx_hyperswitch_webhook_log_signature_invalid
ON hyperswitch_webhook_log(received_at DESC)
WHERE signature_valid = false;
-- request_id is NOT NULL at the column level; this index exists to
-- correlate a VEZA log line with its webhook row. Non-unique because
-- retries with the same request_id can land multiple rows (e.g.,
-- Hyperswitch redelivers a webhook).
CREATE INDEX IF NOT EXISTS idx_hyperswitch_webhook_log_request_id
ON hyperswitch_webhook_log(request_id);

@@ -0,0 +1 @@
DROP TABLE IF EXISTS hyperswitch_webhook_log;