diff --git a/veza-backend-api/internal/api/routes_webhooks.go b/veza-backend-api/internal/api/routes_webhooks.go index 0117e29c2..074714b1d 100644 --- a/veza-backend-api/internal/api/routes_webhooks.go +++ b/veza-backend-api/internal/api/routes_webhooks.go @@ -201,5 +201,12 @@ func (r *APIRouter) getMarketplaceService() marketplace.MarketplaceService { scs := services.NewStripeConnectService(r.db.GormDB, r.config.StripeConnectSecretKey, r.logger) opts = append(opts, marketplace.WithTransferService(scs, r.config.PlatformFeeRate)) } + // v1.0.9 item G Phase 2: subscription webhook dispatcher. The + // processor reads/writes subscription_invoices + user_subscriptions + // directly via GORM; injecting it here makes ProcessPaymentWebhook + // route subscription events instead of swallowing them as "order not + // found". + subscriptionProcessor := hyperswitch.NewSubscriptionWebhookProcessor(r.db.GormDB, r.logger) + opts = append(opts, marketplace.WithSubscriptionWebhookHandler(subscriptionProcessor)) return marketplace.NewService(r.db.GormDB, r.logger, storageService, opts...) } diff --git a/veza-backend-api/internal/core/marketplace/service.go b/veza-backend-api/internal/core/marketplace/service.go index 513d432ff..52d2f3695 100644 --- a/veza-backend-api/internal/core/marketplace/service.go +++ b/veza-backend-api/internal/core/marketplace/service.go @@ -26,8 +26,29 @@ var ( ErrTrackNotFound = errors.New("track not found") ErrNoLicense = errors.New("no valid license found") ErrPromoCodeInvalid = errors.New("promo code invalid or expired") + + // ErrNotASubscription signals that a webhook's payment_id is not for + // a subscription invoice. The dispatcher in ProcessPaymentWebhook + // uses this sentinel to decide whether to fall through to the order + // flow. (v1.0.9 item G Phase 2.) + ErrNotASubscription = errors.New("payment_id not for a subscription invoice") ) +// SubscriptionWebhookHandler is the contract the marketplace dispatcher +// uses to delegate subscription webhooks to the hyperswitch service. The +// concrete implementation lives in internal/services/hyperswitch — this +// interface keeps marketplace from depending on the hyperswitch package +// directly while still allowing the dispatcher to route subscription +// events to a typed handler. (v1.0.9 item G Phase 2.) +type SubscriptionWebhookHandler interface { + // ProcessSubscriptionPayment applies the state transition for a + // subscription-flavored payment webhook. Implementations MUST return + // ErrNotASubscription when the payment_id is not associated with a + // subscription invoice — the caller relies on this sentinel to fall + // through to the order webhook flow. + ProcessSubscriptionPayment(ctx context.Context, paymentID, status string) error +} + // NewOrderItem represents an item to be ordered type NewOrderItem struct { ProductID uuid.UUID @@ -197,14 +218,15 @@ var ( // Service implémente MarketplaceService type Service struct { - db *gorm.DB - logger *zap.Logger - storage StorageService - paymentProvider PaymentProvider - hyperswitchEnabled bool - checkoutSuccessURL string - transferService TransferService - platformFeeRate float64 + db *gorm.DB + logger *zap.Logger + storage StorageService + paymentProvider PaymentProvider + hyperswitchEnabled bool + checkoutSuccessURL string + transferService TransferService + platformFeeRate float64 + subscriptionWebhookHandler SubscriptionWebhookHandler } // ServiceOption configures the marketplace Service. @@ -231,6 +253,14 @@ func WithTransferService(ts TransferService, feeRate float64) ServiceOption { } } +// WithSubscriptionWebhookHandler injects the dispatcher for subscription +// webhooks. When set, ProcessPaymentWebhook attempts subscription +// processing first and falls through to the order flow only on +// ErrNotASubscription. (v1.0.9 item G Phase 2.) +func WithSubscriptionWebhookHandler(h SubscriptionWebhookHandler) ServiceOption { + return func(s *Service) { s.subscriptionWebhookHandler = h } +} + // NewService creates a new Marketplace service instance func NewService(db *gorm.DB, logger *zap.Logger, storage StorageService, opts ...ServiceOption) *Service { s := &Service{ @@ -729,6 +759,20 @@ func (s *Service) ProcessPaymentWebhook(ctx context.Context, payload []byte) err } status := wp.getStatus() + // v1.0.9 item G Phase 2: subscription dispatcher. Try the subscription + // flow first; if the payment_id maps to a subscription invoice, the + // handler owns the state transition and we're done. Otherwise it + // returns ErrNotASubscription and we fall through to the order flow. + if s.subscriptionWebhookHandler != nil { + err := s.subscriptionWebhookHandler.ProcessSubscriptionPayment(ctx, paymentID, status) + if err == nil { + return nil + } + if !errors.Is(err, ErrNotASubscription) { + return fmt.Errorf("subscription webhook processing failed: %w", err) + } + } + var order Order if err := s.db.WithContext(ctx).Where("hyperswitch_payment_id = ?", paymentID).First(&order).Error; err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { diff --git a/veza-backend-api/internal/services/hyperswitch/webhook_subscription.go b/veza-backend-api/internal/services/hyperswitch/webhook_subscription.go new file mode 100644 index 000000000..8aa67646e --- /dev/null +++ b/veza-backend-api/internal/services/hyperswitch/webhook_subscription.go @@ -0,0 +1,180 @@ +package hyperswitch + +import ( + "context" + "errors" + "fmt" + "strings" + "time" + + "go.uber.org/zap" + "gorm.io/gorm" + + "veza-backend-api/internal/core/marketplace" + "veza-backend-api/internal/core/subscription" +) + +// SubscriptionWebhookProcessor handles Hyperswitch webhooks that target +// subscription invoices (v1.0.9 item G Phase 2 — closes the +// pending_payment state machine opened in Phase 1). +// +// State transitions: +// - pending_payment + status=succeeded → invoice paid, sub active +// - pending_payment + status=failed → invoice failed, sub expired +// - already terminal → idempotent no-op (Hyperswitch +// re-emits webhooks until 200 OK; we must accept replays) +// - payment_id not in subscription_invoices → marketplace.ErrNotASubscription +// (caller falls through to order webhook flow) +// +// The processor only flips a subscription out of pending_payment. +// Subscriptions that have already transitioned (concurrent flow, manual +// admin action, plan upgrade) are left alone — the invoice still gets +// the terminal status update so the audit trail is consistent. +type SubscriptionWebhookProcessor struct { + db *gorm.DB + logger *zap.Logger +} + +// NewSubscriptionWebhookProcessor constructs a processor bound to the +// given DB. The logger is optional; nil is replaced by zap.NewNop so the +// happy path doesn't crash on a forgotten DI wire-up. +func NewSubscriptionWebhookProcessor(db *gorm.DB, logger *zap.Logger) *SubscriptionWebhookProcessor { + if logger == nil { + logger = zap.NewNop() + } + return &SubscriptionWebhookProcessor{db: db, logger: logger} +} + +// IsSubscriptionEventType returns true if the event_type matches the +// "subscription.*" prefix used by Item G. The dispatcher in +// marketplace.ProcessPaymentWebhook does NOT rely on this — it always +// tries the invoice lookup first because the PSP can re-emit subscription +// payments with payment_intent.* event types after a recovery flow. The +// helper is exposed for callers that want to short-circuit the DB hit on +// clearly non-subscription events. +func IsSubscriptionEventType(eventType string) bool { + return strings.HasPrefix(strings.ToLower(eventType), "subscription.") +} + +// ProcessSubscriptionPayment satisfies marketplace.SubscriptionWebhookHandler. +// Returns marketplace.ErrNotASubscription when the payment_id is not +// associated with any subscription invoice — the caller treats that as +// "not a subscription event" and falls through to the order flow. +func (p *SubscriptionWebhookProcessor) ProcessSubscriptionPayment(ctx context.Context, paymentID, status string) error { + if paymentID == "" { + return errors.New("empty payment_id") + } + + var inv subscription.Invoice + if err := p.db.WithContext(ctx). + Where("hyperswitch_payment_id = ?", paymentID). + First(&inv).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return marketplace.ErrNotASubscription + } + return fmt.Errorf("lookup subscription invoice: %w", err) + } + + var sub subscription.UserSubscription + if err := p.db.WithContext(ctx). + Where("id = ?", inv.SubscriptionID). + First(&sub).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + // Invoice exists but subscription is gone — data integrity + // issue worth surfacing rather than swallowing. + return fmt.Errorf("subscription %s not found for invoice %s", inv.SubscriptionID, inv.ID) + } + return fmt.Errorf("lookup subscription: %w", err) + } + + switch strings.ToLower(status) { + case "succeeded": + if sub.Status == subscription.StatusActive && inv.Status == subscription.InvoicePaid { + p.logger.Debug("Subscription webhook: already active+paid (idempotent replay)", + zap.String("subscription_id", sub.ID.String()), + zap.String("payment_id", paymentID)) + return nil + } + return p.activate(ctx, &inv, &sub, paymentID) + case "failed": + if sub.Status == subscription.StatusExpired && inv.Status == subscription.InvoiceFailed { + p.logger.Debug("Subscription webhook: already expired+failed (idempotent replay)", + zap.String("subscription_id", sub.ID.String()), + zap.String("payment_id", paymentID)) + return nil + } + return p.expire(ctx, &inv, &sub, paymentID) + default: + // Intermediate / unknown statuses (processing, requires_*) — log + // and noop. Hyperswitch retries until a terminal status is acked, + // so the transient ones are safe to ignore. + p.logger.Debug("Subscription webhook: ignoring non-terminal status", + zap.String("status", status), + zap.String("payment_id", paymentID), + zap.String("subscription_id", sub.ID.String())) + return nil + } +} + +func (p *SubscriptionWebhookProcessor) activate(ctx context.Context, inv *subscription.Invoice, sub *subscription.UserSubscription, paymentID string) error { + return p.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + now := time.Now() + if err := tx.Model(&subscription.Invoice{}). + Where("id = ?", inv.ID). + Updates(map[string]any{ + "status": subscription.InvoicePaid, + "paid_at": now, + }).Error; err != nil { + return fmt.Errorf("update invoice: %w", err) + } + // Only flip the subscription if it's currently pending_payment. + // A row already in active/canceled/upgraded must not be stomped + // by a delayed webhook arrival. + result := tx.Model(&subscription.UserSubscription{}). + Where("id = ? AND status = ?", sub.ID, subscription.StatusPendingPayment). + Updates(map[string]any{"status": subscription.StatusActive}) + if result.Error != nil { + return fmt.Errorf("update subscription: %w", result.Error) + } + if result.RowsAffected == 0 { + p.logger.Warn("Subscription webhook: subscription not in pending_payment, invoice still flipped to paid", + zap.String("subscription_id", sub.ID.String()), + zap.String("current_status", string(sub.Status)), + zap.String("payment_id", paymentID)) + return nil + } + p.logger.Info("Subscription activated via Hyperswitch webhook", + zap.String("subscription_id", sub.ID.String()), + zap.String("invoice_id", inv.ID.String()), + zap.String("payment_id", paymentID)) + return nil + }) +} + +func (p *SubscriptionWebhookProcessor) expire(ctx context.Context, inv *subscription.Invoice, sub *subscription.UserSubscription, paymentID string) error { + return p.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + if err := tx.Model(&subscription.Invoice{}). + Where("id = ?", inv.ID). + Updates(map[string]any{"status": subscription.InvoiceFailed}).Error; err != nil { + return fmt.Errorf("update invoice: %w", err) + } + result := tx.Model(&subscription.UserSubscription{}). + Where("id = ? AND status = ?", sub.ID, subscription.StatusPendingPayment). + Updates(map[string]any{"status": subscription.StatusExpired}) + if result.Error != nil { + return fmt.Errorf("update subscription: %w", result.Error) + } + if result.RowsAffected == 0 { + p.logger.Warn("Subscription webhook: subscription not in pending_payment, invoice still flipped to failed", + zap.String("subscription_id", sub.ID.String()), + zap.String("current_status", string(sub.Status)), + zap.String("payment_id", paymentID)) + return nil + } + p.logger.Info("Subscription expired via Hyperswitch webhook", + zap.String("subscription_id", sub.ID.String()), + zap.String("invoice_id", inv.ID.String()), + zap.String("payment_id", paymentID)) + return nil + }) +} diff --git a/veza-backend-api/internal/services/hyperswitch/webhook_subscription_test.go b/veza-backend-api/internal/services/hyperswitch/webhook_subscription_test.go new file mode 100644 index 000000000..00709f2df --- /dev/null +++ b/veza-backend-api/internal/services/hyperswitch/webhook_subscription_test.go @@ -0,0 +1,162 @@ +package hyperswitch + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "gorm.io/driver/sqlite" + "gorm.io/gorm" + + "veza-backend-api/internal/core/marketplace" + "veza-backend-api/internal/core/subscription" +) + +func setupSubscriptionWebhookDB(t *testing.T) *gorm.DB { + t.Helper() + db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) + require.NoError(t, err) + require.NoError(t, db.AutoMigrate( + &subscription.Plan{}, + &subscription.UserSubscription{}, + &subscription.Invoice{}, + )) + return db +} + +// seedPendingPayment creates a paid-plan subscription frozen in +// pending_payment, with an invoice carrying the supplied payment_id — +// the canonical post-Phase-1 row shape that the webhook handler is +// supposed to flip. +func seedPendingPayment(t *testing.T, db *gorm.DB, paymentID string) (subscription.UserSubscription, subscription.Invoice) { + t.Helper() + plan := subscription.Plan{ + ID: uuid.New(), + Name: subscription.PlanCreator, + DisplayName: "Creator", + PriceMonthly: 999, + Currency: "USD", + IsActive: true, + } + require.NoError(t, db.Create(&plan).Error) + + now := time.Now() + sub := subscription.UserSubscription{ + UserID: uuid.New(), + PlanID: plan.ID, + Status: subscription.StatusPendingPayment, + BillingCycle: subscription.BillingMonthly, + CurrentPeriodStart: now, + CurrentPeriodEnd: now.AddDate(0, 1, 0), + } + require.NoError(t, db.Create(&sub).Error) + + inv := subscription.Invoice{ + SubscriptionID: sub.ID, + UserID: sub.UserID, + AmountCents: 999, + Currency: "USD", + Status: subscription.InvoicePending, + BillingPeriodStart: now, + BillingPeriodEnd: now.AddDate(0, 1, 0), + HyperswitchPaymentID: paymentID, + } + require.NoError(t, db.Create(&inv).Error) + return sub, inv +} + +// TestSubscriptionWebhookProcessor_Succeeded covers the canonical happy +// path: pending_payment + status=succeeded flips both rows to terminal +// state (sub=active, invoice=paid+paid_at). +func TestSubscriptionWebhookProcessor_Succeeded(t *testing.T) { + db := setupSubscriptionWebhookDB(t) + sub, inv := seedPendingPayment(t, db, "pay_succ_1") + + p := NewSubscriptionWebhookProcessor(db, zap.NewNop()) + require.NoError(t, p.ProcessSubscriptionPayment(context.Background(), "pay_succ_1", "succeeded")) + + var refreshedSub subscription.UserSubscription + require.NoError(t, db.First(&refreshedSub, "id = ?", sub.ID).Error) + assert.Equal(t, subscription.StatusActive, refreshedSub.Status) + + var refreshedInv subscription.Invoice + require.NoError(t, db.First(&refreshedInv, "id = ?", inv.ID).Error) + assert.Equal(t, subscription.InvoicePaid, refreshedInv.Status) + require.NotNil(t, refreshedInv.PaidAt, "paid_at must be set after activation") +} + +// TestSubscriptionWebhookProcessor_Failed covers the dual: pending_payment +// + status=failed flips both rows to the rejection-terminal state +// (sub=expired, invoice=failed). Phase 1 created the row optimistically, +// the failed webhook is what shuts it down without granting access. +func TestSubscriptionWebhookProcessor_Failed(t *testing.T) { + db := setupSubscriptionWebhookDB(t) + sub, inv := seedPendingPayment(t, db, "pay_fail_1") + + p := NewSubscriptionWebhookProcessor(db, zap.NewNop()) + require.NoError(t, p.ProcessSubscriptionPayment(context.Background(), "pay_fail_1", "failed")) + + var refreshedSub subscription.UserSubscription + require.NoError(t, db.First(&refreshedSub, "id = ?", sub.ID).Error) + assert.Equal(t, subscription.StatusExpired, refreshedSub.Status) + + var refreshedInv subscription.Invoice + require.NoError(t, db.First(&refreshedInv, "id = ?", inv.ID).Error) + assert.Equal(t, subscription.InvoiceFailed, refreshedInv.Status) +} + +// TestSubscriptionWebhookProcessor_IdempotentReplay locks down the +// at-least-once delivery contract: Hyperswitch retries until 200 OK, +// so a second succeeded webhook must be a no-op (state unchanged, +// no error, paid_at NOT bumped to "now"). +func TestSubscriptionWebhookProcessor_IdempotentReplay(t *testing.T) { + db := setupSubscriptionWebhookDB(t) + sub, inv := seedPendingPayment(t, db, "pay_replay_1") + + p := NewSubscriptionWebhookProcessor(db, zap.NewNop()) + + // First delivery: pending_payment → active. + require.NoError(t, p.ProcessSubscriptionPayment(context.Background(), "pay_replay_1", "succeeded")) + + var inv1 subscription.Invoice + require.NoError(t, db.First(&inv1, "id = ?", inv.ID).Error) + require.NotNil(t, inv1.PaidAt) + paidAt1 := *inv1.PaidAt + + // Sleep to make sure any spurious paid_at re-write would have a + // detectable later timestamp (millisecond resolution suffices). + time.Sleep(10 * time.Millisecond) + + // Replay: same payment_id, same status, no error, no state change. + require.NoError(t, p.ProcessSubscriptionPayment(context.Background(), "pay_replay_1", "succeeded")) + + var sub2 subscription.UserSubscription + require.NoError(t, db.First(&sub2, "id = ?", sub.ID).Error) + assert.Equal(t, subscription.StatusActive, sub2.Status) + + var inv2 subscription.Invoice + require.NoError(t, db.First(&inv2, "id = ?", inv.ID).Error) + assert.Equal(t, subscription.InvoicePaid, inv2.Status) + require.NotNil(t, inv2.PaidAt) + assert.True(t, paidAt1.Equal(*inv2.PaidAt), + "paid_at must NOT be re-stamped on idempotent replay (got %v after %v)", *inv2.PaidAt, paidAt1) +} + +// TestSubscriptionWebhookProcessor_UnknownPaymentID locks down the +// fall-through contract: a payment_id that has no subscription invoice +// returns marketplace.ErrNotASubscription so the dispatcher in +// ProcessPaymentWebhook can route the event to the order flow. +func TestSubscriptionWebhookProcessor_UnknownPaymentID(t *testing.T) { + db := setupSubscriptionWebhookDB(t) + p := NewSubscriptionWebhookProcessor(db, zap.NewNop()) + + err := p.ProcessSubscriptionPayment(context.Background(), "pay_does_not_exist", "succeeded") + require.Error(t, err) + assert.True(t, errors.Is(err, marketplace.ErrNotASubscription), + "expected marketplace.ErrNotASubscription, got %v", err) +}