// file: stream_server/src/main.rs use stream_server::event_bus::RabbitMQEventBus; use stream_server::{ config::Config, AppState, }; use axum::serve; use metrics_exporter_prometheus::PrometheusBuilder; use std::{net::SocketAddr, sync::Arc, time::Duration}; use tokio::signal; #[tokio::main] async fn main() -> Result<(), Box> { // Initialisation du logging let is_prod = std::env::var("APP_ENV").unwrap_or_default() == "production"; let env_filter = tracing_subscriber::EnvFilter::try_from_default_env() .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")); if is_prod { tracing_subscriber::fmt() .with_env_filter(env_filter) .json() .init(); } else { tracing_subscriber::fmt().with_env_filter(env_filter).init(); } // Initialisation des métriques Prometheus let builder = PrometheusBuilder::new(); let prometheus_handle = builder .install_recorder() .expect("failed to install Prometheus recorder"); // Chargement de la configuration let config = Config::from_env()?; // Initialisation de l'état de l'application let mut state = AppState::new(config.clone()).await?; // Utiliser mut ici // Initialisation de l'Event Bus RabbitMQ let event_bus = match RabbitMQEventBus::new_with_retry(config.rabbit_mq.clone()).await { Ok(eb) => { tracing::info!("✅ Event Bus RabbitMQ initialisé avec succès"); Some(eb) } Err(e) => { tracing::warn!("⚠️ Échec d'initialisation de l'Event Bus RabbitMQ: {}. Le serveur démarrera en mode dégradé (sans Event Bus).", e); None } }; state.event_bus = event_bus.map(Arc::new); // Ajouter l'event bus à l'état, wrapped in Arc // Démarrage du monitoring de santé state.health_monitor.start_monitoring().await; // Création du routeur let app = stream_server::routes::create_routes(state, Some(prometheus_handle)); // Démarrage du serveur let addr = SocketAddr::from(([0, 0, 0, 0], config.port)); tracing::info!("🚀 Stream Server démarré sur {}", addr); let listener = tokio::net::TcpListener::bind(addr).await?; axum::serve(listener, app) .with_graceful_shutdown(shutdown_signal(AppState::new(Config::from_env()?).await?)) .await?; Ok(()) } /// Handler de graceful shutdown qui ferme proprement les connexions async fn shutdown_signal(state: AppState) { let ctrl_c = async { signal::ctrl_c() .await .expect("Impossible d'installer le handler Ctrl+C"); }; #[cfg(unix)] let terminate = async { signal::unix::signal(signal::unix::SignalKind::terminate()) .expect("Impossible d'installer le handler SIGTERM") .recv() .await; }; #[cfg(not(unix))] let terminate = std::future::pending::<()>(); tokio::select! { _ = ctrl_c => { tracing::info!("📱 Signal Ctrl+C reçu, démarrage de l'arrêt gracieux..."); }, _ = terminate => { tracing::info!("📱 Signal SIGTERM reçu, démarrage de l'arrêt gracieux..."); } } // Démarrage du processus de shutdown gracieux tracing::info!("🛑 Arrêt gracieux du serveur en cours..."); // Temps maximum pour le shutdown (30 secondes) let shutdown_timeout = Duration::from_secs(30); let shutdown_start = std::time::Instant::now(); // 1. Fermer les connexions WebSocket tracing::info!("🔌 Fermeture des connexions WebSocket..."); state.websocket_manager.close_all_connections().await; // 4. Sauvegarder l'état si nécessaire tracing::info!("💾 Sauvegarde de l'état..."); let final_stats = state.websocket_manager.get_stats().await; tracing::info!("📈 Statistiques finales: {:?}", final_stats); let elapsed = shutdown_start.elapsed(); if elapsed < shutdown_timeout { tracing::info!("✅ Arrêt gracieux terminé en {:?}", elapsed); } else { tracing::warn!( "⚠️ Arrêt gracieux a pris plus de temps que prévu ({:?})", elapsed ); } }