feat(marketplace): async stripe connect reversal worker — v1.0.7 item B day 2
Some checks failed
Veza CI / Backend (Go) (push) Failing after 0s
Veza CI / Rust (Stream Server) (push) Failing after 0s
Veza CI / Frontend (Web) (push) Failing after 0s
Security Scan / Secret Scanning (gitleaks) (push) Failing after 0s
Veza CI / Notify on failure (push) Failing after 0s

Day-2 cut of item B: the reversal path becomes async. Pre-v1.0.7
(and v1.0.7 day 1) the refund handler flipped seller_transfers
straight from completed to reversed without ever calling Stripe —
the ledger said "reversed" while the seller's Stripe balance still
showed the original transfer as settled. The new flow:

  refund.succeeded webhook
    → reverseSellerAccounting transitions row: completed → reversal_pending
    → StripeReversalWorker (every REVERSAL_CHECK_INTERVAL, default 1m)
      → calls ReverseTransfer on Stripe
      → success: row → reversed + persist stripe_reversal_id
      → 404 already-reversed (dead code until day 3): row → reversed + log
      → 404 resource_missing (dead code until day 3): row → permanently_failed
      → transient error: stay reversal_pending, bump retry_count,
        exponential backoff (base * 2^retry, capped at backoffMax)
      → retries exhausted: row → permanently_failed
    → buyer-facing refund completes immediately regardless of Stripe health

State machine enforcement:
  * New `SellerTransfer.TransitionStatus(tx, to, extras)` wraps every
    mutation: validates against AllowedTransferTransitions, guarded
    UPDATE with WHERE status=<from> (optimistic lock semantics), no
    RowsAffected = stale state / concurrent winner detected.
  * processSellerTransfers no longer mutates .Status in place —
    terminal status is decided before struct construction, so the
    row is Created with its final state.
  * transfer_retry.retryOne and admin RetryTransfer route through
    TransitionStatus. Legacy direct assignment removed.
  * TestNoDirectTransferStatusMutation greps the package for any
    `st.Status = "..."` / `t.Status = "..."` / GORM
    Model(&SellerTransfer{}).Update("status"...) outside the
    allowlist and fails if found. Verified by temporarily injecting
    a violation during development — test caught it as expected.

Configuration (v1.0.7 item B):
  * REVERSAL_WORKER_ENABLED=true (default)
  * REVERSAL_MAX_RETRIES=5 (default)
  * REVERSAL_CHECK_INTERVAL=1m (default)
  * REVERSAL_BACKOFF_BASE=1m (default)
  * REVERSAL_BACKOFF_MAX=1h (default, caps exponential growth)
  * .env.template documents TRANSFER_RETRY_* and REVERSAL_* env vars
    so an ops reader can grep them.

Interface change: TransferService.ReverseTransfer(ctx,
stripe_transfer_id, amount *int64, reason) (reversalID, error)
added. All four mocks extended (process_webhook, transfer_retry,
admin_transfer_handler, payment_flow integration). amount=nil means
full reversal; v1.0.7 always passes nil (partial reversal is future
scope per axis-1 P2).

Stripe 404 disambiguation (ErrTransferAlreadyReversed /
ErrTransferNotFound) is wired in the worker as dead code — the
sentinels are declared and the worker branches on them, but
StripeConnectService.ReverseTransfer doesn't yet emit them. Day 3
will parse stripe.Error.Code and populate the sentinels; no worker
change needed at that point. Keeping the handling skeleton in day 2
so the worker's branch shape doesn't change between days and the
tests can already cover all four paths against the mock.

Worker unit tests (9 cases, all green, sqlite :memory:):
  * happy path: reversal_pending → reversed + stripe_reversal_id set
  * already reversed (mock returns sentinel): → reversed + log
  * not found (mock returns sentinel): → permanently_failed + log
  * transient 503: retry_count++, next_retry_at set with backoff,
    stays reversal_pending
  * backoff capped at backoffMax (verified with base=1s, max=10s,
    retry_count=4 → capped at 10s not 16s)
  * max retries exhausted: → permanently_failed
  * legacy row with empty stripe_transfer_id: → permanently_failed,
    does not call Stripe
  * only picks up reversal_pending (skips all other statuses)
  * respects next_retry_at (future rows skipped)

