veza/veza-backend-api/internal/core/distribution/service.go
senke 7e26a8dd1f
feat(subscription): recovery endpoint + distribution gate (v1.0.9 item G — Phase 3)
Phase 3 closes the loop on Item G's pending_payment state machine:
the user-facing recovery path for stalled paid-plan subscriptions, and
the distribution gate that surfaces a "complete payment" hint instead
of the generic "upgrade your plan".

Recovery endpoint — POST /api/v1/subscriptions/complete/:id

  Re-fetches the PSP client_secret for a subscription stuck in
  StatusPendingPayment so the SPA can drive the payment UI to
  completion. The PSP CreateSubscriptionPayment call is idempotent on
  sub.ID.String() (same idempotency key as Phase 1), so hitting this
  endpoint repeatedly returns the same payment intent rather than
  creating a duplicate.
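
  The idempotency behaviour described above can be illustrated with a small in-memory stand-in for the PSP. This is only a sketch: `fakePSP`, `PaymentIntent`, and the single-argument `CreateSubscriptionPayment` signature are hypothetical and do not reflect the real Hyperswitch client; the key passed in stands in for `sub.ID.String()`.

```go
package main

import (
	"fmt"
	"sync"
)

// PaymentIntent is a hypothetical, trimmed-down payment intent.
type PaymentIntent struct {
	PaymentID    string
	ClientSecret string
}

// fakePSP is an in-memory stand-in for a payment provider that
// deduplicates requests by idempotency key.
type fakePSP struct {
	mu      sync.Mutex
	intents map[string]PaymentIntent
	created int // number of distinct intents actually created
}

func newFakePSP() *fakePSP {
	return &fakePSP{intents: make(map[string]PaymentIntent)}
}

// CreateSubscriptionPayment returns the intent already stored for the key,
// or creates and stores a new one on first use.
func (p *fakePSP) CreateSubscriptionPayment(idempotencyKey string) PaymentIntent {
	p.mu.Lock()
	defer p.mu.Unlock()
	if pi, ok := p.intents[idempotencyKey]; ok {
		return pi
	}
	p.created++
	pi := PaymentIntent{
		PaymentID:    fmt.Sprintf("pay_%d", p.created),
		ClientSecret: fmt.Sprintf("secret_%d", p.created),
	}
	p.intents[idempotencyKey] = pi
	return pi
}

func main() {
	psp := newFakePSP()
	subID := "sub-123" // stands in for sub.ID.String()

	first := psp.CreateSubscriptionPayment(subID)
	second := psp.CreateSubscriptionPayment(subID) // recovery retry

	// Same key, same intent: only one intent was created.
	fmt.Println(first == second, psp.created)
}
```

  Keying on the subscription ID rather than a per-request nonce is what makes the recovery endpoint safe to call repeatedly: the retry and the original Subscribe call resolve to the same intent.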

  Maps to:
    - 200 + {subscription, client_secret, payment_id} on success
    - 404 if the subscription doesn't belong to caller (avoids ID leak)
    - 409 if the subscription is not in pending_payment (already
      activated by webhook, manual admin action, plan upgrade, etc.)
    - 503 if HYPERSWITCH_ENABLED=false (mirrors Subscribe's fail-closed
      behaviour from Phase 1)
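
  A minimal sketch of how a handler might translate the service errors into the status codes listed above. The sentinels below are local stand-ins for the ones in the subscription package, and two points are assumptions rather than facts from this commit: that ErrPaymentProviderRequired is the error behind the 503, and that anything unrecognised falls back to 500.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// Local stand-ins for the sentinels named in this commit; the real values
// live in the subscription package.
var (
	ErrSubscriptionNotFound    = errors.New("subscription not found")
	ErrSubscriptionNotPending  = errors.New("subscription is not pending payment")
	ErrPaymentProviderRequired = errors.New("payment provider required")
)

// recoveryStatus maps the error returned by the recovery call to an HTTP
// status code. errors.Is is used so wrapped errors still match.
func recoveryStatus(err error) int {
	switch {
	case err == nil:
		return http.StatusOK
	case errors.Is(err, ErrSubscriptionNotFound):
		// Also covers "belongs to another user", so IDs do not leak.
		return http.StatusNotFound
	case errors.Is(err, ErrSubscriptionNotPending):
		return http.StatusConflict
	case errors.Is(err, ErrPaymentProviderRequired):
		// Assumption: this is the error surfaced when the PSP is disabled.
		return http.StatusServiceUnavailable
	default:
		// Assumption: unknown errors are reported as internal errors.
		return http.StatusInternalServerError
	}
}

func main() {
	wrapped := fmt.Errorf("complete payment: %w", ErrSubscriptionNotPending)
	fmt.Println(recoveryStatus(nil), recoveryStatus(wrapped))
}
```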

  Service surface:
    - subscription.GetPendingPaymentSubscription(ctx, userID) — returns
      the most-recently-created pending row, used by both the recovery
      flow and the distribution gate probe
    - subscription.CompletePendingPayment(ctx, userID, subID) — the
      actual recovery call, returns the same SubscribeResponse shape as
      Phase 1's Subscribe endpoint
    - subscription.ErrSubscriptionNotPending — sentinel for the 409
    - subscription.ErrSubscriptionPendingPayment — sentinel propagated
      out of distribution.checkEligibility
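
  To make the "most-recently-created pending row" rule concrete, here is an in-memory sketch of the selection. The real GetPendingPaymentSubscription queries the database; the `Subscription` struct, `newestPending`, and `sampleRows` below are hypothetical and exist only for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// Subscription is a hypothetical, trimmed-down row.
type Subscription struct {
	ID        string
	UserID    string
	Status    string
	CreatedAt time.Time
}

const statusPendingPayment = "pending_payment"

// newestPending returns the user's most recently created pending_payment
// row. Rows in any other status, or owned by other users, are ignored.
func newestPending(rows []Subscription, userID string) (Subscription, bool) {
	var best Subscription
	found := false
	for _, r := range rows {
		if r.UserID != userID || r.Status != statusPendingPayment {
			continue
		}
		if !found || r.CreatedAt.After(best.CreatedAt) {
			best, found = r, true
		}
	}
	return best, found
}

// sampleRows builds a small fixture: two pending rows and one active row
// for u1, plus a newer pending row that belongs to u2.
func sampleRows() []Subscription {
	base := time.Date(2026, 4, 1, 0, 0, 0, 0, time.UTC)
	return []Subscription{
		{ID: "old-pending", UserID: "u1", Status: statusPendingPayment, CreatedAt: base},
		{ID: "active", UserID: "u1", Status: "active", CreatedAt: base.Add(48 * time.Hour)},
		{ID: "new-pending", UserID: "u1", Status: statusPendingPayment, CreatedAt: base.Add(24 * time.Hour)},
		{ID: "other-user", UserID: "u2", Status: statusPendingPayment, CreatedAt: base.Add(72 * time.Hour)},
	}
}

func main() {
	sub, ok := newestPending(sampleRows(), "u1")
	fmt.Println(sub.ID, ok)
}
```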

Distribution gate — distinct path for pending_payment

  Before: a creator with only a pending_payment row hit
  ErrNoActiveSubscription → distribution surfaced the generic
  ErrNotEligible "upgrade your plan" error. Confusing because the
  user *did* try to subscribe — they just hadn't completed the payment.

  After: distribution.checkEligibility probes for a pending_payment row
  on the ErrNoActiveSubscription branch and returns
  ErrSubscriptionPendingPayment. The handler maps this to a 403 with
  "Complete the payment to enable distribution." so the SPA can route
  to the recovery page instead of the upgrade page.
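
  The before/after behaviour can be sketched without a database. In the snippet below, `stubSubscriptions`, `checkGate`, and `gateResponse` are hypothetical names, the sentinels are local stand-ins, and the 403 for the plain "upgrade" case is an assumption (the commit only states the 403 for the pending_payment case).

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the real sentinels.
var (
	ErrSubscriptionPendingPayment = errors.New("subscription pending payment")
	ErrNotEligible                = errors.New("distribution requires Creator or Premium plan")
)

// stubSubscriptions replaces the subscription service for this sketch.
type stubSubscriptions struct {
	eligible map[string]bool // users with an active, distribution-capable plan
	pending  map[string]bool // users with only a pending_payment row
}

// checkGate mirrors the gate's decision order: an active plan wins, then a
// pending payment yields the recovery error, otherwise the upgrade error.
func checkGate(s stubSubscriptions, userID string) error {
	if s.eligible[userID] {
		return nil
	}
	if s.pending[userID] {
		return ErrSubscriptionPendingPayment
	}
	return ErrNotEligible
}

// gateResponse maps a non-nil gate error to a status code and message.
func gateResponse(err error) (int, string) {
	switch {
	case errors.Is(err, ErrSubscriptionPendingPayment):
		return 403, "Complete the payment to enable distribution."
	case errors.Is(err, ErrNotEligible):
		// Assumption: the upgrade hint is also served as a 403.
		return 403, err.Error()
	default:
		return 500, "internal error"
	}
}

func main() {
	subs := stubSubscriptions{
		eligible: map[string]bool{"alice": true},
		pending:  map[string]bool{"bob": true},
	}
	for _, user := range []string{"alice", "bob", "carol"} {
		if err := checkGate(subs, user); err != nil {
			code, msg := gateResponse(err)
			fmt.Println(user, code, msg)
			continue
		}
		fmt.Println(user, "eligible")
	}
}
```

  Returning a distinct sentinel, rather than a boolean, is what lets the handler pick the message and lets the SPA choose between the recovery page and the upgrade page.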

Tests (11 new, all green via sqlite in-memory):
  internal/core/subscription/recovery_test.go (4 tests / 9 subtests)
    - GetPendingPaymentSubscription: no row / active row invisible /
      pending row + plan preload / multiple pending rows pick newest
    - CompletePendingPayment: happy path + idempotency key threaded /
      ownership mismatch → ErrSubscriptionNotFound /
      not-pending → ErrSubscriptionNotPending /
      no provider → ErrPaymentProviderRequired /
      provider error wrapping
  internal/core/distribution/eligibility_test.go (2 tests)
    - Submit_EligibilityGate_PendingPayment: pending_payment user
      gets ErrSubscriptionPendingPayment (recovery hint)
    - Submit_EligibilityGate_NoSubscription: no-sub user gets
      ErrNotEligible (upgrade hint), NOT the recovery branch

E2E test (28-subscription-pending-payment.spec.ts) is deferred: it needs
Docker infra running locally to exercise the webhook signature path and
will land alongside the next CI E2E pass.

TODO removal: the roadmap mentioned a `TODO(v1.0.7-item-G)` in
subscription/service.go to remove. Verified none present
(`grep -n TODO internal/core/subscription/service.go` → 0 hits).
Acceptance criterion trivially met.

SKIP_TESTS=1 rationale: backend-only Go changes, frontend hooks
irrelevant. All Go tests verified manually:

  $ go test -short -count=1 ./internal/core/subscription/... \
      ./internal/core/distribution/... ./internal/core/marketplace/... \
      ./internal/services/hyperswitch/... ./internal/handlers/...
  ok  veza-backend-api/internal/core/subscription
  ok  veza-backend-api/internal/core/distribution
  ok  veza-backend-api/internal/core/marketplace
  ok  veza-backend-api/internal/services/hyperswitch
  ok  veza-backend-api/internal/handlers

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 11:33:40 +02:00


package distribution
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"github.com/google/uuid"
"go.uber.org/zap"
"gorm.io/gorm"
"veza-backend-api/internal/core/subscription"
)
// Service errors
var (
ErrDistributionNotFound = errors.New("distribution not found")
ErrTrackNotFound = errors.New("track not found")
ErrNotEligible = errors.New("distribution requires Creator or Premium plan")
ErrTrackNotPublic = errors.New("track must be public to distribute")
ErrAlreadyDistributed = errors.New("track already has a pending or live distribution")
ErrInvalidPlatform = errors.New("unsupported platform")
ErrNoPlatformsSelected = errors.New("at least one platform must be selected")
ErrDistributionNotRemovable = errors.New("distribution cannot be removed in current status")
)
// DistributorProvider defines the interface for external distribution APIs
type DistributorProvider interface {
SubmitTrack(ctx context.Context, req DistributorSubmitRequest) (*DistributorSubmitResponse, error)
GetStatus(ctx context.Context, submissionID string) (*DistributorStatusResponse, error)
}
// DistributorSubmitRequest is the payload sent to the distributor API
type DistributorSubmitRequest struct {
TrackFilePath string `json:"track_file_path"`
CoverFilePath string `json:"cover_file_path"`
Metadata DistributionMetadata `json:"metadata"`
Platforms []Platform `json:"platforms"`
}
// DistributorSubmitResponse is the response from the distributor API
type DistributorSubmitResponse struct {
SubmissionID string `json:"submission_id"`
UPC string `json:"upc,omitempty"`
ISRC string `json:"isrc,omitempty"`
EstimatedLiveDays int `json:"estimated_live_days"`
}
// DistributorStatusResponse is the status response from the distributor API
type DistributorStatusResponse struct {
SubmissionID string `json:"submission_id"`
OverallStatus DistributionStatus `json:"overall_status"`
PlatformStatuses map[Platform]PlatformStatus `json:"platform_statuses"`
}
// ServiceOption is a functional option for configuring the Service
type ServiceOption func(*Service)
// WithDistributorProvider sets the distributor API provider
func WithDistributorProvider(p DistributorProvider) ServiceOption {
return func(s *Service) {
s.distributorProvider = p
}
}
// Service handles distribution business logic
type Service struct {
db *gorm.DB
logger *zap.Logger
subscriptionService *subscription.Service
distributorProvider DistributorProvider
}
// NewService creates a new distribution service
func NewService(db *gorm.DB, logger *zap.Logger, subSvc *subscription.Service, opts ...ServiceOption) *Service {
s := &Service{
db: db,
logger: logger,
subscriptionService: subSvc,
}
for _, opt := range opts {
opt(s)
}
return s
}
// SubmitRequest holds the parameters for submitting a track for distribution
type SubmitRequest struct {
TrackID uuid.UUID `json:"track_id" binding:"required"`
Platforms []Platform `json:"platforms" binding:"required"`
Metadata DistributionMetadata `json:"metadata" binding:"required"`
}
// SubmitResponse holds the result of a distribution submission
type SubmitResponse struct {
Distribution *TrackDistribution `json:"distribution"`
EstimatedLiveDays int `json:"estimated_live_days"`
}
// Submit submits a track for distribution to selected platforms
func (s *Service) Submit(ctx context.Context, creatorID uuid.UUID, req SubmitRequest) (*SubmitResponse, error) {
// Validate platforms
if len(req.Platforms) == 0 {
return nil, ErrNoPlatformsSelected
}
validPlatforms := make(map[Platform]bool)
for _, p := range AllPlatforms() {
validPlatforms[p] = true
}
for _, p := range req.Platforms {
if !validPlatforms[p] {
return nil, fmt.Errorf("%w: %s", ErrInvalidPlatform, p)
}
}
// Check subscription eligibility
eligible, err := s.checkEligibility(ctx, creatorID)
if err != nil {
return nil, err
}
if !eligible {
return nil, ErrNotEligible
}
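// Verify the track exists, belongs to the creator, is public, and is not
// soft-deleted. Any of these failing is reported as ErrTrackNotPublic.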
var trackExists bool
err = s.db.WithContext(ctx).
Raw("SELECT EXISTS(SELECT 1 FROM tracks WHERE id = ? AND creator_id = ? AND is_public = true AND deleted_at IS NULL)", req.TrackID, creatorID).
Scan(&trackExists).Error
if err != nil {
return nil, fmt.Errorf("failed to verify track: %w", err)
}
if !trackExists {
return nil, ErrTrackNotPublic
}
// Check for an existing active distribution
var existingCount int64
err = s.db.WithContext(ctx).Model(&TrackDistribution{}).
Where("track_id = ? AND overall_status IN ?", req.TrackID,
[]string{string(StatusSubmitted), string(StatusProcessing), string(StatusLive)}).
Count(&existingCount).Error
if err != nil {
return nil, fmt.Errorf("failed to check existing distributions: %w", err)
}
if existingCount > 0 {
return nil, ErrAlreadyDistributed
}
// Build platform statuses
platformStatuses := make(map[Platform]PlatformStatus)
for _, p := range req.Platforms {
platformStatuses[p] = PlatformStatus{Status: string(StatusSubmitted)}
}
metadataJSON, err := json.Marshal(req.Metadata)
if err != nil {
return nil, fmt.Errorf("failed to marshal metadata: %w", err)
}
platformJSON, err := json.Marshal(platformStatuses)
if err != nil {
return nil, fmt.Errorf("failed to marshal platform statuses: %w", err)
}
// Generate a submission ID (in production, this comes from distributor API)
submissionID := fmt.Sprintf("veza-%s-%d", req.TrackID.String()[:8], time.Now().Unix())
estimatedDays := 7
// If distributor provider is configured, call it
if s.distributorProvider != nil {
resp, err := s.distributorProvider.SubmitTrack(ctx, DistributorSubmitRequest{
Metadata: req.Metadata,
Platforms: req.Platforms,
})
if err != nil {
return nil, fmt.Errorf("distributor submission failed: %w", err)
}
submissionID = resp.SubmissionID
estimatedDays = resp.EstimatedLiveDays
if resp.UPC != "" {
req.Metadata.UPC = resp.UPC
}
if resp.ISRC != "" {
req.Metadata.ISRC = resp.ISRC
}
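// Re-marshal so the stored metadata includes the distributor's UPC/ISRC
metadataJSON, err = json.Marshal(req.Metadata)
if err != nil {
return nil, fmt.Errorf("failed to marshal metadata: %w", err)
}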
}
now := time.Now()
dist := &TrackDistribution{
TrackID: req.TrackID,
CreatorID: creatorID,
Distributor: "distrokid",
SubmissionID: submissionID,
SubmissionUPC: req.Metadata.UPC,
SubmissionISRC: req.Metadata.ISRC,
Metadata: metadataJSON,
OverallStatus: StatusSubmitted,
PlatformStatuses: platformJSON,
SubmittedAt: now,
}
err = s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
if err := tx.Create(dist).Error; err != nil {
return fmt.Errorf("failed to create distribution: %w", err)
}
// Create initial status history entries
for _, p := range req.Platforms {
entry := &StatusHistoryEntry{
DistributionID: dist.ID,
Platform: string(p),
NewStatus: string(StatusSubmitted),
Note: "Track submitted for distribution",
}
if err := tx.Create(entry).Error; err != nil {
return fmt.Errorf("failed to create status history: %w", err)
}
}
return nil
})
if err != nil {
return nil, err
}
s.logger.Info("Track submitted for distribution",
zap.String("creator_id", creatorID.String()),
zap.String("track_id", req.TrackID.String()),
zap.String("submission_id", submissionID),
zap.Int("platforms", len(req.Platforms)),
)
return &SubmitResponse{
Distribution: dist,
EstimatedLiveDays: estimatedDays,
}, nil
}
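// checkEligibility reports whether the user's active plan grants distribution
// access (Plan.HasDistribution or Plan.CanSellOnMarketplace). It returns
// subscription.ErrSubscriptionPendingPayment when the user only has a
// pending_payment subscription.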
func (s *Service) checkEligibility(ctx context.Context, userID uuid.UUID) (bool, error) {
if s.subscriptionService == nil {
// No subscription service configured, allow by default (dev mode)
return true, nil
}
sub, err := s.subscriptionService.GetUserSubscription(ctx, userID)
if err != nil {
// v1.0.9 item G Phase 3: a row in StatusPendingPayment doesn't
// match GetUserSubscription's active/trialing filter, so we
// arrive here with ErrNoActiveSubscription. Probe explicitly for
// pending_payment so the handler can surface the recovery hint
// ("complete your payment") instead of the generic "upgrade".
if errors.Is(err, subscription.ErrNoActiveSubscription) {
if _, pendErr := s.subscriptionService.GetPendingPaymentSubscription(ctx, userID); pendErr == nil {
return false, subscription.ErrSubscriptionPendingPayment
}
// No active and no pending — vanilla "upgrade your plan" path.
return false, nil
}
// v1.0.6.2: ErrSubscriptionNoPayment is propagated unchanged, like any
// other error, so the handler can surface a distinct message
// ("complete payment") instead of the generic "upgrade your plan":
// the user has a plan, just no effective payment linkage.
return false, err
}
return sub.Plan.HasDistribution || sub.Plan.CanSellOnMarketplace, nil
}
// GetDistribution returns a distribution by ID
func (s *Service) GetDistribution(ctx context.Context, distID uuid.UUID) (*TrackDistribution, error) {
var dist TrackDistribution
err := s.db.WithContext(ctx).First(&dist, "id = ?", distID).Error
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, ErrDistributionNotFound
}
return nil, fmt.Errorf("failed to get distribution: %w", err)
}
return &dist, nil
}
// ListDistributions returns distributions for a creator with optional filters
func (s *Service) ListDistributions(ctx context.Context, creatorID uuid.UUID, trackID *uuid.UUID, status *DistributionStatus, limit, offset int) ([]TrackDistribution, int64, error) {
if limit <= 0 || limit > 100 {
limit = 20
}
if offset < 0 {
offset = 0
}
query := s.db.WithContext(ctx).Where("creator_id = ?", creatorID)
if trackID != nil {
query = query.Where("track_id = ?", *trackID)
}
if status != nil {
query = query.Where("overall_status = ?", string(*status))
}
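var total int64
if err := query.Model(&TrackDistribution{}).Count(&total).Error; err != nil {
return nil, 0, fmt.Errorf("failed to count distributions: %w", err)
}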
var dists []TrackDistribution
err := query.Order("submitted_at DESC").Limit(limit).Offset(offset).Find(&dists).Error
if err != nil {
return nil, 0, fmt.Errorf("failed to list distributions: %w", err)
}
return dists, total, nil
}
// GetTrackDistributions returns all distributions for a specific track
func (s *Service) GetTrackDistributions(ctx context.Context, trackID uuid.UUID) ([]TrackDistribution, error) {
var dists []TrackDistribution
err := s.db.WithContext(ctx).
Where("track_id = ?", trackID).
Order("submitted_at DESC").
Find(&dists).Error
if err != nil {
return nil, fmt.Errorf("failed to get track distributions: %w", err)
}
return dists, nil
}
// GetStatusHistory returns the status change history for a distribution
func (s *Service) GetStatusHistory(ctx context.Context, distID uuid.UUID) ([]StatusHistoryEntry, error) {
var entries []StatusHistoryEntry
err := s.db.WithContext(ctx).
Where("distribution_id = ?", distID).
Order("created_at ASC").
Find(&entries).Error
if err != nil {
return nil, fmt.Errorf("failed to get status history: %w", err)
}
return entries, nil
}
// RemoveDistribution marks a live distribution for removal
func (s *Service) RemoveDistribution(ctx context.Context, creatorID, distID uuid.UUID) error {
dist, err := s.GetDistribution(ctx, distID)
if err != nil {
return err
}
if dist.CreatorID != creatorID {
return ErrDistributionNotFound
}
if dist.OverallStatus != StatusLive {
return ErrDistributionNotRemovable
}
now := time.Now()
dist.OverallStatus = StatusRemoved
dist.UpdatedAt = now
return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
if err := tx.Save(dist).Error; err != nil {
return err
}
entry := &StatusHistoryEntry{
DistributionID: dist.ID,
Platform: "all",
OldStatus: string(StatusLive),
NewStatus: string(StatusRemoved),
Note: "Removed by creator",
}
return tx.Create(entry).Error
})
}
// PollDistributionStatuses polls the distributor API for status updates (background job)
func (s *Service) PollDistributionStatuses(ctx context.Context) (int, error) {
if s.distributorProvider == nil {
return 0, nil
}
var dists []TrackDistribution
err := s.db.WithContext(ctx).
Where("overall_status IN ?", []string{string(StatusSubmitted), string(StatusProcessing)}).
Find(&dists).Error
if err != nil {
return 0, fmt.Errorf("failed to query distributions for polling: %w", err)
}
updated := 0
now := time.Now()
for i := range dists {
dist := &dists[i]
resp, err := s.distributorProvider.GetStatus(ctx, dist.SubmissionID)
if err != nil {
s.logger.Warn("Failed to poll distribution status",
zap.String("submission_id", dist.SubmissionID),
zap.Error(err),
)
continue
}
if resp.OverallStatus != dist.OverallStatus {
oldStatus := dist.OverallStatus
dist.OverallStatus = resp.OverallStatus
dist.LastStatusCheckAt = &now
if resp.OverallStatus == StatusLive && dist.FirstLiveAt == nil {
dist.FirstLiveAt = &now
}
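platformJSON, err := json.Marshal(resp.PlatformStatuses)
if err != nil {
s.logger.Error("Failed to marshal platform statuses",
zap.String("distribution_id", dist.ID.String()),
zap.Error(err),
)
continue
}
dist.PlatformStatuses = platformJSON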
if err := s.db.WithContext(ctx).Save(dist).Error; err != nil {
s.logger.Error("Failed to update distribution status",
zap.String("distribution_id", dist.ID.String()),
zap.Error(err),
)
continue
}
// Record status history
entry := &StatusHistoryEntry{
DistributionID: dist.ID,
Platform: "all",
OldStatus: string(oldStatus),
NewStatus: string(resp.OverallStatus),
Note: "Status updated via distributor API poll",
}
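// The status itself is already saved; a history failure is logged, not fatal
if err := s.db.WithContext(ctx).Create(entry).Error; err != nil {
s.logger.Warn("Failed to record status history",
zap.String("distribution_id", dist.ID.String()),
zap.Error(err),
)
}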
updated++
s.logger.Info("Distribution status updated",
zap.String("distribution_id", dist.ID.String()),
zap.String("old_status", string(oldStatus)),
zap.String("new_status", string(resp.OverallStatus)),
)
} else {
dist.LastStatusCheckAt = &now
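if err := s.db.WithContext(ctx).Model(dist).Update("last_status_check_at", now).Error; err != nil {
s.logger.Warn("Failed to update last status check time",
zap.String("distribution_id", dist.ID.String()),
zap.Error(err),
)
}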
}
}
return updated, nil
}
// GetExternalRoyalties returns external streaming royalties for a creator
func (s *Service) GetExternalRoyalties(ctx context.Context, creatorID uuid.UUID, startDate, endDate *time.Time, platform *string, limit, offset int) ([]ExternalStreamingRoyalty, *RoyaltySummary, error) {
if limit <= 0 || limit > 100 {
limit = 20
}
if offset < 0 {
offset = 0
}
query := s.db.WithContext(ctx).Where("creator_id = ? AND import_status = ?", creatorID, string(ImportCompleted))
if startDate != nil {
query = query.Where("reporting_period_start >= ?", *startDate)
}
if endDate != nil {
query = query.Where("reporting_period_end <= ?", *endDate)
}
if platform != nil && *platform != "" {
query = query.Where("platform = ?", *platform)
}
var royalties []ExternalStreamingRoyalty
err := query.Order("reporting_period_start DESC").
Limit(limit).Offset(offset).
Find(&royalties).Error
if err != nil {
return nil, nil, fmt.Errorf("failed to get royalties: %w", err)
}
// Compute summary
summary := &RoyaltySummary{
ByPlatform: make(map[string]PlatformRoyaltySummary),
}
summaryQuery := s.db.WithContext(ctx).
Model(&ExternalStreamingRoyalty{}).
Where("creator_id = ? AND import_status = ?", creatorID, string(ImportCompleted))
if startDate != nil {
summaryQuery = summaryQuery.Where("reporting_period_start >= ?", *startDate)
}
if endDate != nil {
summaryQuery = summaryQuery.Where("reporting_period_end <= ?", *endDate)
}
if platform != nil && *platform != "" {
summaryQuery = summaryQuery.Where("platform = ?", *platform)
}
var summaryRows []struct {
Platform string `gorm:"column:platform"`
TotalStreams int64 `gorm:"column:total_streams"`
TotalRevenueCents int64 `gorm:"column:total_revenue_cents"`
}
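err = summaryQuery.Select("platform, SUM(total_streams) as total_streams, SUM(total_revenue_cents) as total_revenue_cents").
Group("platform").Scan(&summaryRows).Error
if err != nil {
return nil, nil, fmt.Errorf("failed to compute royalty summary: %w", err)
}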
for _, row := range summaryRows {
summary.TotalStreams += row.TotalStreams
summary.TotalRevenueCents += row.TotalRevenueCents
summary.ByPlatform[row.Platform] = PlatformRoyaltySummary{
Streams: row.TotalStreams,
RevenueCents: row.TotalRevenueCents,
}
}
return royalties, summary, nil
}
// RoyaltySummary provides aggregated royalty data
type RoyaltySummary struct {
TotalStreams int64 `json:"total_streams"`
TotalRevenueCents int64 `json:"total_revenue_cents"`
ByPlatform map[string]PlatformRoyaltySummary `json:"by_platform"`
}
// PlatformRoyaltySummary provides per-platform aggregated data
type PlatformRoyaltySummary struct {
Streams int64 `json:"streams"`
RevenueCents int64 `json:"revenue_cents"`
}