veza/veza-stream-server/src/structured_logging.rs

718 lines
21 KiB
Rust
Raw Normal View History

2025-12-03 19:36:56 +00:00
//! Logging structuré avec tracing pour le serveur de streaming
//!
//! Ce module fournit un système de logging avancé avec:
//! - Logs structurés avec champs contextuels
//! - Rotation des logs
//! - Filtrage par niveau et module
//! - Export vers différents formats (JSON, Pretty, Compact)
//! - Intégration avec les métriques
use crate::config::LogFormat;
use crate::error::{AppError, Result};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::PathBuf;
use std::time::Duration;
// Note: Use tracing::info! macro directly instead of importing
use tracing_appender::{
non_blocking::{NonBlocking, WorkerGuard},
rolling::{RollingFileAppender, Rotation},
};
use tracing_subscriber::{
fmt::{self, format::Writer, time::ChronoUtc},
layer::SubscriberExt,
util::SubscriberInitExt,
EnvFilter, Layer, Registry,
};
/// Logging configuration consumed by `StructuredLogging`.
#[derive(Debug, Clone)]
pub struct LoggingConfig {
/// Filter directive string (e.g. `"info"`); used as a fallback when the
/// filter cannot be built from the process environment.
pub level: String,
/// Output format selected for the primary formatting layer.
pub format: LogFormat,
/// Optional log file path. When set, a rolling file appender is created in
/// the parent directory of this path.
pub file: Option<PathBuf>,
/// Optional rotation settings; only meaningful together with `file`.
pub rotation: Option<LogRotation>,
/// Additional filter directives (e.g. `"stream_server=debug"`).
pub filters: Vec<String>,
}
/// Log rotation settings.
#[derive(Debug, Clone)]
pub struct LogRotation {
/// Size threshold (presumably in bytes — TODO confirm).
/// `StructuredLogging::new` compares it against 100 MiB to choose between
/// hourly and daily rotation.
pub max_size: u64,
/// Maximum number of log files.
// NOTE(review): this field is not read anywhere in this file; presumably a
// retention limit — confirm against the intended design.
pub max_files: usize,
}
/// Structured logging handle.
///
/// Owns the configuration and, when file logging is enabled, the non-blocking
/// file writer together with its worker guard.
#[derive(Debug)]
pub struct StructuredLogging {
/// Configuration this instance was built from; read again by `setup`.
config: LoggingConfig,
/// Guard returned by `tracing_appender::non_blocking`. It is stored here so
/// that it lives as long as this value (per the tracing-appender docs the
/// guard must be kept alive for buffered logs to be flushed).
_guard: Option<WorkerGuard>,
/// Non-blocking writer used by the file layer; `None` when no log file is
/// configured.
appender: Option<NonBlocking>,
}
impl StructuredLogging {
    /// Builds the logging handle from `config`.
    ///
    /// When `config.file` is set, a rolling file appender with the fixed file
    /// name prefix `stream-server` is created in the parent directory of that
    /// path and wrapped in a non-blocking writer. The rotation period is
    /// derived from `config.rotation`: hourly when `max_size` is at most
    /// 100 MiB, daily when it is larger or when no rotation is configured.
    ///
    /// NOTE(review): the file-name component of `config.file` is not used; only
    /// its parent directory is.
    ///
    /// # Errors
    ///
    /// Returns [`AppError::ConfigError`] when the configured file path has no
    /// parent directory (empty path or filesystem root) instead of panicking.
    pub fn new(config: LoggingConfig) -> Result<Self> {
        // Above this size the rotation is daily, otherwise hourly.
        const DAILY_ROTATION_THRESHOLD: u64 = 100 * 1024 * 1024;

        let (appender, guard) = match &config.file {
            Some(file_path) => {
                // `parent()` is `None` for an empty path or a root path; report
                // it as a configuration error rather than unwrapping.
                let directory = file_path.parent().ok_or_else(|| AppError::ConfigError {
                    message: format!(
                        "Chemin de fichier de log invalide (aucun répertoire parent): {}",
                        file_path.display()
                    ),
                })?;

                let rotation = match &config.rotation {
                    Some(LogRotation { max_size, .. })
                        if *max_size <= DAILY_ROTATION_THRESHOLD =>
                    {
                        Rotation::HOURLY
                    }
                    _ => Rotation::DAILY,
                };

                let file_appender = RollingFileAppender::new(rotation, directory, "stream-server");
                let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
                (Some(non_blocking), Some(guard))
            }
            None => (None, None),
        };

        Ok(Self {
            config,
            _guard: guard,
            appender,
        })
    }

