diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5455ad3ca..ac9fce94b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -29,9 +29,12 @@ jobs: cache: true - name: Install Go tools + # NOTE: golangci-lint v2 lives under the /v2/ module path. + # The old /cmd/ path still resolves to v1.64.x, which rejects + # v2-format .golangci.yml with "please use golangci-lint v2". run: | go install golang.org/x/vuln/cmd/govulncheck@latest - go install github.com/golangci/golangci-lint/cmd/golangci-lint@latest + go install github.com/golangci/golangci-lint/v2/cmd/golangci-lint@latest - name: Build run: go build ./... diff --git a/veza-stream-server/benches/http_range_bench.rs b/veza-stream-server/benches/http_range_bench.rs index af5e592b8..378cb2f99 100644 --- a/veza-stream-server/benches/http_range_bench.rs +++ b/veza-stream-server/benches/http_range_bench.rs @@ -1,6 +1,6 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion}; -use stream_server::streaming::protocols::http_range::ByteRange; use std::str::FromStr; +use stream_server::streaming::protocols::http_range::ByteRange; fn bench_parse_range(c: &mut Criterion) { c.bench_function("parse_exact_range", |b| { diff --git a/veza-stream-server/src/audio/compression.rs b/veza-stream-server/src/audio/compression.rs index cde435e81..db6c423e9 100644 --- a/veza-stream-server/src/audio/compression.rs +++ b/veza-stream-server/src/audio/compression.rs @@ -446,7 +446,12 @@ impl CompressionEngine { &self, job: &CompressionJob, ) -> Result { - let ffmpeg_path = self.config.compression.ffmpeg_path.as_deref().unwrap_or("ffmpeg"); + let ffmpeg_path = self + .config + .compression + .ffmpeg_path + .as_deref() + .unwrap_or("ffmpeg"); tracing::debug!( "Compression de {:?} vers {:?} avec le profil {:?} via {}", @@ -508,7 +513,9 @@ impl CompressionEngine { // Vérifier la taille du fichier de sortie let compressed_size = tokio::fs::metadata(&job.output_path) .await - .map_err(|e| CompressionError::IoError(format!("Failed to get output file metadata: {}", e)))? + .map_err(|e| { + CompressionError::IoError(format!("Failed to get output file metadata: {}", e)) + })? .len(); // Callback vers le backend (A01, A04: auth via X-Internal-API-Key) @@ -522,16 +529,17 @@ impl CompressionEngine { tracing::info!("Calling backend callback: {}", url); - let request = client - .post(&url) - .json(&serde_json::json!({ - "status": "ready", - "manifest_url": format!("/hls/{}/master.m3u8", job.track_id), - "error": null - })); + let request = client.post(&url).json(&serde_json::json!({ + "status": "ready", + "manifest_url": format!("/hls/{}/master.m3u8", job.track_id), + "error": null + })); - let key = std::env::var("INTERNAL_API_KEY") - .map_err(|_| CompressionError::Config("INTERNAL_API_KEY must be set for stream-ready callbacks".into()))?; + let key = std::env::var("INTERNAL_API_KEY").map_err(|_| { + CompressionError::Config( + "INTERNAL_API_KEY must be set for stream-ready callbacks".into(), + ) + })?; if key.is_empty() { return Err(CompressionError::Config( "INTERNAL_API_KEY must not be empty".into(), diff --git a/veza-stream-server/src/auth/mod.rs b/veza-stream-server/src/auth/mod.rs index e4ccd086a..890aa5f72 100644 --- a/veza-stream-server/src/auth/mod.rs +++ b/veza-stream-server/src/auth/mod.rs @@ -165,15 +165,15 @@ impl AuthManager { }) .transpose()?; - let (encoding_key, decoding_key_hs256) = if let Some(ref secret) = config.security.jwt_secret - { - ( - Some(EncodingKey::from_secret(secret.as_bytes())), - Some(DecodingKey::from_secret(secret.as_bytes())), - ) - } else { - (None, None) - }; + let (encoding_key, decoding_key_hs256) = + if let Some(ref secret) = config.security.jwt_secret { + ( + Some(EncodingKey::from_secret(secret.as_bytes())), + Some(DecodingKey::from_secret(secret.as_bytes())), + ) + } else { + (None, None) + }; if decoding_key_rs256.is_none() && decoding_key_hs256.is_none() { return Err(AuthError::ConfigurationError( @@ -238,7 +238,8 @@ impl AuthManager { .encoding_key .as_ref() .ok_or(AuthError::ConfigurationError( - "JWT_SECRET required for token generation (use RS256 for validation only)".to_string(), + "JWT_SECRET required for token generation (use RS256 for validation only)" + .to_string(), ))?; let access_token = encode(&Header::default(), &claims, enc_key) .map_err(|e| AuthError::TokenGenerationError(e.to_string()))?; @@ -339,7 +340,9 @@ impl AuthManager { )); } - let claims = validation_result.claims.ok_or(AuthError::InvalidToken("No claims in token".to_string()))?; + let claims = validation_result + .claims + .ok_or(AuthError::InvalidToken("No claims in token".to_string()))?; // Créer un nouveau UserInfo à partir des claims let user_info = UserInfo { @@ -514,7 +517,10 @@ pub(crate) fn extract_token_from_headers(headers: &HeaderMap) -> Option /// Extracts JWT from request: Authorization header (priority) or ?token= query param. /// Used for HLS endpoints where clients may pass token via header or query. -pub(crate) fn extract_token_from_request(headers: &HeaderMap, uri: &axum::http::Uri) -> Option { +pub(crate) fn extract_token_from_request( + headers: &HeaderMap, + uri: &axum::http::Uri, +) -> Option { extract_token_from_headers(headers).or_else(|| { uri.query().and_then(|q| { url::form_urlencoded::parse(q.as_bytes()) @@ -532,11 +538,10 @@ pub async fn hls_auth_middleware( next: Next, ) -> Result { let auth_manager = &state.auth_manager; - let token = extract_token_from_request(request.headers(), request.uri()) - .ok_or_else(|| { - tracing::warn!("HLS request rejected: no token (Authorization or ?token=)"); - StatusCode::UNAUTHORIZED - })?; + let token = extract_token_from_request(request.headers(), request.uri()).ok_or_else(|| { + tracing::warn!("HLS request rejected: no token (Authorization or ?token=)"); + StatusCode::UNAUTHORIZED + })?; let validation_result = auth_manager.validate_token(&token).await; @@ -585,7 +590,10 @@ pub async fn refresh_handler( Ok((access_token, refresh_token)) => { // Valider le refresh token pour récupérer les claims let validation_result = auth_manager.validate_token(&request.refresh_token).await; - let claims = validation_result.claims.ok_or((StatusCode::INTERNAL_SERVER_ERROR, "Failed to extract claims".to_string()))?; + let claims = validation_result.claims.ok_or(( + StatusCode::INTERNAL_SERVER_ERROR, + "Failed to extract claims".to_string(), + ))?; // Créer UserInfo à partir des claims avec i64 aligned let user_info = UserInfo { @@ -719,7 +727,10 @@ mod tests { #[test] fn test_auth_error_display() { - assert_eq!(format!("{}", AuthError::InvalidCredentials), "Invalid credentials"); + assert_eq!( + format!("{}", AuthError::InvalidCredentials), + "Invalid credentials" + ); assert!(format!("{}", AuthError::InvalidToken("bad".to_string())).contains("bad")); } @@ -792,10 +803,7 @@ mod tests { #[test] fn test_extract_token_from_headers() { let mut headers = HeaderMap::new(); - headers.insert( - "Authorization", - HeaderValue::from_static("Bearer token123"), - ); + headers.insert("Authorization", HeaderValue::from_static("Bearer token123")); let token = extract_token_from_headers(&headers); assert_eq!(token.as_deref(), Some("token123")); } diff --git a/veza-stream-server/src/auth/token_validator.rs b/veza-stream-server/src/auth/token_validator.rs index 973a82ae7..5ac7fdf2a 100644 --- a/veza-stream-server/src/auth/token_validator.rs +++ b/veza-stream-server/src/auth/token_validator.rs @@ -128,7 +128,7 @@ impl TokenValidator { Ok(bytes) => bytes, Err(_) => return Ok(false), }; - + let expected_bytes = hex::decode(&expected_signature).map_err(|e| AppError::SignatureError { message: format!("Invalid hex expected: {}", e), @@ -291,7 +291,9 @@ impl TokenValidator { let pool = match &self.db_pool { Some(pool) => pool, None => { - tracing::warn!("No DB pool configured for track access validation — denying access by default"); + tracing::warn!( + "No DB pool configured for track access validation — denying access by default" + ); return Ok(false); } }; @@ -302,7 +304,7 @@ impl TokenValidator { // Check if the track exists and whether it is public let track_row = sqlx::query_as::<_, (bool, Uuid)>( - "SELECT is_public, user_id FROM tracks WHERE id = $1" + "SELECT is_public, user_id FROM tracks WHERE id = $1", ) .bind(track_uuid) .fetch_optional(pool) @@ -543,12 +545,7 @@ mod tests { .unwrap() .as_secs() + 3600; - let result = validator.validate_signature( - "track", - expires, - "not-valid-hex!!", - None, - ); + let result = validator.validate_signature("track", expires, "not-valid-hex!!", None); assert!(result.is_ok()); assert!(!result.unwrap()); } @@ -574,10 +571,15 @@ mod tests { let request = StreamRequest { track_id: "stats_test".to_string(), expires, - sig: validator.generate_signature("stats_test", expires, None).unwrap(), + sig: validator + .generate_signature("stats_test", expires, None) + .unwrap(), user_id: None, }; - validator.validate_and_register_token(&request).await.unwrap(); + validator + .validate_and_register_token(&request) + .await + .unwrap(); let stats2 = validator.get_token_stats().await.unwrap(); assert_eq!(stats2.total_tokens, 1); } diff --git a/veza-stream-server/src/bin/stream_load_test.rs b/veza-stream-server/src/bin/stream_load_test.rs index 3d271bca8..1713a9755 100644 --- a/veza-stream-server/src/bin/stream_load_test.rs +++ b/veza-stream-server/src/bin/stream_load_test.rs @@ -52,13 +52,16 @@ impl SimulatedClient { async fn run(self, duration: Duration) { let connect_url = format!("{}?user_id=load_test_{}", self.url, self.id); - + match connect_async(&connect_url).await { Ok((ws_stream, _)) => { self.stats.active_clients.fetch_add(1, Ordering::SeqCst); if let Err(e) = self.handle_connection(ws_stream, duration).await { // Ignore connection closed errors at shutdown if expected - if !e.to_string().contains("Connection reset without closing handshake") { + if !e + .to_string() + .contains("Connection reset without closing handshake") + { eprintln!("Client {} error: {}", self.id, e); self.stats.errors.fetch_add(1, Ordering::SeqCst); } @@ -88,9 +91,12 @@ impl SimulatedClient { events: vec!["*".to_string()], filters: None, }; - let msg = tokio_tungstenite::tungstenite::Message::Text(serde_json::to_string(&subscribe_cmd)?); + let msg = + tokio_tungstenite::tungstenite::Message::Text(serde_json::to_string(&subscribe_cmd)?); write.send(msg).await?; - self.stats.total_messages_sent.fetch_add(1, Ordering::Relaxed); + self.stats + .total_messages_sent + .fetch_add(1, Ordering::Relaxed); loop { tokio::select! { @@ -142,7 +148,7 @@ impl SimulatedClient { } } } - + let _ = write.close().await; Ok(()) } @@ -179,11 +185,20 @@ async fn main() { println!("\n📊 Load Test Report"); println!("===================="); println!("Duration: {:.2?}", elapsed); - println!("Total Messages Sent: {}", stats.total_messages_sent.load(Ordering::SeqCst)); - println!("Total Messages Received: {}", stats.total_messages_received.load(Ordering::SeqCst)); - println!("Sync Adjustments Received: {}", stats.sync_adjustments_received.load(Ordering::SeqCst)); + println!( + "Total Messages Sent: {}", + stats.total_messages_sent.load(Ordering::SeqCst) + ); + println!( + "Total Messages Received: {}", + stats.total_messages_received.load(Ordering::SeqCst) + ); + println!( + "Sync Adjustments Received: {}", + stats.sync_adjustments_received.load(Ordering::SeqCst) + ); println!("Errors: {}", stats.errors.load(Ordering::SeqCst)); - + // Validate results - allow small error margin for connection teardown let errors = stats.errors.load(Ordering::SeqCst); if errors > (args.clients as u64 / 2) { diff --git a/veza-stream-server/src/codecs/flac.rs b/veza-stream-server/src/codecs/flac.rs index 6780c0b61..ef845192e 100644 --- a/veza-stream-server/src/codecs/flac.rs +++ b/veza-stream-server/src/codecs/flac.rs @@ -1,6 +1,6 @@ use crate::codecs::{ - AudioDecoder, AudioEncoder, DecodedAudio, DecoderConfig, - DecoderInfo, EncoderConfig, EncoderInfo, EncoderMetrics, + AudioDecoder, AudioEncoder, DecodedAudio, DecoderConfig, DecoderInfo, EncoderConfig, + EncoderInfo, EncoderMetrics, }; use crate::error::AppError; use std::time::Instant; diff --git a/veza-stream-server/src/codecs/mp3.rs b/veza-stream-server/src/codecs/mp3.rs index fc0af3de5..f98727a54 100644 --- a/veza-stream-server/src/codecs/mp3.rs +++ b/veza-stream-server/src/codecs/mp3.rs @@ -15,9 +15,7 @@ use serde::{Deserialize, Serialize}; use tokio::sync::RwLock; // Note: Use tracing::info! macro directly instead of importing -use crate::codecs::{ - AudioDecoder, AudioEncoder, CodecQuality, DecoderConfig, EncoderConfig, -}; +use crate::codecs::{AudioDecoder, AudioEncoder, CodecQuality, DecoderConfig, EncoderConfig}; use crate::error::AppError; /// Implémentation de l'encoder MP3 avec LAME diff --git a/veza-stream-server/src/config/mod.rs b/veza-stream-server/src/config/mod.rs index f1f08854c..0a608dfb4 100644 --- a/veza-stream-server/src/config/mod.rs +++ b/veza-stream-server/src/config/mod.rs @@ -1,8 +1,8 @@ +use crate::utils::env::{require_env, require_env_min_length}; use dotenvy::dotenv; use serde::{Deserialize, Serialize}; use std::env; use std::time::Duration; -use crate::utils::env::{require_env, require_env_min_length}; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct Config { @@ -213,100 +213,105 @@ impl Default for Config { { panic!("Default config should not be used in production"); } - + // Pour les tests uniquement // Pour les tests uniquement #[cfg(any(test, debug_assertions))] { Self { - secret_key: "test_secret_key_minimum_32_characters_long".to_string(), - port: 18082, - backend_url: "http://localhost:18080".to_string(), - audio_dir: "./audio".to_string(), - allowed_origins: vec!["*".to_string()], - max_file_size: 104857600, - max_range_size: 10485760, - signature_tolerance: 300, - database: DatabaseConfig { - url: "postgres://user:password@localhost:5432/db".to_string(), - max_connections: 10, - min_connections: 1, - connection_timeout: Duration::from_secs(30), - idle_timeout: Duration::from_secs(600), - max_lifetime: Duration::from_secs(3600), - enable_logging: false, - migrate_on_start: true, - }, - cache: CacheConfig { - max_size_mb: 256, - ttl_seconds: 3600, - cleanup_interval: Duration::from_secs(300), - compression_enabled: true, - redis_url: None, - redis_pool_size: None, - }, - security: SecurityConfig { - jwt_secret: Some("test_jwt_secret_minimum_32_characters_long".to_string()), - jwt_public_key_path: None, - jwt_expiration: Duration::from_secs(3600), - bcrypt_cost: 12, - rate_limit_requests_per_minute: 60, - rate_limit_burst: 10, - cors_max_age: Duration::from_secs(3600), - csrf_protection: true, - secure_headers: true, - tls_cert_path: None, - tls_key_path: None, - }, - performance: PerformanceConfig { - worker_threads: None, - max_blocking_threads: None, - thread_stack_size: None, - tcp_nodelay: true, - tcp_keepalive: None, - buffer_size: 8192, - max_concurrent_streams: 1000, - stream_timeout: Duration::from_secs(30), - compression_level: 6, - }, - monitoring: MonitoringConfig { - metrics_enabled: true, - metrics_port: 9090, - health_check_interval: Duration::from_secs(30), - log_level: "info".to_string(), - log_format: LogFormat::Pretty, - jaeger_endpoint: None, - prometheus_namespace: "stream_server".to_string(), - alert_webhooks: vec![], - }, - notifications: NotificationConfig { - enabled: true, - max_queue_size: 10000, - delivery_workers: 4, - retry_attempts: 3, - retry_delay: Duration::from_secs(60), - batch_size: 100, - email_provider: None, - sms_provider: None, - push_provider: None, - }, - compression: CompressionConfig { - enabled: true, - output_dir: "./compressed".to_string(), - temp_dir: "./temp".to_string(), - max_concurrent_jobs: 4, - cleanup_after_days: 7, - ffmpeg_path: None, - quality_profiles: vec!["high".to_string(), "medium".to_string(), "low".to_string(), "mobile".to_string()], - }, - rabbit_mq: RabbitMQConfig { - url: "amqp://guest:guest@localhost:5672/".to_string(), - max_retries: 3, - retry_interval_secs: 2, - enable: true, - }, - environment: Environment::Development, - } + secret_key: "test_secret_key_minimum_32_characters_long".to_string(), + port: 18082, + backend_url: "http://localhost:18080".to_string(), + audio_dir: "./audio".to_string(), + allowed_origins: vec!["*".to_string()], + max_file_size: 104857600, + max_range_size: 10485760, + signature_tolerance: 300, + database: DatabaseConfig { + url: "postgres://user:password@localhost:5432/db".to_string(), + max_connections: 10, + min_connections: 1, + connection_timeout: Duration::from_secs(30), + idle_timeout: Duration::from_secs(600), + max_lifetime: Duration::from_secs(3600), + enable_logging: false, + migrate_on_start: true, + }, + cache: CacheConfig { + max_size_mb: 256, + ttl_seconds: 3600, + cleanup_interval: Duration::from_secs(300), + compression_enabled: true, + redis_url: None, + redis_pool_size: None, + }, + security: SecurityConfig { + jwt_secret: Some("test_jwt_secret_minimum_32_characters_long".to_string()), + jwt_public_key_path: None, + jwt_expiration: Duration::from_secs(3600), + bcrypt_cost: 12, + rate_limit_requests_per_minute: 60, + rate_limit_burst: 10, + cors_max_age: Duration::from_secs(3600), + csrf_protection: true, + secure_headers: true, + tls_cert_path: None, + tls_key_path: None, + }, + performance: PerformanceConfig { + worker_threads: None, + max_blocking_threads: None, + thread_stack_size: None, + tcp_nodelay: true, + tcp_keepalive: None, + buffer_size: 8192, + max_concurrent_streams: 1000, + stream_timeout: Duration::from_secs(30), + compression_level: 6, + }, + monitoring: MonitoringConfig { + metrics_enabled: true, + metrics_port: 9090, + health_check_interval: Duration::from_secs(30), + log_level: "info".to_string(), + log_format: LogFormat::Pretty, + jaeger_endpoint: None, + prometheus_namespace: "stream_server".to_string(), + alert_webhooks: vec![], + }, + notifications: NotificationConfig { + enabled: true, + max_queue_size: 10000, + delivery_workers: 4, + retry_attempts: 3, + retry_delay: Duration::from_secs(60), + batch_size: 100, + email_provider: None, + sms_provider: None, + push_provider: None, + }, + compression: CompressionConfig { + enabled: true, + output_dir: "./compressed".to_string(), + temp_dir: "./temp".to_string(), + max_concurrent_jobs: 4, + cleanup_after_days: 7, + ffmpeg_path: None, + quality_profiles: vec![ + "high".to_string(), + "medium".to_string(), + "low".to_string(), + "mobile".to_string(), + ], + }, + rabbit_mq: RabbitMQConfig { + url: "amqp://guest:guest@localhost:5672/".to_string(), + max_retries: 3, + retry_interval_secs: 2, + enable: true, + }, + environment: Environment::Development, + } } } } @@ -328,7 +333,7 @@ impl Config { // SECURITY: SECRET_KEY est REQUIS - pas de valeur par défaut let secret_key = require_env_min_length("SECRET_KEY", 32); - + let config = Self { secret_key, // CONFIGURATION PORT UNIFIÉE - Port 18082 aligné sur VITE_STREAM_PORT et docker-compose @@ -425,10 +430,14 @@ impl Config { security: SecurityConfig { // v0.9.1 RS256: prefer JWT_PUBLIC_KEY_PATH; else require JWT_SECRET - jwt_public_key_path: env::var("JWT_PUBLIC_KEY_PATH").ok().filter(|s| !s.is_empty()), + jwt_public_key_path: env::var("JWT_PUBLIC_KEY_PATH") + .ok() + .filter(|s| !s.is_empty()), jwt_secret: { - let has_rs256 = - env::var("JWT_PUBLIC_KEY_PATH").ok().filter(|s| !s.is_empty()).is_some(); + let has_rs256 = env::var("JWT_PUBLIC_KEY_PATH") + .ok() + .filter(|s| !s.is_empty()) + .is_some(); if has_rs256 { env::var("JWT_SECRET").ok().filter(|s| s.len() >= 32) } else { @@ -631,16 +640,19 @@ impl Config { if self.security.jwt_public_key_path.is_none() && self.security.jwt_secret.is_none() { return Err(ConfigError::MissingJwtSecret); } - + // Vérifier que les secrets ne sont pas des valeurs par défaut dangereuses - if self.secret_key == "your-secret-key-change-in-production" - || self.secret_key == "default_secret_key_for_dev_only" { + if self.secret_key == "your-secret-key-change-in-production" + || self.secret_key == "default_secret_key_for_dev_only" + { return Err(ConfigError::WeakSecretKey); } - + if let Some(ref jwt_secret) = self.security.jwt_secret { - if jwt_secret == "default_jwt_secret" - || jwt_secret == "veza_unified_jwt_secret_key_2025_microservices_secure_32chars_minimum" { + if jwt_secret == "default_jwt_secret" + || jwt_secret + == "veza_unified_jwt_secret_key_2025_microservices_secure_32chars_minimum" + { return Err(ConfigError::MissingJwtSecret); } } @@ -733,7 +745,10 @@ mod tests { env::set_var("STREAM_PORT", "8082"); env::set_var("DATABASE_URL", "postgresql://test:test@localhost/test"); env::set_var("SECRET_KEY", "test_secret_key_must_be_long_enough_32_chars"); - env::set_var("JWT_SECRET", "test_jwt_secret_key_must_be_long_enough_32_chars"); + env::set_var( + "JWT_SECRET", + "test_jwt_secret_key_must_be_long_enough_32_chars", + ); // Tester la création de la config let config_result = Config::from_env(); @@ -777,8 +792,14 @@ mod tests { #[test] fn test_config_error_display() { assert_eq!(format!("{}", ConfigError::InvalidPort), "Port invalide"); - assert_eq!(format!("{}", ConfigError::InvalidAudioDir), "Répertoire audio invalide"); - assert_eq!(format!("{}", ConfigError::WeakSecretKey), "Clé secrète faible - changez-la en production"); + assert_eq!( + format!("{}", ConfigError::InvalidAudioDir), + "Répertoire audio invalide" + ); + assert_eq!( + format!("{}", ConfigError::WeakSecretKey), + "Clé secrète faible - changez-la en production" + ); } #[test] diff --git a/veza-stream-server/src/core/encoding_pool.rs b/veza-stream-server/src/core/encoding_pool.rs index 2d8d56f0d..4b6881967 100644 --- a/veza-stream-server/src/core/encoding_pool.rs +++ b/veza-stream-server/src/core/encoding_pool.rs @@ -13,10 +13,10 @@ use crate::transcoding::ffmpeg::progress_parser::FfmpegProgress; use async_channel::{Receiver, Sender}; use sqlx::PgPool; use std::path::PathBuf; +use std::sync::Arc; use std::time::Duration; use tokio::time::timeout; use uuid::Uuid; -use std::sync::Arc; /// Pool d'encodeurs avec workers FFmpeg #[derive(Clone, Debug)] @@ -97,10 +97,7 @@ impl EncoderPool { tracing::info!("Encoder worker {} démarré", i); } - tracing::info!( - "EncoderPool créé avec {} workers", - worker_count - ); + tracing::info!("EncoderPool créé avec {} workers", worker_count); Ok(pool) } @@ -188,11 +185,15 @@ fn input_spec_from_source(source: &StreamSource) -> Result { match source { StreamSource::File { path, .. } => Ok(PathBuf::from(path)), StreamSource::External { url, .. } => Ok(PathBuf::from(url)), - StreamSource::Live { input_device, format: _format, .. } => Ok(PathBuf::from(format!( - "alsa:{}", - input_device - ))), - StreamSource::Generated { generator_type, parameters } => { + StreamSource::Live { + input_device, + format: _format, + .. + } => Ok(PathBuf::from(format!("alsa:{}", input_device))), + StreamSource::Generated { + generator_type, + parameters, + } => { let sr = parameters .get("sample_rate") .map(|s| s.as_str()) @@ -232,15 +233,13 @@ fn output_config_from_stream_output(output: &StreamOutput) -> Result ( segment_duration.as_secs() as u32, PathBuf::from(&output.endpoint).join("index.m3u8"), ), StreamProtocol::DASH { - segment_duration, - .. + segment_duration, .. } => ( segment_duration.as_secs() as u32, PathBuf::from(&output.endpoint).join("index.m3u8"), @@ -308,7 +307,10 @@ impl EncoderWorker { ); // Mettre à jour le statut en DB - if let Err(e) = self.update_job_status(&job.track_id, EncodeJobStatus::Encoding).await { + if let Err(e) = self + .update_job_status(&job.track_id, EncodeJobStatus::Encoding) + .await + { tracing::error!("Worker {} failed to update status: {}", self.id, e); continue; } @@ -358,11 +360,9 @@ impl EncoderWorker { })?; // 3. Spawn le processus FFmpeg - let mut child = command - .spawn() - .map_err(|e| AppError::InternalError { - message: format!("Failed to spawn FFmpeg process: {}", e), - })?; + let mut child = command.spawn().map_err(|e| AppError::InternalError { + message: format!("Failed to spawn FFmpeg process: {}", e), + })?; // 4. Capturer stderr en streaming pour monitoring let stderr = child.stderr.take(); @@ -390,7 +390,12 @@ impl EncoderWorker { // Détecter les erreurs if line.contains("error") || line.contains("Error") || line.contains("ERROR") { - tracing::warn!("Worker {} track {} FFmpeg error: {}", worker_id, track_id, line); + tracing::warn!( + "Worker {} track {} FFmpeg error: {}", + worker_id, + track_id, + line + ); } } }); @@ -413,7 +418,8 @@ impl EncoderWorker { self.parse_and_store_segments(&job).await?; // Mettre à jour le statut - self.update_job_status(&job.track_id, EncodeJobStatus::Done).await?; + self.update_job_status(&job.track_id, EncodeJobStatus::Done) + .await?; tracing::info!( "Worker {} a terminé l'encodage pour track {} (qualité: {})", @@ -424,33 +430,37 @@ impl EncoderWorker { Ok(()) } else { let error_msg = format!("FFmpeg exited with status: {}", status); - self.update_job_status_with_error(&job.track_id, &error_msg).await?; + self.update_job_status_with_error(&job.track_id, &error_msg) + .await?; Err(AppError::InternalError { message: error_msg }) } } Ok(Err(e)) => { let error_msg = format!("FFmpeg IO error: {}", e); - self.update_job_status_with_error(&job.track_id, &error_msg).await?; + self.update_job_status_with_error(&job.track_id, &error_msg) + .await?; Err(AppError::InternalError { message: error_msg }) } Err(_) => { // Timeout: tuer le processus let _ = child.kill().await; - let error_msg = format!("Encoding timed out after {} seconds", JOB_TIMEOUT.as_secs()); - self.update_job_status_with_error(&job.track_id, &error_msg).await?; + let error_msg = + format!("Encoding timed out after {} seconds", JOB_TIMEOUT.as_secs()); + self.update_job_status_with_error(&job.track_id, &error_msg) + .await?; Err(AppError::InternalError { message: error_msg }) } } } /// Parse le manifest HLS et insère les segments en DB (transactionnel P1) - /// + /// /// Garantit l'atomicité : tous les segments ou aucun /// En cas d'erreur, rollback automatique → pas de playlist HLS incomplète async fn parse_and_store_segments(&self, job: &EncodeJob) -> Result<(), AppError> { - use std::fs; - use regex::Regex; use lazy_static::lazy_static; + use regex::Regex; + use std::fs; lazy_static! { // Regex pour parser les lignes EXTINF et les URIs de segments @@ -458,8 +468,8 @@ impl EncoderWorker { } let manifest_path = job.output_dir.join("index.m3u8"); - let manifest_content = fs::read_to_string(&manifest_path) - .map_err(|e| AppError::InternalError { + let manifest_content = + fs::read_to_string(&manifest_path).map_err(|e| AppError::InternalError { message: format!("Failed to read manifest: {}", e), })?; @@ -483,7 +493,7 @@ impl EncoderWorker { else if !line.starts_with('#') && !line.trim().is_empty() { let segment_uri = line.trim(); let segment_path = job.output_dir.join(segment_uri); - + if segment_path.exists() { segments_to_insert.push(( segment_index, @@ -506,9 +516,15 @@ impl EncoderWorker { } // DÉBUT TRANSACTION - let mut tx = self.db_pool.begin().await + let mut tx = self + .db_pool + .begin() + .await .map_err(|e| AppError::InternalError { - message: format!("Failed to begin transaction for segment batch insertion: {}", e), + message: format!( + "Failed to begin transaction for segment batch insertion: {}", + e + ), })?; // 2. VALIDATION : Vérifier que le job existe @@ -575,10 +591,12 @@ impl EncoderWorker { })?; // COMMIT TRANSACTION - tx.commit().await - .map_err(|e| AppError::InternalError { - message: format!("Failed to commit segment batch insertion transaction: {}", e), - })?; + tx.commit().await.map_err(|e| AppError::InternalError { + message: format!( + "Failed to commit segment batch insertion transaction: {}", + e + ), + })?; tracing::info!( worker_id = self.id, diff --git a/veza-stream-server/src/core/encoding_service.rs b/veza-stream-server/src/core/encoding_service.rs index f5892b070..91da8cc7d 100644 --- a/veza-stream-server/src/core/encoding_service.rs +++ b/veza-stream-server/src/core/encoding_service.rs @@ -3,8 +3,8 @@ //! Ce module fournit un service de haut niveau pour lancer des encodages //! audio, vérifier les statuts, et gérer les répertoires de sortie. -use crate::core::FfmpegEncoderPool; use crate::core::job::{EncodeJob, EncodeJobStatus}; +use crate::core::FfmpegEncoderPool; use crate::error::AppError; use sqlx::PgPool; use std::path::{Path, PathBuf}; @@ -67,22 +67,30 @@ impl EncodingService { resource: format!("Track {}", track_id), })?; - let source_path_str: String = track_row.try_get("file_path").map_err(|e| AppError::InternalError { - message: format!("Failed to get file_path from row: {}", e), - })?; - + let source_path_str: String = + track_row + .try_get("file_path") + .map_err(|e| AppError::InternalError { + message: format!("Failed to get file_path from row: {}", e), + })?; + let source_path = PathBuf::from(source_path_str); if !source_path.exists() { return Err(AppError::NotFound { - resource: format!("Source file for track {}: {}", track_id, source_path.display()), + resource: format!( + "Source file for track {}: {}", + track_id, + source_path.display() + ), }); } // 2. Créer le répertoire de sortie - let output_dir = self.base_output_dir + let output_dir = self + .base_output_dir .join(track_id.to_string()) .join(quality); - + tokio::fs::create_dir_all(&output_dir) .await .map_err(|e| AppError::InternalError { @@ -90,12 +98,8 @@ impl EncodingService { })?; // 3. Créer le job d'encodage - let job = EncodeJob::from_quality_profile( - track_id, - source_path, - output_dir, - quality.to_string(), - ); + let job = + EncodeJob::from_quality_profile(track_id, source_path, output_dir, quality.to_string()); // 4. Soumettre le job au pool self.encoder_pool.submit_job(job).await?; @@ -116,7 +120,7 @@ impl EncodingService { /// * `track_id` - ID du track à encoder pub async fn encode_track_all_qualities(&self, track_id: Uuid) -> Result<(), AppError> { let qualities = vec!["low", "medium", "high", "hi_res"]; - + for quality in qualities { if let Err(e) = self.encode_track(track_id, quality).await { tracing::error!( @@ -199,7 +203,9 @@ impl EncodingService { statuses.push(QualityStatus { quality: quality.to_string(), status, - segment_count: segment_info.map(|s| s.get::, _>("segment_count").unwrap_or(0) as u32).unwrap_or(0), + segment_count: segment_info + .map(|s| s.get::, _>("segment_count").unwrap_or(0) as u32) + .unwrap_or(0), error_message: job_row.and_then(|j| j.get("error_message")), }); } @@ -239,4 +245,3 @@ pub struct QualityStatus { pub segment_count: u32, pub error_message: Option, } - diff --git a/veza-stream-server/src/core/job.rs b/veza-stream-server/src/core/job.rs index 30c11e682..11157021d 100644 --- a/veza-stream-server/src/core/job.rs +++ b/veza-stream-server/src/core/job.rs @@ -115,4 +115,3 @@ impl EncodeJobStatus { } } } - diff --git a/veza-stream-server/src/core/mod.rs b/veza-stream-server/src/core/mod.rs index 7ae320613..c763b02fc 100644 --- a/veza-stream-server/src/core/mod.rs +++ b/veza-stream-server/src/core/mod.rs @@ -18,9 +18,9 @@ pub mod sync; pub use buffer::*; // Note: encoding_pool::EncoderPool est exporté explicitement pour éviter conflit avec encoder::EncoderPool +pub use encoding_pool::EncoderPipeline; pub use encoding_pool::EncoderPool as FfmpegEncoderPool; pub use encoding_pool::EncoderPool; -pub use encoding_pool::EncoderPipeline; pub use encoding_service::*; pub use job::*; pub use stream::*; diff --git a/veza-stream-server/src/core/stream.rs b/veza-stream-server/src/core/stream.rs index c62627db7..66a43f28b 100644 --- a/veza-stream-server/src/core/stream.rs +++ b/veza-stream-server/src/core/stream.rs @@ -11,9 +11,9 @@ use tokio::sync::broadcast; use uuid::Uuid; // Note: Use tracing::info! macro directly instead of importing +use crate::core::sync::SyncEngine; use crate::core::AudioFormat; use crate::error::AppError; -use crate::core::sync::SyncEngine; /// Gestionnaire principal des streams en production #[derive(Debug)] @@ -146,7 +146,7 @@ pub enum SyncState { Calibrating { /// Échantillons de drift (offset_ms) drift_samples: Vec, - /// Échantillons de RTT (ms) + /// Échantillons de RTT (ms) rtt_samples: Vec, }, /// Client considéré comme synchronisé @@ -468,34 +468,41 @@ impl StreamManager { pub async fn start_sync_loop(&self) { let streams = self.streams.clone(); let sync_engine = self.sync_engine.clone(); - + // Démarrer la boucle de synchronisation tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_millis(50)); // 20Hz sync rate - + loop { interval.tick().await; - + // Synchroniser chaque stream actif for stream_entry in streams.iter() { let stream = stream_entry.value(); - + // Uniquement si le stream est en direct if stream.status == StreamStatus::Live { if !stream.listeners.is_empty() { // Appel non-bloquant au moteur de sync let stream_id = stream.id; let listeners = stream.listeners.clone(); // Arc clone is cheap - + // Extract track_id from metadata - let track_id = stream.metadata.read().current_track.as_ref().map(|t| t.title.clone()); + let track_id = stream + .metadata + .read() + .current_track + .as_ref() + .map(|t| t.title.clone()); // Note: sync_listeners est async, on le spawn pour ne pas bloquer la boucle principale // ou on l'attend si on veut throttle. Ici on spawn pour paralléliser les streams. let engine = sync_engine.clone(); - + tokio::spawn(async move { - if let Err(e) = engine.sync_listeners(stream_id, track_id, listeners).await { + if let Err(e) = + engine.sync_listeners(stream_id, track_id, listeners).await + { tracing::warn!("Sync error for stream {}: {}", stream_id, e); } }); diff --git a/veza-stream-server/src/core/sync.rs b/veza-stream-server/src/core/sync.rs index 24bdad7e0..ff7f3a206 100644 --- a/veza-stream-server/src/core/sync.rs +++ b/veza-stream-server/src/core/sync.rs @@ -11,35 +11,53 @@ use std::sync::Arc; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; // use async_trait::async_trait; // Not available -use futures::future::BoxFuture; use dashmap::DashMap; +use futures::future::BoxFuture; use parking_lot::RwLock; use serde::{Deserialize, Serialize}; use tokio::sync::broadcast; use uuid::Uuid; // Note: Use tracing::info! macro directly instead of importing -use crate::core::Listener; use crate::core::stream::SyncState; +use crate::core::Listener; use crate::error::AppError; /// Interface de transport pour l'envoi des messages de synchronisation /// Permet de découpler le moteur de sync de l'implémentation WebSocket pub trait SyncTransport: Send + Sync + std::fmt::Debug { /// Envoie un ajustement de synchronisation à un client spécifique - fn send_adjustment<'a>(&'a self, client_id: Uuid, adjustment: SyncAdjustment) -> BoxFuture<'a, Result<(), AppError>>; - + fn send_adjustment<'a>( + &'a self, + client_id: Uuid, + adjustment: SyncAdjustment, + ) -> BoxFuture<'a, Result<(), AppError>>; + /// Envoie un ping de synchronisation fn send_ping<'a>(&'a self, client_id: Uuid) -> BoxFuture<'a, Result<(), AppError>>; - + /// Obtient les stats de connexion (RTT, Offset) - fn get_connection_stats<'a>(&'a self, client_id: Uuid) -> BoxFuture<'a, Result<(Option, Option), AppError>>; + fn get_connection_stats<'a>( + &'a self, + client_id: Uuid, + ) -> BoxFuture<'a, Result<(Option, Option), AppError>>; /// Envoie un message d'initialisation de synchronisation - fn send_init<'a>(&'a self, client_id: Uuid, session_id: Uuid, track_id: String, server_timestamp_ms: u64, position_ms: u64) -> BoxFuture<'a, Result<(), AppError>>; + fn send_init<'a>( + &'a self, + client_id: Uuid, + session_id: Uuid, + track_id: String, + server_timestamp_ms: u64, + position_ms: u64, + ) -> BoxFuture<'a, Result<(), AppError>>; /// Envoie un message de stabilité - fn send_stable<'a>(&'a self, client_id: Uuid, session_id: Uuid) -> BoxFuture<'a, Result<(), AppError>>; + fn send_stable<'a>( + &'a self, + client_id: Uuid, + session_id: Uuid, + ) -> BoxFuture<'a, Result<(), AppError>>; } /// Moteur de synchronisation principal @@ -403,7 +421,13 @@ impl SyncEngine { tokio::spawn(async move { sync_engine - .sync_individual_listener(&synchronizer, listeners_map, listener_id, track_id, master_time) + .sync_individual_listener( + &synchronizer, + listeners_map, + listener_id, + track_id, + master_time, + ) .await }) }) @@ -452,61 +476,94 @@ impl SyncEngine { if let Some(track) = track_id { let position = synchronizer.master_clock.get_position().as_millis() as u64; if let Some(transport) = &self.transport { - transport.send_init(listener.id, synchronizer.stream_id, track, master_time.timestamp, position).await?; + transport + .send_init( + listener.id, + synchronizer.stream_id, + track, + master_time.timestamp, + position, + ) + .await?; } // Update state to Calibrating if let Some(mut l) = listeners_map.get_mut(&listener.id) { - l.sync_state = SyncState::Calibrating { drift_samples: Vec::new(), rtt_samples: Vec::new() }; + l.sync_state = SyncState::Calibrating { + drift_samples: Vec::new(), + rtt_samples: Vec::new(), + }; } } return Ok(()); - }, - SyncState::Calibrating { drift_samples: _, rtt_samples: _ } => { + } + SyncState::Calibrating { + drift_samples: _, + rtt_samples: _, + } => { // Send Ping if let Some(transport) = &self.transport { let _ = transport.send_ping(listener.id).await; } // Get stats let (rtt, offset) = if let Some(transport) = &self.transport { - transport.get_connection_stats(listener.id).await.unwrap_or((None, None)) + transport + .get_connection_stats(listener.id) + .await + .unwrap_or((None, None)) } else { (None, None) }; // Store samples if valid if let (Some(r), Some(o)) = (rtt, offset) { - let latency = r; // Simple RTT/2 est souvent utilisé mais ici on prend RTT brut pr le moment - self.latency_map.insert(listener.id, latency); - - // Calculate raw drift - let drift = self.drift_compensator.calculate_drift(&listener, synchronizer.master_clock.get_position(), rtt, Some(o)).await?; - - // Update state logic - if let Some(mut l) = listeners_map.get_mut(&listener.id) { - if let SyncState::Calibrating { drift_samples, rtt_samples } = &mut l.sync_state { - drift_samples.push(drift as i64); - rtt_samples.push(r.as_millis() as u64); - - // Check for stability (e.g., 5 samples) - if drift_samples.len() >= 5 { - // Transition to Synchronized - l.sync_state = SyncState::Synchronized; - if let Some(transport) = &self.transport { - let _ = transport.send_stable(l.id, synchronizer.stream_id).await; - } - } - } - } + let latency = r; // Simple RTT/2 est souvent utilisé mais ici on prend RTT brut pr le moment + self.latency_map.insert(listener.id, latency); + + // Calculate raw drift + let drift = self + .drift_compensator + .calculate_drift( + &listener, + synchronizer.master_clock.get_position(), + rtt, + Some(o), + ) + .await?; + + // Update state logic + if let Some(mut l) = listeners_map.get_mut(&listener.id) { + if let SyncState::Calibrating { + drift_samples, + rtt_samples, + } = &mut l.sync_state + { + drift_samples.push(drift as i64); + rtt_samples.push(r.as_millis() as u64); + + // Check for stability (e.g., 5 samples) + if drift_samples.len() >= 5 { + // Transition to Synchronized + l.sync_state = SyncState::Synchronized; + if let Some(transport) = &self.transport { + let _ = + transport.send_stable(l.id, synchronizer.stream_id).await; + } + } + } + } } return Ok(()); - }, + } SyncState::Resyncing => { // Send Ping heavily and transition back to Calibrating or Synchronized // For now, reset to Calibrating - if let Some(mut l) = listeners_map.get_mut(&listener.id) { - l.sync_state = SyncState::Calibrating { drift_samples: vec![], rtt_samples: vec![] }; - } - return Ok(()); + if let Some(mut l) = listeners_map.get_mut(&listener.id) { + l.sync_state = SyncState::Calibrating { + drift_samples: vec![], + rtt_samples: vec![], + }; + } + return Ok(()); } SyncState::Synchronized => { // FALLTHROUGH to existing normal sync logic below @@ -523,7 +580,10 @@ impl SyncEngine { // Récupérer les stats de latence let (rtt, clock_offset) = if let Some(transport) = &self.transport { - transport.get_connection_stats(listener.id).await.unwrap_or((None, None)) + transport + .get_connection_stats(listener.id) + .await + .unwrap_or((None, None)) } else { (None, None) }; @@ -539,15 +599,21 @@ impl SyncEngine { // Calculer le drift let drift = self .drift_compensator - .calculate_drift(&listener, synchronizer.master_clock.get_position(), rtt, clock_offset) + .calculate_drift( + &listener, + synchronizer.master_clock.get_position(), + rtt, + clock_offset, + ) .await?; // Check if we lost sync - if drift.abs() > 200.0 { // 200ms threshold for Resync - if let Some(mut l) = listeners_map.get_mut(&listener.id) { - l.sync_state = SyncState::Resyncing; - } - return Ok(()); + if drift.abs() > 200.0 { + // 200ms threshold for Resync + if let Some(mut l) = listeners_map.get_mut(&listener.id) { + l.sync_state = SyncState::Resyncing; + } + return Ok(()); } // Créer l'ajustement de synchronisation seulement si nécessaire (Threshold 40ms) @@ -567,7 +633,7 @@ impl SyncEngine { // Update SynchronizedClient maps (non-essential for P1-2 core logic but good practice) // ... (Skipping full update of internal map for brevity to focus on protocol) - + Ok(()) } @@ -629,8 +695,15 @@ impl SyncEngine { ) -> Result<(), AppError> { // Envoyer l'ajustement au client via le transport configuré (WebSocket) if let Some(transport) = &self.transport { - if let Err(e) = transport.send_adjustment(client_id, adjustment.clone()).await { - tracing::warn!(?client_id, "Failed to send sync adjustment via transport: {}", e); + if let Err(e) = transport + .send_adjustment(client_id, adjustment.clone()) + .await + { + tracing::warn!( + ?client_id, + "Failed to send sync adjustment via transport: {}", + e + ); // On continue quand même pour émettre l'événement interne } else { tracing::debug!(?client_id, "Sync adjustment sent via transport"); @@ -838,56 +911,56 @@ impl DriftCompensator { ) -> Result { // En production, on utilise les données remontées par le client // via les WebSockets (stockées dans session_data) ou via le protocole SyncPing/Pong - + let master_ms = master_position.as_secs_f64() * 1000.0; // Si on a des mesures précises via SyncPing/Pong (P1-1) if let Some(offset) = clock_offset_ms { - // Position théorique côté client (si parfaitement synchronisé) - // Client time = Server time + offset - // Mais on veut savoir où est le PLAYBACK par rapport au MAITRE. - // On a besoin de la position de lecture du client. - - // Si le client remonte sa position de lecture via WebSocket (session_data) - if let Some(pos_str) = listener.session_data.get("position_ms") { + // Position théorique côté client (si parfaitement synchronisé) + // Client time = Server time + offset + // Mais on veut savoir où est le PLAYBACK par rapport au MAITRE. + // On a besoin de la position de lecture du client. + + // Si le client remonte sa position de lecture via WebSocket (session_data) + if let Some(pos_str) = listener.session_data.get("position_ms") { if let Ok(client_playback_ms) = pos_str.parse::() { // Le timestamp absolu n'est pas suffisant, il faut comparer les positions relatives // DANS LE STREAM. - + // Mais attendez, calculate_drift compare la POSITION de lecture (00:01:23) // Le clock offset sert à corriger les timestamps si on utilisait des timestamps absolus. // Ici on compare listener.position vs master.position. - + // Si on a un RTT, on sait que l'info du client date de RTT/2 let rtt_latency = rtt.map(|d| d.as_secs_f64() * 1000.0).unwrap_or(0.0); let one_way_delay = rtt_latency / 2.0; - + // La position reçue est celle qu'avait le client il y a `one_way_delay` ms // Donc sa position REELLE actuelle est estimée à : let estimated_client_playback_ms = client_playback_ms + one_way_delay; - + // Drift = Estimated Client - Master let drift_ms = estimated_client_playback_ms - master_ms; - + tracing::trace!( "Drift calc (Real): ClientReported={:.2}, LatencyCorrection={:.2}, EstClient={:.2}, Master={:.2}, Drift={:.2}, Offset={}", client_playback_ms, one_way_delay, estimated_client_playback_ms, master_ms, drift_ms, offset ); - // Note: L'offset d'horloge (clock_offset) n'est pas directement utilisé ici - // car on compare des "positions de lecture" (durée depuis le début du morceau) - // et non des timestamps muraux. + // Note: L'offset d'horloge (clock_offset) n'est pas directement utilisé ici + // car on compare des "positions de lecture" (durée depuis le début du morceau) + // et non des timestamps muraux. // SAUF si le client utilise des timestamps muraux pour dire "j'étais à telle position à tel timestamp". - // Le protocole actuel envoie juste "position_ms". + // Le protocole actuel envoie juste "position_ms". // L'offset sert surtout à NTP ou si le client disait "À mon heure H, j'étais à P". // Pour l'instant, P1-1 demande d'intégrer offset mais sans changer tout le message "position". // On utilisera l'offset pour logging ou future usage, mais la correction latence (RTT/2) est la plus critique ici. - + return Ok(drift_ms.max(-5000.0).min(5000.0)); } - } + } } - + // 1. Tenter de récupérer la position rapportée par le client let client_position = if let Some(pos_str) = listener.session_data.get("position_ms") { match pos_str.parse::() { @@ -904,7 +977,7 @@ impl DriftCompensator { // Drift < 0 signifie que le client est EN RETARD let client_ms = client_position.as_secs_f64() * 1000.0; let master_ms = master_position.as_secs_f64() * 1000.0; - + let drift_ms = client_ms - master_ms; // 3. Clamper les valeurs pour éviter les corrections violentes @@ -929,8 +1002,8 @@ impl DriftCompensator { #[cfg(test)] mod tests { use super::*; - use std::collections::HashMap; use crate::core::stream::SyncState; + use std::collections::HashMap; fn create_test_listener(id: Uuid, position_ms: Option) -> Listener { let mut session_data = HashMap::new(); @@ -958,7 +1031,10 @@ mod tests { let listener = create_test_listener(Uuid::new_v4(), Some(1000)); let master_pos = Duration::from_millis(1000); - let drift = compensator.calculate_drift(&listener, master_pos, None, None).await.unwrap(); + let drift = compensator + .calculate_drift(&listener, master_pos, None, None) + .await + .unwrap(); assert_eq!(drift, 0.0); } @@ -968,7 +1044,10 @@ mod tests { let listener = create_test_listener(Uuid::new_v4(), Some(1100)); // +100ms let master_pos = Duration::from_millis(1000); - let drift = compensator.calculate_drift(&listener, master_pos, None, None).await.unwrap(); + let drift = compensator + .calculate_drift(&listener, master_pos, None, None) + .await + .unwrap(); assert_eq!(drift, 100.0); } @@ -978,7 +1057,10 @@ mod tests { let listener = create_test_listener(Uuid::new_v4(), Some(900)); // -100ms let master_pos = Duration::from_millis(1000); - let drift = compensator.calculate_drift(&listener, master_pos, None, None).await.unwrap(); + let drift = compensator + .calculate_drift(&listener, master_pos, None, None) + .await + .unwrap(); assert_eq!(drift, -100.0); } @@ -988,7 +1070,10 @@ mod tests { let listener = create_test_listener(Uuid::new_v4(), Some(10000)); // +9000ms ahead let master_pos = Duration::from_millis(1000); - let drift = compensator.calculate_drift(&listener, master_pos, None, None).await.unwrap(); + let drift = compensator + .calculate_drift(&listener, master_pos, None, None) + .await + .unwrap(); assert_eq!(drift, 5000.0); // Clamped to +5000 } @@ -998,7 +1083,10 @@ mod tests { let listener = create_test_listener(Uuid::new_v4(), None); let master_pos = Duration::from_millis(1000); - let drift = compensator.calculate_drift(&listener, master_pos, None, None).await.unwrap(); + let drift = compensator + .calculate_drift(&listener, master_pos, None, None) + .await + .unwrap(); assert_eq!(drift, 0.0); } @@ -1008,7 +1096,7 @@ mod tests { // Client plays at 1000ms. let listener = create_test_listener(Uuid::new_v4(), Some(1000)); let master_pos = Duration::from_millis(1000); - + // RTT is 200ms => One way delay is 100ms. // So client ACTUAL position is 1000 + 100 = 1100ms. // Master is at 1000ms. @@ -1016,12 +1104,14 @@ mod tests { let rtt = Some(Duration::from_millis(200)); let offset = Some(0); // Offset doesn't affect relative stream position drift logic directly in current impl - let drift = compensator.calculate_drift(&listener, master_pos, rtt, offset).await.unwrap(); + let drift = compensator + .calculate_drift(&listener, master_pos, rtt, offset) + .await + .unwrap(); assert_eq!(drift, 100.0); } } - #[cfg(test)] mod transport_tests { use super::*; @@ -1033,12 +1123,16 @@ mod transport_tests { pub sent_pings: Arc>>, pub mock_stats: Arc, Option)>>>, // Ignore validation of these for now - pub sent_inits: Arc>>, + pub sent_inits: Arc>>, pub sent_stables: Arc>>, } impl SyncTransport for MockTransport { - fn send_adjustment<'a>(&'a self, client_id: Uuid, adjustment: SyncAdjustment) -> BoxFuture<'a, Result<(), AppError>> { + fn send_adjustment<'a>( + &'a self, + client_id: Uuid, + adjustment: SyncAdjustment, + ) -> BoxFuture<'a, Result<(), AppError>> { let sent = self.sent_adjustments.clone(); Box::pin(async move { sent.lock().unwrap().push((client_id, adjustment)); @@ -1054,7 +1148,10 @@ mod transport_tests { }) } - fn get_connection_stats<'a>(&'a self, client_id: Uuid) -> BoxFuture<'a, Result<(Option, Option), AppError>> { + fn get_connection_stats<'a>( + &'a self, + client_id: Uuid, + ) -> BoxFuture<'a, Result<(Option, Option), AppError>> { let stats = self.mock_stats.clone(); Box::pin(async move { let guard = stats.lock().unwrap(); @@ -1062,7 +1159,14 @@ mod transport_tests { }) } - fn send_init<'a>(&'a self, client_id: Uuid, _session_id: Uuid, _track_id: String, _server_timestamp_ms: u64, _position_ms: u64) -> BoxFuture<'a, Result<(), AppError>> { + fn send_init<'a>( + &'a self, + client_id: Uuid, + _session_id: Uuid, + _track_id: String, + _server_timestamp_ms: u64, + _position_ms: u64, + ) -> BoxFuture<'a, Result<(), AppError>> { let sent = self.sent_inits.clone(); Box::pin(async move { sent.lock().unwrap().push(client_id); @@ -1070,7 +1174,11 @@ mod transport_tests { }) } - fn send_stable<'a>(&'a self, client_id: Uuid, _session_id: Uuid) -> BoxFuture<'a, Result<(), AppError>> { + fn send_stable<'a>( + &'a self, + client_id: Uuid, + _session_id: Uuid, + ) -> BoxFuture<'a, Result<(), AppError>> { let sent = self.sent_stables.clone(); Box::pin(async move { sent.lock().unwrap().push(client_id); @@ -1084,25 +1192,20 @@ mod transport_tests { let sent = Arc::new(Mutex::new(Vec::new())); let pings = Arc::new(Mutex::new(Vec::new())); let stats = Arc::new(Mutex::new(HashMap::new())); - - let transport = Arc::new(MockTransport { + + let transport = Arc::new(MockTransport { sent_adjustments: sent.clone(), sent_pings: pings.clone(), mock_stats: stats.clone(), sent_inits: Arc::new(Mutex::new(Vec::new())), sent_stables: Arc::new(Mutex::new(Vec::new())), }); - + let config = Arc::new(RwLock::new(SyncConfig::default())); let time_server = Arc::new(TimeServer::new(vec![]).await.unwrap()); let drift_compensator = Arc::new(DriftCompensator::new()); - - let engine = SyncEngine::new( - time_server, - drift_compensator, - config, - Some(transport) - ); + + let engine = SyncEngine::new(time_server, drift_compensator, config, Some(transport)); let client_id = Uuid::new_v4(); let adjustment = SyncAdjustment { @@ -1118,7 +1221,10 @@ mod transport_tests { }, }; - engine.apply_sync_adjustment(client_id, adjustment.clone()).await.unwrap(); + engine + .apply_sync_adjustment(client_id, adjustment.clone()) + .await + .unwrap(); let sent_guard = sent.lock().unwrap(); assert_eq!(sent_guard.len(), 1); @@ -1128,24 +1234,19 @@ mod transport_tests { async fn test_sync_state_machine_initialization() { // Setup let sent_inits = Arc::new(Mutex::new(Vec::new())); - let transport = Arc::new(MockTransport { + let transport = Arc::new(MockTransport { sent_adjustments: Arc::new(Mutex::new(Vec::new())), sent_pings: Arc::new(Mutex::new(Vec::new())), mock_stats: Arc::new(Mutex::new(HashMap::new())), sent_inits: sent_inits.clone(), sent_stables: Arc::new(Mutex::new(Vec::new())), }); - + let config = Arc::new(RwLock::new(SyncConfig::default())); let time_server = Arc::new(TimeServer::new(vec![]).await.unwrap()); let drift_compensator = Arc::new(DriftCompensator::new()); - - let engine = SyncEngine::new( - time_server, - drift_compensator, - config, - Some(transport) - ); + + let engine = SyncEngine::new(time_server, drift_compensator, config, Some(transport)); let stream_id = Uuid::new_v4(); let listener_id = Uuid::new_v4(); @@ -1162,19 +1263,23 @@ mod transport_tests { sync_state: SyncState::Desynchronized, }; if let Some(pos) = Some(0) { - listener.session_data.insert("position_ms".to_string(), pos.to_string()); + listener + .session_data + .insert("position_ms".to_string(), pos.to_string()); } - + // Mock listeners map let listeners = Arc::new(DashMap::new()); listeners.insert(listener_id, listener); - + // Run sync let track_id = Some("test_track".to_string()); - let result = engine.sync_listeners(stream_id, track_id, listeners.clone()).await; - + let result = engine + .sync_listeners(stream_id, track_id, listeners.clone()) + .await; + assert!(result.is_ok()); - + // Verify SyncInit sent let inits = sent_inits.lock().unwrap(); assert_eq!(inits.len(), 1); diff --git a/veza-stream-server/src/error.rs b/veza-stream-server/src/error.rs index 1c9de0230..7acea9480 100644 --- a/veza-stream-server/src/error.rs +++ b/veza-stream-server/src/error.rs @@ -768,10 +768,9 @@ mod tests { #[tokio::test] async fn test_tokio_elapsed_conversion() { use tokio::time::Duration; - let elapsed_err = tokio::time::timeout( - Duration::from_nanos(0), - async { futures::future::pending::<()>().await }, - ) + let elapsed_err = tokio::time::timeout(Duration::from_nanos(0), async { + futures::future::pending::<()>().await + }) .await .unwrap_err(); let app_err: AppError = elapsed_err.into(); diff --git a/veza-stream-server/src/health/mod.rs b/veza-stream-server/src/health/mod.rs index b0e2a3e8c..c9f79d3e2 100644 --- a/veza-stream-server/src/health/mod.rs +++ b/veza-stream-server/src/health/mod.rs @@ -191,16 +191,12 @@ impl HealthMonitor { message, duration_ms: duration, last_success: if status == CheckStatus::Pass { - Some( - unix_timestamp_secs(), - ) + Some(unix_timestamp_secs()) } else { self.get_last_success("system_resources").await }, last_failure: if status == CheckStatus::Fail { - Some( - unix_timestamp_secs(), - ) + Some(unix_timestamp_secs()) } else { self.get_last_failure("system_resources").await }, @@ -245,16 +241,12 @@ impl HealthMonitor { message, duration_ms: duration, last_success: if status == CheckStatus::Pass { - Some( - unix_timestamp_secs(), - ) + Some(unix_timestamp_secs()) } else { self.get_last_success("disk_space").await }, last_failure: if status == CheckStatus::Fail { - Some( - unix_timestamp_secs(), - ) + Some(unix_timestamp_secs()) } else { self.get_last_failure("disk_space").await }, @@ -298,16 +290,12 @@ impl HealthMonitor { message, duration_ms: duration, last_success: if status == CheckStatus::Pass { - Some( - unix_timestamp_secs(), - ) + Some(unix_timestamp_secs()) } else { self.get_last_success("database").await }, last_failure: if status == CheckStatus::Fail { - Some( - unix_timestamp_secs(), - ) + Some(unix_timestamp_secs()) } else { self.get_last_failure("database").await }, @@ -425,16 +413,12 @@ impl HealthMonitor { message: format!("{} ({} fichiers)", message, file_count), duration_ms: duration, last_success: if status == CheckStatus::Pass { - Some( - unix_timestamp_secs(), - ) + Some(unix_timestamp_secs()) } else { self.get_last_success("audio_directory").await }, last_failure: if status == CheckStatus::Fail { - Some( - unix_timestamp_secs(), - ) + Some(unix_timestamp_secs()) } else { self.get_last_failure("audio_directory").await }, @@ -550,11 +534,7 @@ impl HealthMonitor { } CheckStatus::Warn => { let alert = HealthAlert { - id: format!( - "{}_{}", - name, - unix_timestamp_secs() - ), + id: format!("{}_{}", name, unix_timestamp_secs()), severity: AlertSeverity::Warning, message: format!("Service {} dégradé: {}", name, check.message), component: name.clone(), @@ -662,23 +642,32 @@ impl HealthMonitor { if metadata.is_dir() { (CheckStatus::Pass, "Répertoire accessible".to_string(), 0) } else { - (CheckStatus::Fail, "Le chemin n'est pas un répertoire".to_string(), 0) + ( + CheckStatus::Fail, + "Le chemin n'est pas un répertoire".to_string(), + 0, + ) } } - Err(e) => { - (CheckStatus::Fail, format!("Inaccessible: {}", e), 0) - } + Err(e) => (CheckStatus::Fail, format!("Inaccessible: {}", e), 0), } } async fn check_transcoding(&self) { let start = SystemTime::now(); - let (status, message) = match tokio::process::Command::new("ffmpeg").arg("-version").output().await { + let (status, message) = match tokio::process::Command::new("ffmpeg") + .arg("-version") + .output() + .await + { Ok(output) => { if output.status.success() { (CheckStatus::Pass, "FFmpeg détecté".to_string()) } else { - (CheckStatus::Warn, "FFmpeg détecté mais erreur d'exécution".to_string()) + ( + CheckStatus::Warn, + "FFmpeg détecté mais erreur d'exécution".to_string(), + ) } } Err(e) => (CheckStatus::Warn, format!("FFmpeg non détecté: {}", e)), diff --git a/veza-stream-server/src/lib.rs b/veza-stream-server/src/lib.rs index 637a63842..7ae6ed694 100644 --- a/veza-stream-server/src/lib.rs +++ b/veza-stream-server/src/lib.rs @@ -24,12 +24,12 @@ pub mod structured_logging; pub mod transcoding; // NEW: Phase 3 Transcoding Engine pub mod utils; // ORIGIN Architecture: Event-driven via RabbitMQ -use std::sync::Arc; -use sqlx::PgPool; -use parking_lot::RwLock as PlRwLock; -use crate::core::sync::{SyncEngine, TimeServer, DriftCompensator, SyncConfig as CoreSyncConfig}; -use crate::core::stream::{StreamManager, StreamConfig as CoreStreamConfig}; +use crate::core::stream::{StreamConfig as CoreStreamConfig, StreamManager}; +use crate::core::sync::{DriftCompensator, SyncConfig as CoreSyncConfig, SyncEngine, TimeServer}; use crate::streaming::websocket_transport::WebSocketSyncTransport; +use parking_lot::RwLock as PlRwLock; +use sqlx::PgPool; +use std::sync::Arc; /// État global de l'application /// Cette structure contient tous les services et composants nécessaires au serveur @@ -55,7 +55,9 @@ pub struct AppState { pub stream_manager: Arc, } -async fn build_revocation_store(config: &config::Config) -> Arc { +async fn build_revocation_store( + config: &config::Config, +) -> Arc { if let Some(ref redis_url) = config.cache.redis_url { match auth::revocation_store::RedisRevocationStore::new(redis_url).await { Ok(store) => { @@ -97,9 +99,8 @@ impl AppState { let metrics = Arc::new(utils::metrics::Metrics::new(config_arc.clone())); // AnalyticsEngine uses the shared pool - let analytics = Arc::new( - analytics::AnalyticsEngine::new(pool.clone(), config_arc.clone()).await?, - ); + let analytics = + Arc::new(analytics::AnalyticsEngine::new(pool.clone(), config_arc.clone()).await?); let audio_processor = Arc::new(audio::processing::AudioProcessor::new(config_arc.clone())); @@ -108,7 +109,10 @@ impl AppState { )); // HealthMonitor needs config and analytics for db check - let health_monitor = Arc::new(health::HealthMonitor::new(config_arc.clone(), analytics.clone())); + let health_monitor = Arc::new(health::HealthMonitor::new( + config_arc.clone(), + analytics.clone(), + )); // Revocation store: Redis si REDIS_URL défini, sinon in-memory let revocation_store = build_revocation_store(&config).await; @@ -137,11 +141,15 @@ impl AppState { transcoding_engine.start(); // SyncEngine initialization - let time_server = Arc::new(TimeServer::new(vec![]).await.map_err(|e| format!("Failed to init TimeServer: {}", e))?); + let time_server = Arc::new( + TimeServer::new(vec![]) + .await + .map_err(|e| format!("Failed to init TimeServer: {}", e))?, + ); let drift_compensator = Arc::new(DriftCompensator::new()); let sync_config = Arc::new(PlRwLock::new(CoreSyncConfig::default())); let sync_transport = Arc::new(WebSocketSyncTransport::new(websocket_manager.clone())); - + let sync_engine = Arc::new(SyncEngine::new( time_server, drift_compensator, @@ -150,11 +158,14 @@ impl AppState { )); // StreamManager initialization - let stream_manager = Arc::new(StreamManager::new( - CoreStreamConfig::default(), - pool.clone(), - sync_engine.clone(), - ).await?); + let stream_manager = Arc::new( + StreamManager::new( + CoreStreamConfig::default(), + pool.clone(), + sync_engine.clone(), + ) + .await?, + ); // Start the sync loop stream_manager.start_sync_loop().await; diff --git a/veza-stream-server/src/main.rs b/veza-stream-server/src/main.rs index 2d8c1d9f1..22234139f 100644 --- a/veza-stream-server/src/main.rs +++ b/veza-stream-server/src/main.rs @@ -1,10 +1,7 @@ // file: stream_server/src/main.rs use stream_server::event_bus::RabbitMQEventBus; -use stream_server::{ - config::Config, - AppState, -}; +use stream_server::{config::Config, AppState}; use metrics_exporter_prometheus::PrometheusBuilder; use std::{net::SocketAddr, sync::Arc, time::Duration}; @@ -15,22 +12,26 @@ async fn main() -> Result<(), Box> { // FIX #12, #24: Utiliser veza-common::logging pour configuration unifiée // FIX #24: LOG_LEVEL est maintenant lu automatiquement par veza-common::logging let is_prod = std::env::var("APP_ENV").unwrap_or_default() == "production"; - + // Configuration des fichiers de logs vers /var/log/veza/ let log_dir = std::env::var("LOG_DIR").unwrap_or_else(|_| "/var/log/veza".to_string()); let log_file = format!("{}/stream.log", log_dir); - + let log_config = veza_common::logging::LoggingConfig { // FIX #24: Laisser veza-common::logging normaliser LOG_LEVEL automatiquement // Si LOG_LEVEL n'est pas défini, veza-common utilisera "INFO" par défaut level: String::new(), // Vide = utiliser LOG_LEVEL ou RUST_LOG automatiquement - format: if is_prod { "json".to_string() } else { "text".to_string() }, + format: if is_prod { + "json".to_string() + } else { + "text".to_string() + }, file: Some(log_file), max_size: 100 * 1024 * 1024, // 100MB max_files: 5, compress: true, }; - + veza_common::logging::init_with_config(log_config) .map_err(|e| format!("Failed to initialize logging: {}", e))?; diff --git a/veza-stream-server/src/middleware/logging.rs b/veza-stream-server/src/middleware/logging.rs index b25f3e898..dc6c1817f 100644 --- a/veza-stream-server/src/middleware/logging.rs +++ b/veza-stream-server/src/middleware/logging.rs @@ -18,11 +18,11 @@ pub async fn request_logging_middleware( let _method = request.method().clone(); let uri = request.uri().clone(); let headers = request.headers().clone(); - + // FIX #23: Extraire le request_id depuis les headers (propagé depuis le backend Go) // Si X-Request-ID est présent, l'utiliser, sinon générer un nouveau UUID let request_id = extract_request_id_from_headers(&headers); - + // Extraire aussi le trace_id si présent let trace_id = extract_trace_id_from_headers(&headers); @@ -37,7 +37,7 @@ pub async fn request_logging_middleware( .parse() .unwrap_or_else(|_| HeaderValue::from_static("unknown")), ); - + // Créer un span avec le request_id pour la corrélation let span = tracing::span!( tracing::Level::INFO, @@ -264,7 +264,7 @@ fn extract_request_id_from_headers(headers: &HeaderMap) -> Uuid { } } } - + // Si aucun request_id valide n'est trouvé, générer un nouveau UUID Uuid::new_v4() } @@ -277,7 +277,7 @@ fn extract_trace_id_from_headers(headers: &HeaderMap) -> Option { return Some(trace_id_str.to_string()); } } - + None } diff --git a/veza-stream-server/src/middleware/rate_limit.rs b/veza-stream-server/src/middleware/rate_limit.rs index 8ae6c564f..b1bfba23d 100644 --- a/veza-stream-server/src/middleware/rate_limit.rs +++ b/veza-stream-server/src/middleware/rate_limit.rs @@ -1,3 +1,4 @@ +use crate::AppState; use axum::{ extract::{Request, State}, http::{HeaderMap, StatusCode}, @@ -5,12 +6,9 @@ use axum::{ response::Response, }; use governor::{ - clock::DefaultClock, - state::keyed::DashMapStateStore, - Quota, RateLimiter as GovLimiter, + clock::DefaultClock, state::keyed::DashMapStateStore, Quota, RateLimiter as GovLimiter, }; use std::num::NonZeroU32; -use crate::AppState; /// Per-IP keyed rate limiter backed by the `governor` crate. /// Uses an in-memory DashMap store with a sliding-window quota. diff --git a/veza-stream-server/src/middleware/security.rs b/veza-stream-server/src/middleware/security.rs index cb7820c67..015df82eb 100644 --- a/veza-stream-server/src/middleware/security.rs +++ b/veza-stream-server/src/middleware/security.rs @@ -202,7 +202,9 @@ mod tests { assert!(contains_injection_patterns("")); assert!(contains_injection_patterns("$(cat /etc/passwd)")); assert!(contains_injection_patterns("javascript:alert(1)")); - assert!(contains_injection_patterns("test.php?id=1; drop table users")); + assert!(contains_injection_patterns( + "test.php?id=1; drop table users" + )); assert!(contains_injection_patterns("")); assert!(contains_injection_patterns("file.mp3?param=test|whoami")); assert!(!contains_injection_patterns( diff --git a/veza-stream-server/src/monitoring/metrics.rs b/veza-stream-server/src/monitoring/metrics.rs index 7ab847b15..ddbc15cce 100644 --- a/veza-stream-server/src/monitoring/metrics.rs +++ b/veza-stream-server/src/monitoring/metrics.rs @@ -123,8 +123,7 @@ impl StreamMetrics { )?; let hls_errors_total = Counter::with_opts( - prometheus::Opts::new("hls_errors_total", "Total HLS errors") - .namespace("veza_stream"), + prometheus::Opts::new("hls_errors_total", "Total HLS errors").namespace("veza_stream"), )?; // Enregistrement des métriques diff --git a/veza-stream-server/src/routes/api.rs b/veza-stream-server/src/routes/api.rs index fda1e203f..4a37b8160 100644 --- a/veza-stream-server/src/routes/api.rs +++ b/veza-stream-server/src/routes/api.rs @@ -1,6 +1,24 @@ +use crate::{ + auth, + error::AppError, + middleware::{ + logging::request_logging_middleware, rate_limit::rate_limit_middleware, + security::security_headers_middleware, + }, + routes::transcode as transcode_routes, + streaming::{ + hls::{HLSGenerator, HLSQuality}, + websocket::{websocket_handler, WebSocketQuery}, + }, + utils::{ + build_safe_path, serve_partial_file, time::unix_timestamp_secs, validate_filename, + validate_signature, + }, + AppState, +}; use axum::{ - extract::{State, Path, Query}, - http::{header, HeaderValue, Method, StatusCode, HeaderMap}, + extract::{Path, Query, State}, + http::{header, HeaderMap, HeaderValue, Method, StatusCode}, response::{IntoResponse, Json, Response}, routing::{get, post}, Router, @@ -12,21 +30,6 @@ use tower_http::{ cors::{AllowOrigin, Any, CorsLayer}, timeout::TimeoutLayer, }; -use crate::{ - auth, - AppState, - middleware::{ - logging::request_logging_middleware, rate_limit::rate_limit_middleware, - security::security_headers_middleware, - }, - error::AppError, - utils::{build_safe_path, serve_partial_file, time::unix_timestamp_secs, validate_filename, validate_signature}, - streaming::{ - websocket::{websocket_handler, WebSocketQuery}, - hls::{HLSGenerator, HLSQuality}, - }, - routes::transcode as transcode_routes, -}; pub fn create_routes( state: AppState, @@ -35,7 +38,7 @@ pub fn create_routes( // SÉCURITÉ: CORS restrictif avec liste d'origines autorisées (pas Any) let allowed_origins_str = std::env::var("ALLOWED_ORIGINS") .unwrap_or_else(|_| "http://localhost:5176,http://localhost:3000".to_string()); - + let cors = if allowed_origins_str.trim() == "*" { // Mode développement: autoriser toutes les origines CorsLayer::new() @@ -118,14 +121,26 @@ pub fn create_routes( .route("/stream/{filename}", get(stream_audio)) .route("/internal/jobs/transcode", post(internal_transcode_handler)) // Routes de transcodage HLS - .route("/v1/stream/transcode", post(transcode_routes::transcode_handler)) + .route( + "/v1/stream/transcode", + post(transcode_routes::transcode_handler), + ) .route("/v1/stream/job/{id}", get(transcode_routes::get_job_status)) - .route("/api/streams/jobs/{id}/status", get(transcode_routes::get_job_status_detailed)) + .route( + "/api/streams/jobs/{id}/status", + get(transcode_routes::get_job_status_detailed), + ) // Routes HLS transcode protégées par JWT (A01 - audit sécurité) .merge( Router::new() - .route("/v1/stream/hls/{job_id}/index.m3u8", get(transcode_routes::serve_hls_manifest)) - .route("/v1/stream/hls/{job_id}/{segment}", get(transcode_routes::serve_hls_segment)) + .route( + "/v1/stream/hls/{job_id}/index.m3u8", + get(transcode_routes::serve_hls_manifest), + ) + .route( + "/v1/stream/hls/{job_id}/{segment}", + get(transcode_routes::serve_hls_segment), + ) .route_layer(axum::middleware::from_fn_with_state( state.clone(), auth::hls_auth_middleware, @@ -172,7 +187,10 @@ async fn internal_transcode_handler( .and_then(|v| v.to_str().ok()) .unwrap_or(""); if provided != expected_key { - return Err((StatusCode::UNAUTHORIZED, "Internal API key required".to_string())); + return Err(( + StatusCode::UNAUTHORIZED, + "Internal API key required".to_string(), + )); } } } @@ -234,9 +252,7 @@ async fn health_check() -> Json { })) } -async fn detailed_health_check( - State(state): State, -) -> impl IntoResponse { +async fn detailed_health_check(State(state): State) -> impl IntoResponse { let health_status = state.health_monitor.get_health_status().await; let mut json_status = serde_json::to_value(health_status).unwrap_or_default(); @@ -272,7 +288,6 @@ async fn stream_audio( State(state): State, headers: HeaderMap, ) -> std::result::Result { - // Validation des paramètres let expires = params.get("expires").ok_or(( StatusCode::BAD_REQUEST, @@ -285,38 +300,23 @@ async fn stream_audio( ))?; // Validation du nom de fichier - let validated_filename = validate_filename(&filename).map_err(|_| { - ( - StatusCode::BAD_REQUEST, - "Invalid filename".to_string(), - ) - })?; + let validated_filename = validate_filename(&filename) + .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid filename".to_string()))?; // Validation de la signature if !validate_signature(&state.config, &validated_filename, expires, sig) { - return Err(( - StatusCode::FORBIDDEN, - "Invalid signature".to_string(), - )); + return Err((StatusCode::FORBIDDEN, "Invalid signature".to_string())); } // Construction du chemin sécurisé let file_path = build_safe_path(&state.config, &format!("{}.mp3", validated_filename)) - .map_err(|_| { - ( - StatusCode::NOT_FOUND, - "File not found".to_string(), - ) - })?; + .map_err(|_| (StatusCode::NOT_FOUND, "File not found".to_string()))?; // Streaming du fichier serve_partial_file(&state.config, file_path, headers) .await .map_err(|e| match e { - AppError::NotFound { .. } => ( - StatusCode::NOT_FOUND, - "File not found".to_string(), - ), + AppError::NotFound { .. } => (StatusCode::NOT_FOUND, "File not found".to_string()), AppError::InvalidData { .. } => ( StatusCode::RANGE_NOT_SATISFIABLE, "Invalid range".to_string(), @@ -346,7 +346,6 @@ async fn hls_master_playlist_wrapper( Path(track_id): Path, State(state): State, ) -> impl IntoResponse { - // Générer la master playlist avec toutes les qualités supportées let generator = HLSGenerator::new(track_id, state.config.backend_url.clone()) .with_quality(HLSQuality::high()) @@ -373,12 +372,14 @@ async fn hls_quality_playlist_wrapper( let mut segment_count = 0; let output_dir = Path::new(&state.config.compression.output_dir); - + // Simulation / Vérification de l'existence des segments if let Ok(mut entries) = tokio::fs::read_dir(output_dir).await { while let Ok(Some(entry)) = entries.next_entry().await { let filename = entry.file_name().to_string_lossy().to_string(); - if filename.starts_with(&format!("{}_{}_", track_id, quality)) && filename.ends_with(".ts") { + if filename.starts_with(&format!("{}_{}_", track_id, quality)) + && filename.ends_with(".ts") + { segment_count += 1; } } @@ -386,15 +387,15 @@ async fn hls_quality_playlist_wrapper( // Si aucun segment trouvé, comportement de test fallback if segment_count == 0 { - if state.config.is_development() { - segment_count = 5; - } else { - return (StatusCode::NOT_FOUND, "No segments found").into_response(); - } + if state.config.is_development() { + segment_count = 5; + } else { + return (StatusCode::NOT_FOUND, "No segments found").into_response(); + } } let generator = HLSGenerator::new(track_id, state.config.backend_url.clone()); - + match generator.generate_quality_playlist(&quality, segment_count) { Ok(playlist) => ( [ @@ -402,7 +403,8 @@ async fn hls_quality_playlist_wrapper( (header::CACHE_CONTROL, "public, max-age=60"), ], playlist, - ).into_response(), + ) + .into_response(), Err(_) => (StatusCode::NOT_FOUND, "Quality not found").into_response(), } } @@ -412,18 +414,17 @@ async fn hls_segment_wrapper( Path((track_id, quality, segment)): Path<(String, String, String)>, State(state): State, ) -> impl IntoResponse { - let index_part = segment.strip_prefix("segment_").unwrap_or("00000.ts"); let real_filename = format!("{}_{}_{}", track_id, quality, index_part); // Use std::path::PathBuf via join directly let file_path = std::path::Path::new(&state.config.compression.output_dir).join(real_filename); if !file_path.exists() { - // Fallback pour tests - if state.config.is_development() { - return (StatusCode::OK, "Fake TS content").into_response(); - } - return (StatusCode::NOT_FOUND, "Segment not found").into_response(); + // Fallback pour tests + if state.config.is_development() { + return (StatusCode::OK, "Fake TS content").into_response(); + } + return (StatusCode::NOT_FOUND, "Segment not found").into_response(); } let req_headers = HeaderMap::new(); diff --git a/veza-stream-server/src/routes/encoding.rs b/veza-stream-server/src/routes/encoding.rs index f7f2193d0..64ca82d89 100644 --- a/veza-stream-server/src/routes/encoding.rs +++ b/veza-stream-server/src/routes/encoding.rs @@ -26,17 +26,21 @@ pub async fn encode_track_handler( // Valider la qualité if !["low", "medium", "high", "hi_res"].contains(&quality.as_str()) { return Err(AppError::InvalidData { - message: format!("Invalid quality: {}. Must be one of: low, medium, high, hi_res", quality), + message: format!( + "Invalid quality: {}. Must be one of: low, medium, high, hi_res", + quality + ), }); } - encoding_service - .encode_track(track_id, &quality) - .await?; + encoding_service.encode_track(track_id, &quality).await?; Ok(Json(EncodeResponse { success: true, - message: format!("Encoding started for track {} with quality {}", track_id, quality), + message: format!( + "Encoding started for track {} with quality {}", + track_id, quality + ), track_id, quality, })) @@ -66,14 +70,9 @@ pub async fn get_encoding_status_handler( Path(track_id): Path, State(encoding_service): State, ) -> Result> { - let statuses = encoding_service - .get_encoding_status(track_id) - .await?; + let statuses = encoding_service.get_encoding_status(track_id).await?; - Ok(Json(EncodingStatusResponse { - track_id, - statuses, - })) + Ok(Json(EncodingStatusResponse { track_id, statuses })) } /// GET /stream/:track_id/:quality/index.m3u8 @@ -83,14 +82,13 @@ pub async fn serve_hls_manifest_handler( State(_encoding_service): State, ) -> Result { use std::path::PathBuf; - // Construire le chemin du manifest // Note: On suppose que le base_output_dir est configuré dans EncodingService // Pour l'instant, on utilise un chemin par défaut - let base_dir = std::env::var("STREAM_OUTPUT_DIR") - .unwrap_or_else(|_| "/data/streams".to_string()); - + let base_dir = + std::env::var("STREAM_OUTPUT_DIR").unwrap_or_else(|_| "/data/streams".to_string()); + let manifest_path = PathBuf::from(base_dir) .join(track_id.to_string()) .join(&quality) @@ -133,9 +131,9 @@ pub async fn serve_hls_segment_handler( } // Construire le chemin du segment - let base_dir = std::env::var("STREAM_OUTPUT_DIR") - .unwrap_or_else(|_| "/data/streams".to_string()); - + let base_dir = + std::env::var("STREAM_OUTPUT_DIR").unwrap_or_else(|_| "/data/streams".to_string()); + let segment_path = PathBuf::from(base_dir) .join(track_id.to_string()) .join(&quality) @@ -143,7 +141,10 @@ pub async fn serve_hls_segment_handler( if !segment_path.exists() { return Err(AppError::NotFound { - resource: format!("HLS segment {} for track {} quality {}", segment_name, track_id, quality), + resource: format!( + "HLS segment {} for track {} quality {}", + segment_name, track_id, quality + ), }); } @@ -180,4 +181,3 @@ pub struct EncodingStatusResponse { pub track_id: Uuid, pub statuses: Vec, } - diff --git a/veza-stream-server/src/routes/mod.rs b/veza-stream-server/src/routes/mod.rs index bdf3e5ea2..368664bf2 100644 --- a/veza-stream-server/src/routes/mod.rs +++ b/veza-stream-server/src/routes/mod.rs @@ -4,4 +4,3 @@ pub mod transcode; pub use transcode::*; pub mod api; pub use api::create_routes; - diff --git a/veza-stream-server/src/routes/transcode.rs b/veza-stream-server/src/routes/transcode.rs index 1edbf5d3c..fa3b3944f 100644 --- a/veza-stream-server/src/routes/transcode.rs +++ b/veza-stream-server/src/routes/transcode.rs @@ -1,6 +1,6 @@ -use crate::AppState; use crate::error::{AppError, Result}; use crate::transcoding::{JobPriority, QualityProfile}; +use crate::AppState; use axum::{ extract::{Multipart, Path, State}, http::StatusCode, @@ -55,69 +55,65 @@ pub async fn transcode_handler( let mut track_id: Option = None; // Parser le multipart form - while let Some(field) = multipart.next_field().await.map_err(|e| AppError::InvalidData { - message: format!("Failed to parse multipart: {}", e), - })? { + while let Some(field) = multipart + .next_field() + .await + .map_err(|e| AppError::InvalidData { + message: format!("Failed to parse multipart: {}", e), + })? + { let field_name = field.name().unwrap_or(""); match field_name { "file" => { - let filename = field.file_name() + let filename = field + .file_name() .ok_or_else(|| AppError::InvalidData { message: "Missing filename in file field".to_string(), })? .to_string(); - + // Générer un track_id basé sur le nom de fichier ou UUID track_id = Some( filename .trim_end_matches(|c: char| c == '.' || c.is_alphanumeric()) .to_string() .replace(" ", "_") - .replace(".", "_") + .replace(".", "_"), ); // Sauvegarder le fichier temporairement let temp_dir = std::env::temp_dir(); - let temp_file = temp_dir.join(format!("{}_{}", - Uuid::new_v4(), - filename - )); - + let temp_file = temp_dir.join(format!("{}_{}", Uuid::new_v4(), filename)); + let data = field.bytes().await.map_err(|e| AppError::InvalidData { message: format!("Failed to read file data: {}", e), })?; - - tokio::fs::write(&temp_file, data).await.map_err(|e| AppError::InternalError { - message: format!("Failed to save uploaded file: {}", e), - })?; - + + tokio::fs::write(&temp_file, data) + .await + .map_err(|e| AppError::InternalError { + message: format!("Failed to save uploaded file: {}", e), + })?; + file_path = Some(temp_file); } "codec" => { - codec = Some( - field.text().await.map_err(|e| AppError::InvalidData { - message: format!("Failed to read codec: {}", e), - })? - ); + codec = Some(field.text().await.map_err(|e| AppError::InvalidData { + message: format!("Failed to read codec: {}", e), + })?); } "bitrate" => { - bitrate = field.text().await - .ok() - .and_then(|s| s.parse::().ok()); + bitrate = field.text().await.ok().and_then(|s| s.parse::().ok()); } "quality_profile" => { - quality_profile = Some( - field.text().await.map_err(|e| AppError::InvalidData { - message: format!("Failed to read quality_profile: {}", e), - })? - ); + quality_profile = Some(field.text().await.map_err(|e| AppError::InvalidData { + message: format!("Failed to read quality_profile: {}", e), + })?); } "priority" => { - priority = Some( - field.text().await.map_err(|e| AppError::InvalidData { - message: format!("Failed to read priority: {}", e), - })? - ); + priority = Some(field.text().await.map_err(|e| AppError::InvalidData { + message: format!("Failed to read priority: {}", e), + })?); } _ => {} } @@ -174,14 +170,9 @@ pub async fn transcode_handler( .join(format!("transcode_{}", Uuid::new_v4())); // Soumettre le job - let job_id = state.transcoding_engine - .submit( - track_id, - input_path, - output_dir, - profile, - job_priority, - ) + let job_id = state + .transcoding_engine + .submit(track_id, input_path, output_dir, profile, job_priority) .await .map_err(|e| AppError::InternalError { message: format!("Failed to submit transcoding job: {}", e), @@ -201,7 +192,9 @@ pub async fn get_job_status( State(state): State, ) -> Result> { let job_manager = state.transcoding_engine.job_manager(); - let job = job_manager.get_status(job_id).await + let job = job_manager + .get_status(job_id) + .await .ok_or_else(|| AppError::NotFound { resource: format!("Job {}", job_id), })?; @@ -228,7 +221,9 @@ pub async fn serve_hls_manifest( State(state): State, ) -> Result { let job_manager = state.transcoding_engine.job_manager(); - let job = job_manager.get_status(job_id).await + let job = job_manager + .get_status(job_id) + .await .ok_or_else(|| AppError::NotFound { resource: format!("Job {}", job_id), })?; @@ -236,19 +231,23 @@ pub async fn serve_hls_manifest( // Vérifier que le job est terminé if !matches!(job.status, crate::transcoding::JobStatus::Completed) { return Err(AppError::InvalidData { - message: format!("Job {} is not completed yet (status: {:?})", job_id, job.status), + message: format!( + "Job {} is not completed yet (status: {:?})", + job_id, job.status + ), }); } let manifest_path = job.output_dir.join("index.m3u8"); - + if !manifest_path.exists() { return Err(AppError::NotFound { resource: format!("HLS manifest for job {}", job_id), }); } - let content = tokio::fs::read_to_string(&manifest_path).await + let content = tokio::fs::read_to_string(&manifest_path) + .await .map_err(|e| AppError::InternalError { message: format!("Failed to read HLS manifest: {}", e), })?; @@ -269,20 +268,23 @@ pub async fn serve_hls_segment( State(state): State, ) -> Result { let job_manager = state.transcoding_engine.job_manager(); - let job = job_manager.get_status(job_id).await + let job = job_manager + .get_status(job_id) + .await .ok_or_else(|| AppError::NotFound { resource: format!("Job {}", job_id), })?; let segment_path = job.output_dir.join(&segment); - + if !segment_path.exists() { return Err(AppError::NotFound { resource: format!("Segment {} for job {}", segment, job_id), }); } - let content = tokio::fs::read(&segment_path).await + let content = tokio::fs::read(&segment_path) + .await .map_err(|e| AppError::InternalError { message: format!("Failed to read segment: {}", e), })?; @@ -329,7 +331,8 @@ pub async fn get_job_status_detailed( use crate::database::pool::create_pool_from_config; // Créer un pool temporaire depuis la config - let pool = create_pool_from_config(&state.config.database).await + let pool = create_pool_from_config(&state.config.database) + .await .map_err(|e| AppError::InternalError { message: format!("Failed to create database pool: {}", e), })?; @@ -378,7 +381,9 @@ pub async fn get_job_status_detailed( index: row.get("segment_index"), path: row.get("path"), duration: row.get("duration"), - created_at: row.get::, _>("created_at").to_rfc3339(), + created_at: row + .get::, _>("created_at") + .to_rfc3339(), }) .collect(); @@ -409,14 +414,18 @@ pub async fn get_job_status_detailed( segments, current_duration, progress, - created_at: job.get::, _>("created_at").to_rfc3339(), + created_at: job + .get::, _>("created_at") + .to_rfc3339(), started_at: None, // NOTE: Add started_at to stream_jobs if needed completed_at: if job.get::("status") == "done" { - Some(job.get::, _>("updated_at").to_rfc3339()) + Some( + job.get::, _>("updated_at") + .to_rfc3339(), + ) } else { None }, error: job.get("error_message"), })) } - diff --git a/veza-stream-server/src/streaming/adaptive.rs b/veza-stream-server/src/streaming/adaptive.rs index dade48d22..c3ade1374 100644 --- a/veza-stream-server/src/streaming/adaptive.rs +++ b/veza-stream-server/src/streaming/adaptive.rs @@ -271,44 +271,60 @@ impl AdaptiveStreamingManager { async fn update_quality_decisions(&self) { let mut sessions = self.sessions.write().await; - + for session in sessions.values_mut() { // 1. Get current metrics let metrics = &session.performance_metrics; let current_quality = &session.current_quality; - + // 2. Determine current profile bandwidth - let current_profile = self.profiles.iter() + let current_profile = self + .profiles + .iter() .find(|p| &p.quality_id == current_quality); - + if let Some(profile) = current_profile { let required_bandwidth = profile.bandwidth_estimate_kbps; - + // 3. Decision logic let mut new_quality = current_quality.clone(); - + // DOWNGRADE Logic // If buffer is critical (< 20%) or bandwidth is insufficient - if metrics.buffer_health_percentage < 20.0 || metrics.download_speed_kbps < required_bandwidth { + if metrics.buffer_health_percentage < 20.0 + || metrics.download_speed_kbps < required_bandwidth + { // Find a lower quality profile if let Some(lower) = self.get_lower_quality(current_quality) { new_quality = lower.quality_id.clone(); - tracing::info!("📉 Downgrading session {} to {}", session.session_id, new_quality); + tracing::info!( + "📉 Downgrading session {} to {}", + session.session_id, + new_quality + ); } } // UPGRADE Logic // If buffer is healthy (> 80%) AND bandwidth is plentiful (> 1.5x required) - else if metrics.buffer_health_percentage > 80.0 && metrics.download_speed_kbps > (required_bandwidth as f32 * 1.5) as u32 { + else if metrics.buffer_health_percentage > 80.0 + && metrics.download_speed_kbps > (required_bandwidth as f32 * 1.5) as u32 + { // Find a higher quality profile // But verify that the NEW profile's bandwidth is also covered if let Some(higher) = self.get_higher_quality(current_quality) { - if metrics.download_speed_kbps > (higher.bandwidth_estimate_kbps as f32 * 1.2) as u32 { + if metrics.download_speed_kbps + > (higher.bandwidth_estimate_kbps as f32 * 1.2) as u32 + { new_quality = higher.quality_id.clone(); - tracing::info!("📈 Upgrading session {} to {}", session.session_id, new_quality); + tracing::info!( + "📈 Upgrading session {} to {}", + session.session_id, + new_quality + ); } } } - + // 4. Apply change if new_quality != *current_quality { session.current_quality = new_quality; @@ -324,11 +340,18 @@ impl AdaptiveStreamingManager { // We can sort them by bandwidth to find the next lower let mut sorted_profiles = self.profiles.clone(); sorted_profiles.sort_by_key(|p| p.bitrate_kbps); // Ascending (Mobile -> High) - + // Find current index - if let Some(idx) = sorted_profiles.iter().position(|p| p.quality_id == current_quality) { + if let Some(idx) = sorted_profiles + .iter() + .position(|p| p.quality_id == current_quality) + { if idx > 0 { - if let Some(pos) = self.profiles.iter().position(|p| p.quality_id == sorted_profiles[idx - 1].quality_id) { + if let Some(pos) = self + .profiles + .iter() + .position(|p| p.quality_id == sorted_profiles[idx - 1].quality_id) + { return Some(&self.profiles[pos]); } } @@ -339,10 +362,17 @@ impl AdaptiveStreamingManager { fn get_higher_quality(&self, current_quality: &str) -> Option<&AdaptiveProfile> { let mut sorted_profiles = self.profiles.clone(); sorted_profiles.sort_by_key(|p| p.bitrate_kbps); // Ascending - - if let Some(idx) = sorted_profiles.iter().position(|p| p.quality_id == current_quality) { + + if let Some(idx) = sorted_profiles + .iter() + .position(|p| p.quality_id == current_quality) + { if idx < sorted_profiles.len() - 1 { - if let Some(pos) = self.profiles.iter().position(|p| p.quality_id == sorted_profiles[idx + 1].quality_id) { + if let Some(pos) = self + .profiles + .iter() + .position(|p| p.quality_id == sorted_profiles[idx + 1].quality_id) + { return Some(&self.profiles[pos]); } } diff --git a/veza-stream-server/src/streaming/hls.rs b/veza-stream-server/src/streaming/hls.rs index e3b8be468..effc00506 100644 --- a/veza-stream-server/src/streaming/hls.rs +++ b/veza-stream-server/src/streaming/hls.rs @@ -23,12 +23,12 @@ pub async fn segment_file( let output_pattern = output_dir.join(&segment_filename_pattern); // Note: On ne génère pas la playlist ici car HLSGenerator le fait dynamiquement // On demande juste à ffmpeg de générer les segments - + // Commande: ffmpeg -i input -c copy -f segment -segment_time 10 -segment_format mpegts output_%05d.ts // Note: -c copy suppose que l'entrée est déjà dans un format compatible (AAC/MP3). // Si ce n'est pas le cas, il faudrait transcoder (ex: -c:a aac -b:a 128k) // Pour simplifier, on suppose que l'entrée est déjà transcodée par CompressionEngine - + let status = tokio::process::Command::new("ffmpeg") .arg("-y") .arg("-i") @@ -472,10 +472,9 @@ mod tests { #[test] fn test_quality_playlist_empty_segments() { - let generator = - HLSGenerator::new("track".to_string(), "http://localhost".to_string()) - .with_quality(HLSQuality::high()) - .with_segment_duration(Duration::from_secs(10)); + let generator = HLSGenerator::new("track".to_string(), "http://localhost".to_string()) + .with_quality(HLSQuality::high()) + .with_segment_duration(Duration::from_secs(10)); let playlist = generator.generate_quality_playlist("high", 0).unwrap(); assert!(playlist.contains("#EXT-X-ENDLIST")); assert!(playlist.contains("#EXT-X-MEDIA-SEQUENCE:0")); diff --git a/veza-stream-server/src/streaming/protocols/http_range.rs b/veza-stream-server/src/streaming/protocols/http_range.rs index d34fa02be..7409e6c40 100644 --- a/veza-stream-server/src/streaming/protocols/http_range.rs +++ b/veza-stream-server/src/streaming/protocols/http_range.rs @@ -48,7 +48,7 @@ impl FromStr for ByteRange { fn from_str(s: &str) -> Result { let s = s.trim(); - + // Vérifier le préfixe if !s.starts_with("bytes=") { return Err(RangeError::UnsupportedUnit); @@ -56,18 +56,20 @@ impl FromStr for ByteRange { // Extraire la partie valeur let value = &s[6..]; - + // RFC 7233 ne supporte pas explicitement les ranges multiples pour ce cas d'usage simple // Si une virgule est présente, on rejette pour simplifier (ou on prend le premier) // Pour un serveur de streaming audio, on se concentre sur le single range. if value.contains(',') { // Pour l'instant, on rejette les multipart ranges pour simplifier la logique de réponse - return Err(RangeError::InvalidFormat); + return Err(RangeError::InvalidFormat); } if let Some(suffix_val) = value.strip_prefix('-') { // Cas Suffix: bytes=-500 - let len = suffix_val.parse::().map_err(|_| RangeError::InvalidFormat)?; + let len = suffix_val + .parse::() + .map_err(|_| RangeError::InvalidFormat)?; if len == 0 { return Err(RangeError::InvalidFormat); // Suffixe 0 n'a pas de sens } @@ -78,13 +80,19 @@ impl FromStr for ByteRange { match parts.as_slice() { [start_str, ""] => { // Cas From: bytes=500- - let start = start_str.parse::().map_err(|_| RangeError::InvalidFormat)?; + let start = start_str + .parse::() + .map_err(|_| RangeError::InvalidFormat)?; Ok(ByteRange::From(start)) } [start_str, end_str] => { // Cas Exact: bytes=0-499 - let start = start_str.parse::().map_err(|_| RangeError::InvalidFormat)?; - let end = end_str.parse::().map_err(|_| RangeError::InvalidFormat)?; + let start = start_str + .parse::() + .map_err(|_| RangeError::InvalidFormat)?; + let end = end_str + .parse::() + .map_err(|_| RangeError::InvalidFormat)?; if start > end { return Err(RangeError::InvalidValues); @@ -169,10 +177,22 @@ mod tests { #[test] fn test_parse_errors() { assert_eq!("".parse::(), Err(RangeError::UnsupportedUnit)); - assert_eq!("bits=0-1".parse::(), Err(RangeError::UnsupportedUnit)); - assert_eq!("bytes=abc".parse::(), Err(RangeError::InvalidFormat)); - assert_eq!("bytes=100-50".parse::(), Err(RangeError::InvalidValues)); - assert_eq!("bytes=-".parse::(), Err(RangeError::InvalidFormat)); + assert_eq!( + "bits=0-1".parse::(), + Err(RangeError::UnsupportedUnit) + ); + assert_eq!( + "bytes=abc".parse::(), + Err(RangeError::InvalidFormat) + ); + assert_eq!( + "bytes=100-50".parse::(), + Err(RangeError::InvalidValues) + ); + assert_eq!( + "bytes=-".parse::(), + Err(RangeError::InvalidFormat) + ); } #[test] @@ -180,12 +200,28 @@ mod tests { let total = 1000; let range = ByteRange::Exact(0, 499); let resolved = range.resolve(total).unwrap(); - assert_eq!(resolved, ResolvedRange { start: 0, end: 499, length: 500, total_size: 1000 }); + assert_eq!( + resolved, + ResolvedRange { + start: 0, + end: 499, + length: 500, + total_size: 1000 + } + ); // Clamping let range = ByteRange::Exact(500, 2000); let resolved = range.resolve(total).unwrap(); - assert_eq!(resolved, ResolvedRange { start: 500, end: 999, length: 500, total_size: 1000 }); + assert_eq!( + resolved, + ResolvedRange { + start: 500, + end: 999, + length: 500, + total_size: 1000 + } + ); } #[test] @@ -193,22 +229,46 @@ mod tests { let total = 1000; let range = ByteRange::From(900); let resolved = range.resolve(total).unwrap(); - assert_eq!(resolved, ResolvedRange { start: 900, end: 999, length: 100, total_size: 1000 }); + assert_eq!( + resolved, + ResolvedRange { + start: 900, + end: 999, + length: 100, + total_size: 1000 + } + ); } #[test] fn test_resolve_suffix() { let total = 1000; - + // Normal suffix let range = ByteRange::Suffix(100); let resolved = range.resolve(total).unwrap(); - assert_eq!(resolved, ResolvedRange { start: 900, end: 999, length: 100, total_size: 1000 }); + assert_eq!( + resolved, + ResolvedRange { + start: 900, + end: 999, + length: 100, + total_size: 1000 + } + ); // Oversized suffix let range = ByteRange::Suffix(2000); let resolved = range.resolve(total).unwrap(); - assert_eq!(resolved, ResolvedRange { start: 0, end: 999, length: 1000, total_size: 1000 }); + assert_eq!( + resolved, + ResolvedRange { + start: 0, + end: 999, + length: 1000, + total_size: 1000 + } + ); } #[test] @@ -220,10 +280,15 @@ mod tests { let range = ByteRange::Exact(150, 200); assert_eq!(range.resolve(total), Err(RangeError::NotSatisfiable)); } - + #[test] fn test_format_header() { - let resolved = ResolvedRange { start: 0, end: 499, length: 500, total_size: 1000 }; + let resolved = ResolvedRange { + start: 0, + end: 499, + length: 500, + total_size: 1000, + }; assert_eq!(format_content_range(&resolved), "bytes 0-499/1000"); } } diff --git a/veza-stream-server/src/streaming/websocket.rs b/veza-stream-server/src/streaming/websocket.rs index e1fb3eb71..9b94dddd5 100644 --- a/veza-stream-server/src/streaming/websocket.rs +++ b/veza-stream-server/src/streaming/websocket.rs @@ -1,3 +1,4 @@ +use crate::AppState; use axum::{ extract::{ ws::{Message, WebSocket, WebSocketUpgrade}, @@ -7,7 +8,6 @@ use axum::{ response::Response, Json, }; -use crate::AppState; use serde::{Deserialize, Serialize}; use std::{ collections::HashMap, @@ -222,7 +222,7 @@ pub struct WebSocketConnection { pub last_activity: SystemTime, pub subscribed_events: Vec, pub sender: broadcast::Sender, - + // Latency metrics pub last_ping_sent_at: Option, pub latency_rtt: Option, @@ -548,14 +548,17 @@ impl WebSocketManager { error: None, }, - WebSocketCommand::SyncPong { ping_id: _, client_timestamp } => { + WebSocketCommand::SyncPong { + ping_id: _, + client_timestamp, + } => { let mut conns = connections.write().await; if let Some(conn) = conns.get_mut(&connection_id) { if let Some(sent_at) = conn.last_ping_sent_at { if let Ok(elapsed) = sent_at.elapsed() { // RTT = now - sent_time let rtt_ms = elapsed.as_millis() as u64; - + // Offset = client_ts - (server_sent_ts + RTT/2) // server_sent_ts is approximated by sent_at let sent_at_ms = sent_at @@ -563,10 +566,11 @@ impl WebSocketManager { .unwrap_or(Duration::ZERO) .as_millis() as u64; let server_estimated_arrival_ts = sent_at_ms + (rtt_ms / 2); - + // Offset positif = Client est en avance (son horloge est plus grande) // Offset négatif = Client est en retard - let offset_ms = client_timestamp as i64 - server_estimated_arrival_ts as i64; + let offset_ms = + client_timestamp as i64 - server_estimated_arrival_ts as i64; conn.latency_rtt = Some(Duration::from_millis(rtt_ms)); conn.clock_offset_ms = Some(offset_ms); @@ -583,17 +587,20 @@ impl WebSocketManager { ); } } else { - tracing::warn!("Received SyncPong from {} without active Ping", connection_id); + tracing::warn!( + "Received SyncPong from {} without active Ping", + connection_id + ); } } - - WebSocketEvent::CommandResponse { + + WebSocketEvent::CommandResponse { command_id: "sync_pong".to_string(), // internal success: true, data: None, error: None, } - }, + } _ => WebSocketEvent::CommandResponse { command_id: "unknown".to_string(), @@ -820,7 +827,10 @@ pub async fn websocket_handler( // Require a token — reject unauthenticated connections let token = token.ok_or_else(|| { - tracing::warn!("WebSocket connection rejected: no token provided from {}", ip_address); + tracing::warn!( + "WebSocket connection rejected: no token provided from {}", + ip_address + ); ( StatusCode::UNAUTHORIZED, Json(serde_json::json!({"error": "Authentication token required"})), @@ -831,7 +841,9 @@ pub async fn websocket_handler( let validation_result = state.auth_manager.validate_token(&token).await; if !validation_result.valid { - let reason = validation_result.error.unwrap_or_else(|| "Invalid token".to_string()); + let reason = validation_result + .error + .unwrap_or_else(|| "Invalid token".to_string()); tracing::warn!("WebSocket auth failed from {}: {}", ip_address, reason); return Err(( StatusCode::UNAUTHORIZED, @@ -1004,8 +1016,14 @@ mod tests { let restored: WebSocketEvent = serde_json::from_str(&json).unwrap(); match (&event, &restored) { ( - WebSocketEvent::ServerMessage { message: m1, level: l1 }, - WebSocketEvent::ServerMessage { message: m2, level: l2 }, + WebSocketEvent::ServerMessage { + message: m1, + level: l1, + }, + WebSocketEvent::ServerMessage { + message: m2, + level: l2, + }, ) => { assert_eq!(m1, m2); assert!(std::mem::discriminant(l1) == std::mem::discriminant(l2)); diff --git a/veza-stream-server/src/streaming/websocket_transport.rs b/veza-stream-server/src/streaming/websocket_transport.rs index 893d8c276..e687ac7a6 100644 --- a/veza-stream-server/src/streaming/websocket_transport.rs +++ b/veza-stream-server/src/streaming/websocket_transport.rs @@ -1,10 +1,10 @@ -use crate::core::sync::{SyncTransport, SyncAdjustment}; -use crate::streaming::websocket::{WebSocketManager, WebSocketEvent}; +use crate::core::sync::{SyncAdjustment, SyncTransport}; use crate::error::AppError; -use uuid::Uuid; +use crate::streaming::websocket::{WebSocketEvent, WebSocketManager}; +use futures::future::BoxFuture; use std::sync::Arc; use std::time::Duration; -use futures::future::BoxFuture; +use uuid::Uuid; #[derive(Debug, Clone)] pub struct WebSocketSyncTransport { @@ -18,7 +18,11 @@ impl WebSocketSyncTransport { } impl SyncTransport for WebSocketSyncTransport { - fn send_adjustment<'a>(&'a self, client_id: Uuid, adjustment: SyncAdjustment) -> BoxFuture<'a, Result<(), AppError>> { + fn send_adjustment<'a>( + &'a self, + client_id: Uuid, + adjustment: SyncAdjustment, + ) -> BoxFuture<'a, Result<(), AppError>> { let manager = self.manager.clone(); Box::pin(async move { let event = WebSocketEvent::SyncAdjustment { @@ -34,13 +38,19 @@ impl SyncTransport for WebSocketSyncTransport { } fn send_ping<'a>(&'a self, client_id: Uuid) -> BoxFuture<'a, Result<(), AppError>> { - let manager = self.manager.clone(); - Box::pin(async move { - manager.send_sync_ping(client_id).await.map_err(|e| AppError::InternalError { message: e }) - }) + let manager = self.manager.clone(); + Box::pin(async move { + manager + .send_sync_ping(client_id) + .await + .map_err(|e| AppError::InternalError { message: e }) + }) } - fn get_connection_stats<'a>(&'a self, client_id: Uuid) -> BoxFuture<'a, Result<(Option, Option), AppError>> { + fn get_connection_stats<'a>( + &'a self, + client_id: Uuid, + ) -> BoxFuture<'a, Result<(Option, Option), AppError>> { let manager = self.manager.clone(); Box::pin(async move { let (rtt, offset) = manager.get_latency_stats(client_id).await; @@ -48,7 +58,14 @@ impl SyncTransport for WebSocketSyncTransport { }) } - fn send_init<'a>(&'a self, client_id: Uuid, session_id: Uuid, track_id: String, server_timestamp_ms: u64, position_ms: u64) -> BoxFuture<'a, Result<(), AppError>> { + fn send_init<'a>( + &'a self, + client_id: Uuid, + session_id: Uuid, + track_id: String, + server_timestamp_ms: u64, + position_ms: u64, + ) -> BoxFuture<'a, Result<(), AppError>> { let manager = self.manager.clone(); Box::pin(async move { let event = WebSocketEvent::SyncInit { @@ -62,7 +79,11 @@ impl SyncTransport for WebSocketSyncTransport { }) } - fn send_stable<'a>(&'a self, client_id: Uuid, session_id: Uuid) -> BoxFuture<'a, Result<(), AppError>> { + fn send_stable<'a>( + &'a self, + client_id: Uuid, + session_id: Uuid, + ) -> BoxFuture<'a, Result<(), AppError>> { let manager = self.manager.clone(); Box::pin(async move { let event = WebSocketEvent::SyncStable { @@ -77,7 +98,7 @@ impl SyncTransport for WebSocketSyncTransport { #[cfg(test)] mod tests { use super::*; - use crate::core::sync::{SyncPoint, SyncAdjustment}; + use crate::core::sync::{SyncAdjustment, SyncPoint}; use crate::streaming::websocket::WebSocketManager; use tokio::sync::broadcast; @@ -87,20 +108,20 @@ mod tests { let manager = Arc::new(WebSocketManager::new()); // Note: subscribe() is not available on WebSocketManager directly, and verifying sent messages // requires inspecting internal state or channel receivers which are private. - // For this test, we verify that the call to send_adjustment doesn't panic. + // For this test, we verify that the call to send_adjustment doesn't panic. // Wait, send_to_connection uses internal connection channels. // We can't easily subscribe to a specific client's channel without being that client. // But WebSocketManager has a `broadcast_event`? No, send_adjustment sends to specific client. - - // Strategy: + + // Strategy: // We can verify that `send_sync_ping` works if we inspect `WebSocketConnection` state, // but that requires internal access. // Since `send_to_connection` is async and returns (), we can at least verify it doesn't panic. // real verification requires a connected client or inspecting manager logic. - + let transport = WebSocketSyncTransport::new(manager.clone()); let client_id = Uuid::new_v4(); - + let adjustment = SyncAdjustment { timestamp_offset: Duration::from_millis(100), playback_rate: 1.0, diff --git a/veza-stream-server/src/structured_logging.rs b/veza-stream-server/src/structured_logging.rs index 5f49b8976..6ba44294b 100644 --- a/veza-stream-server/src/structured_logging.rs +++ b/veza-stream-server/src/structured_logging.rs @@ -66,12 +66,13 @@ impl StructuredLogging { None => Rotation::DAILY, }; - let file_appender = - RollingFileAppender::new( - rotation, - file_path.parent().unwrap_or_else(|| std::path::Path::new(".")), - "stream-server", - ); + let file_appender = RollingFileAppender::new( + rotation, + file_path + .parent() + .unwrap_or_else(|| std::path::Path::new(".")), + "stream-server", + ); let (non_blocking, guard) = tracing_appender::non_blocking(file_appender); (Some(non_blocking), Some(guard)) } else { @@ -180,7 +181,6 @@ pub fn log_error(error: &str, context: HashMap) { pub mod stream_logs { use std::collections::HashMap; // Note: Use tracing::info! macro directly instead of importing - /// Log de connexion de streaming pub fn stream_connected(stream_id: &str, client_ip: &str, user_agent: &str, codec: &str) { @@ -707,11 +707,7 @@ mod tests { #[test] fn test_stream_context_logger_optional_ua() { - let logger = StreamContextLogger::new( - "stream1".to_string(), - "127.0.0.1".to_string(), - None, - ); + let logger = StreamContextLogger::new("stream1".to_string(), "127.0.0.1".to_string(), None); assert_eq!(logger.user_agent, None); } diff --git a/veza-stream-server/src/transcoding/codecs/profiles.rs b/veza-stream-server/src/transcoding/codecs/profiles.rs index d7f7dfd20..8a7f80540 100644 --- a/veza-stream-server/src/transcoding/codecs/profiles.rs +++ b/veza-stream-server/src/transcoding/codecs/profiles.rs @@ -119,11 +119,21 @@ impl QualityProfile { /// Returns the 3 streaming profiles for multi-bitrate HLS (v0.501) pub fn streaming_profiles() -> Vec { - vec![Self::streaming_high(), Self::streaming_medium(), Self::streaming_low()] + vec![ + Self::streaming_high(), + Self::streaming_medium(), + Self::streaming_low(), + ] } /// Retourne tous les profils standards pub fn all_defaults() -> Vec { - vec![Self::hi_res(), Self::high(), Self::medium(), Self::low(), Self::mobile()] + vec![ + Self::hi_res(), + Self::high(), + Self::medium(), + Self::low(), + Self::mobile(), + ] } } diff --git a/veza-stream-server/src/transcoding/engine.rs b/veza-stream-server/src/transcoding/engine.rs index 7b6e4a00c..4d93a8ea9 100644 --- a/veza-stream-server/src/transcoding/engine.rs +++ b/veza-stream-server/src/transcoding/engine.rs @@ -1,12 +1,12 @@ +use crate::transcoding::codecs::profiles::QualityProfile; use crate::transcoding::pipeline::{ - job::{TranscodingJob, JobPriority}, + job::{JobPriority, TranscodingJob}, + job_manager::JobManager, queue::PriorityQueue, worker::TranscodingWorker, - job_manager::JobManager, }; -use std::sync::Arc; use std::path::PathBuf; -use crate::transcoding::codecs::profiles::QualityProfile; +use std::sync::Arc; #[derive(Clone)] pub struct TranscodingEngine { @@ -19,7 +19,7 @@ impl TranscodingEngine { pub fn new(worker_count: usize) -> Self { let queue = Arc::new(PriorityQueue::new(1000)); // Capacité 1000 let job_manager = Arc::new(JobManager::new()); - + Self { queue, job_manager, @@ -38,7 +38,7 @@ impl TranscodingEngine { let queue = self.queue.clone(); let job_manager = self.job_manager.clone(); let worker = TranscodingWorker::new(id); - + tokio::spawn(async move { tracing::info!("Transcoding Worker {} started", id); loop { @@ -68,23 +68,17 @@ impl TranscodingEngine { input_path: PathBuf, output_dir: PathBuf, profile: QualityProfile, - priority: JobPriority + priority: JobPriority, ) -> Result { - let job = TranscodingJob::new( - track_id, - input_path, - output_dir, - profile, - priority - ); + let job = TranscodingJob::new(track_id, input_path, output_dir, profile, priority); let job_id = job.id; // Enregistrer le job dans le JobManager self.job_manager.enqueue(job.clone()).await; - + // Soumettre à la queue self.queue.submit(job).await.map_err(|e| e.to_string())?; - + tracing::info!(job_id = %job_id, "Job submitted to transcoding engine"); Ok(job_id) } diff --git a/veza-stream-server/src/transcoding/ffmpeg/command_builder.rs b/veza-stream-server/src/transcoding/ffmpeg/command_builder.rs index 39b6baa4e..ced1b3b1b 100644 --- a/veza-stream-server/src/transcoding/ffmpeg/command_builder.rs +++ b/veza-stream-server/src/transcoding/ffmpeg/command_builder.rs @@ -89,7 +89,9 @@ impl FfmpegCommandBuilder { // Validation anti-traversal basique (doit être renforcée par le validateur global) if input_path.to_string_lossy().contains("..") { - return Err(BuilderError::InvalidInputPath("Path traversal detected".to_string())); + return Err(BuilderError::InvalidInputPath( + "Path traversal detected".to_string(), + )); } let mut cmd = Command::new("ffmpeg"); @@ -100,10 +102,10 @@ impl FfmpegCommandBuilder { } cmd.arg("-hide_banner"); cmd.arg("-nostats"); // On parse manuellement la progression si besoin, ou on utilise -progress - + // Isolation CPU: 1 thread par processus FFmpeg cmd.arg("-threads").arg("1"); - + // Forcer audio-only (évite les problèmes avec vidéo) cmd.arg("-map").arg("0:a"); @@ -133,15 +135,15 @@ impl FfmpegCommandBuilder { // Container specific flags if let Some(ContainerFormat::HLS) = self.container { cmd.arg("-f").arg("hls"); - + if let Some(time) = self.hls_time { cmd.arg("-hls_time").arg(time.to_string()); } - + cmd.arg("-hls_playlist_type").arg("vod"); // Pour VOD, on veut la liste complète des segments cmd.arg("-hls_list_size").arg("0"); - + // Segment naming pattern // output_path est supposé être /path/to/playlist.m3u8 // On doit déduire le pattern de segment diff --git a/veza-stream-server/src/transcoding/ffmpeg/progress_parser.rs b/veza-stream-server/src/transcoding/ffmpeg/progress_parser.rs index a6633f0d9..b7dd229dd 100644 --- a/veza-stream-server/src/transcoding/ffmpeg/progress_parser.rs +++ b/veza-stream-server/src/transcoding/ffmpeg/progress_parser.rs @@ -1,6 +1,6 @@ -use std::time::Duration; -use regex::Regex; use lazy_static::lazy_static; +use regex::Regex; +use std::time::Duration; lazy_static! { // Regex pour parser la ligne de progression FFmpeg @@ -8,7 +8,7 @@ lazy_static! { static ref PROGRESS_REGEX: Regex = Regex::new( r"frame=\s*(\d+)\s+fps=\s*([\d.]+)\s+q=\s*([-]?[\d.]+)\s+size=\s*(\w+)\s+time=\s*(\d{2}:\d{2}:\d{2}\.\d{2})\s+bitrate=\s*([\d.]+\w+/s)\s+speed=\s*([\d.]+)x" ).expect("PROGRESS_REGEX invalide"); - + // Regex alternatif pour le temps seul (cas audio simple) // size= 1024kB time=00:00:05.12 bitrate=1638.4kbits/s speed=10.2x static ref TIME_REGEX: Regex = Regex::new(r"time=\s*(\d{2}:\d{2}:\d{2}\.\d{2})").expect("TIME_REGEX invalide"); @@ -34,7 +34,7 @@ impl FfmpegProgress { speed: caps.get(7).and_then(|m| m.as_str().parse().ok()), }); } - + // Fallback pour audio-only (pas de frame/fps parfois) if let Some(caps) = TIME_REGEX.captures(line) { return Some(Self { @@ -64,14 +64,16 @@ fn parse_duration(time_str: &str) -> Option { let centis: u64 = if seconds_parts.len() > 1 { let s = seconds_parts[1]; let mut s = s.to_string(); - while s.len() < 3 { s.push('0'); } // Normalize to ms + while s.len() < 3 { + s.push('0'); + } // Normalize to ms s[..3].parse().ok()? } else { 0 }; Some(Duration::from_millis( - hours * 3600 * 1000 + minutes * 60 * 1000 + seconds * 1000 + centis + hours * 3600 * 1000 + minutes * 60 * 1000 + seconds * 1000 + centis, )) } @@ -83,7 +85,7 @@ mod tests { fn test_parse_progress_full() { let line = "frame= 123 fps=0.0 q=-1.0 size= 1024kB time=00:00:05.12 bitrate=1638.4kbits/s speed=10.2x"; let progress = FfmpegProgress::parse(line).unwrap(); - + assert_eq!(progress.frame, Some(123)); assert_eq!(progress.time, Some(Duration::from_millis(5120))); assert_eq!(progress.speed, Some(10.2)); @@ -93,7 +95,10 @@ mod tests { fn test_parse_progress_audio() { let line = "size= 1024kB time=00:01:05.50 bitrate=128.0kbits/s speed=50.0x"; let progress = FfmpegProgress::parse(line).unwrap(); - - assert_eq!(progress.time, Some(Duration::from_secs(65) + Duration::from_millis(500))); + + assert_eq!( + progress.time, + Some(Duration::from_secs(65) + Duration::from_millis(500)) + ); } } diff --git a/veza-stream-server/src/transcoding/mod.rs b/veza-stream-server/src/transcoding/mod.rs index d43946bce..146e74b53 100644 --- a/veza-stream-server/src/transcoding/mod.rs +++ b/veza-stream-server/src/transcoding/mod.rs @@ -1,9 +1,9 @@ -pub mod ffmpeg; -pub mod pipeline; pub mod codecs; pub mod engine; +pub mod ffmpeg; +pub mod pipeline; // Ré-export pour usage facile -pub use engine::TranscodingEngine; -pub use pipeline::job::{TranscodingJob, JobStatus, JobPriority}; pub use codecs::profiles::QualityProfile; +pub use engine::TranscodingEngine; +pub use pipeline::job::{JobPriority, JobStatus, TranscodingJob}; diff --git a/veza-stream-server/src/transcoding/pipeline/job.rs b/veza-stream-server/src/transcoding/pipeline/job.rs index 4db77d203..2d9bd0728 100644 --- a/veza-stream-server/src/transcoding/pipeline/job.rs +++ b/veza-stream-server/src/transcoding/pipeline/job.rs @@ -75,7 +75,7 @@ impl TranscodingJob { self.status = JobStatus::Failed(reason); self.completed_at = Some(SystemTime::now()); } - + pub fn can_retry(&self, max_retries: u32) -> bool { self.retry_count < max_retries } diff --git a/veza-stream-server/src/transcoding/pipeline/job_manager.rs b/veza-stream-server/src/transcoding/pipeline/job_manager.rs index ed17c64ae..5fde3c54b 100644 --- a/veza-stream-server/src/transcoding/pipeline/job_manager.rs +++ b/veza-stream-server/src/transcoding/pipeline/job_manager.rs @@ -1,4 +1,4 @@ -use super::job::{TranscodingJob, JobStatus}; +use super::job::{JobStatus, TranscodingJob}; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::RwLock; @@ -120,4 +120,3 @@ mod tests { assert!(matches!(updated_job.status, JobStatus::Processing)); } } - diff --git a/veza-stream-server/src/transcoding/pipeline/mod.rs b/veza-stream-server/src/transcoding/pipeline/mod.rs index 3f4a0072d..c5104460e 100644 --- a/veza-stream-server/src/transcoding/pipeline/mod.rs +++ b/veza-stream-server/src/transcoding/pipeline/mod.rs @@ -1,4 +1,4 @@ pub mod job; +pub mod job_manager; pub mod queue; pub mod worker; -pub mod job_manager; diff --git a/veza-stream-server/src/transcoding/pipeline/queue.rs b/veza-stream-server/src/transcoding/pipeline/queue.rs index 6a1033467..9d0db2192 100644 --- a/veza-stream-server/src/transcoding/pipeline/queue.rs +++ b/veza-stream-server/src/transcoding/pipeline/queue.rs @@ -1,6 +1,6 @@ -use super::job::{TranscodingJob, JobPriority}; -use tokio::sync::mpsc::{self, Sender, Receiver}; +use super::job::{JobPriority, TranscodingJob}; use std::sync::Arc; +use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::sync::Mutex; /// Taille maximale par défaut de la file d'attente @@ -20,7 +20,7 @@ pub struct PriorityQueue { urgent_tx: Sender, normal_tx: Sender, background_tx: Sender, - + // On garde les receivers dans un Mutex pour que les workers puissent les partager/consommer receivers: Arc>, } @@ -70,7 +70,7 @@ impl PriorityQueue { let mut rxs = self.receivers.lock().await; // Stratégie de priorité stricte : on vide d'abord Urgent, puis Normal, etc. - + // 1. Tenter Urgent (non-bloquant) if let Ok(job) = rxs.urgent_rx.try_recv() { return Some(job); @@ -91,7 +91,7 @@ impl PriorityQueue { // Une approche simple avec tokio::select! sur des &mut Receiver est délicate si ils sont dans un MutexGuard. // Solution: On utilise `recv()` séquentiellement avec un timeout très court ou `tokio::select!` sur les champs // MAIS comme on a le Mutex, on est le seul thread à lire. - + // On va extraire les références mutables pour le select let QueueReceivers { urgent_rx, diff --git a/veza-stream-server/src/transcoding/pipeline/worker.rs b/veza-stream-server/src/transcoding/pipeline/worker.rs index 7f14f70ca..f4142b847 100644 --- a/veza-stream-server/src/transcoding/pipeline/worker.rs +++ b/veza-stream-server/src/transcoding/pipeline/worker.rs @@ -1,4 +1,4 @@ -use super::job::{TranscodingJob, JobStatus}; +use super::job::{JobStatus, TranscodingJob}; use crate::transcoding::ffmpeg::command_builder::FfmpegCommandBuilder; use std::time::Duration; use tokio::time::timeout; @@ -17,14 +17,19 @@ impl TranscodingWorker { /// Traite un job de bout en bout pub async fn process(&self, mut job: TranscodingJob) -> TranscodingJob { - tracing::info!("Worker {} start job {} (Priority: {:?})", self.id, job.id, job.priority); - + tracing::info!( + "Worker {} start job {} (Priority: {:?})", + self.id, + job.id, + job.priority + ); + job.mark_started(); // 1. Construction de la commande // Le manifest HLS sera index.m3u8 dans le répertoire de sortie let manifest_path = job.output_dir.join("index.m3u8"); - + let builder = FfmpegCommandBuilder::new() .input(&job.input_path) .output(&manifest_path) @@ -33,17 +38,22 @@ impl TranscodingWorker { .sample_rate(job.target_profile.sample_rate) .channels(job.target_profile.channels) .container(job.target_profile.container); - + // Add HLS specific config if needed let builder = if let Some(hls_time) = job.target_profile.hls_segment_time { - builder.hls_time(hls_time) + builder.hls_time(hls_time) } else { builder.hls_time(6) // Default 6 seconds }; // Créer le répertoire de sortie s'il n'existe pas if let Err(e) = tokio::fs::create_dir_all(&job.output_dir).await { - tracing::error!("Worker {} failed to create output dir for job {}: {}", self.id, job.id, e); + tracing::error!( + "Worker {} failed to create output dir for job {}: {}", + self.id, + job.id, + e + ); job.mark_failed(format!("Failed to create output directory: {}", e)); return job; } @@ -51,7 +61,12 @@ impl TranscodingWorker { let mut command = match builder.build() { Ok(cmd) => cmd, Err(e) => { - tracing::error!("Worker {} failed to build command for job {}: {}", self.id, job.id, e); + tracing::error!( + "Worker {} failed to build command for job {}: {}", + self.id, + job.id, + e + ); job.mark_failed(format!("Command build error: {}", e)); return job; } @@ -83,11 +98,20 @@ impl TranscodingWorker { if alt_manifest.exists() { // Renommer si nécessaire if let Err(e) = tokio::fs::rename(&alt_manifest, &manifest_path).await { - tracing::warn!("Worker {} failed to rename playlist.m3u8 to index.m3u8: {}", self.id, e); + tracing::warn!( + "Worker {} failed to rename playlist.m3u8 to index.m3u8: {}", + self.id, + e + ); } } else { let err_msg = "HLS manifest not generated".to_string(); - tracing::error!("Worker {} job {} failed: {}", self.id, job.id, err_msg); + tracing::error!( + "Worker {} job {} failed: {}", + self.id, + job.id, + err_msg + ); job.mark_failed(err_msg); return job; } @@ -107,9 +131,16 @@ impl TranscodingWorker { } Err(_) => { // Timeout: tuer le processus FFmpeg - tracing::warn!("Worker {} job {} timed out, killing FFmpeg process", self.id, job.id); + tracing::warn!( + "Worker {} job {} timed out, killing FFmpeg process", + self.id, + job.id + ); let _ = child.kill().await; - let err_msg = format!("Transcoding timed out after {} seconds", JOB_TIMEOUT.as_secs()); + let err_msg = format!( + "Transcoding timed out after {} seconds", + JOB_TIMEOUT.as_secs() + ); tracing::error!("Worker {} job {} timed out", self.id, job.id); job.mark_failed(err_msg); } diff --git a/veza-stream-server/src/utils/env.rs b/veza-stream-server/src/utils/env.rs index 0450316df..09cbc4bdf 100644 --- a/veza-stream-server/src/utils/env.rs +++ b/veza-stream-server/src/utils/env.rs @@ -1,5 +1,5 @@ //! Module pour la gestion des variables d'environnement requises -//! +//! //! Ce module fournit des fonctions helper pour récupérer des variables d'environnement //! avec validation stricte. L'application refuse de démarrer si les secrets requis //! ne sont pas définis. @@ -7,19 +7,19 @@ use std::env; /// Récupère une variable d'environnement requise. -/// +/// /// Panic si la variable n'est pas définie ou est vide. -/// +/// /// # Arguments -/// +/// /// * `key` - Le nom de la variable d'environnement -/// +/// /// # Panics -/// +/// /// Panic avec un message d'erreur clair si la variable n'est pas définie. -/// +/// /// # Example -/// +/// /// ```rust,should_panic /// # use stream_server::utils::env::require_env; /// // Panic si SECRET_KEY n'est pas défini @@ -36,20 +36,20 @@ pub fn require_env(key: &str) -> String { } /// Récupère une variable d'environnement requise avec validation de longueur minimale. -/// +/// /// Utile pour les secrets qui doivent avoir une certaine complexité. -/// +/// /// # Arguments -/// +/// /// * `key` - Le nom de la variable d'environnement /// * `min_length` - Longueur minimale requise -/// +/// /// # Panics -/// +/// /// Panic si la variable n'est pas définie ou si sa longueur est inférieure à `min_length`. -/// +/// /// # Example -/// +/// /// ```rust,should_panic /// # use stream_server::utils::env::require_env_min_length; /// // Panic si SECRET_KEY n'est pas défini ou fait moins de 32 caractères @@ -60,7 +60,9 @@ pub fn require_env_min_length(key: &str, min_length: usize) -> String { if value.len() < min_length { panic!( "FATAL: Environment variable {} must be at least {} characters long (got {})", - key, min_length, value.len() + key, + min_length, + value.len() ) } value @@ -76,11 +78,12 @@ mod tests { let key = "TEST_NONEXISTENT_VAR_12345"; env::remove_var(key); - let result = panic::catch_unwind(|| { - require_env(key) - }); + let result = panic::catch_unwind(|| require_env(key)); - assert!(result.is_err(), "require_env should panic on missing variable"); + assert!( + result.is_err(), + "require_env should panic on missing variable" + ); } #[test] @@ -100,12 +103,13 @@ mod tests { let key = "TEST_SHORT_SECRET"; env::set_var(key, "short"); - let result = panic::catch_unwind(|| { - require_env_min_length(key, 32) - }); + let result = panic::catch_unwind(|| require_env_min_length(key, 32)); env::remove_var(key); - assert!(result.is_err(), "require_env_min_length should panic on short value"); + assert!( + result.is_err(), + "require_env_min_length should panic on short value" + ); } #[test] @@ -132,4 +136,3 @@ mod tests { env::remove_var(key); } } - diff --git a/veza-stream-server/src/utils/mod.rs b/veza-stream-server/src/utils/mod.rs index 8e1702e72..ff0cecfbf 100644 --- a/veza-stream-server/src/utils/mod.rs +++ b/veza-stream-server/src/utils/mod.rs @@ -7,7 +7,7 @@ pub mod time; use crate::config::Config; use crate::error::{AppError, Result}; -use crate::streaming::protocols::http_range::{ByteRange, format_content_range}; +use crate::streaming::protocols::http_range::{format_content_range, ByteRange}; use axum::{ body::Body, http::{HeaderMap, HeaderValue, StatusCode}, @@ -57,13 +57,13 @@ pub fn build_safe_path(config: &Config, filename: &str) -> Result { // Ceci est nécessaire pour les tests où audio_dir peut ne pas exister physiquement // Dans un environnement de prod, le dossier existe. // Pour la sécurité, on vérifie juste qu'il n'y a pas de composants ".." après normalisation - - // Note: std::fs::canonicalize requires file to exist. + + // Note: std::fs::canonicalize requires file to exist. // We can try to simplify path manually or just rely on validate_filename having rejected ".." - + // Pour une sécurité maximale tout en permettant les tests: let normalized_path = file_path; // validate_filename a déjà filtré ".." - + Ok(normalized_path) } @@ -123,17 +123,19 @@ pub async fn serve_partial_file( return Ok(response.body(body).map_err(|_| AppError::InternalError { message: "Failed to create response".to_string(), })?); - }, + } Err(_) => { // Range Not Satisfiable (416) let mut response = Response::builder() .status(StatusCode::RANGE_NOT_SATISFIABLE) .header("Content-Range", format!("bytes */{}", file_size)); - + add_security_headers(&mut response); - - return Ok(response.body(Body::empty()).map_err(|_| AppError::InternalError { - message: "Failed to create response".to_string(), + + return Ok(response.body(Body::empty()).map_err(|_| { + AppError::InternalError { + message: "Failed to create response".to_string(), + } })?); } } @@ -271,7 +273,10 @@ mod tests { assert_eq!((resolved.start, resolved.end), (1024, 2047)); // Test range invalide (format) - assert!(ByteRange::from_str("bytes=2048-").unwrap().resolve(2048).is_err()); + assert!(ByteRange::from_str("bytes=2048-") + .unwrap() + .resolve(2048) + .is_err()); assert!(ByteRange::from_str("invalid").is_err()); } diff --git a/veza-stream-server/src/utils/signature.rs b/veza-stream-server/src/utils/signature.rs index 04fa09c2f..9d7a6a19e 100644 --- a/veza-stream-server/src/utils/signature.rs +++ b/veza-stream-server/src/utils/signature.rs @@ -28,7 +28,11 @@ struct Args { } #[allow(dead_code)] -fn generate_signature(filename: &str, expires: i64, secret: &str) -> Result> { +fn generate_signature( + filename: &str, + expires: i64, + secret: &str, +) -> Result> { let to_sign = format!("{}|{}", filename, expires); let mut mac = Hmac::::new_from_slice(secret.as_bytes()) .map_err(|e| format!("HMAC key invalid: {}", e))?; diff --git a/veza-stream-server/src/utils/time.rs b/veza-stream-server/src/utils/time.rs index ceb0aa590..98cabe271 100644 --- a/veza-stream-server/src/utils/time.rs +++ b/veza-stream-server/src/utils/time.rs @@ -12,5 +12,7 @@ pub fn unix_timestamp_secs() -> u64 { /// Convertit un SystemTime en timestamp Unix. Ne panique jamais (retourne 0 si avant 1970). pub fn system_time_to_unix_secs(t: SystemTime) -> u64 { - t.duration_since(UNIX_EPOCH).map(|d| d.as_secs()).unwrap_or(0) + t.duration_since(UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0) }