// veza/veza-backend-api/internal/monitoring/playback_analytics_monitor.go
// (file metadata from extraction: 483 lines, 16 KiB, Go)
package monitoring
import (
"context"
"fmt"
"sync"
"time"
"github.com/google/uuid"
"veza-backend-api/internal/models"
"veza-backend-api/internal/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/zap"
"gorm.io/gorm"
)
// PlaybackAnalyticsMonitor handles monitoring of playback analytics: it
// records per-event and per-batch outcomes, exposes Prometheus metrics,
// keeps an in-memory performance snapshot, and periodically evaluates
// alert conditions through the alerts service.
// T0386: Create Playback Analytics Monitoring
type PlaybackAnalyticsMonitor struct {
	db               *gorm.DB                           // analytics store queried for sessions/aggregates
	logger           *zap.Logger                        // never nil after NewPlaybackAnalyticsMonitor (falls back to zap.NewNop)
	alertsService    *services.PlaybackAlertsService    // may be nil; CheckAlerts errors out in that case
	analyticsService *services.PlaybackAnalyticsService // injected dependency; not used directly in this file — TODO confirm intended use

	// Prometheus metrics
	recordedEventsTotal    *prometheus.CounterVec   // events recorded, labeled by status ("success"/"error")
	recordedEventsDuration *prometheus.HistogramVec // recording latency, labeled by operation ("record"/"batch")
	recordedEventsErrors   *prometheus.CounterVec   // recording errors, labeled by error_type
	activeSessions         prometheus.Gauge         // sessions active within the last 30 minutes
	averageCompletionRate  prometheus.Gauge         // mean completion rate across sessions
	averagePlayTime        prometheus.Gauge         // mean play time (seconds) across sessions
	alertsGenerated        *prometheus.CounterVec   // alerts produced, labeled by alert_type and severity
	alertsActive           prometheus.Gauge         // alerts currently active

	// Internal metrics
	mu                 sync.RWMutex        // guards metrics and lastAlertCheck
	metrics            *PerformanceMetrics // in-memory snapshot served to the dashboard
	lastAlertCheck     time.Time           // time of the most recent CheckAlerts run
	alertCheckInterval time.Duration       // minimum interval between alert checks
}
// PerformanceMetrics represents the collected performance metrics: counters
// and latency aggregates maintained in memory by the monitor.
type PerformanceMetrics struct {
	TotalEventsRecorded   int64         `json:"total_events_recorded"`   // successfully recorded events
	TotalEventsFailed     int64         `json:"total_events_failed"`     // events whose recording returned an error
	AverageRecordLatency  time.Duration `json:"average_record_latency"`  // running mean of single-event record latency
	P95RecordLatency      time.Duration `json:"p95_record_latency"`      // NOTE(review): never populated in this file — confirm producer
	P99RecordLatency      time.Duration `json:"p99_record_latency"`      // NOTE(review): never populated in this file — confirm producer
	ActiveSessions        int64         `json:"active_sessions"`         // sessions active within the last 30 minutes
	AverageCompletionRate float64       `json:"average_completion_rate"` // DB-wide mean completion rate
	AveragePlayTime       float64       `json:"average_play_time"`       // DB-wide mean play time (seconds)
	TotalAlertsGenerated  int64         `json:"total_alerts_generated"`  // cumulative alerts produced by CheckAlerts
	ActiveAlerts          int64         `json:"active_alerts"`           // alerts from the most recent check
	LastUpdated           time.Time     `json:"last_updated"`            // last time any field was refreshed
}
// DashboardMetrics represents the full metric set served to the monitoring
// dashboard, combining the performance snapshot with alerts and derived rates.
type DashboardMetrics struct {
	Performance  *PerformanceMetrics `json:"performance"`   // copy of the in-memory snapshot
	RecentAlerts []services.Alert    `json:"recent_alerts"` // alerts from the latest check (nil when the check was skipped)
	TopTracks    []TrackMetrics      `json:"top_tracks"`    // most-played tracks of the last 24h
	ErrorRate    float64             `json:"error_rate"`    // percentage of failed events (0-100)
	SuccessRate  float64             `json:"success_rate"`  // percentage of successful events (0-100)
	Throughput   float64             `json:"throughput"`    // Events per second over the last hour
	Timestamp    time.Time           `json:"timestamp"`     // when this payload was assembled
}
// TrackMetrics represents the aggregated metrics for a single track.
type TrackMetrics struct {
	TrackID           uuid.UUID `json:"track_id"`           // track identifier
	TrackTitle        string    `json:"track_title"`        // title, or "Unknown" when the track row is missing
	TotalSessions     int64     `json:"total_sessions"`     // playback sessions in the reporting window
	AverageCompletion float64   `json:"average_completion"` // mean completion rate for this track
	AveragePlayTime   float64   `json:"average_play_time"`  // mean play time (seconds) for this track
	ErrorRate         float64   `json:"error_rate"`         // percentage of errored sessions (currently always 0; see getTopTracks)
}
// NewPlaybackAnalyticsMonitor builds a monitor wired to the given database,
// logger and services, and registers every Prometheus collector with the
// default registry via promauto (so constructing a second monitor in the
// same process panics on duplicate registration).
// T0386: Create Playback Analytics Monitoring
func NewPlaybackAnalyticsMonitor(
	db *gorm.DB,
	logger *zap.Logger,
	alertsService *services.PlaybackAlertsService,
	analyticsService *services.PlaybackAnalyticsService,
) *PlaybackAnalyticsMonitor {
	// A nil logger is swapped for a no-op logger so callers never have to
	// nil-check before logging.
	if logger == nil {
		logger = zap.NewNop()
	}

	// All collectors are created inline in the struct literal; promauto
	// registers each with the default registry as it is constructed.
	return &PlaybackAnalyticsMonitor{
		db:                 db,
		logger:             logger,
		alertsService:      alertsService,
		analyticsService:   analyticsService,
		metrics:            &PerformanceMetrics{},
		alertCheckInterval: 5 * time.Minute, // re-evaluate alert conditions every 5 minutes

		recordedEventsTotal: promauto.NewCounterVec(
			prometheus.CounterOpts{
				Name: "veza_playback_analytics_events_total",
				Help: "Total number of playback analytics events recorded",
			},
			[]string{"status"}, // "success", "error"
		),
		recordedEventsDuration: promauto.NewHistogramVec(
			prometheus.HistogramOpts{
				Name:    "veza_playback_analytics_record_duration_seconds",
				Help:    "Duration of playback analytics recording in seconds",
				Buckets: []float64{0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0},
			},
			[]string{"operation"}, // "record", "batch"
		),
		recordedEventsErrors: promauto.NewCounterVec(
			prometheus.CounterOpts{
				Name: "veza_playback_analytics_errors_total",
				Help: "Total number of playback analytics recording errors",
			},
			[]string{"error_type"}, // "validation", "database", "network"
		),
		activeSessions: promauto.NewGauge(prometheus.GaugeOpts{
			Name: "veza_playback_analytics_active_sessions",
			Help: "Number of active playback sessions",
		}),
		averageCompletionRate: promauto.NewGauge(prometheus.GaugeOpts{
			Name: "veza_playback_analytics_average_completion_rate",
			Help: "Average completion rate across all playback sessions",
		}),
		averagePlayTime: promauto.NewGauge(prometheus.GaugeOpts{
			Name: "veza_playback_analytics_average_play_time_seconds",
			Help: "Average play time in seconds across all playback sessions",
		}),
		alertsGenerated: promauto.NewCounterVec(
			prometheus.CounterOpts{
				Name: "veza_playback_analytics_alerts_generated_total",
				Help: "Total number of playback analytics alerts generated",
			},
			[]string{"alert_type", "severity"}, // e.g. "anomaly"/"low_completion_rate" x "low"/"medium"/"high"
		),
		alertsActive: promauto.NewGauge(prometheus.GaugeOpts{
			Name: "veza_playback_analytics_alerts_active",
			Help: "Number of active playback analytics alerts",
		}),
	}
}
// RecordEvent records the outcome of a single analytics write and updates
// both the Prometheus metrics and the in-memory performance snapshot.
// duration is the time the write took; err is the write's result (nil on
// success). The analytics payload itself is currently unused here.
// T0386: Create Playback Analytics Monitoring
func (m *PlaybackAnalyticsMonitor) RecordEvent(ctx context.Context, analytics *models.PlaybackAnalytics, duration time.Duration, err error) {
	// Prometheus counters/histogram (internally synchronized).
	if err != nil {
		m.recordedEventsTotal.WithLabelValues("error").Inc()
		// NOTE(review): every failure is attributed to "database"; classify
		// validation/network errors here if the distinction matters.
		m.recordedEventsErrors.WithLabelValues("database").Inc()
	} else {
		m.recordedEventsTotal.WithLabelValues("success").Inc()
	}
	m.recordedEventsDuration.WithLabelValues("record").Observe(duration.Seconds())

	// In-memory snapshot.
	m.mu.Lock()
	defer m.mu.Unlock()
	if err != nil {
		// BUGFIX: failed events no longer contribute to AverageRecordLatency.
		// The previous code folded error-path durations into the success
		// average using a stale count, skewing the reported latency.
		m.metrics.TotalEventsFailed++
	} else {
		m.metrics.TotalEventsRecorded++
		// Incremental running mean over successful events: with n == 1 this
		// reduces to AverageRecordLatency = duration.
		n := m.metrics.TotalEventsRecorded
		total := m.metrics.AverageRecordLatency*time.Duration(n-1) + duration
		m.metrics.AverageRecordLatency = total / time.Duration(n)
	}
	m.metrics.LastUpdated = time.Now()
}
// RecordBatchEvent records the outcome of a batch write of count events and
// updates both the Prometheus metrics and the in-memory snapshot. The whole
// batch shares one status: all count events are counted as failed when err
// is non-nil, successful otherwise.
// T0386: Create Playback Analytics Monitoring
func (m *PlaybackAnalyticsMonitor) RecordBatchEvent(ctx context.Context, count int, duration time.Duration, err error) {
	status := "success"
	if err != nil {
		status = "error"
		// One error counter increment per failed batch (not per event).
		m.recordedEventsErrors.WithLabelValues("database").Inc()
	}
	m.recordedEventsTotal.WithLabelValues(status).Add(float64(count))
	m.recordedEventsDuration.WithLabelValues("batch").Observe(duration.Seconds())

	m.mu.Lock()
	defer m.mu.Unlock()
	delta := int64(count)
	if err != nil {
		m.metrics.TotalEventsFailed += delta
	} else {
		m.metrics.TotalEventsRecorded += delta
	}
	m.metrics.LastUpdated = time.Now()
}
// UpdateMetrics refreshes the cached session/completion/play-time metrics
// from the database and mirrors them into the Prometheus gauges. Individual
// query failures are logged and that metric is left untouched; the method
// itself always returns nil.
// T0386: Create Playback Analytics Monitoring
func (m *PlaybackAnalyticsMonitor) UpdateMetrics(ctx context.Context) error {
	// FIX: run the database queries WITHOUT holding m.mu. The previous
	// version kept the mutex locked across three round-trips, stalling
	// RecordEvent/RecordBatchEvent for the whole query duration.

	// Active sessions: started within the last 30 minutes and still open
	// (or ended after the threshold).
	activeSessionsThreshold := time.Now().Add(-30 * time.Minute)
	var activeSessionsCount int64
	haveSessions := true
	if err := m.db.WithContext(ctx).Model(&models.PlaybackAnalytics{}).
		Where("started_at > ? AND (ended_at IS NULL OR ended_at > ?)", activeSessionsThreshold, activeSessionsThreshold).
		Count(&activeSessionsCount).Error; err != nil {
		m.logger.Warn("Failed to count active sessions", zap.Error(err))
		haveSessions = false
	}

	// Average completion rate over sessions that reported one.
	var avgCompletion float64
	haveCompletion := true
	if err := m.db.WithContext(ctx).Model(&models.PlaybackAnalytics{}).
		Select("COALESCE(AVG(completion_rate), 0)").
		Where("completion_rate > 0").
		Scan(&avgCompletion).Error; err != nil {
		m.logger.Warn("Failed to calculate average completion rate", zap.Error(err))
		haveCompletion = false
	}

	// Average play time (seconds) over sessions with non-zero play time.
	var avgPlayTime float64
	havePlayTime := true
	if err := m.db.WithContext(ctx).Model(&models.PlaybackAnalytics{}).
		Select("COALESCE(AVG(play_time), 0)").
		Where("play_time > 0").
		Scan(&avgPlayTime).Error; err != nil {
		m.logger.Warn("Failed to calculate average play time", zap.Error(err))
		havePlayTime = false
	}

	// Publish the results: the lock now covers only the in-memory writes.
	m.mu.Lock()
	if haveSessions {
		m.metrics.ActiveSessions = activeSessionsCount
	}
	if haveCompletion {
		m.metrics.AverageCompletionRate = avgCompletion
	}
	if havePlayTime {
		m.metrics.AveragePlayTime = avgPlayTime
	}
	m.metrics.LastUpdated = time.Now()
	m.mu.Unlock()

	// Prometheus gauges are internally synchronized; no need to hold m.mu.
	if haveSessions {
		m.activeSessions.Set(float64(activeSessionsCount))
	}
	if haveCompletion {
		m.averageCompletionRate.Set(avgCompletion)
	}
	if havePlayTime {
		m.averagePlayTime.Set(avgPlayTime)
	}
	return nil
}
// CheckAlerts evaluates alert conditions for every track that had a session
// in the last 24 hours and returns all alerts found. Per-track failures are
// logged and skipped. Returns an error when the alerts service is missing or
// the track listing query fails.
// T0386: Create Playback Analytics Monitoring
func (m *PlaybackAnalyticsMonitor) CheckAlerts(ctx context.Context) ([]services.Alert, error) {
	if m.alertsService == nil {
		return nil, fmt.Errorf("alerts service not available")
	}

	// Tracks with recent sessions (last 24 hours).
	recentThreshold := time.Now().Add(-24 * time.Hour)
	var trackIDs []uuid.UUID
	if err := m.db.WithContext(ctx).Model(&models.PlaybackAnalytics{}).
		Distinct("track_id").
		Where("started_at > ?", recentThreshold).
		Pluck("track_id", &trackIDs).Error; err != nil {
		return nil, fmt.Errorf("failed to get recent track IDs: %w", err)
	}

	allAlerts := make([]services.Alert, 0)
	for _, trackID := range trackIDs {
		alerts, err := m.alertsService.CheckAlerts(ctx, trackID, nil)
		if err != nil {
			// Best-effort: a failing track must not abort the whole sweep.
			m.logger.Warn("Failed to check alerts for track",
				zap.Error(err),
				zap.String("track_id", trackID.String()))
			continue
		}
		// Mirror each alert into the Prometheus counter.
		for _, alert := range alerts {
			m.alertsGenerated.WithLabelValues(alert.Type, alert.Severity).Inc()
		}
		allAlerts = append(allAlerts, alerts...)
	}

	// Update the shared state under the lock. FIX: lastAlertCheck was
	// previously written without holding m.mu while GetDashboardMetrics and
	// StartBackgroundMonitoring read it concurrently — a data race.
	m.mu.Lock()
	m.metrics.TotalAlertsGenerated += int64(len(allAlerts))
	m.metrics.ActiveAlerts = int64(len(allAlerts))
	m.lastAlertCheck = time.Now()
	m.mu.Unlock()
	m.alertsActive.Set(float64(len(allAlerts)))

	m.logger.Info("Checked playback analytics alerts",
		zap.Int("tracks_checked", len(trackIDs)),
		zap.Int("alerts_found", len(allAlerts)))
	return allAlerts, nil
}
// GetPerformanceMetrics returns a point-in-time copy of the performance
// snapshot. Callers receive their own value and cannot mutate the shared
// state held by the monitor.
// T0386: Create Playback Analytics Monitoring
func (m *PlaybackAnalyticsMonitor) GetPerformanceMetrics() *PerformanceMetrics {
	m.mu.RLock()
	defer m.mu.RUnlock()
	// PerformanceMetrics contains only value-typed fields, so a plain struct
	// copy is a full deep copy.
	snapshot := *m.metrics
	return &snapshot
}
// GetDashboardMetrics assembles the full dashboard payload: refreshed
// performance metrics, recent alerts (if the check interval has elapsed),
// the top tracks of the last 24 hours, derived error/success rates, and the
// last hour's throughput. Sub-step failures are logged and degraded to
// empty/zero values; the method never fails outright.
// T0386: Create Playback Analytics Monitoring
func (m *PlaybackAnalyticsMonitor) GetDashboardMetrics(ctx context.Context) (*DashboardMetrics, error) {
	// Refresh cached metrics from the database.
	if err := m.UpdateMetrics(ctx); err != nil {
		m.logger.Warn("Failed to update metrics", zap.Error(err))
	}

	// FIX: read lastAlertCheck/alertCheckInterval under the lock — the
	// previous unguarded read raced with the write in CheckAlerts.
	m.mu.RLock()
	alertCheckDue := time.Since(m.lastAlertCheck) > m.alertCheckInterval
	m.mu.RUnlock()

	var recentAlerts []services.Alert
	if alertCheckDue {
		alerts, err := m.CheckAlerts(ctx)
		if err != nil {
			m.logger.Warn("Failed to check alerts", zap.Error(err))
		} else {
			recentAlerts = alerts
		}
	}

	// Top tracks (degrade to an empty list on failure).
	topTracks, err := m.getTopTracks(ctx, 10)
	if err != nil {
		m.logger.Warn("Failed to get top tracks", zap.Error(err))
		topTracks = []TrackMetrics{}
	}

	// Derived error/success percentages.
	perfMetrics := m.GetPerformanceMetrics()
	totalEvents := perfMetrics.TotalEventsRecorded + perfMetrics.TotalEventsFailed
	var errorRate, successRate float64
	if totalEvents > 0 {
		errorRate = float64(perfMetrics.TotalEventsFailed) / float64(totalEvents) * 100
		successRate = float64(perfMetrics.TotalEventsRecorded) / float64(totalEvents) * 100
	}

	// Throughput: events per second over the last hour (0 when the count
	// query fails).
	var throughput float64
	oneHourAgo := time.Now().Add(-1 * time.Hour)
	var eventsLastHour int64
	if err := m.db.WithContext(ctx).Model(&models.PlaybackAnalytics{}).
		Where("created_at > ?", oneHourAgo).
		Count(&eventsLastHour).Error; err == nil {
		throughput = float64(eventsLastHour) / 3600.0 // Events per second
	}

	return &DashboardMetrics{
		Performance:  perfMetrics,
		RecentAlerts: recentAlerts,
		TopTracks:    topTracks,
		ErrorRate:    errorRate,
		SuccessRate:  successRate,
		Throughput:   throughput,
		Timestamp:    time.Now(),
	}, nil
}
// getTopTracks returns per-track metrics for the most-played tracks of the
// last 24 hours, ordered by session count, capped at limit rows.
// T0386: Create Playback Analytics Monitoring
func (m *PlaybackAnalyticsMonitor) getTopTracks(ctx context.Context, limit int) ([]TrackMetrics, error) {
	// Scan target matching the SELECT column aliases.
	type TrackStats struct {
		TrackID           uuid.UUID `gorm:"column:track_id"`
		TrackTitle        string    `gorm:"column:track_title"`
		TotalSessions     int64     `gorm:"column:total_sessions"`
		AverageCompletion float64   `gorm:"column:average_completion"`
		AveragePlayTime   float64   `gorm:"column:average_play_time"`
		ErrorCount        int64     `gorm:"column:error_count"`
	}
	// FIX: bind the 24h cutoff as a parameter instead of the Postgres-only
	// "NOW() - INTERVAL '24 hours'" expression, keeping the raw query
	// portable across SQL dialects (and driven by the application clock).
	cutoff := time.Now().Add(-24 * time.Hour)
	var stats []TrackStats
	query := `
	SELECT
	pa.track_id,
	COALESCE(t.title, 'Unknown') as track_title,
	COUNT(*) as total_sessions,
	COALESCE(AVG(pa.completion_rate), 0) as average_completion,
	COALESCE(AVG(pa.play_time), 0) as average_play_time,
	0 as error_count
	FROM playback_analytics pa
	LEFT JOIN tracks t ON pa.track_id = t.id
	WHERE pa.created_at > ?
	GROUP BY pa.track_id, t.title
	ORDER BY total_sessions DESC
	LIMIT ?
	`
	if err := m.db.WithContext(ctx).Raw(query, cutoff, limit).Scan(&stats).Error; err != nil {
		return nil, fmt.Errorf("failed to get top tracks: %w", err)
	}
	trackMetrics := make([]TrackMetrics, 0, len(stats))
	for _, stat := range stats {
		// NOTE(review): error_count is hard-coded to 0 in the query, so
		// ErrorRate is currently always 0 — wire a real error column to make
		// this meaningful.
		var errorRate float64
		if stat.TotalSessions > 0 {
			errorRate = float64(stat.ErrorCount) / float64(stat.TotalSessions) * 100
		}
		trackMetrics = append(trackMetrics, TrackMetrics{
			TrackID:           stat.TrackID,
			TrackTitle:        stat.TrackTitle,
			TotalSessions:     stat.TotalSessions,
			AverageCompletion: stat.AverageCompletion,
			AveragePlayTime:   stat.AveragePlayTime,
			ErrorRate:         errorRate,
		})
	}
	return trackMetrics, nil
}
// StartBackgroundMonitoring runs the monitoring loop until ctx is cancelled:
// it refreshes metrics every updateInterval and re-checks alerts whenever
// alertCheckInterval has elapsed since the last check. Blocking — callers
// normally run it in its own goroutine.
// T0386: Create Playback Analytics Monitoring
func (m *PlaybackAnalyticsMonitor) StartBackgroundMonitoring(ctx context.Context, updateInterval time.Duration) {
	ticker := time.NewTicker(updateInterval)
	defer ticker.Stop()

	// One immediate refresh so metrics are populated before the first tick.
	if err := m.UpdateMetrics(ctx); err != nil {
		m.logger.Error("Failed to update metrics on startup", zap.Error(err))
	}

	for {
		select {
		case <-ctx.Done():
			m.logger.Info("Stopping playback analytics monitoring")
			return
		case <-ticker.C:
			if err := m.UpdateMetrics(ctx); err != nil {
				m.logger.Error("Failed to update metrics", zap.Error(err))
			}
			// FIX: read lastAlertCheck under the lock — the previous
			// unguarded read raced with the write in CheckAlerts.
			m.mu.RLock()
			alertCheckDue := time.Since(m.lastAlertCheck) > m.alertCheckInterval
			m.mu.RUnlock()
			if alertCheckDue {
				if _, err := m.CheckAlerts(ctx); err != nil {
					m.logger.Error("Failed to check alerts", zap.Error(err))
				}
			}
		}
	}
}