    /// Joins `config.level` and every non-empty entry of `config.filters` into
    /// one comma-separated directive string for `EnvFilter`.
    fn fallback_directives(&self) -> String {
        std::iter::once(self.config.level.as_str())
            .chain(self.config.filters.iter().map(String::as_str))
            .filter(|directive| !directive.is_empty())
            .collect::<Vec<_>>()
            .join(",")
    }

    /// Configures and installs the global `tracing` subscriber.
    ///
    /// The filter is taken from the default environment filter when it can be
    /// built; otherwise it is built from `config.level` followed by
    /// `config.filters`. A formatting layer in the configured format is always
    /// installed, and a JSON layer writing to the file appender is added when
    /// a log file was configured.
    ///
    /// # Errors
    ///
    /// Returns [`AppError::ConfigError`] when the fallback directives cannot
    /// be parsed, or when a global subscriber has already been installed.
    pub fn setup(&self) -> Result<()> {
        // Environment filter, falling back to the configured directives.
        let env_filter = EnvFilter::try_from_default_env()
            .or_else(|_| EnvFilter::try_new(self.fallback_directives()))
            .map_err(|e| AppError::ConfigError {
                message: format!("Erreur configuration filtre: {e}"),
            })?;

        // Primary formatting layer, selected by the configured format.
        let format_layer = match self.config.format {
            LogFormat::Json => fmt::layer()
                .json()
                .with_timer(ChronoUtc::rfc_3339())
                .with_target(true)
                .with_file(true)
                .with_line_number(true)
                .boxed(),
            LogFormat::Pretty => fmt::layer()
                .pretty()
                .with_timer(ChronoUtc::rfc_3339())
                .with_target(true)
                .with_file(true)
                .with_line_number(true)
                .boxed(),
            LogFormat::Compact => fmt::layer()
                .compact()
                .with_timer(ChronoUtc::rfc_3339())
                .with_target(true)
                .boxed(),
        };

        let registry = Registry::default().with(env_filter).with(format_layer);

        // `try_init` reports an already-installed global subscriber as an
        // error, where `init` would panic.
        let init_result = match &self.appender {
            Some(appender) => {
                let file_layer = fmt::layer()
                    .json()
                    .with_writer(appender.clone())
                    .with_timer(ChronoUtc::rfc_3339())
                    .with_target(true)
                    .with_file(true)
                    .with_line_number(true)
                    .boxed();
                registry.with(file_layer).try_init()
            }
            None => registry.try_init(),
        };
        init_result.map_err(|e| AppError::ConfigError {
            message: format!("Erreur initialisation du subscriber: {e}"),
        })?;

        tracing::info!(
            level = %self.config.level,
            format = ?self.config.format,
            file = ?self.config.file,
            "🔧 Système de logging structuré initialisé"
        );
        Ok(())
    }
}
/// Emits an INFO event describing an incoming stream request.
///
/// `track_id`, `user_id` and `bitrate` are recorded as plain field values;
/// `metadata` is recorded through its `Debug` representation.
pub fn log_stream_request(
    track_id: &str,
    user_id: &str,
    bitrate: u32,
    metadata: HashMap<String, String>,
) {
    tracing::info!(track_id, user_id, bitrate, ?metadata, "Stream request initiated");
}
/// Emits a TRACE event for a processed audio chunk, recording its id and size.
pub fn trace_audio_chunk(chunk_id: usize, size: usize) {
    tracing::trace!(chunk_id, size, "Audio chunk processed");
}
/// Log d'erreur avec contexte
pub fn log_error(error: &str, context: HashMap<String, String>) {
tracing::error!(
error = error,
context = ?context,
"Error occurred in stream server"
);
}
/// Macros de logging contextuel pour le streaming
pub mod stream_logs {
use super::*;
use std::collections::HashMap;
// Note: Use tracing::info! macro directly instead of importing
use uuid::Uuid;
/// Log de connexion de streaming
pub fn stream_connected(stream_id: &str, client_ip: &str, user_agent: &str, codec: &str) {
tracing::info!(
event = "stream_connected",
stream_id = %stream_id,
client_ip = %client_ip,
user_agent = %user_agent,
codec = %codec,
"🎵 Connexion de streaming établie"
);
}
/// Log de déconnexion de streaming
pub fn stream_disconnected(
stream_id: &str,
client_ip: &str,
duration_seconds: u64,
reason: &str,
) {
tracing::info!(
event = "stream_disconnected",
stream_id = %stream_id,
client_ip = %client_ip,
duration_seconds = %duration_seconds,
reason = %reason,
"🎵 Connexion de streaming fermée"
);
}
/// Log de début de lecture
pub fn playback_started(stream_id: &str, track_id: &str, codec: &str, bitrate: u32) {
tracing::info!(
event = "playback_started",
stream_id = %stream_id,
track_id = %track_id,
codec = %codec,
bitrate = %bitrate,
"▶️ Lecture démarrée"
);
}
/// Log de pause de lecture
pub fn playback_paused(stream_id: &str, track_id: &str, position_seconds: f64) {
tracing::info!(
event = "playback_paused",
stream_id = %stream_id,
track_id = %track_id,
position_seconds = %position_seconds,
"⏸️ Lecture en pause"
);
}
/// Log de reprise de lecture
pub fn playback_resumed(stream_id: &str, track_id: &str, position_seconds: f64) {
tracing::info!(
event = "playback_resumed",
stream_id = %stream_id,
track_id = %track_id,
position_seconds = %position_seconds,
"▶️ Lecture reprise"
);
}
/// Log de fin de lecture
pub fn playback_finished(stream_id: &str, track_id: &str, duration_seconds: f64) {
tracing::info!(
event = "playback_finished",
stream_id = %stream_id,
track_id = %track_id,
duration_seconds = %duration_seconds,
"🏁 Lecture terminée"
);
}
/// Log de changement de qualité
pub fn quality_changed(stream_id: &str, old_quality: &str, new_quality: &str, reason: &str) {
tracing::info!(
event = "quality_changed",
stream_id = %stream_id,
old_quality = %old_quality,
new_quality = %new_quality,
reason = %reason,
"📊 Qualité changée"
);
}
/// Log d'encodage audio
pub fn audio_encoded(
stream_id: &str,
codec: &str,
input_samples: usize,
output_bytes: usize,
encoding_time_ms: u64,
) {
tracing::debug!(
event = "audio_encoded",
stream_id = %stream_id,
codec = %codec,
input_samples = %input_samples,
output_bytes = %output_bytes,
encoding_time_ms = %encoding_time_ms,
"🎼 Audio encodé"
);
}
/// Log de décodage audio
pub fn audio_decoded(
stream_id: &str,
codec: &str,
input_bytes: usize,
output_samples: usize,
decoding_time_ms: u64,
) {
tracing::debug!(
event = "audio_decoded",
stream_id = %stream_id,
codec = %codec,
input_bytes = %input_bytes,
output_samples = %output_samples,
decoding_time_ms = %decoding_time_ms,
"🎼 Audio décodé"
);
}
/// Log d'upload de fichier
pub fn file_uploaded(
file_id: &str,
filename: &str,
file_size: u64,
codec: &str,
duration_seconds: f64,
) {
tracing::info!(
event = "file_uploaded",
file_id = %file_id,
filename = %filename,
file_size = %file_size,
codec = %codec,
duration_seconds = %duration_seconds,
"📁 Fichier uploadé"
);
}
/// Log de suppression de fichier
pub fn file_deleted(file_id: &str, filename: &str, reason: &str) {
tracing::warn!(
event = "file_deleted",
file_id = %file_id,
filename = %filename,
reason = %reason,
"🗑️ Fichier supprimé"
);
}
/// Log de transcodage
pub fn transcoding_started(
file_id: &str,
input_codec: &str,
output_codec: &str,
quality: &str,
) {
tracing::info!(
event = "transcoding_started",
file_id = %file_id,
input_codec = %input_codec,
output_codec = %output_codec,
quality = %quality,
"🔄 Transcodage démarré"
);
}
/// Log de fin de transcodage
pub fn transcoding_finished(
file_id: &str,
input_codec: &str,
output_codec: &str,
duration_seconds: f64,
success: bool,
) {
let level = if success { "info" } else { "error" };
let message = if success {
"✅ Transcodage terminé"
} else {
"❌ Transcodage échoué"
};
match level {
"info" => {
tracing::info!(
event = "transcoding_finished",
file_id = %file_id,
input_codec = %input_codec,
output_codec = %output_codec,
duration_seconds = %duration_seconds,
success = %success,
"{}", message
);
}
_ => {
tracing::error!(
event = "transcoding_finished",
file_id = %file_id,
input_codec = %input_codec,
output_codec = %output_codec,
duration_seconds = %duration_seconds,
success = %success,
"{}", message
);
}
}
}
/// Log d'erreur de streaming
pub fn stream_error(stream_id: &str, error_type: &str, error_message: &str, context: &str) {
tracing::error!(
event = "stream_error",
stream_id = %stream_id,
error_type = %error_type,
error_message = %error_message,
context = %context,
"💥 Erreur de streaming"
);
}
/// Log d'erreur de codec
pub fn codec_error(codec: &str, operation: &str, error_message: &str) {
tracing::error!(
event = "codec_error",
codec = %codec,
operation = %operation,
error_message = %error_message,
"🎼 Erreur de codec"
);
}
/// Log d'erreur de fichier
pub fn file_error(file_id: &str, operation: &str, error_message: &str) {
tracing::error!(
event = "file_error",
file_id = %file_id,
operation = %operation,
error_message = %error_message,
"📁 Erreur de fichier"
);
}
/// Log de performance
pub fn performance_measurement(
operation: &str,
duration_ms: f64,
success: bool,
additional_data: Option<&HashMap<String, String>>,
) {
let level = if success { "debug" } else { "warn" };
let message = if success {
"⚡ Opération terminée"
} else {
"🐌 Opération lente"
};
match level {
"debug" => {
tracing::debug!(
event = "performance_measurement",
operation = %operation,
duration_ms = %duration_ms,
success = %success,
additional_data = ?additional_data,
"{}", message
);
}
_ => {
tracing::warn!(
event = "performance_measurement",
operation = %operation,
duration_ms = %duration_ms,
success = %success,
additional_data = ?additional_data,
"{}", message
);
}
}
}
/// Log de démarrage du serveur
pub fn server_started(bind_addr: &str, environment: &str, version: &str) {
tracing::info!(
event = "server_started",
bind_addr = %bind_addr,
environment = %environment,
version = %version,
"🚀 Serveur de streaming démarré"
);
}
/// Log d'arrêt du serveur
pub fn server_stopped(reason: &str, uptime_seconds: u64) {
tracing::info!(
event = "server_stopped",
reason = %reason,
uptime_seconds = %uptime_seconds,
"🛑 Serveur de streaming arrêté"
);
}
/// Log de configuration
pub fn config_loaded(config_source: &str, config_path: Option<&str>) {
tracing::info!(
event = "config_loaded",
config_source = %config_source,
config_path = ?config_path,
"⚙️ Configuration chargée"
);
}
/// Log de base de données
pub fn database_operation(operation: &str, table: &str, duration_ms: f64, success: bool) {
let level = if success { "debug" } else { "error" };
let message = if success {
"🗄️ Opération DB réussie"
} else {
"💥 Erreur DB"
};
match level {
"debug" => {
tracing::debug!(
event = "database_operation",
operation = %operation,
table = %table,
duration_ms = %duration_ms,
success = %success,
"{}", message
);
}
_ => {
tracing::error!(
event = "database_operation",
operation = %operation,
table = %table,
duration_ms = %duration_ms,
success = %success,
"{}", message
);
}
}
}
/// Log de cache
pub fn cache_operation(operation: &str, key: &str, hit: bool, duration_ms: f64) {
tracing::debug!(
event = "cache_operation",
operation = %operation,
key = %key,
hit = %hit,
duration_ms = %duration_ms,
"💾 Opération de cache"
);
}
/// Log de sécurité
pub fn security_event(event_type: &str, client_ip: &str, details: &str) {
tracing::warn!(
event = "security_event",
event_type = %event_type,
client_ip = %client_ip,
details = %details,
"🔒 Événement de sécurité"
);
}
/// Log de métriques
pub fn metrics_updated(metric_name: &str, value: f64, labels: &HashMap<String, String>) {
tracing::trace!(
event = "metrics_updated",
metric_name = %metric_name,
value = %value,
labels = ?labels,
"📊 Métrique mise à jour"
);
}
}
/// Logger wrapper that attaches a fixed streaming context to every event.
///
/// Derives `Debug` and `Clone` like the other public types of this module so
/// that values can be inspected and handed to several owners.
#[derive(Debug, Clone)]
pub struct StreamContextLogger {
    /// Stream identifier recorded as the `stream_id` field.
    stream_id: String,
    /// Client address recorded as the `client_ip` field.
    client_ip: String,
    /// Optional user agent recorded as the `user_agent` field.
    user_agent: Option<String>,
}
impl StreamContextLogger {
    /// Creates a logger bound to the given stream context.
    pub fn new(stream_id: String, client_ip: String, user_agent: Option<String>) -> Self {
        Self {
            stream_id,
            client_ip,
            user_agent,
        }
    }

