feat(marketplace): add TransferRetryWorker background goroutine
This commit is contained in:
parent
2a9e6084fc
commit
8272f4770a
1 changed files with 129 additions and 0 deletions
129
veza-backend-api/internal/core/marketplace/transfer_retry.go
Normal file
129
veza-backend-api/internal/core/marketplace/transfer_retry.go
Normal file
|
|
@ -0,0 +1,129 @@
|
|||
package marketplace
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
"gorm.io/gorm"
|
||||
|
||||
"veza-backend-api/internal/monitoring"
|
||||
)
|
||||
|
||||
// TransferRetryWorker retries failed Stripe Connect transfers with exponential backoff (v0.701).
|
||||
type TransferRetryWorker struct {
|
||||
db *gorm.DB
|
||||
transferService TransferService
|
||||
logger *zap.Logger
|
||||
interval time.Duration
|
||||
maxRetries int
|
||||
}
|
||||
|
||||
// NewTransferRetryWorker creates a new TransferRetryWorker.
|
||||
func NewTransferRetryWorker(db *gorm.DB, ts TransferService, logger *zap.Logger, interval time.Duration, maxRetries int) *TransferRetryWorker {
|
||||
return &TransferRetryWorker{
|
||||
db: db,
|
||||
transferService: ts,
|
||||
logger: logger,
|
||||
interval: interval,
|
||||
maxRetries: maxRetries,
|
||||
}
|
||||
}
|
||||
|
||||
// Start runs the worker loop until ctx is cancelled.
|
||||
func (w *TransferRetryWorker) Start(ctx context.Context) {
|
||||
ticker := time.NewTicker(w.interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
w.logger.Info("TransferRetryWorker stopped")
|
||||
return
|
||||
case <-ticker.C:
|
||||
w.processRetries(ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// processRetries fetches failed transfers eligible for retry and attempts them.
|
||||
func (w *TransferRetryWorker) processRetries(ctx context.Context) {
|
||||
if w.transferService == nil {
|
||||
return
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
var transfers []SellerTransfer
|
||||
err := w.db.WithContext(ctx).
|
||||
Where("status = ? AND retry_count < ? AND (next_retry_at IS NULL OR next_retry_at <= ?)",
|
||||
"failed", w.maxRetries, now).
|
||||
Find(&transfers).Error
|
||||
if err != nil {
|
||||
w.logger.Error("TransferRetryWorker: failed to query transfers", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
for _, t := range transfers {
|
||||
w.retryOne(ctx, &t)
|
||||
}
|
||||
}
|
||||
|
||||
func (w *TransferRetryWorker) retryOne(ctx context.Context, t *SellerTransfer) {
|
||||
// Re-fetch to avoid race with another process
|
||||
var fresh SellerTransfer
|
||||
if err := w.db.WithContext(ctx).First(&fresh, t.ID).Error; err != nil {
|
||||
return
|
||||
}
|
||||
if fresh.Status != "failed" {
|
||||
return
|
||||
}
|
||||
|
||||
monitoring.RecordTransferRetry()
|
||||
|
||||
orderID := t.OrderID.String()
|
||||
err := w.transferService.CreateTransfer(ctx, t.SellerID, t.AmountCents, t.Currency, orderID)
|
||||
if err != nil {
|
||||
t.RetryCount++
|
||||
t.ErrorMessage = err.Error()
|
||||
|
||||
// Exponential backoff: interval * 2^retry_count
|
||||
delay := w.interval
|
||||
for i := 0; i < t.RetryCount; i++ {
|
||||
delay *= 2
|
||||
}
|
||||
nextRetry := time.Now().Add(delay)
|
||||
t.NextRetryAt = &nextRetry
|
||||
|
||||
if t.RetryCount >= w.maxRetries {
|
||||
t.Status = "permanently_failed"
|
||||
t.NextRetryAt = nil
|
||||
monitoring.RecordTransferRetryPermanent()
|
||||
} else {
|
||||
monitoring.RecordTransferRetryFailure()
|
||||
}
|
||||
|
||||
if saveErr := w.db.Save(t).Error; saveErr != nil {
|
||||
w.logger.Error("TransferRetryWorker: failed to save transfer", zap.Error(saveErr), zap.String("transfer_id", t.ID.String()))
|
||||
}
|
||||
w.logger.Warn("TransferRetryWorker: retry failed",
|
||||
zap.String("transfer_id", t.ID.String()),
|
||||
zap.String("seller_id", t.SellerID.String()),
|
||||
zap.Int("retry_count", t.RetryCount),
|
||||
zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
t.Status = "completed"
|
||||
t.NextRetryAt = nil
|
||||
t.ErrorMessage = ""
|
||||
if err := w.db.Save(t).Error; err != nil {
|
||||
w.logger.Error("TransferRetryWorker: failed to save transfer", zap.Error(err), zap.String("transfer_id", t.ID.String()))
|
||||
return
|
||||
}
|
||||
|
||||
monitoring.RecordTransferRetrySuccess()
|
||||
w.logger.Info("TransferRetryWorker: retry succeeded",
|
||||
zap.String("transfer_id", t.ID.String()),
|
||||
zap.String("seller_id", t.SellerID.String()),
|
||||
zap.Int64("amount_cents", t.AmountCents))
|
||||
}
|
||||
Loading…
Reference in a new issue