diff --git a/veza-backend-api/.env.template b/veza-backend-api/.env.template index 3f633c388..5969af4b7 100644 --- a/veza-backend-api/.env.template +++ b/veza-backend-api/.env.template @@ -99,6 +99,23 @@ STRIPE_SECRET_KEY= # Webhook secret for Connect events (whsec_xxx) STRIPE_CONNECT_WEBHOOK_SECRET= +# --- TRANSFER RETRY WORKER (v0.701) --- +# Drives failed seller_transfers back to completed when Stripe recovers. +# TRANSFER_RETRY_ENABLED=true +# TRANSFER_RETRY_MAX=3 +# TRANSFER_RETRY_INTERVAL=5m + +# --- REVERSAL WORKER (v1.0.7 item B) --- +# Drives reversal_pending seller_transfers to reversed by calling Stripe +# Connect Transfers:reversal. Decouples buyer-facing refund UX from +# Stripe-side settlement health. Backoff is exponential (base * 2^retry), +# capped at BACKOFF_MAX. +# REVERSAL_WORKER_ENABLED=true +# REVERSAL_MAX_RETRIES=5 +# REVERSAL_CHECK_INTERVAL=1m +# REVERSAL_BACKOFF_BASE=1m +# REVERSAL_BACKOFF_MAX=1h + # --- EXTERNAL SERVICES (OPTIONAL) --- STREAM_SERVER_URL=http://veza.fr:8082 # Must match stream server INTERNAL_API_KEY for /internal/jobs/transcode (P1.1.2) diff --git a/veza-backend-api/cmd/api/main.go b/veza-backend-api/cmd/api/main.go index bc729ab6b..5e5392b73 100644 --- a/veza-backend-api/cmd/api/main.go +++ b/veza-backend-api/cmd/api/main.go @@ -161,23 +161,48 @@ func main() { } // v0.701: Start Transfer Retry Worker - if cfg.TransferRetryEnabled && cfg.StripeConnectEnabled && cfg.StripeConnectSecretKey != "" { + // v1.0.7 item B: Start Reversal Worker (shares the same + // StripeConnectService — one initialisation for both workers). 
+ if cfg.StripeConnectEnabled && cfg.StripeConnectSecretKey != "" { stripeConnectSvc := services.NewStripeConnectService(db.GormDB, cfg.StripeConnectSecretKey, logger) - retryWorker := marketplace.NewTransferRetryWorker( - db.GormDB, stripeConnectSvc, logger, cfg.TransferRetryInterval, cfg.TransferRetryMaxAttempts, - ) - retryCtx, retryCancel := context.WithCancel(context.Background()) - go retryWorker.Start(retryCtx) - logger.Info("Transfer Retry Worker started", - zap.Duration("interval", cfg.TransferRetryInterval), - zap.Int("max_retries", cfg.TransferRetryMaxAttempts)) - shutdownManager.Register(shutdown.NewShutdownFunc("transfer_retry_worker", func(ctx context.Context) error { - retryCancel() - return nil - })) - } else if cfg.TransferRetryEnabled { - logger.Info("Transfer Retry Worker skipped — Stripe Connect not enabled") + if cfg.TransferRetryEnabled { + retryWorker := marketplace.NewTransferRetryWorker( + db.GormDB, stripeConnectSvc, logger, cfg.TransferRetryInterval, cfg.TransferRetryMaxAttempts, + ) + retryCtx, retryCancel := context.WithCancel(context.Background()) + go retryWorker.Start(retryCtx) + logger.Info("Transfer Retry Worker started", + zap.Duration("interval", cfg.TransferRetryInterval), + zap.Int("max_retries", cfg.TransferRetryMaxAttempts)) + + shutdownManager.Register(shutdown.NewShutdownFunc("transfer_retry_worker", func(ctx context.Context) error { + retryCancel() + return nil + })) + } + + if cfg.ReversalWorkerEnabled { + reversalWorker := marketplace.NewStripeReversalWorker( + db.GormDB, stripeConnectSvc, logger, + cfg.ReversalCheckInterval, cfg.ReversalMaxRetries, + cfg.ReversalBackoffBase, cfg.ReversalBackoffMax, + ) + reversalCtx, reversalCancel := context.WithCancel(context.Background()) + go reversalWorker.Start(reversalCtx) + logger.Info("Stripe Reversal Worker started", + zap.Duration("interval", cfg.ReversalCheckInterval), + zap.Int("max_retries", cfg.ReversalMaxRetries), + zap.Duration("backoff_base", cfg.ReversalBackoffBase), 
+ zap.Duration("backoff_max", cfg.ReversalBackoffMax)) + + shutdownManager.Register(shutdown.NewShutdownFunc("stripe_reversal_worker", func(ctx context.Context) error { + reversalCancel() + return nil + })) + } + } else if cfg.TransferRetryEnabled || cfg.ReversalWorkerEnabled { + logger.Info("Transfer Retry / Reversal workers skipped — Stripe Connect not enabled") } // v0.802: Start Cloud Backup Worker (copies cloud files to backup prefix every 24h) diff --git a/veza-backend-api/internal/config/config.go b/veza-backend-api/internal/config/config.go index 4cb103e57..2bf26bba5 100644 --- a/veza-backend-api/internal/config/config.go +++ b/veza-backend-api/internal/config/config.go @@ -165,6 +165,17 @@ type Config struct { TransferRetryMaxAttempts int // TRANSFER_RETRY_MAX (default 3) TransferRetryInterval time.Duration // TRANSFER_RETRY_INTERVAL (default 5m) + // Reversal Worker (v1.0.7 item B) — drives seller_transfers from + // reversal_pending to reversed via Stripe Connect Transfers:reversal. + // Backoff is exponential: ReversalBackoffBase * 2^retry_count, capped at + // ReversalBackoffMax so a row doesn't sit 17 hours between retries + // after 10 failed attempts. 
+ ReversalWorkerEnabled bool // REVERSAL_WORKER_ENABLED (default true) + ReversalMaxRetries int // REVERSAL_MAX_RETRIES (default 5) + ReversalCheckInterval time.Duration // REVERSAL_CHECK_INTERVAL (default 1m) + ReversalBackoffBase time.Duration // REVERSAL_BACKOFF_BASE (default 1m) + ReversalBackoffMax time.Duration // REVERSAL_BACKOFF_MAX (default 1h) + // Email & Jobs EmailSender *email.SMTPEmailSender JobWorker *workers.JobWorker @@ -398,6 +409,13 @@ func NewConfig() (*Config, error) { TransferRetryMaxAttempts: getEnvInt("TRANSFER_RETRY_MAX", 3), TransferRetryInterval: getEnvDuration("TRANSFER_RETRY_INTERVAL", 5*time.Minute), + // Reversal Worker (v1.0.7 item B) + ReversalWorkerEnabled: getEnvBool("REVERSAL_WORKER_ENABLED", true), + ReversalMaxRetries: getEnvInt("REVERSAL_MAX_RETRIES", 5), + ReversalCheckInterval: getEnvDuration("REVERSAL_CHECK_INTERVAL", time.Minute), + ReversalBackoffBase: getEnvDuration("REVERSAL_BACKOFF_BASE", time.Minute), + ReversalBackoffMax: getEnvDuration("REVERSAL_BACKOFF_MAX", time.Hour), + // Log Files Configuration — centralized in config/logging.toml // Resolved via logging.LoadConfig() with env var overrides (LOG_DIR, LOG_LEVEL) LogDir: logging.LoadConfig().ResolveLogDir(env), diff --git a/veza-backend-api/internal/core/marketplace/models.go b/veza-backend-api/internal/core/marketplace/models.go index 6b6c23a92..c7cd5f8b2 100644 --- a/veza-backend-api/internal/core/marketplace/models.go +++ b/veza-backend-api/internal/core/marketplace/models.go @@ -231,10 +231,10 @@ func (pr *ProductReview) BeforeCreate(tx *gorm.DB) (err error) { // SellerTransfer tracks a Stripe Connect transfer for a completed order (v0.603) type SellerTransfer struct { - ID uuid.UUID `gorm:"type:uuid;primaryKey" json:"id"` - SellerID uuid.UUID `gorm:"type:uuid;not null" json:"seller_id"` - OrderID uuid.UUID `gorm:"type:uuid;not null" json:"order_id"` - StripeTransferID string `gorm:"size:255" json:"stripe_transfer_id,omitempty"` + ID uuid.UUID 
`gorm:"type:uuid;primaryKey" json:"id"` + SellerID uuid.UUID `gorm:"type:uuid;not null" json:"seller_id"` + OrderID uuid.UUID `gorm:"type:uuid;not null" json:"order_id"` + StripeTransferID string `gorm:"size:255" json:"stripe_transfer_id,omitempty"` // StripeReversalID is populated by the v1.0.7 item B reversal worker // when a refund triggers a reverse-charge against the transfer. // Empty for transfers that never got reversed. Unique when non-empty diff --git a/veza-backend-api/internal/core/marketplace/process_webhook_test.go b/veza-backend-api/internal/core/marketplace/process_webhook_test.go index 98ebdd7b7..85c12bf97 100644 --- a/veza-backend-api/internal/core/marketplace/process_webhook_test.go +++ b/veza-backend-api/internal/core/marketplace/process_webhook_test.go @@ -188,6 +188,10 @@ func (m *mockTransferService) CreateTransfer(_ context.Context, sellerUserID uui return id, nil } +func (m *mockTransferService) ReverseTransfer(_ context.Context, _ string, _ *int64, _ string) (string, error) { + return "rev_mock", nil +} + func TestProcessWebhook_TransferSuccess(t *testing.T) { db := setupWebhookTestDB(t) logger := zap.NewNop() diff --git a/veza-backend-api/internal/core/marketplace/refund_test.go b/veza-backend-api/internal/core/marketplace/refund_test.go index 79699bb2d..c819b1660 100644 --- a/veza-backend-api/internal/core/marketplace/refund_test.go +++ b/veza-backend-api/internal/core/marketplace/refund_test.go @@ -72,14 +72,14 @@ func formatN(n int32) string { } type refundTestFixture struct { - db *gorm.DB - svc *Service - provider *mockRefundPaymentProvider - buyerID uuid.UUID - sellerID uuid.UUID - orderID uuid.UUID + db *gorm.DB + svc *Service + provider *mockRefundPaymentProvider + buyerID uuid.UUID + sellerID uuid.UUID + orderID uuid.UUID productID uuid.UUID - trackID uuid.UUID + trackID uuid.UUID } func newRefundTestFixture(t *testing.T) *refundTestFixture { @@ -333,7 +333,13 @@ func TestProcessRefundWebhook_SucceededFinalizesState(t *testing.T) 
{ var transfer SellerTransfer require.NoError(t, f.db.Where("order_id = ?", f.orderID).First(&transfer).Error) - assert.Equal(t, "reversed", transfer.Status) + // v1.0.7 item B: refund now stages the row at reversal_pending; the + // async StripeReversalWorker drives it to reversed out-of-band. This + // decouples buyer-facing refund UX from Stripe-side health. The + // balance debit still happens synchronously because it's on-platform + // accounting that can't wait for Stripe. + assert.Equal(t, TransferStatusReversalPending, transfer.Status) + require.NotNil(t, transfer.NextRetryAt, "reversal_pending row must carry a next_retry_at so the worker picks it up") var balance SellerBalance require.NoError(t, f.db.Where("seller_id = ?", f.sellerID).First(&balance).Error) diff --git a/veza-backend-api/internal/core/marketplace/reversal_worker.go b/veza-backend-api/internal/core/marketplace/reversal_worker.go new file mode 100644 index 000000000..cb1573764 --- /dev/null +++ b/veza-backend-api/internal/core/marketplace/reversal_worker.go @@ -0,0 +1,259 @@ +package marketplace + +import ( + "context" + "errors" + "time" + + "go.uber.org/zap" + "gorm.io/gorm" +) + +// StripeReversalWorker drives seller_transfers from reversal_pending to +// reversed by calling the PSP's reversal endpoint. Introduced in v1.0.7 +// item B to decouple buyer-facing refund UX from Stripe-side settlement +// health — the refund handler marks the row reversal_pending and +// returns immediately; this worker picks it up asynchronously. +// +// Sibling of TransferRetryWorker (v0.701): same interval-loop shape, +// same exponential-backoff-capped retry pattern, but scoped to the +// reversal transition rather than the create-transfer transition. A +// row with status='reversal_pending' is the only input shape the +// worker touches — all transitions out go through +// SellerTransfer.TransitionStatus (state-machine enforced). +// +// Day 2 ships the core logic. 
Day 3 will add Stripe 404 + // disambiguation (ErrTransferAlreadyReversed / ErrTransferNotFound), + // batch-size tuning, and an end-to-end smoke probe. +type StripeReversalWorker struct { + db *gorm.DB + transferService TransferService + logger *zap.Logger + interval time.Duration + maxRetries int + backoffBase time.Duration + backoffMax time.Duration + batchLimit int +} + +// NewStripeReversalWorker constructs a worker. backoffBase seeds and +// backoffMax caps the exponential delay (base * 2^retry_count), so a +// row that fails 10 times doesn't end up with next_retry_at 17 hours +// in the future. +func NewStripeReversalWorker(db *gorm.DB, ts TransferService, logger *zap.Logger, interval time.Duration, maxRetries int, backoffBase, backoffMax time.Duration) *StripeReversalWorker { + if interval <= 0 { + interval = time.Minute + } + if backoffBase <= 0 { + backoffBase = time.Minute + } + if backoffMax <= 0 { + backoffMax = time.Hour + } + if maxRetries <= 0 { + maxRetries = 5 + } + return &StripeReversalWorker{ + db: db, + transferService: ts, + logger: logger, + interval: interval, + maxRetries: maxRetries, + backoffBase: backoffBase, + backoffMax: backoffMax, + // Batch limit prevents a burst at startup — if 100 rows sit + // with next_retry_at in the past because the worker was down, + // the first tick processes at most batchLimit of them, then + // waits one interval before the next batch. Day 3 may tune + // this up/down based on observed load; today 20 is safely + // under Stripe's default rate limits (100 req/s). + batchLimit: 20, + } +} + +// Start runs the worker loop until ctx is cancelled. 
+func (w *StripeReversalWorker) Start(ctx context.Context) { + if w.transferService == nil { + w.logger.Warn("StripeReversalWorker: no transfer service configured, disabling") + return + } + ticker := time.NewTicker(w.interval) + defer ticker.Stop() + + w.logger.Info("StripeReversalWorker started", + zap.Duration("interval", w.interval), + zap.Int("max_retries", w.maxRetries), + zap.Duration("backoff_base", w.backoffBase), + zap.Duration("backoff_max", w.backoffMax), + zap.Int("batch_limit", w.batchLimit)) + + for { + select { + case <-ctx.Done(): + w.logger.Info("StripeReversalWorker stopped") + return + case <-ticker.C: + w.processBatch(ctx) + } + } +} + +// processBatch fetches up to batchLimit rows ripe for reversal and +// dispatches each through reverseOne. +func (w *StripeReversalWorker) processBatch(ctx context.Context) { + now := time.Now() + var rows []SellerTransfer + err := w.db.WithContext(ctx). + Where("status = ? AND (next_retry_at IS NULL OR next_retry_at <= ?)", + TransferStatusReversalPending, now). + Order("next_retry_at ASC NULLS FIRST"). + Limit(w.batchLimit). + Find(&rows).Error + if err != nil { + w.logger.Error("StripeReversalWorker: failed to query reversal_pending rows", zap.Error(err)) + return + } + + for i := range rows { + w.reverseOne(ctx, &rows[i]) + } +} + +// reverseOne attempts a reversal on a single row and advances its +// state. All state mutations go through TransitionStatus, which +// enforces the matrix and uses optimistic locking on the current +// Status — if two worker instances pick up the same row, one wins +// and the other finds RowsAffected=0 and logs a conflict. +func (w *StripeReversalWorker) reverseOne(ctx context.Context, row *SellerTransfer) { + // Legacy row protection: a seller_transfer with empty + // stripe_transfer_id cannot be reversed — there's nothing to + // address at Stripe. These rows predate item A (v1.0.7) and + // require the backfill CLI (task #38) or admin intervention + // (P2.8, v1.0.8+). 
For now, flag them as permanently_failed with + // a distinctive error_message so ops can find them via a query + // and decide. + if row.StripeTransferID == "" { + w.logger.Error("StripeReversalWorker: cannot reverse row without stripe_transfer_id (legacy pre-v1.0.7 row)", + zap.String("transfer_id", row.ID.String()), + zap.String("order_id", row.OrderID.String())) + extras := map[string]interface{}{ + "error_message": "legacy row: no stripe_transfer_id, reversal requires backfill or admin intervention", + "next_retry_at": nil, + } + if err := row.TransitionStatus(w.db.WithContext(ctx), TransferStatusPermanentlyFailed, extras); err != nil { + w.logger.Error("StripeReversalWorker: failed to mark legacy row permanently_failed", + zap.String("transfer_id", row.ID.String()), zap.Error(err)) + } + return + } + + reversalID, err := w.transferService.ReverseTransfer(ctx, row.StripeTransferID, nil, "refund") + + // Success path: transition to reversed + persist stripe_reversal_id. + if err == nil { + extras := map[string]interface{}{ + "stripe_reversal_id": reversalID, + "error_message": "", + "next_retry_at": nil, + } + if tErr := row.TransitionStatus(w.db.WithContext(ctx), TransferStatusReversed, extras); tErr != nil { + w.logger.Error("StripeReversalWorker: failed to save reversed state", + zap.String("transfer_id", row.ID.String()), zap.Error(tErr)) + return + } + w.logger.Info("StripeReversalWorker: reversal succeeded", + zap.String("transfer_id", row.ID.String()), + zap.String("stripe_transfer_id", row.StripeTransferID), + zap.String("stripe_reversal_id", reversalID)) + return + } + + // Already-reversed-out-of-band: benign, treat as success. Day 3 + // surfaces the concrete sentinel from ReverseTransfer; today the + // sentinel is declared but ReverseTransfer doesn't yet return it, + // so this branch is dead code until day 3. Left in place so the + // worker shape doesn't need to change when day 3 lands. 
+ if errors.Is(err, ErrTransferAlreadyReversed) { + extras := map[string]interface{}{ + "error_message": "stripe reports transfer already reversed (out-of-band)", + "next_retry_at": nil, + } + if tErr := row.TransitionStatus(w.db.WithContext(ctx), TransferStatusReversed, extras); tErr != nil { + w.logger.Error("StripeReversalWorker: failed to save reversed state after already-reversed", + zap.String("transfer_id", row.ID.String()), zap.Error(tErr)) + return + } + w.logger.Info("StripeReversalWorker: treating already-reversed as success", + zap.String("transfer_id", row.ID.String()), + zap.String("stripe_transfer_id", row.StripeTransferID)) + return + } + + // Missing-at-Stripe: our stripe_transfer_id doesn't exist. This is + // a data-integrity incident (we have an id that Stripe doesn't + // recognise), not a retry scenario. Terminate the row and shout. + // Day 3 will populate this sentinel; dead code until then. + if errors.Is(err, ErrTransferNotFound) { + w.logger.Error("StripeReversalWorker: stripe reports transfer_id does not exist — data integrity incident", + zap.String("transfer_id", row.ID.String()), + zap.String("stripe_transfer_id", row.StripeTransferID), + zap.Error(err)) + extras := map[string]interface{}{ + "error_message": "stripe reports transfer not found: " + err.Error(), + "next_retry_at": nil, + } + if tErr := row.TransitionStatus(w.db.WithContext(ctx), TransferStatusPermanentlyFailed, extras); tErr != nil { + w.logger.Error("StripeReversalWorker: failed to mark row permanently_failed after not-found", + zap.String("transfer_id", row.ID.String()), zap.Error(tErr)) + } + return + } + + // Transient error: bump retry_count, set next_retry_at with + // exponential backoff, stay in reversal_pending. 
+ newRetryCount := row.RetryCount + 1 + if newRetryCount >= w.maxRetries { + w.logger.Warn("StripeReversalWorker: max retries exhausted, marking permanently_failed", + zap.String("transfer_id", row.ID.String()), + zap.Int("retry_count", newRetryCount), + zap.Error(err)) + extras := map[string]interface{}{ + "retry_count": newRetryCount, + "error_message": err.Error(), + "next_retry_at": nil, + } + if tErr := row.TransitionStatus(w.db.WithContext(ctx), TransferStatusPermanentlyFailed, extras); tErr != nil { + w.logger.Error("StripeReversalWorker: failed to mark permanently_failed", + zap.String("transfer_id", row.ID.String()), zap.Error(tErr)) + } + return + } + + delay := w.backoffBase + for i := 0; i < newRetryCount; i++ { + delay *= 2 + if delay > w.backoffMax { + delay = w.backoffMax + break + } + } + nextRetry := time.Now().Add(delay) + extras := map[string]interface{}{ + "retry_count": newRetryCount, + "error_message": err.Error(), + "next_retry_at": &nextRetry, + } + // Same-state transition: reversal_pending → reversal_pending (bump + // retry_count / next_retry_at / error_message without changing + // status). TransitionStatus permits same-state by design. 
+ if tErr := row.TransitionStatus(w.db.WithContext(ctx), TransferStatusReversalPending, extras); tErr != nil { + w.logger.Error("StripeReversalWorker: failed to save transient retry state", + zap.String("transfer_id", row.ID.String()), zap.Error(tErr)) + } + w.logger.Warn("StripeReversalWorker: reversal failed, will retry", + zap.String("transfer_id", row.ID.String()), + zap.String("stripe_transfer_id", row.StripeTransferID), + zap.Int("retry_count", newRetryCount), + zap.Duration("next_delay", delay), + zap.Error(err)) +} diff --git a/veza-backend-api/internal/core/marketplace/reversal_worker_test.go b/veza-backend-api/internal/core/marketplace/reversal_worker_test.go new file mode 100644 index 000000000..d9eb5d4da --- /dev/null +++ b/veza-backend-api/internal/core/marketplace/reversal_worker_test.go @@ -0,0 +1,279 @@ +package marketplace + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "gorm.io/driver/sqlite" + "gorm.io/gorm" +) + +func setupReversalTestDB(t *testing.T) *gorm.DB { + db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) + require.NoError(t, err) + require.NoError(t, db.AutoMigrate(&SellerTransfer{})) + return db +} + +// mockReversalTransferService is a dedicated mock for reversal-worker +// tests. CreateTransfer is unused; ReverseTransfer is the hot path and +// supports (a) a canned reversalID, (b) a canned error, (c) call +// capture for assertion on what the worker passed to the PSP. 
+type mockReversalTransferService struct { + reversalID string + err error + calls []struct { + StripeTransferID string + Amount *int64 + Reason string + } +} + +func (m *mockReversalTransferService) CreateTransfer(_ context.Context, _ uuid.UUID, _ int64, _, _ string) (string, error) { + return "tr_mock", nil +} + +func (m *mockReversalTransferService) ReverseTransfer(_ context.Context, stripeTransferID string, amount *int64, reason string) (string, error) { + m.calls = append(m.calls, struct { + StripeTransferID string + Amount *int64 + Reason string + }{stripeTransferID, amount, reason}) + if m.err != nil { + return "", m.err + } + id := m.reversalID + if id == "" { + id = "rev_mock" + } + return id, nil +} + +// newReversalPendingRow is a shared fixture helper — a reversal_pending +// row with stripe_transfer_id populated and next_retry_at ripe. +func newReversalPendingRow(t *testing.T, db *gorm.DB) *SellerTransfer { + t.Helper() + now := time.Now() + row := &SellerTransfer{ + ID: uuid.New(), + SellerID: uuid.New(), + OrderID: uuid.New(), + StripeTransferID: "tr_fixture_123", + AmountCents: 900, + Currency: "EUR", + Status: TransferStatusReversalPending, + NextRetryAt: &now, + } + require.NoError(t, db.Create(row).Error) + return row +} + +func TestReversalWorker_HappyPath_PendingToReversed(t *testing.T) { + db := setupReversalTestDB(t) + mock := &mockReversalTransferService{reversalID: "rev_happy_001"} + w := NewStripeReversalWorker(db, mock, zap.NewNop(), time.Minute, 5, time.Minute, time.Hour) + + row := newReversalPendingRow(t, db) + + w.processBatch(context.Background()) + + var updated SellerTransfer + require.NoError(t, db.First(&updated, row.ID).Error) + assert.Equal(t, TransferStatusReversed, updated.Status) + assert.Equal(t, "rev_happy_001", updated.StripeReversalID) + assert.Empty(t, updated.ErrorMessage) + assert.Nil(t, updated.NextRetryAt) + + require.Len(t, mock.calls, 1) + assert.Equal(t, "tr_fixture_123", mock.calls[0].StripeTransferID) + 
assert.Nil(t, mock.calls[0].Amount, "v1.0.7 day 2 always does full reversal (nil amount)") + assert.Equal(t, "refund", mock.calls[0].Reason) +} + +func TestReversalWorker_AlreadyReversed_TreatedAsSuccess(t *testing.T) { + db := setupReversalTestDB(t) + mock := &mockReversalTransferService{err: ErrTransferAlreadyReversed} + w := NewStripeReversalWorker(db, mock, zap.NewNop(), time.Minute, 5, time.Minute, time.Hour) + + row := newReversalPendingRow(t, db) + + w.processBatch(context.Background()) + + var updated SellerTransfer + require.NoError(t, db.First(&updated, row.ID).Error) + // Already-reversed at Stripe = our job is already done, row flips + // to reversed with an informative error_message so ops can grep. + assert.Equal(t, TransferStatusReversed, updated.Status) + assert.Contains(t, updated.ErrorMessage, "already reversed") + assert.Nil(t, updated.NextRetryAt) +} + +func TestReversalWorker_TransferNotFound_PermanentlyFailed(t *testing.T) { + db := setupReversalTestDB(t) + mock := &mockReversalTransferService{err: ErrTransferNotFound} + w := NewStripeReversalWorker(db, mock, zap.NewNop(), time.Minute, 5, time.Minute, time.Hour) + + row := newReversalPendingRow(t, db) + + w.processBatch(context.Background()) + + var updated SellerTransfer + require.NoError(t, db.First(&updated, row.ID).Error) + // Missing-at-Stripe is a data-integrity incident: terminate so ops + // can investigate. Never retry blindly — would amplify the + // inconsistency. 
+ assert.Equal(t, TransferStatusPermanentlyFailed, updated.Status) + assert.Contains(t, updated.ErrorMessage, "not found") + assert.Nil(t, updated.NextRetryAt) +} + +func TestReversalWorker_TransientError_SchedulesRetryWithBackoff(t *testing.T) { + db := setupReversalTestDB(t) + mock := &mockReversalTransferService{err: errors.New("stripe 503 service unavailable")} + base := 10 * time.Second + w := NewStripeReversalWorker(db, mock, zap.NewNop(), time.Minute, 5, base, time.Hour) + + row := newReversalPendingRow(t, db) + row.RetryCount = 0 + require.NoError(t, db.Save(row).Error) + + before := time.Now() + w.processBatch(context.Background()) + + var updated SellerTransfer + require.NoError(t, db.First(&updated, row.ID).Error) + assert.Equal(t, TransferStatusReversalPending, updated.Status, "transient error keeps row in reversal_pending") + assert.Equal(t, 1, updated.RetryCount) + assert.Contains(t, updated.ErrorMessage, "503") + require.NotNil(t, updated.NextRetryAt) + // Backoff for retry_count=1: base * 2 = 20s + expected := before.Add(2 * base) + assert.True(t, updated.NextRetryAt.After(expected.Add(-time.Second)), + "next_retry_at %v should be ≥ base * 2 from processBatch start %v", updated.NextRetryAt, expected) + assert.True(t, updated.NextRetryAt.Before(expected.Add(2*time.Second)), + "next_retry_at %v should not exceed base * 2 + slack", updated.NextRetryAt) +} + +func TestReversalWorker_BackoffCapped(t *testing.T) { + db := setupReversalTestDB(t) + mock := &mockReversalTransferService{err: errors.New("stripe down")} + // base=1s, max=10s. At retry_count=4, unbounded would be 1*2^4=16s, + // capped stays at 10s. 
+ w := NewStripeReversalWorker(db, mock, zap.NewNop(), time.Minute, 100 /* high to avoid permanent */, time.Second, 10*time.Second) + + row := newReversalPendingRow(t, db) + row.RetryCount = 3 // after tick, becomes 4 → unbounded would be 16s + require.NoError(t, db.Save(row).Error) + + before := time.Now() + w.processBatch(context.Background()) + + var updated SellerTransfer + require.NoError(t, db.First(&updated, row.ID).Error) + assert.Equal(t, 4, updated.RetryCount) + require.NotNil(t, updated.NextRetryAt) + assert.True(t, updated.NextRetryAt.Before(before.Add(11*time.Second)), + "delay should be capped at backoffMax=10s, got %v from now %v", updated.NextRetryAt, before) +} + +func TestReversalWorker_MaxRetriesExhausted_PermanentlyFailed(t *testing.T) { + db := setupReversalTestDB(t) + mock := &mockReversalTransferService{err: errors.New("stripe still down")} + w := NewStripeReversalWorker(db, mock, zap.NewNop(), time.Minute, 3, time.Minute, time.Hour) + + row := newReversalPendingRow(t, db) + row.RetryCount = 2 // next retry pushes to 3 = maxRetries → terminate + require.NoError(t, db.Save(row).Error) + + w.processBatch(context.Background()) + + var updated SellerTransfer + require.NoError(t, db.First(&updated, row.ID).Error) + assert.Equal(t, TransferStatusPermanentlyFailed, updated.Status) + assert.Equal(t, 3, updated.RetryCount) + assert.Nil(t, updated.NextRetryAt) +} + +func TestReversalWorker_LegacyRow_NoStripeTransferID_PermanentlyFailed(t *testing.T) { + db := setupReversalTestDB(t) + mock := &mockReversalTransferService{} + w := NewStripeReversalWorker(db, mock, zap.NewNop(), time.Minute, 5, time.Minute, time.Hour) + + row := newReversalPendingRow(t, db) + row.StripeTransferID = "" // simulate a pre-v1.0.7 row + require.NoError(t, db.Save(row).Error) + + w.processBatch(context.Background()) + + var updated SellerTransfer + require.NoError(t, db.First(&updated, row.ID).Error) + // Legacy row: can't reverse what we can't address. 
Terminate with + // a distinctive error_message so ops can query for "legacy row". + assert.Equal(t, TransferStatusPermanentlyFailed, updated.Status) + assert.Contains(t, updated.ErrorMessage, "legacy") + assert.Empty(t, mock.calls, "worker must not call Stripe on a row with no transfer_id") +} + +func TestReversalWorker_OnlyProcessesReversalPending(t *testing.T) { + db := setupReversalTestDB(t) + mock := &mockReversalTransferService{reversalID: "rev_should_not_fire"} + w := NewStripeReversalWorker(db, mock, zap.NewNop(), time.Minute, 5, time.Minute, time.Hour) + + // Plant one row in each other status; none should be picked up. + otherStatuses := []string{ + TransferStatusPending, + TransferStatusCompleted, + TransferStatusFailed, + TransferStatusReversed, + TransferStatusPermanentlyFailed, + } + for _, s := range otherStatuses { + r := &SellerTransfer{ + ID: uuid.New(), + SellerID: uuid.New(), + OrderID: uuid.New(), + StripeTransferID: "tr_" + s, + AmountCents: 100, + Currency: "EUR", + Status: s, + } + require.NoError(t, db.Create(r).Error) + } + + w.processBatch(context.Background()) + + assert.Empty(t, mock.calls, "worker must only pick up reversal_pending rows") +} + +func TestReversalWorker_RespectsNextRetryAt_SkipsFutureRows(t *testing.T) { + db := setupReversalTestDB(t) + mock := &mockReversalTransferService{reversalID: "rev_skipped"} + w := NewStripeReversalWorker(db, mock, zap.NewNop(), time.Minute, 5, time.Minute, time.Hour) + + future := time.Now().Add(10 * time.Minute) + row := &SellerTransfer{ + ID: uuid.New(), + SellerID: uuid.New(), + OrderID: uuid.New(), + StripeTransferID: "tr_future", + AmountCents: 100, + Currency: "EUR", + Status: TransferStatusReversalPending, + NextRetryAt: &future, + } + require.NoError(t, db.Create(row).Error) + + w.processBatch(context.Background()) + + assert.Empty(t, mock.calls, "row with next_retry_at in the future must not be processed") + + var untouched SellerTransfer + require.NoError(t, db.First(&untouched, 
row.ID).Error) + assert.Equal(t, TransferStatusReversalPending, untouched.Status, "row must stay in reversal_pending") +} diff --git a/veza-backend-api/internal/core/marketplace/service.go b/veza-backend-api/internal/core/marketplace/service.go index b92ffeb23..88417740e 100644 --- a/veza-backend-api/internal/core/marketplace/service.go +++ b/veza-backend-api/internal/core/marketplace/service.go @@ -139,7 +139,7 @@ type SellerSale struct { Date string `json:"date"` } -// TransferService abstracts the payout transfer provider (v0.603, v1.0.7 item A) +// TransferService abstracts the payout transfer provider (v0.603, v1.0.7 item A/B) // CreateTransfer returns the PSP's own transfer identifier (e.g. Stripe tr_*) // alongside the error. The caller persists it on the SellerTransfer row so // that downstream operations — reversal on refund (item B), reconciliation @@ -147,10 +147,30 @@ type SellerSale struct { // rather than re-querying the PSP's list API. Pre-v1.0.7 the return was // `error` only and the stripe_transfer_id column sat empty, blocking the // reversal worker from identifying which transfer to reverse. +// +// ReverseTransfer (v1.0.7 item B) reverses a previously-created transfer — +// the PSP moves the funds out of the seller's connected account back to the +// platform. amount=nil means full reversal. Returns the PSP's reversal +// identifier on success so the caller can persist it for idempotency and +// audit. Error sentinels ErrTransferAlreadyReversed / ErrTransferNotFound +// disambiguate the two 404 cases Stripe surfaces, which have very different +// operational meanings: the first is benign (someone else already reversed +// out-of-band, e.g. via Dashboard), the second is an invariant violation +// (our stripe_transfer_id points at nothing). 
type TransferService interface { CreateTransfer(ctx context.Context, sellerUserID uuid.UUID, amount int64, currency, orderID string) (string, error) + ReverseTransfer(ctx context.Context, stripeTransferID string, amount *int64, reason string) (string, error) } +// Error sentinels for ReverseTransfer — allows the worker to distinguish +// the benign "already reversed" case from the alarming "transfer id +// doesn't exist" case. Both map to 404 on the wire; only the PSP error +// code disambiguates them. +var ( + ErrTransferAlreadyReversed = errors.New("stripe transfer already reversed") + ErrTransferNotFound = errors.New("stripe transfer not found") +) + // Service implémente MarketplaceService type Service struct { db *gorm.DB @@ -791,16 +811,6 @@ func (s *Service) processSellerTransfers(ctx context.Context, tx *gorm.DB, order feeCents := int64(float64(grossCents) * feeRate) netCents := grossCents - feeCents - st := SellerTransfer{ - SellerID: sellerID, - OrderID: order.ID, - AmountCents: netCents, - PlatformFeeCents: feeCents, - CommissionRate: feeRate, - Currency: order.Currency, - Status: "pending", - } - // v0.12.0 F254: Credit seller balance if err := s.CreditSellerBalance(ctx, tx, sellerID, netCents, order.Currency); err != nil { s.logger.Error("Failed to credit seller balance", @@ -809,16 +819,23 @@ func (s *Service) processSellerTransfers(ctx context.Context, tx *gorm.DB, order } stripeTransferID, err := s.transferService.CreateTransfer(ctx, sellerID, netCents, order.Currency, order.ID.String()) + + // Determine terminal status before construction so the row is + // written with its final state in a single Create — no in-place + // mutation of .Status, which keeps processSellerTransfers out + // of the anti-mutation whitelist (TestNoDirectTransferStatus + // Mutation). 
+ var status, errMsg, writeStripeID string if err != nil { - st.Status = "failed" - st.ErrorMessage = err.Error() + status = TransferStatusFailed + errMsg = err.Error() s.logger.Error("Transfer failed for seller", zap.String("seller_id", sellerID.String()), zap.String("order_id", order.ID.String()), zap.Error(err)) } else { - st.Status = "completed" - st.StripeTransferID = stripeTransferID + status = TransferStatusCompleted + writeStripeID = stripeTransferID s.logger.Info("Transfer completed for seller", zap.String("seller_id", sellerID.String()), zap.Int64("amount_cents", netCents), @@ -826,6 +843,17 @@ func (s *Service) processSellerTransfers(ctx context.Context, tx *gorm.DB, order zap.Float64("commission_rate", feeRate)) } + st := SellerTransfer{ + SellerID: sellerID, + OrderID: order.ID, + AmountCents: netCents, + PlatformFeeCents: feeCents, + CommissionRate: feeRate, + Currency: order.Currency, + Status: status, + StripeTransferID: writeStripeID, + ErrorMessage: errMsg, + } if err := tx.Create(&st).Error; err != nil { s.logger.Error("Failed to record seller transfer", zap.String("seller_id", sellerID.String()), @@ -1527,14 +1555,23 @@ func (s *Service) finalizeFailedRefund(ctx context.Context, refundID uuid.UUID, } // reverseSellerAccounting undoes the balance credit + marks every -// SellerTransfer for the order as `reversed`. Best-effort: a balance row -// that's already been paid out (available_cents insufficient) won't go -// negative because DebitSellerBalance caps at zero. +// SellerTransfer for the order as `reversal_pending` so the async +// `StripeReversalWorker` can call Stripe Connect Transfers:reversal +// out-of-band (v1.0.7 item B). Pre-v1.0.7 this function flipped the +// row straight to `reversed` without ever calling Stripe — the money +// was corrected on-platform but the seller's Stripe account still +// showed the original transfer as settled. 
The reversal_pending state +// decouples buyer-facing refund UX (happens immediately) from +// Stripe-side settlement (may take retries, may 404 if already +// reversed out-of-band, may fail permanently and need ops attention). // -// TODO(v1.0.7): call Stripe Connect Transfers:reversal endpoint via -// TransferService so the money actually moves back out of the seller's -// connected account. Today we only correct the in-DB accounting — a real -// Stripe settlement would still show the seller as paid. +// Best-effort: a balance row that's already been paid out +// (available_cents insufficient) won't go negative because +// DebitSellerBalance caps at zero. +// +// Same-order replay is a no-op — rows already in reversal_pending or +// reversed are skipped. Rows in permanently_failed stay +// permanently_failed (terminal, not our problem to recover). func (s *Service) reverseSellerAccounting(ctx context.Context, tx *gorm.DB, orderID uuid.UUID) { var transfers []SellerTransfer if err := tx.Where("order_id = ?", orderID).Find(&transfers).Error; err != nil { @@ -1543,8 +1580,22 @@ func (s *Service) reverseSellerAccounting(ctx context.Context, tx *gorm.DB, orde return } + now := time.Now() for _, t := range transfers { - if t.Status == "reversed" { + // Skip rows already in or past reversal — idempotent against + // webhook replays and any other concurrent refund flow. + if t.Status == TransferStatusReversalPending || + t.Status == TransferStatusReversed || + t.Status == TransferStatusPermanentlyFailed { + continue + } + if !CanTransitionTransferStatus(t.Status, TransferStatusReversalPending) { + // Typically means the row is still 'pending' or 'failed' — + // shouldn't happen for a completed order, but surfaces + // rather than silently skips. 
+ s.logger.Warn("Skipping reversal on non-completed transfer", + zap.String("transfer_id", t.ID.String()), + zap.String("status", t.Status)) continue } if err := s.DebitSellerBalance(ctx, tx, t.SellerID, t.AmountCents, t.Currency); err != nil { @@ -1555,12 +1606,13 @@ func (s *Service) reverseSellerAccounting(ctx context.Context, tx *gorm.DB, orde zap.Error(err)) continue } - if err := tx.Model(&SellerTransfer{}).Where("id = ?", t.ID). - Updates(map[string]interface{}{ - "status": "reversed", - "error_message": "reversed by refund", - }).Error; err != nil { - s.logger.Error("Failed to mark seller transfer as reversed", + extras := map[string]interface{}{ + "next_retry_at": &now, + "retry_count": 0, + "error_message": "pending stripe reversal", + } + if err := t.TransitionStatus(tx, TransferStatusReversalPending, extras); err != nil { + s.logger.Error("Failed to mark seller transfer as reversal_pending", zap.String("transfer_id", t.ID.String()), zap.Error(err)) } diff --git a/veza-backend-api/internal/core/marketplace/transfer_retry.go b/veza-backend-api/internal/core/marketplace/transfer_retry.go index bb90c38de..0a3495d5c 100644 --- a/veza-backend-api/internal/core/marketplace/transfer_retry.go +++ b/veza-backend-api/internal/core/marketplace/transfer_retry.go @@ -83,41 +83,47 @@ func (w *TransferRetryWorker) retryOne(ctx context.Context, t *SellerTransfer) { orderID := t.OrderID.String() stripeTransferID, err := w.transferService.CreateTransfer(ctx, t.SellerID, t.AmountCents, t.Currency, orderID) if err != nil { - t.RetryCount++ - t.ErrorMessage = err.Error() + newRetryCount := t.RetryCount + 1 // Exponential backoff: interval * 2^retry_count delay := w.interval - for i := 0; i < t.RetryCount; i++ { + for i := 0; i < newRetryCount; i++ { delay *= 2 } nextRetry := time.Now().Add(delay) - t.NextRetryAt = &nextRetry + extras := map[string]interface{}{ + "retry_count": newRetryCount, + "error_message": err.Error(), + "next_retry_at": &nextRetry, + } - if t.RetryCount 
>= w.maxRetries { - t.Status = "permanently_failed" - t.NextRetryAt = nil + var targetStatus string + if newRetryCount >= w.maxRetries { + targetStatus = TransferStatusPermanentlyFailed + extras["next_retry_at"] = nil monitoring.RecordTransferRetryPermanent() } else { + targetStatus = t.Status // same-state (stay failed), only bump retry fields monitoring.RecordTransferRetryFailure() } - if saveErr := w.db.Save(t).Error; saveErr != nil { + if saveErr := t.TransitionStatus(w.db.WithContext(ctx), targetStatus, extras); saveErr != nil { w.logger.Error("TransferRetryWorker: failed to save transfer", zap.Error(saveErr), zap.String("transfer_id", t.ID.String())) } w.logger.Warn("TransferRetryWorker: retry failed", zap.String("transfer_id", t.ID.String()), zap.String("seller_id", t.SellerID.String()), - zap.Int("retry_count", t.RetryCount), + zap.Int("retry_count", newRetryCount), zap.Error(err)) return } - t.Status = "completed" - t.NextRetryAt = nil - t.ErrorMessage = "" - t.StripeTransferID = stripeTransferID - if err := w.db.Save(t).Error; err != nil { + extras := map[string]interface{}{ + "next_retry_at": nil, + "error_message": "", + "stripe_transfer_id": stripeTransferID, + } + if err := t.TransitionStatus(w.db.WithContext(ctx), TransferStatusCompleted, extras); err != nil { w.logger.Error("TransferRetryWorker: failed to save transfer", zap.Error(err), zap.String("transfer_id", t.ID.String())) return } diff --git a/veza-backend-api/internal/core/marketplace/transfer_retry_test.go b/veza-backend-api/internal/core/marketplace/transfer_retry_test.go index fb8db441d..c794654de 100644 --- a/veza-backend-api/internal/core/marketplace/transfer_retry_test.go +++ b/veza-backend-api/internal/core/marketplace/transfer_retry_test.go @@ -59,6 +59,10 @@ func (m *mockTransferServiceRetry) CreateTransfer(_ context.Context, sellerUserI return id, nil } +func (m *mockTransferServiceRetry) ReverseTransfer(_ context.Context, _ string, _ *int64, _ string) (string, error) { + return 
"rev_mock", nil +} + func TestRetryWorker_RetriesFailedTransfer(t *testing.T) { db := setupRetryTestDB(t) logger := zap.NewNop() diff --git a/veza-backend-api/internal/core/marketplace/transfer_transitions.go b/veza-backend-api/internal/core/marketplace/transfer_transitions.go index 8f8b57b52..286555715 100644 --- a/veza-backend-api/internal/core/marketplace/transfer_transitions.go +++ b/veza-backend-api/internal/core/marketplace/transfer_transitions.go @@ -1,5 +1,11 @@ package marketplace +import ( + "time" + + "gorm.io/gorm" +) + // SellerTransfer.Status values. The field is a VARCHAR at the DB level // (no ENUM), but these constants are the authoritative list. Any new // status value must be added here, to AllowedTransferTransitions, and @@ -28,12 +34,21 @@ const ( // retry) are always allowed and not listed here — see // CanTransitionTransferStatus. // +// Note on "failed" — failed is *not* terminal. TransferRetryWorker +// (v0.701) is allowed to drive a failed row back to completed when +// a retry succeeds at Stripe. Terminal failure is permanently_failed, +// which the worker sets only after max retries are exhausted. This +// reads counter-intuitively if you expect failed to be end-of-life, +// so the matrix is written here as the source of truth rather than +// relying on reader intuition. +// // Terminal states — Reversed and PermanentlyFailed — have empty -// value lists. An admin manual override can write any status via -// raw SQL with documented intent (audit log entry), but that path -// deliberately sits outside this matrix so a code review of any -// state-mutating call sees the override as a conscious escape hatch -// rather than a legitimate transition. +// value lists. There is no recovery path via the state machine. If +// a stuck row needs to be unblocked (e.g. a permanently_failed row +// where ops has confirmed out-of-band that the Stripe side did +// succeed), intervention is DB-level. 
An admin API for this case is +// axis-1 P2.8 (v1.0.8+); until that lands, the escape hatch is +// documented: raw SQL UPDATE with a row in the admin_audit_log. // // The state machine covers three flows: // @@ -93,3 +108,69 @@ func CanTransitionTransferStatus(from, to string) bool { } return false } + +// InvalidTransferStatusTransitionError is returned by TransitionStatus +// when the state machine rejects the requested move. Callers wrap +// or return this — the error text includes the offending pair so +// logs make the origin visible without extra context. +type InvalidTransferStatusTransitionError struct { + TransferID string + From string + To string +} + +func (e *InvalidTransferStatusTransitionError) Error() string { + return "invalid seller_transfer status transition " + e.From + " → " + e.To + " on row " + e.TransferID +} + +// TransitionStatus is the single approved path for mutating +// SellerTransfer.Status. It validates against the matrix, then +// performs a conditional UPDATE guarded by the expected `from` +// value — a concurrent worker that has already advanced the row +// finds `rows affected = 0` and the caller learns the transition +// didn't apply (optimistic lock semantics without a version column). +// +// `extras` lets callers piggy-back unrelated column updates (e.g. +// retry_count, next_retry_at, error_message, stripe_reversal_id) +// in the same UPDATE. Do not pass "status" or "updated_at" in +// extras — the function owns those columns; such keys are +// silently ignored. +// +// Enforced as the choke-point by TestNoDirectTransferStatusMutation, which +// greps the marketplace package for raw `.Status = ` assignments or +// `Update("status", ...)` / `"status":` patterns outside the +// whitelist.
+func (st *SellerTransfer) TransitionStatus(tx *gorm.DB, to string, extras map[string]interface{}) error { + if !CanTransitionTransferStatus(st.Status, to) { + return &InvalidTransferStatusTransitionError{ + TransferID: st.ID.String(), + From: st.Status, + To: to, + } + } + updates := map[string]interface{}{ + "status": to, + "updated_at": time.Now(), + } + for k, v := range extras { + if k == "status" || k == "updated_at" { + continue + } + updates[k] = v + } + result := tx.Model(&SellerTransfer{}). + Where("id = ? AND status = ?", st.ID, st.Status). + Updates(updates) + if result.Error != nil { + return result.Error + } + if result.RowsAffected == 0 { + return &InvalidTransferStatusTransitionError{ + TransferID: st.ID.String(), + From: st.Status, + To: to, + } + } + st.Status = to + return nil +} diff --git a/veza-backend-api/internal/core/marketplace/transfer_transitions_test.go b/veza-backend-api/internal/core/marketplace/transfer_transitions_test.go index 348bfe155..37d433028 100644 --- a/veza-backend-api/internal/core/marketplace/transfer_transitions_test.go +++ b/veza-backend-api/internal/core/marketplace/transfer_transitions_test.go @@ -2,10 +2,15 @@ package marketplace import ( "fmt" + "os" + "path/filepath" + "regexp" "sort" + "strings" "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) // TestTransferStateTransitions is the regression-grade check on the @@ -133,3 +138,103 @@ func TestCanTransitionTransferStatus_UnknownFromIsConservative(t *testing.T) { assert.False(t, CanTransitionTransferStatus("", TransferStatusCompleted)) assert.False(t, CanTransitionTransferStatus("Completed", TransferStatusReversalPending)) // case-sensitive } + +// TestNoDirectTransferStatusMutation enforces the design agreed +// during item B: every mutation of SellerTransfer.Status must route +// through TransitionStatus (which validates against the matrix). 
The +// test scans the marketplace package for assignments matching the +// conventional variable names used for SellerTransfer values (st, t, +// transfer, row, etc.) and fails if any live outside the allowlist. +// +// Convention over AST reflects a deliberate tradeoff: a proper +// type-aware check would need go/parser + type resolution (~hours of +// yak shave for a v1.0.7 deadline). The grep-based test catches the +// likely regressions — someone writing `st.Status = "..."` in a +// future commit — and fails loudly. Names like `payout.Status` or +// `order.Status` are excluded because those operate on different +// aggregate types (SellerPayout, Order), which have their own state +// machines (not yet matrix-ified; future scope). +// +// To add a new file that legitimately needs to mutate +// SellerTransfer.Status, route through TransitionStatus. If that's +// not possible (e.g. a fixture helper that plants a row in a +// specific state for a test), add the filename to `allowlistFiles` +// with a code-comment justification. +func TestNoDirectTransferStatusMutation(t *testing.T) { + // Files allowed to use `.Status = "..."` on a SellerTransfer + // variable. Rationale for each: + // - transfer_transitions.go: defines TransitionStatus itself. + // - *_test.go (reversal_worker_test.go, transfer_retry_test.go, + // process_webhook_test.go, refund_test.go, ...): not in the map; + // the scan loop skips them by suffix. They plant fixtures in + // specific starting states, and tests may mutate fixture rows + // because those are inputs to the system under test, not the + // system itself. Fixture helpers still use struct-literal + // Status: "..." whenever possible (no regex match on `: "`), + // but mutations after Create are needed for some setup paths. + allowlistFiles := map[string]bool{ + "transfer_transitions.go": true, + "transfer_transitions_test.go": true, + } + + // Variable names conventionally bound to SellerTransfer.
If a + // file uses `foo.Status = "bar"` with one of these names for foo, + // it's very likely a SellerTransfer mutation and must route + // through TransitionStatus. + forbiddenAssignmentPattern := regexp.MustCompile( + `\b(st|t|transfer|sellerTransfer|row|updated|fresh|transferRow)\.Status\s*=\s*"`, + ) + + // GORM update forms on a SellerTransfer model that touch "status". + // Catches tx.Model(&SellerTransfer{}).Update("status", ...) and + // Updates(map... with "status" key) in the same file. + gormModelRe := regexp.MustCompile(`Model\(&SellerTransfer\{\}\)`) + gormUpdateStatusRe := regexp.MustCompile( + `Updates?\(\s*(map\[string\]interface\{\}\s*\{[^}]*|")status"`, + ) + + entries, err := os.ReadDir(".") + require.NoError(t, err) + + var violations []string + for _, entry := range entries { + if entry.IsDir() { + continue + } + name := entry.Name() + if !strings.HasSuffix(name, ".go") { + continue + } + if allowlistFiles[name] { + continue + } + // Skip test files that legitimately plant fixtures — they use + // struct-literal `Status: "..."` which doesn't match the + // assignment regex, but may have `.Status = ` for post-create + // fixture adjustments. Test files are excluded by suffix + // (discipline relies on maintainers not writing production + // logic in _test.go, which the existing codebase respects). + if strings.HasSuffix(name, "_test.go") { + continue + } + + content, rerr := os.ReadFile(filepath.Join(".", name)) + require.NoError(t, rerr, "failed to read %s", name) + text := string(content) + + for _, match := range forbiddenAssignmentPattern.FindAllStringSubmatch(text, -1) { + violations = append(violations, fmt.Sprintf("%s: direct assignment `%s` — route through TransitionStatus instead", name, match[0])) + } + + if gormModelRe.MatchString(text) && gormUpdateStatusRe.MatchString(text) { + // Both patterns present in the file — likely a GORM + // status update. Report it. 
+ violations = append(violations, fmt.Sprintf("%s: GORM Model(&SellerTransfer{}).Update*(\"status\"...) pattern — route through TransitionStatus instead", name)) + } + } + + if len(violations) > 0 { + t.Errorf("found %d direct SellerTransfer.Status mutation(s) outside the allowlist:\n %s", + len(violations), strings.Join(violations, "\n ")) + } +} diff --git a/veza-backend-api/internal/handlers/admin_transfer_handler.go b/veza-backend-api/internal/handlers/admin_transfer_handler.go index ef0c859a8..e29743929 100644 --- a/veza-backend-api/internal/handlers/admin_transfer_handler.go +++ b/veza-backend-api/internal/handlers/admin_transfer_handler.go @@ -123,10 +123,14 @@ func (h *AdminTransferHandler) RetryTransfer(c *gin.Context) { stripeTransferID, err := h.ts.CreateTransfer(c.Request.Context(), t.SellerID, t.AmountCents, t.Currency, t.OrderID.String()) if err != nil { - t.RetryCount++ - t.ErrorMessage = err.Error() - // Could set next_retry_at for worker to pick up later, but for manual retry we just record the failure - if saveErr := h.db.Save(&t).Error; saveErr != nil { + // Failure: stay in 'failed' (same-state) but bump retry_count and + // record the error. For manual admin retry we don't set + // next_retry_at — ops is driving this, not the worker. 
+ failExtras := map[string]interface{}{ + "retry_count": t.RetryCount + 1, + "error_message": err.Error(), + } + if saveErr := t.TransitionStatus(h.db.WithContext(c.Request.Context()), t.Status, failExtras); saveErr != nil { h.logger.Error("RetryTransfer save failed", zap.Error(saveErr)) } h.logger.Error("RetryTransfer CreateTransfer failed", zap.Error(err), zap.String("transfer_id", idStr)) @@ -134,10 +138,11 @@ func (h *AdminTransferHandler) RetryTransfer(c *gin.Context) { return } - t.Status = "completed" - t.ErrorMessage = "" - t.StripeTransferID = stripeTransferID - if err := h.db.Save(&t).Error; err != nil { + extras := map[string]interface{}{ + "error_message": "", + "stripe_transfer_id": stripeTransferID, + } + if err := t.TransitionStatus(h.db.WithContext(c.Request.Context()), marketplace.TransferStatusCompleted, extras); err != nil { h.logger.Error("RetryTransfer save failed", zap.Error(err)) RespondWithAppError(c, apperrors.NewInternalErrorWrap("Failed to update transfer", err)) return diff --git a/veza-backend-api/internal/handlers/admin_transfer_handler_test.go b/veza-backend-api/internal/handlers/admin_transfer_handler_test.go index 8b0d878a2..bd6245ab5 100644 --- a/veza-backend-api/internal/handlers/admin_transfer_handler_test.go +++ b/veza-backend-api/internal/handlers/admin_transfer_handler_test.go @@ -45,6 +45,10 @@ func (m *mockTransferServiceAdmin) CreateTransfer(_ context.Context, _ uuid.UUID return id, nil } +func (m *mockTransferServiceAdmin) ReverseTransfer(_ context.Context, _ string, _ *int64, _ string) (string, error) { + return "rev_mock", nil +} + func TestGetTransfers_ReturnsAll(t *testing.T) { db := setupAdminTransferTestDB(t) logger := zap.NewNop() diff --git a/veza-backend-api/internal/services/stripe_connect_service.go b/veza-backend-api/internal/services/stripe_connect_service.go index 839034e50..6ccbe4d52 100644 --- a/veza-backend-api/internal/services/stripe_connect_service.go +++ 
b/veza-backend-api/internal/services/stripe_connect_service.go @@ -11,6 +11,7 @@ import ( "github.com/stripe/stripe-go/v82/accountlink" "github.com/stripe/stripe-go/v82/balance" "github.com/stripe/stripe-go/v82/transfer" + "github.com/stripe/stripe-go/v82/transferreversal" "go.uber.org/zap" "gorm.io/gorm" @@ -217,3 +218,45 @@ func (s *StripeConnectService) CreateTransfer(ctx context.Context, sellerUserID } return tr.ID, nil } + +// ReverseTransfer issues a reversal against a previously-created Stripe +// transfer (v1.0.7 item B). amount=nil reverses the full transfer; +// amount>0 reverses that portion (partial reversal, used for partial +// refunds in a future item — v1.0.7 always passes nil). Returns the +// Stripe reversal id on success. +// +// Error handling in this function is intentionally opaque in v1.0.7: +// every non-nil return is a raw wrapped Stripe error. Day 3 of item B +// will parse stripe.Error.Code to return the marketplace-package +// sentinels ErrTransferAlreadyReversed ("transfer was already reversed +// out-of-band") and ErrTransferNotFound ("stripe_transfer_id doesn't +// exist at Stripe — data-integrity incident"). The worker routes by +// sentinel, so this function is the single place disambiguation needs +// to happen; until day 3 lands the worker treats every error as a +// transient retry candidate. 
+func (s *StripeConnectService) ReverseTransfer(ctx context.Context, stripeTransferID string, amount *int64, reason string) (string, error) { + if s.secretKey == "" { + return "", ErrStripeConnectDisabled + } + if stripeTransferID == "" { + return "", fmt.Errorf("reverse stripe transfer: empty stripe_transfer_id") + } + stripe.Key = s.secretKey + + params := &stripe.TransferReversalParams{ + ID: stripe.String(stripeTransferID), + Amount: amount, + } + if reason != "" { + params.Description = stripe.String(reason) + params.AddMetadata("reason", reason) + } + rev, err := transferreversal.New(params) + if err != nil { + return "", fmt.Errorf("create stripe reversal: %w", err) + } + if rev.ID == "" { + return "", fmt.Errorf("create stripe reversal: provider returned empty reversal id") + } + return rev.ID, nil +} diff --git a/veza-backend-api/tests/integration/payment_flow_test.go b/veza-backend-api/tests/integration/payment_flow_test.go index 8773db38a..8e4c285e5 100644 --- a/veza-backend-api/tests/integration/payment_flow_test.go +++ b/veza-backend-api/tests/integration/payment_flow_test.go @@ -65,6 +65,10 @@ func (m *mockTransferService) CreateTransfer(_ context.Context, sellerUserID uui return "tr_mock", nil } +func (m *mockTransferService) ReverseTransfer(_ context.Context, _ string, _ *int64, _ string) (string, error) { + return "rev_mock", nil +} + // testAuthMiddleware reads X-User-ID header and sets user_id in context (for integration tests) type testAuthMiddleware struct{}