    /// Emits `message` at INFO level with the stream context attached.
    ///
    /// `fields` is recorded as `additional_fields` through its `Debug`
    /// representation.
    pub fn info(&self, message: &str, fields: Option<&HashMap<String, String>>) {
        let Self {
            stream_id,
            client_ip,
            user_agent,
        } = self;
        tracing::info!(
            %stream_id,
            %client_ip,
            ?user_agent,
            additional_fields = ?fields,
            "{}", message
        );
    }

    /// Emits `message` at WARN level with the stream context attached.
    ///
    /// `fields` is recorded as `additional_fields` through its `Debug`
    /// representation.
    pub fn warn(&self, message: &str, fields: Option<&HashMap<String, String>>) {
        let Self {
            stream_id,
            client_ip,
            user_agent,
        } = self;
        tracing::warn!(
            %stream_id,
            %client_ip,
            ?user_agent,
            additional_fields = ?fields,
            "{}", message
        );
    }

    /// Emits `message` at ERROR level with the stream context attached.
    ///
    /// `fields` is recorded as `additional_fields` through its `Debug`
    /// representation.
    pub fn error(&self, message: &str, fields: Option<&HashMap<String, String>>) {
        let Self {
            stream_id,
            client_ip,
            user_agent,
        } = self;
        tracing::error!(
            %stream_id,
            %client_ip,
            ?user_agent,
            additional_fields = ?fields,
            "{}", message
        );
    }
}
/// Initialises logging from the server configuration.
///
/// Builds a [`LoggingConfig`] from `config.monitoring` (no log file, no
/// rotation, no extra filters), installs the subscriber, then emits the
/// `config_loaded` and `server_started` events.
///
/// Returns the [`StructuredLogging`] handle on success and propagates any
/// error from construction or setup.
pub fn init_logging_from_config(config: &crate::config::Config) -> Result<StructuredLogging> {
    let monitoring = &config.monitoring;
    let structured_logging = StructuredLogging::new(LoggingConfig {
        level: monitoring.log_level.clone(),
        format: monitoring.log_format.clone(),
        file: None, // TODO: add file configuration if needed
        rotation: None,
        filters: Vec::new(),
    })?;
    structured_logging.setup()?;

    stream_logs::config_loaded("server_config", None);

    let bind_addr = format!("0.0.0.0:{}", config.port);
    let environment = format!("{:?}", config.environment);
    stream_logs::server_started(&bind_addr, &environment, env!("CARGO_PKG_VERSION"));

    Ok(structured_logging)
}
/// Installs a default logging setup for tests.
///
/// Uses the `debug` level, the compact format, no log file, and a single
/// `stream_server=debug` filter entry. Propagates any error from construction
/// or setup.
pub fn init_test_logging() -> Result<()> {
    StructuredLogging::new(LoggingConfig {
        level: String::from("debug"),
        format: LogFormat::Compact,
        file: None,
        rotation: None,
        filters: vec![String::from("stream_server=debug")],
    })?
    .setup()
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// The constructor stores each context value in the matching field.
    #[test]
    fn test_stream_context_logger() {
        let logger = StreamContextLogger::new(
            String::from("test_stream"),
            String::from("127.0.0.1"),
            Some(String::from("TestAgent")),
        );

        assert_eq!(logger.stream_id, "test_stream");
        assert_eq!(logger.client_ip, "127.0.0.1");
        assert_eq!(logger.user_agent.as_deref(), Some("TestAgent"));
    }

    /// The logging helpers can be called without a subscriber installed
    /// (they do nothing in that case).
    #[test]
    fn test_stream_logs_macros() {
        stream_logs::stream_connected("test", "127.0.0.1", "TestAgent", "mp3");
        stream_logs::playback_started("test", "track1", "mp3", 128000);
        stream_logs::stream_error("test", "codec", "Test error", "encoding");
    }

    /// Construction without a log file succeeds.
    #[test]
    fn test_structured_logging_creation() {
        let outcome = StructuredLogging::new(LoggingConfig {
            level: String::from("info"),
            format: LogFormat::Pretty,
            file: None,
            rotation: None,
            filters: Vec::new(),
        });
        assert!(outcome.is_ok());
    }

    /// Logging a stream request must not panic.
    #[test]
    fn test_log_stream_request() {
        let metadata = HashMap::from([("ip".to_string(), "192.168.1.1".to_string())]);
        log_stream_request("track-123", "user-456", 320, metadata);
    }

    /// Tracing an audio chunk must not panic.
    #[test]
    fn test_trace_audio_chunk() {
        trace_audio_chunk(1, 1024);
    }

    /// Logging an error with context must not panic.
    #[test]
    fn test_log_error() {
        let context = HashMap::from([("operation".to_string(), "encoding".to_string())]);
        log_error("Test error message", context);
    }
}