diff --git a/veza-backend-api/internal/core/marketplace/transfer_retry.go b/veza-backend-api/internal/core/marketplace/transfer_retry.go
new file mode 100644
index 000000000..53a6fe21b
--- /dev/null
+++ b/veza-backend-api/internal/core/marketplace/transfer_retry.go
@@ -0,0 +1,128 @@
+package marketplace
+
+import (
+	"context"
+	"time"
+
+	"go.uber.org/zap"
+	"gorm.io/gorm"
+
+	"veza-backend-api/internal/monitoring"
+)
+
+// TransferRetryWorker retries failed Stripe Connect transfers with exponential backoff (v0.701).
+type TransferRetryWorker struct {
+	db              *gorm.DB
+	transferService TransferService
+	logger          *zap.Logger
+	interval        time.Duration
+	maxRetries      int
+}
+
+// NewTransferRetryWorker creates a new TransferRetryWorker.
+func NewTransferRetryWorker(db *gorm.DB, ts TransferService, logger *zap.Logger, interval time.Duration, maxRetries int) *TransferRetryWorker {
+	return &TransferRetryWorker{
+		db:              db,
+		transferService: ts,
+		logger:          logger,
+		interval:        interval,
+		maxRetries:      maxRetries,
+	}
+}
+
+// Start runs the worker loop until ctx is cancelled.
+func (w *TransferRetryWorker) Start(ctx context.Context) {
+	ticker := time.NewTicker(w.interval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			w.logger.Info("TransferRetryWorker stopped")
+			return
+		case <-ticker.C:
+			w.processRetries(ctx)
+		}
+	}
+}
+
+// processRetries fetches failed transfers eligible for retry and attempts them.
+func (w *TransferRetryWorker) processRetries(ctx context.Context) {
+	if w.transferService == nil {
+		return
+	}
+
+	now := time.Now()
+	var transfers []SellerTransfer
+	err := w.db.WithContext(ctx).
+		Where("status = ? AND retry_count < ? AND (next_retry_at IS NULL OR next_retry_at <= ?)",
+			"failed", w.maxRetries, now).
+		Find(&transfers).Error
+	if err != nil {
+		w.logger.Error("TransferRetryWorker: failed to query transfers", zap.Error(err))
+		return
+	}
+
+	// Index the slice so retryOne receives a pointer into the fetched batch, not a copy of the loop variable.
+	for i := range transfers {
+		w.retryOne(ctx, &transfers[i])
+	}
+}
+
+func (w *TransferRetryWorker) retryOne(ctx context.Context, t *SellerTransfer) {
+	// Re-fetch to avoid racing with another process, then work from the fresh row so retry_count and status are current.
+	var fresh SellerTransfer
+	if err := w.db.WithContext(ctx).First(&fresh, "id = ?", t.ID).Error; err != nil {
+		return
+	}
+	if fresh.Status != "failed" {
+		return
+	}
+	*t = fresh
+
+	monitoring.RecordTransferRetry()
+
+	orderID := t.OrderID.String()
+	err := w.transferService.CreateTransfer(ctx, t.SellerID, t.AmountCents, t.Currency, orderID)
+	if err != nil {
+		t.RetryCount++
+		t.ErrorMessage = err.Error()
+
+		// Exponential backoff: interval * 2^retry_count.
+		delay := w.interval << uint(t.RetryCount)
+		nextRetry := time.Now().Add(delay)
+		t.NextRetryAt = &nextRetry
+
+		if t.RetryCount >= w.maxRetries {
+			t.Status = "permanently_failed"
+			t.NextRetryAt = nil
+			monitoring.RecordTransferRetryPermanent()
+		} else {
+			monitoring.RecordTransferRetryFailure()
+		}
+
+		if saveErr := w.db.WithContext(ctx).Save(t).Error; saveErr != nil {
+			w.logger.Error("TransferRetryWorker: failed to save transfer", zap.Error(saveErr), zap.String("transfer_id", t.ID.String()))
+		}
+		w.logger.Warn("TransferRetryWorker: retry failed",
+			zap.String("transfer_id", t.ID.String()),
+			zap.String("seller_id", t.SellerID.String()),
+			zap.Int("retry_count", t.RetryCount),
+			zap.Error(err))
+		return
+	}
+
+	t.Status = "completed"
+	t.NextRetryAt = nil
+	t.ErrorMessage = ""
+	if err := w.db.WithContext(ctx).Save(t).Error; err != nil {
+		w.logger.Error("TransferRetryWorker: failed to save transfer", zap.Error(err), zap.String("transfer_id", t.ID.String()))
+		return
+	}
+
+	monitoring.RecordTransferRetrySuccess()
+	w.logger.Info("TransferRetryWorker: retry succeeded",
		zap.String("transfer_id", t.ID.String()),
+		zap.String("seller_id", t.SellerID.String()),
+		zap.Int64("amount_cents", t.AmountCents))
+}
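
For reviewers: a minimal sketch of how this worker might be wired in at startup. Only NewTransferRetryWorker and Start come from the diff above; runTransferRetryWorker is a hypothetical helper, the db/ts/logger arguments are assumed to come from the app's usual dependency setup (not shown in this PR), and the 1-minute interval and 5-attempt cap are illustrative values, not prescribed defaults.

package main

import (
	"context"
	"time"

	"go.uber.org/zap"
	"gorm.io/gorm"

	"veza-backend-api/internal/core/marketplace"
)

// runTransferRetryWorker is a hypothetical wiring helper, not part of this PR.
// It assumes the caller already holds the app's *gorm.DB, TransferService,
// and *zap.Logger instances.
func runTransferRetryWorker(ctx context.Context, db *gorm.DB, ts marketplace.TransferService, logger *zap.Logger) {
	// Illustrative tuning: poll every minute, give up after 5 attempts.
	worker := marketplace.NewTransferRetryWorker(db, ts, logger, time.Minute, 5)

	// Start blocks until ctx is cancelled, so run it on its own goroutine
	// alongside the HTTP server and other long-lived components.
	go worker.Start(ctx)
}

With those values the delay doubles from the base interval after each failure: a transfer that keeps failing is retried roughly 2, 4, 8, and 16 minutes after each successive failure, then marked permanently_failed when the fifth attempt fails.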