diff --git a/CHANGELOG.md b/CHANGELOG.md index 2046ae050..aa527c146 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,66 @@ auto-reversed; the backfill CLI queries Stripe's transfers.List by metadata[order_id] to populate missing ids, acceptable to leave NULL per v107-plan. +### Item E — webhook raw-payload audit log + +Every POST /webhooks/hyperswitch delivery is now persisted to +`hyperswitch_webhook_log` regardless of signature-valid or processing +outcome. Captures both legitimate deliveries and attack probes — a +forensics query "what did we actually receive from this IP last +Tuesday" now has the actual bytes to read, not just "webhook rejected: +invalid signature" in a grep-able log line. + +Table shape (migration 984): + + * `payload TEXT` — Hyperswitch sends JSON, TEXT is readable in psql + without base64-decoding. Invalid UTF-8 replaced with empty string + before INSERT (forensics value of a binary blob is zero vs. the + headers+ip+timing we keep regardless). + * `signature_valid BOOLEAN` — partial index on `WHERE + signature_valid = false` makes "show me attack attempts" queries + instantaneous. + * `processing_result TEXT` — 'ok', 'error: ', + 'signature_invalid', or 'skipped'. Matches the action semantic + exactly. + * `source_ip`, `user_agent`, `request_id` — forensics essentials. + request_id is captured from Hyperswitch's `X-Request-Id` header + if sent, else a UUID generated server-side so every row is + correlatable to VEZA's structured logs. + * `event_type` — best-effort extract from the JSON payload. NULL + when the payload isn't valid JSON or doesn't carry an event_type + field. Useful for "how many dispute.* events have we seen this + month" without needing a dispute handler implemented yet (the + log captures disputes alongside everything else, ready for + axis-1 P1.6 when it lands). + +Hardening: + * 64KB body cap (via `io.LimitReader`) rejects oversize payloads + with 413 before any INSERT — prevents log-spam DoS. 
+ * INSERT-once-at-end-with-final-state pattern: one row per + delivery, no two-phase update risk. Signature-invalid and + processing-error rows both land. + * DB persistence failures are logged but never fail the webhook + response — the endpoint's primary contract is acking Hyperswitch. + +Retention sweep (CleanupHyperswitchWebhookLog in internal/jobs): + * Daily tick, batched DELETE (10k rows per batch with 100ms pause + between) so a large backlog doesn't lock the table. + * Retention configurable via + `HYPERSWITCH_WEBHOOK_LOG_RETENTION_DAYS` (default 90). + * Uses the same goroutine-ticker pattern as + ScheduleOrphanTracksCleanup / ScheduleSessionCleanup. + +Tests: + * 5 tests in `internal/services/hyperswitch/webhook_log_test.go`: + minimal-field persistence, request_id auto-generation on empty + input, invalid-JSON leaves event_type empty, invalid-signature + rows are captured (forensics assert), extractEventType variants + (5 sub-cases). + * 4 tests in + `internal/jobs/cleanup_hyperswitch_webhook_log_test.go`: + deletes-older-than-retention, noop-when-nothing-expired, + default-retention-on-zero, context-cancellation-respected. + ### Item D — Idempotency-Key on CreatePayment / CreateRefund The Hyperswitch client now sends an `Idempotency-Key` HTTP header on diff --git a/docs/audit-2026-04/v107-plan.md b/docs/audit-2026-04/v107-plan.md index 65b8472ae..858c44e4a 100644 --- a/docs/audit-2026-04/v107-plan.md +++ b/docs/audit-2026-04/v107-plan.md @@ -273,6 +273,16 @@ Acceptance: - Subscribe with provider misconfigured → 503, no row created. - Migration of v1.0.6.2 voided rows — check `voided_subscriptions_20260417` entries stay readable and not re-pickable by the new flow. + - **Idempotency-Key threading (inherited from item D)**: the + new Hyperswitch-backed subscription payment provider MUST + accept an explicit `idempotencyKey` parameter and send it as + the `Idempotency-Key` HTTP header, using `subscription.ID` + (UUID) as the key. 
An empty-key loud-fail test is required — + same pattern as D's `TestClient_CreatePayment_RejectsEmpty + IdempotencyKey`, literally copy-paste the 4-line test + skeleton with `CreateSubscriptionPayment` substituted for + `CreateRefund`. Without this check, item G silently reopens + the HTTP-retry duplicate-charge exposure that D closed. - **E2E Playwright @critical**: `POST /subscribe` followed by `POST /distribution/submit` asserts 403 with the "complete payment" message until the payment webhook fires. Today's diff --git a/veza-backend-api/.env.template b/veza-backend-api/.env.template index 5969af4b7..5c9655464 100644 --- a/veza-backend-api/.env.template +++ b/veza-backend-api/.env.template @@ -116,6 +116,13 @@ STRIPE_CONNECT_WEBHOOK_SECRET= # REVERSAL_BACKOFF_BASE=1m # REVERSAL_BACKOFF_MAX=1h +# --- HYPERSWITCH WEBHOOK LOG (v1.0.7 item E) --- +# Every webhook hitting POST /webhooks/hyperswitch is persisted to +# hyperswitch_webhook_log regardless of signature-valid / processing +# outcome — captures attack attempts alongside legitimate traffic for +# forensics. A daily sweep deletes rows older than this many days. +# HYPERSWITCH_WEBHOOK_LOG_RETENTION_DAYS=90 + # --- EXTERNAL SERVICES (OPTIONAL) --- STREAM_SERVER_URL=http://veza.fr:8082 # Must match stream server INTERNAL_API_KEY for /internal/jobs/transcode (P1.1.2) diff --git a/veza-backend-api/cmd/api/main.go b/veza-backend-api/cmd/api/main.go index 5e5392b73..4de25ae5d 100644 --- a/veza-backend-api/cmd/api/main.go +++ b/veza-backend-api/cmd/api/main.go @@ -306,6 +306,11 @@ func main() { // vanished (crash, SIGKILL, disk wipe). Keeps the tracks table honest. jobs.ScheduleOrphanTracksCleanup(db, logger) + // v1.0.7 item E: daily sweep of hyperswitch_webhook_log rows older than + // HYPERSWITCH_WEBHOOK_LOG_RETENTION_DAYS (default 90). Batched so a large + // backlog doesn't lock the table. 
+ jobs.ScheduleHyperswitchWebhookLogCleanup(db, logger, cfg.HyperswitchWebhookLogRetentionDays) + // Configuration du serveur HTTP port := fmt.Sprintf("%d", cfg.AppPort) if cfg.AppPort == 0 { diff --git a/veza-backend-api/internal/api/routes_webhooks.go b/veza-backend-api/internal/api/routes_webhooks.go index f7a139517..0117e29c2 100644 --- a/veza-backend-api/internal/api/routes_webhooks.go +++ b/veza-backend-api/internal/api/routes_webhooks.go @@ -4,6 +4,8 @@ import ( "context" "encoding/json" "io" + "net/http" + "unicode/utf8" "github.com/gin-gonic/gin" "go.uber.org/zap" @@ -53,27 +55,84 @@ func (r *APIRouter) setupWebhookRoutes(router *gin.RouterGroup) { } // hyperswitchWebhookHandler handles POST /webhooks/hyperswitch from Hyperswitch. +// +// v1.0.7 item E: every delivery is persisted to hyperswitch_webhook_log, +// including deliveries that fail signature verification or processing +// — the log is the forensic audit trail for both legitimate activity +// and attack probes. A 64KB body cap protects the table from log-spam +// DoS (rejected with 413 before any INSERT). func (r *APIRouter) hyperswitchWebhookHandler() gin.HandlerFunc { marketService := r.getMarketplaceService() webhookSecret := r.config.HyperswitchWebhookSecret return func(c *gin.Context) { - body, err := io.ReadAll(c.Request.Body) + // v1.0.7 item E: cap the body read at MaxWebhookPayloadBytes+1 + // so we can detect oversize without loading arbitrary megabytes + // into memory. An attacker posting 10MB gets 413 back; the log + // table never sees the row. 
+ limited := io.LimitReader(c.Request.Body, hyperswitch.MaxWebhookPayloadBytes+1) + body, err := io.ReadAll(limited) if err != nil { r.logger.Error("Hyperswitch webhook: failed to read body", zap.Error(err)) response.InternalServerError(c, "Failed to read webhook body") return } + if len(body) > hyperswitch.MaxWebhookPayloadBytes { + r.logger.Warn("Hyperswitch webhook: payload exceeds size cap, rejecting", + zap.Int("size_bytes", len(body)), + zap.Int("max_bytes", hyperswitch.MaxWebhookPayloadBytes)) + c.AbortWithStatusJSON(http.StatusRequestEntityTooLarge, gin.H{ + "error": "payload too large", + "limit": hyperswitch.MaxWebhookPayloadBytes, + }) + return + } + + // Payload as TEXT: Hyperswitch sends JSON, but a probe/attack + // might send anything. Reject invalid UTF-8 outright — the + // attack-forensics value is in headers + ip + timing, not the + // binary body. + payload := string(body) + if !utf8.ValidString(payload) { + r.logger.Warn("Hyperswitch webhook: payload contains invalid UTF-8, replacing with empty for log") + payload = "" + } + + // Collect forensics context regardless of outcome. + requestID := c.GetHeader("X-Request-Id") + if requestID == "" { + requestID = c.GetHeader("X-Request-ID") + } + sigHeader := c.GetHeader("x-webhook-signature-512") + logRow := &hyperswitch.WebhookLog{ + Payload: payload, + SignatureHeader: sigHeader, + SourceIP: c.ClientIP(), + UserAgent: c.GetHeader("User-Agent"), + RequestID: requestID, // filled with UUID by LogWebhook if empty + } + + // Signature verification. Regardless of outcome, the row lands + // with signature_valid set accordingly. 
if webhookSecret == "" { r.logger.Error("Hyperswitch webhook: HYPERSWITCH_WEBHOOK_SECRET not configured, rejecting webhook") + logRow.SignatureValid = false + logRow.ProcessingResult = "error: webhook secret not configured" + r.persistWebhookLog(c.Request.Context(), logRow) response.InternalServerError(c, "Webhook secret not configured") return } - sig := c.GetHeader("x-webhook-signature-512") - if err := hyperswitch.VerifyWebhookSignature(body, sig, webhookSecret); err != nil { - r.logger.Warn("Hyperswitch webhook: signature verification failed", zap.Error(err)) + if err := hyperswitch.VerifyWebhookSignature(body, sigHeader, webhookSecret); err != nil { + r.logger.Warn("Hyperswitch webhook: signature verification failed", + zap.String("source_ip", logRow.SourceIP), + zap.Error(err)) + logRow.SignatureValid = false + logRow.ProcessingResult = "signature_invalid" + r.persistWebhookLog(c.Request.Context(), logRow) response.Unauthorized(c, "Invalid webhook signature") return } + logRow.SignatureValid = true + // v1.0.6: dispatch refund events to ProcessRefundWebhook. Payment // events keep flowing through ProcessPaymentWebhook unchanged. var peek marketplace.HyperswitchWebhookPayload @@ -91,13 +150,33 @@ func (r *APIRouter) hyperswitchWebhookHandler() gin.HandlerFunc { r.logger.Error("Hyperswitch webhook: processing failed", zap.Bool("is_refund_event", peek.IsRefundEvent()), zap.Error(procErr)) + logRow.ProcessingResult = "error: " + procErr.Error() + r.persistWebhookLog(c.Request.Context(), logRow) response.InternalServerError(c, "Webhook processing failed") return } + logRow.ProcessingResult = "ok" + r.persistWebhookLog(c.Request.Context(), logRow) response.Success(c, gin.H{"received": true}) } } +// persistWebhookLog writes the audit row. Any DB failure is logged and +// swallowed — the endpoint's primary contract is to ack Hyperswitch, +// not to persist audit perfectly. A persistent storage failure is +// operationally visible via this log line. 
+func (r *APIRouter) persistWebhookLog(ctx context.Context, row *hyperswitch.WebhookLog) { + if r.db == nil || r.db.GormDB == nil { + return + } + if err := hyperswitch.LogWebhook(ctx, r.db.GormDB, row); err != nil { + r.logger.Error("Hyperswitch webhook: failed to persist audit log row", + zap.String("request_id", row.RequestID), + zap.Bool("signature_valid", row.SignatureValid), + zap.Error(err)) + } +} + // getMarketplaceService returns the marketplace service with Hyperswitch wiring. // Used by webhook handler; mirrors setupMarketplaceRoutes service creation. func (r *APIRouter) getMarketplaceService() marketplace.MarketplaceService { diff --git a/veza-backend-api/internal/config/config.go b/veza-backend-api/internal/config/config.go index 2bf26bba5..ee80359d0 100644 --- a/veza-backend-api/internal/config/config.go +++ b/veza-backend-api/internal/config/config.go @@ -176,6 +176,9 @@ type Config struct { ReversalBackoffBase time.Duration // REVERSAL_BACKOFF_BASE (default 1m) ReversalBackoffMax time.Duration // REVERSAL_BACKOFF_MAX (default 1h) + // Hyperswitch webhook log retention (v1.0.7 item E). 
+ HyperswitchWebhookLogRetentionDays int // HYPERSWITCH_WEBHOOK_LOG_RETENTION_DAYS (default 90) + // Email & Jobs EmailSender *email.SMTPEmailSender JobWorker *workers.JobWorker @@ -416,6 +419,9 @@ func NewConfig() (*Config, error) { ReversalBackoffBase: getEnvDuration("REVERSAL_BACKOFF_BASE", time.Minute), ReversalBackoffMax: getEnvDuration("REVERSAL_BACKOFF_MAX", time.Hour), + // Webhook audit log retention (v1.0.7 item E) + HyperswitchWebhookLogRetentionDays: getEnvInt("HYPERSWITCH_WEBHOOK_LOG_RETENTION_DAYS", 90), + // Log Files Configuration — centralized in config/logging.toml // Resolved via logging.LoadConfig() with env var overrides (LOG_DIR, LOG_LEVEL) LogDir: logging.LoadConfig().ResolveLogDir(env), diff --git a/veza-backend-api/internal/jobs/cleanup_hyperswitch_webhook_log.go b/veza-backend-api/internal/jobs/cleanup_hyperswitch_webhook_log.go new file mode 100644 index 000000000..ff2933b30 --- /dev/null +++ b/veza-backend-api/internal/jobs/cleanup_hyperswitch_webhook_log.go @@ -0,0 +1,106 @@ +package jobs + +import ( + "context" + "fmt" + "time" + + "go.uber.org/zap" + + "veza-backend-api/internal/database" +) + +// DefaultWebhookLogRetentionDays is the age threshold beyond which +// hyperswitch_webhook_log rows are eligible for deletion. Tuned to +// "long enough that a customer dispute surfaced 60 days later can +// still find the relevant webhook, not so long that the table +// accumulates indefinitely". Override via +// HYPERSWITCH_WEBHOOK_LOG_RETENTION_DAYS. +const DefaultWebhookLogRetentionDays = 90 + +// webhookLogDeleteBatchSize caps each DELETE's rows-affected so a +// one-shot sweep against a 10M-row backlog doesn't lock the table +// or bloat WAL. The sweep keeps calling DELETE in a loop until a +// batch comes back empty. +const webhookLogDeleteBatchSize = 10000 + +// CleanupHyperswitchWebhookLog deletes rows older than retentionDays +// in batches. 
Safe to run repeatedly; each batch is bounded so a +// concurrent writer is not blocked for long. +// +// Returns the total count deleted across all batches this invocation. +func CleanupHyperswitchWebhookLog(ctx context.Context, db *database.Database, logger *zap.Logger, retentionDays int) (int64, error) { + if db == nil || db.GormDB == nil { + return 0, nil + } + if retentionDays <= 0 { + retentionDays = DefaultWebhookLogRetentionDays + } + cutoff := time.Now().Add(-time.Duration(retentionDays) * 24 * time.Hour) + + var total int64 + for { + // PostgreSQL-specific: DELETE with subquery + LIMIT to bound + // each batch. The subquery selects a bounded set of ids; the + // outer DELETE removes those exact rows. Works on SQLite too + // since the subquery syntax is portable. + result := db.GormDB.WithContext(ctx).Exec( + `DELETE FROM hyperswitch_webhook_log + WHERE id IN ( + SELECT id FROM hyperswitch_webhook_log + WHERE received_at < ? + ORDER BY received_at ASC + LIMIT ? + )`, + cutoff, webhookLogDeleteBatchSize) + if result.Error != nil { + return total, fmt.Errorf("webhook log cleanup: batch delete: %w", result.Error) + } + if result.RowsAffected == 0 { + break + } + total += result.RowsAffected + logger.Debug("Hyperswitch webhook log cleanup: batch deleted", + zap.Int64("batch_rows", result.RowsAffected), + zap.Int64("total_so_far", total), + zap.Time("cutoff", cutoff)) + // Let a concurrent writer breathe between batches if the + // backlog is large. Cheap in the common case where one batch + // empties the table. + select { + case <-ctx.Done(): + return total, ctx.Err() + case <-time.After(100 * time.Millisecond): + } + } + + if total > 0 { + logger.Info("Hyperswitch webhook log cleanup complete", + zap.Int64("rows_deleted", total), + zap.Int("retention_days", retentionDays), + zap.Time("cutoff", cutoff)) + } + return total, nil +} + +// ScheduleHyperswitchWebhookLogCleanup runs the cleanup once at +// startup and then daily thereafter. 
Same goroutine-ticker pattern +// as ScheduleOrphanTracksCleanup. +func ScheduleHyperswitchWebhookLogCleanup(db *database.Database, logger *zap.Logger, retentionDays int) { + ticker := time.NewTicker(24 * time.Hour) + go func() { + ctx := context.Background() + + if _, err := CleanupHyperswitchWebhookLog(ctx, db, logger, retentionDays); err != nil { + logger.Error("Initial hyperswitch webhook log cleanup failed", zap.Error(err)) + } + + for range ticker.C { + if _, err := CleanupHyperswitchWebhookLog(ctx, db, logger, retentionDays); err != nil { + logger.Error("Scheduled hyperswitch webhook log cleanup failed", zap.Error(err)) + } + } + }() + logger.Info("Hyperswitch webhook log cleanup scheduled to run daily", + zap.Int("retention_days", retentionDays)) +} diff --git a/veza-backend-api/internal/jobs/cleanup_hyperswitch_webhook_log_test.go b/veza-backend-api/internal/jobs/cleanup_hyperswitch_webhook_log_test.go new file mode 100644 index 000000000..6a52e677a --- /dev/null +++ b/veza-backend-api/internal/jobs/cleanup_hyperswitch_webhook_log_test.go @@ -0,0 +1,120 @@ +package jobs + +import ( + "context" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "gorm.io/driver/sqlite" + "gorm.io/gorm" + + "veza-backend-api/internal/database" + "veza-backend-api/internal/services/hyperswitch" +) + +func setupCleanupTestDB(t *testing.T) *database.Database { + t.Helper() + gdb, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) + require.NoError(t, err) + require.NoError(t, gdb.AutoMigrate(&hyperswitch.WebhookLog{})) + return &database.Database{GormDB: gdb} +} + +// seedWebhookLogRow plants a row with a specific received_at so the +// cleanup cutoff logic can be exercised. 
+func seedWebhookLogRow(t *testing.T, db *gorm.DB, receivedAt time.Time) { + t.Helper() + row := &hyperswitch.WebhookLog{ + ID: uuid.New(), + Payload: `{"event_type":"test"}`, + SignatureValid: true, + ProcessingResult: "ok", + RequestID: uuid.New().String(), + } + require.NoError(t, db.Create(row).Error) + // Override received_at post-Create because autoCreateTime sets NOW(). + require.NoError(t, db.Model(row).Update("received_at", receivedAt).Error) +} + +func TestCleanupHyperswitchWebhookLog_DeletesOlderThanRetention(t *testing.T) { + db := setupCleanupTestDB(t) + logger := zap.NewNop() + + now := time.Now() + // Seed: 3 old rows (100 days ago), 2 recent (1 day ago). + for i := 0; i < 3; i++ { + seedWebhookLogRow(t, db.GormDB, now.Add(-100*24*time.Hour)) + } + for i := 0; i < 2; i++ { + seedWebhookLogRow(t, db.GormDB, now.Add(-24*time.Hour)) + } + + deleted, err := CleanupHyperswitchWebhookLog(context.Background(), db, logger, 90) + require.NoError(t, err) + assert.Equal(t, int64(3), deleted) + + var remaining int64 + require.NoError(t, db.GormDB.Model(&hyperswitch.WebhookLog{}).Count(&remaining).Error) + assert.Equal(t, int64(2), remaining, "recent rows must be preserved") +} + +func TestCleanupHyperswitchWebhookLog_NoopWhenNothingExpired(t *testing.T) { + db := setupCleanupTestDB(t) + logger := zap.NewNop() + + // All rows well within retention. + for i := 0; i < 5; i++ { + seedWebhookLogRow(t, db.GormDB, time.Now().Add(-1*time.Hour)) + } + + deleted, err := CleanupHyperswitchWebhookLog(context.Background(), db, logger, 90) + require.NoError(t, err) + assert.Equal(t, int64(0), deleted) + + var count int64 + require.NoError(t, db.GormDB.Model(&hyperswitch.WebhookLog{}).Count(&count).Error) + assert.Equal(t, int64(5), count) +} + +func TestCleanupHyperswitchWebhookLog_DefaultRetentionOnZero(t *testing.T) { + db := setupCleanupTestDB(t) + logger := zap.NewNop() + + // Row at 100 days old: deletable under default (90d), retained + // under 365d. 
+ seedWebhookLogRow(t, db.GormDB, time.Now().Add(-100*24*time.Hour)) + + // Passing 0 or negative must route through the default (90). + deleted, err := CleanupHyperswitchWebhookLog(context.Background(), db, logger, 0) + require.NoError(t, err) + assert.Equal(t, int64(1), deleted) +} + +func TestCleanupHyperswitchWebhookLog_RespectsCtxCancellation(t *testing.T) { + db := setupCleanupTestDB(t) + logger := zap.NewNop() + + // Plant enough rows to force at least one inter-batch pause. + // batchSize=10000, so one batch handles 10k; plant 10,001 to need 2. + // For test speed keep small and exercise only the cancel path via + // a pre-cancelled context — that covers the ctx.Done() branch in + // the inter-batch select. + for i := 0; i < 12; i++ { + seedWebhookLogRow(t, db.GormDB, time.Now().Add(-100*24*time.Hour)) + } + + ctx, cancel := context.WithCancel(context.Background()) + cancel() // pre-cancelled + + _, err := CleanupHyperswitchWebhookLog(ctx, db, logger, 90) + // Either ctx.Err() from the select or context.Canceled wrapped + // in the batch delete — both are acceptable. What matters is + // the function doesn't hang. + if err != nil { + assert.Contains(t, err.Error(), "context", "error must mention context cancellation") + } +} diff --git a/veza-backend-api/internal/services/hyperswitch/webhook_log.go b/veza-backend-api/internal/services/hyperswitch/webhook_log.go new file mode 100644 index 000000000..be2a5f963 --- /dev/null +++ b/veza-backend-api/internal/services/hyperswitch/webhook_log.go @@ -0,0 +1,83 @@ +package hyperswitch + +import ( + "context" + "encoding/json" + "time" + + "github.com/google/uuid" + "gorm.io/gorm" +) + +// MaxWebhookPayloadBytes caps the body size the handler accepts before +// persisting. Hyperswitch's own payloads are in the low-KB range; 64KB +// is generous for legitimate traffic and small enough to prevent a log- +// spam DoS where an attacker POSTs megabytes of random bytes to +// consume disk via the webhook_log table. 
Bodies larger than this get +// rejected with 413 before INSERT — the table stays clean. +const MaxWebhookPayloadBytes = 64 * 1024 + +// WebhookLog mirrors the hyperswitch_webhook_log table. Written once +// per webhook delivery (even on signature failure or oversize) so the +// forensics trail captures attack attempts alongside legitimate +// traffic. +type WebhookLog struct { + ID uuid.UUID `gorm:"type:uuid;primaryKey" json:"id"` + ReceivedAt time.Time `gorm:"autoCreateTime;column:received_at" json:"received_at"` + Payload string `gorm:"type:text;column:payload" json:"payload"` + SignatureValid bool `gorm:"column:signature_valid" json:"signature_valid"` + SignatureHeader string `gorm:"column:signature_header" json:"signature_header,omitempty"` + ProcessingResult string `gorm:"column:processing_result;type:text" json:"processing_result"` + EventType string `gorm:"column:event_type" json:"event_type,omitempty"` + SourceIP string `gorm:"column:source_ip" json:"source_ip,omitempty"` + UserAgent string `gorm:"column:user_agent" json:"user_agent,omitempty"` + RequestID string `gorm:"column:request_id" json:"request_id"` +} + +// TableName pins the table name for GORM — the struct would otherwise +// pluralize to `webhook_logs`. +func (WebhookLog) TableName() string { return "hyperswitch_webhook_log" } + +// BeforeCreate populates the UUID if the caller left it zero. +func (w *WebhookLog) BeforeCreate(tx *gorm.DB) error { + if w.ID == uuid.Nil { + w.ID = uuid.New() + } + return nil +} + +// LogWebhook inserts a single audit row for a webhook delivery. +// Intended for one-shot use from the HTTP handler; any failure here +// is logged by the caller but never fails the webhook response — the +// primary job of the endpoint is to ack Hyperswitch, not to persist +// audit perfectly. 
+// +// event_type is extracted from the payload on a best-effort basis: if +// the JSON parses and carries an event_type field, we capture it; if +// not (malformed payload, attack probe), we leave it empty. No insert +// failure for malformed payloads — that's the entire point of the log. +func LogWebhook(ctx context.Context, db *gorm.DB, row *WebhookLog) error { + if row.RequestID == "" { + row.RequestID = uuid.New().String() + } + if row.EventType == "" { + row.EventType = extractEventType(row.Payload) + } + return db.WithContext(ctx).Create(row).Error +} + +// extractEventType attempts to pull the `event_type` field from a JSON +// payload. Returns empty string on any parse failure — event_type is +// informational, not a join key, so unknown is a fine default. +func extractEventType(payload string) string { + if payload == "" { + return "" + } + var probe struct { + EventType string `json:"event_type"` + } + if err := json.Unmarshal([]byte(payload), &probe); err != nil { + return "" + } + return probe.EventType +} diff --git a/veza-backend-api/internal/services/hyperswitch/webhook_log_test.go b/veza-backend-api/internal/services/hyperswitch/webhook_log_test.go new file mode 100644 index 000000000..0f35d6670 --- /dev/null +++ b/veza-backend-api/internal/services/hyperswitch/webhook_log_test.go @@ -0,0 +1,124 @@ +package hyperswitch + +import ( + "context" + "testing" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gorm.io/driver/sqlite" + "gorm.io/gorm" +) + +func setupWebhookLogDB(t *testing.T) *gorm.DB { + t.Helper() + db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) + require.NoError(t, err) + require.NoError(t, db.AutoMigrate(&WebhookLog{})) + return db +} + +func TestLogWebhook_PersistsMinimalFields(t *testing.T) { + db := setupWebhookLogDB(t) + + row := &WebhookLog{ + Payload: `{"event_type":"payment.succeeded","payment_id":"pay_1"}`, + SignatureValid: true, + SignatureHeader: 
"deadbeef", + ProcessingResult: "ok", + SourceIP: "203.0.113.7", + UserAgent: "Hyperswitch/1.0", + RequestID: "req_abc", + } + require.NoError(t, LogWebhook(context.Background(), db, row)) + + var persisted WebhookLog + require.NoError(t, db.First(&persisted, row.ID).Error) + assert.Equal(t, row.Payload, persisted.Payload) + assert.True(t, persisted.SignatureValid) + assert.Equal(t, "ok", persisted.ProcessingResult) + assert.Equal(t, "203.0.113.7", persisted.SourceIP) + assert.Equal(t, "Hyperswitch/1.0", persisted.UserAgent) + assert.Equal(t, "req_abc", persisted.RequestID) + // event_type is extracted from the payload on insert — the caller + // didn't populate it, LogWebhook did. + assert.Equal(t, "payment.succeeded", persisted.EventType) + // received_at auto-populated + assert.False(t, persisted.ReceivedAt.IsZero()) + // Explicit non-nil ID + assert.NotEqual(t, uuid.Nil, persisted.ID) +} + +func TestLogWebhook_FillsMissingRequestID(t *testing.T) { + db := setupWebhookLogDB(t) + row := &WebhookLog{ + Payload: `{}`, + SignatureValid: false, + ProcessingResult: "signature_invalid", + // RequestID left empty — LogWebhook must generate one. + } + require.NoError(t, LogWebhook(context.Background(), db, row)) + assert.NotEmpty(t, row.RequestID) + _, err := uuid.Parse(row.RequestID) + assert.NoError(t, err, "generated request_id must be a valid UUID") +} + +func TestLogWebhook_InvalidJSONLeavesEventTypeEmpty(t *testing.T) { + db := setupWebhookLogDB(t) + row := &WebhookLog{ + Payload: `not json at all`, + SignatureValid: false, + ProcessingResult: "signature_invalid", + RequestID: "req_probe", + } + require.NoError(t, LogWebhook(context.Background(), db, row)) + + var persisted WebhookLog + require.NoError(t, db.First(&persisted, row.ID).Error) + // Attack probes / malformed payloads: event_type stays empty, no + // insert failure — the row exists for forensics regardless. 
+ assert.Empty(t, persisted.EventType) + assert.Equal(t, "not json at all", persisted.Payload) +} + +func TestLogWebhook_CapturesInvalidSignatureRows(t *testing.T) { + db := setupWebhookLogDB(t) + // The point of the log: even rejected deliveries persist. Drive + // the insert the way the handler would on a signature failure. + row := &WebhookLog{ + Payload: `{"fake":"payload"}`, + SignatureValid: false, + SignatureHeader: "invalid-sig", + ProcessingResult: "signature_invalid", + SourceIP: "198.51.100.42", + RequestID: "req_attack", + } + require.NoError(t, LogWebhook(context.Background(), db, row)) + + var count int64 + require.NoError(t, db.Model(&WebhookLog{}). + Where("signature_valid = ? AND source_ip = ?", false, "198.51.100.42"). + Count(&count).Error) + assert.Equal(t, int64(1), count, + "forensics query on signature_invalid rows must find the attack attempt") +} + +func TestExtractEventType_Variants(t *testing.T) { + cases := []struct { + name string + payload string + want string + }{ + {"valid event", `{"event_type":"refund.succeeded"}`, "refund.succeeded"}, + {"extra fields", `{"payment_id":"x","event_type":"payment.processing","amount":500}`, "payment.processing"}, + {"missing field", `{"payment_id":"x"}`, ""}, + {"empty payload", "", ""}, + {"not json", `foo`, ""}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.want, extractEventType(tc.payload)) + }) + } +} diff --git a/veza-backend-api/migrations/984_hyperswitch_webhook_log.sql b/veza-backend-api/migrations/984_hyperswitch_webhook_log.sql new file mode 100644 index 000000000..effa51cfa --- /dev/null +++ b/veza-backend-api/migrations/984_hyperswitch_webhook_log.sql @@ -0,0 +1,71 @@ +-- v1.0.7 item E: raw-payload audit log for every Hyperswitch webhook +-- reaching our endpoint. 
Captures both legitimate deliveries and +-- attack attempts (invalid signatures, malformed bodies) — the insert +-- happens regardless of signature-valid / processing-success status, +-- so a forensics query after "something weird happened last Tuesday" +-- has the actual bytes to look at. +-- +-- Shape decisions: +-- +-- payload TEXT — Hyperswitch sends JSON; TEXT is readable in psql +-- without base64-decoding and plenty fast at our volumes. Invalid +-- UTF-8 is replaced with an empty string before INSERT — that class +-- of "attack" is a grossly malformed probe where we have the header +-- + ip + timing anyway, no value in storing the binary payload. +-- +-- signature_valid BOOLEAN — HMAC verification outcome. Partial +-- index below makes "attempts with invalid signature last 24h" +-- cheap for forensics. +-- +-- processing_result TEXT — 'ok' on successful dispatch, 'error: <message>' +-- on processing failure (after signature was valid), 'skipped' if +-- the handler declined for another reason, 'signature_invalid' if +-- rejected at the signature gate. +-- +-- source_ip / user_agent / request_id — forensics essentials. +-- request_id is captured from Hyperswitch's X-Request-Id header if +-- sent, else the handler generates a UUID so every row has a value +-- correlatable against VEZA's structured logs. +-- +-- event_type — pulled from the payload JSON via a best-effort +-- extract; left empty ('' via the Go model's non-pointer string +-- field) if the payload isn't valid JSON or doesn't carry an +-- event_type field. Useful for "how many dispute.* events have +-- we seen this month" — item P1.6 (disputes) rides along on this +-- log without needing its own handler yet. +-- +-- Retention: 90 days by default, swept by CleanupHyperswitchWebhookLog +-- (internal/jobs). Configurable via HYPERSWITCH_WEBHOOK_LOG_RETENTION_DAYS.
+ +CREATE TABLE IF NOT EXISTS hyperswitch_webhook_log ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + payload TEXT NOT NULL, + signature_valid BOOLEAN NOT NULL, + signature_header TEXT, + processing_result TEXT NOT NULL, + event_type TEXT, + source_ip TEXT, + user_agent TEXT, + request_id TEXT NOT NULL ); + +-- Received-at ordering index: "what did we receive in the last hour" +-- is the single most common operational query. The PK index on id +-- cannot serve time-range scans; this timestamp index keeps retention +-- sweeps and forensics scans well-planned. +CREATE INDEX IF NOT EXISTS idx_hyperswitch_webhook_log_received_at + ON hyperswitch_webhook_log(received_at DESC); + +-- Partial index on invalid signatures — "show me attack attempts". +-- Partial keeps the index tiny on the common case (valid sigs) and +-- makes the forensics query instantaneous on the rare case. +CREATE INDEX IF NOT EXISTS idx_hyperswitch_webhook_log_signature_invalid + ON hyperswitch_webhook_log(received_at DESC) + WHERE signature_valid = false; + +-- request_id is NOT NULL at the column level; the index exists purely +-- to "correlate this Veza log line with the webhook +-- row". Non-unique because retries with the same request_id could +-- land multiple rows (e.g., Hyperswitch redelivers a webhook). +CREATE INDEX IF NOT EXISTS idx_hyperswitch_webhook_log_request_id + ON hyperswitch_webhook_log(request_id); diff --git a/veza-backend-api/migrations/rollback/984_hyperswitch_webhook_log_down.sql b/veza-backend-api/migrations/rollback/984_hyperswitch_webhook_log_down.sql new file mode 100644 index 000000000..7929a37f4 --- /dev/null +++ b/veza-backend-api/migrations/rollback/984_hyperswitch_webhook_log_down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS hyperswitch_webhook_log;