diff --git a/CHANGELOG.md b/CHANGELOG.md index aa527c146..2ee972c7b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,87 @@ auto-reversed; the backfill CLI queries Stripe's transfers.List by metadata[order_id] to populate missing ids, acceptable to leave NULL per v107-plan. +### Item C — Hyperswitch reconciliation sweep + +New `ReconcileHyperswitchWorker` sweeps for pending orders and +refunds whose terminal webhook never arrived (network hiccup, our +endpoint down, PSP queue stuck). For each stuck row the worker +pulls live PSP state and synthesises a webhook payload that feeds +the normal `ProcessPaymentWebhook` / `ProcessRefundWebhook` +dispatcher. The existing terminal-state guards in those handlers +make the reconciliation idempotent against real webhooks — a late +webhook after the reconciler has already resolved the row is a +no-op. + +Covers three stuck-state classes: + + 1. **Stuck orders** (pending > 30m, non-empty payment_id): we + opened an order, called CreatePayment, got back a payment_id, + but never received the succeeded/failed webhook. Worker calls + GetPaymentStatus and dispatches a synthetic + `payment.<status>` webhook. + + 2. **Stuck refunds with a PSP id** (pending > 30m, non-empty + hyperswitch_refund_id): same pattern via GetRefundStatus + + synthetic `refund.<status>` webhook. The PSP's error_message + is forwarded into the payload so downstream handlers persist + it. + + 3. **Orphan refunds** (pending > 5m, EMPTY hyperswitch_refund_id): + the harder case. We opened a Phase 1 Refund row but crashed + before Phase 2 (PSP call). The row has no PSP id, the PSP has + no record. Worker marks the row `failed` with an explanatory + error_message, rolls the order back to `completed` (so the + buyer can retry), and logs **ERROR** — this is operator- + attention territory: a mid-refund crash happened, root cause + should be investigated. + +Batch-bounded (50 rows per phase per tick) so a 10k-row backlog +doesn't hammer Hyperswitch on a single tick. 
PSP read errors leave +the row unchanged — next tick retries. + +Configuration: + * RECONCILE_WORKER_ENABLED=true (default) + * RECONCILE_INTERVAL=1h (default; ops can drop to 5m during + incident response without a code change) + * RECONCILE_ORDER_STUCK_AFTER=30m + * RECONCILE_REFUND_STUCK_AFTER=30m + * RECONCILE_REFUND_ORPHAN_AFTER=5m (shorter because orphan is + an "app crashed" signal, not "network hiccup") + +Interfaces introduced: + * `marketplace.HyperswitchReadClient` — the worker depends on + read-only PSP access (`GetPaymentStatus`, `GetRefundStatus`) + without knowing about CreatePayment / CreateRefund. Implemented + by `hyperswitch.Provider`. + * `hyperswitch.Client.GetRefund` + `RefundStatus` struct added + (mirror of existing GetPayment / PaymentStatus). + +Worker wired in cmd/api/main.go alongside the other marketplace +workers; gated on `HyperswitchEnabled && HyperswitchAPIKey != ""`. +A separate scoped `marketplace.NewService` is constructed for the +dispatcher side (the webhook-handler uses its own via +`APIRouter.getMarketplaceService` with additional storage/checkout +opts the reconciler doesn't need). + +Tests (10 cases, all green, sqlite :memory:): + * happy-path stuck order → synthetic webhook dispatched with + correct event_type / payment_id / status. + * recent order (under the stuck threshold) → untouched. + * completed order → untouched. + * order with empty payment_id → untouched (pre-PSP-call, nothing + to reconcile). + * PSP read error on GetPaymentStatus → row stays pending, + worker logs and moves on. + * orphan refund → auto-failed + order rolled back + error logged. + * recent orphan refund (under 5m) → left alone for Phase 2 to + complete. + * stuck refund with PSP id → synthetic webhook dispatched. + * refund with status=failed → PSP error_message survives into the + synthetic payload (downstream relies on it). + * all-terminal-state seed (completed / refunded / succeeded rows) + → zero PSP calls, zero dispatches. 
+ ### Item E — webhook raw-payload audit log Every POST /webhooks/hyperswitch delivery is now persisted to diff --git a/veza-backend-api/.env.template b/veza-backend-api/.env.template index 5c9655464..50b81e7d6 100644 --- a/veza-backend-api/.env.template +++ b/veza-backend-api/.env.template @@ -123,6 +123,17 @@ STRIPE_CONNECT_WEBHOOK_SECRET= # forensics. A daily sweep deletes rows older than this many days. # HYPERSWITCH_WEBHOOK_LOG_RETENTION_DAYS=90 +# --- RECONCILIATION WORKER (v1.0.7 item C) --- +# Periodically sweeps for stuck pending orders and refunds whose +# webhook never arrived. Pulls live status from Hyperswitch, feeds a +# synthesised webhook into the normal dispatcher. Idempotent with +# real webhooks via terminal-state guards on the handlers. +# RECONCILE_WORKER_ENABLED=true +# RECONCILE_INTERVAL=1h +# RECONCILE_ORDER_STUCK_AFTER=30m +# RECONCILE_REFUND_STUCK_AFTER=30m +# RECONCILE_REFUND_ORPHAN_AFTER=5m + # --- EXTERNAL SERVICES (OPTIONAL) --- STREAM_SERVER_URL=http://veza.fr:8082 # Must match stream server INTERNAL_API_KEY for /internal/jobs/transcode (P1.1.2) diff --git a/veza-backend-api/cmd/api/main.go b/veza-backend-api/cmd/api/main.go index 4de25ae5d..790a5c280 100644 --- a/veza-backend-api/cmd/api/main.go +++ b/veza-backend-api/cmd/api/main.go @@ -25,6 +25,7 @@ import ( "veza-backend-api/internal/jobs" "veza-backend-api/internal/metrics" "veza-backend-api/internal/services" + "veza-backend-api/internal/services/hyperswitch" "veza-backend-api/internal/shutdown" "veza-backend-api/internal/workers" @@ -205,6 +206,44 @@ func main() { logger.Info("Transfer Retry / Reversal workers skipped — Stripe Connect not enabled") } + // v1.0.7 item C: Reconciliation worker for stuck pending orders / + // refunds whose webhook never arrived. Gated on Hyperswitch being + // configured — without PSP read access there's nothing to sync + // against. 
+ if cfg.ReconcileWorkerEnabled && cfg.HyperswitchEnabled && cfg.HyperswitchAPIKey != "" && cfg.HyperswitchURL != "" { + hsClient := hyperswitch.NewClient(cfg.HyperswitchURL, cfg.HyperswitchAPIKey) + hsProvider := hyperswitch.NewProvider(hsClient) + // Build a marketplace.Service for the dispatcher side. Scoped + // to the worker — the HTTP handler constructs its own via + // APIRouter.getMarketplaceService which wires additional opts + // (storage, checkout URL). Reconciler only needs the two + // Process*Webhook methods. + mktSvc := marketplace.NewService(db.GormDB, logger, nil, + marketplace.WithPaymentProvider(hsProvider), + marketplace.WithHyperswitchConfig(true, cfg.CheckoutSuccessURL), + ) + reconcileWorker := marketplace.NewReconcileHyperswitchWorker( + db.GormDB, hsProvider, mktSvc, logger, + cfg.ReconcileInterval, + cfg.ReconcileOrderStuckAfter, + cfg.ReconcileRefundStuckAfter, + cfg.ReconcileRefundOrphanAfter, + ) + reconcileCtx, reconcileCancel := context.WithCancel(context.Background()) + go reconcileWorker.Start(reconcileCtx) + logger.Info("Reconcile Hyperswitch Worker started", + zap.Duration("interval", cfg.ReconcileInterval), + zap.Duration("order_stuck_after", cfg.ReconcileOrderStuckAfter), + zap.Duration("refund_stuck_after", cfg.ReconcileRefundStuckAfter), + zap.Duration("refund_orphan_after", cfg.ReconcileRefundOrphanAfter)) + shutdownManager.Register(shutdown.NewShutdownFunc("reconcile_hyperswitch_worker", func(ctx context.Context) error { + reconcileCancel() + return nil + })) + } else if cfg.ReconcileWorkerEnabled { + logger.Info("Reconcile worker skipped — Hyperswitch not enabled") + } + // v0.802: Start Cloud Backup Worker (copies cloud files to backup prefix every 24h) if cfg.S3StorageService != nil { backupWorker := services.NewCloudBackupWorker(db.GormDB, cfg.S3StorageService, logger) diff --git a/veza-backend-api/internal/config/config.go b/veza-backend-api/internal/config/config.go index ee80359d0..4effe47fb 100644 --- 
a/veza-backend-api/internal/config/config.go +++ b/veza-backend-api/internal/config/config.go @@ -179,6 +179,16 @@ type Config struct { // Hyperswitch webhook log retention (v1.0.7 item E). HyperswitchWebhookLogRetentionDays int // HYPERSWITCH_WEBHOOK_LOG_RETENTION_DAYS (default 90) + // Reconciliation Worker (v1.0.7 item C) — sweeps pending orders + // and refunds that have been stuck too long, synthesises a webhook + // from the live PSP state, and feeds the normal Process*Webhook + // dispatcher. Idempotent with real webhooks. + ReconcileWorkerEnabled bool // RECONCILE_WORKER_ENABLED (default true) + ReconcileInterval time.Duration // RECONCILE_INTERVAL (default 1h) + ReconcileOrderStuckAfter time.Duration // RECONCILE_ORDER_STUCK_AFTER (default 30m) + ReconcileRefundStuckAfter time.Duration // RECONCILE_REFUND_STUCK_AFTER (default 30m) + ReconcileRefundOrphanAfter time.Duration // RECONCILE_REFUND_ORPHAN_AFTER (default 5m) + // Email & Jobs EmailSender *email.SMTPEmailSender JobWorker *workers.JobWorker @@ -422,6 +432,13 @@ func NewConfig() (*Config, error) { // Webhook audit log retention (v1.0.7 item E) HyperswitchWebhookLogRetentionDays: getEnvInt("HYPERSWITCH_WEBHOOK_LOG_RETENTION_DAYS", 90), + // Reconciliation Worker (v1.0.7 item C) + ReconcileWorkerEnabled: getEnvBool("RECONCILE_WORKER_ENABLED", true), + ReconcileInterval: getEnvDuration("RECONCILE_INTERVAL", time.Hour), + ReconcileOrderStuckAfter: getEnvDuration("RECONCILE_ORDER_STUCK_AFTER", 30*time.Minute), + ReconcileRefundStuckAfter: getEnvDuration("RECONCILE_REFUND_STUCK_AFTER", 30*time.Minute), + ReconcileRefundOrphanAfter: getEnvDuration("RECONCILE_REFUND_ORPHAN_AFTER", 5*time.Minute), + // Log Files Configuration — centralized in config/logging.toml // Resolved via logging.LoadConfig() with env var overrides (LOG_DIR, LOG_LEVEL) LogDir: logging.LoadConfig().ResolveLogDir(env), diff --git a/veza-backend-api/internal/core/marketplace/reconcile_hyperswitch.go 
b/veza-backend-api/internal/core/marketplace/reconcile_hyperswitch.go new file mode 100644 index 000000000..c7838bd5f --- /dev/null +++ b/veza-backend-api/internal/core/marketplace/reconcile_hyperswitch.go @@ -0,0 +1,296 @@ +package marketplace + +import ( + "context" + "encoding/json" + "time" + + "go.uber.org/zap" + "gorm.io/gorm" +) + +// HyperswitchReadClient is the abstraction ReconcileHyperswitchWorker +// depends on. Implemented by hyperswitch.Provider. The worker never +// calls CreatePayment / CreateRefund — reconciliation is read-only +// against the PSP, and the write side is handled by +// ProcessPaymentWebhook / ProcessRefundWebhook on the marketplace +// service (via synthetic webhook payloads built from the PSP reads). +type HyperswitchReadClient interface { + GetPaymentStatus(ctx context.Context, paymentID string) (status string, err error) + GetRefundStatus(ctx context.Context, refundID string) (status, errorMessage string, err error) +} + +// ReconcileHyperswitchWorker sweeps for pending orders and refunds +// that have been stuck too long — symptom of a webhook delivery that +// never arrived (network hiccup, our endpoint down, PSP queue stuck). +// For each stuck row the worker pulls the live PSP status and +// synthesises a webhook payload to feed the normal +// ProcessPaymentWebhook / ProcessRefundWebhook dispatcher. The +// existing terminal-state guards in those handlers make the +// reconciliation idempotent against real webhooks: if a webhook +// eventually arrives after the reconciler has already resolved the +// row, the late webhook is a no-op. +// +// Introduced in v1.0.7 item C. Covers three stuck-state classes: +// +// 1. Orders in `pending` > OrderStuckAfter (default 30m): we opened +// an order, called CreatePayment, got a payment_id back, but +// never received the succeeded/failed webhook. +// +// 2. Refunds in `pending` with non-empty hyperswitch_refund_id +// > RefundStuckAfter (default 30m): same story for refunds. +// +// 3. 
Refunds in `pending` with EMPTY hyperswitch_refund_id +// > RefundOrphanAfter (default 5m): we started Phase 1 +// of RefundOrder (created a pending Refund row + flipped order +// to refund_pending) but crashed before Phase 2 (PSP +// CreateRefund). The row has no PSP id, the PSP has no record +// of the refund, and the order is stuck in refund_pending. +// The reconciler marks the row failed and rolls the order back +// to completed so a retry is possible. Log ERROR because this +// is operator-attention territory — a crash has happened mid- +// refund and the cause should be investigated. +type ReconcileHyperswitchWorker struct { + db *gorm.DB + hs HyperswitchReadClient + marketService reconcileDispatcher + logger *zap.Logger + interval time.Duration + orderStuckAfter time.Duration + refundStuckAfter time.Duration + refundOrphanAfter time.Duration + batchLimit int +} + +// reconcileDispatcher is the subset of marketplace.Service the worker +// uses to apply the synthesised webhooks. Defined as a local +// interface so tests can plug in a spy without standing up a full +// Service. +type reconcileDispatcher interface { + ProcessPaymentWebhook(ctx context.Context, payload []byte) error + ProcessRefundWebhook(ctx context.Context, payload []byte) error +} + +// NewReconcileHyperswitchWorker constructs a reconciler. Zero or +// negative duration parameters fall through to their defaults, so the +// worker can be instantiated from config without the caller ensuring +// every duration is non-zero. 
+func NewReconcileHyperswitchWorker(
+	db *gorm.DB,
+	hs HyperswitchReadClient,
+	svc reconcileDispatcher,
+	logger *zap.Logger,
+	interval, orderStuckAfter, refundStuckAfter, refundOrphanAfter time.Duration,
+) *ReconcileHyperswitchWorker {
+	// A nil logger would panic on the first log call; fall back to a
+	// no-op logger so the worker stays usable.
+	if logger == nil {
+		logger = zap.NewNop()
+	}
+	if interval <= 0 {
+		interval = time.Hour
+	}
+	if orderStuckAfter <= 0 {
+		orderStuckAfter = 30 * time.Minute
+	}
+	if refundStuckAfter <= 0 {
+		refundStuckAfter = 30 * time.Minute
+	}
+	if refundOrphanAfter <= 0 {
+		refundOrphanAfter = 5 * time.Minute
+	}
+	return &ReconcileHyperswitchWorker{
+		db:                db,
+		hs:                hs,
+		marketService:     svc,
+		logger:            logger,
+		interval:          interval,
+		orderStuckAfter:   orderStuckAfter,
+		refundStuckAfter:  refundStuckAfter,
+		refundOrphanAfter: refundOrphanAfter,
+		// Batch limit keeps a 10k-order backlog from hammering Stripe/
+		// Hyperswitch on a single tick. One interval between batches
+		// is the natural pacing.
+		batchLimit: 50,
+	}
+}
+
+// Start runs the reconciler loop until ctx is cancelled. It blocks,
+// so callers run it in its own goroutine. One sweep is performed
+// immediately at startup and then one per interval: with the default
+// 1h interval, waiting for the first tick would mean a process that
+// restarts more often than hourly never reconciles anything. The
+// stuck-after thresholds still apply to the startup sweep, so fresh
+// rows are not touched.
+func (w *ReconcileHyperswitchWorker) Start(ctx context.Context) {
+	if w.hs == nil || w.marketService == nil {
+		w.logger.Warn("ReconcileHyperswitchWorker: missing hs client or market service, disabling")
+		return
+	}
+	ticker := time.NewTicker(w.interval)
+	defer ticker.Stop()
+
+	w.logger.Info("ReconcileHyperswitchWorker started",
+		zap.Duration("interval", w.interval),
+		zap.Duration("order_stuck_after", w.orderStuckAfter),
+		zap.Duration("refund_stuck_after", w.refundStuckAfter),
+		zap.Duration("refund_orphan_after", w.refundOrphanAfter),
+		zap.Int("batch_limit", w.batchLimit))
+
+	// Startup sweep, skipped if shutdown was requested before we got
+	// here.
+	if ctx.Err() == nil {
+		w.RunOnce(ctx)
+	}
+
+	for {
+		select {
+		case <-ctx.Done():
+			w.logger.Info("ReconcileHyperswitchWorker stopped")
+			return
+		case <-ticker.C:
+			w.RunOnce(ctx)
+		}
+	}
+}
+
+// RunOnce performs a single reconciliation sweep. Exposed for tests
+// and for the "run on demand" ops path (e.g., an admin trigger
+// during incident response). Three independent phases, each bounded
+// by batchLimit; failures in one phase don't stop the others.
+func (w *ReconcileHyperswitchWorker) RunOnce(ctx context.Context) {
+	// A cancelled context is not a phase failure: once shutdown has
+	// been requested every remaining query would fail anyway, so stop
+	// instead of logging a string of spurious errors.
+	phases := []func(context.Context){
+		w.reconcileStuckOrders,
+		w.reconcileOrphanRefunds,
+		w.reconcileStuckRefunds,
+	}
+	for _, phase := range phases {
+		if ctx.Err() != nil {
+			return
+		}
+		phase(ctx)
+	}
+}
+
+// reconcileStuckOrders finds orders in `pending` older than
+// orderStuckAfter and synthesises a payment webhook from the PSP's
+// current state. Orders without a hyperswitch_payment_id are skipped:
+// there is nothing to query the PSP with. At most batchLimit rows are
+// processed per call, oldest first.
+func (w *ReconcileHyperswitchWorker) reconcileStuckOrders(ctx context.Context) {
+	cutoff := time.Now().Add(-w.orderStuckAfter)
+	var orders []Order
+	if err := w.db.WithContext(ctx).
+		Where("status = ? AND hyperswitch_payment_id <> '' AND hyperswitch_payment_id IS NOT NULL AND created_at < ?", "pending", cutoff).
+		Order("created_at ASC").
+		Limit(w.batchLimit).
+		Find(&orders).Error; err != nil {
+		w.logger.Error("reconcile: failed to query stuck orders", zap.Error(err))
+		return
+	}
+	for i := range orders {
+		if ctx.Err() != nil {
+			return
+		}
+		w.syncOrder(ctx, &orders[i])
+	}
+}
+
+// syncOrder reads the live payment status for one stuck order and
+// feeds it to ProcessPaymentWebhook as a synthetic webhook payload.
+// Every failure path logs and returns without touching the row, so
+// the order stays pending and is picked up again on the next sweep.
+func (w *ReconcileHyperswitchWorker) syncOrder(ctx context.Context, o *Order) {
+	status, err := w.hs.GetPaymentStatus(ctx, o.HyperswitchPaymentID)
+	if err != nil {
+		w.logger.Warn("reconcile: GetPaymentStatus failed, will retry next tick",
+			zap.String("order_id", o.ID.String()),
+			zap.String("payment_id", o.HyperswitchPaymentID),
+			zap.Error(err))
+		return
+	}
+	// An empty status would yield the meaningless event type
+	// "payment."; treat it as a failed read and retry later.
+	if status == "" {
+		w.logger.Warn("reconcile: PSP returned empty payment status, will retry next tick",
+			zap.String("order_id", o.ID.String()),
+			zap.String("payment_id", o.HyperswitchPaymentID))
+		return
+	}
+	payload, err := json.Marshal(map[string]interface{}{
+		"event_type": "payment." + status,
+		"payment_id": o.HyperswitchPaymentID,
+		"status":     status,
+	})
+	if err != nil {
+		w.logger.Error("reconcile: failed to encode synthetic payment webhook",
+			zap.String("order_id", o.ID.String()),
+			zap.String("payment_id", o.HyperswitchPaymentID),
+			zap.Error(err))
+		return
+	}
+	if err := w.marketService.ProcessPaymentWebhook(ctx, payload); err != nil {
+		w.logger.Error("reconcile: ProcessPaymentWebhook dispatch failed",
+			zap.String("order_id", o.ID.String()),
+			zap.String("payment_id", o.HyperswitchPaymentID),
+			zap.String("psp_status", status),
+			zap.Error(err))
+		return
+	}
+	w.logger.Info("reconcile: stuck order synced via synthetic webhook",
+		zap.String("order_id", o.ID.String()),
+		zap.String("payment_id", o.HyperswitchPaymentID),
+		zap.String("psp_status", status),
+		zap.Duration("stuck_for", time.Since(o.CreatedAt)))
+}
+
+// reconcileOrphanRefunds handles the rare but painful case where a
+// Phase 1 Refund row was created but Phase 2 (PSP call) never ran —
+// the app crashed between the two. The row has no hyperswitch_refund_id,
+// the PSP has no record. Mark the row failed, roll the order back,
+// log ERROR so ops can investigate the crash cause.
+//
+// The candidate list is only a snapshot. The UPDATE therefore
+// re-asserts the orphan predicate (still pending, still no
+// hyperswitch_refund_id) and the order rollback only happens when that
+// UPDATE actually changed a row. Without the guard, a refund whose
+// Phase 2 completed between the SELECT and the UPDATE — or one that a
+// second replica already finalised — would be forced to `failed` and
+// its order reset to `completed` while the PSP is processing the
+// refund.
+func (w *ReconcileHyperswitchWorker) reconcileOrphanRefunds(ctx context.Context) {
+	cutoff := time.Now().Add(-w.refundOrphanAfter)
+	var refunds []Refund
+	if err := w.db.WithContext(ctx).
+		Where("status = ? AND (hyperswitch_refund_id IS NULL OR hyperswitch_refund_id = '') AND created_at < ?",
+			RefundStatusPending, cutoff).
+		Order("created_at ASC").
+		Limit(w.batchLimit).
+		Find(&refunds).Error; err != nil {
+		w.logger.Error("reconcile: failed to query orphan refunds", zap.Error(err))
+		return
+	}
+	now := time.Now()
+	for i := range refunds {
+		if ctx.Err() != nil {
+			return
+		}
+		r := &refunds[i]
+		// claimed reports whether this sweep is the one that flipped
+		// the refund to failed.
+		claimed := false
+		// Mark refund failed + roll order back in a single
+		// transaction. If either fails we log and move on — next
+		// tick will retry.
+		err := w.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
+			res := tx.Model(&Refund{}).
+				Where("id = ? AND status = ? AND (hyperswitch_refund_id IS NULL OR hyperswitch_refund_id = '')",
+					r.ID, RefundStatusPending).
+				Updates(map[string]interface{}{
+					"status":        RefundStatusFailed,
+					"error_message": "reconciler: orphan refund (no hyperswitch_refund_id after " + w.refundOrphanAfter.String() + ")",
+					"failed_at":     &now,
+				})
+			if res.Error != nil {
+				return res.Error
+			}
+			if res.RowsAffected == 0 {
+				// The row stopped being an orphan since we listed it;
+				// leave the order alone.
+				return nil
+			}
+			claimed = true
+			return tx.Model(&Order{}).Where("id = ? AND status = ?", r.OrderID, "refund_pending").
+				Update("status", "completed").Error
+		})
+		if err != nil {
+			w.logger.Error("reconcile: failed to finalise orphan refund",
+				zap.String("refund_id", r.ID.String()),
+				zap.String("order_id", r.OrderID.String()),
+				zap.Error(err))
+			continue
+		}
+		if !claimed {
+			w.logger.Info("reconcile: refund no longer orphaned, skipping",
+				zap.String("refund_id", r.ID.String()),
+				zap.String("order_id", r.OrderID.String()))
+			continue
+		}
+		w.logger.Error("reconcile: ORPHAN REFUND auto-failed — operator attention needed",
+			zap.String("refund_id", r.ID.String()),
+			zap.String("order_id", r.OrderID.String()),
+			zap.Duration("stuck_for", time.Since(r.CreatedAt)))
+	}
+}
+
+// reconcileStuckRefunds handles refunds that DID reach the PSP
+// (have hyperswitch_refund_id) but the webhook never came back.
+// At most batchLimit rows are processed per call, oldest first.
+func (w *ReconcileHyperswitchWorker) reconcileStuckRefunds(ctx context.Context) {
+	cutoff := time.Now().Add(-w.refundStuckAfter)
+	var refunds []Refund
+	if err := w.db.WithContext(ctx).
+		Where("status = ? AND hyperswitch_refund_id <> '' AND hyperswitch_refund_id IS NOT NULL AND created_at < ?",
+			RefundStatusPending, cutoff).
+		Order("created_at ASC").
+		Limit(w.batchLimit).
+		Find(&refunds).Error; err != nil {
+		w.logger.Error("reconcile: failed to query stuck refunds", zap.Error(err))
+		return
+	}
+	for i := range refunds {
+		if ctx.Err() != nil {
+			return
+		}
+		w.syncRefund(ctx, &refunds[i])
+	}
+}
+
+// syncRefund reads the live refund status for one stuck refund and
+// feeds it, together with the PSP's error message, to
+// ProcessRefundWebhook as a synthetic webhook payload. Every failure
+// path logs and returns without touching the row, so the refund stays
+// pending and is picked up again on the next sweep.
+func (w *ReconcileHyperswitchWorker) syncRefund(ctx context.Context, r *Refund) {
+	status, errMsg, err := w.hs.GetRefundStatus(ctx, r.HyperswitchRefundID)
+	if err != nil {
+		w.logger.Warn("reconcile: GetRefundStatus failed, will retry next tick",
+			zap.String("refund_id", r.ID.String()),
+			zap.String("hyperswitch_refund_id", r.HyperswitchRefundID),
+			zap.Error(err))
+		return
+	}
+	// An empty status would yield the meaningless event type
+	// "refund."; treat it as a failed read and retry later.
+	if status == "" {
+		w.logger.Warn("reconcile: PSP returned empty refund status, will retry next tick",
+			zap.String("refund_id", r.ID.String()),
+			zap.String("hyperswitch_refund_id", r.HyperswitchRefundID))
+		return
+	}
+	payload, err := json.Marshal(map[string]interface{}{
+		"event_type":    "refund." + status,
+		"refund_id":     r.HyperswitchRefundID,
+		"status":        status,
+		"error_message": errMsg,
+	})
+	if err != nil {
+		w.logger.Error("reconcile: failed to encode synthetic refund webhook",
+			zap.String("refund_id", r.ID.String()),
+			zap.String("hyperswitch_refund_id", r.HyperswitchRefundID),
+			zap.Error(err))
+		return
+	}
+	if err := w.marketService.ProcessRefundWebhook(ctx, payload); err != nil {
+		w.logger.Error("reconcile: ProcessRefundWebhook dispatch failed",
+			zap.String("refund_id", r.ID.String()),
+			zap.String("hyperswitch_refund_id", r.HyperswitchRefundID),
+			zap.String("psp_status", status),
+			zap.Error(err))
+		return
+	}
+	w.logger.Info("reconcile: stuck refund synced via synthetic webhook",
+		zap.String("refund_id", r.ID.String()),
+		zap.String("hyperswitch_refund_id", r.HyperswitchRefundID),
+		zap.String("psp_status", status),
+		zap.Duration("stuck_for", time.Since(r.CreatedAt)))
+}
diff --git a/veza-backend-api/internal/core/marketplace/reconcile_hyperswitch_test.go b/veza-backend-api/internal/core/marketplace/reconcile_hyperswitch_test.go
new file mode 100644
index 000000000..33690c929
--- /dev/null
+++ b/veza-backend-api/internal/core/marketplace/reconcile_hyperswitch_test.go
@@ -0,0 +1,371 @@
+package marketplace
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"testing"
+	"time"
+
+	"github.com/google/uuid"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"go.uber.org/zap"
+	"gorm.io/driver/sqlite"
+	"gorm.io/gorm"
+)
+
+// --- fixtures 
--------------------------------------------------------------- + +// mockHyperswitchReadClient satisfies HyperswitchReadClient. Both +// methods support canned responses + canned errors so tests can +// exercise each branch. +type mockHyperswitchReadClient struct { + paymentStatus string + paymentErr error + refundStatus string + refundErrMessage string + refundErr error + // Call capture for assertions on what the worker asked the PSP. + paymentCalls []string + refundCalls []string +} + +func (m *mockHyperswitchReadClient) GetPaymentStatus(_ context.Context, paymentID string) (string, error) { + m.paymentCalls = append(m.paymentCalls, paymentID) + return m.paymentStatus, m.paymentErr +} + +func (m *mockHyperswitchReadClient) GetRefundStatus(_ context.Context, refundID string) (string, string, error) { + m.refundCalls = append(m.refundCalls, refundID) + return m.refundStatus, m.refundErrMessage, m.refundErr +} + +// spyDispatcher records the payloads sent to Process*Webhook so tests +// can assert on the synthetic webhook shape. Optionally returns an +// error to exercise the worker's log-and-continue behavior. 
+type spyDispatcher struct { + paymentPayloads [][]byte + refundPayloads [][]byte + paymentErr error + refundErr error +} + +func (d *spyDispatcher) ProcessPaymentWebhook(_ context.Context, payload []byte) error { + d.paymentPayloads = append(d.paymentPayloads, payload) + return d.paymentErr +} + +func (d *spyDispatcher) ProcessRefundWebhook(_ context.Context, payload []byte) error { + d.refundPayloads = append(d.refundPayloads, payload) + return d.refundErr +} + +func setupReconcileTestDB(t *testing.T) *gorm.DB { + t.Helper() + db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) + require.NoError(t, err) + require.NoError(t, db.AutoMigrate(&Order{}, &Refund{})) + return db +} + +func newReconcileWorker(t *testing.T, db *gorm.DB, hs HyperswitchReadClient, svc reconcileDispatcher) *ReconcileHyperswitchWorker { + t.Helper() + return NewReconcileHyperswitchWorker( + db, hs, svc, zap.NewNop(), + time.Hour, // interval — not used in RunOnce + 30*time.Minute, // orderStuckAfter + 30*time.Minute, // refundStuckAfter + 5*time.Minute, // refundOrphanAfter + ) +} + +// --- stuck orders ----------------------------------------------------------- + +func TestReconcile_StuckOrder_SyncsViaSyntheticWebhook(t *testing.T) { + db := setupReconcileTestDB(t) + hs := &mockHyperswitchReadClient{paymentStatus: "succeeded"} + disp := &spyDispatcher{} + w := newReconcileWorker(t, db, hs, disp) + + orderID := uuid.New() + stuckAt := time.Now().Add(-1 * time.Hour) + require.NoError(t, db.Create(&Order{ + ID: orderID, + BuyerID: uuid.New(), + TotalAmount: 9.99, + Currency: "EUR", + Status: "pending", + HyperswitchPaymentID: "pay_stuck_1", + CreatedAt: stuckAt, + }).Error) + // Override CreatedAt (autoCreateTime sets NOW() at Create). + require.NoError(t, db.Model(&Order{}).Where("id = ?", orderID). 
+ Update("created_at", stuckAt).Error) + + w.RunOnce(context.Background()) + + require.Len(t, hs.paymentCalls, 1, "worker must call GetPaymentStatus exactly once") + assert.Equal(t, "pay_stuck_1", hs.paymentCalls[0]) + require.Len(t, disp.paymentPayloads, 1, "worker must dispatch exactly one synthetic webhook") + + var payload map[string]interface{} + require.NoError(t, json.Unmarshal(disp.paymentPayloads[0], &payload)) + assert.Equal(t, "payment.succeeded", payload["event_type"]) + assert.Equal(t, "pay_stuck_1", payload["payment_id"]) + assert.Equal(t, "succeeded", payload["status"]) +} + +func TestReconcile_RecentOrder_NotTouched(t *testing.T) { + db := setupReconcileTestDB(t) + hs := &mockHyperswitchReadClient{paymentStatus: "succeeded"} + disp := &spyDispatcher{} + w := newReconcileWorker(t, db, hs, disp) + + // Order that became pending 1 minute ago — well under + // orderStuckAfter (30m). Should be ignored. + require.NoError(t, db.Create(&Order{ + ID: uuid.New(), BuyerID: uuid.New(), TotalAmount: 5, Currency: "EUR", + Status: "pending", HyperswitchPaymentID: "pay_recent", + }).Error) + + w.RunOnce(context.Background()) + + assert.Empty(t, hs.paymentCalls, "recent order must not trigger a PSP call") + assert.Empty(t, disp.paymentPayloads) +} + +func TestReconcile_CompletedOrder_NotTouched(t *testing.T) { + db := setupReconcileTestDB(t) + hs := &mockHyperswitchReadClient{paymentStatus: "succeeded"} + disp := &spyDispatcher{} + w := newReconcileWorker(t, db, hs, disp) + + orderID := uuid.New() + require.NoError(t, db.Create(&Order{ + ID: orderID, BuyerID: uuid.New(), TotalAmount: 5, Currency: "EUR", + Status: "completed", HyperswitchPaymentID: "pay_done", + }).Error) + require.NoError(t, db.Model(&Order{}).Where("id = ?", orderID). 
+ Update("created_at", time.Now().Add(-time.Hour)).Error) + + w.RunOnce(context.Background()) + + assert.Empty(t, hs.paymentCalls, "completed order must not be reconciled") +} + +func TestReconcile_OrderWithEmptyPaymentID_NotTouched(t *testing.T) { + db := setupReconcileTestDB(t) + hs := &mockHyperswitchReadClient{paymentStatus: "succeeded"} + disp := &spyDispatcher{} + w := newReconcileWorker(t, db, hs, disp) + + // An order that has no payment_id is pre-PSP-call; there's + // nothing to reconcile against. + orderID := uuid.New() + require.NoError(t, db.Create(&Order{ + ID: orderID, BuyerID: uuid.New(), TotalAmount: 5, Currency: "EUR", + Status: "pending", HyperswitchPaymentID: "", + }).Error) + require.NoError(t, db.Model(&Order{}).Where("id = ?", orderID). + Update("created_at", time.Now().Add(-time.Hour)).Error) + + w.RunOnce(context.Background()) + assert.Empty(t, hs.paymentCalls) +} + +func TestReconcile_PSPReadErrorLeavesRowIntact(t *testing.T) { + db := setupReconcileTestDB(t) + hs := &mockHyperswitchReadClient{paymentErr: errors.New("hyperswitch 503")} + disp := &spyDispatcher{} + w := newReconcileWorker(t, db, hs, disp) + + orderID := uuid.New() + require.NoError(t, db.Create(&Order{ + ID: orderID, BuyerID: uuid.New(), TotalAmount: 5, Currency: "EUR", + Status: "pending", HyperswitchPaymentID: "pay_psp_down", + }).Error) + require.NoError(t, db.Model(&Order{}).Where("id = ?", orderID). 
+ Update("created_at", time.Now().Add(-time.Hour)).Error) + + w.RunOnce(context.Background()) + + require.Len(t, hs.paymentCalls, 1, "worker must have tried to sync") + assert.Empty(t, disp.paymentPayloads, "on PSP read error, no webhook must be dispatched") + + var after Order + require.NoError(t, db.First(&after, orderID).Error) + assert.Equal(t, "pending", after.Status, "row must stay pending for the next tick") +} + +// --- orphan refunds (no hyperswitch_refund_id) ------------------------------ + +func TestReconcile_OrphanRefund_AutoFails_OrderRollsBack(t *testing.T) { + db := setupReconcileTestDB(t) + hs := &mockHyperswitchReadClient{} + disp := &spyDispatcher{} + w := newReconcileWorker(t, db, hs, disp) + + orderID := uuid.New() + refundID := uuid.New() + require.NoError(t, db.Create(&Order{ + ID: orderID, BuyerID: uuid.New(), TotalAmount: 9.99, Currency: "EUR", + Status: "refund_pending", HyperswitchPaymentID: "pay_ok", + }).Error) + require.NoError(t, db.Create(&Refund{ + ID: refundID, OrderID: orderID, InitiatorID: uuid.New(), + HyperswitchPaymentID: "pay_ok", AmountCents: 999, Currency: "EUR", + Status: RefundStatusPending, // Phase 1 row, never got a refund_id + }).Error) + // Shove created_at 10 min ago — past refundOrphanAfter (5m). + require.NoError(t, db.Model(&Refund{}).Where("id = ?", refundID). 
+ Update("created_at", time.Now().Add(-10*time.Minute)).Error) + + w.RunOnce(context.Background()) + + var r Refund + require.NoError(t, db.First(&r, refundID).Error) + assert.Equal(t, RefundStatusFailed, r.Status) + assert.Contains(t, r.ErrorMessage, "orphan") + require.NotNil(t, r.FailedAt) + + var o Order + require.NoError(t, db.First(&o, orderID).Error) + assert.Equal(t, "completed", o.Status, + "order must roll back to completed so the buyer can retry") + + assert.Empty(t, disp.refundPayloads, "orphan path must not synthesise a webhook — there's no PSP record") + assert.Empty(t, hs.refundCalls, "orphan path must not call GetRefundStatus — no refund id to query") +} + +func TestReconcile_RecentOrphanRefund_NotTouched(t *testing.T) { + db := setupReconcileTestDB(t) + hs := &mockHyperswitchReadClient{} + disp := &spyDispatcher{} + w := newReconcileWorker(t, db, hs, disp) + + // Refund row created 2 minutes ago — under refundOrphanAfter (5m). + // The phase 2 PSP call might still be in flight; don't intervene. 
+ orderID := uuid.New() + refundID := uuid.New() + require.NoError(t, db.Create(&Order{ + ID: orderID, BuyerID: uuid.New(), TotalAmount: 9.99, Currency: "EUR", + Status: "refund_pending", HyperswitchPaymentID: "pay_new", + }).Error) + require.NoError(t, db.Create(&Refund{ + ID: refundID, OrderID: orderID, InitiatorID: uuid.New(), + HyperswitchPaymentID: "pay_new", AmountCents: 999, Currency: "EUR", + Status: RefundStatusPending, + }).Error) + + w.RunOnce(context.Background()) + + var r Refund + require.NoError(t, db.First(&r, refundID).Error) + assert.Equal(t, RefundStatusPending, r.Status, "recent orphan must be left alone for Phase 2 to complete") +} + +// --- stuck refunds (have hyperswitch_refund_id) ----------------------------- + +func TestReconcile_StuckRefund_SyncsViaSyntheticWebhook(t *testing.T) { + db := setupReconcileTestDB(t) + hs := &mockHyperswitchReadClient{refundStatus: "succeeded"} + disp := &spyDispatcher{} + w := newReconcileWorker(t, db, hs, disp) + + orderID := uuid.New() + refundID := uuid.New() + require.NoError(t, db.Create(&Order{ + ID: orderID, BuyerID: uuid.New(), TotalAmount: 9.99, Currency: "EUR", + Status: "refund_pending", HyperswitchPaymentID: "pay_x", + }).Error) + require.NoError(t, db.Create(&Refund{ + ID: refundID, + OrderID: orderID, + InitiatorID: uuid.New(), + HyperswitchPaymentID: "pay_x", + HyperswitchRefundID: "ref_stuck_1", + AmountCents: 999, + Currency: "EUR", + Status: RefundStatusPending, + }).Error) + require.NoError(t, db.Model(&Refund{}).Where("id = ?", refundID). 
+ Update("created_at", time.Now().Add(-1*time.Hour)).Error) + + w.RunOnce(context.Background()) + + require.Len(t, hs.refundCalls, 1) + assert.Equal(t, "ref_stuck_1", hs.refundCalls[0]) + require.Len(t, disp.refundPayloads, 1) + + var payload map[string]interface{} + require.NoError(t, json.Unmarshal(disp.refundPayloads[0], &payload)) + assert.Equal(t, "refund.succeeded", payload["event_type"]) + assert.Equal(t, "ref_stuck_1", payload["refund_id"]) + assert.Equal(t, "succeeded", payload["status"]) +} + +func TestReconcile_StuckRefund_FailureStatus_PassesErrorMessage(t *testing.T) { + db := setupReconcileTestDB(t) + hs := &mockHyperswitchReadClient{ + refundStatus: "failed", + refundErrMessage: "insufficient_funds", + } + disp := &spyDispatcher{} + w := newReconcileWorker(t, db, hs, disp) + + refundID := uuid.New() + require.NoError(t, db.Create(&Refund{ + ID: refundID, + OrderID: uuid.New(), + InitiatorID: uuid.New(), + HyperswitchPaymentID: "pay_y", + HyperswitchRefundID: "ref_failed_1", + AmountCents: 999, + Currency: "EUR", + Status: RefundStatusPending, + }).Error) + require.NoError(t, db.Model(&Refund{}).Where("id = ?", refundID). 
+ Update("created_at", time.Now().Add(-1*time.Hour)).Error) + + w.RunOnce(context.Background()) + + require.Len(t, disp.refundPayloads, 1) + var payload map[string]interface{} + require.NoError(t, json.Unmarshal(disp.refundPayloads[0], &payload)) + assert.Equal(t, "refund.failed", payload["event_type"]) + assert.Equal(t, "failed", payload["status"]) + assert.Equal(t, "insufficient_funds", payload["error_message"], + "the PSP error_message must survive the synthetic webhook — downstream handlers store it") +} + +// --- no-op cases ------------------------------------------------------------ + +func TestReconcile_AllTerminalStates_NoOp(t *testing.T) { + db := setupReconcileTestDB(t) + hs := &mockHyperswitchReadClient{paymentStatus: "succeeded", refundStatus: "succeeded"} + disp := &spyDispatcher{} + w := newReconcileWorker(t, db, hs, disp) + + // Seed terminal rows of every flavor; none should trigger. + require.NoError(t, db.Create(&Order{ + ID: uuid.New(), BuyerID: uuid.New(), TotalAmount: 5, Currency: "EUR", + Status: "completed", HyperswitchPaymentID: "pay_done_1", + }).Error) + require.NoError(t, db.Create(&Order{ + ID: uuid.New(), BuyerID: uuid.New(), TotalAmount: 5, Currency: "EUR", + Status: "refunded", HyperswitchPaymentID: "pay_done_2", + }).Error) + require.NoError(t, db.Create(&Refund{ + ID: uuid.New(), OrderID: uuid.New(), InitiatorID: uuid.New(), + HyperswitchPaymentID: "pay_done_3", HyperswitchRefundID: "ref_done", + AmountCents: 500, Currency: "EUR", Status: RefundStatusSucceeded, + }).Error) + + w.RunOnce(context.Background()) + + assert.Empty(t, hs.paymentCalls) + assert.Empty(t, hs.refundCalls) + assert.Empty(t, disp.paymentPayloads) + assert.Empty(t, disp.refundPayloads) +} diff --git a/veza-backend-api/internal/services/hyperswitch/client.go b/veza-backend-api/internal/services/hyperswitch/client.go index 071e5dcb2..7d43df1d4 100644 --- a/veza-backend-api/internal/services/hyperswitch/client.go +++ 
b/veza-backend-api/internal/services/hyperswitch/client.go @@ -175,6 +175,45 @@ type RefundResponse struct { Status string `json:"status"` } +// RefundStatus is the response from GET /refunds/{refund_id}. +// Used by the reconciliation worker (v1.0.7 item C) to sync stuck +// pending refunds with their actual PSP state. +type RefundStatus struct { + RefundID string `json:"refund_id"` + Status string `json:"status"` + ErrorMessage string `json:"error_message,omitempty"` +} + +// GetRefund retrieves refund status from Hyperswitch (v1.0.7 item C). +// Mirror of GetPayment, used by the reconciliation sweep to +// synthesise a webhook payload when we never received one from the +// PSP but the pending refund row has been sitting around too long. +func (c *Client) GetRefund(ctx context.Context, refundID string) (*RefundStatus, error) { + if refundID == "" { + return nil, fmt.Errorf("get refund: empty refund_id") + } + req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+"/refunds/"+refundID, nil) + if err != nil { + return nil, fmt.Errorf("create request: %w", err) + } + req.Header.Set("api-key", c.apiKey) + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("http request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("hyperswitch get refund failed: status %d", resp.StatusCode) + } + var out RefundStatus + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { + return nil, fmt.Errorf("decode response: %w", err) + } + return &out, nil +} + // CreateRefund creates a refund against a payment (v0.403 R2). 
// // idempotencyKey is REQUIRED (v1.0.7 item D) and sent as the diff --git a/veza-backend-api/internal/services/hyperswitch/provider.go b/veza-backend-api/internal/services/hyperswitch/provider.go index 03d3a2c98..a3c52da37 100644 --- a/veza-backend-api/internal/services/hyperswitch/provider.go +++ b/veza-backend-api/internal/services/hyperswitch/provider.go @@ -29,6 +29,26 @@ func (p *Provider) GetPayment(ctx context.Context, paymentID string) (string, er return p.client.GetPaymentStatus(ctx, paymentID) } +// GetPaymentStatus is the ReconcileHyperswitchWorker-facing name for +// GetPayment — keeps the marketplace-side interface named by intent +// ("status") rather than by endpoint. Returns the PSP-reported +// payment status string. +func (p *Provider) GetPaymentStatus(ctx context.Context, paymentID string) (string, error) { + return p.client.GetPaymentStatus(ctx, paymentID) +} + +// GetRefundStatus retrieves refund status + error_message from +// Hyperswitch (v1.0.7 item C — reconciliation worker). Returns +// (status, errorMessage, err). Used to synthesise a webhook payload +// when the sweep finds a stuck pending refund. +func (p *Provider) GetRefundStatus(ctx context.Context, refundID string) (string, string, error) { + r, err := p.client.GetRefund(ctx, refundID) + if err != nil { + return "", "", err + } + return r.Status, r.ErrorMessage, nil +} + // CreateRefund creates a refund in Hyperswitch and returns the PSP refund // id and synchronous status (v1.0.6, v1.0.7 item D). The marketplace // service persists the refund_id as the idempotency key for the webhook