veza/veza-backend-api/internal/core/distribution/service.go

504 lines
16 KiB
Go
Raw Normal View History

package distribution
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"github.com/google/uuid"
"go.uber.org/zap"
"gorm.io/gorm"
"veza-backend-api/internal/core/subscription"
)
// Service errors
var (
ErrDistributionNotFound = errors.New("distribution not found")
ErrTrackNotFound = errors.New("track not found")
ErrNotEligible = errors.New("distribution requires Creator or Premium plan")
ErrTrackNotPublic = errors.New("track must be public to distribute")
ErrAlreadyDistributed = errors.New("track already has a pending or live distribution")
ErrInvalidPlatform = errors.New("unsupported platform")
ErrNoPlatformsSelected = errors.New("at least one platform must be selected")
ErrDistributionNotRemovable = errors.New("distribution cannot be removed in current status")
)
// DistributorProvider defines the interface for external distribution APIs
type DistributorProvider interface {
SubmitTrack(ctx context.Context, req DistributorSubmitRequest) (*DistributorSubmitResponse, error)
GetStatus(ctx context.Context, submissionID string) (*DistributorStatusResponse, error)
}
// DistributorSubmitRequest is the payload sent to the distributor API
type DistributorSubmitRequest struct {
TrackFilePath string `json:"track_file_path"`
CoverFilePath string `json:"cover_file_path"`
Metadata DistributionMetadata `json:"metadata"`
Platforms []Platform `json:"platforms"`
}
// DistributorSubmitResponse is the response from the distributor API
type DistributorSubmitResponse struct {
SubmissionID string `json:"submission_id"`
UPC string `json:"upc,omitempty"`
ISRC string `json:"isrc,omitempty"`
EstimatedLiveDays int `json:"estimated_live_days"`
}
// DistributorStatusResponse is the status response from the distributor API
type DistributorStatusResponse struct {
SubmissionID string `json:"submission_id"`
OverallStatus DistributionStatus `json:"overall_status"`
PlatformStatuses map[Platform]PlatformStatus `json:"platform_statuses"`
}
// ServiceOption is a functional option for configuring the Service
type ServiceOption func(*Service)
// WithDistributorProvider sets the distributor API provider
func WithDistributorProvider(p DistributorProvider) ServiceOption {
return func(s *Service) {
s.distributorProvider = p
}
}
// Service handles distribution business logic
type Service struct {
db *gorm.DB
logger *zap.Logger
subscriptionService *subscription.Service
distributorProvider DistributorProvider
}
// NewService creates a new distribution service
func NewService(db *gorm.DB, logger *zap.Logger, subSvc *subscription.Service, opts ...ServiceOption) *Service {
s := &Service{
db: db,
logger: logger,
subscriptionService: subSvc,
}
for _, opt := range opts {
opt(s)
}
return s
}
// SubmitRequest holds the parameters for submitting a track for distribution
type SubmitRequest struct {
TrackID uuid.UUID `json:"track_id" binding:"required"`
Platforms []Platform `json:"platforms" binding:"required"`
Metadata DistributionMetadata `json:"metadata" binding:"required"`
}
// SubmitResponse holds the result of a distribution submission
type SubmitResponse struct {
Distribution *TrackDistribution `json:"distribution"`
EstimatedLiveDays int `json:"estimated_live_days"`
}
// Submit submits a track for distribution to selected platforms
func (s *Service) Submit(ctx context.Context, creatorID uuid.UUID, req SubmitRequest) (*SubmitResponse, error) {
// Validate platforms
if len(req.Platforms) == 0 {
return nil, ErrNoPlatformsSelected
}
validPlatforms := make(map[Platform]bool)
for _, p := range AllPlatforms() {
validPlatforms[p] = true
}
for _, p := range req.Platforms {
if !validPlatforms[p] {
return nil, fmt.Errorf("%w: %s", ErrInvalidPlatform, p)
}
}
// Check subscription eligibility
eligible, err := s.checkEligibility(ctx, creatorID)
if err != nil {
return nil, err
}
if !eligible {
return nil, ErrNotEligible
}
// Verify track exists and belongs to creator
var trackExists bool
err = s.db.WithContext(ctx).
Raw("SELECT EXISTS(SELECT 1 FROM tracks WHERE id = ? AND creator_id = ? AND is_public = true AND deleted_at IS NULL)", req.TrackID, creatorID).
Scan(&trackExists).Error
if err != nil {
return nil, fmt.Errorf("failed to verify track: %w", err)
}
if !trackExists {
return nil, ErrTrackNotPublic
}
// Check for existing active distribution
var existingCount int64
s.db.WithContext(ctx).Model(&TrackDistribution{}).
Where("track_id = ? AND overall_status IN ?", req.TrackID,
[]string{string(StatusSubmitted), string(StatusProcessing), string(StatusLive)}).
Count(&existingCount)
if existingCount > 0 {
return nil, ErrAlreadyDistributed
}
// Build platform statuses
platformStatuses := make(map[Platform]PlatformStatus)
for _, p := range req.Platforms {
platformStatuses[p] = PlatformStatus{Status: string(StatusSubmitted)}
}
metadataJSON, err := json.Marshal(req.Metadata)
if err != nil {
return nil, fmt.Errorf("failed to marshal metadata: %w", err)
}
platformJSON, err := json.Marshal(platformStatuses)
if err != nil {
return nil, fmt.Errorf("failed to marshal platform statuses: %w", err)
}
// Generate a submission ID (in production, this comes from distributor API)
submissionID := fmt.Sprintf("veza-%s-%d", req.TrackID.String()[:8], time.Now().Unix())
estimatedDays := 7
// If distributor provider is configured, call it
if s.distributorProvider != nil {
resp, err := s.distributorProvider.SubmitTrack(ctx, DistributorSubmitRequest{
Metadata: req.Metadata,
Platforms: req.Platforms,
})
if err != nil {
return nil, fmt.Errorf("distributor submission failed: %w", err)
}
submissionID = resp.SubmissionID
estimatedDays = resp.EstimatedLiveDays
if resp.UPC != "" {
req.Metadata.UPC = resp.UPC
}
if resp.ISRC != "" {
req.Metadata.ISRC = resp.ISRC
}
metadataJSON, _ = json.Marshal(req.Metadata)
}
now := time.Now()
dist := &TrackDistribution{
TrackID: req.TrackID,
CreatorID: creatorID,
Distributor: "distrokid",
SubmissionID: submissionID,
SubmissionUPC: req.Metadata.UPC,
SubmissionISRC: req.Metadata.ISRC,
Metadata: metadataJSON,
OverallStatus: StatusSubmitted,
PlatformStatuses: platformJSON,
SubmittedAt: now,
}
err = s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
if err := tx.Create(dist).Error; err != nil {
return fmt.Errorf("failed to create distribution: %w", err)
}
// Create initial status history entries
for _, p := range req.Platforms {
entry := &StatusHistoryEntry{
DistributionID: dist.ID,
Platform: string(p),
NewStatus: string(StatusSubmitted),
Note: "Track submitted for distribution",
}
if err := tx.Create(entry).Error; err != nil {
return fmt.Errorf("failed to create status history: %w", err)
}
}
return nil
})
if err != nil {
return nil, err
}
s.logger.Info("Track submitted for distribution",
zap.String("creator_id", creatorID.String()),
zap.String("track_id", req.TrackID.String()),
zap.String("submission_id", submissionID),
zap.Int("platforms", len(req.Platforms)),
)
return &SubmitResponse{
Distribution: dist,
EstimatedLiveDays: estimatedDays,
}, nil
}
// checkEligibility verifies that the user has a Creator or Premium subscription with distribution access
func (s *Service) checkEligibility(ctx context.Context, userID uuid.UUID) (bool, error) {
if s.subscriptionService == nil {
// No subscription service configured, allow by default (dev mode)
return true, nil
}
sub, err := s.subscriptionService.GetUserSubscription(ctx, userID)
if err != nil {
if errors.Is(err, subscription.ErrNoActiveSubscription) {
return false, nil
}
return false, err
}
return sub.Plan.HasDistribution || sub.Plan.CanSellOnMarketplace, nil
}
// GetDistribution returns a distribution by ID
func (s *Service) GetDistribution(ctx context.Context, distID uuid.UUID) (*TrackDistribution, error) {
var dist TrackDistribution
err := s.db.WithContext(ctx).First(&dist, "id = ?", distID).Error
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, ErrDistributionNotFound
}
return nil, fmt.Errorf("failed to get distribution: %w", err)
}
return &dist, nil
}
// ListDistributions returns distributions for a creator with optional filters
func (s *Service) ListDistributions(ctx context.Context, creatorID uuid.UUID, trackID *uuid.UUID, status *DistributionStatus, limit, offset int) ([]TrackDistribution, int64, error) {
if limit <= 0 || limit > 100 {
limit = 20
}
if offset < 0 {
offset = 0
}
query := s.db.WithContext(ctx).Where("creator_id = ?", creatorID)
if trackID != nil {
query = query.Where("track_id = ?", *trackID)
}
if status != nil {
query = query.Where("overall_status = ?", string(*status))
}
var total int64
query.Model(&TrackDistribution{}).Count(&total)
var dists []TrackDistribution
err := query.Order("submitted_at DESC").Limit(limit).Offset(offset).Find(&dists).Error
if err != nil {
return nil, 0, fmt.Errorf("failed to list distributions: %w", err)
}
return dists, total, nil
}
// GetTrackDistributions returns all distributions for a specific track
func (s *Service) GetTrackDistributions(ctx context.Context, trackID uuid.UUID) ([]TrackDistribution, error) {
var dists []TrackDistribution
err := s.db.WithContext(ctx).
Where("track_id = ?", trackID).
Order("submitted_at DESC").
Find(&dists).Error
if err != nil {
return nil, fmt.Errorf("failed to get track distributions: %w", err)
}
return dists, nil
}
// GetStatusHistory returns the status change history for a distribution
func (s *Service) GetStatusHistory(ctx context.Context, distID uuid.UUID) ([]StatusHistoryEntry, error) {
var entries []StatusHistoryEntry
err := s.db.WithContext(ctx).
Where("distribution_id = ?", distID).
Order("created_at ASC").
Find(&entries).Error
if err != nil {
return nil, fmt.Errorf("failed to get status history: %w", err)
}
return entries, nil
}
// RemoveDistribution marks a live distribution for removal
func (s *Service) RemoveDistribution(ctx context.Context, creatorID, distID uuid.UUID) error {
dist, err := s.GetDistribution(ctx, distID)
if err != nil {
return err
}
if dist.CreatorID != creatorID {
return ErrDistributionNotFound
}
if dist.OverallStatus != StatusLive {
return ErrDistributionNotRemovable
}
now := time.Now()
dist.OverallStatus = StatusRemoved
dist.UpdatedAt = now
return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
if err := tx.Save(dist).Error; err != nil {
return err
}
entry := &StatusHistoryEntry{
DistributionID: dist.ID,
Platform: "all",
OldStatus: string(StatusLive),
NewStatus: string(StatusRemoved),
Note: "Removed by creator",
}
return tx.Create(entry).Error
})
}
// PollDistributionStatuses polls the distributor API for status updates (background job)
func (s *Service) PollDistributionStatuses(ctx context.Context) (int, error) {
if s.distributorProvider == nil {
return 0, nil
}
var dists []TrackDistribution
err := s.db.WithContext(ctx).
Where("overall_status IN ?", []string{string(StatusSubmitted), string(StatusProcessing)}).
Find(&dists).Error
if err != nil {
return 0, fmt.Errorf("failed to query distributions for polling: %w", err)
}
updated := 0
now := time.Now()
for i := range dists {
dist := &dists[i]
resp, err := s.distributorProvider.GetStatus(ctx, dist.SubmissionID)
if err != nil {
s.logger.Warn("Failed to poll distribution status",
zap.String("submission_id", dist.SubmissionID),
zap.Error(err),
)
continue
}
if resp.OverallStatus != dist.OverallStatus {
oldStatus := dist.OverallStatus
dist.OverallStatus = resp.OverallStatus
dist.LastStatusCheckAt = &now
if resp.OverallStatus == StatusLive && dist.FirstLiveAt == nil {
dist.FirstLiveAt = &now
}
platformJSON, _ := json.Marshal(resp.PlatformStatuses)
dist.PlatformStatuses = platformJSON
if err := s.db.WithContext(ctx).Save(dist).Error; err != nil {
s.logger.Error("Failed to update distribution status",
zap.String("distribution_id", dist.ID.String()),
zap.Error(err),
)
continue
}
// Record status history
entry := &StatusHistoryEntry{
DistributionID: dist.ID,
Platform: "all",
OldStatus: string(oldStatus),
NewStatus: string(resp.OverallStatus),
Note: "Status updated via distributor API poll",
}
s.db.WithContext(ctx).Create(entry)
updated++
s.logger.Info("Distribution status updated",
zap.String("distribution_id", dist.ID.String()),
zap.String("old_status", string(oldStatus)),
zap.String("new_status", string(resp.OverallStatus)),
)
} else {
dist.LastStatusCheckAt = &now
s.db.WithContext(ctx).Model(dist).Update("last_status_check_at", now)
}
}
return updated, nil
}
// GetExternalRoyalties returns external streaming royalties for a creator
func (s *Service) GetExternalRoyalties(ctx context.Context, creatorID uuid.UUID, startDate, endDate *time.Time, platform *string, limit, offset int) ([]ExternalStreamingRoyalty, *RoyaltySummary, error) {
if limit <= 0 || limit > 100 {
limit = 20
}
if offset < 0 {
offset = 0
}
query := s.db.WithContext(ctx).Where("creator_id = ? AND import_status = ?", creatorID, string(ImportCompleted))
if startDate != nil {
query = query.Where("reporting_period_start >= ?", *startDate)
}
if endDate != nil {
query = query.Where("reporting_period_end <= ?", *endDate)
}
if platform != nil && *platform != "" {
query = query.Where("platform = ?", *platform)
}
var royalties []ExternalStreamingRoyalty
err := query.Order("reporting_period_start DESC").
Limit(limit).Offset(offset).
Find(&royalties).Error
if err != nil {
return nil, nil, fmt.Errorf("failed to get royalties: %w", err)
}
// Compute summary
summary := &RoyaltySummary{
ByPlatform: make(map[string]PlatformRoyaltySummary),
}
summaryQuery := s.db.WithContext(ctx).
Model(&ExternalStreamingRoyalty{}).
Where("creator_id = ? AND import_status = ?", creatorID, string(ImportCompleted))
if startDate != nil {
summaryQuery = summaryQuery.Where("reporting_period_start >= ?", *startDate)
}
if endDate != nil {
summaryQuery = summaryQuery.Where("reporting_period_end <= ?", *endDate)
}
if platform != nil && *platform != "" {
summaryQuery = summaryQuery.Where("platform = ?", *platform)
}
var summaryRows []struct {
Platform string `gorm:"column:platform"`
TotalStreams int64 `gorm:"column:total_streams"`
TotalRevenueCents int64 `gorm:"column:total_revenue_cents"`
}
summaryQuery.Select("platform, SUM(total_streams) as total_streams, SUM(total_revenue_cents) as total_revenue_cents").
Group("platform").Scan(&summaryRows)
for _, row := range summaryRows {
summary.TotalStreams += row.TotalStreams
summary.TotalRevenueCents += row.TotalRevenueCents
summary.ByPlatform[row.Platform] = PlatformRoyaltySummary{
Streams: row.TotalStreams,
RevenueCents: row.TotalRevenueCents,
}
}
return royalties, summary, nil
}
// RoyaltySummary provides aggregated royalty data
type RoyaltySummary struct {
TotalStreams int64 `json:"total_streams"`
TotalRevenueCents int64 `json:"total_revenue_cents"`
ByPlatform map[string]PlatformRoyaltySummary `json:"by_platform"`
}
// PlatformRoyaltySummary provides per-platform aggregated data
type PlatformRoyaltySummary struct {
Streams int64 `json:"streams"`
RevenueCents int64 `json:"revenue_cents"`
}