veza/veza-stream-server/src/main.rs
senke 37981c2c17 chore(refactor/sumi-migration): commit pending changes — tests, stream server, dist_verification
- apps/web: test updates (Vitest/setup), playbackAnalyticsService, TrackGrid, serviceErrorHandler
- veza-common: logging, metrics, traits, validation, random
- veza-stream-server: audio pipeline, codecs, cache, monitoring, routes
- apps/web/dist_verification: refresh build assets (content-hashed filenames)

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-13 19:39:18 +01:00

133 lines
4.8 KiB
Rust

// file: stream_server/src/main.rs
use stream_server::event_bus::RabbitMQEventBus;
use stream_server::{
config::Config,
AppState,
};
use metrics_exporter_prometheus::PrometheusBuilder;
use std::{net::SocketAddr, sync::Arc, time::Duration};
use tokio::signal;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// FIX #12, #24: Utiliser veza-common::logging pour configuration unifiée
// FIX #24: LOG_LEVEL est maintenant lu automatiquement par veza-common::logging
let is_prod = std::env::var("APP_ENV").unwrap_or_default() == "production";
// Configuration des fichiers de logs vers /var/log/veza/
let log_dir = std::env::var("LOG_DIR").unwrap_or_else(|_| "/var/log/veza".to_string());
let log_file = format!("{}/stream.log", log_dir);
let log_config = veza_common::logging::LoggingConfig {
// FIX #24: Laisser veza-common::logging normaliser LOG_LEVEL automatiquement
// Si LOG_LEVEL n'est pas défini, veza-common utilisera "INFO" par défaut
level: String::new(), // Vide = utiliser LOG_LEVEL ou RUST_LOG automatiquement
format: if is_prod { "json".to_string() } else { "text".to_string() },
file: Some(log_file),
max_size: 100 * 1024 * 1024, // 100MB
max_files: 5,
compress: true,
};
veza_common::logging::init_with_config(log_config)
.map_err(|e| format!("Failed to initialize logging: {}", e))?;
// Initialisation des métriques Prometheus
let builder = PrometheusBuilder::new();
let prometheus_handle = builder
.install_recorder()
.map_err(|e| format!("Failed to install Prometheus recorder: {}", e))?;
// Chargement de la configuration
let config = Config::from_env()?;
// Initialisation de l'état de l'application
let mut state = AppState::new(config.clone()).await?; // Utiliser mut ici
// Initialisation de l'Event Bus RabbitMQ
let event_bus = match RabbitMQEventBus::new_with_retry(config.rabbit_mq.clone()).await {
Ok(eb) => {
tracing::info!("✅ Event Bus RabbitMQ initialisé avec succès");
Some(eb)
}
Err(e) => {
tracing::warn!("⚠️ Échec d'initialisation de l'Event Bus RabbitMQ: {}. Le serveur démarrera en mode dégradé (sans Event Bus).", e);
None
}
};
state.event_bus = event_bus.map(Arc::new); // Ajouter l'event bus à l'état, wrapped in Arc
// Démarrage du monitoring de santé
state.health_monitor.start_monitoring().await;
// Création du routeur
let app = stream_server::routes::create_routes(state, Some(prometheus_handle));
// Démarrage du serveur
let addr = SocketAddr::from(([0, 0, 0, 0], config.port));
tracing::info!("🚀 Stream Server démarré sur {}", addr);
let listener = tokio::net::TcpListener::bind(addr).await?;
axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal(AppState::new(Config::from_env()?).await?))
.await?;
Ok(())
}
/// Handler de graceful shutdown qui ferme proprement les connexions
async fn shutdown_signal(state: AppState) {
let ctrl_c = async {
signal::ctrl_c()
.await
.expect("Impossible d'installer le handler Ctrl+C");
};
#[cfg(unix)]
let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("Impossible d'installer le handler SIGTERM")
.recv()
.await;
};
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
tokio::select! {
_ = ctrl_c => {
tracing::info!("📱 Signal Ctrl+C reçu, démarrage de l'arrêt gracieux...");
},
_ = terminate => {
tracing::info!("📱 Signal SIGTERM reçu, démarrage de l'arrêt gracieux...");
}
}
// Démarrage du processus de shutdown gracieux
tracing::info!("🛑 Arrêt gracieux du serveur en cours...");
// Temps maximum pour le shutdown (30 secondes)
let shutdown_timeout = Duration::from_secs(30);
let shutdown_start = std::time::Instant::now();
// 1. Fermer les connexions WebSocket
tracing::info!("🔌 Fermeture des connexions WebSocket...");
state.websocket_manager.close_all_connections().await;
// 4. Sauvegarder l'état si nécessaire
tracing::info!("💾 Sauvegarde de l'état...");
let final_stats = state.websocket_manager.get_stats().await;
tracing::info!("📈 Statistiques finales: {:?}", final_stats);
let elapsed = shutdown_start.elapsed();
if elapsed < shutdown_timeout {
tracing::info!("✅ Arrêt gracieux terminé en {:?}", elapsed);
} else {
tracing::warn!(
"⚠️ Arrêt gracieux a pris plus de temps que prévu ({:?})",
elapsed
);
}
}