refactor(connect): persist stripe_transfer_id on create + retry — v1.0.7 item A

TransferService.CreateTransfer signature changes from (...) error to
(...) (string, error) — the caller now captures the Stripe transfer
identifier and persists it on the SellerTransfer row. Pre-v1.0.7 the
stripe_transfer_id column was declared on the model and table but
never written to, which blocked the reversal worker (v1.0.7 item B)
from identifying which transfer to reverse on refund.

Changes:
  * `TransferService` interface and `StripeConnectService.CreateTransfer`
    both return the Stripe transfer id alongside the error.
  * `processSellerTransfers` (marketplace service) persists the id on
    success before `tx.Create(&st)` so a crash between Stripe ACK and
    DB commit leaves no inconsistency.
  * `TransferRetryWorker.retryOne` persists on retry success — a row
    that failed on first attempt and succeeded via the worker is
    reversal-ready all the same.
  * `admin_transfer_handler.RetryTransfer` (manual retry) persists too.
  * `SellerPayout.ExternalPayoutID` is populated by the Connect payout
    flow (`payout.go`) — the field existed but was never written.
  * Four test mocks updated; two tests assert the id is persisted on
    the happy path, one on the failure path confirms we don't write a
    fake id when the provider errors.

Migration `981_seller_transfers_stripe_reversal_id.sql`:
  * Adds nullable `stripe_reversal_id` column for item B.
  * Partial UNIQUE indexes on both stripe_transfer_id and
    stripe_reversal_id (WHERE IS NOT NULL AND <> ''), mirroring the
    v1.0.6.1 pattern for refunds.hyperswitch_refund_id.
  * Logs a count of historical completed transfers that lack an id —
    these are candidates for the backfill CLI follow-up task.

Backfill for historical rows is a separate follow-up (cmd/tools/
backfill_stripe_transfer_ids, calling Stripe's transfers.List with
Destination + Metadata[order_id]). Pre-v1.0.7 transfers without a
backfilled id cannot be auto-reversed on refund — document in P2.9
admin-recovery when it lands. Acceptable scope per v107-plan.

Migration number bumped 980 → 981 because v1.0.6.2 used 980 for the
unpaid-subscription cleanup; v107-plan updated with the note.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
senke 2026-04-17 13:08:39 +02:00
parent 149f76ccc7
commit eedaad9f83
13 changed files with 177 additions and 30 deletions

View file

@ -109,8 +109,11 @@ CHANGELOG scope sentence so anyone reading later knows the envelope.
Effort: **S**. Touches the `TransferService` interface (minor breaking
change — but only internal callers). Migration:
`980_seller_transfers_stripe_reversal_id.sql` adds `stripe_reversal_id`
nullable column (prepares ground for B).
`981_seller_transfers_stripe_reversal_id.sql` adds `stripe_reversal_id`
nullable column (prepares ground for B). Note — bumped from 980 to
981 because v1.0.6.2 used 980 for the unpaid-subscription cleanup;
all subsequent v1.0.7 migration numbers in this plan shift by +1 when
they land.
Acceptance:
- `TransferService.CreateTransfer(...) (string, error)` — returns the

View file

