//! Module ConnectionPool - Gestion avancée de 10k connexions WebSocket simultanées //! //! Ce module implémente un pool de connexions haute performance avec : //! - Support 10000+ connexions simultanées //! - Heartbeat intelligent avec timeout adaptatif //! - Cleanup automatique des connexions mortes //! - Monitoring en temps réel des connexions //! - Load balancing des messages use std::collections::HashMap; use std::sync::{Arc, atomic::{AtomicUsize, Ordering}}; use std::time::{Duration, Instant}; use tokio::sync::{RwLock, mpsc, broadcast, Mutex}; use tokio::time::{interval, timeout}; use serde::{Serialize, Deserialize}; use dashmap::DashMap; use uuid::Uuid; use futures_util::{SinkExt, StreamExt}; use axum::extract::ws::{WebSocket, Message}; use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::WebSocketStream; use crate::error::{ChatError, Result}; use crate::monitoring::ChatMetrics; /// Configuration du pool de connexions #[derive(Debug, Clone)] pub struct ConnectionPoolConfig { /// Nombre maximum de connexions simultanées pub max_connections: usize, /// Délai entre les heartbeats pub heartbeat_interval: Duration, /// Timeout pour considérer une connexion morte pub idle_timeout: Duration, /// Intervalle de nettoyage des connexions mortes pub cleanup_interval: Duration, /// Taille du buffer pour les messages pub message_buffer_size: usize, /// Timeout pour l'envoi d'un message pub send_timeout: Duration, } impl Default for ConnectionPoolConfig { fn default() -> Self { Self { max_connections: 10000, heartbeat_interval: Duration::from_secs(30), idle_timeout: Duration::from_secs(120), cleanup_interval: Duration::from_secs(60), message_buffer_size: 1000, send_timeout: Duration::from_secs(5), } } } /// Métadonnées d'une connexion WebSocket #[derive(Debug)] pub struct ConnectionMetadata { pub connection_id: Uuid, pub user_id: i32, pub username: String, pub connected_at: Instant, pub last_heartbeat: Instant, pub last_message: Instant, pub messages_sent: AtomicUsize, pub messages_received: AtomicUsize, pub is_alive: bool, pub rooms: Vec, pub user_agent: Option, pub ip_address: Option, } impl ConnectionMetadata { pub fn new(user_id: i32, username: String) -> Self { let now = Instant::now(); Self { connection_id: Uuid::new_v4(), user_id, username, connected_at: now, last_heartbeat: now, last_message: now, messages_sent: AtomicUsize::new(0), messages_received: AtomicUsize::new(0), is_alive: true, rooms: Vec::new(), user_agent: None, ip_address: None, } } pub fn is_idle(&self, timeout: Duration) -> bool { self.last_heartbeat.elapsed() > timeout } pub fn connection_duration(&self) -> Duration { self.connected_at.elapsed() } pub fn increment_sent(&self) { self.messages_sent.fetch_add(1, Ordering::Relaxed); } pub fn increment_received(&self) { self.messages_received.fetch_add(1, Ordering::Relaxed); } } /// Message interne du pool de connexions #[derive(Debug, Clone)] pub enum PoolMessage { Broadcast { room: Option, message: String, exclude_user: Option, }, DirectMessage { target_user: i32, message: String, }, JoinRoom { user_id: i32, room: String, }, LeaveRoom { user_id: i32, room: String, }, Disconnect { user_id: i32, }, } /// Statistiques du pool de connexions #[derive(Debug, Clone, Serialize)] pub struct PoolStats { pub active_connections: usize, pub total_connections_created: u64, pub total_messages_sent: u64, pub total_messages_received: u64, pub dead_connections_cleaned: u64, pub average_connection_duration: Duration, pub peak_connections: usize, pub rooms_count: usize, pub memory_usage_mb: f64, } /// Pool de connexions WebSocket haute performance pub struct ConnectionPool { config: ConnectionPoolConfig, connections: Arc>>>, senders: Arc>>, rooms: Arc>>, stats: Arc>, broadcast_sender: broadcast::Sender, metrics: Arc, is_running: Arc, } impl ConnectionPool { /// Crée un nouveau pool de connexions pub fn new(config: ConnectionPoolConfig, metrics: Arc) -> Self { let (broadcast_sender, _) = broadcast::channel(1000); Self { config, connections: Arc::new(DashMap::new()), senders: Arc::new(DashMap::new()), rooms: Arc::new(DashMap::new()), stats: Arc::new(RwLock::new(PoolStats { active_connections: 0, total_connections_created: 0, total_messages_sent: 0, total_messages_received: 0, dead_connections_cleaned: 0, average_connection_duration: Duration::ZERO, peak_connections: 0, rooms_count: 0, memory_usage_mb: 0.0, })), broadcast_sender, metrics, is_running: Arc::new(AtomicUsize::new(1)), } } /// Démarre les tâches de maintenance du pool pub async fn start_maintenance_tasks(&self) { let pool_clone = self.clone(); tokio::spawn(async move { pool_clone.heartbeat_loop().await; }); let pool_clone = self.clone(); tokio::spawn(async move { pool_clone.cleanup_loop().await; }); let pool_clone = self.clone(); tokio::spawn(async move { pool_clone.stats_update_loop().await; }); } /// Ajoute une nouvelle connexion au pool pub async fn add_connection( &self, user_id: i32, username: String, websocket: WebSocket, ) -> Result<()> { // Vérifier la limite de connexions if self.connections.len() >= self.config.max_connections { return Err(ChatError::configuration_error("Pool de connexions plein")); } // Créer les métadonnées de connexion let metadata = Arc::new(RwLock::new(ConnectionMetadata::new(user_id, username.clone()))); // Créer le canal de communication let (sender, mut receiver) = mpsc::unbounded_channel::(); // Cloner les variables nécessaires pour les tâches let metadata_clone = metadata.clone(); let sender_clone = sender.clone(); // Stocker la connexion self.connections.insert(user_id, metadata.clone()); self.senders.insert(user_id, sender); // Mettre à jour les statistiques { let mut stats = self.stats.write().await; stats.total_connections_created += 1; stats.active_connections = self.connections.len(); if stats.active_connections > stats.peak_connections { stats.peak_connections = stats.active_connections; } } // Métriques self.metrics.websocket_connected(user_id).await; tracing::info!( user_id = %user_id, username = %username, connection_id = %metadata.read().await.connection_id, total_connections = %self.connections.len(), "🔌 Nouvelle connexion ajoutée au pool" ); // Gérer la connexion WebSocket dans une tâche séparée let pool_clone = self.clone(); let metadata_clone_task = metadata_clone.clone(); tokio::spawn(async move { pool_clone.handle_connection(user_id, websocket, receiver, metadata_clone_task).await; }); Ok(()) } /// Gère une connexion WebSocket individuelle async fn handle_connection( &self, user_id: i32, websocket: WebSocket, mut receiver: mpsc::UnboundedReceiver, metadata: Arc>, ) { let (ws_sender, mut ws_receiver) = websocket.split(); let ws_sender = Arc::new(Mutex::new(ws_sender)); let ws_sender_clone = ws_sender.clone(); let metadata_send = metadata.clone(); let metadata_recv = metadata.clone(); // Tâche d'envoi de messages let send_task = tokio::spawn(async move { while let Some(message) = receiver.recv().await { let mut sender = ws_sender.lock().await; if let Err(e) = sender.send(Message::Text(message)).await { tracing::error!("Erreur envoi WebSocket: {}", e); break; } drop(sender); metadata_send.read().await.increment_sent(); } }); // Tâche de réception de messages let receive_task = tokio::spawn(async move { while let Some(message) = ws_receiver.next().await { match message { Ok(Message::Text(_text)) => { metadata_recv.read().await.increment_received(); // Traiter le message reçu ici } Ok(Message::Ping(data)) => { let mut sender = ws_sender_clone.lock().await; if let Err(e) = sender.send(Message::Pong(data)).await { tracing::error!("Erreur envoi Pong: {}", e); break; } } Ok(Message::Close(_)) => { tracing::info!("Connexion fermée par le client"); break; } Err(e) => { tracing::error!("Erreur WebSocket: {}", e); break; } _ => {} } } }); // Attendre la fin des tâches tokio::select! { _ = send_task => {}, _ = receive_task => {}, } // Nettoyer la connexion self.remove_connection(user_id).await; } /// Traite un message entrant async fn handle_incoming_message(&self, user_id: i32, message: String) { // Mettre à jour le timestamp du dernier message if let Some(metadata_ref) = self.connections.get(&user_id) { metadata_ref.write().await.last_message = Instant::now(); } // Traiter le message (déléguer au ChatHub) tracing::debug!(user_id = %user_id, message_len = %message.len(), "📨 Message reçu"); // Ici, vous pouvez ajouter la logique de traitement des messages // Par exemple, parser le JSON et router vers les handlers appropriés } /// Supprime une connexion du pool pub async fn remove_connection(&self, user_id: i32) { if let Some((_, metadata_ref)) = self.connections.remove(&user_id) { let metadata = metadata_ref.read().await; let duration = metadata.connection_duration(); tracing::info!( user_id = %user_id, username = %metadata.username, duration = ?duration, messages_sent = %metadata.messages_sent.load(Ordering::Relaxed), messages_received = %metadata.messages_received.load(Ordering::Relaxed), "🗑 Connexion supprimée du pool" ); } self.senders.remove(&user_id); // Retirer l'utilisateur de toutes les salles for mut entry in self.rooms.iter_mut() { entry.value_mut().retain(|&id| id != user_id); } // Supprimer les salles vides self.rooms.retain(|_, users| !users.is_empty()); // Mettre à jour les statistiques { let mut stats = self.stats.write().await; stats.active_connections = self.connections.len(); } // Métriques self.metrics.websocket_disconnected(user_id).await; } /// Envoie un message à un utilisateur spécifique pub async fn send_to_user(&self, user_id: i32, message: String) -> Result<()> { if let Some(sender) = self.senders.get(&user_id) { sender.send(message) .map_err(|e| ChatError::configuration_error(&format!("Erreur envoi message: {}", e)))?; // Incrémenter statistiques { let mut stats = self.stats.write().await; stats.total_messages_sent += 1; } } Ok(()) } /// Diffuse un message à tous les utilisateurs d'une salle pub async fn broadcast_to_room(&self, room: &str, message: String, exclude_user: Option) -> Result<()> { if let Some(users) = self.rooms.get(room) { let mut sent_count = 0; for &user_id in users.iter() { if exclude_user.map_or(true, |excluded| excluded != user_id) { if let Err(e) = self.send_to_user(user_id, message.clone()).await { tracing::warn!(user_id = %user_id, room = %room, error = %e, "⚠️ Échec envoi message"); } else { sent_count += 1; } } } tracing::debug!( room = %room, message_len = %message.len(), recipients = %sent_count, excluded_user = ?exclude_user, "📡 Message diffusé dans la salle" ); } Ok(()) } /// Ajoute un utilisateur à une salle pub async fn join_room(&self, user_id: i32, room: String) { self.rooms.entry(room.clone()).or_insert_with(Vec::new).push(user_id); // Mettre à jour les métadonnées de l'utilisateur if let Some(metadata_ref) = self.connections.get(&user_id) { metadata_ref.write().await.rooms.push(room.clone()); } tracing::info!(user_id = %user_id, room = %room, "🚪 Utilisateur rejoint la salle"); } /// Retire un utilisateur d'une salle pub async fn leave_room(&self, user_id: i32, room: &str) { if let Some(mut users) = self.rooms.get_mut(room) { users.retain(|&id| id != user_id); } // Mettre à jour les métadonnées de l'utilisateur if let Some(metadata_ref) = self.connections.get(&user_id) { metadata_ref.write().await.rooms.retain(|r| r != room); } tracing::info!(user_id = %user_id, room = %room, "🚪 Utilisateur quitte la salle"); } /// Boucle de heartbeat pour maintenir les connexions vivantes async fn heartbeat_loop(&self) { let mut interval = interval(self.config.heartbeat_interval); while self.is_running.load(Ordering::Relaxed) == 1 { interval.tick().await; let mut ping_count = 0; for entry in self.connections.iter() { let user_id = *entry.key(); if let Some(sender) = self.senders.get(&user_id) { // Envoyer un ping JSON let ping_message = r#"{"type":"ping","timestamp":{}}"#; let timestamp = chrono::Utc::now().timestamp(); let message = ping_message.replace("{}", ×tamp.to_string()); if sender.send(message).is_ok() { ping_count += 1; } } } if ping_count > 0 { tracing::debug!(ping_count = %ping_count, "🏓 Heartbeat envoyé"); } } } /// Boucle de nettoyage des connexions mortes async fn cleanup_loop(&self) { let mut interval = interval(self.config.cleanup_interval); while self.is_running.load(Ordering::Relaxed) == 1 { interval.tick().await; let mut dead_connections = Vec::new(); for entry in self.connections.iter() { let user_id = *entry.key(); let metadata = entry.value().read().await; if metadata.is_idle(self.config.idle_timeout) { dead_connections.push(user_id); } } for user_id in dead_connections { tracing::warn!(user_id = %user_id, "💀 Connexion morte détectée, suppression"); self.remove_connection(user_id).await; // Incrémenter statistiques { let mut stats = self.stats.write().await; stats.dead_connections_cleaned += 1; } } } } /// Boucle de mise à jour des statistiques async fn stats_update_loop(&self) { let mut interval = interval(Duration::from_secs(30)); while self.is_running.load(Ordering::Relaxed) == 1 { interval.tick().await; let mut stats = self.stats.write().await; stats.active_connections = self.connections.len(); stats.rooms_count = self.rooms.len(); // Calculer la durée moyenne de connexion let mut total_duration = Duration::ZERO; let mut count = 0; for entry in self.connections.iter() { let metadata = entry.value().read().await; total_duration += metadata.connection_duration(); count += 1; } if count > 0 { stats.average_connection_duration = total_duration / count as u32; } // Estimer l'utilisation mémoire (approximation) stats.memory_usage_mb = (self.connections.len() * 1024 + self.rooms.len() * 512) as f64 / 1024.0 / 1024.0; // Métriques self.metrics.active_users(stats.active_connections as u64).await; self.metrics.active_rooms(stats.rooms_count as u64).await; } } /// Obtient les statistiques du pool pub async fn get_stats(&self) -> PoolStats { self.stats.read().await.clone() } /// Arrête le pool de connexions pub async fn shutdown(&self) { self.is_running.store(0, Ordering::Relaxed); // Fermer toutes les connexions let user_ids: Vec = self.connections.iter().map(|entry| *entry.key()).collect(); for user_id in user_ids { self.remove_connection(user_id).await; } tracing::info!("🛑 Pool de connexions arrêté"); } } impl Clone for ConnectionPool { fn clone(&self) -> Self { Self { config: self.config.clone(), connections: self.connections.clone(), senders: self.senders.clone(), rooms: self.rooms.clone(), stats: self.stats.clone(), broadcast_sender: self.broadcast_sender.clone(), metrics: self.metrics.clone(), is_running: self.is_running.clone(), } } }