veza/veza-stream-server/src/prometheus_metrics.rs
2025-12-03 20:36:56 +01:00

726 lines
24 KiB
Rust

//! Advanced Prometheus metrics for the streaming server
//!
//! This module provides a complete Prometheus integration for:
//! - Audio streaming metrics (latency, throughput, quality)
//! - Codec metrics (encoding, decoding, transcoding)
//! - Performance metrics (CPU, memory, network)
//! - Error and security metrics
//! - Business metrics (users, revenue, content)
// use crate::config::ServerConfig;
use crate::error::{AppError, Result};
use axum::{extract::State, http::StatusCode, response::Response, routing::get, Router};
// use metrics::{
// counter, gauge, histogram, register_counter, register_gauge, register_histogram, Counter,
// Gauge, Histogram,
// };
// use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
// Note: Use tracing::info! macro directly instead of importing
/// Prometheus metrics manager for the streaming server.
///
/// Holds the Prometheus export handle plus one handle per gauge, counter and histogram.
/// The per-field descriptions below mirror the descriptions passed when each metric is
/// registered in `StreamPrometheusMetrics::new`.
pub struct StreamPrometheusMetrics {
/// Prometheus handle used to render the metrics for export.
handle: PrometheusHandle,
// --- Audio streaming metrics ---
/// Number of active streams.
pub active_streams: Gauge,
/// Total streams created.
pub streams_total: Counter,
/// Total streams ended.
pub streams_ended: Counter,
/// Stream duration in seconds.
pub stream_duration_seconds: Histogram,
/// Stream bitrate in bps.
pub stream_bitrate_bps: Histogram,
/// Total quality changes.
pub stream_quality_changes: Counter,
// --- Codec metrics ---
/// Codec encoding duration.
pub codec_encoding_duration_seconds: Histogram,
/// Codec decoding duration.
pub codec_decoding_duration_seconds: Histogram,
/// Transcoding duration.
pub codec_transcoding_duration_seconds: Histogram,
/// Encoding errors.
pub codec_encoding_errors: Counter,
/// Decoding errors.
pub codec_decoding_errors: Counter,
/// Transcoding errors.
pub codec_transcoding_errors: Counter,
// --- File metrics ---
/// Total files uploaded.
pub files_uploaded_total: Counter,
/// Total files deleted.
pub files_deleted_total: Counter,
/// Total files served.
pub files_served_total: Counter,
/// File upload duration.
pub file_upload_duration_seconds: Histogram,
/// File size in bytes.
pub file_size_bytes: Histogram,
// --- Performance metrics ---
/// Audio processing duration.
pub audio_processing_duration_seconds: Histogram,
/// Waveform generation duration.
pub waveform_generation_duration_seconds: Histogram,
/// Metadata extraction duration.
pub metadata_extraction_duration_seconds: Histogram,
/// Cache hit ratio.
pub cache_hit_ratio: Gauge,
/// Total cache operations.
pub cache_operations_total: Counter,
// --- Network metrics ---
/// Total bytes streamed.
pub bytes_streamed_total: Counter,
/// Total bytes uploaded.
pub bytes_uploaded_total: Counter,
/// Network bandwidth in bps.
pub network_bandwidth_bps: Gauge,
/// Connection duration.
pub connection_duration_seconds: Histogram,
// --- Error metrics ---
/// Streaming errors.
pub stream_errors_total: Counter,
/// File errors.
pub file_errors_total: Counter,
/// Codec errors.
pub codec_errors_total: Counter,
/// Network errors.
pub network_errors_total: Counter,
/// Authentication errors.
pub authentication_errors_total: Counter,
// --- Security metrics ---
/// Security events.
pub security_events_total: Counter,
/// Rate limits triggered.
pub rate_limits_triggered_total: Counter,
/// Invalid requests.
pub invalid_requests_total: Counter,
// --- System metrics ---
/// Memory usage in bytes.
pub memory_usage_bytes: Gauge,
/// CPU usage as a percentage.
pub cpu_usage_percent: Gauge,
/// Disk usage in bytes.
pub disk_usage_bytes: Gauge,
/// Uptime in seconds.
pub uptime_seconds: Gauge,
// --- Business metrics ---
/// Active users.
pub active_users: Gauge,
/// Premium users.
pub premium_users: Gauge,
/// Total listening time in seconds.
pub total_playtime_seconds: Counter,
/// Unique tracks played.
pub unique_tracks_played: Counter,
/// Total revenue.
pub revenue_total: Counter,
}
impl StreamPrometheusMetrics {
/// Creates a new Prometheus metrics manager.
///
/// Builds a `PrometheusBuilder` with an HTTP listener on `0.0.0.0` at
/// `config.monitoring.metrics_port`, installs it, logs the configured port, and then
/// registers every gauge, counter and histogram stored in the returned struct.
///
/// # Errors
/// Returns `AppError::Configuration` when configuring the HTTP listener or installing
/// the exporter fails.
pub fn new(config: &ServerConfig) -> Result<Self> {
// NOTE(review): `ServerConfig`, `PrometheusBuilder` and the `register_*!` macros are only
// named by commented-out `use` lines at the top of this file — confirm how they are
// brought into scope.
// NOTE(review): the listener binds every interface (`0.0.0.0`) — confirm the metrics port
// is meant to be reachable from outside the host.
// Configure the Prometheus builder
let builder = PrometheusBuilder::new()
.with_http_listener(([0, 0, 0, 0], config.monitoring.metrics_port))
.map_err(|e| AppError::Configuration {
message: format!("Erreur configuration Prometheus: {e}"),
})?;
// NOTE(review): presumably installs a process-wide recorder, so a second call to `new`
// in the same process may fail — confirm against the exporter's documentation.
let handle = builder.install().map_err(|e| AppError::Configuration {
message: format!("Erreur installation Prometheus: {e}"),
})?;
tracing::info!(
metrics_port = %config.monitoring.metrics_port,
"📊 Métriques Prometheus configurées"
);
// Register the metrics
// --- Audio streaming metrics ---
let active_streams = register_gauge!("active_streams", "Nombre de streams actifs");
let streams_total = register_counter!("streams_total", "Total des streams créés");
let streams_ended = register_counter!("streams_ended", "Total des streams terminés");
let stream_duration_seconds = register_histogram!(
"stream_duration_seconds",
"Durée des streams en secondes",
&["codec", "quality"]
);
let stream_bitrate_bps = register_histogram!(
"stream_bitrate_bps",
"Bitrate des streams en bps",
&["codec", "quality"]
);
let stream_quality_changes = register_counter!(
"stream_quality_changes_total",
"Total des changements de qualité"
);
// --- Codec metrics ---
let codec_encoding_duration_seconds = register_histogram!(
"codec_encoding_duration_seconds",
"Durée d'encodage des codecs",
&["codec", "quality"]
);
let codec_decoding_duration_seconds = register_histogram!(
"codec_decoding_duration_seconds",
"Durée de décodage des codecs",
&["codec"]
);
let codec_transcoding_duration_seconds = register_histogram!(
"codec_transcoding_duration_seconds",
"Durée de transcodage",
&["input_codec", "output_codec"]
);
let codec_encoding_errors = register_counter!(
"codec_encoding_errors_total",
"Erreurs d'encodage",
&["codec", "error_type"]
);
let codec_decoding_errors = register_counter!(
"codec_decoding_errors_total",
"Erreurs de décodage",
&["codec", "error_type"]
);
let codec_transcoding_errors = register_counter!(
"codec_transcoding_errors_total",
"Erreurs de transcodage",
&["input_codec", "output_codec", "error_type"]
);
// --- File metrics ---
let files_uploaded_total = register_counter!(
"files_uploaded_total",
"Total des fichiers uploadés",
&["codec", "quality"]
);
let files_deleted_total = register_counter!(
"files_deleted_total",
"Total des fichiers supprimés",
&["reason"]
);
let files_served_total = register_counter!(
"files_served_total",
"Total des fichiers servis",
&["codec", "quality"]
);
let file_upload_duration_seconds = register_histogram!(
"file_upload_duration_seconds",
"Durée d'upload des fichiers",
&["codec", "file_size_range"]
);
let file_size_bytes = register_histogram!(
"file_size_bytes",
"Taille des fichiers en bytes",
&["codec"]
);
// --- Performance metrics ---
let audio_processing_duration_seconds = register_histogram!(
"audio_processing_duration_seconds",
"Durée de traitement audio",
&["operation"]
);
let waveform_generation_duration_seconds = register_histogram!(
"waveform_generation_duration_seconds",
"Durée de génération de waveform"
);
let metadata_extraction_duration_seconds = register_histogram!(
"metadata_extraction_duration_seconds",
"Durée d'extraction de métadonnées",
&["codec"]
);
let cache_hit_ratio = register_gauge!("cache_hit_ratio", "Ratio de hits du cache");
let cache_operations_total = register_counter!(
"cache_operations_total",
"Total des opérations de cache",
&["operation", "result"]
);
// --- Network metrics ---
let bytes_streamed_total = register_counter!(
"bytes_streamed_total",
"Total des bytes streamés",
&["codec", "quality"]
);
let bytes_uploaded_total = register_counter!(
"bytes_uploaded_total",
"Total des bytes uploadés",
&["codec"]
);
let network_bandwidth_bps =
register_gauge!("network_bandwidth_bps", "Bande passante réseau en bps");
let connection_duration_seconds = register_histogram!(
"connection_duration_seconds",
"Durée des connexions",
&["client_type"]
);
// --- Error metrics ---
let stream_errors_total = register_counter!(
"stream_errors_total",
"Erreurs de streaming",
&["error_type", "codec"]
);
let file_errors_total = register_counter!(
"file_errors_total",
"Erreurs de fichiers",
&["error_type", "operation"]
);
let codec_errors_total = register_counter!(
"codec_errors_total",
"Erreurs de codecs",
&["codec", "error_type"]
);
let network_errors_total =
register_counter!("network_errors_total", "Erreurs réseau", &["error_type"]);
let authentication_errors_total =
register_counter!("authentication_errors_total", "Erreurs d'authentification");
// --- Security metrics ---
let security_events_total = register_counter!(
"security_events_total",
"Événements de sécurité",
&["event_type"]
);
let rate_limits_triggered_total = register_counter!(
"rate_limits_triggered_total",
"Rate limits déclenchés",
&["limit_type"]
);
let invalid_requests_total = register_counter!(
"invalid_requests_total",
"Requêtes invalides",
&["request_type"]
);
// --- System metrics ---
let memory_usage_bytes =
register_gauge!("memory_usage_bytes", "Utilisation mémoire en bytes");
let cpu_usage_percent =
register_gauge!("cpu_usage_percent", "Utilisation CPU en pourcentage");
let disk_usage_bytes = register_gauge!("disk_usage_bytes", "Utilisation disque en bytes");
let uptime_seconds =
register_gauge!("uptime_seconds", "Temps de fonctionnement en secondes");
// --- Business metrics ---
let active_users = register_gauge!("active_users", "Utilisateurs actifs");
let premium_users = register_gauge!("premium_users", "Utilisateurs premium");
let total_playtime_seconds =
register_counter!("total_playtime_seconds", "Temps total d'écoute en secondes");
let unique_tracks_played =
register_counter!("unique_tracks_played_total", "Pistes uniques jouées");
let revenue_total = register_counter!("revenue_total", "Revenus totaux", &["currency"]);
Ok(Self {
handle,
active_streams,
streams_total,
streams_ended,
stream_duration_seconds,
stream_bitrate_bps,
stream_quality_changes,
codec_encoding_duration_seconds,
codec_decoding_duration_seconds,
codec_transcoding_duration_seconds,
codec_encoding_errors,
codec_decoding_errors,
codec_transcoding_errors,
files_uploaded_total,
files_deleted_total,
files_served_total,
file_upload_duration_seconds,
file_size_bytes,
audio_processing_duration_seconds,
waveform_generation_duration_seconds,
metadata_extraction_duration_seconds,
cache_hit_ratio,
cache_operations_total,
bytes_streamed_total,
bytes_uploaded_total,
network_bandwidth_bps,
connection_duration_seconds,
stream_errors_total,
file_errors_total,
codec_errors_total,
network_errors_total,
authentication_errors_total,
security_events_total,
rate_limits_triggered_total,
invalid_requests_total,
memory_usage_bytes,
cpu_usage_percent,
disk_usage_bytes,
uptime_seconds,
active_users,
premium_users,
total_playtime_seconds,
unique_tracks_played,
revenue_total,
})
}
/// Sets the `active_streams` gauge to `count`.
pub fn update_active_streams(&self, count: u64) {
    let active = count as f64;
    self.active_streams.set(active);
}
/// Records that a new stream has started.
///
/// Adds 1 to the `streams_total` counter through the `counter!` macro, labelled with
/// `codec` and `quality`. The counter is addressed by name; the `self.streams_total`
/// handle is not used here.
pub fn record_stream_started(&self, codec: &str, quality: &str) {
counter!(
"streams_total",
1,
&[("codec", codec), ("quality", quality)]
);
}
/// Records that a stream has ended.
///
/// Adds 1 to the `streams_ended` counter (labelled with `codec` and `quality`) and then
/// records `duration_seconds` into the `stream_duration_seconds` histogram with the same
/// two label values.
pub fn record_stream_ended(&self, codec: &str, quality: &str, duration_seconds: f64) {
    counter!(
        "streams_ended",
        1,
        &[("codec", codec), ("quality", quality)]
    );
    let labels = [codec, quality];
    self.stream_duration_seconds.record(duration_seconds, &labels);
}
/// Records a stream's bitrate into the `stream_bitrate_bps` histogram, passing `codec`
/// and `quality` as label values.
pub fn record_stream_bitrate(&self, codec: &str, quality: &str, bitrate_bps: u32) {
    let bitrate = bitrate_bps as f64;
    let labels = [codec, quality];
    self.stream_bitrate_bps.record(bitrate, &labels);
}
/// Enregistre un changement de qualité
pub fn record_quality_change(&self) {
self.stream_quality_changes.increment(1);
}
/// Records an encoding duration, in seconds, into the `codec_encoding_duration_seconds`
/// histogram, passing `codec` and `quality` as label values.
pub fn record_encoding_duration(&self, codec: &str, quality: &str, duration: Duration) {
    let elapsed_secs = duration.as_secs_f64();
    let labels = [codec, quality];
    self.codec_encoding_duration_seconds
        .record(elapsed_secs, &labels);
}
/// Records a decoding duration, in seconds, into the `codec_decoding_duration_seconds`
/// histogram, passing `codec` as the label value.
pub fn record_decoding_duration(&self, codec: &str, duration: Duration) {
    let elapsed_secs = duration.as_secs_f64();
    let labels = [codec];
    self.codec_decoding_duration_seconds
        .record(elapsed_secs, &labels);
}
/// Records a transcoding duration, in seconds, into the
/// `codec_transcoding_duration_seconds` histogram, passing `input_codec` and
/// `output_codec` as label values.
pub fn record_transcoding_duration(
    &self,
    input_codec: &str,
    output_codec: &str,
    duration: Duration,
) {
    let elapsed_secs = duration.as_secs_f64();
    let labels = [input_codec, output_codec];
    self.codec_transcoding_duration_seconds
        .record(elapsed_secs, &labels);
}
/// Records an encoding error.
///
/// Adds 1 to the `codec_encoding_errors_total` counter through the `counter!` macro,
/// labelled with `codec` and `error_type`. The `self.codec_encoding_errors` handle is
/// not used here.
pub fn record_encoding_error(&self, codec: &str, error_type: &str) {
counter!(
"codec_encoding_errors_total",
1,
&[("codec", codec), ("error_type", error_type)]
);
}
/// Records a decoding error.
///
/// Adds 1 to the `codec_decoding_errors_total` counter through the `counter!` macro,
/// labelled with `codec` and `error_type`. The `self.codec_decoding_errors` handle is
/// not used here.
pub fn record_decoding_error(&self, codec: &str, error_type: &str) {
counter!(
"codec_decoding_errors_total",
1,
&[("codec", codec), ("error_type", error_type)]
);
}
/// Records a transcoding error.
///
/// Adds 1 to the `codec_transcoding_errors_total` counter through the `counter!` macro,
/// labelled with `input_codec`, `output_codec` and `error_type`. The
/// `self.codec_transcoding_errors` handle is not used here.
pub fn record_transcoding_error(
&self,
input_codec: &str,
output_codec: &str,
error_type: &str,
) {
counter!(
"codec_transcoding_errors_total",
1,
&[
("input_codec", input_codec),
("output_codec", output_codec),
("error_type", error_type)
]
);
}
/// Records a completed file upload.
///
/// In order, this:
/// 1. adds 1 to the `files_uploaded_total` counter (labels `codec`, `quality`);
/// 2. records the upload duration, in seconds, into `file_upload_duration_seconds`,
///    passing `codec` and the size-range label from `get_file_size_range` as label values;
/// 3. records the file size into the `file_size_bytes` histogram, passing `codec`.
pub fn record_file_uploaded(
    &self,
    codec: &str,
    quality: &str,
    file_size_bytes: u64,
    duration: Duration,
) {
    counter!(
        "files_uploaded_total",
        1,
        &[("codec", codec), ("quality", quality)]
    );

    let upload_secs = duration.as_secs_f64();
    let size_range = self.get_file_size_range(file_size_bytes);
    self.file_upload_duration_seconds
        .record(upload_secs, &[codec, size_range]);

    let size = file_size_bytes as f64;
    self.file_size_bytes.record(size, &[codec]);
}
/// Records a file deletion.
///
/// Adds 1 to the `files_deleted_total` counter through the `counter!` macro, labelled
/// with `reason`. The `self.files_deleted_total` handle is not used here.
pub fn record_file_deleted(&self, reason: &str) {
counter!("files_deleted_total", 1, &[("reason", reason)]);
}
/// Records that a file was served.
///
/// Adds 1 to the `files_served_total` counter through the `counter!` macro, labelled
/// with `codec` and `quality`. The `self.files_served_total` handle is not used here.
pub fn record_file_served(&self, codec: &str, quality: &str) {
counter!(
"files_served_total",
1,
&[("codec", codec), ("quality", quality)]
);
}
/// Records an audio-processing duration, in seconds, into the
/// `audio_processing_duration_seconds` histogram, passing `operation` as the label value.
pub fn record_audio_processing_duration(&self, operation: &str, duration: Duration) {
    let elapsed_secs = duration.as_secs_f64();
    let labels = [operation];
    self.audio_processing_duration_seconds
        .record(elapsed_secs, &labels);
}
/// Records a waveform-generation duration, in seconds, into the
/// `waveform_generation_duration_seconds` histogram (no label values are passed).
pub fn record_waveform_generation_duration(&self, duration: Duration) {
    let elapsed_secs = duration.as_secs_f64();
    self.waveform_generation_duration_seconds
        .record(elapsed_secs);
}
/// Records a metadata-extraction duration, in seconds, into the
/// `metadata_extraction_duration_seconds` histogram, passing `codec` as the label value.
pub fn record_metadata_extraction_duration(&self, codec: &str, duration: Duration) {
    let elapsed_secs = duration.as_secs_f64();
    let labels = [codec];
    self.metadata_extraction_duration_seconds
        .record(elapsed_secs, &labels);
}
/// Met à jour le ratio de hits du cache
pub fn update_cache_hit_ratio(&self, ratio: f64) {
self.cache_hit_ratio.set(ratio);
}
/// Records a cache operation.
///
/// Adds 1 to the `cache_operations_total` counter through the `counter!` macro, labelled
/// with `operation` and `result`. The `self.cache_operations_total` handle is not used
/// here.
pub fn record_cache_operation(&self, operation: &str, result: &str) {
counter!(
"cache_operations_total",
1,
&[("operation", operation), ("result", result)]
);
}
/// Records streamed bytes.
///
/// Adds `bytes` to the `bytes_streamed_total` counter through the `counter!` macro,
/// labelled with `codec` and `quality`. The `self.bytes_streamed_total` handle is not
/// used here.
pub fn record_bytes_streamed(&self, codec: &str, quality: &str, bytes: u64) {
counter!(
"bytes_streamed_total",
bytes,
&[("codec", codec), ("quality", quality)]
);
}
/// Records uploaded bytes.
///
/// Adds `bytes` to the `bytes_uploaded_total` counter through the `counter!` macro,
/// labelled with `codec`. The `self.bytes_uploaded_total` handle is not used here.
pub fn record_bytes_uploaded(&self, codec: &str, bytes: u64) {
counter!("bytes_uploaded_total", bytes, &[("codec", codec)]);
}
/// Sets the `network_bandwidth_bps` gauge to `bandwidth_bps`.
pub fn update_network_bandwidth(&self, bandwidth_bps: u64) {
    let bandwidth = bandwidth_bps as f64;
    self.network_bandwidth_bps.set(bandwidth);
}
/// Records a connection duration, in seconds, into the `connection_duration_seconds`
/// histogram, passing `client_type` as the label value.
pub fn record_connection_duration(&self, client_type: &str, duration: Duration) {
    let elapsed_secs = duration.as_secs_f64();
    let labels = [client_type];
    self.connection_duration_seconds
        .record(elapsed_secs, &labels);
}
/// Records a streaming error.
///
/// Adds 1 to the `stream_errors_total` counter through the `counter!` macro, labelled
/// with `error_type` and `codec`. The `self.stream_errors_total` handle is not used here.
pub fn record_stream_error(&self, error_type: &str, codec: &str) {
counter!(
"stream_errors_total",
1,
&[("error_type", error_type), ("codec", codec)]
);
}
/// Records a file error.
///
/// Adds 1 to the `file_errors_total` counter through the `counter!` macro, labelled with
/// `error_type` and `operation`. The `self.file_errors_total` handle is not used here.
pub fn record_file_error(&self, error_type: &str, operation: &str) {
counter!(
"file_errors_total",
1,
&[("error_type", error_type), ("operation", operation)]
);
}
/// Records a codec error.
///
/// Adds 1 to the `codec_errors_total` counter through the `counter!` macro, labelled with
/// `codec` and `error_type`. The `self.codec_errors_total` handle is not used here.
pub fn record_codec_error(&self, codec: &str, error_type: &str) {
counter!(
"codec_errors_total",
1,
&[("codec", codec), ("error_type", error_type)]
);
}
/// Records a network error.
///
/// Adds 1 to the `network_errors_total` counter through the `counter!` macro, labelled
/// with `error_type`. The `self.network_errors_total` handle is not used here.
pub fn record_network_error(&self, error_type: &str) {
counter!("network_errors_total", 1, &[("error_type", error_type)]);
}
/// Enregistre une erreur d'authentification
pub fn record_authentication_error(&self) {
self.authentication_errors_total.increment(1);
}
/// Records a security event.
///
/// Adds 1 to the `security_events_total` counter through the `counter!` macro, labelled
/// with `event_type`. The `self.security_events_total` handle is not used here.
pub fn record_security_event(&self, event_type: &str) {
counter!("security_events_total", 1, &[("event_type", event_type)]);
}
/// Records that a rate limit was triggered.
///
/// Adds 1 to the `rate_limits_triggered_total` counter through the `counter!` macro,
/// labelled with `limit_type`. The `self.rate_limits_triggered_total` handle is not used
/// here.
pub fn record_rate_limit_triggered(&self, limit_type: &str) {
counter!(
"rate_limits_triggered_total",
1,
&[("limit_type", limit_type)]
);
}
/// Records an invalid request.
///
/// Adds 1 to the `invalid_requests_total` counter through the `counter!` macro, labelled
/// with `request_type`. The `self.invalid_requests_total` handle is not used here.
pub fn record_invalid_request(&self, request_type: &str) {
counter!(
"invalid_requests_total",
1,
&[("request_type", request_type)]
);
}
/// Updates the system gauges.
///
/// Sets, in order, `memory_usage_bytes`, `cpu_usage_percent`, `disk_usage_bytes` and
/// `uptime_seconds` from the corresponding arguments; integer arguments are converted
/// to `f64` first.
pub fn update_system_metrics(
    &self,
    memory_bytes: u64,
    cpu_percent: f64,
    disk_bytes: u64,
    uptime_seconds: u64,
) {
    let memory = memory_bytes as f64;
    self.memory_usage_bytes.set(memory);

    self.cpu_usage_percent.set(cpu_percent);

    let disk = disk_bytes as f64;
    self.disk_usage_bytes.set(disk);

    let uptime = uptime_seconds as f64;
    self.uptime_seconds.set(uptime);
}
/// Updates the business metrics.
///
/// Sets the `active_users` and `premium_users` gauges, then passes `playtime_seconds`,
/// `unique_tracks` and `revenue` to the `counter!` macro for the
/// `total_playtime_seconds`, `unique_tracks_played_total` and `revenue_total` counters.
pub fn update_business_metrics(
&self,
active_users: u64,
premium_users: u64,
playtime_seconds: u64,
unique_tracks: u64,
revenue: f64,
) {
self.active_users.set(active_users as f64);
self.premium_users.set(premium_users as f64);
// NOTE(review): these values are handed to `counter!` on every call, so callers presumably
// pass deltas since the previous call rather than running totals — confirm.
counter!("total_playtime_seconds", playtime_seconds);
counter!("unique_tracks_played_total", unique_tracks);
// NOTE(review): `revenue as u64` drops the fractional part and maps negative or NaN values
// to 0; the currency label is hard-coded to "USD" — confirm both are intended.
counter!("revenue_total", revenue as u64, &[("currency", "USD")]);
}
/// Obtient le handle Prometheus pour l'export
pub fn get_handle(&self) -> &PrometheusHandle {
&self.handle
}
/// Renders the current metrics through the Prometheus handle and returns the text.
pub fn gather_metrics(&self) -> String {
    let rendered = self.handle.render();
    rendered
}
/// Maps a file size in bytes to the size-range label used on upload metrics.
///
/// Buckets are inclusive on their upper bound (for example exactly 1024 bytes is
/// `"0-1KB"`); anything above 104857600 bytes is `"100MB+"`.
fn get_file_size_range(&self, size_bytes: u64) -> &'static str {
    // Inclusive upper bound of each bucket, in ascending order.
    const BUCKETS: [(u64, &str); 6] = [
        (1024, "0-1KB"),
        (10 * 1024, "1-10KB"),
        (100 * 1024, "10-100KB"),
        (1024 * 1024, "100KB-1MB"),
        (10 * 1024 * 1024, "1-10MB"),
        (100 * 1024 * 1024, "10-100MB"),
    ];