@ -235,6 +235,11 @@ type SellerTransfer struct {
SellerID uuid.UUID `gorm:"type:uuid;not null" json:"seller_id"`
OrderID uuid.UUID `gorm:"type:uuid;not null" json:"order_id"`
StripeTransferID string `gorm:"size:255" json:"stripe_transfer_id,omitempty"`
// StripeReversalID is populated by the v1.0.7 item B reversal worker
// when a refund triggers a reverse-charge against the transfer.
// Empty for transfers that never got reversed. Unique when non-empty
// (partial UNIQUE index in migration 981).
StripeReversalID string `gorm:"size:255" json:"stripe_reversal_id,omitempty"`
AmountCents int64 `gorm:"not null" json:"amount_cents"`
PlatformFeeCents int64 `gorm:"not null" json:"platform_fee_cents"`
CommissionRate float64 `gorm:"column:commission_rate;type:numeric(5,4);default:0.10" json:"commission_rate"` // v0.12.0

View file

@ -295,7 +295,7 @@ func (s *Service) processOnePayout(ctx context.Context, bal *SellerBalance) erro
}
// Attempt transfer
err := s.transferService.CreateTransfer(ctx, lockedBal.SellerID, amount, lockedBal.Currency, payout.ID.String())
stripeTransferID, err := s.transferService.CreateTransfer(ctx, lockedBal.SellerID, amount, lockedBal.Currency, payout.ID.String())
now := time.Now()
if err != nil {
payout.Status = "failed"
@ -310,6 +310,7 @@ func (s *Service) processOnePayout(ctx context.Context, bal *SellerBalance) erro
} else {
payout.Status = "completed"
payout.ProcessedAt = &now
payout.ExternalPayoutID = stripeTransferID
// Move from pending to paid out
tx.WithContext(ctx).Model(&SellerBalance{}).
Where("id = ?", lockedBal.ID).

View file

@ -173,11 +173,19 @@ type mockTransferCall struct {
type mockTransferService struct {
calls []mockTransferCall
err error
stripeTransferID string
}
func (m *mockTransferService) CreateTransfer(_ context.Context, sellerUserID uuid.UUID, amount int64, currency, orderID string) error {
func (m *mockTransferService) CreateTransfer(_ context.Context, sellerUserID uuid.UUID, amount int64, currency, orderID string) (string, error) {
m.calls = append(m.calls, mockTransferCall{sellerUserID, amount, currency, orderID})
return m.err
if m.err != nil {
return "", m.err
}
id := m.stripeTransferID
if id == "" {
id = "tr_mock"
}
return id, nil
}
func TestProcessWebhook_TransferSuccess(t *testing.T) {
@ -235,11 +243,55 @@ func TestProcessWebhook_TransferSuccess(t *testing.T) {
assert.Equal(t, "completed", transfers[0].Status)
assert.Equal(t, int64(900), transfers[0].AmountCents) // 10% fee: 1000 - 100 = 900
assert.Equal(t, int64(100), transfers[0].PlatformFeeCents)
// v1.0.7 item A: the Stripe transfer id returned by the provider is
// persisted on the row. Required by item B (reversal worker) to
// identify which transfer to reverse on refund.
assert.Equal(t, "tr_mock", transfers[0].StripeTransferID)
assert.Len(t, mock.calls, 1)
assert.Equal(t, sellerID, mock.calls[0].SellerID)
assert.Equal(t, int64(900), mock.calls[0].Amount)
}
// TestProcessWebhook_TransferFailure_LeavesStripeTransferIDEmpty documents
// the inverse: when the provider errors, we don't persist a fake ID.
func TestProcessWebhook_TransferFailure_LeavesStripeTransferIDEmpty(t *testing.T) {
db := setupWebhookTestDB(t)
logger := zap.NewNop()
mock := &mockTransferService{err: errors.New("stripe down")}
svc := NewService(db, logger, nil, WithTransferService(mock, 0.10))
buyerID := uuid.New()
sellerID := uuid.New()
trackID := uuid.New()
require.NoError(t, db.Create(&models.User{ID: buyerID}).Error)
require.NoError(t, db.Create(&models.User{ID: sellerID}).Error)
require.NoError(t, db.Create(&models.Track{ID: trackID, UserID: sellerID, FilePath: "/test.mp3"}).Error)
product := &Product{ID: uuid.New(), SellerID: sellerID, Title: "Test", Price: 10.00, ProductType: "track", TrackID: &trackID, Status: ProductStatusActive}
require.NoError(t, db.Create(product).Error)
order := &Order{
ID: uuid.New(),
BuyerID: buyerID,
TotalAmount: 10.00,
Currency: "EUR",
Status: "pending",
HyperswitchPaymentID: "pay_transfer_fail_id",
}
require.NoError(t, db.Create(order).Error)
require.NoError(t, db.Create(&OrderItem{ID: uuid.New(), OrderID: order.ID, ProductID: product.ID, Price: 10.00}).Error)
payload, _ := json.Marshal(map[string]string{"payment_id": "pay_transfer_fail_id", "status": "succeeded"})
err := svc.ProcessPaymentWebhook(context.Background(), payload)
require.NoError(t, err)
var transfer SellerTransfer
require.NoError(t, db.Where("order_id = ?", order.ID).First(&transfer).Error)
assert.Equal(t, "failed", transfer.Status)
assert.Empty(t, transfer.StripeTransferID)
}
func TestProcessWebhook_MultiSeller(t *testing.T) {
db := setupWebhookTestDB(t)
logger := zap.NewNop()

View file

@ -139,9 +139,16 @@ type SellerSale struct {
Date string `json:"date"`
}
// TransferService abstracts the payout transfer provider (v0.603)
// TransferService abstracts the payout transfer provider (v0.603, v1.0.7 item A)
// CreateTransfer returns the PSP's own transfer identifier (e.g. Stripe tr_*)
// alongside the error. The caller persists it on the SellerTransfer row so
// that downstream operations — reversal on refund (item B), reconciliation
// against the PSP dashboard, admin audit — can join by a stable identifier
// rather than re-querying the PSP's list API. Pre-v1.0.7 the return was
// `error` only and the stripe_transfer_id column sat empty, blocking the
// reversal worker from identifying which transfer to reverse.
type TransferService interface {
CreateTransfer(ctx context.Context, sellerUserID uuid.UUID, amount int64, currency, orderID string) error
CreateTransfer(ctx context.Context, sellerUserID uuid.UUID, amount int64, currency, orderID string) (string, error)
}
// Service implémente MarketplaceService
@ -801,7 +808,7 @@ func (s *Service) processSellerTransfers(ctx context.Context, tx *gorm.DB, order
zap.Error(err))
}
err := s.transferService.CreateTransfer(ctx, sellerID, netCents, order.Currency, order.ID.String())
stripeTransferID, err := s.transferService.CreateTransfer(ctx, sellerID, netCents, order.Currency, order.ID.String())
if err != nil {
st.Status = "failed"
st.ErrorMessage = err.Error()
@ -811,9 +818,11 @@ func (s *Service) processSellerTransfers(ctx context.Context, tx *gorm.DB, order
zap.Error(err))
} else {
st.Status = "completed"
st.StripeTransferID = stripeTransferID
s.logger.Info("Transfer completed for seller",
zap.String("seller_id", sellerID.String()),
zap.Int64("amount_cents", netCents),
zap.String("stripe_transfer_id", stripeTransferID),
zap.Float64("commission_rate", feeRate))
}

View file

@ -81,7 +81,7 @@ func (w *TransferRetryWorker) retryOne(ctx context.Context, t *SellerTransfer) {
monitoring.RecordTransferRetry()
orderID := t.OrderID.String()
err := w.transferService.CreateTransfer(ctx, t.SellerID, t.AmountCents, t.Currency, orderID)
stripeTransferID, err := w.transferService.CreateTransfer(ctx, t.SellerID, t.AmountCents, t.Currency, orderID)
if err != nil {
t.RetryCount++
t.ErrorMessage = err.Error()
@ -116,6 +116,7 @@ func (w *TransferRetryWorker) retryOne(ctx context.Context, t *SellerTransfer) {
t.Status = "completed"
t.NextRetryAt = nil
t.ErrorMessage = ""
t.StripeTransferID = stripeTransferID
if err := w.db.Save(t).Error; err != nil {
w.logger.Error("TransferRetryWorker: failed to save transfer", zap.Error(err), zap.String("transfer_id", t.ID.String()))
return
@ -125,5 +126,6 @@ func (w *TransferRetryWorker) retryOne(ctx context.Context, t *SellerTransfer) {
w.logger.Info("TransferRetryWorker: retry succeeded",
zap.String("transfer_id", t.ID.String()),
zap.String("seller_id", t.SellerID.String()),
zap.String("stripe_transfer_id", stripeTransferID),
zap.Int64("amount_cents", t.AmountCents))
}

View file

@ -39,22 +39,30 @@ type mockTransferServiceRetry struct {
OrderID string
}
err error
stripeTransferID string // returned on success; empty if unset defaults to "tr_mock"
}
func (m *mockTransferServiceRetry) CreateTransfer(_ context.Context, sellerUserID uuid.UUID, amount int64, currency, orderID string) error {
func (m *mockTransferServiceRetry) CreateTransfer(_ context.Context, sellerUserID uuid.UUID, amount int64, currency, orderID string) (string, error) {
m.calls = append(m.calls, struct {
SellerID uuid.UUID
Amount int64
Currency string
OrderID string
}{sellerUserID, amount, currency, orderID})
return m.err
if m.err != nil {
return "", m.err
}
id := m.stripeTransferID
if id == "" {
id = "tr_mock"
}
return id, nil
}
func TestRetryWorker_RetriesFailedTransfer(t *testing.T) {
db := setupRetryTestDB(t)
logger := zap.NewNop()
mock := &mockTransferServiceRetry{}
mock := &mockTransferServiceRetry{stripeTransferID: "tr_retry_ok"}
worker := NewTransferRetryWorker(db, mock, logger, 5*time.Minute, 3)
sellerID := uuid.New()
@ -79,6 +87,10 @@ func TestRetryWorker_RetriesFailedTransfer(t *testing.T) {
assert.Equal(t, "completed", updated.Status)
assert.Equal(t, 0, updated.RetryCount)
assert.Nil(t, updated.NextRetryAt)
// v1.0.7 item A: retry success also persists the Stripe transfer id,
// so the reversal worker (item B) can reverse a transfer regardless of
// whether it landed on the first attempt or via retry.
assert.Equal(t, "tr_retry_ok", updated.StripeTransferID)
assert.Len(t, mock.calls, 1)
assert.Equal(t, sellerID, mock.calls[0].SellerID)
assert.Equal(t, int64(900), mock.calls[0].Amount)

View file

@ -121,7 +121,7 @@ func (h *AdminTransferHandler) RetryTransfer(c *gin.Context) {
return
}
err = h.ts.CreateTransfer(c.Request.Context(), t.SellerID, t.AmountCents, t.Currency, t.OrderID.String())
stripeTransferID, err := h.ts.CreateTransfer(c.Request.Context(), t.SellerID, t.AmountCents, t.Currency, t.OrderID.String())
if err != nil {
t.RetryCount++
t.ErrorMessage = err.Error()
@ -136,6 +136,7 @@ func (h *AdminTransferHandler) RetryTransfer(c *gin.Context) {
t.Status = "completed"
t.ErrorMessage = ""
t.StripeTransferID = stripeTransferID
if err := h.db.Save(&t).Error; err != nil {
h.logger.Error("RetryTransfer save failed", zap.Error(err))
RespondWithAppError(c, apperrors.NewInternalErrorWrap("Failed to update transfer", err))

View file

@ -31,10 +31,18 @@ func setupAdminTransferTestDB(t *testing.T) *gorm.DB {
type mockTransferServiceAdmin struct {
err error
stripeTransferID string
}
func (m *mockTransferServiceAdmin) CreateTransfer(_ context.Context, _ uuid.UUID, _ int64, _, _ string) error {
return m.err
func (m *mockTransferServiceAdmin) CreateTransfer(_ context.Context, _ uuid.UUID, _ int64, _, _ string) (string, error) {
if m.err != nil {
return "", m.err
}
id := m.stripeTransferID
if id == "" {
id = "tr_mock"
}
return id, nil
}
func TestGetTransfers_ReturnsAll(t *testing.T) {

View file

@ -175,21 +175,24 @@ func (s *StripeConnectService) GetBalance(ctx context.Context, userID uuid.UUID)
return &BalanceResponse{Connected: true, Available: available, Pending: pending}, nil
}
// CreateTransfer transfers funds to a connected account (for payout on sale)
func (s *StripeConnectService) CreateTransfer(ctx context.Context, sellerUserID uuid.UUID, amount int64, currency, orderID string) error {
// CreateTransfer transfers funds to a connected account (for payout on sale).
// Returns the Stripe transfer identifier (tr_*) on success so the caller can
// persist it on the SellerTransfer row — required by item B (reversal worker)
// and by reconciliation against the Stripe dashboard.
func (s *StripeConnectService) CreateTransfer(ctx context.Context, sellerUserID uuid.UUID, amount int64, currency, orderID string) (string, error) {
var sa models.SellerStripeAccount
if err := s.db.WithContext(ctx).Where("user_id = ?", sellerUserID).First(&sa).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return ErrNoStripeAccount
return "", ErrNoStripeAccount
}
return err
return "", err
}
if !sa.PayoutsEnabled {
return fmt.Errorf("seller account does not have payouts enabled")
return "", fmt.Errorf("seller account does not have payouts enabled")
}
if s.secretKey == "" {
return ErrStripeConnectDisabled
return "", ErrStripeConnectDisabled
}
stripe.Key = s.secretKey
@ -201,9 +204,9 @@ func (s *StripeConnectService) CreateTransfer(ctx context.Context, sellerUserID
if orderID != "" {
params.AddMetadata("order_id", orderID)
}
_, err := transfer.New(params)
tr, err := transfer.New(params)
if err != nil {
return fmt.Errorf("create stripe transfer: %w", err)
return "", fmt.Errorf("create stripe transfer: %w", err)
}
return nil
return tr.ID, nil
}

View file

@ -0,0 +1,47 @@
-- v1.0.7 item A: add stripe_reversal_id column to seller_transfers.
-- Prepares ground for item B (async reversal worker): when a refund-driven
-- reversal succeeds, the worker persists the Stripe reversal id here so
-- the operation is idempotent (a replayed webhook skips the reversal call
-- if this column is populated) and auditable against the Stripe dashboard.
--
-- This migration ships with item A because item B's worker is the next
-- commit and we want the column in place before the code that writes it.
-- Nullable — pre-v1.0.7 transfers will never have a reversal id.
--
-- Companion column stripe_transfer_id already exists (pre-v1.0.7), but was
-- never written to until item A: the TransferService.CreateTransfer
-- signature changed to return the Stripe transfer id, which is now
-- persisted by processSellerTransfers, TransferRetryWorker, and
-- admin_transfer_handler.
ALTER TABLE seller_transfers
ADD COLUMN IF NOT EXISTS stripe_reversal_id VARCHAR(255);
-- Partial UNIQUE index so a given Stripe reversal id cannot collide across
-- rows, while still allowing many NULL/empty rows (the common case: only
-- refunded transfers carry a reversal id). Mirrors the pattern landed in
-- v1.0.6.1 for refunds.hyperswitch_refund_id.
CREATE UNIQUE INDEX IF NOT EXISTS idx_seller_transfers_stripe_reversal_id
ON seller_transfers(stripe_reversal_id)
WHERE stripe_reversal_id IS NOT NULL AND stripe_reversal_id <> '';
-- Same pattern for stripe_transfer_id — previously declared without an
-- index, now populated by item A so worth indexing for reconciliation
-- lookups. Partial because pre-v1.0.7 rows carry empty values.
CREATE UNIQUE INDEX IF NOT EXISTS idx_seller_transfers_stripe_transfer_id
ON seller_transfers(stripe_transfer_id)
WHERE stripe_transfer_id IS NOT NULL AND stripe_transfer_id <> '';
-- Visibility: how many historical rows lack a stripe_transfer_id? These
-- are the rows that the backfill CLI (cmd/tools/backfill_stripe_transfer_ids)
-- will target. Acceptable to leave NULL where Stripe has no match — see
-- axis-1 P2.9 for the admin-triggered recovery path.
DO $$
DECLARE v_count INTEGER;
BEGIN
SELECT COUNT(*) INTO v_count
FROM seller_transfers
WHERE status = 'completed'
AND (stripe_transfer_id IS NULL OR stripe_transfer_id = '');
RAISE NOTICE 'v1.0.7 item A: % completed seller_transfer(s) have no stripe_transfer_id and need backfill (see cmd/tools/backfill_stripe_transfer_ids)', v_count;
END $$;

View file

@ -0,0 +1,4 @@
-- Rollback v1.0.7 item A column additions.
DROP INDEX IF EXISTS idx_seller_transfers_stripe_transfer_id;
DROP INDEX IF EXISTS idx_seller_transfers_stripe_reversal_id;
ALTER TABLE seller_transfers DROP COLUMN IF EXISTS stripe_reversal_id;

View file

@ -55,14 +55,14 @@ type mockTransferService struct {
}
}
func (m *mockTransferService) CreateTransfer(_ context.Context, sellerUserID uuid.UUID, amount int64, currency, orderID string) error {
func (m *mockTransferService) CreateTransfer(_ context.Context, sellerUserID uuid.UUID, amount int64, currency, orderID string) (string, error) {
m.calls = append(m.calls, struct {
SellerID uuid.UUID
Amount int64
Currency string
OrderID string
}{sellerUserID, amount, currency, orderID})
return nil
return "tr_mock", nil
}
// testAuthMiddleware reads X-User-ID header and sets user_id in context (for integration tests)