import { createClient } from 'redis' import { DataRelationManager } from '../../core/utils/data-relations' import { globalConfig } from '../../core/config' import { vezaFaker } from '../../core/utils/faker-config' import type { Audio, User } from '../../core/schemas/database' /** * Stream server fixtures - Audio streaming data and cache */ export class StreamServerFixtures { private static redisClient: any = null private static streamingSessions: Map = new Map() private static audioQueue: Map = new Map() /** * Initialize Redis connection for streaming data */ static async initialize(): Promise { console.log('๐ŸŽต Initializing stream server fixtures...') // Initialize Redis connection this.redisClient = createClient({ socket: { host: globalConfig.redis.host, port: globalConfig.redis.port, }, password: globalConfig.redis.password || undefined, }) try { await this.redisClient.connect() console.log('โœ… Connected to Redis (stream)') } catch (error) { console.warn('โš ๏ธ Could not connect to Redis (stream):', error) this.redisClient = null } } /** * Seed Redis with streaming data */ static async seedStreamingData(): Promise { if (!this.redisClient) { console.warn('โš ๏ธ No Redis connection available') return } console.log('๐ŸŽถ Seeding streaming data...') try { const allData = DataRelationManager.getAll() const users = Array.from(allData.users.values()) const tracks = Array.from(allData.tracks.values()) // Cache audio metadata for fast access console.log(`๐Ÿ“ Caching ${tracks.length} track metadata...`) await this.cacheAudioMetadata(tracks) // Create streaming sessions for active users const activeUsers = users.filter(u => u.status === 'active') console.log(`๐Ÿ“ Creating streaming sessions for ${activeUsers.length} users...`) await this.createStreamingSessions(activeUsers, tracks) // Cache popular tracks and playlists await this.cachePopularContent(tracks) // Cache audio processing queue await this.cacheAudioQueue(tracks) // Cache streaming statistics await this.cacheStreamingStats(tracks) console.log('โœ… Streaming data seeded successfully') } catch (error) { console.error('โŒ Error seeding streaming data:', error) throw error } } /** * Generate streaming session data */ static generateStreamingSession(userId: string, trackId: string): any { const sessionId = `stream_${userId}_${Date.now()}` const startTime = new Date() const session = { id: sessionId, userId, trackId, startTime: startTime.toISOString(), duration: 0, // Will be updated as stream progresses quality: vezaFaker.helpers.arrayElement(['low', 'medium', 'high', 'lossless']), bitrate: vezaFaker.helpers.arrayElement([128, 192, 256, 320]), format: vezaFaker.helpers.arrayElement(['mp3', 'flac', 'wav']), platform: vezaFaker.helpers.arrayElement(['web', 'mobile', 'desktop']), location: { country: 'FR', region: vezaFaker.user.location(), coordinates: { lat: vezaFaker.location.latitude({ min: 41, max: 51 }), lng: vezaFaker.location.longitude({ min: -5, max: 9 }) } }, device: { type: vezaFaker.helpers.arrayElement(['desktop', 'mobile', 'tablet']), os: vezaFaker.helpers.arrayElement(['Windows', 'macOS', 'Linux', 'iOS', 'Android']), browser: vezaFaker.helpers.arrayElement(['Chrome', 'Firefox', 'Safari', 'Edge']) }, network: { type: vezaFaker.helpers.arrayElement(['wifi', '4g', '5g', 'ethernet']), speed: vezaFaker.number.int({ min: 1, max: 100 }) // Mbps }, status: 'active', progress: 0, // Percentage of track completed volume: vezaFaker.number.float({ min: 0.1, max: 1.0, precision: 0.1 }), repeat: vezaFaker.helpers.arrayElement(['none', 'one', 'all']), shuffle: vezaFaker.datatype.boolean({ probability: 0.3 }), createdAt: startTime.toISOString(), updatedAt: startTime.toISOString() } this.streamingSessions.set(sessionId, session) return session } /** * Generate audio processing queue */ static generateAudioProcessingJobs(tracks: Audio[]): any[] { const jobs: any[] = [] // Select some tracks for processing (transcoding, analysis, etc.) const tracksToProcess = vezaFaker.helpers.arrayElements(tracks, Math.floor(tracks.length * 0.1)) tracksToProcess.forEach(track => { const jobTypes = ['transcode', 'analyze', 'thumbnail', 'waveform', 'metadata'] const jobType = vezaFaker.helpers.arrayElement(jobTypes) jobs.push({ id: `job_${track.id}_${jobType}_${Date.now()}`, type: jobType, trackId: track.id, status: vezaFaker.helpers.arrayElement(['pending', 'processing', 'completed', 'failed']), priority: vezaFaker.helpers.arrayElement(['low', 'medium', 'high']), progress: jobType === 'processing' ? vezaFaker.number.int({ min: 10, max: 90 }) : jobType === 'completed' ? 100 : 0, startedAt: vezaFaker.date.recent({ days: 1 }), estimatedCompletion: vezaFaker.date.soon({ days: 1 }), metadata: { inputFormat: track.metadata.format, outputFormat: jobType === 'transcode' ? vezaFaker.helpers.arrayElement(['mp3', 'flac', 'wav']) : undefined, quality: jobType === 'transcode' ? vezaFaker.helpers.arrayElement(['low', 'medium', 'high']) : undefined, fileSize: track.metadata.fileSize, duration: track.metadata.duration }, createdAt: vezaFaker.date.recent({ days: 2 }), updatedAt: vezaFaker.date.recent({ days: 1 }) }) }) return jobs.sort((a, b) => new Date(b.createdAt).getTime() - new Date(a.createdAt).getTime()) } /** * Generate streaming analytics */ static generateStreamingAnalytics(): any { const sessions = Array.from(this.streamingSessions.values()) return { overview: { totalSessions: sessions.length, activeSessions: sessions.filter(s => s.status === 'active').length, totalDuration: sessions.reduce((sum, s) => sum + s.duration, 0), averageSessionDuration: sessions.length > 0 ? sessions.reduce((sum, s) => sum + s.duration, 0) / sessions.length : 0, uniqueUsers: new Set(sessions.map(s => s.userId)).size, totalBandwidth: sessions.reduce((sum, s) => sum + (s.bitrate * s.duration / 8), 0) // MB }, byQuality: this.groupSessionsByField(sessions, 'quality'), byPlatform: this.groupSessionsByField(sessions, 'platform'), byLocation: this.groupSessionsByLocation(sessions), byDevice: this.groupSessionsByDevice(sessions), byNetwork: this.groupSessionsByField(sessions, 'network.type'), realtime: { currentListeners: sessions.filter(s => s.status === 'active').length, peakListeners: vezaFaker.number.int({ min: 100, max: 1000 }), averageBitrate: sessions.length > 0 ? sessions.reduce((sum, s) => sum + s.bitrate, 0) / sessions.length : 0, totalBandwidthUsage: vezaFaker.number.float({ min: 10, max: 100 }), // Mbps serverLoad: vezaFaker.number.float({ min: 0.1, max: 0.9 }), errorRate: vezaFaker.number.float({ min: 0, max: 0.05 }) }, trends: { hourly: this.generateHourlyTrends(), daily: this.generateDailyTrends(), weekly: this.generateWeeklyTrends() } } } /** * Private helper methods */ private static async cacheAudioMetadata(tracks: Audio[]): Promise { if (!this.redisClient) return for (const track of tracks) { const metadata = { id: track.id, title: track.title, artist: track.artist, duration: track.metadata.duration, bitrate: track.metadata.bitrate, format: track.metadata.format, fileSize: track.metadata.fileSize, fileUrl: track.fileUrl, waveformData: track.waveformData, genre: track.genre, isPublic: track.isPublic, stats: track.stats } await this.redisClient.setEx( `track:${track.id}:metadata`, 3600, // 1 hour TTL JSON.stringify(metadata) ) } } private static async createStreamingSessions(users: User[], tracks: Audio[]): Promise { if (!this.redisClient) return // Create active streaming sessions for some users const activeListeners = vezaFaker.helpers.arrayElements(users, Math.floor(users.length * 0.1)) for (const user of activeListeners) { const track = vezaFaker.helpers.arrayElement(tracks) const session = this.generateStreamingSession(user.id, track.id) // Cache session in Redis await this.redisClient.setEx( `session:${session.id}`, 1800, // 30 minutes TTL JSON.stringify(session) ) // Add to active sessions set await this.redisClient.sAdd('sessions:active', session.id) // Add to user's active sessions await this.redisClient.sAdd(`user:${user.id}:sessions`, session.id) // Track current playing await this.redisClient.setEx( `user:${user.id}:current_track`, 1800, track.id ) } } private static async cachePopularContent(tracks: Audio[]): Promise { if (!this.redisClient) return // Sort tracks by popularity (plays) const popularTracks = tracks .sort((a, b) => b.stats.plays - a.stats.plays) .slice(0, 50) // Cache popular tracks list await this.redisClient.setEx( 'tracks:popular', 3600, // 1 hour TTL JSON.stringify(popularTracks.map(t => t.id)) ) // Cache trending tracks (recent + popular) const trendingTracks = tracks .filter(track => { const weekAgo = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000) return track.createdAt > weekAgo }) .sort((a, b) => b.stats.plays - a.stats.plays) .slice(0, 20) await this.redisClient.setEx( 'tracks:trending', 1800, // 30 minutes TTL JSON.stringify(trendingTracks.map(t => t.id)) ) // Cache by genre const genres = [...new Set(tracks.map(t => t.genre))] for (const genre of genres) { const genreTracks = tracks .filter(t => t.genre === genre) .sort((a, b) => b.stats.plays - a.stats.plays) .slice(0, 20) await this.redisClient.setEx( `tracks:genre:${genre}`, 3600, JSON.stringify(genreTracks.map(t => t.id)) ) } } private static async cacheAudioQueue(tracks: Audio[]): Promise { if (!this.redisClient) return const processingJobs = this.generateAudioProcessingJobs(tracks) // Group jobs by status const jobsByStatus = { pending: processingJobs.filter(j => j.status === 'pending'), processing: processingJobs.filter(j => j.status === 'processing'), completed: processingJobs.filter(j => j.status === 'completed'), failed: processingJobs.filter(j => j.status === 'failed') } // Cache job queues for (const [status, jobs] of Object.entries(jobsByStatus)) { await this.redisClient.setEx( `queue:${status}`, 300, // 5 minutes TTL JSON.stringify(jobs.map(j => j.id)) ) // Cache individual job data for (const job of jobs) { await this.redisClient.setEx( `job:${job.id}`, 3600, JSON.stringify(job) ) } } // Cache queue statistics const queueStats = { total: processingJobs.length, pending: jobsByStatus.pending.length, processing: jobsByStatus.processing.length, completed: jobsByStatus.completed.length, failed: jobsByStatus.failed.length, averageProcessingTime: vezaFaker.number.int({ min: 30, max: 300 }), // seconds throughput: vezaFaker.number.int({ min: 5, max: 50 }), // jobs per hour lastUpdated: new Date().toISOString() } await this.redisClient.setEx( 'queue:stats', 60, // 1 minute TTL JSON.stringify(queueStats) ) } private static async cacheStreamingStats(tracks: Audio[]): Promise { if (!this.redisClient) return const analytics = this.generateStreamingAnalytics() // Cache overall streaming statistics await this.redisClient.setEx( 'streaming:stats', 300, // 5 minutes TTL JSON.stringify(analytics) ) // Cache real-time metrics await this.redisClient.setEx( 'streaming:realtime', 10, // 10 seconds TTL JSON.stringify(analytics.realtime) ) // Cache individual track streaming stats for (const track of tracks) { const trackStats = { plays: track.stats.plays, currentListeners: vezaFaker.number.int({ min: 0, max: 50 }), peakListeners: vezaFaker.number.int({ min: 0, max: 200 }), totalDuration: track.stats.plays * track.metadata.duration, averageCompletion: vezaFaker.number.float({ min: 0.3, max: 0.95 }), skipRate: vezaFaker.number.float({ min: 0.05, max: 0.4 }), lastPlayed: vezaFaker.date.recent({ days: 1 }) } await this.redisClient.setEx( `track:${track.id}:streaming_stats`, 600, // 10 minutes TTL JSON.stringify(trackStats) ) } } private static groupSessionsByField(sessions: any[], field: string): any { const groups: Record = {} sessions.forEach(session => { const value = field.includes('.') ? field.split('.').reduce((obj, key) => obj?.[key], session) : session[field] if (value) { groups[value] = (groups[value] || 0) + 1 } }) return Object.entries(groups).map(([key, count]) => ({ label: key, count, percentage: Math.round((count / sessions.length) * 100) })) } private static groupSessionsByLocation(sessions: any[]): any { const locations: Record = {} sessions.forEach(session => { const region = session.location?.region if (region) { locations[region] = (locations[region] || 0) + 1 } }) return Object.entries(locations).map(([region, count]) => ({ region, count, percentage: Math.round((count / sessions.length) * 100) })) } private static groupSessionsByDevice(sessions: any[]): any { const devices: Record = {} sessions.forEach(session => { const deviceType = session.device?.type const os = session.device?.os if (deviceType) { if (!devices[deviceType]) { devices[deviceType] = { count: 0, os: {} } } devices[deviceType].count++ if (os) { devices[deviceType].os[os] = (devices[deviceType].os[os] || 0) + 1 } } }) return Object.entries(devices).map(([type, data]: [string, any]) => ({ type, count: data.count, percentage: Math.round((data.count / sessions.length) * 100), topOS: Object.entries(data.os) .sort(([,a]: any, [,b]: any) => b - a) .slice(0, 3) .map(([os, count]) => ({ os, count })) })) } private static generateHourlyTrends(): any[] { return Array.from({ length: 24 }, (_, hour) => ({ hour, listeners: vezaFaker.number.int({ min: 50, max: 500 }), streams: vezaFaker.number.int({ min: 100, max: 1000 }), bandwidth: vezaFaker.number.float({ min: 10, max: 100 }) })) } private static generateDailyTrends(): any[] { return Array.from({ length: 7 }, (_, day) => ({ day: day + 1, listeners: vezaFaker.number.int({ min: 500, max: 2000 }), streams: vezaFaker.number.int({ min: 1000, max: 5000 }), bandwidth: vezaFaker.number.float({ min: 100, max: 500 }) })) } private static generateWeeklyTrends(): any[] { return Array.from({ length: 12 }, (_, week) => ({ week: week + 1, listeners: vezaFaker.number.int({ min: 2000, max: 10000 }), streams: vezaFaker.number.int({ min: 5000, max: 25000 }), bandwidth: vezaFaker.number.float({ min: 500, max: 2000 }) })) } /** * Utility methods */ static getStreamingSession(sessionId: string): any { return this.streamingSessions.get(sessionId) } static getAllStreamingSessions(): any[] { return Array.from(this.streamingSessions.values()) } static getActiveSessionsCount(): number { return Array.from(this.streamingSessions.values()) .filter(session => session.status === 'active').length } /** * Cleanup methods */ static async cleanup(): Promise { if (this.redisClient) { await this.redisClient.quit() this.redisClient = null } this.streamingSessions.clear() this.audioQueue.clear() console.log('๐Ÿงน Stream server fixtures cleanup completed') } /** * Health check */ static async healthCheck(): Promise<{ redis: boolean sessionsActive: number queueSize: number }> { let redis = false let sessionsActive = 0 let queueSize = 0 // Check Redis connection if (this.redisClient) { try { await this.redisClient.ping() redis = true // Get active sessions count sessionsActive = await this.redisClient.sCard('sessions:active') || 0 // Get queue size queueSize = await this.redisClient.lLen('queue:pending') || 0 } catch (error) { console.error('Stream server health check failed:', error) } } return { redis, sessionsActive, queueSize } } }