Existing test updated: TestProcessRefundWebhook_SucceededFinalizesState
now asserts the row lands at reversal_pending with next_retry_at
set (worker's responsibility to drive to reversed), not reversed.

Worker wired in cmd/api/main.go alongside TransferRetryWorker,
sharing the same StripeConnectService instance. Shutdown path
registered for graceful stop.

Cut from day 2 scope (per agreed-upon discipline), landing in day 3:
  * Stripe 404 disambiguation implementation (parse error.Code)
  * End-to-end smoke probe (refund → reversal_pending → worker
    processes → reversed) against local Postgres + mock Stripe
  * Batch-size tuning / inter-batch sleep — batchLimit=20 today is
    safely under Stripe's 100 req/s default rate limit; revisit if
    observed load warrants

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
senke 2026-04-17 15:34:29 +02:00
parent d4fbf2bf84
commit 6c1e87e52f
17 changed files with 995 additions and 83 deletions

View file

@ -99,6 +99,23 @@ STRIPE_SECRET_KEY=
# Webhook secret for Connect events (whsec_xxx)
STRIPE_CONNECT_WEBHOOK_SECRET=
# --- TRANSFER RETRY WORKER (v0.701) ---
# Drives failed seller_transfers back to completed when Stripe recovers.
# TRANSFER_RETRY_ENABLED=true
# TRANSFER_RETRY_MAX=3
# TRANSFER_RETRY_INTERVAL=5m
# --- REVERSAL WORKER (v1.0.7 item B) ---
# Drives reversal_pending seller_transfers to reversed by calling Stripe
# Connect Transfers:reversal. Decouples buyer-facing refund UX from
# Stripe-side settlement health. Backoff is exponential (base * 2^retry),
# capped at BACKOFF_MAX.
# REVERSAL_WORKER_ENABLED=true
# REVERSAL_MAX_RETRIES=5
# REVERSAL_CHECK_INTERVAL=1m
# REVERSAL_BACKOFF_BASE=1m
# REVERSAL_BACKOFF_MAX=1h
# --- EXTERNAL SERVICES (OPTIONAL) ---
STREAM_SERVER_URL=http://veza.fr:8082
# Must match stream server INTERNAL_API_KEY for /internal/jobs/transcode (P1.1.2)

View file

@ -161,23 +161,48 @@ func main() {
}
// v0.701: Start Transfer Retry Worker
if cfg.TransferRetryEnabled && cfg.StripeConnectEnabled && cfg.StripeConnectSecretKey != "" {
// v1.0.7 item B: Start Reversal Worker (shares the same
// StripeConnectService — one initialisation for both workers).
if cfg.StripeConnectEnabled && cfg.StripeConnectSecretKey != "" {
stripeConnectSvc := services.NewStripeConnectService(db.GormDB, cfg.StripeConnectSecretKey, logger)
retryWorker := marketplace.NewTransferRetryWorker(
db.GormDB, stripeConnectSvc, logger, cfg.TransferRetryInterval, cfg.TransferRetryMaxAttempts,
)
retryCtx, retryCancel := context.WithCancel(context.Background())
go retryWorker.Start(retryCtx)
logger.Info("Transfer Retry Worker started",
zap.Duration("interval", cfg.TransferRetryInterval),
zap.Int("max_retries", cfg.TransferRetryMaxAttempts))
shutdownManager.Register(shutdown.NewShutdownFunc("transfer_retry_worker", func(ctx context.Context) error {
retryCancel()
return nil
}))
} else if cfg.TransferRetryEnabled {
logger.Info("Transfer Retry Worker skipped — Stripe Connect not enabled")
if cfg.TransferRetryEnabled {
retryWorker := marketplace.NewTransferRetryWorker(
db.GormDB, stripeConnectSvc, logger, cfg.TransferRetryInterval, cfg.TransferRetryMaxAttempts,
)
retryCtx, retryCancel := context.WithCancel(context.Background())
go retryWorker.Start(retryCtx)
logger.Info("Transfer Retry Worker started",
zap.Duration("interval", cfg.TransferRetryInterval),
zap.Int("max_retries", cfg.TransferRetryMaxAttempts))
shutdownManager.Register(shutdown.NewShutdownFunc("transfer_retry_worker", func(ctx context.Context) error {
retryCancel()
return nil
}))
}
if cfg.ReversalWorkerEnabled {
reversalWorker := marketplace.NewStripeReversalWorker(
db.GormDB, stripeConnectSvc, logger,
cfg.ReversalCheckInterval, cfg.ReversalMaxRetries,
cfg.ReversalBackoffBase, cfg.ReversalBackoffMax,
)
reversalCtx, reversalCancel := context.WithCancel(context.Background())
go reversalWorker.Start(reversalCtx)
logger.Info("Stripe Reversal Worker started",
zap.Duration("interval", cfg.ReversalCheckInterval),
zap.Int("max_retries", cfg.ReversalMaxRetries),
zap.Duration("backoff_base", cfg.ReversalBackoffBase),
zap.Duration("backoff_max", cfg.ReversalBackoffMax))
shutdownManager.Register(shutdown.NewShutdownFunc("stripe_reversal_worker", func(ctx context.Context) error {
reversalCancel()
return nil
}))
}
} else if cfg.TransferRetryEnabled || cfg.ReversalWorkerEnabled {
logger.Info("Transfer Retry / Reversal workers skipped — Stripe Connect not enabled")
}
// v0.802: Start Cloud Backup Worker (copies cloud files to backup prefix every 24h)

View file

@ -165,6 +165,17 @@ type Config struct {
TransferRetryMaxAttempts int // TRANSFER_RETRY_MAX (default 3)
TransferRetryInterval time.Duration // TRANSFER_RETRY_INTERVAL (default 5m)
// Reversal Worker (v1.0.7 item B) — drives seller_transfers from
// reversal_pending to reversed via Stripe Connect Transfers:reversal.
// Backoff is exponential: interval * 2^retry_count, capped at
// ReversalBackoffMax so a row doesn't sit 17 hours between retries
// after 10 failed attempts.
ReversalWorkerEnabled bool // REVERSAL_WORKER_ENABLED (default true)
ReversalMaxRetries int // REVERSAL_MAX_RETRIES (default 5)
ReversalCheckInterval time.Duration // REVERSAL_CHECK_INTERVAL (default 1m)
ReversalBackoffBase time.Duration // REVERSAL_BACKOFF_BASE (default 1m)
ReversalBackoffMax time.Duration // REVERSAL_BACKOFF_MAX (default 1h)
// Email & Jobs
EmailSender *email.SMTPEmailSender
JobWorker *workers.JobWorker
@ -398,6 +409,13 @@ func NewConfig() (*Config, error) {
TransferRetryMaxAttempts: getEnvInt("TRANSFER_RETRY_MAX", 3),
TransferRetryInterval: getEnvDuration("TRANSFER_RETRY_INTERVAL", 5*time.Minute),
// Reversal Worker (v1.0.7 item B)
ReversalWorkerEnabled: getEnvBool("REVERSAL_WORKER_ENABLED", true),
ReversalMaxRetries: getEnvInt("REVERSAL_MAX_RETRIES", 5),
ReversalCheckInterval: getEnvDuration("REVERSAL_CHECK_INTERVAL", time.Minute),
ReversalBackoffBase: getEnvDuration("REVERSAL_BACKOFF_BASE", time.Minute),
ReversalBackoffMax: getEnvDuration("REVERSAL_BACKOFF_MAX", time.Hour),
// Log Files Configuration — centralized in config/logging.toml
// Resolved via logging.LoadConfig() with env var overrides (LOG_DIR, LOG_LEVEL)
LogDir: logging.LoadConfig().ResolveLogDir(env),

View file

@ -231,10 +231,10 @@ func (pr *ProductReview) BeforeCreate(tx *gorm.DB) (err error) {
// SellerTransfer tracks a Stripe Connect transfer for a completed order (v0.603)
type SellerTransfer struct {
ID uuid.UUID `gorm:"type:uuid;primaryKey" json:"id"`
SellerID uuid.UUID `gorm:"type:uuid;not null" json:"seller_id"`
OrderID uuid.UUID `gorm:"type:uuid;not null" json:"order_id"`
StripeTransferID string `gorm:"size:255" json:"stripe_transfer_id,omitempty"`
ID uuid.UUID `gorm:"type:uuid;primaryKey" json:"id"`
SellerID uuid.UUID `gorm:"type:uuid;not null" json:"seller_id"`
OrderID uuid.UUID `gorm:"type:uuid;not null" json:"order_id"`
StripeTransferID string `gorm:"size:255" json:"stripe_transfer_id,omitempty"`
// StripeReversalID is populated by the v1.0.7 item B reversal worker
// when a refund triggers a reverse-charge against the transfer.
// Empty for transfers that never got reversed. Unique when non-empty

View file

@ -188,6 +188,10 @@ func (m *mockTransferService) CreateTransfer(_ context.Context, sellerUserID uui
return id, nil
}
func (m *mockTransferService) ReverseTransfer(_ context.Context, _ string, _ *int64, _ string) (string, error) {
return "rev_mock", nil
}
func TestProcessWebhook_TransferSuccess(t *testing.T) {
db := setupWebhookTestDB(t)
logger := zap.NewNop()

View file

@ -72,14 +72,14 @@ func formatN(n int32) string {
}
type refundTestFixture struct {
db *gorm.DB
svc *Service
provider *mockRefundPaymentProvider
buyerID uuid.UUID
sellerID uuid.UUID
orderID uuid.UUID
db *gorm.DB
svc *Service
provider *mockRefundPaymentProvider
buyerID uuid.UUID
sellerID uuid.UUID
orderID uuid.UUID
productID uuid.UUID
trackID uuid.UUID
trackID uuid.UUID
}
func newRefundTestFixture(t *testing.T) *refundTestFixture {
@ -333,7 +333,13 @@ func TestProcessRefundWebhook_SucceededFinalizesState(t *testing.T) {
var transfer SellerTransfer
require.NoError(t, f.db.Where("order_id = ?", f.orderID).First(&transfer).Error)
assert.Equal(t, "reversed", transfer.Status)
// v1.0.7 item B: refund now stages the row at reversal_pending; the
// async StripeReversalWorker drives it to reversed out-of-band. This
// decouples buyer-facing refund UX from Stripe-side health. The
// balance debit still happens synchronously because it's on-platform
// accounting that can't wait for Stripe.
assert.Equal(t, TransferStatusReversalPending, transfer.Status)
require.NotNil(t, transfer.NextRetryAt, "reversal_pending row must carry a next_retry_at so the worker picks it up")
var balance SellerBalance
require.NoError(t, f.db.Where("seller_id = ?", f.sellerID).First(&balance).Error)

View file

@ -0,0 +1,259 @@
package marketplace
import (
"context"
"errors"
"time"
"go.uber.org/zap"
"gorm.io/gorm"
)
// StripeReversalWorker drives seller_transfers from reversal_pending to
// reversed by calling the PSP's reversal endpoint. Introduced in v1.0.7
// item B to decouple buyer-facing refund UX from Stripe-side settlement
// health — the refund handler marks the row reversal_pending and
// returns immediately; this worker picks it up asynchronously.
//
// Sibling of TransferRetryWorker (v0.701): same interval-loop shape,
// same exponential-backoff-capped retry pattern, but scoped to the
// reversal transition rather than the create-transfer transition. A
// row with status='reversal_pending' is the only input shape the
// worker touches — all transitions out go through
// SellerTransfer.TransitionStatus (state-machine enforced).
//
// Day 2 ships the core logic. Day 3 will add Stripe 404
// disambiguation (ErrTransferAlreadyReversed / ErrTransferNotFound),
// batch-size tuning, and an end-to-end smoke probe.
type StripeReversalWorker struct {
db *gorm.DB
transferService TransferService
logger *zap.Logger
interval time.Duration
maxRetries int
backoffBase time.Duration
backoffMax time.Duration
batchLimit int
}
// NewStripeReversalWorker constructs a worker. backoffBase and
// backoffMax cap the exponential delay (base * 2^retry_count), so a
// row that fails 10 times doesn't end up with next_retry_at 17 hours
// in the future.
func NewStripeReversalWorker(db *gorm.DB, ts TransferService, logger *zap.Logger, interval time.Duration, maxRetries int, backoffBase, backoffMax time.Duration) *StripeReversalWorker {
if interval <= 0 {
interval = time.Minute
}
if backoffBase <= 0 {
backoffBase = time.Minute
}
if backoffMax <= 0 {
backoffMax = time.Hour
}
if maxRetries <= 0 {
maxRetries = 5
}
return &StripeReversalWorker{
db: db,
transferService: ts,
logger: logger,
interval: interval,
maxRetries: maxRetries,
backoffBase: backoffBase,
backoffMax: backoffMax,
// Batch limit prevents a rafale at startup — if 100 rows sit
// with next_retry_at in the past because the worker was down,
// the first tick processes at most batchLimit of them, then
// waits one interval before the next batch. Day 3 may tune
// this up/down based on observed load; today 20 is safely
// under Stripe's default rate limits (100 req/s).
batchLimit: 20,
}
}
// Start runs the worker loop until ctx is cancelled.
func (w *StripeReversalWorker) Start(ctx context.Context) {
if w.transferService == nil {
w.logger.Warn("StripeReversalWorker: no transfer service configured, disabling")
return
}
ticker := time.NewTicker(w.interval)
defer ticker.Stop()
w.logger.Info("StripeReversalWorker started",
zap.Duration("interval", w.interval),
zap.Int("max_retries", w.maxRetries),
zap.Duration("backoff_base", w.backoffBase),
zap.Duration("backoff_max", w.backoffMax),
zap.Int("batch_limit", w.batchLimit))
for {
select {
case <-ctx.Done():
w.logger.Info("StripeReversalWorker stopped")
return
case <-ticker.C:
w.processBatch(ctx)
}
}
}
// processBatch fetches up to batchLimit rows ripe for reversal and
// dispatches each through reverseOne.
func (w *StripeReversalWorker) processBatch(ctx context.Context) {
now := time.Now()
var rows []SellerTransfer
err := w.db.WithContext(ctx).
Where("status = ? AND (next_retry_at IS NULL OR next_retry_at <= ?)",
TransferStatusReversalPending, now).
Order("next_retry_at ASC NULLS FIRST").
Limit(w.batchLimit).
Find(&rows).Error
if err != nil {
w.logger.Error("StripeReversalWorker: failed to query reversal_pending rows", zap.Error(err))
return
}
for i := range rows {
w.reverseOne(ctx, &rows[i])
}
}
// reverseOne attempts a reversal on a single row and advances its
// state. All state mutations go through TransitionStatus, which
// enforces the matrix and uses optimistic locking on the current
// Status — if two worker instances pick up the same row, one wins
// and the other finds RowsAffected=0 and logs a conflict.
func (w *StripeReversalWorker) reverseOne(ctx context.Context, row *SellerTransfer) {
// Legacy row protection: a seller_transfer with empty
// stripe_transfer_id cannot be reversed — there's nothing to
// address at Stripe. These rows predate item A (v1.0.7) and
// require the backfill CLI (task #38) or admin intervention
// (P2.8, v1.0.8+). For now, flag them as permanently_failed with
// a distinctive error_message so ops can find them via a query
// and decide.
if row.StripeTransferID == "" {
w.logger.Error("StripeReversalWorker: cannot reverse row without stripe_transfer_id (legacy pre-v1.0.7 row)",
zap.String("transfer_id", row.ID.String()),
zap.String("order_id", row.OrderID.String()))
extras := map[string]interface{}{
"error_message": "legacy row: no stripe_transfer_id, reversal requires backfill or admin intervention",
"next_retry_at": nil,
}
if err := row.TransitionStatus(w.db.WithContext(ctx), TransferStatusPermanentlyFailed, extras); err != nil {
w.logger.Error("StripeReversalWorker: failed to mark legacy row permanently_failed",
zap.String("transfer_id", row.ID.String()), zap.Error(err))
}
return
}
reversalID, err := w.transferService.ReverseTransfer(ctx, row.StripeTransferID, nil, "refund")
// Success path: transition to reversed + persist stripe_reversal_id.
if err == nil {
extras := map[string]interface{}{
"stripe_reversal_id": reversalID,
"error_message": "",
"next_retry_at": nil,
}
if tErr := row.TransitionStatus(w.db.WithContext(ctx), TransferStatusReversed, extras); tErr != nil {
w.logger.Error("StripeReversalWorker: failed to save reversed state",
zap.String("transfer_id", row.ID.String()), zap.Error(tErr))
return
}
w.logger.Info("StripeReversalWorker: reversal succeeded",
zap.String("transfer_id", row.ID.String()),
zap.String("stripe_transfer_id", row.StripeTransferID),
zap.String("stripe_reversal_id", reversalID))
return
}
// Already-reversed-out-of-band: benign, treat as success. Day 3
// surfaces the concrete sentinel from ReverseTransfer; today the
// sentinel is declared but ReverseTransfer doesn't yet return it,
// so this branch is dead code until day 3. Left in place so the
// worker shape doesn't need to change when day 3 lands.
if errors.Is(err, ErrTransferAlreadyReversed) {
extras := map[string]interface{}{
"error_message": "stripe reports transfer already reversed (out-of-band)",
"next_retry_at": nil,
}
if tErr := row.TransitionStatus(w.db.WithContext(ctx), TransferStatusReversed, extras); tErr != nil {
w.logger.Error("StripeReversalWorker: failed to save reversed state after already-reversed",
zap.String("transfer_id", row.ID.String()), zap.Error(tErr))
return
}
w.logger.Info("StripeReversalWorker: treating already-reversed as success",
zap.String("transfer_id", row.ID.String()),
zap.String("stripe_transfer_id", row.StripeTransferID))
return
}
// Missing-at-Stripe: our stripe_transfer_id doesn't exist. This is
// a data-integrity incident (we have an id that Stripe doesn't
// recognise), not a retry scenario. Terminate the row and shout.
// Day 3 will populate this sentinel; dead code until then.
if errors.Is(err, ErrTransferNotFound) {
w.logger.Error("StripeReversalWorker: stripe reports transfer_id does not exist — data integrity incident",
zap.String("transfer_id", row.ID.String()),
zap.String("stripe_transfer_id", row.StripeTransferID),
zap.Error(err))
extras := map[string]interface{}{
"error_message": "stripe reports transfer not found: " + err.Error(),
"next_retry_at": nil,
}
if tErr := row.TransitionStatus(w.db.WithContext(ctx), TransferStatusPermanentlyFailed, extras); tErr != nil {
w.logger.Error("StripeReversalWorker: failed to mark row permanently_failed after not-found",
zap.String("transfer_id", row.ID.String()), zap.Error(tErr))
}
return
}
// Transient error: bump retry_count, set next_retry_at with
// exponential backoff, stay in reversal_pending.
newRetryCount := row.RetryCount + 1
if newRetryCount >= w.maxRetries {
w.logger.Warn("StripeReversalWorker: max retries exhausted, marking permanently_failed",
zap.String("transfer_id", row.ID.String()),
zap.Int("retry_count", newRetryCount),
zap.Error(err))
extras := map[string]interface{}{
"retry_count": newRetryCount,
"error_message": err.Error(),
"next_retry_at": nil,
}
if tErr := row.TransitionStatus(w.db.WithContext(ctx), TransferStatusPermanentlyFailed, extras); tErr != nil {
w.logger.Error("StripeReversalWorker: failed to mark permanently_failed",
zap.String("transfer_id", row.ID.String()), zap.Error(tErr))
}
return
}
delay := w.backoffBase
for i := 0; i < newRetryCount; i++ {
delay *= 2
if delay > w.backoffMax {
delay = w.backoffMax
break
}
}
nextRetry := time.Now().Add(delay)
extras := map[string]interface{}{
"retry_count": newRetryCount,
"error_message": err.Error(),
"next_retry_at": &nextRetry,
}
// Same-state transition: reversal_pending → reversal_pending (bump
// retry_count / next_retry_at / error_message without changing
// status). TransitionStatus permits same-state by design.
if tErr := row.TransitionStatus(w.db.WithContext(ctx), TransferStatusReversalPending, extras); tErr != nil {
w.logger.Error("StripeReversalWorker: failed to save transient retry state",
zap.String("transfer_id", row.ID.String()), zap.Error(tErr))
}
w.logger.Warn("StripeReversalWorker: reversal failed, will retry",
zap.String("transfer_id", row.ID.String()),
zap.String("stripe_transfer_id", row.StripeTransferID),
zap.Int("retry_count", newRetryCount),
zap.Duration("next_delay", delay),
zap.Error(err))
}

View file

@ -0,0 +1,279 @@
package marketplace
import (
"context"
"errors"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
)
func setupReversalTestDB(t *testing.T) *gorm.DB {
db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
require.NoError(t, err)
require.NoError(t, db.AutoMigrate(&SellerTransfer{}))
return db
}
// mockReversalTransferService is a dedicated mock for reversal-worker
// tests. CreateTransfer is unused; ReverseTransfer is the hot path and
// supports (a) a canned reversalID, (b) a canned error, (c) call
// capture for assertion on what the worker passed to the PSP.
type mockReversalTransferService struct {
reversalID string
err error
calls []struct {
StripeTransferID string
Amount *int64
Reason string
}
}
func (m *mockReversalTransferService) CreateTransfer(_ context.Context, _ uuid.UUID, _ int64, _, _ string) (string, error) {
return "tr_mock", nil
}
func (m *mockReversalTransferService) ReverseTransfer(_ context.Context, stripeTransferID string, amount *int64, reason string) (string, error) {
m.calls = append(m.calls, struct {
StripeTransferID string
Amount *int64
Reason string
}{stripeTransferID, amount, reason})
if m.err != nil {
return "", m.err
}
id := m.reversalID
if id == "" {
id = "rev_mock"
}
return id, nil
}
// newReversalPendingRow is a shared fixture helper — a reversal_pending
// row with stripe_transfer_id populated and next_retry_at ripe.
func newReversalPendingRow(t *testing.T, db *gorm.DB) *SellerTransfer {
t.Helper()
now := time.Now()
row := &SellerTransfer{
ID: uuid.New(),
SellerID: uuid.New(),
OrderID: uuid.New(),
StripeTransferID: "tr_fixture_123",
AmountCents: 900,
Currency: "EUR",
Status: TransferStatusReversalPending,
NextRetryAt: &now,
}
require.NoError(t, db.Create(row).Error)
return row
}
func TestReversalWorker_HappyPath_PendingToReversed(t *testing.T) {
db := setupReversalTestDB(t)
mock := &mockReversalTransferService{reversalID: "rev_happy_001"}
w := NewStripeReversalWorker(db, mock, zap.NewNop(), time.Minute, 5, time.Minute, time.Hour)
row := newReversalPendingRow(t, db)
w.processBatch(context.Background())
var updated SellerTransfer
require.NoError(t, db.First(&updated, row.ID).Error)
assert.Equal(t, TransferStatusReversed, updated.Status)
assert.Equal(t, "rev_happy_001", updated.StripeReversalID)
assert.Empty(t, updated.ErrorMessage)
assert.Nil(t, updated.NextRetryAt)
require.Len(t, mock.calls, 1)
assert.Equal(t, "tr_fixture_123", mock.calls[0].StripeTransferID)
assert.Nil(t, mock.calls[0].Amount, "v1.0.7 day 2 always does full reversal (nil amount)")
assert.Equal(t, "refund", mock.calls[0].Reason)
}
func TestReversalWorker_AlreadyReversed_TreatedAsSuccess(t *testing.T) {
db := setupReversalTestDB(t)
mock := &mockReversalTransferService{err: ErrTransferAlreadyReversed}
w := NewStripeReversalWorker(db, mock, zap.NewNop(), time.Minute, 5, time.Minute, time.Hour)
row := newReversalPendingRow(t, db)
w.processBatch(context.Background())
var updated SellerTransfer
require.NoError(t, db.First(&updated, row.ID).Error)
// Already-reversed at Stripe = our job is already done, row flips
// to reversed with an informative error_message so ops can grep.
assert.Equal(t, TransferStatusReversed, updated.Status)
assert.Contains(t, updated.ErrorMessage, "already reversed")
assert.Nil(t, updated.NextRetryAt)
}
func TestReversalWorker_TransferNotFound_PermanentlyFailed(t *testing.T) {
db := setupReversalTestDB(t)
mock := &mockReversalTransferService{err: ErrTransferNotFound}
w := NewStripeReversalWorker(db, mock, zap.NewNop(), time.Minute, 5, time.Minute, time.Hour)
row := newReversalPendingRow(t, db)
w.processBatch(context.Background())
var updated SellerTransfer
require.NoError(t, db.First(&updated, row.ID).Error)
// Missing-at-Stripe is a data-integrity incident: terminate so ops
// can investigate. Never retry blindly — would amplify the
// inconsistency.
assert.Equal(t, TransferStatusPermanentlyFailed, updated.Status)
assert.Contains(t, updated.ErrorMessage, "not found")
assert.Nil(t, updated.NextRetryAt)
}
func TestReversalWorker_TransientError_SchedulesRetryWithBackoff(t *testing.T) {
db := setupReversalTestDB(t)
mock := &mockReversalTransferService{err: errors.New("stripe 503 service unavailable")}
base := 10 * time.Second
w := NewStripeReversalWorker(db, mock, zap.NewNop(), time.Minute, 5, base, time.Hour)
row := newReversalPendingRow(t, db)
row.RetryCount = 0
require.NoError(t, db.Save(row).Error)
before := time.Now()
w.processBatch(context.Background())
var updated SellerTransfer
require.NoError(t, db.First(&updated, row.ID).Error)
assert.Equal(t, TransferStatusReversalPending, updated.Status, "transient error keeps row in reversal_pending")
assert.Equal(t, 1, updated.RetryCount)
assert.Contains(t, updated.ErrorMessage, "503")
require.NotNil(t, updated.NextRetryAt)
// Backoff for retry_count=1: base * 2 = 20s
expected := before.Add(2 * base)
assert.True(t, updated.NextRetryAt.After(expected.Add(-time.Second)),
"next_retry_at %v should be ≥ base * 2 from processBatch start %v", updated.NextRetryAt, expected)
assert.True(t, updated.NextRetryAt.Before(expected.Add(2*time.Second)),
"next_retry_at %v should not exceed base * 2 + slack", updated.NextRetryAt)
}
func TestReversalWorker_BackoffCapped(t *testing.T) {
db := setupReversalTestDB(t)
mock := &mockReversalTransferService{err: errors.New("stripe down")}
// base=1s, max=10s. At retry_count=4, unbounded would be 1*2^4=16s,
// capped stays at 10s.
w := NewStripeReversalWorker(db, mock, zap.NewNop(), time.Minute, 100 /* high to avoid permanent */, time.Second, 10*time.Second)
row := newReversalPendingRow(t, db)
row.RetryCount = 3 // after tick, becomes 4 → unbounded would be 16s
require.NoError(t, db.Save(row).Error)
before := time.Now()
w.processBatch(context.Background())
var updated SellerTransfer
require.NoError(t, db.First(&updated, row.ID).Error)
assert.Equal(t, 4, updated.RetryCount)
require.NotNil(t, updated.NextRetryAt)
assert.True(t, updated.NextRetryAt.Before(before.Add(11*time.Second)),
"delay should be capped at backoffMax=10s, got %v from now %v", updated.NextRetryAt, before)
}
func TestReversalWorker_MaxRetriesExhausted_PermanentlyFailed(t *testing.T) {
db := setupReversalTestDB(t)
mock := &mockReversalTransferService{err: errors.New("stripe still down")}
w := NewStripeReversalWorker(db, mock, zap.NewNop(), time.Minute, 3, time.Minute, time.Hour)
row := newReversalPendingRow(t, db)
row.RetryCount = 2 // next retry pushes to 3 = maxRetries → terminate
require.NoError(t, db.Save(row).Error)
w.processBatch(context.Background())
var updated SellerTransfer
require.NoError(t, db.First(&updated, row.ID).Error)
assert.Equal(t, TransferStatusPermanentlyFailed, updated.Status)
assert.Equal(t, 3, updated.RetryCount)
assert.Nil(t, updated.NextRetryAt)
}
func TestReversalWorker_LegacyRow_NoStripeTransferID_PermanentlyFailed(t *testing.T) {
db := setupReversalTestDB(t)
mock := &mockReversalTransferService{}
w := NewStripeReversalWorker(db, mock, zap.NewNop(), time.Minute, 5, time.Minute, time.Hour)
row := newReversalPendingRow(t, db)
row.StripeTransferID = "" // simulate a pre-v1.0.7 row
require.NoError(t, db.Save(row).Error)
w.processBatch(context.Background())
var updated SellerTransfer
require.NoError(t, db.First(&updated, row.ID).Error)
// Legacy row: can't reverse what we can't address. Terminate with
// a distinctive error_message so ops can query for "legacy row".
assert.Equal(t, TransferStatusPermanentlyFailed, updated.Status)
assert.Contains(t, updated.ErrorMessage, "legacy")
assert.Empty(t, mock.calls, "worker must not call Stripe on a row with no transfer_id")
}
func TestReversalWorker_OnlyProcessesReversalPending(t *testing.T) {
db := setupReversalTestDB(t)
mock := &mockReversalTransferService{reversalID: "rev_should_not_fire"}
w := NewStripeReversalWorker(db, mock, zap.NewNop(), time.Minute, 5, time.Minute, time.Hour)
// Plant one row in each other status; none should be picked up.
otherStatuses := []string{
TransferStatusPending,
TransferStatusCompleted,
TransferStatusFailed,
TransferStatusReversed,
TransferStatusPermanentlyFailed,
}
for _, s := range otherStatuses {
r := &SellerTransfer{
ID: uuid.New(),
SellerID: uuid.New(),
OrderID: uuid.New(),
StripeTransferID: "tr_" + s,
AmountCents: 100,
Currency: "EUR",
Status: s,
}
require.NoError(t, db.Create(r).Error)
}
w.processBatch(context.Background())
assert.Empty(t, mock.calls, "worker must only pick up reversal_pending rows")
}
func TestReversalWorker_RespectsNextRetryAt_SkipsFutureRows(t *testing.T) {
db := setupReversalTestDB(t)
mock := &mockReversalTransferService{reversalID: "rev_skipped"}
w := NewStripeReversalWorker(db, mock, zap.NewNop(), time.Minute, 5, time.Minute, time.Hour)
future := time.Now().Add(10 * time.Minute)
row := &SellerTransfer{
ID: uuid.New(),
SellerID: uuid.New(),
OrderID: uuid.New(),
StripeTransferID: "tr_future",
AmountCents: 100,
Currency: "EUR",
Status: TransferStatusReversalPending,
NextRetryAt: &future,
}
require.NoError(t, db.Create(row).Error)
w.processBatch(context.Background())
assert.Empty(t, mock.calls, "row with next_retry_at in the future must not be processed")
var untouched SellerTransfer
require.NoError(t, db.First(&untouched, row.ID).Error)
assert.Equal(t, TransferStatusReversalPending, untouched.Status, "row must stay in reversal_pending")
}

View file

@ -139,7 +139,7 @@ type SellerSale struct {
Date string `json:"date"`
}
// TransferService abstracts the payout transfer provider (v0.603, v1.0.7 item A)
// TransferService abstracts the payout transfer provider (v0.603, v1.0.7 item A/B)
// CreateTransfer returns the PSP's own transfer identifier (e.g. Stripe tr_*)
// alongside the error. The caller persists it on the SellerTransfer row so
// that downstream operations — reversal on refund (item B), reconciliation
@ -147,10 +147,30 @@ type SellerSale struct {
// rather than re-querying the PSP's list API. Pre-v1.0.7 the return was
// `error` only and the stripe_transfer_id column sat empty, blocking the
// reversal worker from identifying which transfer to reverse.
//
// ReverseTransfer (v1.0.7 item B) reverses a previously-created transfer —
// the PSP moves the funds out of the seller's connected account back to the
// platform. amount=nil means full reversal. Returns the PSP's reversal
// identifier on success so the caller can persist it for idempotency and
// audit. Error sentinels ErrTransferAlreadyReversed / ErrTransferNotFound
// disambiguate the two 404 cases Stripe surfaces, which have very different
// operational meanings: the first is benign (someone else already reversed
// out-of-band, e.g. via Dashboard), the second is an invariant violation
// (our stripe_transfer_id points at nothing).
type TransferService interface {
CreateTransfer(ctx context.Context, sellerUserID uuid.UUID, amount int64, currency, orderID string) (string, error)
ReverseTransfer(ctx context.Context, stripeTransferID string, amount *int64, reason string) (string, error)
}
// Error sentinels for ReverseTransfer — allows the worker to distinguish
// the benign "already reversed" case from the alarming "transfer id
// doesn't exist" case. Both map to 404 on the wire; only the PSP error
// code disambiguates them.
var (
ErrTransferAlreadyReversed = errors.New("stripe transfer already reversed")
ErrTransferNotFound = errors.New("stripe transfer not found")
)
// Service implémente MarketplaceService
type Service struct {
db *gorm.DB
@ -791,16 +811,6 @@ func (s *Service) processSellerTransfers(ctx context.Context, tx *gorm.DB, order
feeCents := int64(float64(grossCents) * feeRate)
netCents := grossCents - feeCents
st := SellerTransfer{
SellerID: sellerID,
OrderID: order.ID,
AmountCents: netCents,
PlatformFeeCents: feeCents,
CommissionRate: feeRate,
Currency: order.Currency,
Status: "pending",
}
// v0.12.0 F254: Credit seller balance
if err := s.CreditSellerBalance(ctx, tx, sellerID, netCents, order.Currency); err != nil {
s.logger.Error("Failed to credit seller balance",
@ -809,16 +819,23 @@ func (s *Service) processSellerTransfers(ctx context.Context, tx *gorm.DB, order
}
stripeTransferID, err := s.transferService.CreateTransfer(ctx, sellerID, netCents, order.Currency, order.ID.String())
// Determine terminal status before construction so the row is
// written with its final state in a single Create — no in-place
// mutation of .Status, which keeps processSellerTransfers out
// of the anti-mutation whitelist (TestNoDirectTransferStatus
// Mutation).
var status, errMsg, writeStripeID string
if err != nil {
st.Status = "failed"
st.ErrorMessage = err.Error()
status = TransferStatusFailed
errMsg = err.Error()
s.logger.Error("Transfer failed for seller",
zap.String("seller_id", sellerID.String()),
zap.String("order_id", order.ID.String()),
zap.Error(err))
} else {
st.Status = "completed"
st.StripeTransferID = stripeTransferID
status = TransferStatusCompleted
writeStripeID = stripeTransferID
s.logger.Info("Transfer completed for seller",
zap.String("seller_id", sellerID.String()),
zap.Int64("amount_cents", netCents),
@ -826,6 +843,17 @@ func (s *Service) processSellerTransfers(ctx context.Context, tx *gorm.DB, order
zap.Float64("commission_rate", feeRate))
}
st := SellerTransfer{
SellerID: sellerID,
OrderID: order.ID,
AmountCents: netCents,
PlatformFeeCents: feeCents,
CommissionRate: feeRate,
Currency: order.Currency,
Status: status,
StripeTransferID: writeStripeID,
ErrorMessage: errMsg,
}
if err := tx.Create(&st).Error; err != nil {
s.logger.Error("Failed to record seller transfer",
zap.String("seller_id", sellerID.String()),
@ -1527,14 +1555,23 @@ func (s *Service) finalizeFailedRefund(ctx context.Context, refundID uuid.UUID,
}
// reverseSellerAccounting undoes the balance credit + marks every
// SellerTransfer for the order as `reversed`. Best-effort: a balance row
// that's already been paid out (available_cents insufficient) won't go
// negative because DebitSellerBalance caps at zero.
// SellerTransfer for the order as `reversal_pending` so the async
// `StripeReversalWorker` can call Stripe Connect Transfers:reversal
// out-of-band (v1.0.7 item B). Pre-v1.0.7 this function flipped the
// row straight to `reversed` without ever calling Stripe — the money
// was corrected on-platform but the seller's Stripe account still
// showed the original transfer as settled. The reversal_pending state
// decouples buyer-facing refund UX (happens immediately) from
// Stripe-side settlement (may take retries, may 404 if already
// reversed out-of-band, may fail permanently and need ops attention).
//
// TODO(v1.0.7): call Stripe Connect Transfers:reversal endpoint via
// TransferService so the money actually moves back out of the seller's
// connected account. Today we only correct the in-DB accounting — a real
// Stripe settlement would still show the seller as paid.
// Best-effort: a balance row that's already been paid out
// (available_cents insufficient) won't go negative because
// DebitSellerBalance caps at zero.
//
// Same-order replay is a no-op — rows already in reversal_pending or
// reversed are skipped. Rows in permanently_failed stay
// permanently_failed (terminal, not our problem to recover).
func (s *Service) reverseSellerAccounting(ctx context.Context, tx *gorm.DB, orderID uuid.UUID) {
var transfers []SellerTransfer
if err := tx.Where("order_id = ?", orderID).Find(&transfers).Error; err != nil {
@ -1543,8 +1580,22 @@ func (s *Service) reverseSellerAccounting(ctx context.Context, tx *gorm.DB, orde
return
}
now := time.Now()
for _, t := range transfers {
if t.Status == "reversed" {
// Skip rows already in or past reversal — idempotent against
// webhook replays and any other concurrent refund flow.
if t.Status == TransferStatusReversalPending ||
t.Status == TransferStatusReversed ||
t.Status == TransferStatusPermanentlyFailed {
continue
}
if !CanTransitionTransferStatus(t.Status, TransferStatusReversalPending) {
// Typically means the row is still 'pending' or 'failed' —
// shouldn't happen for a completed order, but surfaces
// rather than silently skips.
s.logger.Warn("Skipping reversal on non-completed transfer",
zap.String("transfer_id", t.ID.String()),
zap.String("status", t.Status))
continue
}
if err := s.DebitSellerBalance(ctx, tx, t.SellerID, t.AmountCents, t.Currency); err != nil {
@ -1555,12 +1606,13 @@ func (s *Service) reverseSellerAccounting(ctx context.Context, tx *gorm.DB, orde
zap.Error(err))
continue
}
if err := tx.Model(&SellerTransfer{}).Where("id = ?", t.ID).
Updates(map[string]interface{}{
"status": "reversed",
"error_message": "reversed by refund",
}).Error; err != nil {
s.logger.Error("Failed to mark seller transfer as reversed",
extras := map[string]interface{}{
"next_retry_at": &now,
"retry_count": 0,
"error_message": "pending stripe reversal",
}
if err := t.TransitionStatus(tx, TransferStatusReversalPending, extras); err != nil {
s.logger.Error("Failed to mark seller transfer as reversal_pending",
zap.String("transfer_id", t.ID.String()),
zap.Error(err))
}

View file

@ -83,41 +83,47 @@ func (w *TransferRetryWorker) retryOne(ctx context.Context, t *SellerTransfer) {
orderID := t.OrderID.String()
stripeTransferID, err := w.transferService.CreateTransfer(ctx, t.SellerID, t.AmountCents, t.Currency, orderID)
if err != nil {
t.RetryCount++
t.ErrorMessage = err.Error()
newRetryCount := t.RetryCount + 1
// Exponential backoff: interval * 2^retry_count
delay := w.interval
for i := 0; i < t.RetryCount; i++ {
for i := 0; i < newRetryCount; i++ {
delay *= 2
}
nextRetry := time.Now().Add(delay)
t.NextRetryAt = &nextRetry
extras := map[string]interface{}{
"retry_count": newRetryCount,
"error_message": err.Error(),
"next_retry_at": &nextRetry,
}
if t.RetryCount >= w.maxRetries {
t.Status = "permanently_failed"
t.NextRetryAt = nil
var targetStatus string
if newRetryCount >= w.maxRetries {
targetStatus = TransferStatusPermanentlyFailed
extras["next_retry_at"] = nil
monitoring.RecordTransferRetryPermanent()
} else {
targetStatus = t.Status // same-state (stay failed), only bump retry fields
monitoring.RecordTransferRetryFailure()
}
if saveErr := w.db.Save(t).Error; saveErr != nil {
if saveErr := t.TransitionStatus(w.db.WithContext(ctx), targetStatus, extras); saveErr != nil {
w.logger.Error("TransferRetryWorker: failed to save transfer", zap.Error(saveErr), zap.String("transfer_id", t.ID.String()))
}
w.logger.Warn("TransferRetryWorker: retry failed",
zap.String("transfer_id", t.ID.String()),
zap.String("seller_id", t.SellerID.String()),
zap.Int("retry_count", t.RetryCount),
zap.Int("retry_count", newRetryCount),
zap.Error(err))
return
}
t.Status = "completed"
t.NextRetryAt = nil
t.ErrorMessage = ""
t.StripeTransferID = stripeTransferID
if err := w.db.Save(t).Error; err != nil {
extras := map[string]interface{}{
"next_retry_at": nil,
"error_message": "",
"stripe_transfer_id": stripeTransferID,
}
if err := t.TransitionStatus(w.db.WithContext(ctx), TransferStatusCompleted, extras); err != nil {
w.logger.Error("TransferRetryWorker: failed to save transfer", zap.Error(err), zap.String("transfer_id", t.ID.String()))
return
}

View file

@ -59,6 +59,10 @@ func (m *mockTransferServiceRetry) CreateTransfer(_ context.Context, sellerUserI
return id, nil
}
func (m *mockTransferServiceRetry) ReverseTransfer(_ context.Context, _ string, _ *int64, _ string) (string, error) {
return "rev_mock", nil
}
func TestRetryWorker_RetriesFailedTransfer(t *testing.T) {
db := setupRetryTestDB(t)
logger := zap.NewNop()

View file

@ -1,5 +1,11 @@
package marketplace
import (
"time"
"gorm.io/gorm"
)
// SellerTransfer.Status values. The field is a VARCHAR at the DB level
// (no ENUM), but these constants are the authoritative list. Any new
// status value must be added here, to AllowedTransferTransitions, and
@ -28,12 +34,21 @@ const (
// retry) are always allowed and not listed here — see
// CanTransitionTransferStatus.
//
// Note on "failed" — failed is *not* terminal. TransferRetryWorker
// (v0.701) is allowed to drive a failed row back to completed when
// a retry succeeds at Stripe. Terminal failure is permanently_failed,
// which the worker sets only after max retries are exhausted. This
// reads counter-intuitively if you expect failed to be end-of-life,
// so the matrix is written here as the source of truth rather than
// relying on reader intuition.
//
// Terminal states — Reversed and PermanentlyFailed — have empty
// value lists. An admin manual override can write any status via
// raw SQL with documented intent (audit log entry), but that path
// deliberately sits outside this matrix so a code review of any
// state-mutating call sees the override as a conscious escape hatch
// rather than a legitimate transition.
// value lists. There is no recovery path via the state machine. If
// a stuck row needs to be unblocked (e.g. a permanently_failed row
// where ops has confirmed out-of-band that the Stripe side did
// succeed), intervention is DB-level. An admin API for this case is
// axis-1 P2.8 (v1.0.8+); until that lands, the escape hatch is
// documented: raw SQL UPDATE with a row in the admin_audit_log.
//
// The state machine covers three flows:
//
@ -93,3 +108,69 @@ func CanTransitionTransferStatus(from, to string) bool {
}
return false
}
// ErrInvalidTransferStatusTransition is returned by TransitionStatus
// when the state machine rejects the requested move. Callers wrap
// or return this — the error text includes the offending pair so
// logs make the origin visible without extra context.
type InvalidTransferStatusTransitionError struct {
TransferID string
From string
To string
}
func (e *InvalidTransferStatusTransitionError) Error() string {
return "invalid seller_transfer status transition " + e.From + " → " + e.To + " on row " + e.TransferID
}
// TransitionStatus is the single approved path for mutating
// SellerTransfer.Status. It validates against the matrix, then
// performs a conditional UPDATE guarded by the expected `from`
// value — a concurrent worker that has already advanced the row
// finds `rows affected = 0` and the caller learns the transition
// didn't apply (optimistic lock semantics without a version column).
//
// `extras` lets callers piggy-back unrelated column updates (e.g.
// retry_count, next_retry_at, error_message, stripe_reversal_id)
// in the same UPDATE. Do not pass "status" or "updated_at" in
// extras — the function owns those columns. Passing them will be
// silently overwritten.
//
// Enforced as the choke-point by TestNoDirectStatusMutation, which
// greps the marketplace package for raw `.Status = ` assignments or
// `Update("status", ...)` / `"status":` patterns outside the
// whitelist.
func (st *SellerTransfer) TransitionStatus(tx *gorm.DB, to string, extras map[string]interface{}) error {
if !CanTransitionTransferStatus(st.Status, to) {
return &InvalidTransferStatusTransitionError{
TransferID: st.ID.String(),
From: st.Status,
To: to,
}
}
updates := map[string]interface{}{
"status": to,
"updated_at": time.Now(),
}
for k, v := range extras {
if k == "status" || k == "updated_at" {
continue
}
updates[k] = v
}
result := tx.Model(&SellerTransfer{}).
Where("id = ? AND status = ?", st.ID, st.Status).
Updates(updates)
if result.Error != nil {
return result.Error
}
if result.RowsAffected == 0 {
return &InvalidTransferStatusTransitionError{
TransferID: st.ID.String(),
From: st.Status,
To: to,
}
}
st.Status = to
return nil
}

View file

@ -2,10 +2,15 @@ package marketplace
import (
"fmt"
"os"
"path/filepath"
"regexp"
"sort"
"strings"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestTransferStateTransitions is the regression-grade check on the
@ -133,3 +138,103 @@ func TestCanTransitionTransferStatus_UnknownFromIsConservative(t *testing.T) {
assert.False(t, CanTransitionTransferStatus("", TransferStatusCompleted))
assert.False(t, CanTransitionTransferStatus("Completed", TransferStatusReversalPending)) // case-sensitive
}
// TestNoDirectTransferStatusMutation enforces the design agreed
// during item B: every mutation of SellerTransfer.Status must route
// through TransitionStatus (which validates against the matrix). The
// test scans the marketplace package for assignments matching the
// conventional variable names used for SellerTransfer values (st, t,
// transfer, row, etc.) and fails if any live outside the whitelist.
//
// Convention over AST reflects a deliberate tradeoff: a proper
// type-aware check would need go/parser + type resolution (~hours of
// yak shave for a v1.0.7 deadline). The grep-based test catches the
// likely regressions — someone writing `st.Status = "..."` in a
// future commit — and fails loudly. Names like `payout.Status` or
// `order.Status` are excluded because those operate on different
// aggregate types (SellerPayout, Order), which have their own state
// machines (not yet matrix-ified; future scope).
//
// To add a new file that legitimately needs to mutate
// SellerTransfer.Status, route through TransitionStatus. If that's
// not possible (e.g. a fixture helper that plants a row in a
// specific state for a test), add the filename to `allowlistFiles`
// with a code-comment justification.
func TestNoDirectTransferStatusMutation(t *testing.T) {
// Files allowed to use `.Status = "..."` on a SellerTransfer
// variable. Rationale for each:
// - transfer_transitions.go: defines TransitionStatus itself.
// - reversal_worker_test.go / transfer_retry_test.go /
// process_webhook_test.go / refund_test.go: planting fixtures
// in specific starting states. Tests are allowed to mutate
// pre-persistence fixture rows because those are inputs to
// the system under test, not the system itself. The fixture
// helpers still use struct-literal Status: "..." whenever
// possible (no regex match on `: "`), but mutations after
// Create are needed for some setup paths.
allowlistFiles := map[string]bool{
"transfer_transitions.go": true,
"transfer_transitions_test.go": true,
}
// Variable names conventionally bound to SellerTransfer. If a
// file uses `foo.Status = "bar"` with one of these names for foo,
// it's very likely a SellerTransfer mutation and must route
// through TransitionStatus.
forbiddenAssignmentPattern := regexp.MustCompile(
`\b(st|t|transfer|sellerTransfer|row|updated|fresh|transferRow)\.Status\s*=\s*"`,
)
// GORM update forms on a SellerTransfer model that touch "status".
// Catches tx.Model(&SellerTransfer{}).Update("status", ...) and
// Updates(map... with "status" key) in the same file.
gormModelRe := regexp.MustCompile(`Model\(&SellerTransfer\{\}\)`)
gormUpdateStatusRe := regexp.MustCompile(
`Updates?\(\s*(map\[string\]interface\{\}\s*\{[^}]*|")status"`,
)
entries, err := os.ReadDir(".")
require.NoError(t, err)
var violations []string
for _, entry := range entries {
if entry.IsDir() {
continue
}
name := entry.Name()
if !strings.HasSuffix(name, ".go") {
continue
}
if allowlistFiles[name] {
continue
}
// Skip test files that legitimately plant fixtures — they use
// struct-literal `Status: "..."` which doesn't match the
// assignment regex, but may have `.Status = ` for post-create
// fixture adjustments. Test files are excluded by suffix
// (discipline relies on maintainers not writing production
// logic in _test.go, which the existing codebase respects).
if strings.HasSuffix(name, "_test.go") {
continue
}
content, rerr := os.ReadFile(filepath.Join(".", name))
require.NoError(t, rerr, "failed to read %s", name)
text := string(content)
for _, match := range forbiddenAssignmentPattern.FindAllStringSubmatch(text, -1) {
violations = append(violations, fmt.Sprintf("%s: direct assignment `%s` — route through TransitionStatus instead", name, match[0]))
}
if gormModelRe.MatchString(text) && gormUpdateStatusRe.MatchString(text) {
// Both patterns present in the file — likely a GORM
// status update. Report it.
violations = append(violations, fmt.Sprintf("%s: GORM Model(&SellerTransfer{}).Update*(\"status\"...) pattern — route through TransitionStatus instead", name))
}
}
if len(violations) > 0 {
t.Errorf("found %d direct SellerTransfer.Status mutation(s) outside the allowlist:\n %s",
len(violations), strings.Join(violations, "\n "))
}
}

View file

@ -123,10 +123,14 @@ func (h *AdminTransferHandler) RetryTransfer(c *gin.Context) {
stripeTransferID, err := h.ts.CreateTransfer(c.Request.Context(), t.SellerID, t.AmountCents, t.Currency, t.OrderID.String())
if err != nil {
t.RetryCount++
t.ErrorMessage = err.Error()
// Could set next_retry_at for worker to pick up later, but for manual retry we just record the failure
if saveErr := h.db.Save(&t).Error; saveErr != nil {
// Failure: stay in 'failed' (same-state) but bump retry_count and
// record the error. For manual admin retry we don't set
// next_retry_at — ops is driving this, not the worker.
failExtras := map[string]interface{}{
"retry_count": t.RetryCount + 1,
"error_message": err.Error(),
}
if saveErr := t.TransitionStatus(h.db.WithContext(c.Request.Context()), t.Status, failExtras); saveErr != nil {
h.logger.Error("RetryTransfer save failed", zap.Error(saveErr))
}
h.logger.Error("RetryTransfer CreateTransfer failed", zap.Error(err), zap.String("transfer_id", idStr))
@ -134,10 +138,11 @@ func (h *AdminTransferHandler) RetryTransfer(c *gin.Context) {
return
}
t.Status = "completed"
t.ErrorMessage = ""
t.StripeTransferID = stripeTransferID
if err := h.db.Save(&t).Error; err != nil {
extras := map[string]interface{}{
"error_message": "",
"stripe_transfer_id": stripeTransferID,
}
if err := t.TransitionStatus(h.db.WithContext(c.Request.Context()), marketplace.TransferStatusCompleted, extras); err != nil {
h.logger.Error("RetryTransfer save failed", zap.Error(err))
RespondWithAppError(c, apperrors.NewInternalErrorWrap("Failed to update transfer", err))
return

View file

@ -45,6 +45,10 @@ func (m *mockTransferServiceAdmin) CreateTransfer(_ context.Context, _ uuid.UUID
return id, nil
}
func (m *mockTransferServiceAdmin) ReverseTransfer(_ context.Context, _ string, _ *int64, _ string) (string, error) {
return "rev_mock", nil
}
func TestGetTransfers_ReturnsAll(t *testing.T) {
db := setupAdminTransferTestDB(t)
logger := zap.NewNop()

View file

@ -11,6 +11,7 @@ import (
"github.com/stripe/stripe-go/v82/accountlink"
"github.com/stripe/stripe-go/v82/balance"
"github.com/stripe/stripe-go/v82/transfer"
"github.com/stripe/stripe-go/v82/transferreversal"
"go.uber.org/zap"
"gorm.io/gorm"
@ -217,3 +218,45 @@ func (s *StripeConnectService) CreateTransfer(ctx context.Context, sellerUserID
}
return tr.ID, nil
}
// ReverseTransfer issues a reversal against a previously-created Stripe
// transfer (v1.0.7 item B). amount=nil reverses the full transfer;
// amount>0 reverses that portion (partial reversal, used for partial
// refunds in a future item — v1.0.7 always passes nil). Returns the
// Stripe reversal id on success.
//
// Error handling in this function is intentionally opaque in v1.0.7:
// every non-nil return is a raw wrapped Stripe error. Day 3 of item B
// will parse stripe.Error.Code to return the marketplace-package
// sentinels ErrTransferAlreadyReversed ("transfer was already reversed
// out-of-band") and ErrTransferNotFound ("stripe_transfer_id doesn't
// exist at Stripe — data-integrity incident"). The worker routes by
// sentinel, so this function is the single place disambiguation needs
// to happen; until day 3 lands the worker treats every error as a
// transient retry candidate.
func (s *StripeConnectService) ReverseTransfer(ctx context.Context, stripeTransferID string, amount *int64, reason string) (string, error) {
if s.secretKey == "" {
return "", ErrStripeConnectDisabled
}
if stripeTransferID == "" {
return "", fmt.Errorf("reverse stripe transfer: empty stripe_transfer_id")
}
stripe.Key = s.secretKey
params := &stripe.TransferReversalParams{
ID: stripe.String(stripeTransferID),
Amount: amount,
}
if reason != "" {
params.Description = stripe.String(reason)
params.AddMetadata("reason", reason)
}
rev, err := transferreversal.New(params)
if err != nil {
return "", fmt.Errorf("create stripe reversal: %w", err)
}
if rev.ID == "" {
return "", fmt.Errorf("create stripe reversal: provider returned empty reversal id")
}
return rev.ID, nil
}

View file

@ -65,6 +65,10 @@ func (m *mockTransferService) CreateTransfer(_ context.Context, sellerUserID uui
return "tr_mock", nil
}
func (m *mockTransferService) ReverseTransfer(_ context.Context, _ string, _ *int64, _ string) (string, error) {
return "rev_mock", nil
}
// testAuthMiddleware reads X-User-ID header and sets user_id in context (for integration tests)
type testAuthMiddleware struct{}