feat(webhooks): persist raw hyperswitch payloads to audit log — v1.0.7 item E
Some checks failed
Security Scan / Secret Scanning (gitleaks) (push) Failing after 0s
Veza CI / Notify on failure (push) Failing after 0s
Veza CI / Backend (Go) (push) Failing after 0s
Veza CI / Frontend (Web) (push) Failing after 0s
Veza CI / Rust (Stream Server) (push) Failing after 0s
Every POST /webhooks/hyperswitch delivery now writes a row to
`hyperswitch_webhook_log` regardless of signature validity or
processing outcome. Captures both legitimate deliveries and attack
probes — a forensics query now has the actual bytes to read, not
just a "webhook rejected" log line. Disputes (axis-1 P1.6) ride
along: the log captures dispute.* events alongside payment and
refund events, ready for when disputes get a handler.
Table shape (migration 984):
* payload TEXT — readable in psql; invalid UTF-8 is replaced with an
  empty string (the forensics value for those attacks is in the
  headers + IP + timing, not the binary body).
* signature_valid BOOLEAN + a partial index that makes "show me
  attack attempts" queries instantaneous.
* processing_result TEXT — 'ok' / 'error: <msg>' /
'signature_invalid' / 'skipped'. Matches the P1.5 action
semantic exactly.
* source_ip, user_agent, request_id — forensics essentials.
request_id is captured from Hyperswitch's X-Request-Id header
when present, else a server-side UUID so every row correlates
to VEZA's structured logs.
* event_type — best-effort extract from the JSON payload, NULL
on malformed input.
Hardening:
* 64KB body cap via io.LimitReader rejects oversize with 413
before any INSERT — prevents log-spam DoS.
* Single INSERT per delivery with final state; no two-phase
update race on signature-failure path. signature_invalid and
processing-error rows both land.
* DB persistence failures are logged but swallowed — the
endpoint's contract is to ack Hyperswitch, not perfect audit.
Retention sweep:
* CleanupHyperswitchWebhookLog in internal/jobs, daily tick,
batched DELETE (10k rows + 100ms pause) so a large backlog
doesn't lock the table.
* HYPERSWITCH_WEBHOOK_LOG_RETENTION_DAYS (default 90).
* Same goroutine-ticker pattern as ScheduleOrphanTracksCleanup.
* Wired in cmd/api/main.go alongside the existing cleanup jobs.
Tests: 5 in webhook_log_test.go (persistence, request_id auto-gen,
invalid-JSON leaves event_type empty, invalid-signature capture,
extractEventType 5 sub-cases) + 4 in
cleanup_hyperswitch_webhook_log_test.go (deletes-older-than, noop, default-on-zero,
context-cancel). Migration 984 applied cleanly to local Postgres;
all indexes present.
Also (v107-plan.md):
* Item G acceptance gains an explicit Idempotency-Key threading
requirement with an empty-key loud-fail test — "literally
copy-paste D's 4-line test skeleton". Closes the risk that
item G silently reopens the HTTP-retry duplicate-charge
exposure D closed.
Out of scope for E (noted in CHANGELOG):
* Rate limit on the endpoint — pre-existing middleware covers
it at the router level; adding a per-endpoint limit is
separate scope.
* Readable-payload SQL view — deferred, the TEXT column is
already human-readable; a convenience view is a nice-to-have
not a ship-blocker.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Parent: 4f15cfbd92
Commit: cef7c64697
12 changed files with 676 additions and 4 deletions
CHANGELOG.md (60)
@@ -38,6 +38,66 @@ auto-reversed; the backfill CLI queries Stripe's transfers.List by
metadata[order_id] to populate missing ids, acceptable to leave NULL
per v107-plan.

### Item E — webhook raw-payload audit log

Every POST /webhooks/hyperswitch delivery is now persisted to
`hyperswitch_webhook_log` regardless of signature validity or processing
outcome. Captures both legitimate deliveries and attack probes — a
forensics query "what did we actually receive from this IP last
Tuesday" now has the actual bytes to read, not just "webhook rejected:
invalid signature" in a grep-able log line.

Table shape (migration 984):

* `payload TEXT` — Hyperswitch sends JSON, TEXT is readable in psql
  without base64-decoding. Invalid UTF-8 replaced with empty string
  before INSERT (forensics value of a binary blob is zero vs. the
  headers+ip+timing we keep regardless).
* `signature_valid BOOLEAN` — partial index on `WHERE
  signature_valid = false` makes "show me attack attempts" queries
  instantaneous.
* `processing_result TEXT` — 'ok', 'error: <msg>',
  'signature_invalid', or 'skipped'. Matches the action semantic
  exactly.
* `source_ip`, `user_agent`, `request_id` — forensics essentials.
  request_id is captured from Hyperswitch's `X-Request-Id` header
  if sent, else a UUID generated server-side so every row is
  correlatable to VEZA's structured logs.
* `event_type` — best-effort extract from the JSON payload. NULL
  when the payload isn't valid JSON or doesn't carry an event_type
  field. Useful for "how many dispute.* events have we seen this
  month" without needing a dispute handler implemented yet (the
  log captures disputes alongside everything else, ready for
  axis-1 P1.6 when it lands); a query sketch follows this list.
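
The query sketch referenced above: a minimal, illustrative example of the
two forensics reads this shape is designed for — invalid-signature
deliveries in the last 24 hours, and dispute.* volume for the current
month. It assumes the `WebhookLog` GORM model introduced by this commit;
the package name and the `webhookLogStats` helper are illustrative only,
not part of the change.

    // Sketch only: counts served by the partial index on
    // signature_valid = false and by the best-effort event_type column.
    // Assumes the WebhookLog model from internal/services/hyperswitch.
    package forensics

    import (
        "time"

        "gorm.io/gorm"

        "veza-backend-api/internal/services/hyperswitch"
    )

    // webhookLogStats is a hypothetical helper, not part of this commit.
    func webhookLogStats(db *gorm.DB) (invalidLast24h, disputesThisMonth int64, err error) {
        // "Show me attack attempts": deliveries that failed HMAC verification
        // in the last 24 hours. Served by the partial index
        // idx_hyperswitch_webhook_log_signature_invalid.
        if err = db.Model(&hyperswitch.WebhookLog{}).
            Where("signature_valid = ? AND received_at > ?", false, time.Now().Add(-24*time.Hour)).
            Count(&invalidLast24h).Error; err != nil {
            return
        }

        // "How many dispute.* events have we seen this month": event_type is
        // best-effort, so rows with an empty event_type simply don't match.
        startOfMonth := time.Date(time.Now().Year(), time.Now().Month(), 1, 0, 0, 0, 0, time.UTC)
        err = db.Model(&hyperswitch.WebhookLog{}).
            Where("event_type LIKE ? AND received_at >= ?", "dispute.%", startOfMonth).
            Count(&disputesThisMonth).Error
        return
    }

Both counts lean on the indexes created by migration 984: the partial
index on invalid signatures and the received_at ordering index.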

Hardening:
* 64KB body cap (via `io.LimitReader`) rejects oversize payloads
  with 413 before any INSERT — prevents log-spam DoS.
* INSERT-once-at-end-with-final-state pattern: one row per
  delivery, no two-phase update risk. Signature-invalid and
  processing-error rows both land.
* DB persistence failures are logged but never fail the webhook
  response — the endpoint's primary contract is acking Hyperswitch.

Retention sweep (CleanupHyperswitchWebhookLog in internal/jobs):
* Daily tick, batched DELETE (10k rows per batch with 100ms pause
  between) so a large backlog doesn't lock the table.
* Retention configurable via
  `HYPERSWITCH_WEBHOOK_LOG_RETENTION_DAYS` (default 90).
* Uses the same goroutine-ticker pattern as
  ScheduleOrphanTracksCleanup / ScheduleSessionCleanup.

Tests:
* 5 tests in `internal/services/hyperswitch/webhook_log_test.go`:
  minimal-field persistence, request_id auto-generation on empty
  input, invalid-JSON leaves event_type empty, invalid-signature
  rows are captured (forensics assert), extractEventType variants
  (5 sub-cases).
* 4 tests in
  `internal/jobs/cleanup_hyperswitch_webhook_log_test.go`:
  deletes-older-than-retention, noop-when-nothing-expired,
  default-retention-on-zero, context-cancellation-respected.

### Item D — Idempotency-Key on CreatePayment / CreateRefund

The Hyperswitch client now sends an `Idempotency-Key` HTTP header on
@@ -273,6 +273,16 @@ Acceptance:
- Subscribe with provider misconfigured → 503, no row created.
- Migration of v1.0.6.2 voided rows — check `voided_subscriptions_20260417`
  entries stay readable and not re-pickable by the new flow.
- **Idempotency-Key threading (inherited from item D)**: the
  new Hyperswitch-backed subscription payment provider MUST
  accept an explicit `idempotencyKey` parameter and send it as
  the `Idempotency-Key` HTTP header, using `subscription.ID`
  (UUID) as the key. An empty-key loud-fail test is required —
  same pattern as D's `TestClient_CreatePayment_RejectsEmptyIdempotencyKey`,
  literally copy-paste the 4-line test skeleton with
  `CreateSubscriptionPayment` substituted for `CreateRefund`
  (a rough sketch follows this excerpt). Without this check,
  item G silently reopens the HTTP-retry duplicate-charge
  exposure that D closed.
- **E2E Playwright @critical**: `POST /subscribe` followed by
  `POST /distribution/submit` asserts 403 with the "complete
  payment" message until the payment webhook fires. Today's
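
For illustration only, a minimal, self-contained sketch of the empty-key
loud-fail pattern this acceptance item asks for. Item D's actual 4-line
skeleton is not part of this diff; the provider type, method signature,
and names below are assumptions, not the real `CreateSubscriptionPayment`
API.

    // Hypothetical sketch of the empty-key loud-fail test required by item G.
    // The real skeleton lives next to item D's Hyperswitch client tests; the
    // fakeProvider type and signature here only model the behaviour under test.
    package subscription_test

    import (
        "context"
        "errors"
        "testing"

        "github.com/stretchr/testify/require"
    )

    type fakeProvider struct{}

    func (fakeProvider) CreateSubscriptionPayment(ctx context.Context, subscriptionID, idempotencyKey string) error {
        if idempotencyKey == "" {
            // Loud fail: never silently send a request without the header.
            return errors.New("idempotency key is required")
        }
        return nil
    }

    func TestCreateSubscriptionPayment_RejectsEmptyIdempotencyKey(t *testing.T) {
        p := fakeProvider{}
        err := p.CreateSubscriptionPayment(context.Background(), "sub-uuid", "")
        require.Error(t, err, "empty Idempotency-Key must fail loudly")
    }

The real test would call the Hyperswitch-backed provider directly; the
point is only that an empty key returns an error before any HTTP request
is built.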
@@ -116,6 +116,13 @@ STRIPE_CONNECT_WEBHOOK_SECRET=
# REVERSAL_BACKOFF_BASE=1m
# REVERSAL_BACKOFF_MAX=1h

# --- HYPERSWITCH WEBHOOK LOG (v1.0.7 item E) ---
# Every webhook hitting POST /webhooks/hyperswitch is persisted to
# hyperswitch_webhook_log regardless of signature-valid / processing
# outcome — captures attack attempts alongside legitimate traffic for
# forensics. A daily sweep deletes rows older than this many days.
# HYPERSWITCH_WEBHOOK_LOG_RETENTION_DAYS=90

# --- EXTERNAL SERVICES (OPTIONAL) ---
STREAM_SERVER_URL=http://veza.fr:8082
# Must match stream server INTERNAL_API_KEY for /internal/jobs/transcode (P1.1.2)
@@ -306,6 +306,11 @@ func main() {
    // vanished (crash, SIGKILL, disk wipe). Keeps the tracks table honest.
    jobs.ScheduleOrphanTracksCleanup(db, logger)

    // v1.0.7 item E: daily sweep of hyperswitch_webhook_log rows older than
    // HYPERSWITCH_WEBHOOK_LOG_RETENTION_DAYS (default 90). Batched so a large
    // backlog doesn't lock the table.
    jobs.ScheduleHyperswitchWebhookLogCleanup(db, logger, cfg.HyperswitchWebhookLogRetentionDays)

    // HTTP server configuration
    port := fmt.Sprintf("%d", cfg.AppPort)
    if cfg.AppPort == 0 {
@@ -4,6 +4,8 @@ import (
    "context"
    "encoding/json"
    "io"
    "net/http"
    "unicode/utf8"

    "github.com/gin-gonic/gin"
    "go.uber.org/zap"
@@ -53,27 +55,84 @@ func (r *APIRouter) setupWebhookRoutes(router *gin.RouterGroup) {
}

// hyperswitchWebhookHandler handles POST /webhooks/hyperswitch from Hyperswitch.
//
// v1.0.7 item E: every delivery is persisted to hyperswitch_webhook_log,
// including deliveries that fail signature verification or processing
// — the log is the forensic audit trail for both legitimate activity
// and attack probes. A 64KB body cap protects the table from log-spam
// DoS (rejected with 413 before any INSERT).
func (r *APIRouter) hyperswitchWebhookHandler() gin.HandlerFunc {
    marketService := r.getMarketplaceService()
    webhookSecret := r.config.HyperswitchWebhookSecret
    return func(c *gin.Context) {
-       body, err := io.ReadAll(c.Request.Body)
        // v1.0.7 item E: cap the body read at MaxWebhookPayloadBytes+1
        // so we can detect oversize without loading arbitrary megabytes
        // into memory. An attacker posting 10MB gets 413 back; the log
        // table never sees the row.
        limited := io.LimitReader(c.Request.Body, hyperswitch.MaxWebhookPayloadBytes+1)
        body, err := io.ReadAll(limited)
        if err != nil {
            r.logger.Error("Hyperswitch webhook: failed to read body", zap.Error(err))
            response.InternalServerError(c, "Failed to read webhook body")
            return
        }
        if len(body) > hyperswitch.MaxWebhookPayloadBytes {
            r.logger.Warn("Hyperswitch webhook: payload exceeds size cap, rejecting",
                zap.Int("size_bytes", len(body)),
                zap.Int("max_bytes", hyperswitch.MaxWebhookPayloadBytes))
            c.AbortWithStatusJSON(http.StatusRequestEntityTooLarge, gin.H{
                "error": "payload too large",
                "limit": hyperswitch.MaxWebhookPayloadBytes,
            })
            return
        }

        // Payload as TEXT: Hyperswitch sends JSON, but a probe/attack
        // might send anything. Reject invalid UTF-8 outright — the
        // attack-forensics value is in headers + ip + timing, not the
        // binary body.
        payload := string(body)
        if !utf8.ValidString(payload) {
            r.logger.Warn("Hyperswitch webhook: payload contains invalid UTF-8, replacing with empty for log")
            payload = ""
        }

        // Collect forensics context regardless of outcome.
        requestID := c.GetHeader("X-Request-Id")
        if requestID == "" {
            requestID = c.GetHeader("X-Request-ID")
        }
        sigHeader := c.GetHeader("x-webhook-signature-512")
        logRow := &hyperswitch.WebhookLog{
            Payload:         payload,
            SignatureHeader: sigHeader,
            SourceIP:        c.ClientIP(),
            UserAgent:       c.GetHeader("User-Agent"),
            RequestID:       requestID, // filled with UUID by LogWebhook if empty
        }

        // Signature verification. Regardless of outcome, the row lands
        // with signature_valid set accordingly.
        if webhookSecret == "" {
            r.logger.Error("Hyperswitch webhook: HYPERSWITCH_WEBHOOK_SECRET not configured, rejecting webhook")
            logRow.SignatureValid = false
            logRow.ProcessingResult = "error: webhook secret not configured"
            r.persistWebhookLog(c.Request.Context(), logRow)
            response.InternalServerError(c, "Webhook secret not configured")
            return
        }
-       sig := c.GetHeader("x-webhook-signature-512")
-       if err := hyperswitch.VerifyWebhookSignature(body, sig, webhookSecret); err != nil {
-           r.logger.Warn("Hyperswitch webhook: signature verification failed", zap.Error(err))
        if err := hyperswitch.VerifyWebhookSignature(body, sigHeader, webhookSecret); err != nil {
            r.logger.Warn("Hyperswitch webhook: signature verification failed",
                zap.String("source_ip", logRow.SourceIP),
                zap.Error(err))
            logRow.SignatureValid = false
            logRow.ProcessingResult = "signature_invalid"
            r.persistWebhookLog(c.Request.Context(), logRow)
            response.Unauthorized(c, "Invalid webhook signature")
            return
        }
        logRow.SignatureValid = true

        // v1.0.6: dispatch refund events to ProcessRefundWebhook. Payment
        // events keep flowing through ProcessPaymentWebhook unchanged.
        var peek marketplace.HyperswitchWebhookPayload
@@ -91,13 +150,33 @@ func (r *APIRouter) hyperswitchWebhookHandler() gin.HandlerFunc {
            r.logger.Error("Hyperswitch webhook: processing failed",
                zap.Bool("is_refund_event", peek.IsRefundEvent()),
                zap.Error(procErr))
            logRow.ProcessingResult = "error: " + procErr.Error()
            r.persistWebhookLog(c.Request.Context(), logRow)
            response.InternalServerError(c, "Webhook processing failed")
            return
        }
        logRow.ProcessingResult = "ok"
        r.persistWebhookLog(c.Request.Context(), logRow)
        response.Success(c, gin.H{"received": true})
    }
}

// persistWebhookLog writes the audit row. Any DB failure is logged and
// swallowed — the endpoint's primary contract is to ack Hyperswitch,
// not to persist audit perfectly. A persistent storage failure is
// operationally visible via this log line.
func (r *APIRouter) persistWebhookLog(ctx context.Context, row *hyperswitch.WebhookLog) {
    if r.db == nil || r.db.GormDB == nil {
        return
    }
    if err := hyperswitch.LogWebhook(ctx, r.db.GormDB, row); err != nil {
        r.logger.Error("Hyperswitch webhook: failed to persist audit log row",
            zap.String("request_id", row.RequestID),
            zap.Bool("signature_valid", row.SignatureValid),
            zap.Error(err))
    }
}

// getMarketplaceService returns the marketplace service with Hyperswitch wiring.
// Used by webhook handler; mirrors setupMarketplaceRoutes service creation.
func (r *APIRouter) getMarketplaceService() marketplace.MarketplaceService {
@@ -176,6 +176,9 @@ type Config struct {
    ReversalBackoffBase time.Duration // REVERSAL_BACKOFF_BASE (default 1m)
    ReversalBackoffMax  time.Duration // REVERSAL_BACKOFF_MAX (default 1h)

    // Hyperswitch webhook log retention (v1.0.7 item E).
    HyperswitchWebhookLogRetentionDays int // HYPERSWITCH_WEBHOOK_LOG_RETENTION_DAYS (default 90)

    // Email & Jobs
    EmailSender *email.SMTPEmailSender
    JobWorker   *workers.JobWorker
@@ -416,6 +419,9 @@ func NewConfig() (*Config, error) {
    ReversalBackoffBase: getEnvDuration("REVERSAL_BACKOFF_BASE", time.Minute),
    ReversalBackoffMax:  getEnvDuration("REVERSAL_BACKOFF_MAX", time.Hour),

    // Webhook audit log retention (v1.0.7 item E)
    HyperswitchWebhookLogRetentionDays: getEnvInt("HYPERSWITCH_WEBHOOK_LOG_RETENTION_DAYS", 90),

    // Log Files Configuration — centralized in config/logging.toml
    // Resolved via logging.LoadConfig() with env var overrides (LOG_DIR, LOG_LEVEL)
    LogDir: logging.LoadConfig().ResolveLogDir(env),
@@ -0,0 +1,106 @@
package jobs

import (
    "context"
    "fmt"
    "time"

    "go.uber.org/zap"

    "veza-backend-api/internal/database"
)

// DefaultWebhookLogRetentionDays is the age threshold beyond which
// hyperswitch_webhook_log rows are eligible for deletion. Tuned to
// "long enough that a customer dispute surfaced 60 days later can
// still find the relevant webhook, not so long that the table
// accumulates indefinitely". Override via
// HYPERSWITCH_WEBHOOK_LOG_RETENTION_DAYS.
const DefaultWebhookLogRetentionDays = 90

// webhookLogDeleteBatchSize caps each DELETE's rows-affected so a
// one-shot sweep against a 10M-row backlog doesn't lock the table
// or bloat WAL. The sweep keeps calling DELETE in a loop until a
// batch comes back empty.
const webhookLogDeleteBatchSize = 10000

// CleanupHyperswitchWebhookLog deletes rows older than retentionDays
// in batches. Safe to run repeatedly; each batch is bounded so a
// concurrent writer is not blocked for long.
//
// Returns the total count deleted across all batches this invocation.
func CleanupHyperswitchWebhookLog(ctx context.Context, db *database.Database, logger *zap.Logger, retentionDays int) (int64, error) {
    if db == nil || db.GormDB == nil {
        return 0, nil
    }
    if retentionDays <= 0 {
        retentionDays = DefaultWebhookLogRetentionDays
    }
    cutoff := time.Now().Add(-time.Duration(retentionDays) * 24 * time.Hour)

    var total int64
    for {
        // PostgreSQL-specific: DELETE with subquery + LIMIT to bound
        // each batch. The subquery selects a bounded set of ids; the
        // outer DELETE removes those exact rows. Works on SQLite too
        // since the subquery syntax is portable.
        result := db.GormDB.WithContext(ctx).Exec(
            `DELETE FROM hyperswitch_webhook_log
             WHERE id IN (
                 SELECT id FROM hyperswitch_webhook_log
                 WHERE received_at < ?
                 ORDER BY received_at ASC
                 LIMIT ?
             )`,
            cutoff, webhookLogDeleteBatchSize)
        if result.Error != nil {
            return total, fmt.Errorf("webhook log cleanup: batch delete: %w", result.Error)
        }
        if result.RowsAffected == 0 {
            break
        }
        total += result.RowsAffected
        logger.Debug("Hyperswitch webhook log cleanup: batch deleted",
            zap.Int64("batch_rows", result.RowsAffected),
            zap.Int64("total_so_far", total),
            zap.Time("cutoff", cutoff))
        // Let a concurrent writer breathe between batches if the
        // backlog is large. Cheap in the common case where one batch
        // empties the table.
        select {
        case <-ctx.Done():
            return total, ctx.Err()
        case <-time.After(100 * time.Millisecond):
        }
    }

    if total > 0 {
        logger.Info("Hyperswitch webhook log cleanup complete",
            zap.Int64("rows_deleted", total),
            zap.Int("retention_days", retentionDays),
            zap.Time("cutoff", cutoff))
    }
    return total, nil
}

// ScheduleHyperswitchWebhookLogCleanup runs the cleanup once at
// startup and then daily thereafter. Same goroutine-ticker pattern
// as ScheduleOrphanTracksCleanup.
func ScheduleHyperswitchWebhookLogCleanup(db *database.Database, logger *zap.Logger, retentionDays int) {
    ticker := time.NewTicker(24 * time.Hour)
    go func() {
        ctx := context.Background()

        if _, err := CleanupHyperswitchWebhookLog(ctx, db, logger, retentionDays); err != nil {
            logger.Error("Initial hyperswitch webhook log cleanup failed", zap.Error(err))
        }

        for range ticker.C {
            if _, err := CleanupHyperswitchWebhookLog(ctx, db, logger, retentionDays); err != nil {
                logger.Error("Scheduled hyperswitch webhook log cleanup failed", zap.Error(err))
            }
        }
    }()
    logger.Info("Hyperswitch webhook log cleanup scheduled to run daily",
        zap.Int("retention_days", retentionDays))
}
@@ -0,0 +1,120 @@
package jobs

import (
    "context"
    "testing"
    "time"

    "github.com/google/uuid"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
    "go.uber.org/zap"
    "gorm.io/driver/sqlite"
    "gorm.io/gorm"

    "veza-backend-api/internal/database"
    "veza-backend-api/internal/services/hyperswitch"
)

func setupCleanupTestDB(t *testing.T) *database.Database {
    t.Helper()
    gdb, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
    require.NoError(t, err)
    require.NoError(t, gdb.AutoMigrate(&hyperswitch.WebhookLog{}))
    return &database.Database{GormDB: gdb}
}

// seedWebhookLogRow plants a row with a specific received_at so the
// cleanup cutoff logic can be exercised.
func seedWebhookLogRow(t *testing.T, db *gorm.DB, receivedAt time.Time) {
    t.Helper()
    row := &hyperswitch.WebhookLog{
        ID:               uuid.New(),
        Payload:          `{"event_type":"test"}`,
        SignatureValid:   true,
        ProcessingResult: "ok",
        RequestID:        uuid.New().String(),
    }
    require.NoError(t, db.Create(row).Error)
    // Override received_at post-Create because autoCreateTime sets NOW().
    require.NoError(t, db.Model(row).Update("received_at", receivedAt).Error)
}

func TestCleanupHyperswitchWebhookLog_DeletesOlderThanRetention(t *testing.T) {
    db := setupCleanupTestDB(t)
    logger := zap.NewNop()

    now := time.Now()
    // Seed: 3 old rows (100 days ago), 2 recent (1 day ago).
    for i := 0; i < 3; i++ {
        seedWebhookLogRow(t, db.GormDB, now.Add(-100*24*time.Hour))
    }
    for i := 0; i < 2; i++ {
        seedWebhookLogRow(t, db.GormDB, now.Add(-24*time.Hour))
    }

    deleted, err := CleanupHyperswitchWebhookLog(context.Background(), db, logger, 90)
    require.NoError(t, err)
    assert.Equal(t, int64(3), deleted)

    var remaining int64
    require.NoError(t, db.GormDB.Model(&hyperswitch.WebhookLog{}).Count(&remaining).Error)
    assert.Equal(t, int64(2), remaining, "recent rows must be preserved")
}

func TestCleanupHyperswitchWebhookLog_NoopWhenNothingExpired(t *testing.T) {
    db := setupCleanupTestDB(t)
    logger := zap.NewNop()

    // All rows well within retention.
    for i := 0; i < 5; i++ {
        seedWebhookLogRow(t, db.GormDB, time.Now().Add(-1*time.Hour))
    }

    deleted, err := CleanupHyperswitchWebhookLog(context.Background(), db, logger, 90)
    require.NoError(t, err)
    assert.Equal(t, int64(0), deleted)

    var count int64
    require.NoError(t, db.GormDB.Model(&hyperswitch.WebhookLog{}).Count(&count).Error)
    assert.Equal(t, int64(5), count)
}

func TestCleanupHyperswitchWebhookLog_DefaultRetentionOnZero(t *testing.T) {
    db := setupCleanupTestDB(t)
    logger := zap.NewNop()

    // Row at 100 days old: deletable under default (90d), retained
    // under 365d.
    seedWebhookLogRow(t, db.GormDB, time.Now().Add(-100*24*time.Hour))

    // Passing 0 or negative must route through the default (90).
    deleted, err := CleanupHyperswitchWebhookLog(context.Background(), db, logger, 0)
    require.NoError(t, err)
    assert.Equal(t, int64(1), deleted)
}

func TestCleanupHyperswitchWebhookLog_RespectsCtxCancellation(t *testing.T) {
    db := setupCleanupTestDB(t)
    logger := zap.NewNop()

    // Plant enough rows to force at least one inter-batch pause.
    // batchSize=10000, so one batch handles 10k; plant 10,001 to need 2.
    // For test speed keep small and exercise only the cancel path via
    // a pre-cancelled context — that covers the ctx.Done() branch in
    // the inter-batch select.
    for i := 0; i < 12; i++ {
        seedWebhookLogRow(t, db.GormDB, time.Now().Add(-100*24*time.Hour))
    }

    ctx, cancel := context.WithCancel(context.Background())
    cancel() // pre-cancelled

    _, err := CleanupHyperswitchWebhookLog(ctx, db, logger, 90)
    // Either ctx.Err() from the select or context.Canceled wrapped
    // in the batch delete — both are acceptable. What matters is
    // the function doesn't hang.
    if err != nil {
        assert.Contains(t, err.Error(), "context", "error must mention context cancellation")
    }
}
@@ -0,0 +1,83 @@
package hyperswitch

import (
    "context"
    "encoding/json"
    "time"

    "github.com/google/uuid"
    "gorm.io/gorm"
)

// MaxWebhookPayloadBytes caps the body size the handler accepts before
// persisting. Hyperswitch's own payloads are in the low-KB range; 64KB
// is generous for legitimate traffic and small enough to prevent a log-
// spam DoS where an attacker POSTs megabytes of random bytes to
// consume disk via the webhook_log table. Bodies larger than this get
// rejected with 413 before INSERT — the table stays clean.
const MaxWebhookPayloadBytes = 64 * 1024

// WebhookLog mirrors the hyperswitch_webhook_log table. Written once
// per webhook delivery (even on signature failure or oversize) so the
// forensics trail captures attack attempts alongside legitimate
// traffic.
type WebhookLog struct {
    ID               uuid.UUID `gorm:"type:uuid;primaryKey" json:"id"`
    ReceivedAt       time.Time `gorm:"autoCreateTime;column:received_at" json:"received_at"`
    Payload          string    `gorm:"type:text;column:payload" json:"payload"`
    SignatureValid   bool      `gorm:"column:signature_valid" json:"signature_valid"`
    SignatureHeader  string    `gorm:"column:signature_header" json:"signature_header,omitempty"`
    ProcessingResult string    `gorm:"column:processing_result;type:text" json:"processing_result"`
    EventType        string    `gorm:"column:event_type" json:"event_type,omitempty"`
    SourceIP         string    `gorm:"column:source_ip" json:"source_ip,omitempty"`
    UserAgent        string    `gorm:"column:user_agent" json:"user_agent,omitempty"`
    RequestID        string    `gorm:"column:request_id" json:"request_id"`
}

// TableName pins the table name for GORM — the struct would otherwise
// pluralize to `webhook_logs`.
func (WebhookLog) TableName() string { return "hyperswitch_webhook_log" }

// BeforeCreate populates the UUID if the caller left it zero.
func (w *WebhookLog) BeforeCreate(tx *gorm.DB) error {
    if w.ID == uuid.Nil {
        w.ID = uuid.New()
    }
    return nil
}

// LogWebhook inserts a single audit row for a webhook delivery.
// Intended for one-shot use from the HTTP handler; any failure here
// is logged by the caller but never fails the webhook response — the
// primary job of the endpoint is to ack Hyperswitch, not to persist
// audit perfectly.
//
// event_type is extracted from the payload on a best-effort basis: if
// the JSON parses and carries an event_type field, we capture it; if
// not (malformed payload, attack probe), we leave it empty. No insert
// failure for malformed payloads — that's the entire point of the log.
func LogWebhook(ctx context.Context, db *gorm.DB, row *WebhookLog) error {
    if row.RequestID == "" {
        row.RequestID = uuid.New().String()
    }
    if row.EventType == "" {
        row.EventType = extractEventType(row.Payload)
    }
    return db.WithContext(ctx).Create(row).Error
}

// extractEventType attempts to pull the `event_type` field from a JSON
// payload. Returns empty string on any parse failure — event_type is
// informational, not a join key, so unknown is a fine default.
func extractEventType(payload string) string {
    if payload == "" {
        return ""
    }
    var probe struct {
        EventType string `json:"event_type"`
    }
    if err := json.Unmarshal([]byte(payload), &probe); err != nil {
        return ""
    }
    return probe.EventType
}
@@ -0,0 +1,124 @@
package hyperswitch

import (
    "context"
    "testing"

    "github.com/google/uuid"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
    "gorm.io/driver/sqlite"
    "gorm.io/gorm"
)

func setupWebhookLogDB(t *testing.T) *gorm.DB {
    t.Helper()
    db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
    require.NoError(t, err)
    require.NoError(t, db.AutoMigrate(&WebhookLog{}))
    return db
}

func TestLogWebhook_PersistsMinimalFields(t *testing.T) {
    db := setupWebhookLogDB(t)

    row := &WebhookLog{
        Payload:          `{"event_type":"payment.succeeded","payment_id":"pay_1"}`,
        SignatureValid:   true,
        SignatureHeader:  "deadbeef",
        ProcessingResult: "ok",
        SourceIP:         "203.0.113.7",
        UserAgent:        "Hyperswitch/1.0",
        RequestID:        "req_abc",
    }
    require.NoError(t, LogWebhook(context.Background(), db, row))

    var persisted WebhookLog
    require.NoError(t, db.First(&persisted, row.ID).Error)
    assert.Equal(t, row.Payload, persisted.Payload)
    assert.True(t, persisted.SignatureValid)
    assert.Equal(t, "ok", persisted.ProcessingResult)
    assert.Equal(t, "203.0.113.7", persisted.SourceIP)
    assert.Equal(t, "Hyperswitch/1.0", persisted.UserAgent)
    assert.Equal(t, "req_abc", persisted.RequestID)
    // event_type is extracted from the payload on insert — the caller
    // didn't populate it, LogWebhook did.
    assert.Equal(t, "payment.succeeded", persisted.EventType)
    // received_at auto-populated
    assert.False(t, persisted.ReceivedAt.IsZero())
    // Explicit non-nil ID
    assert.NotEqual(t, uuid.Nil, persisted.ID)
}

func TestLogWebhook_FillsMissingRequestID(t *testing.T) {
    db := setupWebhookLogDB(t)
    row := &WebhookLog{
        Payload:          `{}`,
        SignatureValid:   false,
        ProcessingResult: "signature_invalid",
        // RequestID left empty — LogWebhook must generate one.
    }
    require.NoError(t, LogWebhook(context.Background(), db, row))
    assert.NotEmpty(t, row.RequestID)
    _, err := uuid.Parse(row.RequestID)
    assert.NoError(t, err, "generated request_id must be a valid UUID")
}

func TestLogWebhook_InvalidJSONLeavesEventTypeEmpty(t *testing.T) {
    db := setupWebhookLogDB(t)
    row := &WebhookLog{
        Payload:          `not json at all`,
        SignatureValid:   false,
        ProcessingResult: "signature_invalid",
        RequestID:        "req_probe",
    }
    require.NoError(t, LogWebhook(context.Background(), db, row))

    var persisted WebhookLog
    require.NoError(t, db.First(&persisted, row.ID).Error)
    // Attack probes / malformed payloads: event_type stays empty, no
    // insert failure — the row exists for forensics regardless.
    assert.Empty(t, persisted.EventType)
    assert.Equal(t, "not json at all", persisted.Payload)
}

func TestLogWebhook_CapturesInvalidSignatureRows(t *testing.T) {
    db := setupWebhookLogDB(t)
    // The point of the log: even rejected deliveries persist. Drive
    // the insert the way the handler would on a signature failure.
    row := &WebhookLog{
        Payload:          `{"fake":"payload"}`,
        SignatureValid:   false,
        SignatureHeader:  "invalid-sig",
        ProcessingResult: "signature_invalid",
        SourceIP:         "198.51.100.42",
        RequestID:        "req_attack",
    }
    require.NoError(t, LogWebhook(context.Background(), db, row))

    var count int64
    require.NoError(t, db.Model(&WebhookLog{}).
        Where("signature_valid = ? AND source_ip = ?", false, "198.51.100.42").
        Count(&count).Error)
    assert.Equal(t, int64(1), count,
        "forensics query on signature_invalid rows must find the attack attempt")
}

func TestExtractEventType_Variants(t *testing.T) {
    cases := []struct {
        name    string
        payload string
        want    string
    }{
        {"valid event", `{"event_type":"refund.succeeded"}`, "refund.succeeded"},
        {"extra fields", `{"payment_id":"x","event_type":"payment.processing","amount":500}`, "payment.processing"},
        {"missing field", `{"payment_id":"x"}`, ""},
        {"empty payload", "", ""},
        {"not json", `<xml><event>foo</event></xml>`, ""},
    }
    for _, tc := range cases {
        t.Run(tc.name, func(t *testing.T) {
            assert.Equal(t, tc.want, extractEventType(tc.payload))
        })
    }
}
veza-backend-api/migrations/984_hyperswitch_webhook_log.sql (new file, 71)
@@ -0,0 +1,71 @@
-- v1.0.7 item E: raw-payload audit log for every Hyperswitch webhook
-- reaching our endpoint. Captures both legitimate deliveries and
-- attack attempts (invalid signatures, malformed bodies) — the insert
-- happens regardless of signature-valid / processing-success status,
-- so a forensics query after "something weird happened last Tuesday"
-- has the actual bytes to look at.
--
-- Shape decisions:
--
-- payload TEXT — Hyperswitch sends JSON; TEXT is readable in psql
--   without base64-decoding and plenty fast at our volumes. Invalid
--   UTF-8 is replaced with an empty string before INSERT — that class
--   of "attack" is a grossly malformed probe where we have the header +
--   ip + timing anyway, no value in storing the binary payload.
--
-- signature_valid BOOLEAN — HMAC verification outcome. Partial
--   index below makes "attempts with invalid signature last 24h"
--   cheap for forensics.
--
-- processing_result TEXT — 'ok' on successful dispatch, 'error: <msg>'
--   on processing failure (after signature was valid), 'skipped' if
--   the handler declined for another reason, 'signature_invalid' if
--   rejected at the signature gate.
--
-- source_ip / user_agent / request_id — forensics essentials.
--   request_id is captured from Hyperswitch's X-Request-Id header if
--   sent, else the handler generates a UUID so every row has a value
--   correlatable against VEZA's structured logs.
--
-- event_type — pulled from the payload JSON via a best-effort
--   extract; NULL if the payload isn't valid JSON or doesn't carry
--   an event_type field. Useful for "how many dispute.* events have
--   we seen this month" — item P1.6 (disputes) rides along on this
--   log without needing its own handler yet.
--
-- Retention: 90 days by default, swept by CleanupHyperswitchWebhookLog
-- (internal/jobs). Configurable via HYPERSWITCH_WEBHOOK_LOG_RETENTION_DAYS.

CREATE TABLE IF NOT EXISTS hyperswitch_webhook_log (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    received_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    payload TEXT NOT NULL,
    signature_valid BOOLEAN NOT NULL,
    signature_header TEXT,
    processing_result TEXT NOT NULL,
    event_type TEXT,
    source_ip TEXT,
    user_agent TEXT,
    request_id TEXT NOT NULL
);

-- Received-at ordering index: "what did we receive in the last hour"
-- is the single most common operational query. The table only gets the
-- id PK by default; adding the timestamp index keeps retention sweeps
-- and forensics scans well-planned.
CREATE INDEX IF NOT EXISTS idx_hyperswitch_webhook_log_received_at
    ON hyperswitch_webhook_log(received_at DESC);

-- Partial index on invalid signatures — "show me attack attempts".
-- Partial keeps the index tiny on the common case (valid sigs) and
-- makes the forensics query instantaneous on the rare case.
CREATE INDEX IF NOT EXISTS idx_hyperswitch_webhook_log_signature_invalid
    ON hyperswitch_webhook_log(received_at DESC)
    WHERE signature_valid = false;

-- request_id is NOT NULL at the column level; the index on it exists
-- for "correlate this Veza log line with the webhook row". Non-unique
-- because retries with the same request_id could land multiple rows
-- (e.g., Hyperswitch redelivers a webhook).
CREATE INDEX IF NOT EXISTS idx_hyperswitch_webhook_log_request_id
    ON hyperswitch_webhook_log(request_id);
@@ -0,0 +1 @@
DROP TABLE IF EXISTS hyperswitch_webhook_log;