veza/veza-chat-server/src/connection_pool.rs
2025-12-03 20:33:26 +01:00

568 lines
No EOL
19 KiB
Rust

//! Module ConnectionPool - Gestion avancée de 10k connexions WebSocket simultanées
//!
//! Ce module implémente un pool de connexions haute performance avec :
//! - Support 10000+ connexions simultanées
//! - Heartbeat intelligent avec timeout adaptatif
//! - Cleanup automatique des connexions mortes
//! - Monitoring en temps réel des connexions
//! - Load balancing des messages
use std::collections::HashMap;
use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
use std::time::{Duration, Instant};
use tokio::sync::{RwLock, mpsc, broadcast, Mutex};
use tokio::time::{interval, timeout};
use serde::{Serialize, Deserialize};
use dashmap::DashMap;
use uuid::Uuid;
use futures_util::{SinkExt, StreamExt};
use axum::extract::ws::{WebSocket, Message};
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::WebSocketStream;
use crate::error::{ChatError, Result};
use crate::monitoring::ChatMetrics;
/// Configuration du pool de connexions
#[derive(Debug, Clone)]
pub struct ConnectionPoolConfig {
/// Nombre maximum de connexions simultanées
pub max_connections: usize,
/// Délai entre les heartbeats
pub heartbeat_interval: Duration,
/// Timeout pour considérer une connexion morte
pub idle_timeout: Duration,
/// Intervalle de nettoyage des connexions mortes
pub cleanup_interval: Duration,
/// Taille du buffer pour les messages
pub message_buffer_size: usize,
/// Timeout pour l'envoi d'un message
pub send_timeout: Duration,
}
impl Default for ConnectionPoolConfig {
fn default() -> Self {
Self {
max_connections: 10000,
heartbeat_interval: Duration::from_secs(30),
idle_timeout: Duration::from_secs(120),
cleanup_interval: Duration::from_secs(60),
message_buffer_size: 1000,
send_timeout: Duration::from_secs(5),
}
}
}
/// Métadonnées d'une connexion WebSocket
#[derive(Debug)]
pub struct ConnectionMetadata {
pub connection_id: Uuid,
pub user_id: i32,
pub username: String,
pub connected_at: Instant,
pub last_heartbeat: Instant,
pub last_message: Instant,
pub messages_sent: AtomicUsize,
pub messages_received: AtomicUsize,
pub is_alive: bool,
pub rooms: Vec<String>,
pub user_agent: Option<String>,
pub ip_address: Option<String>,
}
impl ConnectionMetadata {
pub fn new(user_id: i32, username: String) -> Self {
let now = Instant::now();
Self {
connection_id: Uuid::new_v4(),
user_id,
username,
connected_at: now,
last_heartbeat: now,
last_message: now,
messages_sent: AtomicUsize::new(0),
messages_received: AtomicUsize::new(0),
is_alive: true,
rooms: Vec::new(),
user_agent: None,
ip_address: None,
}
}
pub fn is_idle(&self, timeout: Duration) -> bool {
self.last_heartbeat.elapsed() > timeout
}
pub fn connection_duration(&self) -> Duration {
self.connected_at.elapsed()
}
pub fn increment_sent(&self) {
self.messages_sent.fetch_add(1, Ordering::Relaxed);
}
pub fn increment_received(&self) {
self.messages_received.fetch_add(1, Ordering::Relaxed);
}
}
/// Message interne du pool de connexions
#[derive(Debug, Clone)]
pub enum PoolMessage {
Broadcast {
room: Option<String>,
message: String,
exclude_user: Option<i32>,
},
DirectMessage {
target_user: i32,
message: String,
},
JoinRoom {
user_id: i32,
room: String,
},
LeaveRoom {
user_id: i32,
room: String,
},
Disconnect {
user_id: i32,
},
}
/// Statistiques du pool de connexions
#[derive(Debug, Clone, Serialize)]
pub struct PoolStats {
pub active_connections: usize,
pub total_connections_created: u64,
pub total_messages_sent: u64,
pub total_messages_received: u64,
pub dead_connections_cleaned: u64,
pub average_connection_duration: Duration,
pub peak_connections: usize,
pub rooms_count: usize,
pub memory_usage_mb: f64,
}
/// Pool de connexions WebSocket haute performance
pub struct ConnectionPool {
config: ConnectionPoolConfig,
connections: Arc<DashMap<i32, Arc<RwLock<ConnectionMetadata>>>>,
senders: Arc<DashMap<i32, mpsc::UnboundedSender<String>>>,
rooms: Arc<DashMap<String, Vec<i32>>>,
stats: Arc<RwLock<PoolStats>>,
broadcast_sender: broadcast::Sender<PoolMessage>,
metrics: Arc<ChatMetrics>,
is_running: Arc<AtomicUsize>,
}
impl ConnectionPool {
/// Crée un nouveau pool de connexions
pub fn new(config: ConnectionPoolConfig, metrics: Arc<ChatMetrics>) -> Self {
let (broadcast_sender, _) = broadcast::channel(1000);
Self {
config,
connections: Arc::new(DashMap::new()),
senders: Arc::new(DashMap::new()),
rooms: Arc::new(DashMap::new()),
stats: Arc::new(RwLock::new(PoolStats {
active_connections: 0,
total_connections_created: 0,
total_messages_sent: 0,
total_messages_received: 0,
dead_connections_cleaned: 0,
average_connection_duration: Duration::ZERO,
peak_connections: 0,
rooms_count: 0,
memory_usage_mb: 0.0,
})),
broadcast_sender,
metrics,
is_running: Arc::new(AtomicUsize::new(1)),
}
}
/// Démarre les tâches de maintenance du pool
pub async fn start_maintenance_tasks(&self) {
let pool_clone = self.clone();
tokio::spawn(async move {
pool_clone.heartbeat_loop().await;
});
let pool_clone = self.clone();
tokio::spawn(async move {
pool_clone.cleanup_loop().await;
});
let pool_clone = self.clone();
tokio::spawn(async move {
pool_clone.stats_update_loop().await;
});
}
/// Ajoute une nouvelle connexion au pool
pub async fn add_connection(
&self,
user_id: i32,
username: String,
websocket: WebSocket,
) -> Result<()> {
// Vérifier la limite de connexions
if self.connections.len() >= self.config.max_connections {
return Err(ChatError::configuration_error("Pool de connexions plein"));
}
// Créer les métadonnées de connexion
let metadata = Arc::new(RwLock::new(ConnectionMetadata::new(user_id, username.clone())));
// Créer le canal de communication
let (sender, mut receiver) = mpsc::unbounded_channel::<String>();
// Cloner les variables nécessaires pour les tâches
let metadata_clone = metadata.clone();
let sender_clone = sender.clone();
// Stocker la connexion
self.connections.insert(user_id, metadata.clone());
self.senders.insert(user_id, sender);
// Mettre à jour les statistiques
{
let mut stats = self.stats.write().await;
stats.total_connections_created += 1;
stats.active_connections = self.connections.len();
if stats.active_connections > stats.peak_connections {
stats.peak_connections = stats.active_connections;
}
}
// Métriques
self.metrics.websocket_connected(user_id).await;
tracing::info!(
user_id = %user_id,
username = %username,
connection_id = %metadata.read().await.connection_id,
total_connections = %self.connections.len(),
"🔌 Nouvelle connexion ajoutée au pool"
);
// Gérer la connexion WebSocket dans une tâche séparée
let pool_clone = self.clone();
let metadata_clone_task = metadata_clone.clone();
tokio::spawn(async move {
pool_clone.handle_connection(user_id, websocket, receiver, metadata_clone_task).await;
});
Ok(())
}
/// Gère une connexion WebSocket individuelle
async fn handle_connection(
&self,
user_id: i32,
websocket: WebSocket,
mut receiver: mpsc::UnboundedReceiver<String>,
metadata: Arc<RwLock<ConnectionMetadata>>,
) {
let (ws_sender, mut ws_receiver) = websocket.split();
let ws_sender = Arc::new(Mutex::new(ws_sender));
let ws_sender_clone = ws_sender.clone();
let metadata_send = metadata.clone();
let metadata_recv = metadata.clone();
// Tâche d'envoi de messages
let send_task = tokio::spawn(async move {
while let Some(message) = receiver.recv().await {
let mut sender = ws_sender.lock().await;
if let Err(e) = sender.send(Message::Text(message)).await {
tracing::error!("Erreur envoi WebSocket: {}", e);
break;
}
drop(sender);
metadata_send.read().await.increment_sent();
}
});
// Tâche de réception de messages
let receive_task = tokio::spawn(async move {
while let Some(message) = ws_receiver.next().await {
match message {
Ok(Message::Text(_text)) => {
metadata_recv.read().await.increment_received();
// Traiter le message reçu ici
}
Ok(Message::Ping(data)) => {
let mut sender = ws_sender_clone.lock().await;
if let Err(e) = sender.send(Message::Pong(data)).await {
tracing::error!("Erreur envoi Pong: {}", e);
break;
}
}
Ok(Message::Close(_)) => {
tracing::info!("Connexion fermée par le client");
break;
}
Err(e) => {
tracing::error!("Erreur WebSocket: {}", e);
break;
}
_ => {}
}
}
});
// Attendre la fin des tâches
tokio::select! {
_ = send_task => {},
_ = receive_task => {},
}
// Nettoyer la connexion
self.remove_connection(user_id).await;
}
/// Traite un message entrant
async fn handle_incoming_message(&self, user_id: i32, message: String) {
// Mettre à jour le timestamp du dernier message
if let Some(metadata_ref) = self.connections.get(&user_id) {
metadata_ref.write().await.last_message = Instant::now();
}
// Traiter le message (déléguer au ChatHub)
tracing::debug!(user_id = %user_id, message_len = %message.len(), "📨 Message reçu");
// Ici, vous pouvez ajouter la logique de traitement des messages
// Par exemple, parser le JSON et router vers les handlers appropriés
}
/// Supprime une connexion du pool
pub async fn remove_connection(&self, user_id: i32) {
if let Some((_, metadata_ref)) = self.connections.remove(&user_id) {
let metadata = metadata_ref.read().await;
let duration = metadata.connection_duration();
tracing::info!(
user_id = %user_id,
username = %metadata.username,
duration = ?duration,
messages_sent = %metadata.messages_sent.load(Ordering::Relaxed),
messages_received = %metadata.messages_received.load(Ordering::Relaxed),
"🗑 Connexion supprimée du pool"
);
}
self.senders.remove(&user_id);
// Retirer l'utilisateur de toutes les salles
for mut entry in self.rooms.iter_mut() {
entry.value_mut().retain(|&id| id != user_id);
}
// Supprimer les salles vides
self.rooms.retain(|_, users| !users.is_empty());
// Mettre à jour les statistiques
{
let mut stats = self.stats.write().await;
stats.active_connections = self.connections.len();
}
// Métriques
self.metrics.websocket_disconnected(user_id).await;
}
/// Envoie un message à un utilisateur spécifique
pub async fn send_to_user(&self, user_id: i32, message: String) -> Result<()> {
if let Some(sender) = self.senders.get(&user_id) {
sender.send(message)
.map_err(|e| ChatError::configuration_error(&format!("Erreur envoi message: {}", e)))?;
// Incrémenter statistiques
{
let mut stats = self.stats.write().await;
stats.total_messages_sent += 1;
}
}
Ok(())
}
/// Diffuse un message à tous les utilisateurs d'une salle
pub async fn broadcast_to_room(&self, room: &str, message: String, exclude_user: Option<i32>) -> Result<()> {
if let Some(users) = self.rooms.get(room) {
let mut sent_count = 0;
for &user_id in users.iter() {
if exclude_user.map_or(true, |excluded| excluded != user_id) {
if let Err(e) = self.send_to_user(user_id, message.clone()).await {
tracing::warn!(user_id = %user_id, room = %room, error = %e, "⚠️ Échec envoi message");
} else {
sent_count += 1;
}
}
}
tracing::debug!(
room = %room,
message_len = %message.len(),
recipients = %sent_count,
excluded_user = ?exclude_user,
"📡 Message diffusé dans la salle"
);
}
Ok(())
}
/// Ajoute un utilisateur à une salle
pub async fn join_room(&self, user_id: i32, room: String) {
self.rooms.entry(room.clone()).or_insert_with(Vec::new).push(user_id);
// Mettre à jour les métadonnées de l'utilisateur
if let Some(metadata_ref) = self.connections.get(&user_id) {
metadata_ref.write().await.rooms.push(room.clone());
}
tracing::info!(user_id = %user_id, room = %room, "🚪 Utilisateur rejoint la salle");
}
/// Retire un utilisateur d'une salle
pub async fn leave_room(&self, user_id: i32, room: &str) {
if let Some(mut users) = self.rooms.get_mut(room) {
users.retain(|&id| id != user_id);
}
// Mettre à jour les métadonnées de l'utilisateur
if let Some(metadata_ref) = self.connections.get(&user_id) {
metadata_ref.write().await.rooms.retain(|r| r != room);
}
tracing::info!(user_id = %user_id, room = %room, "🚪 Utilisateur quitte la salle");
}
/// Boucle de heartbeat pour maintenir les connexions vivantes
async fn heartbeat_loop(&self) {
let mut interval = interval(self.config.heartbeat_interval);
while self.is_running.load(Ordering::Relaxed) == 1 {
interval.tick().await;
let mut ping_count = 0;
for entry in self.connections.iter() {
let user_id = *entry.key();
if let Some(sender) = self.senders.get(&user_id) {
// Envoyer un ping JSON
let ping_message = r#"{"type":"ping","timestamp":{}}"#;
let timestamp = chrono::Utc::now().timestamp();
let message = ping_message.replace("{}", &timestamp.to_string());
if sender.send(message).is_ok() {
ping_count += 1;
}
}
}
if ping_count > 0 {
tracing::debug!(ping_count = %ping_count, "🏓 Heartbeat envoyé");
}
}
}
/// Boucle de nettoyage des connexions mortes
async fn cleanup_loop(&self) {
let mut interval = interval(self.config.cleanup_interval);
while self.is_running.load(Ordering::Relaxed) == 1 {
interval.tick().await;
let mut dead_connections = Vec::new();
for entry in self.connections.iter() {
let user_id = *entry.key();
let metadata = entry.value().read().await;
if metadata.is_idle(self.config.idle_timeout) {
dead_connections.push(user_id);
}
}
for user_id in dead_connections {
tracing::warn!(user_id = %user_id, "💀 Connexion morte détectée, suppression");
self.remove_connection(user_id).await;
// Incrémenter statistiques
{
let mut stats = self.stats.write().await;
stats.dead_connections_cleaned += 1;
}
}
}
}
/// Boucle de mise à jour des statistiques
async fn stats_update_loop(&self) {
let mut interval = interval(Duration::from_secs(30));
while self.is_running.load(Ordering::Relaxed) == 1 {
interval.tick().await;
let mut stats = self.stats.write().await;
stats.active_connections = self.connections.len();
stats.rooms_count = self.rooms.len();
// Calculer la durée moyenne de connexion
let mut total_duration = Duration::ZERO;
let mut count = 0;
for entry in self.connections.iter() {
let metadata = entry.value().read().await;
total_duration += metadata.connection_duration();
count += 1;
}
if count > 0 {
stats.average_connection_duration = total_duration / count as u32;
}
// Estimer l'utilisation mémoire (approximation)
stats.memory_usage_mb = (self.connections.len() * 1024 + self.rooms.len() * 512) as f64 / 1024.0 / 1024.0;
// Métriques
self.metrics.active_users(stats.active_connections as u64).await;
self.metrics.active_rooms(stats.rooms_count as u64).await;
}
}
/// Obtient les statistiques du pool
pub async fn get_stats(&self) -> PoolStats {
self.stats.read().await.clone()
}
/// Arrête le pool de connexions
pub async fn shutdown(&self) {
self.is_running.store(0, Ordering::Relaxed);
// Fermer toutes les connexions
let user_ids: Vec<i32> = self.connections.iter().map(|entry| *entry.key()).collect();
for user_id in user_ids {
self.remove_connection(user_id).await;
}
tracing::info!("🛑 Pool de connexions arrêté");
}
}
impl Clone for ConnectionPool {
fn clone(&self) -> Self {
Self {
config: self.config.clone(),
connections: self.connections.clone(),
senders: self.senders.clone(),
rooms: self.rooms.clone(),
stats: self.stats.clone(),
broadcast_sender: self.broadcast_sender.clone(),
metrics: self.metrics.clone(),
is_running: self.is_running.clone(),
}
}
}