chore(stream): remove orphan modules and artifacts
- Remove src/eventbus/ directory (orphan — event_bus.rs is the active module) - Remove src/prometheus_metrics.rs (orphan duplicate — monitoring/prometheus_metrics.rs is active) - Remove src/core/sync.rs_test_snippet (leftover artifact) Stream server compiles with zero errors. Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
parent
9b65e40952
commit
101cbd0f48
3 changed files with 0 additions and 867 deletions
|
|
@ -1,51 +0,0 @@
|
|||
|
||||
#[tokio::test]
|
||||
async fn test_sync_state_machine_initialization() {
|
||||
// Setup
|
||||
let sent_inits = Arc::new(Mutex::new(Vec::new()));
|
||||
let transport = Arc::new(MockTransport {
|
||||
sent_adjustments: Arc::new(Mutex::new(Vec::new())),
|
||||
sent_pings: Arc::new(Mutex::new(Vec::new())),
|
||||
mock_stats: Arc::new(Mutex::new(HashMap::new())),
|
||||
sent_inits: sent_inits.clone(),
|
||||
sent_stables: Arc::new(Mutex::new(Vec::new())),
|
||||
});
|
||||
|
||||
let config = Arc::new(RwLock::new(SyncConfig::default()));
|
||||
let time_server = Arc::new(TimeServer::new(vec![]).await.unwrap());
|
||||
let drift_compensator = Arc::new(DriftCompensator::new());
|
||||
|
||||
let engine = SyncEngine::new(
|
||||
time_server.clone(),
|
||||
drift_compensator,
|
||||
config,
|
||||
Some(transport)
|
||||
);
|
||||
|
||||
let stream_id = Uuid::new_v4();
|
||||
let listener_id = Uuid::new_v4();
|
||||
let mut listener = create_test_listener(listener_id, Some(0)); // Helper from existing tests
|
||||
listener.sync_state = SyncState::Desynchronized; // Start desync
|
||||
|
||||
// Mock listeners map
|
||||
let listeners = Arc::new(DashMap::new());
|
||||
listeners.insert(listener_id, listener);
|
||||
|
||||
// Run sync
|
||||
let track_id = Some("test_track".to_string());
|
||||
let result = engine.sync_listeners(stream_id, track_id, listeners.clone()).await;
|
||||
|
||||
assert!(result.is_ok());
|
||||
|
||||
// Verify SyncInit sent
|
||||
let inits = sent_inits.lock().unwrap();
|
||||
assert_eq!(inits.len(), 1);
|
||||
assert_eq!(inits[0], listener_id);
|
||||
|
||||
// Verify state changed to Calibrating
|
||||
let updated_listener = listeners.get(&listener_id).unwrap();
|
||||
match updated_listener.sync_state {
|
||||
SyncState::Calibrating { .. } => (), // Success
|
||||
_ => panic!("State should be Calibrating, got {:?}", updated_listener.sync_state),
|
||||
}
|
||||
}
|
||||
|
|
@ -1,90 +0,0 @@
|
|||
/// Module Event Bus NATS pour communication asynchrone
|
||||
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
// Note: Use tracing::info! macro directly instead of importing
|
||||
use serde::{Serialize, Deserialize};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::error::AppError;
|
||||
use crate::core::StreamEvent;
|
||||
|
||||
/// Configuration NATS (simulation)
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct NatsConfig {
|
||||
pub servers: Vec<String>,
|
||||
pub cluster_name: String,
|
||||
pub client_id: String,
|
||||
}
|
||||
|
||||
/// Event Bus principal
|
||||
#[derive(Debug)]
|
||||
pub struct EventBus {
|
||||
config: NatsConfig,
|
||||
metrics: Arc<RwLock<EventBusMetrics>>,
|
||||
}
|
||||
|
||||
/// Métriques de l'Event Bus
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct EventBusMetrics {
|
||||
pub events_published: u64,
|
||||
pub events_received: u64,
|
||||
pub events_failed: u64,
|
||||
pub throughput_per_second: f64,
|
||||
}
|
||||
|
||||
impl Default for NatsConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
servers: vec!["nats://localhost:4222".to_string()],
|
||||
cluster_name: "veza-cluster".to_string(),
|
||||
client_id: format!("stream-server-{}", Uuid::new_v4()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl EventBus {
|
||||
pub async fn new(config: NatsConfig) -> Result<Self, AppError> {
|
||||
tracing::info!("🚀 Initialisation Event Bus NATS");
|
||||
|
||||
Ok(Self {
|
||||
config,
|
||||
metrics: Arc::new(RwLock::new(EventBusMetrics::default())),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn start(&self) -> Result<(), AppError> {
|
||||
tracing::info!("🔄 Démarrage Event Bus NATS");
|
||||
|
||||
// Simulation de connexion NATS
|
||||
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||
|
||||
tracing::info!("✅ Event Bus démarré avec succès");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn publish_event(&self, event: StreamEvent) -> Result<(), AppError> {
|
||||
tracing::debug!("📤 Publication événement: {:?}", event);
|
||||
|
||||
// Simulation de publication
|
||||
tokio::time::sleep(std::time::Duration::from_millis(5)).await;
|
||||
|
||||
let mut metrics = self.metrics.write().await;
|
||||
metrics.events_published += 1;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_metrics(&self) -> EventBusMetrics {
|
||||
self.metrics.read().await.clone()
|
||||
}
|
||||
}
|
||||
|
||||
/// Événement business pour communication inter-services
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct BusinessEvent {
|
||||
pub id: Uuid,
|
||||
pub event_type: String,
|
||||
pub timestamp: std::time::SystemTime,
|
||||
pub source: String,
|
||||
}
|
||||
|
|
@ -1,726 +0,0 @@
|
|||
//! Métriques Prometheus avancées pour le serveur de streaming
|
||||
//!
|
||||
//! Ce module fournit une intégration complète avec Prometheus pour:
|
||||
//! - Métriques de streaming audio (latence, débit, qualité)
|
||||
//! - Métriques de codecs (encodage, décodage, transcodage)
|
||||
//! - Métriques de performance (CPU, mémoire, réseau)
|
||||
//! - Métriques d'erreurs et de sécurité
|
||||
//! - Métriques business (utilisateurs, revenus, contenu)
|
||||
|
||||
// use crate::config::ServerConfig;
|
||||
use crate::error::{AppError, Result};
|
||||
use axum::{extract::State, http::StatusCode, response::Response, routing::get, Router};
|
||||
// use metrics::{
|
||||
// counter, gauge, histogram, register_counter, register_gauge, register_histogram, Counter,
|
||||
// Gauge, Histogram,
|
||||
// };
|
||||
// use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::sync::RwLock;
|
||||
// Note: Use tracing::info! macro directly instead of importing
|
||||
|
||||
/// Gestionnaire des métriques Prometheus pour le streaming
|
||||
pub struct StreamPrometheusMetrics {
|
||||
/// Handle Prometheus pour l'export
|
||||
handle: PrometheusHandle,
|
||||
|
||||
/// Métriques de streaming audio
|
||||
pub active_streams: Gauge,
|
||||
pub streams_total: Counter,
|
||||
pub streams_ended: Counter,
|
||||
pub stream_duration_seconds: Histogram,
|
||||
pub stream_bitrate_bps: Histogram,
|
||||
pub stream_quality_changes: Counter,
|
||||
|
||||
/// Métriques de codecs
|
||||
pub codec_encoding_duration_seconds: Histogram,
|
||||
pub codec_decoding_duration_seconds: Histogram,
|
||||
pub codec_transcoding_duration_seconds: Histogram,
|
||||
pub codec_encoding_errors: Counter,
|
||||
pub codec_decoding_errors: Counter,
|
||||
pub codec_transcoding_errors: Counter,
|
||||
|
||||
/// Métriques de fichiers
|
||||
pub files_uploaded_total: Counter,
|
||||
pub files_deleted_total: Counter,
|
||||
pub files_served_total: Counter,
|
||||
pub file_upload_duration_seconds: Histogram,
|
||||
pub file_size_bytes: Histogram,
|
||||
|
||||
/// Métriques de performance
|
||||
pub audio_processing_duration_seconds: Histogram,
|
||||
pub waveform_generation_duration_seconds: Histogram,
|
||||
pub metadata_extraction_duration_seconds: Histogram,
|
||||
pub cache_hit_ratio: Gauge,
|
||||
pub cache_operations_total: Counter,
|
||||
|
||||
/// Métriques de réseau
|
||||
pub bytes_streamed_total: Counter,
|
||||
pub bytes_uploaded_total: Counter,
|
||||
pub network_bandwidth_bps: Gauge,
|
||||
pub connection_duration_seconds: Histogram,
|
||||
|
||||
/// Métriques d'erreurs
|
||||
pub stream_errors_total: Counter,
|
||||
pub file_errors_total: Counter,
|
||||
pub codec_errors_total: Counter,
|
||||
pub network_errors_total: Counter,
|
||||
pub authentication_errors_total: Counter,
|
||||
|
||||
/// Métriques de sécurité
|
||||
pub security_events_total: Counter,
|
||||
pub rate_limits_triggered_total: Counter,
|
||||
pub invalid_requests_total: Counter,
|
||||
|
||||
/// Métriques système
|
||||
pub memory_usage_bytes: Gauge,
|
||||
pub cpu_usage_percent: Gauge,
|
||||
pub disk_usage_bytes: Gauge,
|
||||
pub uptime_seconds: Gauge,
|
||||
|
||||
/// Métriques business
|
||||
pub active_users: Gauge,
|
||||
pub premium_users: Gauge,
|
||||
pub total_playtime_seconds: Counter,
|
||||
pub unique_tracks_played: Counter,
|
||||
pub revenue_total: Counter,
|
||||
}
|
||||
|
||||
impl StreamPrometheusMetrics {
|
||||
/// Crée un nouveau gestionnaire de métriques Prometheus
|
||||
pub fn new(config: &ServerConfig) -> Result<Self> {
|
||||
// Configuration du builder Prometheus
|
||||
let builder = PrometheusBuilder::new()
|
||||
.with_http_listener(([0, 0, 0, 0], config.monitoring.metrics_port))
|
||||
.map_err(|e| AppError::Configuration {
|
||||
message: format!("Erreur configuration Prometheus: {e}"),
|
||||
})?;
|
||||
|
||||
let handle = builder.install().map_err(|e| AppError::Configuration {
|
||||
message: format!("Erreur installation Prometheus: {e}"),
|
||||
})?;
|
||||
|
||||
tracing::info!(
|
||||
metrics_port = %config.monitoring.metrics_port,
|
||||
"📊 Métriques Prometheus configurées"
|
||||
);
|
||||
|
||||
// Enregistrement des métriques
|
||||
let active_streams = register_gauge!("active_streams", "Nombre de streams actifs");
|
||||
let streams_total = register_counter!("streams_total", "Total des streams créés");
|
||||
let streams_ended = register_counter!("streams_ended", "Total des streams terminés");
|
||||
let stream_duration_seconds = register_histogram!(
|
||||
"stream_duration_seconds",
|
||||
"Durée des streams en secondes",
|
||||
&["codec", "quality"]
|
||||
);
|
||||
let stream_bitrate_bps = register_histogram!(
|
||||
"stream_bitrate_bps",
|
||||
"Bitrate des streams en bps",
|
||||
&["codec", "quality"]
|
||||
);
|
||||
let stream_quality_changes = register_counter!(
|
||||
"stream_quality_changes_total",
|
||||
"Total des changements de qualité"
|
||||
);
|
||||
|
||||
let codec_encoding_duration_seconds = register_histogram!(
|
||||
"codec_encoding_duration_seconds",
|
||||
"Durée d'encodage des codecs",
|
||||
&["codec", "quality"]
|
||||
);
|
||||
let codec_decoding_duration_seconds = register_histogram!(
|
||||
"codec_decoding_duration_seconds",
|
||||
"Durée de décodage des codecs",
|
||||
&["codec"]
|
||||
);
|
||||
let codec_transcoding_duration_seconds = register_histogram!(
|
||||
"codec_transcoding_duration_seconds",
|
||||
"Durée de transcodage",
|
||||
&["input_codec", "output_codec"]
|
||||
);
|
||||
let codec_encoding_errors = register_counter!(
|
||||
"codec_encoding_errors_total",
|
||||
"Erreurs d'encodage",
|
||||
&["codec", "error_type"]
|
||||
);
|
||||
let codec_decoding_errors = register_counter!(
|
||||
"codec_decoding_errors_total",
|
||||
"Erreurs de décodage",
|
||||
&["codec", "error_type"]
|
||||
);
|
||||
let codec_transcoding_errors = register_counter!(
|
||||
"codec_transcoding_errors_total",
|
||||
"Erreurs de transcodage",
|
||||
&["input_codec", "output_codec", "error_type"]
|
||||
);
|
||||
|
||||
let files_uploaded_total = register_counter!(
|
||||
"files_uploaded_total",
|
||||
"Total des fichiers uploadés",
|
||||
&["codec", "quality"]
|
||||
);
|
||||
let files_deleted_total = register_counter!(
|
||||
"files_deleted_total",
|
||||
"Total des fichiers supprimés",
|
||||
&["reason"]
|
||||
);
|
||||
let files_served_total = register_counter!(
|
||||
"files_served_total",
|
||||
"Total des fichiers servis",
|
||||
&["codec", "quality"]
|
||||
);
|
||||
let file_upload_duration_seconds = register_histogram!(
|
||||
"file_upload_duration_seconds",
|
||||
"Durée d'upload des fichiers",
|
||||
&["codec", "file_size_range"]
|
||||
);
|
||||
let file_size_bytes = register_histogram!(
|
||||
"file_size_bytes",
|
||||
"Taille des fichiers en bytes",
|
||||
&["codec"]
|
||||
);
|
||||
|
||||
let audio_processing_duration_seconds = register_histogram!(
|
||||
"audio_processing_duration_seconds",
|
||||
"Durée de traitement audio",
|
||||
&["operation"]
|
||||
);
|
||||
let waveform_generation_duration_seconds = register_histogram!(
|
||||
"waveform_generation_duration_seconds",
|
||||
"Durée de génération de waveform"
|
||||
);
|
||||
let metadata_extraction_duration_seconds = register_histogram!(
|
||||
"metadata_extraction_duration_seconds",
|
||||
"Durée d'extraction de métadonnées",
|
||||
&["codec"]
|
||||
);
|
||||
let cache_hit_ratio = register_gauge!("cache_hit_ratio", "Ratio de hits du cache");
|
||||
let cache_operations_total = register_counter!(
|
||||
"cache_operations_total",
|
||||
"Total des opérations de cache",
|
||||
&["operation", "result"]
|
||||
);
|
||||
|
||||
let bytes_streamed_total = register_counter!(
|
||||
"bytes_streamed_total",
|
||||
"Total des bytes streamés",
|
||||
&["codec", "quality"]
|
||||
);
|
||||
let bytes_uploaded_total = register_counter!(
|
||||
"bytes_uploaded_total",
|
||||
"Total des bytes uploadés",
|
||||
&["codec"]
|
||||
);
|
||||
let network_bandwidth_bps =
|
||||
register_gauge!("network_bandwidth_bps", "Bande passante réseau en bps");
|
||||
let connection_duration_seconds = register_histogram!(
|
||||
"connection_duration_seconds",
|
||||
"Durée des connexions",
|
||||
&["client_type"]
|
||||
);
|
||||
|
||||
let stream_errors_total = register_counter!(
|
||||
"stream_errors_total",
|
||||
"Erreurs de streaming",
|
||||
&["error_type", "codec"]
|
||||
);
|
||||
let file_errors_total = register_counter!(
|
||||
"file_errors_total",
|
||||
"Erreurs de fichiers",
|
||||
&["error_type", "operation"]
|
||||
);
|
||||
let codec_errors_total = register_counter!(
|
||||
"codec_errors_total",
|
||||
"Erreurs de codecs",
|
||||
&["codec", "error_type"]
|
||||
);
|
||||
let network_errors_total =
|
||||
register_counter!("network_errors_total", "Erreurs réseau", &["error_type"]);
|
||||
let authentication_errors_total =
|
||||
register_counter!("authentication_errors_total", "Erreurs d'authentification");
|
||||
|
||||
let security_events_total = register_counter!(
|
||||
"security_events_total",
|
||||
"Événements de sécurité",
|
||||
&["event_type"]
|
||||
);
|
||||
let rate_limits_triggered_total = register_counter!(
|
||||
"rate_limits_triggered_total",
|
||||
"Rate limits déclenchés",
|
||||
&["limit_type"]
|
||||
);
|
||||
let invalid_requests_total = register_counter!(
|
||||
"invalid_requests_total",
|
||||
"Requêtes invalides",
|
||||
&["request_type"]
|
||||
);
|
||||
|
||||
let memory_usage_bytes =
|
||||
register_gauge!("memory_usage_bytes", "Utilisation mémoire en bytes");
|
||||
let cpu_usage_percent =
|
||||
register_gauge!("cpu_usage_percent", "Utilisation CPU en pourcentage");
|
||||
let disk_usage_bytes = register_gauge!("disk_usage_bytes", "Utilisation disque en bytes");
|
||||
let uptime_seconds =
|
||||
register_gauge!("uptime_seconds", "Temps de fonctionnement en secondes");
|
||||
|
||||
let active_users = register_gauge!("active_users", "Utilisateurs actifs");
|
||||
let premium_users = register_gauge!("premium_users", "Utilisateurs premium");
|
||||
let total_playtime_seconds =
|
||||
register_counter!("total_playtime_seconds", "Temps total d'écoute en secondes");
|
||||
let unique_tracks_played =
|
||||
register_counter!("unique_tracks_played_total", "Pistes uniques jouées");
|
||||
let revenue_total = register_counter!("revenue_total", "Revenus totaux", &["currency"]);
|
||||
|
||||
Ok(Self {
|
||||
handle,
|
||||
active_streams,
|
||||
streams_total,
|
||||
streams_ended,
|
||||
stream_duration_seconds,
|
||||
stream_bitrate_bps,
|
||||
stream_quality_changes,
|
||||
codec_encoding_duration_seconds,
|
||||
codec_decoding_duration_seconds,
|
||||
codec_transcoding_duration_seconds,
|
||||
codec_encoding_errors,
|
||||
codec_decoding_errors,
|
||||
codec_transcoding_errors,
|
||||
files_uploaded_total,
|
||||
files_deleted_total,
|
||||
files_served_total,
|
||||
file_upload_duration_seconds,
|
||||
file_size_bytes,
|
||||
audio_processing_duration_seconds,
|
||||
waveform_generation_duration_seconds,
|
||||
metadata_extraction_duration_seconds,
|
||||
cache_hit_ratio,
|
||||
cache_operations_total,
|
||||
bytes_streamed_total,
|
||||
bytes_uploaded_total,
|
||||
network_bandwidth_bps,
|
||||
connection_duration_seconds,
|
||||
stream_errors_total,
|
||||
file_errors_total,
|
||||
codec_errors_total,
|
||||
network_errors_total,
|
||||
authentication_errors_total,
|
||||
security_events_total,
|
||||
rate_limits_triggered_total,
|
||||
invalid_requests_total,
|
||||
memory_usage_bytes,
|
||||
cpu_usage_percent,
|
||||
disk_usage_bytes,
|
||||
uptime_seconds,
|
||||
active_users,
|
||||
premium_users,
|
||||
total_playtime_seconds,
|
||||
unique_tracks_played,
|
||||
revenue_total,
|
||||
})
|
||||
}
|
||||
|
||||
/// Met à jour le nombre de streams actifs
|
||||
pub fn update_active_streams(&self, count: u64) {
|
||||
self.active_streams.set(count as f64);
|
||||
}
|
||||
|
||||
/// Enregistre un nouveau stream
|
||||
pub fn record_stream_started(&self, codec: &str, quality: &str) {
|
||||
counter!(
|
||||
"streams_total",
|
||||
1,
|
||||
&[("codec", codec), ("quality", quality)]
|
||||
);
|
||||
}
|
||||
|
||||
/// Enregistre la fin d'un stream
|
||||
pub fn record_stream_ended(&self, codec: &str, quality: &str, duration_seconds: f64) {
|
||||
counter!(
|
||||
"streams_ended",
|
||||
1,
|
||||
&[("codec", codec), ("quality", quality)]
|
||||
);
|
||||
self.stream_duration_seconds
|
||||
.record(duration_seconds, &[codec, quality]);
|
||||
}
|
||||
|
||||
/// Enregistre le bitrate d'un stream
|
||||
pub fn record_stream_bitrate(&self, codec: &str, quality: &str, bitrate_bps: u32) {
|
||||
self.stream_bitrate_bps
|
||||
.record(bitrate_bps as f64, &[codec, quality]);
|
||||
}
|
||||
|
||||
/// Enregistre un changement de qualité
|
||||
pub fn record_quality_change(&self) {
|
||||
self.stream_quality_changes.increment(1);
|
||||
}
|
||||
|
||||
/// Enregistre la durée d'encodage
|
||||
pub fn record_encoding_duration(&self, codec: &str, quality: &str, duration: Duration) {
|
||||
self.codec_encoding_duration_seconds
|
||||
.record(duration.as_secs_f64(), &[codec, quality]);
|
||||
}
|
||||
|
||||
/// Enregistre la durée de décodage
|
||||
pub fn record_decoding_duration(&self, codec: &str, duration: Duration) {
|
||||
self.codec_decoding_duration_seconds
|
||||
.record(duration.as_secs_f64(), &[codec]);
|
||||
}
|
||||
|
||||
/// Enregistre la durée de transcodage
|
||||
pub fn record_transcoding_duration(
|
||||
&self,
|
||||
input_codec: &str,
|
||||
output_codec: &str,
|
||||
duration: Duration,
|
||||
) {
|
||||
self.codec_transcoding_duration_seconds
|
||||
.record(duration.as_secs_f64(), &[input_codec, output_codec]);
|
||||
}
|
||||
|
||||
/// Enregistre une erreur d'encodage
|
||||
pub fn record_encoding_error(&self, codec: &str, error_type: &str) {
|
||||
counter!(
|
||||
"codec_encoding_errors_total",
|
||||
1,
|
||||
&[("codec", codec), ("error_type", error_type)]
|
||||
);
|
||||
}
|
||||
|
||||
/// Enregistre une erreur de décodage
|
||||
pub fn record_decoding_error(&self, codec: &str, error_type: &str) {
|
||||
counter!(
|
||||
"codec_decoding_errors_total",
|
||||
1,
|
||||
&[("codec", codec), ("error_type", error_type)]
|
||||
);
|
||||
}
|
||||
|
||||
/// Enregistre une erreur de transcodage
|
||||
pub fn record_transcoding_error(
|
||||
&self,
|
||||
input_codec: &str,
|
||||
output_codec: &str,
|
||||
error_type: &str,
|
||||
) {
|
||||
counter!(
|
||||
"codec_transcoding_errors_total",
|
||||
1,
|
||||
&[
|
||||
("input_codec", input_codec),
|
||||
("output_codec", output_codec),
|
||||
("error_type", error_type)
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
/// Enregistre l'upload d'un fichier
|
||||
pub fn record_file_uploaded(
|
||||
&self,
|
||||
codec: &str,
|
||||
quality: &str,
|
||||
file_size_bytes: u64,
|
||||
duration: Duration,
|
||||
) {
|
||||
counter!(
|
||||
"files_uploaded_total",
|
||||
1,
|
||||
&[("codec", codec), ("quality", quality)]
|
||||
);
|
||||
self.file_upload_duration_seconds.record(
|
||||
duration.as_secs_f64(),
|
||||
&[codec, &self.get_file_size_range(file_size_bytes)],
|
||||
);
|
||||
self.file_size_bytes
|
||||
.record(file_size_bytes as f64, &[codec]);
|
||||
}
|
||||
|
||||
/// Enregistre la suppression d'un fichier
|
||||
pub fn record_file_deleted(&self, reason: &str) {
|
||||
counter!("files_deleted_total", 1, &[("reason", reason)]);
|
||||
}
|
||||
|
||||
/// Enregistre le service d'un fichier
|
||||
pub fn record_file_served(&self, codec: &str, quality: &str) {
|
||||
counter!(
|
||||
"files_served_total",
|
||||
1,
|
||||
&[("codec", codec), ("quality", quality)]
|
||||
);
|
||||
}
|
||||
|
||||
/// Enregistre la durée de traitement audio
|
||||
pub fn record_audio_processing_duration(&self, operation: &str, duration: Duration) {
|
||||
self.audio_processing_duration_seconds
|
||||
.record(duration.as_secs_f64(), &[operation]);
|
||||
}
|
||||
|
||||
/// Enregistre la durée de génération de waveform
|
||||
pub fn record_waveform_generation_duration(&self, duration: Duration) {
|
||||
self.waveform_generation_duration_seconds
|
||||
.record(duration.as_secs_f64());
|
||||
}
|
||||
|
||||
/// Enregistre la durée d'extraction de métadonnées
|
||||
pub fn record_metadata_extraction_duration(&self, codec: &str, duration: Duration) {
|
||||
self.metadata_extraction_duration_seconds
|
||||
.record(duration.as_secs_f64(), &[codec]);
|
||||
}
|
||||
|
||||
/// Met à jour le ratio de hits du cache
|
||||
pub fn update_cache_hit_ratio(&self, ratio: f64) {
|
||||
self.cache_hit_ratio.set(ratio);
|
||||
}
|
||||
|
||||
/// Enregistre une opération de cache
|
||||
pub fn record_cache_operation(&self, operation: &str, result: &str) {
|
||||
counter!(
|
||||
"cache_operations_total",
|
||||
1,
|
||||
&[("operation", operation), ("result", result)]
|
||||
);
|
||||
}
|
||||
|
||||
/// Enregistre des bytes streamés
|
||||
pub fn record_bytes_streamed(&self, codec: &str, quality: &str, bytes: u64) {
|
||||
counter!(
|
||||
"bytes_streamed_total",
|
||||
bytes,
|
||||
&[("codec", codec), ("quality", quality)]
|
||||
);
|
||||
}
|
||||
|
||||
/// Enregistre des bytes uploadés
|
||||
pub fn record_bytes_uploaded(&self, codec: &str, bytes: u64) {
|
||||
counter!("bytes_uploaded_total", bytes, &[("codec", codec)]);
|
||||
}
|
||||
|
||||
/// Met à jour la bande passante réseau
|
||||
pub fn update_network_bandwidth(&self, bandwidth_bps: u64) {
|
||||
self.network_bandwidth_bps.set(bandwidth_bps as f64);
|
||||
}
|
||||
|
||||
/// Enregistre la durée d'une connexion
|
||||
pub fn record_connection_duration(&self, client_type: &str, duration: Duration) {
|
||||
self.connection_duration_seconds
|
||||
.record(duration.as_secs_f64(), &[client_type]);
|
||||
}
|
||||
|
||||
/// Enregistre une erreur de streaming
|
||||
pub fn record_stream_error(&self, error_type: &str, codec: &str) {
|
||||
counter!(
|
||||
"stream_errors_total",
|
||||
1,
|
||||
&[("error_type", error_type), ("codec", codec)]
|
||||
);
|
||||
}
|
||||
|
||||
/// Enregistre une erreur de fichier
|
||||
pub fn record_file_error(&self, error_type: &str, operation: &str) {
|
||||
counter!(
|
||||
"file_errors_total",
|
||||
1,
|
||||
&[("error_type", error_type), ("operation", operation)]
|
||||
);
|
||||
}
|
||||
|
||||
/// Enregistre une erreur de codec
|
||||
pub fn record_codec_error(&self, codec: &str, error_type: &str) {
|
||||
counter!(
|
||||
"codec_errors_total",
|
||||
1,
|
||||
&[("codec", codec), ("error_type", error_type)]
|
||||
);
|
||||
}
|
||||
|
||||
/// Enregistre une erreur réseau
|
||||
pub fn record_network_error(&self, error_type: &str) {
|
||||
counter!("network_errors_total", 1, &[("error_type", error_type)]);
|
||||
}
|
||||
|
||||
/// Enregistre une erreur d'authentification
|
||||
pub fn record_authentication_error(&self) {
|
||||
self.authentication_errors_total.increment(1);
|
||||
}
|
||||
|
||||
/// Enregistre un événement de sécurité
|
||||
pub fn record_security_event(&self, event_type: &str) {
|
||||
counter!("security_events_total", 1, &[("event_type", event_type)]);
|
||||
}
|
||||
|
||||
/// Enregistre un rate limit déclenché
|
||||
pub fn record_rate_limit_triggered(&self, limit_type: &str) {
|
||||
counter!(
|
||||
"rate_limits_triggered_total",
|
||||
1,
|
||||
&[("limit_type", limit_type)]
|
||||
);
|
||||
}
|
||||
|
||||
/// Enregistre une requête invalide
|
||||
pub fn record_invalid_request(&self, request_type: &str) {
|
||||
counter!(
|
||||
"invalid_requests_total",
|
||||
1,
|
||||
&[("request_type", request_type)]
|
||||
);
|
||||
}
|
||||
|
||||
/// Met à jour les métriques système
|
||||
pub fn update_system_metrics(
|
||||
&self,
|
||||
memory_bytes: u64,
|
||||
cpu_percent: f64,
|
||||
disk_bytes: u64,
|
||||
uptime_seconds: u64,
|
||||
) {
|
||||
self.memory_usage_bytes.set(memory_bytes as f64);
|
||||
self.cpu_usage_percent.set(cpu_percent);
|
||||
self.disk_usage_bytes.set(disk_bytes as f64);
|
||||
self.uptime_seconds.set(uptime_seconds as f64);
|
||||
}
|
||||
|
||||
/// Met à jour les métriques business
|
||||
pub fn update_business_metrics(
|
||||
&self,
|
||||
active_users: u64,
|
||||
premium_users: u64,
|
||||
playtime_seconds: u64,
|
||||
unique_tracks: u64,
|
||||
revenue: f64,
|
||||
) {
|
||||
self.active_users.set(active_users as f64);
|
||||
self.premium_users.set(premium_users as f64);
|
||||
counter!("total_playtime_seconds", playtime_seconds);
|
||||
counter!("unique_tracks_played_total", unique_tracks);
|
||||
counter!("revenue_total", revenue as u64, &[("currency", "USD")]);
|
||||
}
|
||||
|
||||
/// Obtient le handle Prometheus pour l'export
|
||||
pub fn get_handle(&self) -> &PrometheusHandle {
|
||||
&self.handle
|
||||
}
|
||||
|
||||
/// Génère les métriques au format Prometheus
|
||||
pub fn gather_metrics(&self) -> String {
|
||||
self.handle.render()
|
||||
}
|
||||
|
||||
/// Détermine la plage de taille de fichier pour les métriques
|
||||
fn get_file_size_range(&self, size_bytes: u64) -> &'static str {
|
||||
match size_bytes {
|
||||
0..=1024 => "0-1KB",
|
||||
1025..=10240 => "1-10KB",
|
||||
10241..=102400 => "10-100KB",
|
||||
102401..=1048576 => "100KB-1MB",
|
||||
1048577..=10485760 => "1-10MB",
|
||||
10485761..=104857600 => "10-100MB",
|
||||
_ => "100MB+",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrapper pour mesurer le temps d'exécution d'une opération
|
||||
pub struct MetricsTimer {
|
||||
start: Instant,
|
||||
histogram: Histogram,
|
||||
labels: Vec<String>,
|
||||
}
|
||||
|
||||
impl MetricsTimer {
|
||||
pub fn new(histogram: Histogram, labels: Vec<String>) -> Self {
|
||||
Self {
|
||||
start: Instant::now(),
|
||||
histogram,
|
||||
labels,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn finish(self) {
|
||||
let duration = self.start.elapsed();
|
||||
let label_refs: Vec<&str> = self.labels.iter().map(|s| s.as_str()).collect();
|
||||
self.histogram.record(duration.as_secs_f64(), &label_refs);
|
||||
}
|
||||
}
|
||||
|
||||
/// Fonction utilitaire pour créer un timer d'encodage
|
||||
pub fn time_encoding(
|
||||
metrics: &StreamPrometheusMetrics,
|
||||
codec: &str,
|
||||
quality: &str,
|
||||
) -> MetricsTimer {
|
||||
let labels = vec![codec.to_string(), quality.to_string()];
|
||||
MetricsTimer::new(metrics.codec_encoding_duration_seconds.clone(), labels)
|
||||
}
|
||||
|
||||
/// Fonction utilitaire pour créer un timer de décodage
|
||||
pub fn time_decoding(metrics: &StreamPrometheusMetrics, codec: &str) -> MetricsTimer {
|
||||
let labels = vec![codec.to_string()];
|
||||
MetricsTimer::new(metrics.codec_decoding_duration_seconds.clone(), labels)
|
||||
}
|
||||
|
||||
/// Fonction utilitaire pour créer un timer de transcodage
|
||||
pub fn time_transcoding(
|
||||
metrics: &StreamPrometheusMetrics,
|
||||
input_codec: &str,
|
||||
output_codec: &str,
|
||||
) -> MetricsTimer {
|
||||
let labels = vec![input_codec.to_string(), output_codec.to_string()];
|
||||
MetricsTimer::new(metrics.codec_transcoding_duration_seconds.clone(), labels)
|
||||
}
|
||||
|
||||
/// Handler pour l'endpoint de métriques Prometheus
|
||||
pub async fn metrics_handler(
|
||||
State(metrics): State<Arc<StreamPrometheusMetrics>>,
|
||||
) -> Response<String> {
|
||||
let metrics_data = metrics.gather_metrics();
|
||||
|
||||
tracing::debug!("📊 Métriques Prometheus exportées");
|
||||
|
||||
Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
|
||||
.body(metrics_data)
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
/// Crée un routeur pour les endpoints de métriques
|
||||
pub fn create_metrics_router(metrics: Arc<StreamPrometheusMetrics>) -> Router {
|
||||
Router::new()
|
||||
.route("/metrics", get(metrics_handler))
|
||||
.with_state(metrics)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::config::ServerConfig;
|
||||
|
||||
#[test]
|
||||
fn test_metrics_timer() {
|
||||
let histogram = register_histogram!("test_histogram", "Test histogram");
|
||||
let timer = MetricsTimer::new(histogram, vec!["test".to_string()]);
|
||||
|
||||
// Simuler une opération
|
||||
std::thread::sleep(std::time::Duration::from_millis(10));
|
||||
timer.finish();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_file_size_range() {
|
||||
let config = ServerConfig::default();
|
||||
let metrics = StreamPrometheusMetrics::new(&config).unwrap();
|
||||
|
||||
assert_eq!(metrics.get_file_size_range(500), "0-1KB");
|
||||
assert_eq!(metrics.get_file_size_range(5000), "1-10KB");
|
||||
assert_eq!(metrics.get_file_size_range(50000), "10-100KB");
|
||||
assert_eq!(metrics.get_file_size_range(500000), "100KB-1MB");
|
||||
assert_eq!(metrics.get_file_size_range(5000000), "1-10MB");
|
||||
assert_eq!(metrics.get_file_size_range(50000000), "10-100MB");
|
||||
assert_eq!(metrics.get_file_size_range(500000000), "100MB+");
|
||||
}
|
||||
}
|
||||
Loading…
Reference in a new issue