    BUCKETS
        .iter()
        .find(|(upper, _)| size_bytes <= *upper)
        .map_or("100MB+", |&(_, label)| label)
}
}
/// Measures how long an operation takes and records it into a histogram.
///
/// The clock starts in `MetricsTimer::new`; the elapsed time is only recorded when
/// `MetricsTimer::finish` is called.
pub struct MetricsTimer {
    start: Instant,
    histogram: Histogram,
    labels: Vec<String>,
}

impl MetricsTimer {
    /// Starts a timer that will record into `histogram` with the given `labels`.
    pub fn new(histogram: Histogram, labels: Vec<String>) -> Self {
        let start = Instant::now();
        Self {
            start,
            histogram,
            labels,
        }
    }

    /// Stops the timer and records the elapsed time, in seconds, into the histogram,
    /// passing the stored labels as label values.
    pub fn finish(self) {
        let elapsed_secs = self.start.elapsed().as_secs_f64();
        let label_refs = self
            .labels
            .iter()
            .map(String::as_str)
            .collect::<Vec<&str>>();
        self.histogram.record(elapsed_secs, &label_refs);
    }
}
/// Starts a `MetricsTimer` bound to the encoding-duration histogram, with `codec` and
/// `quality` as labels.
pub fn time_encoding(
    metrics: &StreamPrometheusMetrics,
    codec: &str,
    quality: &str,
) -> MetricsTimer {
    let labels: Vec<String> = [codec, quality].iter().map(|s| s.to_string()).collect();
    let histogram = metrics.codec_encoding_duration_seconds.clone();
    MetricsTimer::new(histogram, labels)
}
/// Starts a `MetricsTimer` bound to the decoding-duration histogram, with `codec` as the
/// only label.
pub fn time_decoding(metrics: &StreamPrometheusMetrics, codec: &str) -> MetricsTimer {
    let labels: Vec<String> = std::iter::once(codec.to_string()).collect();
    let histogram = metrics.codec_decoding_duration_seconds.clone();
    MetricsTimer::new(histogram, labels)
}
/// Starts a `MetricsTimer` bound to the transcoding-duration histogram, with
/// `input_codec` and `output_codec` as labels.
pub fn time_transcoding(
    metrics: &StreamPrometheusMetrics,
    input_codec: &str,
    output_codec: &str,
) -> MetricsTimer {
    let labels: Vec<String> = [input_codec, output_codec]
        .iter()
        .map(|s| s.to_string())
        .collect();
    let histogram = metrics.codec_transcoding_duration_seconds.clone();
    MetricsTimer::new(histogram, labels)
}
/// Axum handler for the Prometheus metrics endpoint.
///
/// Renders the metrics, logs a debug line, and returns them as a `200 OK` response
/// with the Prometheus text content type.
///
/// # Panics
/// Panics if the response builder rejects the status, header or body.
pub async fn metrics_handler(
    State(metrics): State<Arc<StreamPrometheusMetrics>>,
) -> Response<String> {
    let body = metrics.gather_metrics();
    tracing::debug!("📊 Métriques Prometheus exportées");

    let built = Response::builder()
        .status(StatusCode::OK)
        .header("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
        .body(body);
    built.unwrap()
}
/// Builds a router that serves `GET /metrics` with `metrics_handler`, using `metrics`
/// as the shared state.
pub fn create_metrics_router(metrics: Arc<StreamPrometheusMetrics>) -> Router {
    let routes = Router::new().route("/metrics", get(metrics_handler));
    routes.with_state(metrics)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ServerConfig;

    /// A timer can be created and finished around a short operation.
    #[test]
    fn test_metrics_timer() {
        let histogram = register_histogram!("test_histogram", "Test histogram");
        let labels = vec!["test".to_string()];
        let timer = MetricsTimer::new(histogram, labels);
        // Simulate an operation
        std::thread::sleep(std::time::Duration::from_millis(10));
        timer.finish();
    }

    /// Each sample size maps to the expected size-range label.
    #[test]
    fn test_file_size_range() {
        let config = ServerConfig::default();
        let metrics = StreamPrometheusMetrics::new(&config).unwrap();

        let cases = [
            (500, "0-1KB"),
            (5000, "1-10KB"),
            (50000, "10-100KB"),
            (500000, "100KB-1MB"),
            (5000000, "1-10MB"),
            (50000000, "10-100MB"),
            (500000000, "100MB+"),
        ];
        for (size_bytes, expected) in cases {
            assert_eq!(metrics.get_file_size_range(size_bytes), expected);
        }
    }
}